From 2ecdfd25c225b097bc0b383496a5bd90f574a96c Mon Sep 17 00:00:00 2001 From: dzh <1051072440@qq.com> Date: Mon, 27 Sep 2021 14:42:15 +0800 Subject: [PATCH] add new add-piece --- build/pieces/pieces.go | 130 ++++++++++++++++++ build/storage/storage.go | 110 ++++++++++++++- cmd/bench/main.go | 2 +- seal/basicfs/fs.go | 8 +- seal/seal_api.go | 290 ++++++++++++++++++++------------------- seal/test.go | 57 +++++--- seal/test_seal.go | 120 ++++++++++------ seal/type.go | 13 +- 8 files changed, 509 insertions(+), 221 deletions(-) create mode 100644 build/pieces/pieces.go diff --git a/build/pieces/pieces.go b/build/pieces/pieces.go new file mode 100644 index 0000000..17576de --- /dev/null +++ b/build/pieces/pieces.go @@ -0,0 +1,130 @@ +package pieces + +import ( + "math/bits" + "github.com/minio/sha256-simd" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + "fil_integrate/build/fr32" +) + +const NODE_SIZE = 32 + +var a = false + +func nextPowerOfTwo(v uint64) uint64 { + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v++ + return v +} + +func GeneratePieceCommitmentFast(data []byte, unpad uint64) ([32]byte, error) { + var result [32]byte + + inLen := paddedSize(unpad) + + in := make([]byte, inLen) + copy(in, data) + pow2 := nextPowerOfTwo(unpad) + + out := make([]byte, pow2) + + fr32.Pad(in, out) + + // r, err := MerkleTreeRecurse(out) + r, err := MerkleTreeLoop(out) + if err != nil { + return [32]byte{}, err + } + copy(result[:], r) + return result, nil +} + +func MerkleTreeRecurse(D []byte) ([]byte, error) { + n := uint64(len(D)) + //å¶ç»“点,直接返回 + if n < 32 { + return D, xerrors.Errorf("can not generate the merkle tree") + } + if n == 32 { + return D, nil + } + k := len(D) / 2 + h := sha256.New() + //æ±‚å·¦åæ ‘ + x, err := MerkleTreeRecurse(D[0:k]) + if err != nil { + return nil, err + } + // 修剪到fr域 + trim_to_fr32(x) + h.Write(x[:]) + //求å³åæ ‘ + x, err = MerkleTreeRecurse(D[k:n]) + if err != nil { + return nil, err + } + trim_to_fr32(x) + h.Write(x[:]) + //得到哈希结果 + res := h.Sum(nil) + trim_to_fr32(res) + return res, nil +} + +func MerkleTreeLoop(D []byte) ([]byte, error) { + n := uint64(len(D)) + + if n != nextPowerOfTwo(n) { + return nil, xerrors.Errorf("can not generate the merkle tree") + } + for lenth := uint64(32); lenth < n; lenth <<= 1 { + for index := uint64(0); index < n; { + windex := index + h := sha256.New() + + // write left child + trim_to_fr32(D[index:index+32]) + h.Write(D[index:index+32]) + index += lenth + // write right child + trim_to_fr32(D[index:index+32]) + h.Write(D[index:index+32]) + index += lenth + + res := h.Sum(nil) + copy(D[windex:windex+32], res) + } + } + trim_to_fr32(D[:32]) + return D[:32], nil +} + +func trim_to_fr32(data []byte) { + // strip last two bits, to ensure result is in Fr. + data[31] &= 0b0011_1111 +} + +func paddedSize(size uint64) abi.UnpaddedPieceSize { + if size <= 127 { + return abi.UnpaddedPieceSize(127) + } + + // round to the nearest 127-divisible, find out fr32-padded size + paddedPieceSize := (size + 126) / 127 * 128 + + // round up if not power of 2 + if bits.OnesCount64(paddedPieceSize) != 1 { + paddedPieceSize = 1 << uint(64-bits.LeadingZeros64(paddedPieceSize)) + } + + // get the unpadded size of the now-determind piece + return abi.PaddedPieceSize(paddedPieceSize).Unpadded() +} diff --git a/build/storage/storage.go b/build/storage/storage.go index 2daf7f7..4863abc 100644 --- a/build/storage/storage.go +++ b/build/storage/storage.go @@ -2,6 +2,10 @@ package storage import ( "io" + "bytes" + "encoding/binary" + + "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" "github.com/ipfs/go-cid" @@ -11,13 +15,6 @@ type Data = io.Reader type Hash = [32]byte -type DecodedData struct { - HasPre bool - PreHash Hash - Data []byte - PieceHash []Hash -} - type SectorRef struct { ID abi.SectorID ProofType abi.RegisteredSealProof @@ -30,4 +27,103 @@ type Commit1Out []byte type SectorCids struct { Unsealed cid.Cid Sealed cid.Cid +} + +type Piece struct { + Commitment Hash + Size abi.UnpaddedPieceSize +} + +type DecodedData struct { + HasPre bool + PreHash Hash + + Data []byte + + PieceHash []Hash + HashData []byte +} + +func (data *DecodedData)Serialize() ([]byte, error) { + var buf []byte + MetaLen := uint32(len(data.Data)) + CommLen := uint32(len(data.HashData)) + if data.HasPre { + if MetaLen > 0 { + return nil, xerrors.Errorf("") + } + buf = make([]byte, nextUppandedPowerOfTwo(40 + CommLen)) + binary.BigEndian.PutUint32(buf[:4], 0x80000000) + binary.BigEndian.PutUint32(buf[4:8], CommLen) + copy(buf[8:40], data.PreHash[:]) + copy(buf[40:], data.HashData[:]) + } else { + buf = make([]byte, nextUppandedPowerOfTwo(8 + MetaLen + CommLen)) + binary.BigEndian.PutUint32(buf[:4], MetaLen) + binary.BigEndian.PutUint32(buf[4:8], CommLen) + copy(buf[8:8 + MetaLen], data.Data[:]) + copy(buf[8 + MetaLen:], data.HashData[:]) + } + return buf, nil +} + +func (data *DecodedData)Deserialize(buf []byte) error { + var err error + var MetaLen uint32 + var CommLen uint32 + + read := len(buf) + if read < 8 { + return xerrors.Errorf("can't deserialize the data less then 8bytes") + } + + binary.Read(bytes.NewReader(buf[0:4]), binary.BigEndian, &MetaLen) + binary.Read(bytes.NewReader(buf[4:8]), binary.BigEndian, &CommLen) + data.HasPre = (MetaLen >> 31) != 0 + MetaLen = MetaLen & 0x7fffffff + rbuf := buf[8:read] + if data.HasPre{ + if read < 40 { + return xerrors.Errorf("can't read the pre-piece-hash") + } + copy(data.PreHash[:], buf[8:40]) + rbuf = rbuf[32:] + } + + if uint32(len(rbuf)) <= MetaLen { + data.Data = rbuf[:] + } else if uint32(len(rbuf)) <= CommLen + MetaLen { + data.Data = rbuf[:MetaLen] + data.PieceHash, err = to32ByteHash(rbuf[MetaLen:]) + if err != nil { + return err + } + } else { + data.Data = rbuf[:MetaLen] + data.PieceHash, err = to32ByteHash(rbuf[MetaLen:CommLen + MetaLen]) + if err != nil { + return err + } + } + return nil +} + +func to32ByteHash(in []byte) ([]Hash, error) { + if len(in) % 32 != 0 { + return nil, xerrors.Errorf("lenth of the hash arr must be multiple of 32") + } + hash := make([]Hash, len(in)/32) + for index := 0; index < len(hash); index++ { + copy(hash[index][:], in[index*32:index*32+32]) + } + return hash, nil +} + +func nextUppandedPowerOfTwo(index uint32) abi.UnpaddedPieceSize { + index-- + power := 0 + for index = index / 254; index != 0 ; power += 1 { + index >>= 1 + } + return abi.UnpaddedPieceSize(254 * (1 << power)) } \ No newline at end of file diff --git a/cmd/bench/main.go b/cmd/bench/main.go index 595200a..39d67fd 100644 --- a/cmd/bench/main.go +++ b/cmd/bench/main.go @@ -58,7 +58,7 @@ var testSealAndWindowPoSt = &cli.Command{ }, &cli.IntFlag{ Name: "num-agg", - Value: 4, + Value: 8, Usage: "How many window-post proofs used to aggregate", }, }, diff --git a/seal/basicfs/fs.go b/seal/basicfs/fs.go index 3f70ebc..518ac93 100644 --- a/seal/basicfs/fs.go +++ b/seal/basicfs/fs.go @@ -17,18 +17,18 @@ type sectorFile struct { storiface.SectorFileType } -type Provider struct { +type Manager struct { Root string lk sync.Mutex waitSector map[sectorFile]chan struct{} } -func (b *Provider) GetRoot() string { +func (b *Manager) GetRoot() string { return b.Root } -func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { +func (b *Manager) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint return storiface.SectorPaths{}, nil, err } @@ -89,7 +89,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, exis return out, done, nil } -func (b *Provider) AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { +func (b *Manager) AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint return storiface.SectorPaths{}, nil, err } diff --git a/seal/seal_api.go b/seal/seal_api.go index 6dd2e3c..843c637 100644 --- a/seal/seal_api.go +++ b/seal/seal_api.go @@ -9,7 +9,6 @@ import( "runtime" "sync" "path/filepath" - "encoding/binary" "bytes" "fmt" @@ -23,6 +22,7 @@ import( "github.com/ipfs/go-cid" "fil_integrate/build/fr32" + spieces "fil_integrate/build/pieces" "fil_integrate/build/storiface" "fil_integrate/build/storage" spproof "fil_integrate/build/proof" @@ -36,204 +36,163 @@ const TagLen uint32 = 8 const NewestNetworkVersion = network.Version13 -type Provider struct { +type Encoder struct { Root string } -var _ PieceProvider = &Provider{} +var _ PieceEncoder = &Encoder{} // Data contains MetaData and HashData // Pieces structure is [ Tag | MetaData | HashData ] -func (sp *Provider) EncodeDataToPieces( +func (sp *Encoder) EncodeDataToPieces( ctx context.Context, sectorSize abi.SectorSize, file storage.Data, -) (storage.Hash, []storage.Hash, error) { +) (storage.Piece, []storage.Piece, error) { var hashData []byte - var piecesHash []storage.Hash - var prePieceHash []storage.Hash + var pieces []storage.Piece + var prePiece []storage.Piece root := filepath.Join(sp.Root, "pieces") err := os.Mkdir(root, 0755) if err != nil && !os.IsExist(err) { // nolint - return storage.Hash{}, nil, err + return storage.Piece{}, nil, err } UnpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded() - buf := make([]byte, UnpaddedSectorSize) - DataLen := (uint32)(UnpaddedSectorSize)-TagLen + buf := make([]byte, DataLen) for{ - memset(buf[:TagLen], nil) - - MetaLen, err := file.Read(buf[TagLen:]) + MetaLen, err := file.Read(buf[:]) if err != nil && err != io.EOF{ - return storage.Hash{}, nil, err + return storage.Piece{}, nil, err } if err == io.EOF || uint32(MetaLen) != DataLen{ //encode first sector - prePieceHash, err = sp.EncodeData(buf, sectorSize, uint32(MetaLen), DataLen, hashData) + prePiece, err = sp.EncodeData(buf[:uint32(MetaLen)], sectorSize, uint32(MetaLen), DataLen, hashData) if err != nil{ - return storage.Hash{}, nil, err + return storage.Piece{}, nil, err } break } - binary.BigEndian.PutUint32(buf[:4], uint32(MetaLen)) + var data *storage.DecodedData = &storage.DecodedData{ HasPre: false, Data: buf[:] } + dbuf, err := data.Serialize() + if err != nil { + return storage.Piece{}, nil, err + } - pieceHash, err := pieceCommitment(spt(sectorSize), buf[:]) + pieceHash, err := spieces.GeneratePieceCommitmentFast(dbuf[:], uint64(len(dbuf))) if err != nil { - return storage.Hash{}, nil, err + return storage.Piece{}, nil, err } filename := filepath.Join(root, fmt.Sprintf("%x.dat", pieceHash[:])) - err = ioutil.WriteFile(filename, buf[:], 0644) + err = ioutil.WriteFile(filename, dbuf[:], 0644) if err != nil { - return storage.Hash{}, nil, err + return storage.Piece{}, nil, err } hashData = append(hashData, pieceHash[:]...) - piecesHash = append(piecesHash, pieceHash) + pieces = append(pieces, storage.Piece{ + Commitment: pieceHash, + Size: UnpaddedSectorSize, + }) } - piecesHash = append(piecesHash, prePieceHash...) - return piecesHash[len(piecesHash)-1], piecesHash[:len(piecesHash)-2], nil + pieces = append(pieces, prePiece...) + return pieces[len(pieces)-1], pieces[:len(pieces)-1], nil } -func (sp *Provider) EncodeData( - buf []byte, +func (sp *Encoder) EncodeData( + metadata []byte, sectorSize abi.SectorSize, MetaLen uint32, DataLen uint32, hashData []byte, -) ([]storage.Hash, error) { +) ([]storage.Piece, error) { root := filepath.Join(sp.Root, "pieces") var prePieceHash storage.Hash - var piecesHash []storage.Hash + var pieces []storage.Piece var err error - var end uint32 = 0 for ;len(hashData) > 0; { + var buf []byte //encode next n sector - if end != 0{ + if pieces != nil{ CommLen := min(uint32(len(hashData)), ((DataLen-32)/32) * 32) - binary.BigEndian.PutUint32(buf[:4], 0x80000000) - binary.BigEndian.PutUint32(buf[4:8], CommLen) - memset(buf[4:40], prePieceHash[:]) + var data *storage.DecodedData = &storage.DecodedData{ + HasPre: true, + PreHash: prePieceHash, + HashData: hashData[:CommLen], + } + buf, err = data.Serialize() + if err != nil { + return nil, err + } - rbuf := buf[TagLen + 32:] - memset(rbuf, hashData[:CommLen]) - memset(rbuf[CommLen:], nil) hashData = hashData[CommLen:] - - end = nextUppandedPowerOfTwo(TagLen + 32 + CommLen) - } else { CommLen := min(uint32(len(hashData)), ((DataLen-MetaLen)/32) * 32) - binary.BigEndian.PutUint32(buf[:4], MetaLen) - binary.BigEndian.PutUint32(buf[4:8], CommLen) + var data *storage.DecodedData = &storage.DecodedData{ + HasPre: false, + Data: metadata, + HashData: hashData[:CommLen], + } + buf, err = data.Serialize() + if err != nil { + return nil, err + } - rbuf := buf[TagLen + MetaLen:] - memset(rbuf, hashData[:CommLen]) - memset(rbuf[CommLen:], nil) hashData = hashData[CommLen:] - - end = nextUppandedPowerOfTwo(TagLen + MetaLen + CommLen) } - prePieceHash, err = pieceCommitment(spt(sectorSize), buf[:]) + prePieceHash, err = spieces.GeneratePieceCommitmentFast(buf, uint64(len(buf))) if err != nil { return nil, err } filename := filepath.Join(root, fmt.Sprintf("%x.dat", prePieceHash[:])) - err = ioutil.WriteFile(filename, buf[:], 0644) + err = ioutil.WriteFile(filename, buf, 0644) if err != nil { return nil, err } - piecesHash = append(piecesHash, prePieceHash) + pieces = append(pieces, storage.Piece{ + Commitment: prePieceHash, + Size: abi.UnpaddedPieceSize(len(buf)), + }) } - return piecesHash, nil + return pieces, nil } -func (sp *Provider) DecodePiece( +func DecodePiece( ctx context.Context, sectorSize abi.SectorSize, - in io.Reader, - start storiface.UnpaddedByteIndex, - end storiface.UnpaddedByteIndex, -) (storage.DecodedData, error){ - if start > end { - return storage.DecodedData{}, xerrors.Errorf("start must be less than end") - } - - if start == end { - return storage.DecodedData{}, nil - } - + in io.Reader, +) (storage.DecodedData, error) { unpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded() buf := make([]byte, unpaddedSectorSize) - _, err := in.Read(buf[:]) + read, err := in.Read(buf[:]) if err != nil && err != io.EOF{ return storage.DecodedData{}, err } - var prePieceHash storage.Hash - var MetaLen uint32 - var CommLen uint32 - var data []byte - var pieceHash []storage.Hash - - binary.Read(bytes.NewReader(buf[0:4]), binary.BigEndian, &MetaLen) - binary.Read(bytes.NewReader(buf[4:8]), binary.BigEndian, &CommLen) - hasPre := MetaLen >> 31 - MetaLen = MetaLen & 0x7fffffff - rbuf := buf[8:] - if hasPre != 0 { - copy(prePieceHash[:], buf[8:40]) - rbuf = rbuf[32:] - } - - if start > storiface.UnpaddedByteIndex(MetaLen) { - data = nil - pieceHash, err = to32ByteHash(rbuf[start:end]) - if err != nil { - return storage.DecodedData{}, err - } - } else if end < storiface.UnpaddedByteIndex(MetaLen) { - data = rbuf[start:end] - } else if end > storiface.UnpaddedByteIndex(MetaLen + CommLen) { - data = rbuf[start:MetaLen] - pieceHash, err = to32ByteHash(rbuf[MetaLen:MetaLen+CommLen]) - if err != nil { - return storage.DecodedData{}, err - } - } else { - data = rbuf[start:MetaLen] - pieceHash, err = to32ByteHash(rbuf[MetaLen:end]) - if err != nil { - return storage.DecodedData{}, err - } - } - return storage.DecodedData{ - HasPre: hasPre != 0, - PreHash: prePieceHash, - Data: data, - PieceHash: pieceHash, - }, nil + var data *storage.DecodedData = &storage.DecodedData{} + err = data.Deserialize(buf[:read]) + return *data, err } type Sealer struct{ - sectors SectorProvider + sectors SectorManager } var _ SectorSealer = &Sealer{} -func New(sectors SectorProvider) (*Sealer, error) { +func New(sectors SectorManager) (*Sealer, error) { sb := &Sealer{ sectors: sectors, } @@ -241,7 +200,64 @@ func New(sectors SectorProvider) (*Sealer, error) { return sb, nil } -func (sb *Sealer)AddPiece( +func (sb *Sealer)AddPiece( + ctx context.Context, + sector storage.SectorRef, + pieces *[]storage.Piece, +) ([]abi.PieceInfo, error) { + var addPieces []storage.Piece + var pieceSize abi.UnpaddedPieceSize + var existingPieceSizes []abi.UnpaddedPieceSize + var piecesInfo []abi.PieceInfo + + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return nil, err + } + + maxPieceSize := abi.PaddedPieceSize(ssize).Unpadded() + + pieceRoot := filepath.Join(sb.sectors.GetRoot(), "pieces") + for ;len(*pieces) > 0; { + pieceSize += (*pieces)[0].Size + if pieceSize > maxPieceSize { + return nil, xerrors.Errorf("Exists a piece whose size is bigger than 8MiB or is not power of two or the pieces is not sorted") + } else if pieceSize == maxPieceSize { + addPieces = append(addPieces, (*pieces)[0]) + (*pieces) = (*pieces)[1:] + break + } + addPieces = append(addPieces, (*pieces)[0]) + (*pieces) = (*pieces)[1:] + } + + if pieceSize != maxPieceSize { + return nil, xerrors.Errorf("can not use the existing pieces to generate 8MiB piece") + } + + for _, piece := range(addPieces) { + filename := filepath.Join(pieceRoot, fmt.Sprintf("%x.dat", piece.Commitment[:])) + file, err := os.Open(filename) + if err != nil { + return nil, err + } + defer func(){ + file.Close() + os.Remove(filename) + } () + fmt.Printf("Adding %x.dat\n", piece.Commitment[:]) + pieceInfo, err := sb.addPiece(ctx, sector, existingPieceSizes, piece.Size, file) + if err != nil { + return nil, err + } + existingPieceSizes = append(existingPieceSizes, piece.Size) + piecesInfo = append(piecesInfo, pieceInfo) + } + + return piecesInfo, nil +} + +func (sb *Sealer)addPiece( ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, @@ -264,7 +280,7 @@ func (sb *Sealer)AddPiece( maxPieceSize := abi.PaddedPieceSize(ssize) if offset.Padded()+pieceSize.Padded() > maxPieceSize { - return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset) + return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces with %d bytes of max bytes", pieceSize, sector, offset, maxPieceSize) } var done func() @@ -438,22 +454,6 @@ func pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) { return pieceCID, werr() } -func pieceCommitment(spt abi.RegisteredSealProof, in []byte) (storage.Hash, error) { - prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in))) - if err != nil { - return storage.Hash{}, xerrors.Errorf("getting tee reader pipe: %w", err) - } - - commP, err := ffi.GeneratePieceCommitmentFromFile(spt, prf, abi.UnpaddedPieceSize(len(in))) - if err != nil { - return storage.Hash{}, xerrors.Errorf("generating piece commitment: %w", err) - } - - _ = prf.Close() - - return commP, werr() -} - func ToReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { f, ok := r.(*os.File) if ok { @@ -950,6 +950,7 @@ func (sb *Sealer)GenerateWindowPoStProofs( return nil, skipped, xerrors.Errorf("pubSectorToPriv skipped some sectors") } + log.Infof("Generating Window-PoSt Proof") proof, faulty, err := ffi.GenerateWindowPoSt(minerID, privsectors, randomness) var faultyIDs []abi.SectorID @@ -1081,8 +1082,8 @@ func (v Verifier) VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProo func (v Verifier) VerifyWindowPoSt( sectors []storage.SectorRef, - randomness abi.PoStRandomness, proofs []spproof.PoStProof, + randomness abi.PoStRandomness, proverID abi.ActorID, ) (bool, error) { chanllendedSectors := make([]spproof.SectorInfo, len(sectors)) @@ -1182,21 +1183,24 @@ func min(x, y uint32) uint32 { return y } -func nextUppandedPowerOfTwo(index uint32) uint32 { - power := 0 - for index = index / 254; index != 0 ; power += 1 { - index >>= 1 +func check(in, out []byte) (bool, error){ + if len(in) != len(out) { + return false, xerrors.Errorf("the %d output data and %d input data do not match", len(out), len(in)) } - return 254 * (1 << power) -} -func to32ByteHash(in []byte) ([]storage.Hash, error) { - if len(in) % 32 != 0 { - return nil, xerrors.Errorf("lenth of the hash arr must be multiple of 32") - } - hash := make([]storage.Hash, len(in)/32) - for index := 0; index < len(hash); index++ { - copy(hash[index][:], in[index*32:index*32+32]) + for index := 0; index < len(in); index++ { + if in[index] != out[index] { + return false, xerrors.Errorf("the output data and input data do not match at: %d input is %u, output is %u",index,in[index],out[index]) + } } - return hash, nil -} \ No newline at end of file + + return true, nil +} + +// func nextUppandedPowerOfTwo(index uint32) abi.UnpaddedPieceSize { +// power := 0 +// for index = index / 254; index != 0 ; power += 1 { +// index >>= 1 +// } +// return abi.UnpaddedPieceSize(254 * (1 << power)) +// } \ No newline at end of file diff --git a/seal/test.go b/seal/test.go index 2144002..0602534 100644 --- a/seal/test.go +++ b/seal/test.go @@ -19,7 +19,7 @@ import( const minerID = 1000 func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { - sdir, err := homedir.Expand("~/dzh/bench") + sdir, err := homedir.Expand("~/tmp/bench") if err != nil { return err } @@ -43,14 +43,14 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { return err } - sbfs := &basicfs.Provider{ + sbfs := &basicfs.Manager{ Root: tsdir, } sb ,err := New(sbfs) if err != nil{ return err } - sp := &Provider{ + sp := &Encoder{ Root: tsdir, } @@ -63,6 +63,7 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { var postProofs []spproof.PoStProof var randomnesses []abi.PoStRandomness var sectorCount []uint + var sortedPieces []storage.Piece var index = 0 for i := 0; i < numAggregate; i++ { filename := filepath.Join(tsdir, "input.dat") @@ -76,22 +77,24 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { } defer in.Close() - _, piecesHash, err := sp.EncodeDataToPieces(ctx, sectorSize, in) + finalPiece, pieces, err := sp.EncodeDataToPieces(ctx, sectorSize, in) if err != nil{ return err } + sortedPieces = Insert(sortedPieces, pieces, finalPiece) + fmt.Printf("[%d] sortedPieces [%d] pieces\n", len(sortedPieces), len(pieces)) + } + printPieces(sortedPieces) + + var perr error + for{ var infos []spproof.AggregateSealVerifyInfo var sealedSectors []spproof.SectorInfo var sectors []storage.SectorRef var proofs []spproof.Proof - for _, pieceHash := range piecesHash { - filename = filepath.Join(tsdir, "pieces", fmt.Sprintf("%x.dat", pieceHash[:])) - f, err := os.Open(filename) - if err != nil { - return err - } - + var pieces []abi.PieceInfo + for i := 0; i < 4; i++ { sid := storage.SectorRef{ ID: abi.SectorID{ Miner: minerID, @@ -99,14 +102,11 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { }, ProofType: spt(sectorSize), } - pieceInfo, err := sb.AddPiece(ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), f) - if err != nil { - return err + pieces, perr = sb.AddPiece(ctx, sid, &sortedPieces) + if perr != nil { + break } - var pieces []abi.PieceInfo - pieces = append(pieces, pieceInfo) - cids, err := sb.Sealed(ctx, sid, pieces) if err != nil { return err @@ -130,6 +130,10 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { proofs = append(proofs, proof) index++ } + if perr != nil { + fmt.Println(perr.Error()) + break + } // aggregateInfo := spproof.AggregateSealVerifyProofAndInfos{ @@ -182,4 +186,23 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { fmt.Println("verify failed") } return nil +} + +func printPieces(sortedPieces []storage.Piece) { + for _, piece := range(sortedPieces) { + fmt.Printf("[%d] %x.dat\n", int(piece.Size), piece.Commitment[:]) + } +} + +func Insert(sortedPieces []storage.Piece, pieces []storage.Piece, finalPiece storage.Piece) ([]storage.Piece) { + var i int + var res []storage.Piece + for i = len(sortedPieces)-1; i >= 0; i-- { + if sortedPieces[i].Size >= finalPiece.Size { + break + } + } + res = append(pieces, sortedPieces[:i+1]...) + res = append(res, finalPiece) + return append(res, sortedPieces[i+1:]...) } \ No newline at end of file diff --git a/seal/test_seal.go b/seal/test_seal.go index a9de054..d0e9ead 100644 --- a/seal/test_seal.go +++ b/seal/test_seal.go @@ -6,6 +6,7 @@ import( "io" "io/ioutil" "os" + "sync" "math/rand" "path/filepath" "time" @@ -20,6 +21,7 @@ import( spproof "fil_integrate/build/proof" "fil_integrate/build" "fil_integrate/build/storage" + // "fil_integrate/build/pieces" "fil_integrate/build/storiface" // "fil_integrate/extern/sector-storage/ffiwrapper" "fil_integrate/seal/basicfs" @@ -54,7 +56,7 @@ func TestAggregateWindowPoSt( if err := os.MkdirAll(tsdir, 0775); err != nil && !os.IsExist(err){ return err } - sbfs := &basicfs.Provider{ + sbfs := &basicfs.Manager{ Root: tsdir, } sb ,err := New(sbfs) @@ -88,7 +90,7 @@ func TestAggregateWindowPoSt( ProofType: sealProofType, } - piece, err := sb.AddPiece(ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), file) + piece, err := sb.addPiece(ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), file) if err != nil { return err } @@ -251,7 +253,7 @@ func TestSealAndUnseal() error { if err := os.MkdirAll(tsdir, 0775); err != nil { return err } - sbfs := &basicfs.Provider{ + sbfs := &basicfs.Manager{ Root: tsdir, } sb ,err := New(sbfs) @@ -279,7 +281,7 @@ func TestSealAndUnseal() error { var sealedSectors []spproof.SectorInfo var sectors []storage.SectorRef - piece, err := sb.AddPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/4).Unpadded(), file) + piece, err := sb.addPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/4).Unpadded(), file) if err != nil { return err } @@ -287,15 +289,15 @@ func TestSealAndUnseal() error { existingPieceSizes = append(existingPieceSizes, piece.Size.Unpadded()) pieces = append(pieces, piece) - // piece, err = sb.AddPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/4).Unpadded(), file) - // if err != nil { - // return err - // } + piece, err = sb.addPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/4).Unpadded(), file) + if err != nil { + return err + } - // existingPieceSizes = append(existingPieceSizes, piece.Size.Unpadded()) - // pieces = append(pieces, piece) + existingPieceSizes = append(existingPieceSizes, piece.Size.Unpadded()) + pieces = append(pieces, piece) - piece, err = sb.AddPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/2).Unpadded(), file) + piece, err = sb.addPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/2).Unpadded(), file) if err != nil { return err } @@ -352,7 +354,7 @@ func TestSealAndUnseal() error { wpproof, _, err := sb.GenerateWindowPoStProofs(ctx, sid.ID.Miner, sealedSectors, challenge[:]) - ok, err = ProofVerifier.VerifyWindowPoSt(sectors, challenge[:], wpproof, sid.ID.Miner) + ok, err = ProofVerifier.VerifyWindowPoSt(sectors, wpproof, challenge[:], sid.ID.Miner) if err != nil { return err } @@ -389,7 +391,7 @@ func TestSplitDataInToPieces() error { if err := os.MkdirAll(tsdir, 0775); err != nil { return err } - sbfs := &basicfs.Provider{ + sbfs := &basicfs.Manager{ Root: tsdir, } sb ,err := New(sbfs) @@ -397,7 +399,7 @@ func TestSplitDataInToPieces() error { return err } - sp := &Provider{ + sp := &Encoder{ Root: tsdir, } ctx := context.TODO() @@ -418,13 +420,15 @@ func TestSplitDataInToPieces() error { } defer in.Close() - finalHash, piecesHash, err := sp.EncodeDataToPieces(ctx, sectorSize, in) + start := time.Now() + final, pieces, err := sp.EncodeDataToPieces(ctx, sectorSize, in) if err != nil{ return err } + fmt.Printf("using %s\n", time.Now().Sub(start)) - for i, pieceHash := range(piecesHash) { - var pieces []abi.PieceInfo + for i, piece := range(pieces) { + // var ppieces []abi.PieceInfo sid := storage.SectorRef{ ID: abi.SectorID{ Miner: 1000, @@ -433,44 +437,44 @@ func TestSplitDataInToPieces() error { ProofType: spt(sectorSize), } - filename = filepath.Join(tsdir, "pieces", fmt.Sprintf("%x.dat", pieceHash[:])) + filename = filepath.Join(tsdir, "pieces", fmt.Sprintf("%x.dat", piece.Commitment[:])) f, err := os.OpenFile(filename, os.O_RDONLY|os.O_CREATE, 0644) if err != nil { return err } defer f.Close() - piece, err := sb.AddPiece(ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), f) + ppiece, err := sb.addPiece(ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), f) if err != nil { return err } - pieces = append(pieces, piece) - cids, err := sb.Sealed(ctx, sid, pieces) - // commp, err := commcid.CIDToPieceCommitmentV1(piece.PieceCID) - if err != nil { - return err - } + // ppieces = append(ppieces, ppiece) + // cids, err := sb.Sealed(ctx, sid, ppieces) + // if err != nil { + // return err + // } - commp, err := commcid.CIDToPieceCommitmentV1(cids.Unsealed) + commp, err := commcid.CIDToPieceCommitmentV1(ppiece.PieceCID) if err != nil { return err } - if string(commp[:]) != string(pieceHash[:]) { - fmt.Printf("commp and piece hash mismatch, %x != %x\n", commp[:], pieceHash[:]) - } else { - fmt.Printf("commp and piece hash match, %x == %x\n", commp[:], pieceHash[:]) + if string(commp[:]) != string(piece.Commitment[:]) { + fmt.Printf("commp and piece hash mismatch, %x != %x\n", commp[:], piece.Commitment[:]) } } filename = filepath.Join(root, "output.dat") + if _, err = os.Stat(filename); !os.IsNotExist(err) { + os.Remove(filename) + } out, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0644) if err != nil { return err } defer out.Close() - err = decodePiecesToData(sp, ctx, tsdir, sectorSize, finalHash, out) + err = decodePiecesToData(ctx, tsdir, sectorSize, final.Commitment, out) if err != nil { return err } @@ -491,14 +495,43 @@ func TestSplitDataInToPieces() error { func Test() int { var buf1 []byte var buf2 []byte - buf1 = append(buf1, 0,1,2,3) - buf2 = append(buf2, 10,20,30,40) - buf1 = append(buf2, buf1...) - fmt.Println(buf1, len(buf1), buf1[3]) - fmt.Println(buf2, len(buf2), buf2[3]) + for i := byte(0); i < byte(32); i++ { + buf1 = append(buf1, i) + buf2 = append(buf2, i) + } + + var wg sync.WaitGroup + wg.Add(8) + + for i := 0; i < 8; i++ { + go func(i int) { + defer wg.Done() + if i % 2 == 0 { + popFront(&buf1) + } else { + printBytes(&buf1) + } + // fmt.Printf("[%x] buf1\n", buf1[:]) + }(i) + } + wg.Wait() + fmt.Printf("[%x] buf1\n", buf1[:]) + // buf3 := popFront(&buf1) + // fmt.Printf("[%x] buf1\n[%x] buf2\n[%x] buf3\n", buf1[:], buf2[:], buf3[:]) return 0 } +func popFront(d *[]byte) []byte{ + time.Sleep(10 * time.Millisecond) + *d = (*d)[2:] + return *d +} + +func printBytes(d *[]byte) { + time.Sleep(10 * time.Millisecond) + fmt.Printf("[%x]\n", (*d)[:]) +} + func getCommRFromDir(root string, sectorID abi.SectorID) (cid.Cid, error) { commr := make([]byte, 32) path := filepath.Join(root, "cache", storiface.SectorName(sectorID), "commr") @@ -546,7 +579,11 @@ func spt(ssize abi.SectorSize) abi.RegisteredSealProof { } func generateRandomData(filename string, b []byte) ([]byte, error) { - Datasize := 256*1024*1024 + if _, err := os.Stat(filename); !os.IsNotExist(err) { + os.Remove(filename) + } + r := rand.New(rand.NewSource(time.Now().UnixNano())) + Datasize := (r.Intn(1024*1024) + 1024*1024)*32 buf := make([]byte, Datasize) for i:=0; i<Datasize; i=i+32{ tmp := blake2b.Sum256(b) @@ -565,9 +602,8 @@ func generateRandomData(filename string, b []byte) ([]byte, error) { return b, nil } -func decodePiecesToData(sp *Provider, ctx context.Context, tsdir string, sectorSize abi.SectorSize, finalHash storage.Hash, out io.Writer) error { +func decodePiecesToData(ctx context.Context, tsdir string, sectorSize abi.SectorSize, finalHash storage.Hash, out io.Writer) error { // var piecesHash []storage.Hash - DataLen := abi.PaddedPieceSize(sectorSize).Unpadded() - 8 filename := filepath.Join(tsdir, "pieces", fmt.Sprintf("%x.dat", finalHash[:])) fmt.Printf("Decode: %x.dat\n", finalHash[:]) file, err := os.OpenFile(filename, os.O_RDONLY, 0644) @@ -577,7 +613,7 @@ func decodePiecesToData(sp *Provider, ctx context.Context, tsdir string, sectorS defer file.Close() // hasPre, preHash, Data, commData, err := sb.DecodePiece(ctx, sectorSize, file, 0, storiface.UnpaddedByteIndex(DataLen)) - data, err := sp.DecodePiece(ctx, sectorSize, file, 0, storiface.UnpaddedByteIndex(DataLen)) + data, err := DecodePiece(ctx, sectorSize, file) if err != nil { return err } @@ -594,7 +630,7 @@ func decodePiecesToData(sp *Provider, ctx context.Context, tsdir string, sectorS defer file.Close() // hasPre, preHash, Data, hashData, err = sb.DecodePiece(ctx, sectorSize, file, 0, storiface.UnpaddedByteIndex(DataLen)) - data, err = sp.DecodePiece(ctx, sectorSize, file, 0, storiface.UnpaddedByteIndex(DataLen)) + data, err = DecodePiece(ctx, sectorSize, file) if err != nil{ return err } @@ -605,7 +641,7 @@ func decodePiecesToData(sp *Provider, ctx context.Context, tsdir string, sectorS filename = filepath.Join(tsdir, "pieces", fmt.Sprintf("%x.dat", pieceHash[:])) fmt.Printf("Decode: %x.dat\n", pieceHash[:]) file, err := os.OpenFile(filename, os.O_RDONLY, 0644) - data, err := sp.DecodePiece(ctx, sectorSize, file, 0, storiface.UnpaddedByteIndex(DataLen)) + data, err := DecodePiece(ctx, sectorSize, file) if err != nil { return err } diff --git a/seal/type.go b/seal/type.go index 6806be5..4f89f1b 100644 --- a/seal/type.go +++ b/seal/type.go @@ -17,16 +17,15 @@ import( var b = blake2b.Sum256([]byte("randomness")) var Ticket abi.SealRandomness = abi.SealRandomness(b[:]) -type PieceProvider interface { +type PieceEncoder interface { // Split and encode data into pieces // Pieces structure is [ Tag | MetaData | HashData ] or [ Tag | PreHash | HashData] - EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Hash, []storage.Hash, error) - DecodePiece(ctx context.Context, sectorSize abi.SectorSize, in io.Reader, start storiface.UnpaddedByteIndex, end storiface.UnpaddedByteIndex) (storage.DecodedData, error) + EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Piece, []storage.Piece, error) } //interface type SectorSealer interface{ - AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) + AddPiece(context.Context, storage.SectorRef, *[]storage.Piece) ([]abi.PieceInfo, error) // run pre-commit1 and pre-commit2 phase // generate the sealed sector and sector commitment(commd, commr) @@ -46,11 +45,11 @@ type SectorVerifier interface{ VerifySeal(info spproof.SealVerifyInfo) (bool, error) VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (bool, error) - VerifyWindowPoSt(sectors []storage.SectorRef, randomness abi.PoStRandomness, proofs []spproof.PoStProof, proverID abi.ActorID) (bool, error) + VerifyWindowPoSt(sectors []storage.SectorRef, proofs []spproof.PoStProof, randomness abi.PoStRandomness, proverID abi.ActorID) (bool, error) VerifyAggregateWindowPostProofs(sectors [][]storage.SectorRef, proof spproof.Proof, randomnesses []abi.PoStRandomness, proverID abi.ActorID) (bool, error) } -type SectorProvider interface { +type SectorManager interface { GetRoot() (string) // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist // * returns an error when allocate is set, and existing isn't, and the sector exists @@ -58,4 +57,4 @@ type SectorProvider interface { AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) } -var _ SectorProvider = &basicfs.Provider{} \ No newline at end of file +var _ SectorManager = &basicfs.Manager{} \ No newline at end of file -- 2.18.1