Commit f36f16c5 authored by 董子豪's avatar 董子豪

add encode and decode data method

parent ff46a753
......@@ -9,6 +9,8 @@ import (
type Data = io.Reader
type Hash = [32]byte
type SectorRef struct {
ID abi.SectorID
ProofType abi.RegisteredSealProof
......
......@@ -109,12 +109,8 @@ func ParseSectorID(baseName string) (abi.SectorID, error) {
}, nil
}
func SectorName(sid abi.SectorID, numbers ...int32) string {
out := fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
for _, number := range numbers{
out = fmt.Sprintf("%s-%d", out, number)
}
return out
func SectorName(sid abi.SectorID) string {
return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
}
func PathByType(sps SectorPaths, fileType SectorFileType) string {
......
......@@ -24,8 +24,10 @@ func main() {
Usage: "Benchmark performance of seal and window-post",
Version: "1.11.1",
Commands: []*cli.Command{
test,
testSealCmd,
testAggregationCmd,
testSplitDataCmd,
},
}
......@@ -35,6 +37,16 @@ func main() {
}
}
// test wires the bare `test` subcommand to the seal.Test() scratch routine;
// Test's int return value is discarded, so this command always reports success.
var test = &cli.Command{
	Name: "test",
	Usage: "Test interface",
	Action: func(c *cli.Context) error {
		// Test 8MiB sector
		seal.Test()
		return nil
	},
}
var testSealCmd = &cli.Command{
Name: "test-seal",
Usage: "Test interface",
......@@ -48,6 +60,19 @@ var testSealCmd = &cli.Command{
},
}
// testSplitDataCmd runs the piece split/encode/decode round-trip test
// (seal.TestSplitDataInToPieces) and propagates its error, if any.
var testSplitDataCmd = &cli.Command{
	Name:  "test-split",
	Usage: "Test interface",
	Action: func(c *cli.Context) error {
		// Test 8MiB sector. The intermediate `if err != nil { return err };
		// return nil` dance was redundant — return the error directly.
		return seal.TestSplitDataInToPieces()
	},
}
var testAggregationCmd = &cli.Command{
Name: "test-aggregation",
Usage: "Test interface",
......
......@@ -24,6 +24,7 @@ require (
go.opencensus.io v0.23.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
github.com/libp2p/go-libp2p-core v0.8.5
github.com/minio/sha256-simd v0.1.1
)
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
......
......@@ -28,7 +28,7 @@ func (b *Provider) GetRoot() string {
return b.Root
}
func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error) {
func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
......@@ -68,7 +68,7 @@ func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, exis
return storiface.SectorPaths{}, nil, ctx.Err()
}
path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id.ID, numbers...))
path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id.ID))
prevDone := done
done = func() {
......
......@@ -8,13 +8,16 @@ import(
"runtime"
"sync"
"path/filepath"
"encoding/binary"
"bytes"
"fmt"
"golang.org/x/xerrors"
logging "github.com/ipfs/go-log/v2"
commcid "github.com/filecoin-project/go-fil-commcid"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/minio/sha256-simd"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/ipfs/go-cid"
......@@ -27,9 +30,11 @@ import(
var log = logging.Logger("sealing")
var piecesHashMap map[string]int = make(map[string]int)
// 32 bytes, 256 bits in total
//[has_pre][MetaLen1..MetaLen4][PieceLen1..PieceLen4]
const TagLen int = 8
const TagLen uint32 = 8
const NewestNetworkVersion = network.Version13
......@@ -89,8 +94,7 @@ func (sb *Sealer)AddPiece(
sector storage.SectorRef,
existingPieceSizes []abi.UnpaddedPieceSize,
pieceSize abi.UnpaddedPieceSize,
file storage.Data,
numbers ...int32,
file storage.Data,
) (abi.PieceInfo, error) {
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
......@@ -128,7 +132,7 @@ func (sb *Sealer)AddPiece(
var stagedPath storiface.SectorPaths
if len(existingPieceSizes) == 0 {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, storiface.FTUnsealed, storiface.PathSealing, numbers...)
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, storiface.FTUnsealed, storiface.PathSealing)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
......@@ -138,7 +142,7 @@ func (sb *Sealer)AddPiece(
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
}
} else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathSealing, numbers...)
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathSealing)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
......@@ -593,146 +597,211 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storag
}
// Not tested yet
// func EncodeDataToPieces(sb *ffiwrapper.Sealer, ctx context.Context, sector SectorRef, sectorSize abi.SectorSize, file Data) (abi.PieceInfo, error) {
// var piecesID []byte
// var FinalPiece abi.PieceInfo
// var pieceNumber int32 = 1
// UnpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
// buf := make([]byte, UnpaddedSectorSize)
// DataLen := UnpaddedSectorSize-TagLen
// for{
// copy(buf[:TagLen], 0)
// var MetaLen int = 0
// var rerr error
// for rbuf := buf[TagLen:]; len(rbuf) > 0; {
// n, rerr := file.read(buf[TagLen:])
// if rerr != nil && rerr != io.EOF{
// return nil, rerr
// }
// rbuf = rbuf[n:]
// MetaLen += n
// if rerr == io.EOF{
// break
// }
// }
// if rerr == io.EOF{
// //encode first sector
// pieceNumber, FinalPiece, err = EncodeData(buf, MetaLen, DataLen, piecesID, pieceNumber)
// if err != nil{
// return nil, err
// }
// break
// }
// copy(buf[:4], []byte(MetaLen))
// piece, err := sb.AddPiece(ctx, sector, nil, UnpaddedSectorSize, bytes.NewReader(buf), pieceNumber)
// if err != nil{
// return nil, err
// }
// piecesID = append(pieces, commcid.CIDToPieceCommitmentV1(piece.PieceCID)...)
// pieceNumber += 1
// }
// return pieceNumber, FinalPiece, nil
// }
// func EncodeData(buf []byte, MetaLen int, DataLen int, piecesID []byte, pieceNumber int32) (int32, abi.PieceInfo, error) {
// //encode first sector
// remain := len(piecesID)
// CommLen := min(remain, ((DataLen-MetaLen)/32) * 32)
// rbuf := buf[MetaLen+TagLen:]
// copy(buf[:4], []byte(MetaLen))
// copy(buf[4:8], []byte(CommLen))
// copy(rbuf, piecesID[:CommLen])
// copy(rbuf[CommLen:], 0)
// piecesID = piecesID[CommLen:]
// MetaLen = 0
// remain -= CommLen
// rbuf = buf[TagLen+32:]
// prePiece, err := sb.AddPiece(ctx, sector, nil, UnpaddedSectorSize, bytes.NewReader(buf), pieceNumber)
// pieceNumber += 1
// if err != nil{
// return nil, err
// }
// for ;remain > 0; {
// //encode next n sector
// CommLen := min(remain, ((DataLen-32)/32) * 32)
// copy(buf[:4], []byte(MetaLen | 0x80000000))
// copy(buf[4:8], []byte(CommLen))
// copy(buf[8:40], commcid.CIDToPieceCommitmentV1(prePiece.PieceCID)...)
// copy(rbuf, piecesID[:CommLen])
// copy(rbuf[CommLen:], 0)
// piecesID = piecesID[CommLen:]
// MetaLen = 0
// remain -= CommLen
// rbuf = buf[TagLen:]
// prePiece, err = sb.AddPiece(ctx, sector, nil, UnpaddedSectorSize, bytes.NewReader(buf), pieceNumber)
// pieceNumber += 1
// if err != nil{
// return nil, err
// }
// }
// return pieceNumber, prePiece, nil
// }
// func DecodePieceToData(sb *ffiwrapper.Sealer, ctx context.Context, out io.Writer, finalPiece abi.PieceInfo, pieceNum int32, sectorSize abi.SectorSize) ([]abi.PieceInfo, error){
// var piecesID []abi.PieceInfo
// var commds []byte
// unpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
// buf := make([]byte, unpaddedSectorSize)
// DataLen := unpaddedSectorSize-TagLen
// for ;pieceNum > 0; PieceNum-- {
// ok, err := sb.ReadPiece(ctx, sid, bytes.NewWriter(buf), 0, unpaddedSectorSize, pieceNum)
// if err != nil {
// return nil
// }
// if !ok {
// return xerrors.Errorf("Read pieces error!")
// }
// MetaLen := buf[0:4] & 0x7fffffff
// HasPre := buf[0:4] >> 31
// CommLen := buf[4:8]
// rbuf := buf[8:]
// if HasPre {
// prePiece := buf[8:40]
// CommLen -= 32
// rbuf = rbuf[32:]
// commd, _ := commcid.DataCommitmentV1ToCID(prePiece)
// piecesID = append(piecesID, abi.PieceInfo{abi.PaddedPieceSize(sectorSize), commd})
// }
// data := rbuf[:MetaLen]
// commds = append(commds, rbuf[MetaLen:MetaLen+CommLen]...)
// //**data顺序错了,需要
// n, werr := out.write(data[:])
// if werr != nil {
// return nil, werr
// }
// }
// for cbuf := commds; len(cbuf) > 0; {
// commd, _ := cbuf.DataCommitmentV1ToCID(commds[32])
// piecesID = append(piecesID, abi.PieceInfo{abi.PaddedPieceSize(sectorSize), commd})
// cbuf = cbuf[32:]
// }
// return piecesID, nil
// }
// Data contains [ MetaData | HashData ]
// Pieces structure is [ Tag | MetaData | HashData ]
// EncodeDataToPieces splits file into unpadded-sector-sized pieces and writes
// each one to <root>/pieces/<hash>.dat.
//
// Every full piece is laid out as [ 8-byte Tag | file data ]. The running
// list of piece hashes plus the final partial chunk of data is handed to
// EncodeData, and the hash of the last piece it writes is returned — that
// hash is the entry point for decodePiecesToData.
func (sb *Sealer) EncodeDataToPieces(
	ctx context.Context,
	sectorSize abi.SectorSize,
	file storage.Data,
) (storage.Hash, error) {
	var hashData []byte
	var finalPieceHash storage.Hash

	root := filepath.Join(sb.sectors.GetRoot(), "pieces")
	if err := os.Mkdir(root, 0755); err != nil && !os.IsExist(err) { // nolint
		return storage.Hash{}, err
	}

	UnpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
	buf := make([]byte, UnpaddedSectorSize)
	DataLen := (uint32)(UnpaddedSectorSize) - TagLen

	for {
		// Zero the tag area; the data area is overwritten by the reads below.
		memset(buf[:TagLen], nil)

		// Fill the data area from the input stream (tolerating short reads).
		var MetaLen int
		var n int
		var rerr error
		for rbuf := buf[TagLen:]; len(rbuf) > 0; {
			n, rerr = file.Read(rbuf)
			if rerr != nil && rerr != io.EOF {
				return storage.Hash{}, rerr
			}
			rbuf = rbuf[n:]
			MetaLen += n
			if rerr == io.EOF {
				break
			}
		}
		if rerr == io.EOF {
			// Final (possibly partial) chunk: chain the collected piece
			// hashes into the trailing tag pieces.
			var err error
			finalPieceHash, err = sb.EncodeData(buf, uint32(MetaLen), DataLen, hashData)
			if err != nil {
				return storage.Hash{}, err
			}
			break
		}

		binary.BigEndian.PutUint32(buf[:4], uint32(MetaLen))

		// Probe for an unused hash so every piece file gets a unique name.
		pieceHash := computeHash(buf)
		for piecesHashMap[string(pieceHash[:])] == 1 {
			pieceHash = computeHash(pieceHash[:])
		}
		piecesHashMap[string(pieceHash[:])] = 1

		fmt.Printf("Encode: %x.dat\n", pieceHash[:])
		filename := filepath.Join(root, fmt.Sprintf("%x.dat", pieceHash[:]))
		wfile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			return storage.Hash{}, err
		}
		for wbuf := buf; len(wbuf) > 0; {
			n, werr := wfile.Write(wbuf)
			if werr != nil {
				wfile.Close() // BUG FIX: don't leak the fd on a failed write
				return storage.Hash{}, werr
			}
			wbuf = wbuf[n:]
		}
		// BUG FIX: surface close errors instead of discarding them.
		if err := wfile.Close(); err != nil {
			return storage.Hash{}, err
		}

		hashData = append(hashData, pieceHash[:]...)
	}
	return finalPieceHash, nil
}
// EncodeData chains the accumulated piece hashes (hashData) into trailing
// pieces written under <root>/pieces.
//
// The first emitted piece is [ Tag | MetaData | HashData ]; each subsequent
// piece is [ Tag | PreHash | HashData ] where PreHash is the hash of the
// previously emitted piece. Returns the hash of the last piece written,
// which decodePiecesToData uses to walk the chain backwards.
func (sb *Sealer) EncodeData(
	buf []byte,
	MetaLen uint32,
	DataLen uint32,
	hashData []byte,
) (storage.Hash, error) {
	root := filepath.Join(sb.sectors.GetRoot(), "pieces")
	var prePieceHash storage.Hash
	var end uint32 = 0
	// BUG FIX: run at least once even when hashData is empty, otherwise a
	// file smaller than one sector produced NO tail piece at all and its
	// metadata was silently lost (a zero hash was returned).
	for first := true; first || len(hashData) > 0; first = false {
		if end != 0 {
			// Follow-up piece: [ Tag | PreHash | HashData ].
			CommLen := min(uint32(len(hashData)), ((DataLen-32)/32)*32)
			binary.BigEndian.PutUint32(buf[:4], 0x80000000) // has-pre flag, MetaLen == 0
			binary.BigEndian.PutUint32(buf[4:8], CommLen)
			// BUG FIX: the previous-piece hash belongs at buf[8:40].
			// Writing it at buf[4:40] clobbered the CommLen field just
			// stored at buf[4:8] and disagreed with DecodePiece, which
			// reads the hash from buf[8:40].
			memset(buf[8:40], prePieceHash[:])
			rbuf := buf[TagLen+32:]
			memset(rbuf, hashData[:CommLen])
			memset(rbuf[CommLen:], nil)
			hashData = hashData[CommLen:]
			end = nextUppandedPowerOfTwo(TagLen + 32 + CommLen)
		} else {
			// First piece: [ Tag | MetaData | HashData ].
			CommLen := min(uint32(len(hashData)), ((DataLen-MetaLen)/32)*32)
			binary.BigEndian.PutUint32(buf[:4], MetaLen)
			binary.BigEndian.PutUint32(buf[4:8], CommLen)
			rbuf := buf[TagLen+MetaLen:]
			memset(rbuf, hashData[:CommLen])
			memset(rbuf[CommLen:], nil)
			hashData = hashData[CommLen:]
			end = nextUppandedPowerOfTwo(TagLen + MetaLen + CommLen)
		}

		// Probe for an unused hash so every piece file gets a unique name.
		prePieceHash = computeHash(buf[:])
		for piecesHashMap[string(prePieceHash[:])] == 1 {
			prePieceHash = computeHash(prePieceHash[:])
		}
		piecesHashMap[string(prePieceHash[:])] = 1

		filename := filepath.Join(root, fmt.Sprintf("%x.dat", prePieceHash[:]))
		fmt.Printf("Encode: %x.dat length:%d\n", prePieceHash[:], end)
		file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
		if err != nil {
			return storage.Hash{}, err
		}
		for wbuf := buf[:end]; len(wbuf) > 0; {
			n, werr := file.Write(wbuf)
			if werr != nil {
				file.Close() // BUG FIX: don't leak the fd on a failed write
				return storage.Hash{}, werr
			}
			wbuf = wbuf[n:]
		}
		if err := file.Close(); err != nil {
			return storage.Hash{}, err
		}
	}
	return prePieceHash, nil
}
// DecodePiece parses one encoded piece read from in.
//
// Piece layout: [ 4-byte flags+MetaLen | 4-byte CommLen | 32-byte PreHash
// (only when the high flag bit of the first word is set) | MetaData |
// HashData ]. start/end select a byte window of the payload after the tag
// (and after PreHash, when present).
//
// Returns (hasPre, preHash, metaData window, hash list window, err).
func (sb *Sealer) DecodePiece(
	ctx context.Context,
	sectorSize abi.SectorSize,
	in io.Reader,
	start storiface.UnpaddedByteIndex,
	end storiface.UnpaddedByteIndex,
) (bool, storage.Hash, []byte, []storage.Hash, error) {
	if start > end {
		return false, storage.Hash{}, nil, nil, xerrors.Errorf("start must be less than end")
	}
	if start == end {
		return false, storage.Hash{}, nil, nil, nil
	}

	// Read up to one unpadded sector worth of bytes; short input is allowed
	// and the remainder of buf stays zero.
	unpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
	buf := make([]byte, unpaddedSectorSize)
	for rbuf := buf; len(rbuf) > 0; {
		n, err := in.Read(rbuf)
		if err != nil && err != io.EOF {
			return false, storage.Hash{}, nil, nil, err
		}
		rbuf = rbuf[n:]
		if err == io.EOF {
			break
		}
	}

	// BUG FIX: decode the tag with BigEndian.Uint32 instead of the
	// reflection-based binary.Read, whose error results were ignored.
	MetaLen := binary.BigEndian.Uint32(buf[0:4])
	CommLen := binary.BigEndian.Uint32(buf[4:8])
	hasPre := MetaLen>>31 != 0
	MetaLen &= 0x7fffffff

	var prePieceHash storage.Hash
	rbuf := buf[8:]
	if hasPre {
		copy(prePieceHash[:], buf[8:40])
		rbuf = rbuf[32:]
	}

	var data []byte
	var dataHash []storage.Hash
	var err error
	switch {
	case start > storiface.UnpaddedByteIndex(MetaLen):
		// Window lies entirely inside the hash area.
		dataHash, err = to32Byte(rbuf[start:end])
	case end < storiface.UnpaddedByteIndex(MetaLen):
		// Window lies entirely inside the metadata area.
		data = rbuf[start:end]
	case end > storiface.UnpaddedByteIndex(MetaLen+CommLen):
		// Window spans the metadata and the whole hash area.
		data = rbuf[start:MetaLen]
		dataHash, err = to32Byte(rbuf[MetaLen : MetaLen+CommLen])
	default:
		// Window spans the metadata and a prefix of the hash area.
		data = rbuf[start:MetaLen]
		dataHash, err = to32Byte(rbuf[MetaLen:end])
	}
	if err != nil {
		return false, storage.Hash{}, nil, nil, err
	}
	return hasPre, prePieceHash, data, dataHash, nil
}
//
func (sb *Sealer)CheckPieceAndDataRoot(
sid storage.SectorRef,
commd cid.Cid,
......@@ -853,7 +922,7 @@ func (sb *Sealer) SealCommit1(ctx context.Context, sector storage.SectorRef, tic
return output, nil
}
func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storage.Proof, error) {
func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) ([]byte, error) {
return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner)
}
......@@ -863,7 +932,7 @@ func (sb *Sealer)Sealed(
seed abi.InteractiveSealRandomness,
ticket abi.SealRandomness,
pieces []abi.PieceInfo,
) (storage.SectorCids, storage.Proof, error) {
) (storage.SectorCids, []byte, error) {
// var sealedSectors spproof.SectorInfo
log.Infof("[%d] Running replication(1)...", sid.ID.Number)
......@@ -896,7 +965,7 @@ func (sb *Sealer)Sealed(
return cids, proof, nil
}
func (sb *Sealer)AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos, proofs []storage.Proof) ([]byte, error){
func (sb *Sealer)AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error){
return ffi.AggregateSealProofs(aggregateInfo, proofs)
}
......@@ -1056,4 +1125,59 @@ func (Verifier)VerifyAggregateWindowPostProofs(aggregateInfo spproof.AggregateWi
func DefaultAggregationType() abi.RegisteredAggregationProof {
return abi.RegisteredAggregationProof_SnarkPackV1;
}
// memset fills dst, mirroring the C memset/memcpy pair: a nil src zeroes
// dst, otherwise src's bytes are copied into it (up to the shorter length).
// It returns the number of bytes written; a nil dst writes nothing.
func memset(dst, src []byte) int {
	if dst == nil {
		return 0
	}
	if src != nil {
		return copy(dst, src)
	}
	for i := range dst {
		dst[i] = 0
	}
	return len(dst)
}
// min returns the smaller of two uint32 values.
func min(x, y uint32) uint32 {
	if y < x {
		return y
	}
	return x
}
// computeHash returns the sha256 digest of in as a storage.Hash.
func computeHash(in []byte) storage.Hash {
	var res storage.Hash
	h := sha256.New()
	// hash.Hash.Write accepts input of any length and never returns an
	// error, so the previous hand-rolled 32KiB chunking loop was
	// unnecessary complexity with no behavioral difference.
	h.Write(in)
	copy(res[:], h.Sum(nil))
	return res
}
// nextUppandedPowerOfTwo rounds a byte count up to the piece-size schedule
// 254 * 2^k: it returns 254 multiplied by two raised to the bit-length of
// index/254 (so 0..253 -> 254, 254..507 -> 508, and so on).
func nextUppandedPowerOfTwo(index uint32) uint32 {
	var power uint32
	for q := index / 254; q != 0; q >>= 1 {
		power++
	}
	return 254 << power
}
// to32Byte reinterprets a flat byte slice as a list of 32-byte hashes.
// The input length must be a multiple of 32.
func to32Byte(in []byte) ([]storage.Hash, error) {
	if len(in)%32 != 0 {
		// BUG FIX: corrected "lenth" -> "length" in the error message.
		return nil, xerrors.Errorf("length of the hash arr must be multiple of 32")
	}
	hashes := make([]storage.Hash, len(in)/32)
	for i := range hashes {
		copy(hashes[i][:], in[i*32:(i+1)*32])
	}
	return hashes, nil
}
\ No newline at end of file
......@@ -3,9 +3,11 @@ package seal
import(
"context"
"fmt"
"io"
"io/ioutil"
"os"
"math/rand"
"path/filepath"
"time"
"golang.org/x/xerrors"
......@@ -16,6 +18,7 @@ import(
spproof "fil_integrate/build/proof"
"fil_integrate/build"
"fil_integrate/build/storage"
"fil_integrate/build/storiface"
// "fil_integrate/extern/sector-storage/ffiwrapper"
"fil_integrate/seal/basicfs"
)
......@@ -130,7 +133,7 @@ func TestAggregateWindowPoSt(
for i := 0; i < numAggregate; i++{
log.Infof("[%d] Generating Window-Post", i)
proof, _, err := sb.GenProofForWindowPoSt(ctx, 1000, sealedSectorsinfo[i], challenge[:])
proof, _, err := sb.GenerateWindowPoStProofs(ctx, 1000, sealedSectorsinfo[i], challenge[:])
if err != nil {
return err
}
......@@ -214,7 +217,7 @@ func TestAggregateWindowPoSt(
func TestSealAndUnseal() error {
//********************need (sb,ctx,sid,sectorSize,file,seed,ticket,challenge)****************//
sdir, err := homedir.Expand("~/tmp")
sdir, err := homedir.Expand("~/tmp/bench")
if err != nil {
return err
}
......@@ -293,7 +296,7 @@ func TestSealAndUnseal() error {
SealProof: sid.ProofType,
})
proof2, err := sb.GenProofForC2(ctx, sid, seed, ticket, pieces, cids)
proof2, err := sb.GenerateCommit2Proof(ctx, sid, seed, ticket, pieces, cids)
if err != nil {
return err
}
......@@ -346,7 +349,7 @@ func TestSealAndUnseal() error {
return xerrors.Errorf("porep proof for sector %d was invalid", sid.ID.Number)
}
proof, _, err := sb.GenProofForWindowPoSt(ctx, sid.ID.Miner, sealedSectors, challenge[:])
proof, _, err := sb.GenerateWindowPoStProofs(ctx, sid.ID.Miner, sealedSectors, challenge[:])
wpvi := spproof.WindowPoStVerifyInfo{
Randomness: challenge[:],
......@@ -367,6 +370,97 @@ func TestSealAndUnseal() error {
}
// TestSplitDataInToPieces round-trips a random input file through
// EncodeDataToPieces and decodePiecesToData, then verifies the decoded
// output matches the original byte for byte.
func TestSplitDataInToPieces() error {
	benchRoot, err := homedir.Expand("~/tmp/bench")
	if err != nil {
		return err
	}
	if err = os.MkdirAll(benchRoot, 0775); err != nil { //nolint:gosec
		return xerrors.Errorf("creating sectorbuilder dir: %w", err)
	}

	tmpDir, err := ioutil.TempDir(benchRoot, "bench")
	if err != nil {
		return err
	}
	// TODO: pretty sure this isnt even needed?
	if err := os.MkdirAll(tmpDir, 0775); err != nil {
		return err
	}

	sb, err := New(&basicfs.Provider{Root: tmpDir})
	if err != nil {
		return err
	}
	ctx := context.TODO()
	sectorSize := abi.SectorSize(4 * 1024 * 1024)

	dataRoot, err := homedir.Expand("~/tmp")
	if err != nil {
		return err
	}

	// Generate the random input file and encode it into pieces.
	inName := filepath.Join(dataRoot, "input.dat")
	if err = generateRandomData(inName); err != nil {
		return err
	}
	in, err := os.OpenFile(inName, os.O_RDONLY, 0644)
	if err != nil {
		return err
	}
	defer in.Close()

	finalHash, err := sb.EncodeDataToPieces(ctx, sectorSize, in)
	if err != nil {
		return err
	}

	// Decode the pieces back into a single output file.
	outName := filepath.Join(dataRoot, "output.dat")
	out, err := os.OpenFile(outName, os.O_WRONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	defer out.Close()
	if err = decodePiecesToData(sb, ctx, tmpDir, sectorSize, finalHash, out); err != nil {
		return err
	}

	ok, err := checkDecodedFile(dataRoot)
	if err != nil {
		return err
	}
	if !ok {
		fmt.Println("decode pieces failed")
	} else {
		fmt.Println("decode pieces success")
	}
	return nil
}
// Test is a scratch routine demonstrating append aliasing: append returns a
// new (possibly reallocated) slice while the original keeps its old length.
// It always returns 0.
func Test() int {
	var buf1 []byte
	var buf2 []byte
	buf1 = append(buf1, 0, 1, 2, 3)
	buf2 = append(buf2, 10, 20, 30, 40)
	buf1 = append(buf2, buf1...)
	fmt.Println(buf1, len(buf1), buf1[4])
	// BUG FIX: buf2 still has length 4 after the append above (append
	// returned a new slice), so indexing buf2[4] was a guaranteed
	// out-of-range panic. Print only what is in range.
	fmt.Println(buf2, len(buf2))
	return 0
}
func spt(ssize abi.SectorSize) abi.RegisteredSealProof {
spt, err := build.SealProofTypeFromSectorSize(ssize, NewestNetworkVersion)
if err != nil {
......@@ -374,4 +468,149 @@ func spt(ssize abi.SectorSize) abi.RegisteredSealProof {
}
return spt
}
// generateRandomData writes 128MiB of deterministic pseudo-random bytes to
// filename, produced by repeatedly blake2b-hashing a rolling seed.
func generateRandomData(filename string) error {
	const dataSize = 128 * 1024 * 1024
	buf := make([]byte, dataSize)
	seed := []byte("random string!")
	for off := 0; off < dataSize; off += 32 {
		digest := blake2b.Sum256(seed)
		copy(seed, digest[:])
		copy(buf[off:off+32], digest[:])
	}

	_ = os.Remove(filename)
	f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err = f.Write(buf); err != nil {
		return err
	}
	return nil
}
// decodePiecesToData reassembles the original data stream from the piece
// files under tsdir/pieces, starting from the final piece hash returned by
// EncodeDataToPieces, and writes it to out.
//
// It first walks the pre-hash chain backwards to collect the ordered list of
// data-piece hashes, then streams every data piece, and finally appends the
// trailing metadata carried by the head piece of the chain.
func decodePiecesToData(sb *Sealer, ctx context.Context, tsdir string, sectorSize abi.SectorSize, finalHash storage.Hash, out io.Writer) error {
	// Payload bytes per piece: unpadded sector size minus the 8-byte tag.
	DataLen := abi.PaddedPieceSize(sectorSize).Unpadded() - 8

	// readPiece opens and decodes one piece file, closing it promptly
	// (BUG FIX: the old code used defer inside loops, holding every fd
	// open until the function returned).
	readPiece := func(hash storage.Hash) (bool, storage.Hash, []byte, []storage.Hash, error) {
		filename := filepath.Join(tsdir, "pieces", fmt.Sprintf("%x.dat", hash[:]))
		fmt.Printf("Decode: %x.dat\n", hash[:])
		file, err := os.OpenFile(filename, os.O_RDONLY, 0644)
		if err != nil {
			return false, storage.Hash{}, nil, nil, err
		}
		defer file.Close()
		return sb.DecodePiece(ctx, sectorSize, file, 0, storiface.UnpaddedByteIndex(DataLen))
	}

	hasPre, preHash, metaData, commData, err := readPiece(finalHash)
	if err != nil {
		return err
	}
	// Walk the chain back to the head piece, prepending each piece's hash
	// list so commData ends up in original data order.
	for hasPre {
		var hashData []storage.Hash
		hasPre, preHash, metaData, hashData, err = readPiece(preHash)
		if err != nil {
			return err
		}
		// BUG FIX: merge after every decode. The old code merged at the
		// top of the loop, so the head piece's hash list was dropped when
		// the loop exited and the first data pieces were never written.
		commData = append(hashData, commData...)
	}

	// Stream every data piece in order.
	for _, pieceHash := range commData {
		// BUG FIX: the old code shadowed and never checked the error from
		// os.OpenFile here; readPiece now reports it.
		_, _, data, _, err := readPiece(pieceHash)
		if err != nil {
			return err
		}
		for wbuf := data; len(wbuf) > 0; {
			n, werr := out.Write(wbuf)
			if werr != nil {
				return werr
			}
			wbuf = wbuf[n:]
		}
	}

	// The head piece's metadata is the tail of the original data.
	for wbuf := metaData; len(wbuf) > 0; {
		n, werr := out.Write(wbuf)
		if werr != nil {
			return werr
		}
		wbuf = wbuf[n:]
	}
	return nil
}
// checkDecodedFile compares <root>/input.dat against <root>/output.dat chunk
// by chunk and reports whether the two files are byte-identical. A length
// mismatch or differing byte yields an error.
func checkDecodedFile(root string) (bool, error) {
	in, err := os.Open(filepath.Join(root, "input.dat"))
	if err != nil {
		return false, err
	}
	defer in.Close()

	out, err := os.Open(filepath.Join(root, "output.dat"))
	if err != nil {
		return false, err
	}
	defer out.Close()

	// fillFrom reads from f until buf is full or EOF, returning the count.
	fillFrom := func(f *os.File, buf []byte) (int, error) {
		total := 0
		for rest := buf; len(rest) > 0; {
			n, rerr := f.Read(rest)
			if rerr != nil && rerr != io.EOF {
				return 0, rerr
			}
			rest = rest[n:]
			total += n
			if rerr == io.EOF {
				break
			}
		}
		return total, nil
	}

	inBuf := make([]byte, 2<<20)
	outBuf := make([]byte, 2<<20)
	for {
		readin, err := fillFrom(in, inBuf)
		if err != nil {
			return false, err
		}
		readout, err := fillFrom(out, outBuf)
		if err != nil {
			return false, err
		}
		if readin != readout {
			return false, xerrors.Errorf("the output data and input data do not match")
		}
		if readin == 0 {
			break
		}
		for i := 0; i < readin; i++ {
			if inBuf[i] != outBuf[i] {
				return false, xerrors.Errorf("the output data and input data do not match")
			}
		}
	}
	return true, nil
}
\ No newline at end of file
......@@ -15,9 +15,14 @@ import(
//interface
type SectorSealer interface{
AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data, numbers ...int32) (abi.PieceInfo, error)
AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error)
CheckPieceAndDataRoot(sid storage.SectorRef, commd cid.Cid, pieces []abi.PieceInfo) (bool, error)
Sealed(ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.SectorCids, storage.Proof, error)
Sealed(ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.SectorCids, []byte, error)
// Split and encode data into pieces
// Pieces structure is [ Tag | MetaData | HashData ] or [ Tag | PreHash | HashData]
EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Hash, error)
DecodePiece(ctx context.Context, sectorSize abi.SectorSize, in io.Reader, start storiface.UnpaddedByteIndex, end storiface.UnpaddedByteIndex) (bool, storage.Hash, []byte, []storage.Hash, error)
GenerateCommit2Proof( ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Proof, error)
GenerateWindowPoStProofs(ctx context.Context, minerID abi.ActorID, sectorInfo []spproof.SectorInfo, randomness abi.PoStRandomness) ([]spproof.PoStProof, []abi.SectorID, error)
UnsealedRange(ctx context.Context, sid storage.SectorRef, sectorSize abi.SectorSize, ticket abi.SealRandomness, commd cid.Cid, out io.Writer, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
......@@ -37,7 +42,7 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error)
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment