Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 159 additions & 25 deletions commp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
package commp

import (
"bytes"
"encoding"
"hash"
"math/bits"
"sync"

"github.com/filecoin-project/go-fil-commp-hashhash/internal"
sha256simd "github.com/minio/sha256-simd"
"golang.org/x/xerrors"
)
Expand All @@ -25,14 +28,21 @@ type Calc struct {
state
mu sync.Mutex
}
type message interface{}
type marshall struct {
twinHolds [][]byte
}

type state struct {
quadsEnqueued uint64
layerQueues [MaxLayers + 2]chan []byte // one extra layer for the initial leaves, one more for the dummy never-to-use channel
resultCommP chan []byte
layerQueues [MaxLayers + 2]chan message // one extra layer for the initial leaves, one more for the dummy never-to-use channel
result chan message
buffer []byte
}

var _ hash.Hash = &Calc{} // make sure we are hash.Hash compliant
var _ encoding.BinaryMarshaler = &Calc{}
var _ encoding.BinaryUnmarshaler = &Calc{}

// MaxLayers is the current maximum height of the rust-fil-proofs proving tree.
const MaxLayers = uint(31) // result of log2( 64 GiB / 32 )
Expand Down Expand Up @@ -93,7 +103,7 @@ func (cp *Calc) Reset() {
// we are resetting without digesting: close everything out to terminate
// the layer workers
close(cp.layerQueues[0])
<-cp.resultCommP
<-cp.result
}
cp.state = state{} // reset
cp.mu.Unlock()
Expand Down Expand Up @@ -158,7 +168,7 @@ func (cp *Calc) Digest() (commP []byte, paddedPieceSize uint64, err error) {
paddedPieceSize = 1 << uint(64-bits.LeadingZeros64(paddedPieceSize))
}

return <-cp.resultCommP, paddedPieceSize, nil
return (<-cp.result).([]byte), paddedPieceSize, nil
}

// Write adds bytes to the accumulator, for a subsequent Digest(). Upon the
Expand Down Expand Up @@ -189,8 +199,8 @@ func (cp *Calc) Write(input []byte) (int, error) {
// just starting: initialize internal state, start first background layer-goroutine
if cp.buffer == nil {
cp.buffer = make([]byte, 0, bufferSize)
cp.resultCommP = make(chan []byte, 1)
cp.layerQueues[0] = make(chan []byte, layerQueueDepth)
cp.result = make(chan message, 1)
cp.layerQueues[0] = make(chan message, layerQueueDepth)
cp.addLayer(0)
}

Expand Down Expand Up @@ -278,26 +288,33 @@ func (cp *Calc) digestQuads(inSlab []byte) {
}

func (cp *Calc) addLayer(myIdx uint) {
cp.addLayerWithTwinhold(myIdx, nil)
}

func (cp *Calc) addLayerWithTwinhold(myIdx uint, twinHold []byte) {
// the next layer channel, which we might *not* use
if cp.layerQueues[myIdx+1] != nil {
panic("addLayer called more than once with identical idx argument")
}
cp.layerQueues[myIdx+1] = make(chan []byte, layerQueueDepth)
cp.layerQueues[myIdx+1] = make(chan message, layerQueueDepth)

go func() {
s256 := sha256simd.New()
var twinHold []byte

for {
slab, queueIsOpen := <-cp.layerQueues[myIdx]
message, queueIsOpen := <-cp.layerQueues[myIdx]

// the dream is collapsing
if !queueIsOpen {
defer func() { twinHold = nil }()

// I am last
if myIdx == MaxLayers || cp.layerQueues[myIdx+2] == nil {
cp.resultCommP <- append(make([]byte, 0, 32), twinHold[0:32]...)
if twinHold == nil {
// just send empty 32-byte slice -- this is a reset
cp.result <- make([]byte, 32)
return
}
cp.result <- append(make([]byte, 0, 32), twinHold[0:32]...)
return
}

Expand All @@ -311,22 +328,56 @@ func (cp *Calc) addLayer(myIdx uint) {
close(cp.layerQueues[myIdx+1])
return
}

switch {
case uint64(len(slab)) > uint64(1<<(5+myIdx)): // uint64 cast needed on 32-bit systems
cp.hashSlab254(s256, myIdx, slab)
cp.layerQueues[myIdx+1] <- slab
case twinHold != nil:
copy(twinHold[32:64], slab[0:32])
cp.hashSlab254(s256, 0, twinHold[0:64])
cp.layerQueues[myIdx+1] <- twinHold[0:32:64]
twinHold = nil
default:
twinHold = slab[0:32:64]
// avoid code below
switch typed := message.(type) {
case []byte:
slab := typed
switch {
case uint64(len(slab)) > uint64(1<<(5+myIdx)): // uint64 cast needed on 32-bit systems
// check if we need to pull off beginning for twinHold
if twinHold != nil {
copy(twinHold[32:64], slab[0:32])
cp.hashSlab254(s256, 0, twinHold[0:64])
cp.layerQueues[myIdx+1] <- twinHold[0:32:64]
slab = slab[(1 << (5 + myIdx)):]
twinHold = nil
// do we still have a larger block or just a remaining twinhold?
if uint64(len(slab)) <= uint64(1<<(5+myIdx)) {
twinHold = slab[0:32:64]
continue
}
}
// check if we need to pull twinHold off the end
if len(slab)%(1<<(6+myIdx)) != 0 {
twinHold = slab[len(slab)-(len(slab)%(1<<(6+myIdx))):]
slab = slab[:len(slab)-(len(slab)%(1<<(6+myIdx)))]
twinHold = append(make([]byte, 0, 64), twinHold...)
}
cp.hashSlab254(s256, myIdx, slab)
cp.layerQueues[myIdx+1] <- slab
case twinHold != nil:
copy(twinHold[32:64], slab[0:32])
cp.hashSlab254(s256, 0, twinHold[0:64])
cp.layerQueues[myIdx+1] <- twinHold[0:32:64]
twinHold = nil
default:
twinHold = slab[0:32:64]
// avoid code below
continue
}
case marshall:
marshallMsg := marshall{
twinHolds: append(typed.twinHolds, twinHold),
}
// am I last?
if myIdx == MaxLayers || cp.layerQueues[myIdx+2] == nil {
cp.result <- marshallMsg
} else {
cp.layerQueues[myIdx+1] <- marshallMsg
}
continue
default:
panic("unexpected message type received in layer worker")
}

// Check whether we need another worker for what we just pushed
//
// n.b. we will not blow out of the preallocated layerQueues array,
Expand All @@ -348,6 +399,89 @@ func (cp *Calc) hashSlab254(h hash.Hash, layerIdx uint, slab []byte) {
}
}

// MarshalBinary is an experimental function that allows marshalling
// intermediate commP calculation state into a byte slice, which can later
// be restored with UnmarshalBinary(). This is useful for pausing and
// resuming commP calculations.
func (cp *Calc) MarshalBinary() ([]byte, error) {
cp.mu.Lock()
defer cp.mu.Unlock()

if cp.buffer == nil {
return nil, xerrors.New("cannot marshall uninitialized commP calculator")
}

// Flush bytes in buffer till there are less than 127 remaining
if len(cp.buffer) > 0 {
for len(cp.buffer) >= 127 {
// FIXME: there is a smarter way to do this instead of 127-at-a-time,
// but that's for another PR
cp.digestQuads(cp.buffer[:127])
cp.buffer = cp.buffer[127:]
}
}

cp.layerQueues[0] <- marshall{}

msg := (<-cp.result).(marshall)

serializedState := internal.SerializedState{
QuadsEnqueued: cp.quadsEnqueued,
Buffer: cp.buffer,
LayerQueueTwinholds: msg.twinHolds,
}

buf := &bytes.Buffer{}
err := serializedState.MarshalCBOR(buf)
if err != nil {
return nil, xerrors.Errorf("failed to marshall commP state: %w", err)
}
return buf.Bytes(), nil
}

// UnmarshalBinary is an experimental function that allows restoring
// intermediate commP calculation state from a byte slice previously produced
// by MarshalBinary(). This is useful for pausing and resuming commP
// calculations. It will reset any existing state in the Calc object.
func (cp *Calc) UnmarshalBinary(data []byte) error {
cp.mu.Lock()
defer cp.mu.Unlock()

// reset any existing state in the Calc object
if cp.buffer != nil {
// we are resetting without digesting: close everything out to terminate
// the layer workers
close(cp.layerQueues[0])
<-cp.result
}
cp.state = state{} // reset

var serializedState internal.SerializedState
err := serializedState.UnmarshalCBOR(bytes.NewReader(data))
if err != nil {
return xerrors.Errorf("failed to unmarshall commP state: %w", err)
}

cp.quadsEnqueued = serializedState.QuadsEnqueued
cp.buffer = make([]byte, 0, bufferSize)
cp.buffer = append(cp.buffer, serializedState.Buffer...)

// re-initialize the layer 0 queue and worker
cp.layerQueues[0] = make(chan message, layerQueueDepth)
for i := uint(0); i < uint(len(serializedState.LayerQueueTwinholds)); i++ {
if len(serializedState.LayerQueueTwinholds[i]) == 0 {
cp.addLayerWithTwinhold(i, nil)
} else {
twinHold := make([]byte, 0, 64)
twinHold = append(twinHold, serializedState.LayerQueueTwinholds[i]...)
cp.addLayerWithTwinhold(i, twinHold)
}
}
cp.result = make(chan message, 1)

return nil
}

// PadCommP is experimental, do not use it.
func PadCommP(sourceCommP []byte, sourcePaddedSize, targetPaddedSize uint64) ([]byte, error) {

Expand Down
Loading