Commit df15fd05 authored by 董子豪's avatar 董子豪

add merkle proof

parent cb403080
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"fil_integrate/build/cid" "fil_integrate/build/cid"
spieces "fil_integrate/build/pieces"
spproof "fil_integrate/build/proof" spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
...@@ -13,12 +14,14 @@ import ( ...@@ -13,12 +14,14 @@ import (
type Connection struct { type Connection struct {
u2pChannel chan Data u2pChannel chan Data
p2uChannel chan Data
p2kChannel chan Data p2kChannel chan Data
} }
func NewConnection() *Connection { func NewConnection() *Connection {
return &Connection{ return &Connection{
u2pChannel: make(chan Data), u2pChannel: make(chan Data),
p2uChannel: make(chan Data),
p2kChannel: make(chan Data), p2kChannel: make(chan Data),
} }
} }
...@@ -37,14 +40,35 @@ func (conn *Connection) RequestPiece(ctx context.Context, pieceCommit cid.Commit ...@@ -37,14 +40,35 @@ func (conn *Connection) RequestPiece(ctx context.Context, pieceCommit cid.Commit
return nil return nil
} }
func (conn *Connection) SendPiece(ctx context.Context, data []byte, pieceCommit cid.Commit) error { func (conn *Connection) SendPieceToUser(ctx context.Context, data []byte, proof spieces.MerkleProof, piece abi.PieceInfo) error {
buf := make([]byte, len(data)) buf := make([]byte, len(data))
copy(buf, data) copy(buf, data)
sdata := Data{ sdata := Data{
op: OP_SEND_PIECE, op: OP_SEND_PIECE,
data: PieceInfo{ data: PieceInfo{
Data: buf, Data: buf,
PieceCID: pieceCommit, Proof: proof,
Piece: piece,
},
}
select {
case conn.p2uChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendPieceToProvider(ctx context.Context, data []byte, proof spieces.MerkleProof, piece abi.PieceInfo) error {
buf := make([]byte, len(data))
copy(buf, data)
sdata := Data{
op: OP_SEND_PIECE,
data: PieceInfo{
Data: buf,
Proof: proof,
Piece: piece,
}, },
} }
select { select {
...@@ -76,7 +100,7 @@ func (conn *Connection) SendSealDone(ctx context.Context) error { ...@@ -76,7 +100,7 @@ func (conn *Connection) SendSealDone(ctx context.Context) error {
data: nil, data: nil,
} }
select { select {
case conn.u2pChannel <- sdata: case conn.p2uChannel <- sdata:
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
...@@ -265,6 +289,19 @@ func (conn *Connection) U2PMessage(ctx context.Context) (Operator, interface{}, ...@@ -265,6 +289,19 @@ func (conn *Connection) U2PMessage(ctx context.Context) (Operator, interface{},
} }
} }
// P2UMessage blocks until the provider-to-user channel delivers a message or
// ctx is canceled. On success it returns the message's operator and payload.
//
// On cancellation it returns OP_CLOSED together with ctx.Err(). It
// deliberately does NOT close or nil out p2uChannel: this method runs on the
// receiving side, and closing a channel that another goroutine may still send
// on panics that sender ("send on closed channel"); mutating the field here
// was also a data race with concurrent senders.
func (conn *Connection) P2UMessage(ctx context.Context) (Operator, interface{}, error) {
	select {
	case mess := <-conn.p2uChannel:
		return mess.op, mess.data, nil
	case <-ctx.Done():
		return OP_CLOSED, nil, ctx.Err()
	}
}
func (conn *Connection) P2KMessage(ctx context.Context) (Operator, interface{}, error) { func (conn *Connection) P2KMessage(ctx context.Context) (Operator, interface{}, error) {
select { select {
case mess := <-conn.p2kChannel: case mess := <-conn.p2kChannel:
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"fil_integrate/build/cid" spieces "fil_integrate/build/pieces"
spproof "fil_integrate/build/proof" spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
...@@ -58,7 +58,7 @@ func runUser(conn *Connection, ctx context.Context) error { ...@@ -58,7 +58,7 @@ func runUser(conn *Connection, ctx context.Context) error {
for i := 0; i < numFile; i++ { for i := 0; i < numFile; i++ {
fmt.Println("User: Addding Piece") fmt.Println("User: Addding Piece")
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
conn.SendPiece(ctx, nil, cid.Commit{}) conn.SendPieceToProvider(ctx, nil, spieces.MerkleProof{}, abi.PieceInfo{})
} }
for i := 0; i < numFile; i++ { for i := 0; i < numFile; i++ {
op, data, err := conn.U2PMessage(ctx) op, data, err := conn.U2PMessage(ctx)
...@@ -112,7 +112,7 @@ func runProvider(conn *Connection, ctx context.Context) error { ...@@ -112,7 +112,7 @@ func runProvider(conn *Connection, ctx context.Context) error {
fmt.Println("Provider: Aggregating Window PoSt") fmt.Println("Provider: Aggregating Window PoSt")
conn.SendAggregateWindowPoStProof(ctx, spproof.PoStProof{}, nil, 0) conn.SendAggregateWindowPoStProof(ctx, spproof.PoStProof{}, nil, 0)
for i := 0; i < numFile; i++ { for i := 0; i < numFile; i++ {
conn.SendPiece(ctx, nil, cid.Commit{}) conn.SendPieceToUser(ctx, nil, spieces.MerkleProof{}, abi.PieceInfo{})
} }
return nil return nil
} }
......
package connect package connect
import ( import (
"fil_integrate/build/cid" spieces "fil_integrate/build/pieces"
spproof "fil_integrate/build/proof" spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
...@@ -32,13 +32,9 @@ type Data struct { ...@@ -32,13 +32,9 @@ type Data struct {
} }
type PieceInfo struct { type PieceInfo struct {
Data []byte Data []byte
PieceCID cid.Commit Proof spieces.MerkleProof
} Piece abi.PieceInfo
func GetPieceInfo(data interface{}) (PieceInfo, bool) {
piece, ok := data.(PieceInfo)
return piece, ok
} }
type SealProofInfo struct { type SealProofInfo struct {
......
...@@ -4,19 +4,20 @@ import ( ...@@ -4,19 +4,20 @@ import (
"context" "context"
"fil_integrate/build/cid" "fil_integrate/build/cid"
spieces "fil_integrate/build/pieces"
spproof "fil_integrate/build/proof" spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
) )
type ProviderAPI interface { type ProviderAPI interface {
SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error
NextSectorID() storage.SectorRef NextSectorID() storage.SectorRef
MinerID() abi.ActorID MinerID() abi.ActorID
AddPiece(ctx context.Context, sid storage.SectorRef) error AddPiece(ctx context.Context, sid storage.SectorRef) error
Sealed(ctx context.Context, sid storage.SectorRef) (storage.SectorCids, error) Sealed(ctx context.Context, sid storage.SectorRef) (storage.SectorCids, error)
SavePiece(ctx context.Context, piece abi.PieceInfo, data []byte, proof spieces.MerkleProof) error
ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error) ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error)
GenerateCommitProof(ctx context.Context, sid storage.SectorRef, commit storage.SectorCids, seed abi.InteractiveSealRandomness) (spproof.Proof, error) GenerateCommitProof(ctx context.Context, sid storage.SectorRef, commit storage.SectorCids, seed abi.InteractiveSealRandomness) (spproof.Proof, error)
......
package provider package provider
import ( import (
"bytes"
"context" "context"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"fil_integrate/build" "fil_integrate/build"
"fil_integrate/build/cid" "fil_integrate/build/cid"
spieces "fil_integrate/build/pieces"
spproof "fil_integrate/build/proof" spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
...@@ -16,11 +18,13 @@ import ( ...@@ -16,11 +18,13 @@ import (
type Provider struct { type Provider struct {
sealer seal.SectorSealer sealer seal.SectorSealer
minerID abi.ActorID minerID abi.ActorID
sortedPieces []storage.Piece sortedPieces []abi.PieceInfo
sectorSize abi.SectorSize sectorSize abi.SectorSize
sectorNumber abi.SectorNumber sectorNumber abi.SectorNumber
// pieceID -> Recived piece data lenth
pieceRecMap map[cid.Commit]abi.PaddedPieceSize
// pieceID -> sector[start:end] // pieceID -> sector[start:end]
pieceMap map[cid.Commit]storage.RangeSector pieceRangeMap map[cid.Commit]storage.RangeSector
// sectorID -> []pieceID // sectorID -> []pieceID
sectorMap map[abi.SectorID][]abi.PieceInfo sectorMap map[abi.SectorID][]abi.PieceInfo
} }
...@@ -29,20 +33,17 @@ var _ ProviderAPI = &Provider{} ...@@ -29,20 +33,17 @@ var _ ProviderAPI = &Provider{}
func New(sealer seal.SectorSealer, miner abi.ActorID) *Provider { func New(sealer seal.SectorSealer, miner abi.ActorID) *Provider {
p := &Provider{ p := &Provider{
sealer: sealer, sealer: sealer,
minerID: miner, minerID: miner,
sectorSize: abi.SectorSize(storage.SectorSize32MiB), sectorSize: abi.SectorSize(storage.SectorSize32MiB),
sectorNumber: 0, sectorNumber: 0,
pieceMap: make(map[cid.Commit]storage.RangeSector), pieceRecMap: make(map[cid.Commit]abi.PaddedPieceSize),
sectorMap: make(map[abi.SectorID][]abi.PieceInfo), pieceRangeMap: make(map[cid.Commit]storage.RangeSector),
sectorMap: make(map[abi.SectorID][]abi.PieceInfo),
} }
return p return p
} }
func (p *Provider) SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error {
return p.sealer.SavePiece(ctx, piece, in)
}
func (p *Provider) NextSectorID() storage.SectorRef { func (p *Provider) NextSectorID() storage.SectorRef {
sid := storage.SectorRef{ sid := storage.SectorRef{
ID: abi.SectorID{ ID: abi.SectorID{
...@@ -59,12 +60,87 @@ func (p *Provider) MinerID() abi.ActorID { ...@@ -59,12 +60,87 @@ func (p *Provider) MinerID() abi.ActorID {
return p.minerID return p.minerID
} }
// SavePiece verifies and persists one received chunk of a piece.
//
// The chunk's Merkle proof is checked against the piece commitment at the
// current receive offset (chunks are therefore assumed to arrive in order —
// TODO confirm with the sending side), the data is handed to the sealer, and
// the running received-byte counter for the piece is advanced. Once the whole
// piece has arrived it is inserted into p.sortedPieces, which is kept sorted
// by descending padded size so AddPiece can pack sectors greedily.
func (p *Provider) SavePiece(ctx context.Context, piece abi.PieceInfo, data []byte, proof spieces.MerkleProof) error {
	size := len(data)
	// A missing map entry yields the zero value, i.e. nothing received yet.
	received := p.pieceRecMap[piece.PieceCID]

	ok, err := proof.Verify(data, piece.PieceCID, received)
	if err != nil {
		return err
	}
	if !ok {
		// Error string matches the wording used elsewhere in this change set
		// (was: "Merkle Proof verify falied").
		return xerrors.Errorf("merkle proof verify failed")
	}

	if err := p.sealer.SavePiece(ctx, piece, bytes.NewReader(data), size); err != nil {
		return err
	}

	received += abi.UnpaddedPieceSize(size).Padded()
	if received == piece.Size {
		// Piece complete: insert it into the size-descending sortedPieces list.
		var res []abi.PieceInfo
		if p.sortedPieces == nil || piece.Size >= p.sortedPieces[0].Size {
			res = append(res, piece)
			res = append(res, p.sortedPieces...)
		} else {
			// Find the last element whose size is still >= the new piece and
			// splice the piece in right after it.
			var i int
			for i = len(p.sortedPieces) - 1; i >= 0; i-- {
				if p.sortedPieces[i].Size >= piece.Size {
					break
				}
			}
			res = append(res, p.sortedPieces[:i+1]...)
			res = append(res, piece)
			res = append(res, p.sortedPieces[i+1:]...)
		}
		p.sortedPieces = res
	}
	p.pieceRecMap[piece.PieceCID] = received
	return nil
}
func (p *Provider) AddPiece(ctx context.Context, sid storage.SectorRef) error { func (p *Provider) AddPiece(ctx context.Context, sid storage.SectorRef) error {
pieces, err := p.sealer.AddPiece(ctx, sid) var index int
var addPieces []abi.PieceInfo
var pieceSize abi.PaddedPieceSize
var existingPieceSizes []abi.UnpaddedPieceSize
ssize, err := sid.ProofType.SectorSize()
if err != nil { if err != nil {
return err return err
} }
p.sectorMap[sid.ID] = pieces
maxPieceSize := abi.PaddedPieceSize(ssize)
// Select pieces to seal
for index = 0; index < len(p.sortedPieces); index++ {
if pieceSize > maxPieceSize {
return xerrors.Errorf("Exists a piece whose size is bigger than 32MiB or is not power of two or the pieces is not sorted")
} else if pieceSize == maxPieceSize {
break
}
pieceSize += p.sortedPieces[index].Size
addPieces = append(addPieces, p.sortedPieces[index])
}
if pieceSize != maxPieceSize {
return seal.PicesNotEnoughError
}
for _, piece := range addPieces {
err := p.sealer.AddPiece(ctx, sid, existingPieceSizes, piece)
if err != nil {
return err
}
existingPieceSizes = append(existingPieceSizes, piece.Size.Unpadded())
}
p.sortedPieces = p.sortedPieces[index:]
p.sectorMap[sid.ID] = addPieces
return nil return nil
} }
...@@ -82,7 +158,7 @@ func (p *Provider) Sealed(ctx context.Context, sid storage.SectorRef) (storage.S ...@@ -82,7 +158,7 @@ func (p *Provider) Sealed(ctx context.Context, sid storage.SectorRef) (storage.S
// Store the mapping relations, pieceID -> sector[start:end] // Store the mapping relations, pieceID -> sector[start:end]
var offset abi.UnpaddedPieceSize var offset abi.UnpaddedPieceSize
for _, piece := range pieces { for _, piece := range pieces {
p.pieceMap[piece.PieceCID] = storage.RangeSector{ p.pieceRangeMap[piece.PieceCID] = storage.RangeSector{
Sector: sid, Sector: sid,
Unsealed: cids.Unsealed, Unsealed: cids.Unsealed,
Offset: abi.UnpaddedByteIndex(offset), Offset: abi.UnpaddedByteIndex(offset),
...@@ -95,7 +171,7 @@ func (p *Provider) Sealed(ctx context.Context, sid storage.SectorRef) (storage.S ...@@ -95,7 +171,7 @@ func (p *Provider) Sealed(ctx context.Context, sid storage.SectorRef) (storage.S
} }
func (p *Provider) ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error) { func (p *Provider) ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error) {
sRange, ok := p.pieceMap[pieceID] sRange, ok := p.pieceRangeMap[pieceID]
if !ok { if !ok {
return p.sealer.ReadPiece(ctx, pieceID) return p.sealer.ReadPiece(ctx, pieceID)
} }
...@@ -192,4 +268,4 @@ func (p *Provider) AggregateWindowPoStProofs( ...@@ -192,4 +268,4 @@ func (p *Provider) AggregateWindowPoStProofs(
Randomnesses: srandomnesses, Randomnesses: srandomnesses,
Prover: p.minerID, Prover: p.minerID,
}, proofs) }, proofs)
} }
\ No newline at end of file
package provider package provider
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"os" "os"
...@@ -12,6 +11,7 @@ import ( ...@@ -12,6 +11,7 @@ import (
"fil_integrate/actor/connect" "fil_integrate/actor/connect"
"fil_integrate/build/cid" "fil_integrate/build/cid"
spieces "fil_integrate/build/pieces"
spproof "fil_integrate/build/proof" spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
...@@ -41,11 +41,9 @@ func RunProvider(ctx context.Context, conn *connect.Connection, root string) err ...@@ -41,11 +41,9 @@ func RunProvider(ctx context.Context, conn *connect.Connection, root string) err
break break
} }
piece := data.(connect.PieceInfo) pieceInfo := data.(connect.PieceInfo)
err = p.SavePiece(ctx, abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(piece.Data)).Padded(), err = p.SavePiece(ctx, pieceInfo.Piece, pieceInfo.Data, pieceInfo.Proof)
PieceCID: piece.PieceCID,
}, bytes.NewReader(piece.Data))
if err != nil { if err != nil {
return err return err
} }
...@@ -165,10 +163,47 @@ func RunProvider(ctx context.Context, conn *connect.Connection, root string) err ...@@ -165,10 +163,47 @@ func RunProvider(ctx context.Context, conn *connect.Connection, root string) err
if err != nil { if err != nil {
return err return err
} }
err = conn.SendPiece(ctx, buf, piece) err = handleSend(ctx, conn, buf, abi.PieceInfo{
PieceCID: piece,
Size: abi.UnpaddedPieceSize(len(buf)).Padded(),
}, 1024*1024)
if err != nil { if err != nil {
return err return err
} }
} }
return nil return nil
} }
// handleSend streams one stored piece back to the user in fixed-size chunks.
//
// A Merkle tree is built over the fr32-padded piece; each chunk of `size`
// padded bytes is sent together with a proof anchored at its padded offset,
// and the proof is self-verified locally before sending as a sanity check.
// `size` is clamped to the whole piece when larger.
//
// NOTE(review): the loop assumes piece.Size is an exact multiple of the
// (clamped) chunk size — otherwise the final iteration would slice past the
// remaining buffer. Confirm against the caller (1 MiB chunks, padded pieces).
func handleSend(ctx context.Context, conn *connect.Connection, buf []byte, piece abi.PieceInfo, size abi.PaddedPieceSize) error {
	tree, err := spieces.GenerateMerkleTree(buf, abi.UnpaddedPieceSize(len(buf)))
	if err != nil {
		return err
	}
	// Never send chunks larger than the piece itself.
	if size > abi.UnpaddedPieceSize(len(buf)).Padded() {
		size = abi.UnpaddedPieceSize(len(buf)).Padded()
	}
	for offset := abi.PaddedPieceSize(0) ; offset < piece.Size; offset += size {
		// Peel one unpadded chunk off the front of the buffer.
		data := buf[:size.Unpadded()]
		buf = buf[size.Unpadded():]
		mproof, err := tree.GenProof(offset, size)
		if err != nil {
			return err
		}
		// Sanity-check our own proof before handing it to the user.
		ok, err := mproof.Verify(data[:], piece.PieceCID, offset)
		if err != nil {
			return err
		}
		if !ok {
			return xerrors.Errorf("merkle proof verify failed")
		}
		err = conn.SendPieceToUser(ctx, data, mproof, piece)
		if err != nil {
			return err
		}
	}
	return nil
}
\ No newline at end of file
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"fil_integrate/actor/connect" "fil_integrate/actor/connect"
"fil_integrate/build/cid" "fil_integrate/build/cid"
spieces "fil_integrate/build/pieces"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
"fil_integrate/seal" "fil_integrate/seal"
...@@ -18,6 +19,8 @@ type User struct { ...@@ -18,6 +19,8 @@ type User struct {
sectorSize abi.SectorSize sectorSize abi.SectorSize
encoder seal.PieceEncoder encoder seal.PieceEncoder
conn *connect.Connection conn *connect.Connection
pieceChan chan abi.PieceInfo
doneChan chan struct{}
cid2sidMap map[cid.Commit]abi.ActorID cid2sidMap map[cid.Commit]abi.ActorID
} }
...@@ -27,6 +30,8 @@ func New(encoder seal.PieceEncoder, conn *connect.Connection) *User { ...@@ -27,6 +30,8 @@ func New(encoder seal.PieceEncoder, conn *connect.Connection) *User {
u := &User{ u := &User{
sectorSize: abi.SectorSize(storage.SectorSize32MiB), sectorSize: abi.SectorSize(storage.SectorSize32MiB),
cid2sidMap: make(map[cid.Commit]abi.ActorID), cid2sidMap: make(map[cid.Commit]abi.ActorID),
pieceChan: make(chan abi.PieceInfo, 32),
doneChan: make(chan struct{}),
encoder: encoder, encoder: encoder,
conn: conn, conn: conn,
} }
...@@ -37,13 +42,95 @@ func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (abi.P ...@@ -37,13 +42,95 @@ func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (abi.P
finalPiece, pieces, err := u.encoder.EncodeDataToPieces(ctx, u.sectorSize, file) finalPiece, pieces, err := u.encoder.EncodeDataToPieces(ctx, u.sectorSize, file)
// send piece to provider // send piece to provider
for _, piece := range pieces { for _, piece := range pieces {
buf, err := u.encoder.LoadPiece(ctx, piece.PieceCID) u.pieceChan <- piece
}
return finalPiece, err
}
// SendPiecesToMiner pumps encoded pieces from u.pieceChan to the provider
// until SendPiecesDone signals via u.doneChan or ctx is canceled.
//
// On the done signal it first drains any pieces still queued in the buffered
// channel (non-blocking inner loop), then tells the provider encoding is
// finished via SendEncodeDone. Each piece is chunked and proven by
// u.handleSend with the given chunk size.
//
// NOTE(review): the `miner` parameter is only forwarded to handleSend here;
// routing to a specific miner is not visible in this function.
func (u *User) SendPiecesToMiner(ctx context.Context, miner abi.ActorID, size abi.PaddedPieceSize) error {
	for{
		select {
		case piece := <- u.pieceChan:
			err := u.handleSend(ctx, miner, piece, size)
			if err != nil {
				return err
			}
		case <- u.doneChan:
			// Flush whatever is still buffered before reporting completion.
		Loop:
			for{
				select{
				case piece := <- u.pieceChan:
					err := u.handleSend(ctx, miner, piece, size)
					if err != nil {
						return err
					}
				default:
					break Loop
				}
			}
			err := u.conn.SendEncodeDone(ctx)
			if err != nil {
				return err
			}
			log.Infof("send pieces done")
			return nil
		case <- ctx.Done():
			return xerrors.Errorf("context canceled")
		}
	}
	// Unreachable: every select branch above either continues the loop or
	// returns. Kept only to satisfy the original shape.
	return nil
}
func (u *User) handleSend(ctx context.Context, miner abi.ActorID, piece abi.PieceInfo, size abi.PaddedPieceSize) error {
buf, err := u.encoder.LoadPiece(ctx, piece.PieceCID)
if err != nil {
return err
}
log.Infof("sending piece")
tree, err := spieces.GenerateMerkleTree(buf, abi.UnpaddedPieceSize(len(buf)))
if err != nil {
return err
}
if size > abi.UnpaddedPieceSize(len(buf)).Padded() {
size = abi.UnpaddedPieceSize(len(buf)).Padded()
}
for offset := abi.PaddedPieceSize(0) ; offset < piece.Size; offset += size {
data := buf[:size.Unpadded()]
buf = buf[size.Unpadded():]
mproof, err := tree.GenProof(offset, size)
if err != nil { if err != nil {
return abi.PieceInfo{}, err return err
}
ok, err := mproof.Verify(data[:], piece.PieceCID, offset)
if err != nil {
return err
}
if !ok {
return xerrors.Errorf("merkle proof verify failed")
}
err = u.conn.SendPieceToProvider(ctx, data, mproof, piece)
if err != nil {
return err
} }
u.conn.SendPiece(ctx, buf, piece.PieceCID)
} }
return finalPiece, err return nil
}
func (u *User) SendPiecesDone(ctx context.Context) error {
select{
case u.doneChan <- struct{}{}:
break
case <- ctx.Done():
return xerrors.Errorf("context canceled")
}
return nil
} }
func (u *User) ReadPieceRange( func (u *User) ReadPieceRange(
...@@ -57,14 +144,17 @@ func (u *User) ReadPieceRange( ...@@ -57,14 +144,17 @@ func (u *User) ReadPieceRange(
UnpaddedSectorSize := abi.PaddedPieceSize(u.sectorSize).Unpadded() UnpaddedSectorSize := abi.PaddedPieceSize(u.sectorSize).Unpadded()
DataLen := uint32(UnpaddedSectorSize) - seal.TagLen DataLen := uint32(UnpaddedSectorSize) - seal.TagLen
data, err := u.getPiece(ctx, piece.PieceCID) data, err := u.getPiece(ctx, piece)
if err != nil { if err != nil {
return err return err
} }
piecesCommit := data.PieceCommit piecesCommit := data.PieceCommit
for data.HasPre { for data.HasPre {
data, err = u.getPiece(ctx, data.PrePieceCommit) data, err = u.getPiece(ctx, abi.PieceInfo{
Size: abi.PaddedPieceSize(storage.SectorSize32MiB),
PieceCID: data.PrePieceCommit,
})
if err != nil { if err != nil {
return err return err
} }
...@@ -94,7 +184,10 @@ func (u *User) ReadPieceRange( ...@@ -94,7 +184,10 @@ func (u *User) ReadPieceRange(
var wbuf []byte var wbuf []byte
if len(piecesCommit) != 0 { if len(piecesCommit) != 0 {
data, err := u.getPiece(ctx, piecesCommit[0]) data, err := u.getPiece(ctx, abi.PieceInfo{
Size: abi.PaddedPieceSize(storage.SectorSize32MiB),
PieceCID: piecesCommit[0],
})
if err != nil { if err != nil {
return err return err
} }
...@@ -114,36 +207,44 @@ func (u *User) ReadPieceRange( ...@@ -114,36 +207,44 @@ func (u *User) ReadPieceRange(
return nil return nil
} }
func (u *User) getPiece(ctx context.Context, pieceCommit cid.Commit) (*basicpiece.DecodedData, error) { func (u *User) getPiece(ctx context.Context, piece abi.PieceInfo) (*basicpiece.DecodedData, error) {
// todo: GET from chian/provider // todo: GET from chian/provider
// miner, ok := cid2sidMap[pieceCommit] // miner, ok := cid2sidMap[pieceCommit]
buf, err := u.GetPieceFromProvider(ctx, 10000, pieceCommit) buf, err := u.GetPieceFromProvider(ctx, 10000, piece)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return u.encoder.DecodePiece(ctx, buf) return u.encoder.DecodePiece(ctx, buf)
} }
func (u *User) GetPieceFromProvider(ctx context.Context, miner abi.ActorID, pieceCommit cid.Commit) ([]byte, error) { func (u *User) GetPieceFromProvider(ctx context.Context, miner abi.ActorID, piece abi.PieceInfo) ([]byte, error) {
var buf []byte var buf = make([]byte, piece.Size.Unpadded())
err := u.conn.RequestPiece(ctx, pieceCommit) err := u.conn.RequestPiece(ctx, piece.PieceCID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
op, data, err := u.conn.U2PMessage(ctx) var offset abi.PaddedPieceSize = 0
if err != nil { for offset < piece.Size {
return nil, err op, data, err := u.conn.P2UMessage(ctx)
} if err != nil {
if op != connect.OP_SEND_PIECE { return nil, err
return nil, xerrors.Errorf("Unexpected operator") }
} if op != connect.OP_SEND_PIECE {
return nil, xerrors.Errorf("Unexpected operator")
}
pieceInfo := data.(connect.PieceInfo)
ok, err := (&pieceInfo.Proof).Verify(pieceInfo.Data, piece.PieceCID, offset)
if err != nil {
return nil, err
}
if !ok {
return nil, xerrors.Errorf("merkle proof verify failed")
}
switch data.(type) { copy(buf[offset.Unpadded():], pieceInfo.Data)
case connect.PieceInfo: offset += abi.UnpaddedPieceSize(len(pieceInfo.Data)).Padded()
buf = data.(connect.PieceInfo).Data
default:
return nil, xerrors.Errorf("Unexpected data")
} }
return buf, nil return buf, nil
} }
......
...@@ -50,11 +50,20 @@ func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile ...@@ -50,11 +50,20 @@ func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile
var PiecesRange []pieceRead var PiecesRange []pieceRead
var err error var err error
b := []byte("random data") b := []byte("random data")
go func() {
err := u.SendPiecesToMiner(ctx, 10000, 1024*1024)
if err != nil {
fmt.Printf("user: %w\n", err)
}
} ()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < numFile; i++ { for i := 0; i < numFile; i++ {
log.Infof("Generating random data") log.Infof("Generating random data")
filename := filepath.Join(root, fmt.Sprintf("input-%d.dat", i)) filename := filepath.Join(root, fmt.Sprintf("input-%d.dat", i))
r := rand.New(rand.NewSource(time.Now().UnixNano())) dataSize := uint64(r.Int63n(int64(sectorSize * 10))) + 3*sectorSize
dataSize := uint64(r.Int63n(int64(sectorSize * 6))) + 2*sectorSize if i == 0 {
dataSize = 4*sectorSize - 823642
}
b, err = seal.GenerateRandomData(filename, dataSize, b) b, err = seal.GenerateRandomData(filename, dataSize, b)
if err != nil { if err != nil {
return err return err
...@@ -70,6 +79,7 @@ func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile ...@@ -70,6 +79,7 @@ func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile
if err != nil { if err != nil {
return err return err
} }
fmt.Println(finalPiece.Size)
for _, r := range range2Read { for _, r := range range2Read {
PiecesRange = append(PiecesRange, pieceRead{ PiecesRange = append(PiecesRange, pieceRead{
...@@ -87,11 +97,11 @@ func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile ...@@ -87,11 +97,11 @@ func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile
}, },
}) })
} }
err = conn.SendEncodeDone(ctx) err = u.SendPiecesDone(ctx)
if err != nil { if err != nil {
return err return err
} }
op, _, err := conn.U2PMessage(ctx) op, _, err := conn.P2UMessage(ctx)
if err != nil { if err != nil {
return err return err
} }
......
package pieces
import (
"github.com/minio/sha256-simd"
"golang.org/x/xerrors"
"fil_integrate/build/cid"
"fil_integrate/build/fr32"
"fil_integrate/build/state-types/abi"
)
// MerkleTree is a binary sha256 Merkle tree over 32-byte leaves.
type MerkleTree struct {
	// leaves is the number of 32-byte leaf nodes (padded size / 32).
	leaves abi.PaddedPieceSize
	// merkle tree data except leaves node: interior nodes stored level by
	// level, bottom-up; the last entry is the root.
	data []cid.Commit
	// root caches the final entry of data.
	root cid.Commit
}
// GenerateMerkleTree fr32-pads the given unpadded byte stream and builds a
// Merkle tree over the padded result. unpad must be a valid unpadded piece
// size (per abi.UnpaddedPieceSize.Validate).
func GenerateMerkleTree(data []byte, unpad abi.UnpaddedPieceSize) (MerkleTree, error) {
	if err := unpad.Validate(); err != nil {
		return MerkleTree{}, err
	}
	padded := make([]byte, unpad.Padded())
	fr32.Pad(data, padded)
	return MerkleTreeBuild(padded)
}
// MerkleTreeBuild constructs a binary sha256 Merkle tree over data, which
// must be a power-of-two number of bytes covering at least two 32-byte
// leaves. Interior nodes are stored bottom-up in tree.data; every hash is
// trimmed into the Fr field (top two bits cleared) before being stored.
func MerkleTreeBuild(data []byte) (MerkleTree, error) {
	n := abi.PaddedPieceSize(len(data))
	if n != nextPowerOfTwo(n) {
		return MerkleTree{}, xerrors.Errorf("can not generate the merkle tree")
	}
	// Guard: with fewer than two leaves there are no interior nodes. The
	// pairing loop below would read data[32:64] out of range and
	// tree.data[len-1] would panic on an empty slice.
	if n < 64 {
		return MerkleTree{}, xerrors.Errorf("can not generate the merkle tree: need at least two 32-byte leaves, got %d bytes", n)
	}
	var tree MerkleTree
	tree.leaves = n / 32
	// A full binary tree with L leaves has L-1 interior nodes.
	tree.data = make([]cid.Commit, tree.leaves-1)
	// Level 1: hash adjacent 32-byte leaves straight out of data.
	data_index := 0
	for index := abi.PaddedPieceSize(0); index < n; data_index++ {
		h := sha256.New()
		// write left child
		trim_to_fr32(data[index : index+32])
		h.Write(data[index : index+32])
		index += 32
		// write right child
		trim_to_fr32(data[index : index+32])
		h.Write(data[index : index+32])
		index += 32
		res := h.Sum(nil)
		trim_to_fr32(res[:32])
		copy(tree.data[data_index][:], res)
	}
	// Upper levels: keep pairing already-computed nodes until only the root
	// remains (producer index data_index stays ahead of consumer index child).
	for child := 0; data_index > child+1; data_index++ {
		h := sha256.New()
		// write left child
		h.Write(tree.data[child][:])
		child++
		// write right child (original comment said "left" twice)
		h.Write(tree.data[child][:])
		child++
		res := h.Sum(nil)
		trim_to_fr32(res[:32])
		copy(tree.data[data_index][:], res)
	}
	tree.root = tree.data[len(tree.data)-1]
	return tree, nil
}
// GetRoot returns the Merkle root of the tree.
func (tree MerkleTree) GetRoot() cid.Commit {
	return tree.root
}
// GenProof builds a Merkle proof that the aligned sub-range
// [offset, offset+size) of the padded data hashes into the tree root.
//
// size must be a power of two, larger than one 32-byte leaf, and offset must
// be size-aligned: the proof cannot start from raw leaves because the tree
// does not store them, only interior nodes.
func (tree MerkleTree) GenProof(offset abi.PaddedPieceSize, size abi.PaddedPieceSize) (MerkleProof, error) {
	// we can't generate the merkle proof from leaves,
	// because the merkle tree doesn't store the leaf data
	if size != nextPowerOfTwo(size) || offset % size != 0 || size <= 32 {
		return MerkleProof{}, xerrors.Errorf("can not generate the merkle proof")
	}
	// Convert byte quantities to leaf counts.
	offset /= 32
	size /= 32
	// begin at level 2 (tree.data starts one level above the leaves)
	width := tree.leaves >> 1
	size >>= 1
	offset >>= 1
	base := abi.PaddedPieceSize(0)
	// Walk up until the requested range collapses to a single node; base then
	// indexes the level of tree.data containing the sub-tree root, and offset
	// is its position within that level.
	for size > 1 {
		base += width
		size >>= 1
		offset >>= 1
		width >>= 1
	}
	var proof *MerkleProof = &MerkleProof{}
	// Record the sibling at each remaining level up to (but excluding) the
	// root. Odd offset => we are a right child, sibling lies to the LEFT.
	for base < tree.leaves-2 {
		index := (offset >> 1) << 1
		if index != offset {
			proof.Add(tree.data[base + index], LEFT)
		} else {
			proof.Add(tree.data[base + index + 1], RIGHT)
		}
		base += width
		offset >>= 1
		width >>= 1
	}
	return *proof, nil
}
package pieces
import (
// "fmt"
"github.com/minio/sha256-simd"
"fil_integrate/build/cid"
"fil_integrate/build/state-types/abi"
)
// Sibling position relative to the node being proven: a LEFT entry is hashed
// before the running root, a RIGHT entry after it (see Verify).
const (
	LEFT byte = 0
	RIGHT byte = 1
)

// MerkleProof is an authentication path from a sub-tree root up to the piece
// commitment: one sibling hash plus its side per tree level.
type MerkleProof struct {
	Data []cid.Commit
	Path []byte
}

// Add appends one sibling hash and its side (LEFT/RIGHT) to the proof.
func (proof *MerkleProof) Add(hash cid.Commit, path byte) {
	proof.Data = append(proof.Data, hash)
	proof.Path = append(proof.Path, path)
}
// Verify checks that data, located at padded offset dataOffset, hashes up
// through the proof's sibling path to finalRoot.
//
// It first checks that the recorded LEFT/RIGHT directions are consistent with
// dataOffset, then recomputes the sub-root of data and folds in each sibling
// hash in order, trimming every intermediate hash into Fr. Returns (false,
// nil) on a mismatch, an error only if the commitment computation fails.
func (proof *MerkleProof) Verify(data []byte, finalRoot cid.Commit, dataOffset abi.PaddedPieceSize) (bool, error) {
	ok := proof.verifyMerklePath(dataOffset, abi.UnpaddedPieceSize(len(data)).Padded())
	if !ok {
		return false, nil
	}
	// Sub-tree root over the chunk itself.
	root, err := GeneratePieceCommitmentFast(data, abi.UnpaddedPieceSize(len(data)))
	if err != nil {
		return false, err
	}
	for index, hash := range proof.Data {
		h := sha256.New()
		// A LEFT sibling is hashed before the running root, RIGHT after —
		// mirroring how GenProof recorded it.
		if proof.Path[index] == LEFT {
			h.Write(hash[:])
			h.Write(root[:])
		} else {
			h.Write(root[:])
			h.Write(hash[:])
		}
		res := h.Sum(nil)
		trim_to_fr32(res[:32])
		copy(root[:], res)
	}
	return root == finalRoot, nil
}
// verifyMerklePath checks that the proof's LEFT/RIGHT directions are the ones
// GenProof would record for a chunk of `size` padded bytes at `offset`.
//
// NOTE(review): the number of path entries is not checked against the tree
// height here; Verify relies on the final root comparison for that.
func (proof *MerkleProof) verifyMerklePath(offset abi.PaddedPieceSize, size abi.PaddedPieceSize) bool {
	// Normalize offset to units of the chunk size (position of the sub-tree
	// root within its level).
	for size > 1 {
		size >>= 1
		offset >>= 1
	}
	for _, path := range proof.Path {
		// index is offset with its low bit cleared: equal means offset is
		// even (left child, sibling RIGHT), unequal means odd (sibling LEFT).
		index := (offset >> 1) << 1
		if index != offset {
			if path != LEFT {
				return false
			}
		} else {
			if path != RIGHT {
				return false
			}
		}
		offset >>= 1
	}
	return true
}
\ No newline at end of file
package pieces package pieces
import ( import (
"math/bits"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/minio/sha256-simd" "github.com/minio/sha256-simd"
"fil_integrate/build/fr32" "fil_integrate/build/fr32"
"fil_integrate/build/state-types/abi" "fil_integrate/build/state-types/abi"
) )
const NODE_SIZE = 32 func nextPowerOfTwo(v abi.PaddedPieceSize) abi.PaddedPieceSize {
var a = false
func nextPowerOfTwo(v uint64) uint64 {
v-- v--
v |= v >> 1 v |= v >> 1
v |= v >> 2 v |= v >> 2
...@@ -26,18 +19,18 @@ func nextPowerOfTwo(v uint64) uint64 { ...@@ -26,18 +19,18 @@ func nextPowerOfTwo(v uint64) uint64 {
return v return v
} }
func GeneratePieceCommitmentFast(data []byte, unpad uint64) ([32]byte, error) { func GeneratePieceCommitmentFast(data []byte, unpad abi.UnpaddedPieceSize) ([32]byte, error) {
var result [32]byte var result [32]byte
inLen := paddedSize(unpad) err := unpad.Validate()
if err != nil {
in := make([]byte, inLen) return [32]byte{}, err
copy(in, data) }
pow2 := nextPowerOfTwo(unpad)
out := make([]byte, pow2) padSize := unpad.Padded()
out := make([]byte, padSize)
fr32.Pad(in, out) fr32.Pad(data, out)
// r, err := MerkleTreeRecurse(out) // r, err := MerkleTreeRecurse(out)
r, err := MerkleTreeLoop(out) r, err := MerkleTreeLoop(out)
...@@ -48,19 +41,19 @@ func GeneratePieceCommitmentFast(data []byte, unpad uint64) ([32]byte, error) { ...@@ -48,19 +41,19 @@ func GeneratePieceCommitmentFast(data []byte, unpad uint64) ([32]byte, error) {
return result, nil return result, nil
} }
func MerkleTreeRecurse(D []byte) ([]byte, error) { func MerkleTreeRecurse(data []byte) ([]byte, error) {
n := uint64(len(D)) n := uint64(len(data))
//叶结点,直接返回 //叶结点,直接返回
if n < 32 { if n < 32 {
return D, xerrors.Errorf("can not generate the merkle tree") return data, xerrors.Errorf("can not generate the merkle tree")
} }
if n == 32 { if n == 32 {
return D, nil return data, nil
} }
k := len(D) / 2 k := len(data) / 2
h := sha256.New() h := sha256.New()
//求左子树 //求左子树
x, err := MerkleTreeRecurse(D[0:k]) x, err := MerkleTreeRecurse(data[0:k])
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -68,7 +61,7 @@ func MerkleTreeRecurse(D []byte) ([]byte, error) { ...@@ -68,7 +61,7 @@ func MerkleTreeRecurse(D []byte) ([]byte, error) {
trim_to_fr32(x) trim_to_fr32(x)
h.Write(x[:]) h.Write(x[:])
//求右子树 //求右子树
x, err = MerkleTreeRecurse(D[k:n]) x, err = MerkleTreeRecurse(data[k:n])
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -80,52 +73,35 @@ func MerkleTreeRecurse(D []byte) ([]byte, error) { ...@@ -80,52 +73,35 @@ func MerkleTreeRecurse(D []byte) ([]byte, error) {
return res, nil return res, nil
} }
func MerkleTreeLoop(D []byte) ([]byte, error) { func MerkleTreeLoop(data []byte) ([]byte, error) {
n := uint64(len(D)) n := abi.PaddedPieceSize(len(data))
if n != nextPowerOfTwo(n) { if n != nextPowerOfTwo(n) {
return nil, xerrors.Errorf("can not generate the merkle tree") return nil, xerrors.Errorf("can not generate the merkle tree")
} }
for lenth := uint64(32); lenth < n; lenth <<= 1 { for lenth := abi.PaddedPieceSize(32); lenth < n; lenth <<= 1 {
for index := uint64(0); index < n; { for index := abi.PaddedPieceSize(0); index < n; {
windex := index windex := index
h := sha256.New() h := sha256.New()
// write left child // write left child
trim_to_fr32(D[index : index+32]) trim_to_fr32(data[index : index+32])
h.Write(D[index : index+32]) h.Write(data[index : index+32])
index += lenth index += lenth
// write right child // write right child
trim_to_fr32(D[index : index+32]) trim_to_fr32(data[index : index+32])
h.Write(D[index : index+32]) h.Write(data[index : index+32])
index += lenth index += lenth
res := h.Sum(nil) res := h.Sum(nil)
copy(D[windex:windex+32], res) copy(data[windex:windex+32], res)
} }
} }
trim_to_fr32(D[:32]) trim_to_fr32(data[:32])
return D[:32], nil return data[:32], nil
} }
// trim_to_fr32 clears the two most significant bits of the last byte so
// that the 32-byte value is a valid element of the scalar field Fr.
// data must be at least 32 bytes long; it is modified in place.
func trim_to_fr32(data []byte) {
	// strip last two bits, to ensure result is in Fr.
	data[31] &= 0b0011_1111
}
// paddedSize returns the unpadded piece size whose fr32-padded form is the
// smallest power of two (at least 128 bytes) able to hold size bytes.
func paddedSize(size uint64) abi.UnpaddedPieceSize {
	if size <= 127 {
		// Minimum piece: 127 unpadded bytes pad out to 128 bytes.
		return abi.UnpaddedPieceSize(127)
	}
	// Round up to the nearest multiple of 127, then fr32-pad (each 127-byte
	// chunk expands to 128 bytes).
	paddedPieceSize := (size + 126) / 127 * 128
	// Round up to the next power of 2 if not already one.
	if bits.OnesCount64(paddedPieceSize) != 1 {
		paddedPieceSize = 1 << uint(64-bits.LeadingZeros64(paddedPieceSize))
	}
	// Convert the now-determined padded size back to its unpadded size.
	return abi.PaddedPieceSize(paddedPieceSize).Unpadded()
}
...@@ -69,9 +69,21 @@ var testSealAndWindowPoSt = &cli.Command{ ...@@ -69,9 +69,21 @@ var testSealAndWindowPoSt = &cli.Command{
var testSealCmd = &cli.Command{ var testSealCmd = &cli.Command{
Name: "test-seal", Name: "test-seal",
Usage: "Test sealing the sectors", Usage: "Test sealing the sectors",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "sector-size",
Value: "8MiB",
Usage: "size of the sectors in bytes",
},
},
Action: func(c *cli.Context) error { Action: func(c *cli.Context) error {
// Test 8MiB sector sectorSizeInt, err := units.RAMInBytes(c.String("sector-size"))
err := seal.TestSealAndUnseal() if err != nil {
return err
}
sectorSize := abi.SectorSize(sectorSizeInt)
// Default: Test 8MiB sector
err = seal.TestSealAndUnseal(sectorSize)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -27,9 +27,9 @@ type PieceEncoder interface { ...@@ -27,9 +27,9 @@ type PieceEncoder interface {
//interface //interface
type SectorSealer interface { type SectorSealer interface {
SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data, size int) error
AddPiece(ctx context.Context, sid storage.SectorRef) ([]abi.PieceInfo, error) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, piece abi.PieceInfo) error
// run pre-commit1 and pre-commit2 phase // run pre-commit1 and pre-commit2 phase
// generate the sealed sector and sector commitment(commd, commr) // generate the sealed sector and sector commitment(commd, commr)
Sealed(ctx context.Context, sid storage.SectorRef, pieces []abi.PieceInfo) (storage.SectorCids, error) Sealed(ctx context.Context, sid storage.SectorRef, pieces []abi.PieceInfo) (storage.SectorCids, error)
......
...@@ -20,6 +20,9 @@ type DecodedData struct { ...@@ -20,6 +20,9 @@ type DecodedData struct {
CommitData []byte CommitData []byte
} }
// A serialized DecodedData piece is laid out as:
// [CommLen|DataLen|Data]...[CommLen|DataLen|Data|Commit][HasPre&CommLen|DataLen|PreCommit|Commit]
func (data *DecodedData) Serialize() ([]byte, error) { func (data *DecodedData) Serialize() ([]byte, error) {
var buf []byte var buf []byte
MetaLen := uint32(len(data.Data)) MetaLen := uint32(len(data.Data))
...@@ -70,13 +73,13 @@ func (data *DecodedData) Deserialize(buf []byte) error { ...@@ -70,13 +73,13 @@ func (data *DecodedData) Deserialize(buf []byte) error {
data.Data = rbuf[:] data.Data = rbuf[:]
} else if uint32(len(rbuf)) <= CommLen+MetaLen { } else if uint32(len(rbuf)) <= CommLen+MetaLen {
data.Data = rbuf[:MetaLen] data.Data = rbuf[:MetaLen]
data.PieceCommit, err = to32ByteHash(rbuf[MetaLen:]) data.PieceCommit, err = to32ByteCommit(rbuf[MetaLen:])
if err != nil { if err != nil {
return err return err
} }
} else { } else {
data.Data = rbuf[:MetaLen] data.Data = rbuf[:MetaLen]
data.PieceCommit, err = to32ByteHash(rbuf[MetaLen : CommLen+MetaLen]) data.PieceCommit, err = to32ByteCommit(rbuf[MetaLen : CommLen+MetaLen])
if err != nil { if err != nil {
return err return err
} }
...@@ -84,7 +87,7 @@ func (data *DecodedData) Deserialize(buf []byte) error { ...@@ -84,7 +87,7 @@ func (data *DecodedData) Deserialize(buf []byte) error {
return nil return nil
} }
func to32ByteHash(in []byte) ([]cid.Commit, error) { func to32ByteCommit(in []byte) ([]cid.Commit, error) {
if len(in)%32 != 0 { if len(in)%32 != 0 {
return nil, xerrors.Errorf("lenth of the hash arr must be multiple of 32") return nil, xerrors.Errorf("lenth of the hash arr must be multiple of 32")
} }
......
...@@ -5,10 +5,9 @@ import ( ...@@ -5,10 +5,9 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/cid" "fil_integrate/build/cid"
spieces "fil_integrate/build/pieces" spieces "fil_integrate/build/pieces"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
"fil_integrate/build/storiface" "fil_integrate/build/storiface"
"fil_integrate/seal/basicpiece" "fil_integrate/seal/basicpiece"
...@@ -70,7 +69,7 @@ func (sp *Encoder) EncodeDataToPieces( ...@@ -70,7 +69,7 @@ func (sp *Encoder) EncodeDataToPieces(
return abi.PieceInfo{}, nil, err return abi.PieceInfo{}, nil, err
} }
pieceCommit, err := spieces.GeneratePieceCommitmentFast(dbuf[:], uint64(len(dbuf))) pieceCommit, err := spieces.GeneratePieceCommitmentFast(dbuf[:], abi.UnpaddedPieceSize(len(dbuf)))
if err != nil { if err != nil {
return abi.PieceInfo{}, nil, err return abi.PieceInfo{}, nil, err
} }
...@@ -145,7 +144,7 @@ func (sp *Encoder) EncodeData( ...@@ -145,7 +144,7 @@ func (sp *Encoder) EncodeData(
hashData = hashData[CommLen:] hashData = hashData[CommLen:]
} }
prePieceCommit, err = spieces.GeneratePieceCommitmentFast(buf, uint64(len(buf))) prePieceCommit, err = spieces.GeneratePieceCommitmentFast(buf, abi.UnpaddedPieceSize(len(buf)))
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -2,46 +2,34 @@ package seal ...@@ -2,46 +2,34 @@ package seal
import ( import (
// "sync" // "sync"
"os"
"context"
"fmt" "fmt"
"time"
"github.com/minio/blake2b-simd"
) )
type info struct { func GenerateRandomData(filename string, dataSize uint64, b []byte) ([]byte, error) {
a int if _, err := os.Stat(filename); !os.IsNotExist(err) {
b bool os.Remove(filename)
} }
// r := rand.New(rand.NewSource(time.Now().UnixNano()))
// user run seal: user -> provider pieces, pieces-hash // Datasize := (r.Intn(1024*1024) + 1024*1024) * 32
// user run getPiece: user -> provider pieces-hash, procider -> user pieces, var i uint64
buf := make([]byte, dataSize)
// provider run seal: provider -> keeper proof, commd, commr, sid... for i = 0; i < dataSize; i += 32 {
// provider run aggregate: provider -> keeper proof, commds... tmp := blake2b.Sum256(b)
// provider run window-post: b = tmp[:]
func Test() error { copy(buf[i:], b[:])
ch := make(chan interface{}, 1) }
go func() { f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644)
ch <- 8 if err != nil {
ch <- []byte{10, 20, 30} return nil, err
ch <- info{20, false} }
}() defer f.Close()
_, err = f.Write(buf[:])
printx(<-ch) if err != nil {
printx(<-ch) return nil, err
printx(<-ch)
return nil
}
func printx(val interface{}) {
switch v := val.(type) {
case int:
var a int = v
fmt.Println(a)
case []byte:
var a []byte = v
fmt.Println(a[0], len(a))
case info:
var a info = v
fmt.Println(a.a)
default:
fmt.Println("not found")
} }
} return b, nil
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment