Commit 6281e143 authored by 董子豪's avatar 董子豪

add three actors api

parent ffa08d17
package connect
import (
"context"
"golang.org/x/xerrors"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
)
type Connection struct {
u2pChannel chan Data
p2kChannel chan Data
}
func NewConnection() *Connection {
return &Connection{
u2pChannel: make(chan Data),
p2kChannel: make(chan Data),
}
}
func (conn *Connection) RequestPiece(ctx context.Context, pieceCommit cid.Commit) error {
sdata := Data{
op: OP_REQUEST_PIECE,
data: pieceCommit,
}
select {
case conn.u2pChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendPiece(ctx context.Context, data []byte, pieceCommit cid.Commit) error {
buf := make([]byte, len(data))
copy(buf, data)
sdata := Data{
op: OP_SEND_PIECE,
data: PieceInfo{
Data: buf,
PieceCID: pieceCommit,
},
}
select {
case conn.u2pChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendEncodeDone(ctx context.Context) error {
sdata := Data{
op: OP_ENCODE_DONE,
data: nil,
}
select {
case conn.u2pChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendSealDone(ctx context.Context) error {
sdata := Data{
op: OP_SEAL_DONE,
data: nil,
}
select {
case conn.u2pChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) RequestSealRandom(ctx context.Context, sid abi.SectorID) error {
sdata := Data{
op: OP_REQUEST_SEAL_RANDOM,
data: sid,
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) RequestPoStRandom(ctx context.Context, sids []abi.SectorID) error {
var buf []abi.SectorID
buf = append(buf, sids...)
sdata := Data{
op: OP_REQUEST_POST_RANDOM,
data: buf,
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendSealRandom(ctx context.Context, random abi.InteractiveSealRandomness) error {
sdata := Data{
op: OP_SEND_SEAL_RANDOM,
data: random,
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendPoStRandom(ctx context.Context, random abi.PoStRandomness) error {
sdata := Data{
op: OP_SEND_POST_RANDOM,
data: random,
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendSealProof(
ctx context.Context,
proof spproof.Proof,
sid abi.SectorID,
commit storage.SectorCids,
) error {
var buf spproof.Proof
buf = append(buf, proof...)
sdata := Data{
op: OP_SEND_SEAL_PROOF,
data: SealProofInfo{
Proof: buf,
Sector: sid,
Commit: commit,
},
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendAggregateSealProof(
ctx context.Context,
proof spproof.Proof,
sids []abi.SectorID,
commits []storage.SectorCids,
) error {
var buf spproof.Proof
buf = append(buf, proof...)
sdata := Data{
op: OP_SEND_AGGREGATE_SEAL_PROOF,
data: AggregateSealProofInfo{
Proof: buf,
Sectors: sids,
Commits: commits,
},
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendWindowPoStProof(
ctx context.Context,
proof spproof.PoStProof,
randomness abi.PoStRandomness,
miner abi.ActorID,
) error {
var buf spproof.Proof
buf1 := spproof.PoStProof{
ProofBytes: append(buf, proof.ProofBytes...),
PoStProof: proof.PoStProof,
}
sdata := Data{
op: OP_SEND_WINDOW_POST_PROOF,
data: WindowPoStProofInfo{
Proof: buf1,
Randomness: randomness,
Miner: miner,
},
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) SendAggregateWindowPoStProof(
ctx context.Context,
proof spproof.PoStProof,
randomnesses []abi.PoStRandomness,
miner abi.ActorID,
) error {
var buf spproof.Proof
buf1 := spproof.PoStProof{
ProofBytes: append(buf, proof.ProofBytes...),
PoStProof: proof.PoStProof,
}
buf2 := make([]abi.PoStRandomness, len(randomnesses))
for i, random := range randomnesses {
buf2[i] = random
}
sdata := Data{
op: OP_SEND_AGGREGATE_WINDOW_POST_PROOF,
data: AggregateWindowPoStProofInfo{
Proof: buf1,
Randomnesses: buf2,
Miner: miner,
},
}
select {
case conn.p2kChannel <- sdata:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (conn *Connection) U2PMessage(ctx context.Context) (Operator, interface{}, error) {
select {
case mess := <-conn.u2pChannel:
return mess.op, mess.data, nil
case <-ctx.Done():
if conn.u2pChannel != nil {
close(conn.u2pChannel)
conn.u2pChannel = nil
}
return OP_CLOSED, nil, xerrors.Errorf("context canceled")
}
}
func (conn *Connection) P2KMessage(ctx context.Context) (Operator, interface{}, error) {
select {
case mess := <-conn.p2kChannel:
return mess.op, mess.data, nil
case <-ctx.Done():
if conn.p2kChannel != nil {
close(conn.p2kChannel)
conn.p2kChannel = nil
}
return OP_CLOSED, nil, xerrors.Errorf("context canceled")
}
}
package connect
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/xerrors"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
)
func TestConnection() error {
conn := NewConnection()
ctx := context.Background()
// Create a new context, with its cancellation function
// from the original context
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
err := runUser(conn, ctx)
if err != nil {
cancel()
fmt.Printf("User: %+v\n", err)
}
fmt.Println("User: done")
}()
go func() {
defer wg.Done()
err := runProvider(conn, ctx)
if err != nil {
cancel()
fmt.Printf("Provider: %+v\n", err)
}
fmt.Println("Provider: done")
}()
go func() {
defer wg.Done()
err := runKeeper(conn, ctx)
if err != nil {
cancel()
fmt.Printf("Keeper: %+v\n", err)
}
fmt.Println("Keeper: done")
}()
wg.Wait()
return nil
}
func runUser(conn *Connection, ctx context.Context) error {
numFile := 4
for i := 0; i < numFile; i++ {
fmt.Println("User: Addding Piece")
time.Sleep(2 * time.Second)
conn.SendPiece(ctx, nil, cid.Commit{})
}
for i := 0; i < numFile; i++ {
op, data, err := conn.U2PMessage(ctx)
if err != nil {
return err
}
if op != OP_SEND_PIECE {
return xerrors.Errorf("unsupported operator")
}
fmt.Println("User: Reading piece", data)
}
return nil
}
func runProvider(conn *Connection, ctx context.Context) error {
numFile := 4
for i := 0; i < numFile; i++ {
op, data, err := conn.U2PMessage(ctx)
if err != nil {
return err
}
if op != OP_SEND_PIECE {
return xerrors.Errorf("unsupported operator")
}
fmt.Println("Provider: save piece", data)
}
for sum := 10; sum > 0; {
for i := 0; i < 4; i++ {
fmt.Println("Provider: sealing sector")
conn.RequestSealRandom(ctx, abi.SectorID{})
op, data, err := conn.P2KMessage(ctx)
if err != nil {
return err
}
if op != OP_SEND_SEAL_RANDOM {
return xerrors.Errorf("unsupported operator")
}
fmt.Println("Provider: Generating Commit Proof", data)
conn.SendSealProof(ctx, nil, abi.SectorID{}, storage.SectorCids{})
sum--
if sum <= 0 {
break
}
}
fmt.Println("Provider: Aggregating Commit Proof")
conn.SendAggregateSealProof(ctx, nil, nil, nil)
fmt.Println("Provider: Generating Window PoSt")
conn.SendWindowPoStProof(ctx, spproof.PoStProof{}, abi.PoStRandomness{}, 0)
}
fmt.Println("Provider: Aggregating Window PoSt")
conn.SendAggregateWindowPoStProof(ctx, spproof.PoStProof{}, nil, 0)
for i := 0; i < numFile; i++ {
conn.SendPiece(ctx, nil, cid.Commit{})
}
return nil
}
func runKeeper(conn *Connection, ctx context.Context) error {
for {
op, data, err := conn.P2KMessage(ctx)
if err != nil {
return err
}
switch op {
case OP_REQUEST_SEAL_RANDOM:
fmt.Println("Keeper: Generating random seed", data)
conn.SendSealRandom(ctx, abi.InteractiveSealRandomness{})
case OP_REQUEST_POST_RANDOM:
fmt.Println("Keeper: Generating random seed", data)
conn.SendPoStRandom(ctx, abi.PoStRandomness{})
case OP_SEND_SEAL_PROOF:
fmt.Println("Keeper: Verifying seal proof")
case OP_SEND_AGGREGATE_SEAL_PROOF:
fmt.Println("Keeper: verifying aggregate seal proof", data)
case OP_SEND_WINDOW_POST_PROOF:
fmt.Println("Keeper: verifying window post proof", data)
case OP_SEND_AGGREGATE_WINDOW_POST_PROOF:
fmt.Println("Keeper: verifying aggregate window post proof", data)
return nil
default:
return xerrors.Errorf("unsupported operator")
}
}
return nil
}
// func hadleUserMessage(conn *Connection, op Operator, data interface{}) error {
// switch op {
// case OP_REQUEST_PIECE:
// fmt.Println("Provider: send piece to user")
// conn.SendPiece(ctx, nil, cid.Commit{})
// case OP_SEND_PIECE:
// fmt.Println("Provider: save piece")
// fmt.Println("Provider: sealing sector")
// conn.RequestRandom(ctx, storage.SectorRef{})
// default:
// return xerrors.Errorf("unsupported operator")
// }
// return nil
// }
// func hadleKeeperMessage(conn *Connection, op Operator, data interface{}) error {
// switch op{
// case OP_SEND_RANDOM:
// fmt.Println("Provider: generating commit proof")
// conn.SendSealProof(ctx, nil, storage.SectorRef{}, cid.Commit{}, cid.Commit{})
// case OP_REQUEST_WINDOW_POST:
// fmt.Println("Provider: generating window post")
// conn.SendWindowPoStProof(ctx, spproof.PoStProof{}, abi.PoStRandomness{}, 0)
// default:
// return xerrors.Errorf("unsupported operator")
// }
// return nil
// }
package connect
import (
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
)
type Operator int
const (
OP_REQUEST_PIECE Operator = iota
OP_SEND_PIECE
OP_ENCODE_DONE
OP_SEAL_DONE
OP_REQUEST_SEAL_RANDOM
OP_REQUEST_POST_RANDOM
OP_SEND_SEAL_RANDOM
OP_SEND_POST_RANDOM
OP_SEND_SEAL_PROOF
OP_SEND_AGGREGATE_SEAL_PROOF
OP_REQUEST_WINDOW_POST
OP_SEND_WINDOW_POST_PROOF
OP_SEND_AGGREGATE_WINDOW_POST_PROOF
OP_CLOSED
)
type Data struct {
op Operator
data interface{}
}
type PieceInfo struct {
Data []byte
PieceCID cid.Commit
}
func GetPieceInfo(data interface{}) (PieceInfo, bool) {
piece, ok := data.(PieceInfo)
return piece, ok
}
type SealProofInfo struct {
Proof spproof.Proof
Sector abi.SectorID
Commit storage.SectorCids
}
type AggregateSealProofInfo struct {
Proof spproof.Proof
Sectors []abi.SectorID
Commits []storage.SectorCids
}
type WindowPoStChallenge struct {
Sectors []abi.SectorID
Randomness abi.PoStRandomness
}
type WindowPoStProofInfo struct {
Proof spproof.PoStProof
Randomness abi.PoStRandomness
Miner abi.ActorID
}
type AggregateWindowPoStProofInfo struct {
Proof spproof.PoStProof
Randomnesses []abi.PoStRandomness
Miner abi.ActorID
}
package keeper
import (
"context"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
)
type KeeperAPI interface {
VerifyPieceAndDataRoot(commd cid.Commit, pieces []abi.PieceInfo) (bool, error)
VerifySeal(ctx context.Context, sid abi.SectorID, randomness abi.InteractiveSealRandomness, commit storage.SectorCids, proof spproof.Proof) (bool, error)
VerifyAggregateSeals(ctx context.Context, sids []abi.SectorID, randomnesses []abi.InteractiveSealRandomness, commits []storage.SectorCids, proof spproof.Proof) (bool, error)
VerifyWindowPoSt(ctx context.Context, sids []abi.SectorID, randomness abi.PoStRandomness, proof spproof.PoStProof) (bool, error)
VerifyAggregateWindowPostProofs(ctx context.Context, sidsArr [][]abi.SectorID, randomnesses []abi.PoStRandomness, proof spproof.PoStProof) (bool, error)
}
package keeper
import(
import (
"context"
"golang.org/x/xerrors"
"fil_integrate/build"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
......@@ -11,83 +14,152 @@ import(
)
type Keeper struct {
vierifier seal.SectorVerifier
verifier seal.SectorVerifier
sectorSize abi.SectorSize
// sid -> (commd, commr)
commMap map[abi.SectorID]storage.SectorCids
}
var _ KeeperAPI = &Keeper{}
func New(verifier seal.SectorVerifier) *Keeper {
kp := &Keeper{
vierifier: verifier,
sectorSize: storage.SectorSize32MiB,
verifier: verifier,
sectorSize: abi.SectorSize(storage.SectorSize32MiB),
commMap: make(map[abi.SectorID]storage.SectorCids),
}
return kp
}
func (k *Keeper) VerifySeal(
ctx context.Context,
sid storage.SectorRef,
randomness abi.InteractiveSealRandomness
sealedCID cid.Commit,
unsealedCID cid.Commit,
sid abi.SectorID,
randomness abi.InteractiveSealRandomness,
commit storage.SectorCids,
proof spproof.Proof,
) (bool, error) {
return k.verifier.VerifySeal(spproof.SealVerifyInfo{
SealType: spt(k.sectorSize),
SectorID: sid,
log.Infof("[%d] Verifying Seal Sector", sid.Number)
ok, err := k.verifier.VerifySeal(spproof.SealVerifyInfo{
SealType: build.Spt(k.sectorSize),
Miner: sid.Miner,
Number: sid.Number,
InteractiveRandomness: randomness,
SealProof: proof,
SealedCID: sealedCID,
UnsealedCID: unsealedCID,
SealProof: proof,
SealedCID: commit.Sealed,
UnsealedCID: commit.Unsealed,
})
if ok && err == nil {
k.commMap[sid] = commit
}
return ok, err
}
func (k *Keeper) VerifyAggregateSeals(
ctx context.Context,
miner abi.ActorID,
numbers []abi.SectorNumber,
ctx context.Context,
sids []abi.SectorID,
randomnesses []abi.InteractiveSealRandomness,
commrs []cid.Commit,
commds []cid.Commit,
commits []storage.SectorCids,
proof spproof.Proof,
) (ok, error) {
infos := make([]spproof.AggregateSealVerifyInfo, len(numbers))
if len(numbers) != len(randomnesses) || len(numbers) != len(commmrs) || len(numbers) != len(commds){
) (bool, error) {
log.Infof("verifying aggregate seal proof")
infos := make([]spproof.AggregateSealVerifyInfo, len(sids))
if len(sids) != len(randomnesses) || len(sids) != len(commits) {
return false, xerrors.Errorf("the lenth of the seal infos don't match")
}
for i := 0; i < len(infos); i++ {
infos[i] = spproof.AggregateSealVerifyInfo{
Number: numbers[i],
Number: sids[i].Number,
InteractiveRandomness: randomnesses[i],
SealedCID: commrs[i],
UnsealedCID: commds[i],
SealedCID: commits[i].Sealed,
UnsealedCID: commits[i].Unsealed,
}
}
return k.verifier.VerifyAggregateSeals(spproof.AggregateSealVerifyProofAndInfos{
Miner: miner,
SealType: spt(k.sectorSize),
AggregateType: abi.DefaultAggregationType(),
ok, err := k.verifier.VerifyAggregateSeals(spproof.AggregateSealVerifyProofAndInfos{
Miner: sids[0].Miner,
SealType: build.Spt(k.sectorSize),
AggregateType: abi.DefaultAggregationType(),
AggregateProof: proof,
Infos: infos,
Infos: infos,
})
if ok && err == nil {
for i, sid := range sids {
k.commMap[sid] = commits[i]
}
}
return ok, err
}
func (k *Keeper) VerifyWindowPoSt(
sectors []storage.SectorRef,
proof spproof.PoStProof,
ctx context.Context,
sids []abi.SectorID,
randomness abi.PoStRandomness,
proverID abi.ActorID,
) (ok, error) {
return k.verifier.VerifyWindowPoSt(sectors, proof, randomness, proverID)
proof spproof.PoStProof,
) (bool, error) {
var sectors []spproof.SectorInfo
for _, sid := range sids {
commit, ok := k.commMap[sid]
if !ok {
return false, xerrors.Errorf("Sector:%+v not found", sid)
}
sectors = append(sectors, spproof.SectorInfo{
SealType: build.Spt(k.sectorSize),
SectorNumber: sid.Number,
SealedCID: commit.Sealed,
})
}
return k.verifier.VerifyWindowPoSt(spproof.WindowPoStVerifyInfo{
Randomness: randomness,
Proof: proof,
ChallengedSectors: sectors,
Prover: sids[0].Miner,
})
}
func (k *Keeper) VerifyAggregateWindowPostProofs(
sectors [][]storage.SectorRef,
proof spproof.PoStProof,
ctx context.Context,
sidsArr [][]abi.SectorID,
randomnesses []abi.PoStRandomness,
proverID abi.ActorID,
) (ok, error) {
return k.verifier.VerifyAggregateWindowPostProofs(sectors, proof, randomnesses, proverID)
}
\ No newline at end of file
proof spproof.PoStProof,
) (bool, error) {
var sectors []spproof.SectorInfo
sectorCount := make([]uint, len(sidsArr))
for i, sids := range sidsArr {
sectorCount[i] = uint(len(sids))
for _, sid := range sids {
commit, ok := k.commMap[sid]
if !ok {
return false, xerrors.Errorf("Sector:%+v not found", sid)
}
sectors = append(sectors, spproof.SectorInfo{
SealType: build.Spt(k.sectorSize),
SectorNumber: sid.Number,
SealedCID: commit.Sealed,
})
}
}
postType, err := sectors[0].SealType.RegisteredWindowPoStProof()
if err != nil {
return false, err
}
return k.verifier.VerifyAggregateWindowPostProofs(spproof.AggregateWindowPostInfos{
PoStType: postType,
AggregateType: abi.DefaultAggregationType(),
AggregateProof: proof,
ChallengedSectors: sectors,
SectorCount: sectorCount,
Randomnesses: randomnesses,
Prover: sidsArr[0][0].Miner,
})
}
func (k *Keeper) VerifyPieceAndDataRoot(commd cid.Commit, pieces []abi.PieceInfo) (bool, error) {
return k.verifier.VerifyPieceAndDataRoot(build.Spt(k.sectorSize), commd, pieces)
}
package keeper
import (
"context"
// "fmt"
logging "github.com/ipfs/go-log/v2"
"github.com/minio/blake2b-simd"
"golang.org/x/xerrors"
"fil_integrate/actor/connect"
"fil_integrate/build/state-types/abi"
"fil_integrate/seal"
)
var log = logging.Logger("Keeper")
func RunKeeper(ctx context.Context, conn *connect.Connection) error {
v := seal.NewVerifier()
kp := New(v)
var random = blake2b.Sum256([]byte("keeper"))
srm := make(map[abi.SectorID]abi.InteractiveSealRandomness)
prm := make(map[abi.PoStRandomness][]abi.SectorID)
for {
op, data, err := conn.P2KMessage(ctx)
if err != nil {
return err
}
switch op {
case connect.OP_REQUEST_SEAL_RANDOM:
log.Infof("Generating seal random seed")
sid := data.(abi.SectorID)
random := blake2b.Sum256(random[:])
// random = seed
srm[sid] = random
err := conn.SendSealRandom(ctx, random)
if err != nil {
return err
}
case connect.OP_REQUEST_POST_RANDOM:
log.Infof("Generating post random seed")
sids := data.([]abi.SectorID)
random = blake2b.Sum256(random[:])
prm[random] = sids
err := conn.SendPoStRandom(ctx, random)
if err != nil {
return err
}
case connect.OP_SEND_SEAL_PROOF:
info := data.(connect.SealProofInfo)
seed, ok := srm[info.Sector]
if !ok {
return xerrors.Errorf("can't find the random seed", info.Sector)
}
ok, err := kp.VerifySeal(ctx, info.Sector, seed, info.Commit, info.Proof)
if err != nil {
return err
}
if !ok {
return xerrors.Errorf("Verify Seal proof failed")
}
case connect.OP_SEND_AGGREGATE_SEAL_PROOF:
info := data.(connect.AggregateSealProofInfo)
var seeds []abi.InteractiveSealRandomness
for _, sid := range info.Sectors {
seed, ok := srm[sid]
if !ok {
return xerrors.Errorf("can't find the random seed", sid)
}
seeds = append(seeds, seed)
}
ok, err := kp.VerifyAggregateSeals(ctx, info.Sectors, seeds, info.Commits, info.Proof)
if err != nil {
return err
}
if !ok {
log.Warnf("Keeper: Verify Seal Aggregation proof failed")
}
case connect.OP_SEND_WINDOW_POST_PROOF:
log.Infof("verifying window post proof")
info := data.(connect.WindowPoStProofInfo)
sids, ok := prm[info.Randomness]
if !ok {
return xerrors.Errorf("can't find the random seed", info.Randomness)
}
ok, err := kp.VerifyWindowPoSt(ctx, sids, info.Randomness, info.Proof)
if err != nil {
return err
}
if !ok {
return xerrors.Errorf("Verify Window-PoSt proof failed")
}
case connect.OP_SEND_AGGREGATE_WINDOW_POST_PROOF:
log.Infof("verifying aggregate window post proof")
info := data.(connect.AggregateWindowPoStProofInfo)
var sidsArr [][]abi.SectorID
for _, random := range info.Randomnesses {
sids, ok := prm[random]
if !ok {
log.Warnf("can't find the random seed", random)
}
sidsArr = append(sidsArr, sids)
}
ok, err := kp.VerifyAggregateWindowPostProofs(ctx, sidsArr, info.Randomnesses, info.Proof)
if err != nil {
return err
}
if !ok {
return xerrors.Errorf("Verify Window-PoSt proof failed")
}
return nil
default:
return xerrors.Errorf("unsupported operator")
}
}
return nil
}
package provider
import (
"context"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
)
type ProviderAPI interface {
SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error
NextSectorID() storage.SectorRef
MinerID() abi.ActorID
AddPiece(ctx context.Context, sid storage.SectorRef) error
Sealed(ctx context.Context, sid storage.SectorRef) (storage.SectorCids, error)
ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error)
GenerateCommitProof(ctx context.Context, sid storage.SectorRef, commit storage.SectorCids, seed abi.InteractiveSealRandomness) (spproof.Proof, error)
AggregateSealProofs(ctx context.Context, sids []storage.SectorRef, commits []storage.SectorCids, seeds []abi.InteractiveSealRandomness, proofs []spproof.Proof) (spproof.Proof, error)
GenerateWindowPoStProofs(ctx context.Context, sids []storage.SectorRef, commits []storage.SectorCids, randomness abi.PoStRandomness) (spproof.PoStProof, []abi.SectorID, error)
AggregateWindowPoStProofs(ctx context.Context, sidsArr [][]storage.SectorRef, commitsArr [][]storage.SectorCids, randomnesses []abi.PoStRandomness, proofs []spproof.PoStProof) (spproof.PoStProof, error)
}
......@@ -3,161 +3,193 @@ package provider
import (
"context"
"golang.org/x/xerrors"
"fil_integrate/build"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage/"
"fil_integrate/build/storage"
"fil_integrate/seal"
)
type Provider struct {
sealer seal.SectorSealer
minerID abi.ActorID
sealer seal.SectorSealer
minerID abi.ActorID
sortedPieces []storage.Piece
sectorSize abi.SectorSize
sectorNumber uint64
sectorSize abi.SectorSize
sectorNumber abi.SectorNumber
// pieceID -> sector[start:end]
pieceMap map[cid.Commit]storage.RangeSector
// sectorID -> []pieceID
sectorMap map[abi.SectorID][]abi.PieceInfo
// sectorID -> (commd, commr)
commMap map[abi.SectorID]storage.SectorCids
}
var _ ProviderAPI = &Provider{}
func New(sealer seal.SectorSealer, miner abi.ActorID) *Provider {
p := &Provider{
sealer: sealer,
minerID: miner,
sectorSize: abi.SectorSize(storage.SectorSize32MiB),
sealer: sealer,
minerID: miner,
sectorSize: abi.SectorSize(storage.SectorSize32MiB),
sectorNumber: 0,
sectorMap: make(map[abi.SectorID][]abi.PieceInfo),
commMap: make(map[abi.SectorID]storage.SectorCids)
pieceMap: make(map[cid.Commit]storage.RangeSector),
sectorMap: make(map[abi.SectorID][]abi.PieceInfo),
}
return p
}
func (p *Provider) GetNextSectorID() (storage.SectorRef) {
func (p *Provider) SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error {
return p.sealer.SavePiece(ctx, piece, in)
}
func (p *Provider) NextSectorID() storage.SectorRef {
sid := storage.SectorRef{
ID: abi.SectorID{
Miner: p.minerID,
Number: p.sealSectorNumber,
}
ProofType: spt(p.sectorSize),
Miner: p.minerID,
Number: p.sectorNumber,
},
ProofType: build.Spt(p.sectorSize),
}
p.sealSectorNumber++
p.sectorNumber++
return sid
}
func (p *Provider) MinerID() abi.ActorID {
return p.minerID
}
func (p *Provider) AddPiece(ctx context.Context, sid storage.SectorRef) error {
pieces, err := p.sealer.AddPiece(ctx, sid)
if err != nil {
return err
}
p.sectorMap[sid.ID] = pieces
return nil
}
func (p *Provider) Sealed(ctx context.Context, sid storage.SectorRef) error {
func (p *Provider) Sealed(ctx context.Context, sid storage.SectorRef) (storage.SectorCids, error) {
pieces, ok := p.sectorMap[sid.ID]
if !ok {
return xerrors.Errorf("can't find the pieces info")
return storage.SectorCids{}, xerrors.Errorf("can't find the pieces info")
}
cids, err := p.sealer.Sealed(ctx, sid, pieces)
if err != nil {
return err
return storage.SectorCids{}, err
}
return nil
// Store the mapping relations, pieceID -> sector[start:end]
var offset abi.UnpaddedPieceSize
for _, piece := range pieces {
p.pieceMap[piece.PieceCID] = storage.RangeSector{
Sector: sid,
Unsealed: cids.Unsealed,
Offset: abi.UnpaddedByteIndex(offset),
Size: piece.Size.Unpadded(),
}
offset += piece.Size.Unpadded()
}
return cids, nil
}
func ReadPiece(ctx context.Context, pieceID storage.PieceRef) ([]byte, error) {
buf, err := p.sealer.ReadPiece(ctx, pieceID)
if err != nil {
return nil, err
func (p *Provider) ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error) {
sRange, ok := p.pieceMap[pieceID]
if !ok {
return p.sealer.ReadPiece(ctx, pieceID)
}
return buf, nil
return p.sealer.UnsealedPieceRange(ctx, sRange.Sector, sRange.Unsealed, sRange.Offset, sRange.Size)
}
func (p *Provider) GenerateCommitProof(
ctx context,
sid storage.SectorRef,
ctx context.Context,
sid storage.SectorRef,
commit storage.SectorCids,
seed abi.InteractiveSealRandomness,
) (spproof.Proof, error) {
pieces, ok := p.sectorMap[sid.ID]
if !ok {
return spproof.Proof, xerrors.Errorf("can't find the pieces info")
}
cids, ok := p.commMap[sid.ID]
if !ok {
return spproof.Proof, xerrors.Errorf("can't find the commiment")
return spproof.Proof{}, xerrors.Errorf("can't find the pieces info")
}
return p.Sealer.GenerateCommitProof(ctx, sid, seed, pieces, seed)
return p.sealer.GenerateCommitProof(ctx, sid, seed, pieces, commit)
}
func (p *Provider) AggregateSealProofs(
ctx context.Context,
sids []storage.SectorRef,
seed []abi.InteractiveSealRandomness,
ctx context.Context,
sids []storage.SectorRef,
commits []storage.SectorCids,
seeds []abi.InteractiveSealRandomness,
proofs []spproof.Proof,
) (spproof.Proof, error) {
var infos []spproof.AggregateSealVerifyInfo
for i, sid := range sids {
cids, ok := p.commMap[sid.ID]
if !ok {
return spproof.Proof, xerrors.Errorf("can't find the commiment")
}
infos = append(infos, spproof.AggregateSealVerifyInfo{
Number: sid.ID.Number,
Randomness: seed[i],
SealedCID: cids.Sealed,
UnsealedCID: cids.Unsealed,
Number: sid.ID.Number,
InteractiveRandomness: seeds[i],
SealedCID: commits[i].Sealed,
UnsealedCID: commits[i].Unsealed,
})
}
return p.sealer.AggregateSealProofs(spproof.AggregateSealVerifyProofAndInfos{
SealType: spt(p.sectorSize),
SealType: build.Spt(p.sectorSize),
AggregateType: abi.DefaultAggregationType(),
Infos: infos,
})
Infos: infos,
}, proofs)
}
func (p *Provider) GenerateWindowPoStProofs(
ctx context.Context,
sids []storage.SectorRef,
ctx context.Context,
sids []storage.SectorRef,
commits []storage.SectorCids,
randomness abi.PoStRandomness,
) (spproof.PoStProof, error) {
) (spproof.PoStProof, []abi.SectorID, error) {
var challengedSectors []spproof.SectorInfo
if len(sids) != len(commits) {
return spproof.PoStProof{}, nil, xerrors.Errorf("can't use %d sector and %d commitment to gengerate window post", len(sids), len(commits))
}
for i, sid := range sids {
cids, ok := p.commMap[sid.ID]
if !ok {
return spproof.Proof, xerrors.Errorf("can't find the commiment")
}
challengedSectors = append(challengedSectors, spproof.SectorInfo{
SealType: spt(p.sectorSize),
SealType: build.Spt(p.sectorSize),
SectorNumber: sid.ID.Number,
SealedCID: cids.Sealed,
SealedCID: commits[i].Sealed,
})
}
return p.Sealer.GenerateWindowPoStProofs(ctx, p.minerID, challengedSectors, randomness)
return p.sealer.GenerateWindowPoStProofs(ctx, p.minerID, challengedSectors, randomness)
}
func (p *Provider) AggregateWindowPoStProofs(
ctx context.Context,
sectorCount []uint,
randomnesses []abi.PoStRandomness,
ctx context.Context,
sidsArr [][]storage.SectorRef,
commitsArr [][]storage.SectorCids,
randomnesses []abi.PoStRandomness,
proofs []spproof.PoStProof,
) (spproof.PoStProof, error) {
return p.sealer.AggregateSealProofs(spproof.AggregateWindowPostInfos{
AggregateType: abi.DefaultAggregationType(),
SectorCount: sectorCount,
Randomnesses: randomnesses,
}, proofs)
}
func spt(ssize abi.SectorSize) abi.RegisteredSealProof {
spt, err := build.SealProofTypeFromSectorSize(ssize, NewestNetworkVersion)
if err != nil {
panic(err)
var challengedSectors []spproof.SectorInfo
var sectorCount []uint
var srandomnesses []abi.PoStRandomness
for i, sids := range sidsArr {
for j, sid := range sids {
challengedSectors = append(challengedSectors, spproof.SectorInfo{
SealType: build.Spt(p.sectorSize),
SectorNumber: sid.ID.Number,
SealedCID: commitsArr[i][j].Sealed,
})
}
sectorCount = append(sectorCount, uint(len(sids)))
}
return spt
}
\ No newline at end of file
srandomnesses = append(srandomnesses, randomnesses...)
return p.sealer.AggregateWindowPoStProofs(spproof.AggregateWindowPostInfos{
PoStType: proofs[0].PoStProof,
AggregateType: abi.DefaultAggregationType(),
ChallengedSectors: challengedSectors,
SectorCount: sectorCount,
Randomnesses: srandomnesses,
Prover: p.minerID,
}, proofs)
}
package provider
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"fil_integrate/actor/connect"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
"fil_integrate/seal"
"fil_integrate/seal/basicfs"
)
var log = logging.Logger("Provider")
func RunProvider(ctx context.Context, conn *connect.Connection, root string) error {
root = filepath.Join(root, "provider")
if err := os.MkdirAll(root, 0775); err != nil {
return err
}
sbfs := &basicfs.Manager{
Root: root,
}
sb := seal.NewSealer(sbfs)
p := New(sb, 10000)
for {
op, data, err := conn.U2PMessage(ctx)
if err != nil {
return err
}
if op == connect.OP_ENCODE_DONE {
break
}
piece := data.(connect.PieceInfo)
err = p.SavePiece(ctx, abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(piece.Data)).Padded(),
PieceCID: piece.PieceCID,
}, bytes.NewReader(piece.Data))
if err != nil {
return err
}
}
var perr error
var sectorsInfo [][]storage.SectorRef
var commitsInfo [][]storage.SectorCids
var randomnesses []abi.PoStRandomness
var postProofs []spproof.PoStProof
for {
var sectors []abi.SectorID
var sids []storage.SectorRef
var seeds []abi.InteractiveSealRandomness
var proofs []spproof.Proof
var sectorCommits []storage.SectorCids
for i := 0; i < 4; i++ {
sid := p.NextSectorID()
perr = p.AddPiece(ctx, sid)
if perr == seal.PicesNotEnoughError {
break
} else if perr != nil {
return perr
}
cids, err := p.Sealed(ctx, sid)
if err != nil {
return err
}
err = conn.RequestSealRandom(ctx, sid.ID)
if err != nil {
return err
}
_, data, err := conn.P2KMessage(ctx)
if err != nil {
return err
}
seed := data.(abi.InteractiveSealRandomness)
proof, err := p.GenerateCommitProof(ctx, sid, cids, seed)
if err != nil {
return err
}
err = conn.SendSealProof(ctx, proof, sid.ID, cids)
if err != nil {
return err
}
sectorCommits = append(sectorCommits, cids)
sids = append(sids, sid)
sectors = append(sectors, sid.ID)
seeds = append(seeds, seed)
proofs = append(proofs, proof)
}
if perr != nil {
fmt.Println(perr.Error())
break
}
proof, err := p.AggregateSealProofs(ctx, sids, sectorCommits, seeds, proofs)
if err != nil {
return err
}
err = conn.SendAggregateSealProof(ctx, proof, sectors, sectorCommits)
if err != nil {
return err
}
err = conn.RequestPoStRandom(ctx, sectors)
if err != nil {
return err
}
_, data, err := conn.P2KMessage(ctx)
if err != nil {
return err
}
seed := data.(abi.PoStRandomness)
fmt.Println(seed)
postProof, _, err := p.GenerateWindowPoStProofs(ctx, sids, sectorCommits, seed)
if err != nil {
return err
}
err = conn.SendWindowPoStProof(ctx, postProof, seed, p.MinerID())
randomnesses = append(randomnesses, seed)
sectorsInfo = append(sectorsInfo, sids)
commitsInfo = append(commitsInfo, sectorCommits)
postProofs = append(postProofs, postProof)
}
log.Infof("Aggregating window post")
proof, err := p.AggregateWindowPoStProofs(ctx, sectorsInfo, commitsInfo, randomnesses, postProofs)
err = conn.SendAggregateWindowPoStProof(ctx, proof, randomnesses, p.MinerID())
if err != nil {
return err
}
err = conn.SendSealDone(ctx)
if err != nil {
return err
}
for {
op, data, err := conn.U2PMessage(ctx)
if err != nil {
return err
}
if op != connect.OP_REQUEST_PIECE {
return xerrors.Errorf("Unexpected operator")
}
piece := data.(cid.Commit)
buf, err := p.ReadPiece(ctx, piece)
if err != nil {
return err
}
err = conn.SendPiece(ctx, buf, piece)
if err != nil {
return err
}
}
return nil
}
package provider
import(
"context"
)
type ProviderAPI{
}
\ No newline at end of file
package actor
import(
"context"
"os"
"sync"
"fmt"
"io/ioutil"
"golang.org/x/xerrors"
"github.com/mitchellh/go-homedir"
"fil_integrate/actor/connect"
"fil_integrate/actor/user"
"fil_integrate/actor/provider"
"fil_integrate/actor/keeper"
)
func TestActor() error {
conn := connect.NewConnection()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sdir, err := homedir.Expand("~/tmp/bench")
if err != nil {
return err
}
err = os.MkdirAll(sdir, 0775) //nolint:gosec
if err != nil {
return xerrors.Errorf("creating sectorbuilder dir: %w", err)
}
tsdir, err := ioutil.TempDir(sdir, "bench")
if err != nil {
return err
}
defer os.RemoveAll(tsdir)
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
err := user.RunUser(ctx, conn, tsdir, 4)
cancel()
if err != nil {
fmt.Printf("User: %+v\n", err)
return
}
fmt.Println("User: done")
} ()
go func() {
defer wg.Done()
err := provider.RunProvider(ctx, conn, tsdir)
if err != nil {
cancel()
fmt.Printf("Provider: %+v\n", err)
return
}
fmt.Println("Provider: done")
} ()
go func() {
defer wg.Done()
err := keeper.RunKeeper(ctx, conn)
if err != nil {
cancel()
fmt.Printf("Keeper: %+v\n", err)
return
}
fmt.Println("Keeper: done")
} ()
wg.Wait()
return nil
}
\ No newline at end of file
package user
import (
"context"
"io"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
)
type UserAPI interface {
EncodeDataToPieces(ctx context.Context, file storage.Data) (abi.PieceInfo, error)
ReadPieceRange(ctx context.Context, out io.Writer, piece abi.PieceInfo, offset uint64, size uint64) error
}
package user
import (
"context"
"io"
"fil_integrate/build/storage"
)
type UserAPI interface {
EncodeDataToPieces(ctx context.Context, file storage.Data) (storage.Piece, []storage.Piece, error)
ReadPieceRange(ctx context.Context, out io.Writer, piece storage.Piece, offset uint64, size uint64) error
}
......@@ -4,39 +4,45 @@ import (
"context"
"io"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"fil_integrate/actor/connect"
"fil_integrate/build/cid"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
"fil_integrate/seal"
)
var log = logging.Logger("user")
type User struct {
sectorSize abi.SectorSize
encoder seal.PieceEncoder
conn *connect.Connection
cid2sidMap map[cid.Commit]abi.ActorID
}
var _ UserAPI = &User{}
func New(encoder seal.PieceEncoder) *User {
func New(encoder seal.PieceEncoder, conn *connect.Connection) *User {
u := &User{
sectorSize: abi.SectorSize(storage.SectorSize32MiB),
cid2sidMap: make(map[cid.Commit]abi.SectorID)
encoder: encoder,
sectorSize: abi.SectorSize(storage.SectorSize32MiB),
cid2sidMap: make(map[cid.Commit]abi.ActorID),
encoder: encoder,
conn: conn,
}
return u
}
func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (abi.PieceInfo, []abi.PieceInfo, error) {
func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (abi.PieceInfo, error) {
finalPiece, pieces, err := u.encoder.EncodeDataToPieces(ctx, u.sectorSize, file)
// map(file) -> finalPiece ...
// err := PostPiecesToProvider(pieces)
return finalPiece, pieces, err
// send piece to provider
for _, piece := range pieces {
buf, err := u.encoder.LoadPiece(ctx, piece.PieceCID)
if err != nil {
return abi.PieceInfo{}, err
}
u.conn.SendPiece(ctx, buf, piece.PieceCID)
}
return finalPiece, err
}
func (u *User) ReadPieceRange(
......@@ -46,11 +52,11 @@ func (u *User) ReadPieceRange(
offset uint64,
size uint64,
) error {
log.Infof("Reading Piece [%d:%d]", offset, offset + size)
log.Infof("Reading Piece [%d:%d]", offset, offset+size)
UnpaddedSectorSize := abi.PaddedPieceSize(u.sectorSize).Unpadded()
DataLen := uint32(UnpaddedSectorSize) - seal.TagLen
data, err := u.getPiece(ctx, piece.Commitment)
data, err := u.getPiece(ctx, piece.PieceCID)
if err != nil {
return err
}
......@@ -84,17 +90,20 @@ func (u *User) ReadPieceRange(
if rsize == 0 {
break
}
var wbuf []byte
if len(piecesCommit) != 0 {
data, err := u.getPiece(ctx, piecesCommit[0])
if err != nil {
return err
}
wbuf = data.Data[rstart:]
piecesCommit = piecesCommit[1:]
} else {
wbuf = buf[rstart:]
}
_, err = out.Write(wbuf[:rsize])
if err != nil {
return err
......@@ -107,13 +116,35 @@ func (u *User) ReadPieceRange(
func (u *User) getPiece(ctx context.Context, pieceCommit cid.Commit) (*storage.DecodedData, error) {
// todo: GET from chian/provider
// miner, ok := cid2sidMap[pieceCommit]
buf, err := GetPieceFromProvider(miner, pieceCommit)
data, err := u.encoder.DecodePiece(ctx, u.sectorSize, buf)
return data, err
buf, err := u.GetPieceFromProvider(ctx, 10000, pieceCommit)
if err != nil {
return nil, err
}
return u.encoder.DecodePiece(ctx, buf)
}
func GetPieceFromProvider(miner abi.ActorID, pieceCommit cid.Commit) ([]byte, error) {
return nil, nil
func (u *User) GetPieceFromProvider(ctx context.Context, miner abi.ActorID, pieceCommit cid.Commit) ([]byte, error) {
var buf []byte
err := u.conn.RequestPiece(ctx, pieceCommit)
if err != nil {
return nil, err
}
op, data, err := u.conn.U2PMessage(ctx)
if err != nil {
return nil, err
}
if op != connect.OP_SEND_PIECE {
return nil, xerrors.Errorf("Unexpected operator")
}
switch data.(type) {
case connect.PieceInfo:
buf = data.(connect.PieceInfo).Data
default:
return nil, xerrors.Errorf("Unexpected data")
}
return buf, nil
}
type RangePiece struct {
......
......@@ -4,72 +4,57 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/minio/md5-simd"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"fil_integrate/actor/connect"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
"fil_integrate/seal"
"fil_integrate/seal/basicfs"
)
var log = logging.Logger("User")
type Range struct {
offset uint64
size uint64
size uint64
}
type pieceRead struct {
piece storage.Piece
idx int
piece abi.PieceInfo
idx int
pieceRange Range
}
func TestUser() error {
sectorSize := storage.SectorSize32MiB
sdir, err := homedir.Expand("~/tmp/bench")
if err != nil {
func RunUser(ctx context.Context, conn *connect.Connection, root string, numFile int) error {
root = filepath.Join(root, "user")
if err := os.MkdirAll(root, 0775); err != nil {
return err
}
err = os.MkdirAll(sdir, 0775) //nolint:gosec
if err != nil {
return xerrors.Errorf("creating sectorbuilder dir: %w", err)
}
tsdir, err := ioutil.TempDir(sdir, "bench")
if err != nil {
return err
sbfs := &basicfs.Manager{
Root: root,
}
defer func() {
if err := os.RemoveAll(tsdir); err != nil {
log.Warn("remove all: ", err)
}
}()
sp := seal.NewEncoder(sbfs)
u := New(sp, conn)
// TODO: pretty sure this isnt even needed?
if err := os.MkdirAll(tsdir, 0775); err != nil {
return err
}
sp := &seal.Encoder{
Root: tsdir,
}
ctx := context.TODO()
u := New(sp)
sectorSize := storage.SectorSize32MiB
b := []byte("random data")
var numFile = 4
var range2Read []Range = []Range{Range{0, 0}, Range{1024, sectorSize}, Range{1024, 3*sectorSize + 1024}, Range{2*sectorSize + 2048, sectorSize + 4096}}
var PiecesRange []pieceRead
var range2Read []Range = []Range{Range{0,0}, Range{1024, sectorSize}, Range{1024, 3*sectorSize+1024}, Range{2*sectorSize+2048, sectorSize+4096}}
var err error
b := []byte("random data")
for i := 0; i < numFile; i++ {
filename := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", i))
log.Infof("Generating random data")
filename := filepath.Join(root, fmt.Sprintf("input-%d.dat", i))
r := rand.New(rand.NewSource(time.Now().UnixNano()))
dataSize := uint64(r.Int63n(int64(sectorSize / 8))) * 32 + 4 * sectorSize
dataSize := uint64(r.Int63n(int64(sectorSize/8)))*32 + 4*sectorSize
b, err = seal.GenerateRandomData(filename, dataSize, b)
if err != nil {
return err
......@@ -81,30 +66,42 @@ func TestUser() error {
}
defer in.Close()
finalPiece, _, err := sp.EncodeDataToPieces(ctx, abi.SectorSize(sectorSize), in)
finalPiece, err := u.EncodeDataToPieces(ctx, in)
if err != nil {
return err
}
for _, r := range(range2Read) {
for _, r := range range2Read {
PiecesRange = append(PiecesRange, pieceRead{
piece: finalPiece,
idx: i,
piece: finalPiece,
idx: i,
pieceRange: r,
})
}
PiecesRange = append(PiecesRange, pieceRead{
piece: finalPiece,
idx: i,
idx: i,
pieceRange: Range{
offset: dataSize - 1024,
size: 1024,
size: 1024,
},
})
}
err = conn.SendEncodeDone(ctx)
if err != nil {
return err
}
op, _, err := conn.U2PMessage(ctx)
if err != nil {
return err
}
if op != connect.OP_SEAL_DONE {
return xerrors.Errorf("Unexpected operator")
}
for _, r := range(PiecesRange) {
input := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", r.idx))
output := filepath.Join(tsdir, "output")
for _, r := range PiecesRange {
input := filepath.Join(root, fmt.Sprintf("input-%d.dat", r.idx))
output := filepath.Join(root, "output")
if _, err = os.Stat(output); !os.IsNotExist(err) {
os.Remove(output)
......@@ -124,9 +121,9 @@ func TestUser() error {
return err
}
if !ok {
fmt.Println("decode pieces failed")
fmt.Println("User: decode pieces failed")
} else {
fmt.Println("decode pieces success")
fmt.Println("User: decode pieces success")
}
}
return nil
......
package cid
import()
import ()
type Commit [32]byte
......@@ -8,4 +8,4 @@ var Undef = Commit{}
func (c Commit) Bytes() []byte {
return c[:]
}
\ No newline at end of file
}
......@@ -4,7 +4,7 @@ import (
"math/bits"
"golang.org/x/xerrors"
"github.com/minio/sha256-simd"
"fil_integrate/build/fr32"
......
package proof
import (
"fil_integrate/build/state-types/abi"
"fil_integrate/build/cid"
"fil_integrate/build/state-types/abi"
)
type SectorInfo struct {
......@@ -15,7 +15,8 @@ type Proof []byte
type SealVerifyInfo struct {
SealType abi.RegisteredSealProof
SectorID abi.SectorID
Miner abi.ActorID
Number abi.SectorNumber
Randomness abi.SealRandomness
InteractiveRandomness abi.InteractiveSealRandomness
SealProof Proof
......
......@@ -424,7 +424,7 @@ func DefaultAggregationType() RegisteredAggregationProof {
return RegisteredAggregationProof_SnarkPackV1
}
type Randomness []byte
type Randomness [32]byte
type SealRandomness Randomness
type InteractiveSealRandomness Randomness
......
......@@ -7,6 +7,8 @@ import (
"fil_integrate/build/state-types/network"
)
const NewestNetworkVersion = network.Version13
func SealProofTypeFromSectorSize(ssize abi.SectorSize, nv network.Version) (abi.RegisteredSealProof, error) {
switch {
case nv < network.Version7:
......@@ -146,3 +148,12 @@ func WinningPoStProofTypeFromWindowPoStProofType(nver network.Version, proof abi
return -1, xerrors.Errorf("unknown proof type %d", proof)
}
}
func Spt(ssize abi.SectorSize) abi.RegisteredSealProof {
spt, err := SealProofTypeFromSectorSize(ssize, NewestNetworkVersion)
if err != nil {
panic(err)
}
return spt
}
\ No newline at end of file
......@@ -6,7 +6,8 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"fil_integrate/actor/user"
"fil_integrate/actor"
"fil_integrate/actor/connect"
)
var log = logging.Logger("bench")
......@@ -21,7 +22,8 @@ func main() {
Usage: "test the actor(user, provider, keeper)",
Version: "1.0.1",
Commands: []*cli.Command{
testUser,
testConnect,
testAllActor,
},
}
......@@ -31,26 +33,26 @@ func main() {
}
}
var testUser = &cli.Command{
Name: "test-all",
var testConnect = &cli.Command{
Name: "test-connect",
Usage: "Test Seal the sectors and generate window post",
// Flags: []cli.Flag{
// &cli.StringFlag{
// Name: "sector-size",
// Value: "8MiB",
// Usage: "size of the sectors in bytes",
// },
// &cli.IntFlag{
// Name: "num-agg",
// Value: 8,
// Usage: "How many window-post proofs used to aggregate",
// },
// },
Action: func(c *cli.Context) error {
err := user.TestUser()
err := connect.TestConnection()
if err != nil {
return err
}
return nil
},
}
\ No newline at end of file
}
var testAllActor = &cli.Command{
Name: "test-all-actors",
Usage: "Test Seal the sectors and generate window post",
Action: func(c *cli.Context) error {
err := actor.TestActor()
if err != nil {
return err
}
return nil
},
}
......@@ -3,8 +3,8 @@ package main
import (
"os"
"github.com/docker/go-units"
"fil_integrate/build/state-types/abi"
"github.com/docker/go-units"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
......
......@@ -31,15 +31,15 @@ func VerifySeal(info spproof.SealVerifyInfo) (bool, error) {
return false, err
}
commR := to32ByteArray(info.SealedCID[:])
commD := to32ByteArray(info.UnsealedCID[:])
commR := to32ByteArray(info.SealedCID)
commD := to32ByteArray(info.UnsealedCID)
proverID, err := toProverID(info.SectorID.Miner)
proverID, err := toProverID(info.Miner)
if err != nil {
return false, err
}
resp := generated.FilVerifySeal(sp, commR, commD, proverID, to32ByteArray(info.Randomness), to32ByteArray(info.InteractiveRandomness), uint64(info.SectorID.Number), info.SealProof, uint(len(info.SealProof)))
resp := generated.FilVerifySeal(sp, commR, commD, proverID, to32ByteArray(info.Randomness), to32ByteArray(info.InteractiveRandomness), uint64(info.Number), info.SealProof, uint(len(info.SealProof)))
resp.Deref()
defer generated.FilDestroyVerifySealResponse(resp)
......@@ -60,8 +60,8 @@ func VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (b
inputs := make([]generated.FilAggregationInputs, len(aggregate.Infos))
for i, info := range aggregate.Infos {
commR := to32ByteArray(info.SealedCID[:])
commD := to32ByteArray(info.UnsealedCID[:])
commR := to32ByteArray(info.SealedCID)
commD := to32ByteArray(info.UnsealedCID)
inputs[i] = generated.FilAggregationInputs{
CommR: commR,
......@@ -435,8 +435,8 @@ func SealCommitPhase1(
return nil, err
}
commR := to32ByteArray(sealedCID.Bytes())
commD := to32ByteArray(unsealedCID.Bytes())
commR := to32ByteArray(sealedCID)
commD := to32ByteArray(unsealedCID)
filPublicPieceInfos, filPublicPieceInfosLen, err := toFilPublicPieceInfos(pieces)
if err != nil {
......@@ -495,7 +495,7 @@ func AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos,
seeds := make([]generated.Fil32ByteArray, len(aggregateInfo.Infos))
for i, info := range aggregateInfo.Infos {
seeds[i] = to32ByteArray(info.InteractiveRandomness)
commRs[i] = to32ByteArray(info.SealedCID[:])
commRs[i] = to32ByteArray(info.SealedCID)
}
pfs := make([]generated.FilSealCommitPhase2Response, len(proofs))
......@@ -567,7 +567,7 @@ func UnsealRange(
return err
}
commD := to32ByteArray(unsealedCID[:])
commD := to32ByteArray(unsealedCID)
sealedSectorFd := sealedSector.Fd()
defer runtime.KeepAlive(sealedSector)
......@@ -736,7 +736,7 @@ func AggregateWindowPoStProofs(aggregateInfo spproof.AggregateWindowPostInfos, p
}
return spproof.PoStProof{
PoStProof: aggregateInfo.PoStType,
PoStProof: proofs[0].PoStProof,
ProofBytes: copyBytes(resp.ProofPtr, resp.ProofLen),
}, nil
......@@ -862,7 +862,7 @@ func toFilPublicPieceInfos(src []abi.PieceInfo) ([]generated.FilPublicPieceInfo,
out := make([]generated.FilPublicPieceInfo, len(src))
for idx := range out {
commP := to32ByteArray(src[idx].PieceCID[:])
commP := to32ByteArray(src[idx].PieceCID)
out[idx] = generated.FilPublicPieceInfo{
NumBytes: uint64(src[idx].Size.Unpadded()),
......@@ -877,7 +877,7 @@ func toFilPublicReplicaInfos(src []spproof.SectorInfo, typ string) ([]generated.
out := make([]generated.FilPublicReplicaInfo, len(src))
for idx := range out {
commR := to32ByteArray(src[idx].SealedCID[:])
commR := to32ByteArray(src[idx].SealedCID)
out[idx] = generated.FilPublicReplicaInfo{
CommR: commR.Inner,
......@@ -912,7 +912,7 @@ func toFilPublicReplicaInfos(src []spproof.SectorInfo, typ string) ([]generated.
}
func toFilPrivateReplicaInfo(src PrivateSectorInfo) (generated.FilPrivateReplicaInfo, func(), error) {
commR := to32ByteArray(src.SealedCID[:])
commR := to32ByteArray(src.SealedCID)
pp, err := toFilRegisteredPoStProof(src.PoStProofType)
if err != nil {
......@@ -936,7 +936,7 @@ func toFilPrivateReplicaInfos(src []PrivateSectorInfo, typ string) ([]generated.
out := make([]generated.FilPrivateReplicaInfo, len(src))
for idx := range out {
commR := to32ByteArray(src[idx].SealedCID[:])
commR := to32ByteArray(src[idx].SealedCID)
pp, err := toFilRegisteredPoStProof(src[idx].PoStProofType)
if err != nil {
......@@ -1059,9 +1059,15 @@ func toFilPoStProofs(src []spproof.PoStProof) ([]generated.FilPoStProof, uint, f
}, nil
}
func to32ByteArray(in []byte) generated.Fil32ByteArray {
func to32ByteArray(in [32]byte) generated.Fil32ByteArray {
var out generated.Fil32ByteArray
copy(out.Inner[:], in)
copy(out.Inner[:], in[:])
return out
}
func to32Bytes(in []byte) generated.Fil32ByteArray {
var out generated.Fil32ByteArray
copy(out.Inner[:], in[:])
return out
}
......@@ -1072,7 +1078,7 @@ func toProverID(minerID abi.ActorID) (generated.Fil32ByteArray, error) {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, uint64(minerID))
return to32ByteArray(buf[:n]), nil
return to32Bytes(buf[:n]), nil
}
func fromFilRegisteredPoStProof(p generated.FilRegisteredPoStProof) (abi.RegisteredPoStProof, error) {
......
......@@ -124,14 +124,14 @@ lazy_static! {
// https://github.com/filecoin-project/specs-actors/blob/master/actors/abi/sector.go
pub static ref WINDOW_POST_SECTOR_COUNT: RwLock<HashMap<u64, usize>> = RwLock::new(
[
(SECTOR_SIZE_2_KIB, 512),
(SECTOR_SIZE_4_KIB, 512),
(SECTOR_SIZE_16_KIB, 512),
(SECTOR_SIZE_32_KIB, 512),
(SECTOR_SIZE_8_MIB, 512),
(SECTOR_SIZE_16_MIB, 512),
(SECTOR_SIZE_32_MIB, 512),
(SECTOR_SIZE_64_MIB, 512),
(SECTOR_SIZE_2_KIB, 8),
(SECTOR_SIZE_4_KIB, 8),
(SECTOR_SIZE_16_KIB, 8),
(SECTOR_SIZE_32_KIB, 8),
(SECTOR_SIZE_8_MIB, 8),
(SECTOR_SIZE_16_MIB, 8),
(SECTOR_SIZE_32_MIB, 8),
(SECTOR_SIZE_64_MIB, 8),
(SECTOR_SIZE_128_MIB, 512),
(SECTOR_SIZE_256_MIB, 512),
(SECTOR_SIZE_512_MIB, 512),
......
......@@ -540,7 +540,7 @@ fn aggregate_window_post_proofs_inner<Tree: 'static + MerkleTreeTrait>(
ensure!(
registered_proof == &registered_proof_v1,
"can only generate the same kind of PoSt"
"can only aggregate the same kind of PoSt"
);
agg_proofs.push(proof.to_vec());
}
......@@ -608,7 +608,7 @@ fn verify_aggregate_window_post_proofs_inner<Tree: 'static + MerkleTreeTrait>(
ensure!(
registered_proof == &registered_proof_v1,
"can only aggregate the same kind of PoSt"
"can only verify the same kind of PoSt"
);
let info_v1 = filecoin_proofs_v1::PublicReplicaInfo::new(*comm_r)?;
......
......@@ -2,7 +2,7 @@ package seal
import (
"context"
"github.com/minio/blake2b-simd"
"fil_integrate/build/cid"
......@@ -14,19 +14,21 @@ import (
)
var b = blake2b.Sum256([]byte("randomness"))
var Ticket abi.SealRandomness = abi.SealRandomness(b[:])
var Ticket abi.SealRandomness = abi.SealRandomness(b)
type PieceEncoder interface {
// Split and encode data into pieces
// Pieces structure is [ Tag | MetaData | HashData ] or [ Tag | PreHash | HashData]
EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (abi.PieceInfo, []abi.PieceInfo, error)
LoadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error)
DecodePiece(ctx context.Context, buf []byte) (*storage.DecodedData, error)
}
//interface
type SectorSealer interface {
AddPiece(ctx context.Context, sid storage.SectorRef) ([]abi.PieceInfo, error)
SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error
AddPiece(ctx context.Context, sid storage.SectorRef) ([]abi.PieceInfo, error)
// run pre-commit1 and pre-commit2 phase
// generate the sealed sector and sector commitment(commd, commr)
Sealed(ctx context.Context, sid storage.SectorRef, pieces []abi.PieceInfo) (storage.SectorCids, error)
......@@ -35,7 +37,8 @@ type SectorSealer interface {
GenerateCommitProof(ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (spproof.Proof, error)
AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos, proofs []spproof.Proof) (spproof.Proof, error)
ReadPiece(ctx context.Context, piece cid.Commit) ([]byte, error)
ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error)
UnsealedPieceRange(ctx context.Context, sid storage.SectorRef, unsealed cid.Commit, offset abi.UnpaddedByteIndex, size abi.UnpaddedPieceSize) ([]byte, error)
GenerateWindowPoStProofs(ctx context.Context, minerID abi.ActorID, sectorInfo []spproof.SectorInfo, randomness abi.PoStRandomness) (spproof.PoStProof, []abi.SectorID, error)
AggregateWindowPoStProofs(aggregateInfo spproof.AggregateWindowPostInfos, proofs []spproof.PoStProof) (spproof.PoStProof, error)
......@@ -45,8 +48,9 @@ type SectorVerifier interface {
VerifySeal(info spproof.SealVerifyInfo) (bool, error)
VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (bool, error)
VerifyWindowPoSt(sectors []storage.SectorRef, proof spproof.PoStProof, randomness abi.PoStRandomness, proverID abi.ActorID) (bool, error)
VerifyAggregateWindowPostProofs(sectors [][]storage.SectorRef, proof spproof.PoStProof, randomnesses []abi.PoStRandomness, proverID abi.ActorID) (bool, error)
VerifyWindowPoSt(info spproof.WindowPoStVerifyInfo) (bool, error)
VerifyAggregateWindowPostProofs(aggregate spproof.AggregateWindowPostInfos) (bool, error)
VerifyPieceAndDataRoot(proofType abi.RegisteredSealProof, commd cid.Commit, pieces []abi.PieceInfo) (bool, error)
}
type SectorManager interface {
......
......@@ -97,7 +97,7 @@ func (sp *Encoder) EncodeDataToPieces(
}
func (sp *Encoder) EncodeData(
ctx context.Context,
ctx context.Context,
metadata []byte,
sectorSize abi.SectorSize,
MetaLen uint32,
......@@ -109,15 +109,19 @@ func (sp *Encoder) EncodeData(
var pieces []abi.PieceInfo
var err error
if len(metadata) == 0 && len(hashData) == 32 {
return nil, nil
}
for len(hashData) > 0 {
var buf []byte
//encode next n sector
if pieces != nil {
CommLen := min(uint32(len(hashData)), ((DataLen-32)/32)*32)
var data *storage.DecodedData = &storage.DecodedData{
HasPre: true,
PrePieceCommit: prePieceCommit,
CommitData: hashData[:CommLen],
HasPre: true,
PrePieceCommit: prePieceCommit,
CommitData: hashData[:CommLen],
}
buf, err = data.Serialize()
if err != nil {
......@@ -128,8 +132,8 @@ func (sp *Encoder) EncodeData(
} else {
CommLen := min(uint32(len(hashData)), ((DataLen-MetaLen)/32)*32)
var data *storage.DecodedData = &storage.DecodedData{
HasPre: false,
Data: metadata,
HasPre: false,
Data: metadata,
CommitData: hashData[:CommLen],
}
buf, err = data.Serialize()
......@@ -166,6 +170,15 @@ func (sp *Encoder) EncodeData(
return pieces, nil
}
func (sp *Encoder) LoadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error) {
stagePath, done, err := sp.sectors.AcquirePiece(ctx, pieceID, storiface.FTPiece, 0)
if err != nil {
return nil, err
}
defer done()
return ioutil.ReadFile(stagePath.Piece)
}
func (sp *Encoder) DecodePiece(
ctx context.Context,
buf []byte,
......
package seal
import (
// "sync"
"fmt"
)
type info struct {
a int
b bool
}
// user run seal: user -> provider pieces, pieces-hash
// user run getPiece: user -> provider pieces-hash, procider -> user pieces,
// provider run seal: provider -> keeper proof, commd, commr, sid...
// provider run aggregate: provider -> keeper proof, commds...
// provider run window-post:
func Test() error {
ch := make(chan interface{}, 1)
go func() {
ch <- 8
ch <- []byte{10, 20, 30}
ch <- info{20, false}
}()
printx(<-ch)
printx(<-ch)
printx(<-ch)
return nil
}
func printx(val interface{}) {
switch v := val.(type) {
case int:
var a int = v
fmt.Println(a)
case []byte:
var a []byte = v
fmt.Println(a[0], len(a))
case info:
var a info = v
fmt.Println(a.a)
default:
fmt.Println("not found")
}
}
......@@ -5,8 +5,8 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"io/ioutil"
"runtime"
"sync"
......@@ -15,49 +15,52 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
logging "github.com/ipfs/go-log/v2"
"fil_integrate/build"
"fil_integrate/build/cid"
"fil_integrate/build/fr32"
spproof "fil_integrate/build/proof"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/state-types/network"
"fil_integrate/build/storage"
"fil_integrate/build/storiface"
)
var skip = 0
var log = logging.Logger("sealing")
const NewestNetworkVersion = network.Version13
var PicesNotEnoughError = xerrors.Errorf("can not use the existing pieces to fill the sector")
type Sealer struct {
sectors SectorManager
// pieceID -> sector[start:end]
pieceMap map[cid.Commit]storage.RangeSector
sortedPieces []abi.PieceInfo
}
var _ SectorSealer = &Sealer{}
func NewSealer(sectors SectorManager) (*Sealer) {
func NewSealer(sectors SectorManager) *Sealer {
sb := &Sealer{
sectors: sectors,
pieceMap: make(map[cid.Commit]storage.RangeSector),
}
return sb
}
func (sb *Sealer) SavePiece(piece abi.PieceInfo, in storage.Data) error {
func (sb *Sealer) SavePiece(ctx context.Context, piece abi.PieceInfo, in storage.Data) error {
var res []abi.PieceInfo
if in != nil {
// stagePath, done, err := sb.sectors.AcquirePiece()
// out, err := os.OpenFile(stagePath.Piece, )
// write, err := io.CopyN(out, in, piece.Size)
stagePath, done, err := sb.sectors.AcquirePiece(ctx, piece.PieceCID, 0, storiface.FTPiece)
if err != nil {
return err
}
defer done()
out, err := os.OpenFile(stagePath.Piece, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
_, err = io.CopyN(out, in, int64(piece.Size.Unpadded()))
if err != nil {
return err
}
}
if sb.sortedPieces == nil || piece.Size >= sb.sortedPieces[0].Size {
res = append(res, piece)
res = append(res, sb.sortedPieces...)
......@@ -94,22 +97,14 @@ func (sb *Sealer) AddPiece(
maxPieceSize := abi.PaddedPieceSize(ssize)
// Select pieces to seal
for index = 0; index < len(sb.sortedPieces); {
if skip == 10 {
skip++
continue
}
skip++
pieceSize += sb.sortedPieces[index].Size
for index = 0; index < len(sb.sortedPieces); index++ {
if pieceSize > maxPieceSize {
return nil, xerrors.Errorf("Exists a piece whose size is bigger than 8MiB or is not power of two or the pieces is not sorted")
} else if pieceSize == maxPieceSize {
addPieces = append(addPieces, sb.sortedPieces[index])
index++
break
}
pieceSize += sb.sortedPieces[index].Size
addPieces = append(addPieces, sb.sortedPieces[index])
index++
}
if pieceSize != maxPieceSize {
......@@ -141,19 +136,6 @@ func (sb *Sealer) AddPiece(
}
sb.sortedPieces = sb.sortedPieces[index:]
unsealed, err := ffi.GenerateUnsealedCID(sector.ProofType, piecesInfo)
// Store the mapping relations, pieceID -> sector[start:end]
var offset abi.UnpaddedPieceSize
for _, piece := range addPieces {
sb.pieceMap[piece.PieceCID] = storage.RangeSector{
Sector: sector,
Unsealed: unsealed,
Offset: abi.UnpaddedByteIndex(offset),
Size: piece.Size.Unpadded(),
}
offset += piece.Size.Unpadded()
}
return piecesInfo, nil
}
......@@ -385,35 +367,30 @@ func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
}, nil
}
func (sb *Sealer) ReadPiece(
ctx context.Context,
piece cid.Commit,
) ([]byte, error) {
dstPaths, dstDone, err := sb.sectors.AcquirePiece(ctx, piece, 0, storiface.FTPiece)
func (sb *Sealer) ReadPiece(ctx context.Context, pieceID cid.Commit) ([]byte, error) {
srcPaths, srcDone, err := sb.sectors.AcquirePiece(ctx, pieceID, storiface.FTPiece, storiface.FTNone)
if err != nil {
return nil, err
}
defer dstDone()
if _, err = os.Stat(dstPaths.Piece); !os.IsNotExist(err) {
log.Infof("reading piece: %s", dstPaths.Piece)
return ioutil.ReadFile(dstPaths.Piece)
return nil, xerrors.Errorf("acquire sealed sector paths: %w", err)
}
defer srcDone()
sectorRange, found := sb.pieceMap[piece]
if !found {
return nil, xerrors.Errorf("The piece is not existing")
}
log.Infof("[%d] Unsealing sector", sectorRange.Sector.ID.Number)
return ioutil.ReadFile(srcPaths.Piece)
}
func (sb *Sealer) UnsealedPieceRange(
ctx context.Context,
sid storage.SectorRef,
unsealed cid.Commit,
offset abi.UnpaddedByteIndex,
size abi.UnpaddedPieceSize,
) ([]byte, error) {
log.Infof("[%d] Unsealing sector", sid.ID.Number)
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sectorRange.Sector, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, storiface.FTNone)
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTCache|storiface.FTSealed, storiface.FTNone)
if err != nil {
return nil, xerrors.Errorf("acquire sealed sector paths: %w", err)
}
defer srcDone()
if _, err = os.Stat(dstPaths.Unsealed); !os.IsNotExist(err) {
log.Infof("reading piece: %s", dstPaths.Unsealed)
return readFile(dstPaths.Unsealed, sectorRange.Size)
}
sealed, err := os.OpenFile(srcPaths.Sealed, os.O_RDONLY, 0644) // nolint:gosec
if err != nil {
......@@ -429,7 +406,7 @@ func (sb *Sealer) ReadPiece(
var perr error
outWait := make(chan struct{})
buf := make([]byte, sectorRange.Size)
buf := make([]byte, size)
{
go func() {
......@@ -438,7 +415,7 @@ func (sb *Sealer) ReadPiece(
for wbuf := buf[:]; len(wbuf) > 0; {
n, err := opr.Read(wbuf)
if err != nil && err != io.EOF{
if err != nil && err != io.EOF {
perr = xerrors.Errorf("copying data: %w", err)
return
}
......@@ -449,16 +426,16 @@ func (sb *Sealer) ReadPiece(
// </eww>
// TODO: This may be possible to do in parallel
err = ffi.UnsealRange(sectorRange.Sector.ProofType,
err = ffi.UnsealRange(sid.ProofType,
srcPaths.Cache,
sealed,
opw,
sectorRange.Sector.ID.Number,
sectorRange.Sector.ID.Miner,
sid.ID.Number,
sid.ID.Miner,
Ticket,
sectorRange.Unsealed,
uint64(sectorRange.Offset),
uint64(sectorRange.Size),
unsealed,
uint64(offset),
uint64(size),
)
_ = opw.Close()
......@@ -476,29 +453,11 @@ func (sb *Sealer) ReadPiece(
if perr != nil {
return nil, xerrors.Errorf("piping output to unsealed file: %w", perr)
}
// err = ioutil.WriteFile(dstPaths.Piece, buf[:], 0644)
// if err != nil {
// return nil, err
// }
return buf, nil
}
//
func (sb *Sealer) CheckPieceAndDataRoot(
sid storage.SectorRef,
commd cid.Commit,
pieces []abi.PieceInfo,
) (bool, error) {
UnsealedCID, err := ffi.GenerateUnsealedCID(sid.ProofType, pieces)
if err != nil {
return false, err
}
return commd == UnsealedCID, nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
// ffi.say_hello()
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache)
......@@ -508,7 +467,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef,
defer func() {
done()
os.Remove(paths.Unsealed)
} ()
}()
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
if err != nil {
......@@ -771,7 +730,7 @@ func readFile(file string, size abi.UnpaddedPieceSize) ([]byte, error) {
return nil, err
}
buf := make([]byte, size)
for wbuf := buf[:]; len(wbuf) > 0;{
for wbuf := buf[:]; len(wbuf) > 0; {
read, err := pr.Read(wbuf)
if err != nil {
return nil, err
......@@ -780,12 +739,3 @@ func readFile(file string, size abi.UnpaddedPieceSize) ([]byte, error) {
}
return buf, nil
}
func spt(ssize abi.SectorSize) abi.RegisteredSealProof {
spt, err := build.SealProofTypeFromSectorSize(ssize, NewestNetworkVersion)
if err != nil {
panic(err)
}
return spt
}
This diff is collapsed.
package seal
import (
"sync"
"golang.org/x/xerrors"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/storage"
"fil_integrate/build/state-types/abi"
ffi "github.com/filecoin-project/filecoin-ffi"
)
type Verifier struct {
lock *sync.RWMutex
sm map[abi.SectorID]storage.SectorCids
}
type Verifier struct{}
var ProofVerifier = Verifier{
lock: new(sync.RWMutex),
sm: make(map[abi.SectorID]storage.SectorCids),
}
var ProofVerifier = Verifier{}
var _ SectorVerifier = Verifier{}
func NewVerifier() Verifier {
return Verifier{}
}
func (v Verifier) VerifySeal(info spproof.SealVerifyInfo) (bool, error) {
info.Randomness = Ticket
ok, err := ffi.VerifySeal(info)
if ok && err == nil {
v.lock.Lock()
defer v.lock.Unlock()
v.sm[info.SectorID] = storage.SectorCids{
Sealed: info.SealedCID,
Unsealed: info.UnsealedCID,
}
}
return ok, err
return ffi.VerifySeal(info)
}
func (v Verifier) VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (bool, error) {
for i, _ := range aggregate.Infos {
aggregate.Infos[i].Randomness = Ticket
}
ok, err := ffi.VerifyAggregateSeals(aggregate)
if ok && err == nil {
v.lock.Lock()
defer v.lock.Unlock()
for _, info := range aggregate.Infos {
sid := abi.SectorID{
Miner: aggregate.Miner,
Number: info.Number,
}
v.sm[sid] = storage.SectorCids{
Sealed: info.SealedCID,
Unsealed: info.UnsealedCID,
}
}
}
return ok, err
return ffi.VerifyAggregateSeals(aggregate)
}
func (v Verifier) VerifyWindowPoSt(
sectors []storage.SectorRef,
proof spproof.PoStProof,
randomness abi.PoStRandomness,
proverID abi.ActorID,
) (bool, error) {
chanllendedSectors := make([]spproof.SectorInfo, len(sectors))
// minerID = sectors[0].ID.Miner
v.lock.RLock()
// defer m.Lock.RUnLock()
for idx, sid := range sectors {
cids, ok := v.sm[sid.ID]
if !ok {
v.lock.RUnlock()
return false, xerrors.Errorf("can not map the sectorID into sector commitment")
}
chanllendedSectors[idx] = spproof.SectorInfo{
SealType: sid.ProofType,
SectorNumber: sid.ID.Number,
SealedCID: cids.Sealed,
}
}
v.lock.RUnlock()
randomness[31] &= 0x3f
log.Infof("Verifying Window-PoSt Proof")
return ffi.VerifyWindowPoSt(spproof.WindowPoStVerifyInfo{
Randomness: randomness,
Proof: proof,
ChallengedSectors: chanllendedSectors,
Prover: proverID,
})
func (v Verifier) VerifyWindowPoSt(info spproof.WindowPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
return ffi.VerifyWindowPoSt(info)
}
func (v Verifier) VerifyAggregateWindowPostProofs(
sectors [][]storage.SectorRef,
proof spproof.PoStProof,
randomnesses []abi.PoStRandomness,
proverID abi.ActorID,
) (bool, error) {
var sectorInfos []spproof.SectorInfo
sectorCount := make([]uint, len(sectors))
v.lock.RLock()
// defer v.Lock.RUnLock()
for i, sectorRange := range sectors {
sectorCount[i] = uint(len(sectorRange))
for _, sid := range sectorRange {
cids, ok := v.sm[sid.ID]
if !ok {
v.lock.RUnlock()
return false, xerrors.Errorf("can not map the sectorID into sector commitment")
}
sectorInfos = append(sectorInfos, spproof.SectorInfo{
SealType: sid.ProofType,
SectorNumber: sid.ID.Number,
SealedCID: cids.Sealed,
})
}
func (v Verifier) VerifyAggregateWindowPostProofs(aggregate spproof.AggregateWindowPostInfos) (bool, error) {
for i, random := range aggregate.Randomnesses {
aggregate.Randomnesses[i][31] = random[31] & 0x3f
}
v.lock.RUnlock()
for i, random := range randomnesses {
randomnesses[i][31] = random[31] & 0x3f
}
return ffi.VerifyAggregateWindowPostProofs(aggregate)
}
postType, err := sectorInfos[0].SealType.RegisteredWindowPoStProof()
func (v Verifier) VerifyPieceAndDataRoot(
proofType abi.RegisteredSealProof,
commd cid.Commit,
pieces []abi.PieceInfo,
) (bool, error) {
UnsealedCID, err := ffi.GenerateUnsealedCID(proofType, pieces)
if err != nil {
return false, err
}
return ffi.VerifyAggregateWindowPostProofs(spproof.AggregateWindowPostInfos{
PoStType: postType,
AggregateType: abi.DefaultAggregationType(),
AggregateProof: proof,
ChallengedSectors: sectorInfos,
SectorCount: sectorCount,
Randomnesses: randomnesses,
Prover: proverID,
})
return commd == UnsealedCID, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment