Commit 391bf3d6 authored by 董子豪's avatar 董子豪

modify piece hash construction in EncodeDataToPieces

parent 79c7865a
......@@ -11,6 +11,13 @@ type Data = io.Reader
type Hash = [32]byte
type DecodedData struct {
HasPre bool
PreHash Hash
Data []byte
PieceHash []Hash
}
type SectorRef struct {
ID abi.SectorID
ProofType abi.RegisteredSealProof
......
......@@ -4,6 +4,7 @@ import(
"bufio"
"context"
"io"
"io/ioutil"
"os"
"runtime"
"sync"
......@@ -17,7 +18,6 @@ import(
logging "github.com/ipfs/go-log/v2"
commcid "github.com/filecoin-project/go-fil-commcid"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/minio/sha256-simd"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/ipfs/go-cid"
......@@ -30,8 +30,6 @@ import(
var log = logging.Logger("sealing")
var piecesHashMap map[string]int = make(map[string]int)
//32 bytes, 256 bits in total
//[has_pre][MetaLen1..MetaLen4][PieceLen1..PieceLen4]
const TagLen uint32 = 8
......@@ -204,7 +202,7 @@ func (sb *Sealer)AddPiece(
throttle <- pbuf
}()
c, err := sb.pieceCid(sector.ProofType, pbuf[:read])
c, err := pieceCid(sector.ProofType, pbuf[:read])
done <- struct {
cid.Cid
error
......@@ -270,7 +268,7 @@ func (sb *Sealer)AddPiece(
}
func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) {
func pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) {
prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in)))
if err != nil {
return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err)
......@@ -286,6 +284,22 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
return pieceCID, werr()
}
// pieceCommitment computes the piece commitment (CommP) over the raw bytes
// in, using the proof type spt. The data is exposed to the FFI as a readable
// file via ToReadableFile; werr() surfaces any error from the background
// copy goroutine that feeds the pipe.
func pieceCommitment(spt abi.RegisteredSealProof, in []byte) (storage.Hash, error) {
	prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in)))
	if err != nil {
		return storage.Hash{}, xerrors.Errorf("getting tee reader pipe: %w", err)
	}
	commP, err := ffi.GeneratePieceCommitmentFromFile(spt, prf, abi.UnpaddedPieceSize(len(in)))
	if err != nil {
		_ = prf.Close() // don't leak the pipe file on the error path
		return storage.Hash{}, xerrors.Errorf("generating piece commitment: %w", err)
	}
	// Best-effort close; commP is already computed, and werr() below reports
	// any failure from the writer side of the pipe.
	_ = prf.Close()
	return commP, werr()
}
func ToReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {
......@@ -596,20 +610,21 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storag
return true, nil
}
// not yet tested
// Data contains [ MetaData | HashData ]
// Data contains MetaData and HashData
// Pieces structure is [ Tag | MetaData | HashData ]
func (sb *Sealer) EncodeDataToPieces(
ctx context.Context,
sectorSize abi.SectorSize,
file storage.Data,
) (storage.Hash, error) {
) (storage.Hash, []storage.Hash, error) {
var hashData []byte
var FinalPieceHash storage.Hash
var piecesHash []storage.Hash
var finalPieceHash storage.Hash
root := filepath.Join(sb.sectors.GetRoot(), "pieces")
if err := os.Mkdir(root, 0755); err != nil && !os.IsExist(err) { // nolint
return storage.Hash{}, err
err := os.Mkdir(root, 0755)
if err != nil && !os.IsExist(err) { // nolint
return storage.Hash{}, nil, err
}
UnpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
......@@ -619,112 +634,90 @@ func (sb *Sealer) EncodeDataToPieces(
for{
memset(buf[:TagLen], nil)
var MetaLen int = 0
var n int
var rerr error
for rbuf := buf[TagLen:]; len(rbuf) > 0; {
n, rerr = file.Read(rbuf[:])
if rerr != nil && rerr != io.EOF{
return storage.Hash{}, rerr
MetaLen, err := file.Read(buf[TagLen:])
if err != nil && err != io.EOF{
return storage.Hash{}, nil, err
}
rbuf = rbuf[n:]
MetaLen += n
if rerr == io.EOF{
break
}
}
if rerr == io.EOF{
if err == io.EOF || uint32(MetaLen) != DataLen{
//encode first sector
var err error
FinalPieceHash, err = sb.EncodeData(buf, uint32(MetaLen), DataLen, hashData)
finalPieceHash, err = sb.EncodeData(buf, sectorSize, uint32(MetaLen), DataLen, hashData)
if err != nil{
return storage.Hash{}, err
return storage.Hash{}, nil, err
}
break
}
binary.BigEndian.PutUint32(buf[:4], uint32(MetaLen))
pieceHash := computeHash(buf[:])
for ; piecesHashMap[string(pieceHash[:])] == 1; pieceHash = computeHash(pieceHash[:]){}
piecesHashMap[string(pieceHash[:])] = 1
fmt.Printf("Encode: %x.dat\n", pieceHash[:])
filename := filepath.Join(root, fmt.Sprintf("%x.dat", pieceHash[:]))
wfile, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil{
return storage.Hash{}, err
}
for wbuf := buf; len(wbuf) > 0; {
n, err := wfile.Write(wbuf)
if err != nil{
return storage.Hash{}, err
pieceHash, err := pieceCommitment(spt(sectorSize), buf[:])
if err != nil {
return storage.Hash{}, nil, err
}
wbuf = wbuf[n:]
filename := filepath.Join(root, fmt.Sprintf("%x.dat", pieceHash[:]))
err = ioutil.WriteFile(filename, buf[:], 0644)
if err != nil {
return storage.Hash{}, nil, err
}
wfile.Close()
hashData = append(hashData, pieceHash[:]...)
piecesHash = append(piecesHash, pieceHash)
}
return FinalPieceHash, nil
return finalPieceHash, piecesHash, nil
}
func (sb *Sealer) EncodeData(
buf []byte,
sectorSize abi.SectorSize,
MetaLen uint32,
DataLen uint32,
hashData []byte,
) (storage.Hash, error) {
root := filepath.Join(sb.sectors.GetRoot(), "pieces")
var prePieceHash storage.Hash
var err error
var end uint32 = 0
for ;len(hashData) > 0; {
//encode next n sector
// end := len(buf)
if end != 0{
CommLen := min(uint32(len(hashData)), ((DataLen-32)/32) * 32)
binary.BigEndian.PutUint32(buf[:4], 0x80000000)
binary.BigEndian.PutUint32(buf[4:8], CommLen)
memset(buf[4:40], prePieceHash[:])
rbuf := buf[TagLen + 32:]
memset(rbuf, hashData[:CommLen])
memset(rbuf[CommLen:], nil)
hashData = hashData[CommLen:]
// if len(hashData) == 0 {
end = nextUppandedPowerOfTwo(TagLen + 32 + CommLen)
// }
} else {
CommLen := min(uint32(len(hashData)), ((DataLen-MetaLen)/32) * 32)
binary.BigEndian.PutUint32(buf[:4], MetaLen)
binary.BigEndian.PutUint32(buf[4:8], CommLen)
rbuf := buf[TagLen + MetaLen:]
memset(rbuf, hashData[:CommLen])
memset(rbuf[CommLen:], nil)
hashData = hashData[CommLen:]
end = nextUppandedPowerOfTwo(TagLen + MetaLen + CommLen)
}
prePieceHash = computeHash(buf[:])
for ; piecesHashMap[string(prePieceHash[:])] == 1; prePieceHash = computeHash(prePieceHash[:]){}
piecesHashMap[string(prePieceHash[:])] = 1
filename := filepath.Join(root, fmt.Sprintf("%x.dat", prePieceHash[:]))
fmt.Printf("Encode: %x.dat lenth:%d\n", prePieceHash[:], end)
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil{
prePieceHash, err = pieceCommitment(spt(sectorSize), buf[:])
if err != nil {
return storage.Hash{}, err
}
for wbuf := buf[:end]; len(wbuf) > 0; {
n, err := file.Write(wbuf)
if err != nil{
filename := filepath.Join(root, fmt.Sprintf("%x.dat", prePieceHash[:]))
err = ioutil.WriteFile(filename, buf[:], 0644)
if err != nil {
return storage.Hash{}, err
}
wbuf = wbuf[n:]
}
file.Close()
}
return prePieceHash, nil
......@@ -736,69 +729,66 @@ func (sb *Sealer) DecodePiece(
in io.Reader,
start storiface.UnpaddedByteIndex,
end storiface.UnpaddedByteIndex,
) (bool, storage.Hash, []byte, []storage.Hash, error){
) (storage.DecodedData, error){
if start > end {
return false, storage.Hash{}, nil, nil, xerrors.Errorf("start must be less than end")
return storage.DecodedData{}, xerrors.Errorf("start must be less than end")
}
if start == end {
return false, storage.Hash{}, nil, nil, nil
return storage.DecodedData{}, nil
}
unpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
buf := make([]byte, unpaddedSectorSize)
for rbuf := buf[:]; len(rbuf) > 0; {
n, err := in.Read(rbuf[:])
_, err := in.Read(buf[:])
if err != nil && err != io.EOF{
return false, storage.Hash{}, nil, nil, err
}
rbuf = rbuf[n:]
if err == io.EOF {
break
}
return storage.DecodedData{}, err
}
var prePieceHash storage.Hash
var MetaLen uint32
var CommLen uint32
var data []byte
var dataHash []storage.Hash
var err error
var pieceHash []storage.Hash
binary.Read(bytes.NewReader(buf[0:4]), binary.BigEndian, &MetaLen)
binary.Read(bytes.NewReader(buf[4:8]), binary.BigEndian, &CommLen)
HasPre := MetaLen >> 31
hasPre := MetaLen >> 31
MetaLen = MetaLen & 0x7fffffff
rbuf := buf[8:]
if HasPre != 0 {
if hasPre != 0 {
copy(prePieceHash[:], buf[8:40])
rbuf = rbuf[32:]
}
if start > storiface.UnpaddedByteIndex(MetaLen) {
data = nil
dataHash, err = to32Byte(rbuf[start:end])
pieceHash, err = to32Byte(rbuf[start:end])
if err != nil {
return false, storage.Hash{}, nil, nil, err
return storage.DecodedData{}, err
}
// return HasPre != 0, prePieceHash, nil, rbuf[start:end], nil
} else if end < storiface.UnpaddedByteIndex(MetaLen) {
data = rbuf[start:end]
// return HasPre != 0, prePieceHash, rbuf[start:end], nil, nil
} else if end > storiface.UnpaddedByteIndex(MetaLen + CommLen) {
data = rbuf[start:MetaLen]
dataHash, err = to32Byte(rbuf[MetaLen:MetaLen+CommLen])
pieceHash, err = to32Byte(rbuf[MetaLen:MetaLen+CommLen])
if err != nil {
return false, storage.Hash{}, nil, nil, err
return storage.DecodedData{}, err
}
// return HasPre != 0, prePieceHash, rbuf[start:MetaLen], to32Byte(rbuf[MetaLen:MetaLen+CommLen]), nil
} else {
data = rbuf[start:MetaLen]
dataHash, err = to32Byte(rbuf[MetaLen:end])
pieceHash, err = to32Byte(rbuf[MetaLen:end])
if err != nil {
return false, storage.Hash{}, nil, nil, err
return storage.DecodedData{}, err
}
}
return HasPre != 0, prePieceHash, data, dataHash, nil
return storage.DecodedData{
HasPre: hasPre != 0,
PreHash: prePieceHash,
Data: data,
PieceHash: pieceHash,
}, nil
}
//
......@@ -1147,22 +1137,6 @@ func min(x, y uint32) uint32 {
return y
}
// computeHash returns the SHA-256 digest of in as a storage.Hash ([32]byte).
//
// The previous implementation fed the hasher in manual 32 KiB chunks; that is
// unnecessary — sha256's Write/Sum256 accept input of any size and produce an
// identical digest, so the single-call form is used instead.
func computeHash(in []byte) storage.Hash {
	return storage.Hash(sha256.Sum256(in))
}
func nextUppandedPowerOfTwo(index uint32) uint32 {
power := 0
for index = index / 254; index != 0 ; power += 1 {
......
......@@ -21,8 +21,8 @@ type SectorSealer interface{
Sealed(ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.SectorCids, []byte, error)
// Split and encode data into pieces
// Pieces structure is [ Tag | MetaData | HashData ] or [ Tag | PreHash | HashData]
EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Hash, error)
DecodePiece(ctx context.Context, sectorSize abi.SectorSize, in io.Reader, start storiface.UnpaddedByteIndex, end storiface.UnpaddedByteIndex) (bool, storage.Hash, []byte, []storage.Hash, error)
EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Hash, []storage.Hash, error)
DecodePiece(ctx context.Context, sectorSize abi.SectorSize, in io.Reader, start storiface.UnpaddedByteIndex, end storiface.UnpaddedByteIndex) (storage.DecodedData, error)
GenerateCommit2Proof( ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Proof, error)
GenerateWindowPoStProofs(ctx context.Context, minerID abi.ActorID, sectorInfo []spproof.SectorInfo, randomness abi.PoStRandomness) ([]spproof.PoStProof, []abi.SectorID, error)
UnsealedRange(ctx context.Context, sid storage.SectorRef, sectorSize abi.SectorSize, ticket abi.SealRandomness, commd cid.Cid, out io.Writer, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment