Commit 677f5539 authored by 董子豪

add seal and verify interface

parent 9ced0107
package basicfs
import (
"context"
"os"
"path/filepath"
"sync"
"github.com/filecoin-project/go-state-types/abi"
"fil_integrate/build/storage"
"fil_integrate/build/storiface"
)
type sectorFile struct {
abi.SectorID
storiface.SectorFileType
}
type Provider struct {
Root string
lk sync.Mutex
waitSector map[sectorFile]chan struct{}
}
func (b *Provider) GetRoot() string {
return b.Root
}
func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTSealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
done := func() {}
out := storiface.SectorPaths{
ID: id.ID,
}
for _, fileType := range storiface.PathTypes {
if !existing.Has(fileType) && !allocate.Has(fileType) {
continue
}
b.lk.Lock()
if b.waitSector == nil {
b.waitSector = map[sectorFile]chan struct{}{}
}
ch, found := b.waitSector[sectorFile{id.ID, fileType}]
if !found {
ch = make(chan struct{}, 1)
b.waitSector[sectorFile{id.ID, fileType}] = ch
}
b.lk.Unlock()
select {
case ch <- struct{}{}:
case <-ctx.Done():
done()
return storiface.SectorPaths{}, nil, ctx.Err()
}
path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id.ID, numbers...))
prevDone := done
done = func() {
prevDone()
<-ch
}
if !allocate.Has(fileType) {
if _, err := os.Stat(path); os.IsNotExist(err) {
done()
return storiface.SectorPaths{}, nil, storiface.ErrSectorNotFound
}
}
storiface.SetPathByType(&out, fileType, path)
}
return out, done, nil
}
func (b *Provider) AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTSealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
done := func() {}
out := storiface.SectorPaths{
ID: id.ID,
}
for _, fileType := range storiface.PathTypes {
if !existing.Has(fileType) && !allocate.Has(fileType) {
continue
}
b.lk.Lock()
if b.waitSector == nil {
b.waitSector = map[sectorFile]chan struct{}{}
}
ch, found := b.waitSector[sectorFile{id.ID, fileType}]
if !found {
ch = make(chan struct{}, 1)
b.waitSector[sectorFile{id.ID, fileType}] = ch
}
b.lk.Unlock()
select {
case ch <- struct{}{}:
case <-ctx.Done():
done()
return storiface.SectorPaths{}, nil, ctx.Err()
}
path := filepath.Join(b.Root, fileType.String(), "unsealed.dat")
prevDone := done
done = func() {
prevDone()
<-ch
}
if !allocate.Has(fileType) {
if _, err := os.Stat(path); os.IsNotExist(err) {
done()
return storiface.SectorPaths{}, nil, storiface.ErrSectorNotFound
}
}
storiface.SetPathByType(&out, fileType, path)
}
return out, done, nil
}
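// Usage sketch (not part of this commit): the Provider serializes access to
// each (sector, file type) pair through a one-slot channel, so a concurrent
// AcquireSector for the same file blocks until the previous caller's done()
// runs. Assuming a Provider rooted at a scratch directory:
//
//	sbfs := &Provider{Root: "/tmp/bench"}
//	paths, done, err := sbfs.AcquireSector(ctx, sector,
//		storiface.FTNone, storiface.FTUnsealed, storiface.PathSealing)
//	if err != nil {
//		return err
//	}
//	defer done() // releases the per-file slot; forgetting it deadlocks later acquires
//	// paths.Unsealed now points at roughly <Root>/unsealed/<sector name>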
package seal
import (
"encoding/binary"
"io"
"os"
"syscall"
"github.com/detailyang/go-fallocate"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"fil_integrate/build/storiface"
)
const veryLargeRle = 1 << 20
const FallocFlPunchHole = 0x02 // linux/falloc.h
// Sectors can be partially unsealed. We support this by appending a small
// trailer to each unsealed sector file containing an RLE+ bitfield marking which
// bytes in a sector are unsealed and which are not (holes).
//
// Unsealed sector files internally have this structure:
// [unpadded (raw) data][rle+][4B LE length of the rle+ field]
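// For example (illustrative only): in a 2 KiB padded sector where the first
// 1 KiB has been written, bytes [0, 2048) hold the padded sector data, the
// RLE+ trailer encoding the runs "1 KiB set, 1 KiB unset" starts at offset
// 2048, and the final 4 bytes store the trailer length in little-endian.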
type partialFile struct {
maxPiece abi.PaddedPieceSize
path string
allocated rlepluslazy.RLE
file *os.File
}
func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) error {
trailer, err := rlepluslazy.EncodeRuns(r, nil)
if err != nil {
return xerrors.Errorf("encoding trailer: %w", err)
}
// maxPieceSize == unpadded(sectorSize) == trailer start
if _, err := w.Seek(maxPieceSize, io.SeekStart); err != nil {
return xerrors.Errorf("seek to trailer start: %w", err)
}
rb, err := w.Write(trailer)
if err != nil {
return xerrors.Errorf("writing trailer data: %w", err)
}
if err := binary.Write(w, binary.LittleEndian, uint32(len(trailer))); err != nil {
return xerrors.Errorf("writing trailer length: %w", err)
}
return w.Truncate(maxPieceSize + int64(rb) + 4)
}
func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
}
err = func() error {
err := fallocate.Fallocate(f, 0, int64(maxPieceSize))
if errno, ok := err.(syscall.Errno); ok {
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
log.Warnf("could not allocated space, ignoring: %v", errno)
err = nil // log and ignore
}
}
if err != nil {
return xerrors.Errorf("fallocate '%s': %w", path, err)
}
if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}()
if err != nil {
_ = f.Close()
return nil, err
}
if err := f.Close(); err != nil {
return nil, xerrors.Errorf("close empty partial file: %w", err)
}
return openPartialFile(maxPieceSize, path)
}
func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
}
var rle rlepluslazy.RLE
err = func() error {
st, err := f.Stat()
if err != nil {
return xerrors.Errorf("stat '%s': %w", path, err)
}
if st.Size() < int64(maxPieceSize) {
return xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize)
}
// read trailer
var tlen [4]byte
_, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen)))
if err != nil {
return xerrors.Errorf("reading trailer length: %w", err)
}
// sanity-check the length
trailerLen := binary.LittleEndian.Uint32(tlen[:])
expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize)
if expectLen != st.Size() {
return xerrors.Errorf("file '%s' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize)
}
if trailerLen > veryLargeRle {
log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen)
}
trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen)
if trailerStart != int64(maxPieceSize) {
return xerrors.Errorf("expected sector size to equal trailer start index")
}
trailerBytes := make([]byte, trailerLen)
_, err = f.ReadAt(trailerBytes, trailerStart)
if err != nil {
return xerrors.Errorf("reading trailer: %w", err)
}
rle, err = rlepluslazy.FromBuf(trailerBytes)
if err != nil {
return xerrors.Errorf("decoding trailer: %w", err)
}
it, err := rle.RunIterator()
if err != nil {
return xerrors.Errorf("getting trailer run iterator: %w", err)
}
f, err := rlepluslazy.Fill(it)
if err != nil {
return xerrors.Errorf("filling bitfield: %w", err)
}
lastSet, err := rlepluslazy.Count(f)
if err != nil {
return xerrors.Errorf("finding last set byte index: %w", err)
}
if lastSet > uint64(maxPieceSize) {
return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize)
}
return nil
}()
if err != nil {
_ = f.Close()
return nil, err
}
return &partialFile{
maxPiece: maxPieceSize,
path: path,
allocated: rle,
file: f,
}, nil
}
func (pf *partialFile) Close() error {
return pf.file.Close()
}
func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c > 0 {
log.Warnf("getting partial file writer overwriting %d allocated bytes", c)
}
}
return pf.file, nil
}
func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
ored, err := rlepluslazy.Or(have, pieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, ored); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
if err := Deallocate(pf.file, int64(offset), int64(size)); err != nil {
return xerrors.Errorf("deallocating: %w", err)
}
s, err := rlepluslazy.Subtract(have, pieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, s); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c != uint64(size) {
log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-c)
}
}
return pf.file, nil
}
func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
return pf.allocated.RunIterator()
}
func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
have, err := pf.Allocated()
if err != nil {
return false, err
}
u, err := rlepluslazy.And(have, pieceRun(offset.Padded(), size.Padded()))
if err != nil {
return false, err
}
uc, err := rlepluslazy.Count(u)
if err != nil {
return false, err
}
return abi.PaddedPieceSize(uc) == size.Padded(), nil
}
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
var runs []rlepluslazy.Run
if offset > 0 {
runs = append(runs, rlepluslazy.Run{
Val: false,
Len: uint64(offset),
})
}
runs = append(runs, rlepluslazy.Run{
Val: true,
Len: uint64(size),
})
return &rlepluslazy.RunSliceIterator{Runs: runs}
}
func Deallocate(file *os.File, offset int64, length int64) error {
if length == 0 {
return nil
}
err := syscall.Fallocate(int(file.Fd()), FallocFlPunchHole, offset, length)
if errno, ok := err.(syscall.Errno); ok {
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
log.Warnf("could not deallocate space, ignoring: %v", errno)
err = nil // log and ignore
}
}
return err
}
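// Usage sketch (not part of this commit; names are illustrative): a typical
// write path through a partial file. createPartialFile reserves the full
// padded sector and writes an empty trailer; the caller then writes padded
// data and marks the range allocated so later readers can find it.
//
//	pf, _ := createPartialFile(maxPieceSize, path)
//	w, _ := pf.Writer(0, pieceSize.Padded())
//	_, _ = io.Copy(w, paddedData)               // data must already be fr32-padded
//	_ = pf.MarkAllocated(0, pieceSize.Padded()) // persists the new run in the trailer
//	ok, _ := pf.HasAllocated(0, pieceSize)      // true once the range is recorded
//	_ = pf.Close()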
package seal
import(
"bufio"
"context"
"io"
"os"
// "bytes"
"runtime"
"sync"
"path/filepath"
"bytes"
"golang.org/x/xerrors"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
logging "github.com/ipfs/go-log/v2"
// commcid "github.com/filecoin-project/go-fil-commcid"
commcid "github.com/filecoin-project/go-fil-commcid"
ffi "github.com/filecoin-project/filecoin-ffi"
"fil_integrate/extern/sector-storage/ffiwrapper"
"fil_integrate/extern/sector-storage/ffiwrapper/basicfs"
"fil_integrate/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/ipfs/go-cid"
"fil_integrate/build/fr32"
"fil_integrate/build/storiface"
"fil_integrate/build/storage"
spproof "fil_integrate/build/proof"
)
var log = logging.Logger("sealing")
@@ -28,46 +31,569 @@ var log = logging.Logger("sealing")
//[has_pre][MetaLen1..MetaLen4][PieceLen1..PieceLen4]
const TagLen int = 8
const arp = abi.RegisteredAggregationProof_SnarkPackV1
const NewestNetworkVersion = network.Version13
func DefaultAggregationType() abi.RegisteredAggregationProof {
return arp
type Sealer struct{
sectors SectorProvider
}
func GetCommRFromDir(
sb *ffiwrapper.Sealer,
ctx context.Context,
sectorID abi.SectorID,
) (cid.Cid, error) {
return sb.GetCommRFromDir(ctx, sectorID)
var _ SectorSealer = &Sealer{}
func New(sectors SectorProvider) (*Sealer, error) {
sb := &Sealer{
sectors: sectors,
}
return sb, nil
}
func PutCommRIntoDir(
sb *ffiwrapper.Sealer,
ctx context.Context,
sectorID abi.SectorID,
sealedCID cid.Cid,
) error {
return sb.PutCommRIntoDir(ctx, sectorID, sealedCID)
func (sb *Sealer)GetCommRFromDir(sectorID abi.SectorID) (cid.Cid, error) {
commr := make([]byte, 32)
path := filepath.Join(sb.sectors.GetRoot(), "cache", storiface.SectorName(sectorID), "commr")
out, err := os.OpenFile(path, os.O_RDONLY, 0644)
if err != nil{
return cid.Cid{}, err
}
defer out.Close()
_, err = out.Read(commr[:])
if err != nil{
return cid.Cid{}, err
}
return commcid.ReplicaCommitmentV1ToCID(commr[:])
}
func AddPiece(
sb *ffiwrapper.Sealer,
func (sb *Sealer)PutCommRIntoDir(sectorID abi.SectorID, sealedCID cid.Cid) error {
commr, err:= commcid.CIDToReplicaCommitmentV1(sealedCID)
if err != nil{
return err
}
path := filepath.Join(sb.sectors.GetRoot(), "cache", storiface.SectorName(sectorID), "commr")
out, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
if err != nil{
return err
}
defer out.Close()
_, err = out.Write(commr[:])
if err != nil{
return err
}
return nil
}
func (sb *Sealer)AddPiece(
ctx context.Context,
sector storage.SectorRef,
existingPieceSizes []abi.UnpaddedPieceSize,
pieceSize abi.UnpaddedPieceSize,
file storage.Data,
numbers ...int32,
) (abi.PieceInfo, error) {
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return abi.PieceInfo{}, err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
if offset.Padded()+pieceSize.Padded() > maxPieceSize {
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
}
var done func()
var stagedFile *partialFile
defer func() {
if done != nil {
done()
}
if stagedFile != nil {
if err := stagedFile.Close(); err != nil {
log.Errorf("closing staged file: %+v", err)
}
}
}()
var stagedPath storiface.SectorPaths
if len(existingPieceSizes) == 0 {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, storiface.FTUnsealed, storiface.PathSealing, numbers...)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = createPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
}
} else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathSealing, numbers...)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = openPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
}
}
w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
}
pw := fr32.NewPadWriter(w)
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)
buf := make([]byte, chunk.Unpadded())
for i := 0; i < parallel; i++ {
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
break // won't use this many buffers
}
throttle <- make([]byte, chunk.Unpadded())
}
for {
var read int
for rbuf := buf; len(rbuf) > 0; {
n, err := pr.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
}
rbuf = rbuf[n:]
read += n
if err == io.EOF {
break
}
}
if read == 0 {
break
}
done := make(chan struct {
cid.Cid
error
}, 1)
pbuf := <-throttle
copy(pbuf, buf[:read])
go func(read int) {
defer func() {
throttle <- pbuf
}()
c, err := sb.pieceCid(sector.ProofType, pbuf[:read])
done <- struct {
cid.Cid
error
}{c, err}
}(read)
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
select {
case e := <-done:
if e.error != nil {
return abi.PieceInfo{}, e.error
}
return abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
PieceCID: e.Cid,
}, nil
case <-ctx.Done():
return abi.PieceInfo{}, ctx.Err()
}
})
}
if err := pw.Close(); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err)
}
if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}
if err := stagedFile.Close(); err != nil {
return abi.PieceInfo{}, err
}
stagedFile = nil
if len(piecePromises) == 1 {
return piecePromises[0]()
}
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pieceCids[i], err = promise()
if err != nil {
return abi.PieceInfo{}, err
}
}
pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}
// validate that the pieceCID was properly formed
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
return abi.PieceInfo{}, err
}
return abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
}, nil
}
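// Note on the chunking above: AddPiece tees the padded write into pieceCid
// workers in 4 MiB (padded) chunks, bounded by a buffered channel of
// runtime.NumCPU() scratch buffers. The per-chunk piece CIDs are collected in
// order through piecePromises and, when there is more than one chunk, folded
// into a single unsealed CID with ffi.GenerateUnsealedCID.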
func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) {
prf, werr, err := ToReadableFile(bytes.NewReader(in), int64(len(in)))
if err != nil {
return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err)
}
pieceCID, err := ffi.GeneratePieceCIDFromFile(spt, prf, abi.UnpaddedPieceSize(len(in)))
if err != nil {
return cid.Undef, xerrors.Errorf("generating piece commitment: %w", err)
}
_ = prf.Close()
return pieceCID, werr()
}
func ToReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {
return f, func() error { return nil }, nil
}
var w *os.File
f, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
var wait sync.Mutex
var werr error
wait.Lock()
go func() {
defer wait.Unlock()
var copied int64
copied, werr = io.CopyN(w, r, n)
if werr != nil {
log.Warnf("toReadableFile: copy error: %+v", werr)
}
err := w.Close()
if werr == nil && err != nil {
werr = err
log.Warnf("toReadableFile: close error: %+v", err)
return
}
if copied != n {
log.Warnf("copied different amount than expected: %d != %d", copied, n)
werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n)
}
}()
return f, func() error {
wait.Lock()
return werr
}, nil
}
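// Note: ToReadableFile exists because ffi.GeneratePieceCIDFromFile needs an
// *os.File. When the reader is not already a file, the data is pumped through
// an os.Pipe in a goroutine, and the returned func() error must be called
// after the consumer is done to surface any copy error. Assumed usage,
// mirroring pieceCid above:
//
//	prf, werr, err := ToReadableFile(bytes.NewReader(data), int64(len(data)))
//	if err != nil {
//		return err
//	}
//	// ... hand prf to the FFI ...
//	_ = prf.Close()
//	if err := werr(); err != nil {
//		return err
//	}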
func (sb *Sealer)UnsealedRange(
ctx context.Context,
sid storage.SectorRef,
sectorSize abi.SectorSize,
ticket abi.SealRandomness,
commd cid.Cid,
out io.Writer,
offset storiface.UnpaddedByteIndex,
size abi.UnpaddedPieceSize,
) error {
log.Infof("[%d] Unsealing sector", sid.ID.Number)
{
p, done, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquire unsealed sector for removing: %w", err)
}
done()
if err := os.Remove(p.Unsealed); err != nil {
return xerrors.Errorf("removing unsealed sector: %w", err)
}
}
err := sb.UnsealPiece(ctx, sid, 0, abi.PaddedPieceSize(sectorSize).Unpadded(), ticket, commd)
if err != nil {
return err
}
ok, err := sb.ReadPiece(ctx, out, sid, offset, size)
if err != nil{
return err
}
if !ok {
return xerrors.Errorf("Read pieces error!")
}
return nil
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
// try finding existing
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
var pf *partialFile
switch {
case xerrors.Is(err, storiface.ErrSectorNotFound):
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
}
defer done()
pf, err = createPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("create unsealed file: %w", err)
}
case err == nil:
defer done()
// return nil
pf, err = openPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("opening partial file: %w", err)
}
default:
return xerrors.Errorf("acquire unsealed sector path (existing): %w", err)
}
defer pf.Close() // nolint
allocated, err := pf.Allocated()
if err != nil {
return xerrors.Errorf("getting bitruns of allocated data: %w", err)
}
toUnseal, err := computeUnsealRanges(allocated, offset, size)
if err != nil {
return xerrors.Errorf("computing unseal ranges: %w", err)
}
if !toUnseal.HasNext() {
return nil
}
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err)
}
defer srcDone()
sealed, err := os.OpenFile(srcPaths.Sealed, os.O_RDONLY, 0644) // nolint:gosec
if err != nil {
return xerrors.Errorf("opening sealed file: %w", err)
}
defer sealed.Close() // nolint
var at, nextat abi.PaddedPieceSize
first := true
for first || toUnseal.HasNext() {
first = false
piece, err := toUnseal.NextRun()
if err != nil {
return xerrors.Errorf("getting next range to unseal: %w", err)
}
at = nextat
nextat += abi.PaddedPieceSize(piece.Len)
if !piece.Val {
continue
}
out, err := pf.Writer(offset.Padded(), size.Padded())
if err != nil {
return xerrors.Errorf("getting partial file writer: %w", err)
}
// <eww>
opr, opw, err := os.Pipe()
if err != nil {
return xerrors.Errorf("creating out pipe: %w", err)
}
var perr error
outWait := make(chan struct{})
{
go func() {
defer close(outWait)
defer opr.Close() // nolint
padwriter := fr32.NewPadWriter(out)
bsize := uint64(size.Padded())
if bsize > uint64(runtime.NumCPU())*fr32.MTTresh {
bsize = uint64(runtime.NumCPU()) * fr32.MTTresh
}
bw := bufio.NewWriterSize(padwriter, int(abi.PaddedPieceSize(bsize).Unpadded()))
_, err := io.CopyN(bw, opr, int64(size))
if err != nil {
perr = xerrors.Errorf("copying data: %w", err)
return
}
if err := bw.Flush(); err != nil {
perr = xerrors.Errorf("flushing unpadded data: %w", err)
return
}
if err := padwriter.Close(); err != nil {
perr = xerrors.Errorf("closing padwriter: %w", err)
return
}
}()
}
// </eww>
// TODO: This may be possible to do in parallel
err = ffi.UnsealRange(sector.ProofType,
srcPaths.Cache,
sealed,
opw,
sector.ID.Number,
sector.ID.Miner,
randomness,
commd,
uint64(at.Unpadded()),
uint64(abi.PaddedPieceSize(piece.Len).Unpadded()))
_ = opw.Close()
if err != nil {
return xerrors.Errorf("unseal range: %w", err)
}
select {
case <-outWait:
case <-ctx.Done():
return ctx.Err()
}
if perr != nil {
return xerrors.Errorf("piping output to unsealed file: %w", perr)
}
if err := pf.MarkAllocated(storiface.PaddedByteIndex(at), abi.PaddedPieceSize(piece.Len)); err != nil {
return xerrors.Errorf("marking unsealed range as allocated: %w", err)
}
if !toUnseal.HasNext() {
break
}
}
return nil
}
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
path, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
if err != nil {
return false, xerrors.Errorf("acquire unsealed sector path: %w", err)
}
defer done()
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return false, err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
if err != nil {
if xerrors.Is(err, os.ErrNotExist) {
return false, nil
}
return false, xerrors.Errorf("opening partial file: %w", err)
}
return sb.AddPiece(ctx, sector, existingPieceSizes, pieceSize, file)
ok, err := pf.HasAllocated(offset, size)
if err != nil {
_ = pf.Close()
return false, err
}
if !ok {
_ = pf.Close()
return false, nil
}
f, err := pf.Reader(offset.Padded(), size.Padded())
if err != nil {
_ = pf.Close()
return false, xerrors.Errorf("getting partial file reader: %w", err)
}
upr, err := fr32.NewUnpadReader(f, size.Padded())
if err != nil {
return false, xerrors.Errorf("creating unpadded reader: %w", err)
}
if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
_ = pf.Close()
return false, xerrors.Errorf("reading unsealed file: %w", err)
}
if err := pf.Close(); err != nil {
return false, xerrors.Errorf("closing partial file: %w", err)
}
return true, nil
}
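// Note: ReadPiece reports (false, nil) when the unsealed file does not exist
// or the requested range was never allocated; only genuine I/O failures come
// back as errors. UnsealedRange above turns the false case into an error
// because at that point the range has just been unsealed and must be present.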
// Not tested yet
// func EncodeDataToPieces(sb *ffiwrapper.Sealer, ctx context.Context, sector storage.SectorRef, sectorSize abi.SectorSize, file storage.Data) (abi.PieceInfo, error) {
// func EncodeDataToPieces(sb *ffiwrapper.Sealer, ctx context.Context, sector SectorRef, sectorSize abi.SectorSize, file Data) (abi.PieceInfo, error) {
// var piecesID []byte
// var FinalPiece abi.PieceInfo
// var pieceNumber int32 = 1
@@ -108,7 +634,7 @@ func AddPiece(
// if err != nil{
// return nil, err
// }
// piecesID = append(pieces, commcid.CIDToReplicaCommitmentV1(piece.PieceCID)...)
// piecesID = append(pieces, commcid.CIDToPieceCommitmentV1(piece.PieceCID)...)
// pieceNumber += 1
// }
// return pieceNumber, FinalPiece, nil
@@ -142,7 +668,7 @@ func AddPiece(
// copy(buf[:4], []byte(MetaLen | 0x80000000))
// copy(buf[4:8], []byte(CommLen))
// copy(buf[8:40], commcid.CIDToReplicaCommitmentV1(prePiece.PieceCID)...)
// copy(buf[8:40], commcid.CIDToPieceCommitmentV1(prePiece.PieceCID)...)
// copy(rbuf, piecesID[:CommLen])
// copy(rbuf[CommLen:], 0)
@@ -207,100 +733,156 @@ func AddPiece(
// return piecesID, nil
// }
func AggregateWindowPoStProofs(
sb *ffiwrapper.Sealer,
ctx context.Context,
aggregate abi.RegisteredAggregationProof,
randomnesses []abi.PoStRandomness,
windowPoStProofs []proof5.PoStProof,
sectorCountArr []uint,
) ([]byte, error) {
if len(windowPoStProofs) != len(sectorCountArr) {
return nil, xerrors.Errorf("the lenth of windowPoStProofs and sectorCount is not match")
func (sb *Sealer)CheckPieceAndDataRoot(
sid storage.SectorRef,
commd cid.Cid,
pieces []abi.PieceInfo,
) (bool, error) {
UnsealedCID, err := ffi.GenerateUnsealedCID(sid.ProofType, pieces)
if err != nil{
return false, err
}
sectorCount := sectorCountArr[0]
for _, count := range(sectorCountArr) {
if sectorCount != count {
return nil, xerrors.Errorf("Window PoSt challenge count must be equal")
return commd == UnsealedCID, nil
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
// ffi.say_hello()
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil {
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
if err != nil {
return nil, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
return sb.AggregateWindowPoStProofs(ctx, ffi.AggregateWindowPostInfos{
Randomnesses: randomnesses,
AggregateType: aggregate,
Proofs: windowPoStProofs,
SectorCount: sectorCount,
})
if err := e.Close(); err != nil {
return nil, err
}
if err := os.Mkdir(paths.Cache, 0755); err != nil { // nolint
if os.IsExist(err) {
log.Warnf("existing cache in %s; removing", paths.Cache)
if err := os.RemoveAll(paths.Cache); err != nil {
return nil, xerrors.Errorf("remove existing sector cache from %s (sector %d): %w", paths.Cache, sector, err)
}
if err := os.Mkdir(paths.Cache, 0755); err != nil { // nolint:gosec
return nil, xerrors.Errorf("mkdir cache path after cleanup: %w", err)
}
} else {
return nil, err
}
}
var sum abi.UnpaddedPieceSize
for _, piece := range pieces {
sum += piece.Size.Unpadded()
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return nil, err
}
ussize := abi.PaddedPieceSize(ssize).Unpadded()
if sum != ussize {
return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
// TODO: context cancellation respect
p1o, err := ffi.SealPreCommitPhase1(
sector.ProofType,
paths.Cache,
paths.Unsealed,
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
ticket,
pieces,
)
if err != nil {
return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
return p1o, nil
}
func VerifyAggregateWindowPostProofs(
ctx context.Context,
proofType abi.RegisteredPoStProof,
aggregateType abi.RegisteredAggregationProof,
miner abi.ActorID,
aggregateProof []byte,
randomnesses []abi.PoStRandomness,
sealedSectors [][]proof5.SectorInfo,
) (bool, error) {
var sectorInfos []proof5.SectorInfo
arr := make([]uint, len(sealedSectors))
for i, sectors := range(sealedSectors) {
arr[i] = uint(len(sectors))
for _, sector := range(sectors) {
sectorInfos = append(sectorInfos, sector)
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
return ffiwrapper.ProofVerifier.VerifyAggregateWindowPostProofs(ctx, ffi.AggregateWindowPostInfos{
PoStType: proofType,
AggregateType: aggregateType,
Miner: miner,
AggregationProof: aggregateProof,
Randomnesses: randomnesses,
ChallengedSectors: sectorInfos,
Arr: arr,
})
return storage.SectorCids{
Unsealed: unsealedCID,
Sealed: sealedCID,
}, nil
}
func CheckPieceAndDataRoot(
sid storage.SectorRef,
comm_d cid.Cid,
pieces []abi.PieceInfo,
) (bool, error) {
UnsealedCID, err := ffiwrapper.GenerateUnsealedCID(sid.ProofType, pieces)
if err != nil{
return false, err
func (sb *Sealer) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
if err != nil {
return nil, xerrors.Errorf("acquire sector paths: %w", err)
}
defer done()
output, err := ffi.SealCommitPhase1(
sector.ProofType,
cids.Sealed,
cids.Unsealed,
paths.Cache,
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
ticket,
seed,
pieces,
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.ID.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed)
return comm_d == UnsealedCID, nil
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return output, nil
}
func Sealed(
sb *ffiwrapper.Sealer,
func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storage.Proof, error) {
return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner)
}
func (sb *Sealer)Sealed(
ctx context.Context,
sid storage.SectorRef,
seed abi.InteractiveSealRandomness,
ticket abi.SealRandomness,
pieces []abi.PieceInfo,
) (storage.SectorCids, storage.Proof, storage.Commit1Out, error) {
// var sealedSectors saproof2.SectorInfo
) (storage.SectorCids, storage.Proof, error) {
// var sealedSectors spproof.SectorInfo
log.Infof("[%d] Running replication(1)...", sid.ID.Number)
pc1out, err := sb.SealPreCommit1(ctx, sid, ticket, pieces)
if err != nil {
return storage.SectorCids{}, nil, nil, xerrors.Errorf("commit: %w", err)
return storage.SectorCids{}, nil, xerrors.Errorf("commit: %w", err)
}
log.Infof("[%d] Running replication(2)...", sid.ID.Number)
cids, err := sb.SealPreCommit2(ctx, sid, pc1out)
if err != nil {
return storage.SectorCids{}, nil, nil, xerrors.Errorf("commit: %w", err)
return storage.SectorCids{}, nil, xerrors.Errorf("commit: %w", err)
}
log.Infof("[%d] Generating PoRep for sector (1)", sid.ID.Number)
c1o, err := sb.SealCommit1(ctx, sid, ticket, seed, pieces, cids)
if err != nil {
return storage.SectorCids{}, nil, nil, err
return storage.SectorCids{}, nil, err
}
log.Infof("[%d] Generating PoRep for sector (2)", sid.ID.Number)
@@ -308,70 +890,169 @@ func Sealed(
var proof storage.Proof
proof, err = sb.SealCommit2(ctx, sid, c1o)
if err != nil {
return storage.SectorCids{}, nil, nil, err
return storage.SectorCids{}, nil, err
}
return cids, proof, c1o, nil
return cids, proof, nil
}
func GenProofForC2(
sb *ffiwrapper.Sealer,
func (sb *Sealer)AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error){
return ffi.AggregateSealProofs(aggregateInfo, proofs)
}
func (sb *Sealer)GenProofForC2(
ctx context.Context,
sid storage.SectorRef,
c1Out storage.Commit1Out,
seed abi.InteractiveSealRandomness,
ticket abi.SealRandomness,
pieces []abi.PieceInfo,
cids storage.SectorCids,
) (storage.Proof, error) {
return sb.SealCommit2(ctx, sid, c1Out)
c1out, err := sb.SealCommit1(ctx, sid, ticket, seed, pieces, cids)
if err != nil {
return nil, err
}
return sb.SealCommit2(ctx, sid, c1out)
}
func GenProofForWindowPoSt(
sb *ffiwrapper.Sealer,
func (sb *Sealer)GenProofForWindowPoSt(
ctx context.Context,
minerID abi.ActorID,
sectorInfo []proof5.SectorInfo,
sectorInfo []spproof.SectorInfo,
randomness abi.PoStRandomness,
) ([]proof5.PoStProof, []abi.SectorID, error) {
return sb.GenerateWindowPoSt(ctx, minerID, sectorInfo, randomness)
) ([]spproof.PoStProof, []abi.SectorID, error) {
randomness[31] &= 0x3f
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
if err != nil {
return nil, nil, xerrors.Errorf("gathering sector info: %w", err)
}
defer done()
if len(skipped) > 0 {
return nil, skipped, xerrors.Errorf("pubSectorToPriv skipped some sectors")
}
proof, faulty, err := ffi.GenerateWindowPoSt(minerID, privsectors, randomness)
var faultyIDs []abi.SectorID
for _, f := range faulty {
faultyIDs = append(faultyIDs, abi.SectorID{
Miner: minerID,
Number: f,
})
}
return proof, faultyIDs, err
}
func UnsealedRange(
sb *ffiwrapper.Sealer,
ctx context.Context,
sbfs *basicfs.Provider,
sid storage.SectorRef,
sectorSize abi.SectorSize,
ticket abi.SealRandomness,
commd cid.Cid,
out io.Writer,
offset storiface.UnpaddedByteIndex,
size abi.UnpaddedPieceSize,
) error {
log.Infof("[%d] Unsealing sector", sid.ID.Number)
{
p, done, err := sbfs.AcquireSector(ctx, sid, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquire unsealed sector for removing: %w", err)
func (sb *Sealer)AggregateWindowPoStProofs(aggregateInfo spproof.AggregateWindowPostInfos, proofs []spproof.PoStProof) ([]byte, error) {
if len(proofs) != len(aggregateInfo.SectorCount) {
return nil, xerrors.Errorf("the lenth of windowPoStProofs and sectorCount is not match")
}
done()
if err := os.Remove(p.Unsealed); err != nil {
return xerrors.Errorf("removing unsealed sector: %w", err)
sectorCount := aggregateInfo.SectorCount[0]
for _, count := range(aggregateInfo.SectorCount) {
if sectorCount != count {
return nil, xerrors.Errorf("Window PoSt challenge count must be equal")
}
}
err := sb.UnsealPiece(ctx, sid, 0, abi.PaddedPieceSize(sectorSize).Unpadded(), ticket, commd)
for i, random := range(aggregateInfo.Randomnesses) {
aggregateInfo.Randomnesses[i][31] = random[31] & 0x3f
}
return ffi.AggregateWindowPoStProofs(aggregateInfo, proofs)
}
func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []spproof.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var doneFuncs []func()
done := func() {
for _, df := range doneFuncs {
df()
}
}
var skipped []abi.SectorID
var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo {
if _, faulty := fmap[s.SectorNumber]; faulty {
continue
}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: mid, Number: s.SectorNumber},
ProofType: s.SealProof,
}
paths, d, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTCache|storiface.FTSealed, 0, storiface.PathStorage)
if err != nil {
return err
log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err)
skipped = append(skipped, sid.ID)
continue
}
doneFuncs = append(doneFuncs, d)
ok, err := sb.ReadPiece(ctx, out, sid, offset, size)
if err != nil{
return err
postProofType, err := rpt(s.SealProof)
if err != nil {
done()
return ffi.SortedPrivateSectorInfo{}, nil, nil, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err)
}
if !ok {
return xerrors.Errorf("Read pieces error!")
out = append(out, ffi.PrivateSectorInfo{
CacheDirPath: paths.Cache,
PoStProofType: postProofType,
SealedSectorPath: paths.Sealed,
SectorInfo: s,
})
}
return nil
return ffi.NewSortedPrivateSectorInfo(out...), skipped, done, nil
}
type Verifier struct {}
var ProofVerifier = Verifier{}
var _ SectorVerifier = Verifier{}
func (Verifier) VerifySeal(info spproof.SealVerifyInfo) (bool, error) {
return ffi.VerifySeal(info)
}
func (Verifier) VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (bool, error) {
return ffi.VerifyAggregateSeals(aggregate)
}
func (Verifier) VerifyWindowPoSt(info spproof.WindowPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
return ffi.VerifyWindowPoSt(info)
}
func (Verifier)VerifyAggregateWindowPostProofs(aggregateInfo spproof.AggregateWindowPostInfos, sealedSectors [][]spproof.SectorInfo) (bool, error) {
var sectorInfos []spproof.SectorInfo
arr := make([]uint, len(sealedSectors))
for i, sectors := range(sealedSectors) {
arr[i] = uint(len(sectors))
for _, sector := range(sectors) {
sectorInfos = append(sectorInfos, sector)
}
}
aggregateInfo.ChallengedSectors = sectorInfos
aggregateInfo.Arr = arr
for i, random := range(aggregateInfo.Randomnesses) {
aggregateInfo.Randomnesses[i][31] = random[31] & 0x3f
}
return ffi.VerifyAggregateWindowPostProofs(aggregateInfo)
}
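// Note: the PoSt routines above clear the top two bits of the last randomness
// byte (randomness[31] &= 0x3f) before calling into the FFI, keeping the
// value inside the field used by the proof system. The same masking has to
// happen on both the proving and the verifying side, which is why it appears
// in GenProofForWindowPoSt, AggregateWindowPoStProofs, VerifyWindowPoSt and
// VerifyAggregateWindowPostProofs alike.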
func DefaultAggregationType() abi.RegisteredAggregationProof {
return abi.RegisteredAggregationProof_SnarkPackV1
}
\ No newline at end of file
@@ -12,13 +12,12 @@ import(
"github.com/mitchellh/go-homedir"
"github.com/minio/blake2b-simd"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
saproof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
spproof "fil_integrate/build/proof"
"fil_integrate/build"
"fil_integrate/extern/sector-storage/ffiwrapper"
"fil_integrate/extern/sector-storage/ffiwrapper/basicfs"
"fil_integrate/build/storage"
// "fil_integrate/extern/sector-storage/ffiwrapper"
"fil_integrate/seal/basicfs"
)
func TestAggregateWindowPoSt(
@@ -26,7 +25,7 @@ func TestAggregateWindowPoSt(
numSectors int,
numAggregate int,
) error {
sdir, err := homedir.Expand("~/tmp")
sdir, err := homedir.Expand("~/tmp/bench")
if err != nil {
return err
}
@@ -40,35 +39,35 @@ func TestAggregateWindowPoSt(
if err != nil {
return err
}
// defer func() {
// if err := os.RemoveAll(tsdir); err != nil {
// log.Warn("remove all: ", err)
// }
// }()
defer func() {
if err := os.RemoveAll(tsdir); err != nil {
log.Warn("remove all: ", err)
}
}()
// TODO: pretty sure this isnt even needed?
if err := os.MkdirAll(tsdir, 0775); err != nil {
if err := os.MkdirAll(tsdir, 0775); err != nil && !os.IsExist(err){
return err
}
sbfs := &basicfs.Provider{
Root: tsdir,
}
sb ,err := ffiwrapper.New(sbfs)
sb ,err := New(sbfs)
if err != nil{
return err
}
ctx := context.TODO()
file := rand.New(rand.NewSource(1587))
trand := blake2b.Sum256([]byte("ticket-preimage"))
ticket := abi.SealRandomness(trand[:])
ctx := context.TODO()
var challenge [32]byte
rand.Read(challenge[:])
var randomnesses []abi.PoStRandomness
var sealedSectorsinfo [][]saproof2.SectorInfo
var sealedSectorsinfo [][]spproof.SectorInfo
var sectorCount []uint
var proofs []proof5.PoStProof
var proofs []spproof.PoStProof
sealProofType := spt(sectorSize)
start := time.Now()
@@ -84,44 +83,41 @@ func TestAggregateWindowPoSt(
ProofType: sealProofType,
}
piece, err := AddPiece(sb, ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), file)
piece, err := sb.AddPiece(ctx, sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), file)
if err != nil {
return err
}
pieces = append(pieces, piece)
// log.Infof("[%d] Running replication(1)...", sid.ID.Number)
pc1out, err := sb.SealPreCommit1(ctx, sid, ticket, pieces)
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
// log.Infof("[%d] Running replication(2)...", sid.ID.Number)
cids, err := sb.SealPreCommit2(ctx, sid, pc1out)
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
comm_r := cids.Sealed
PutCommRIntoDir(sb, ctx, sid.ID, comm_r)
sb.PutCommRIntoDir(sid.ID, comm_r)
}
}
log.Infof("Sealed [%d] Sectors Done", numSectors*numAggregate)
sealed := time.Now()
for i := 0; i < numAggregate; i++{
var sealedSectors []saproof2.SectorInfo
var sealedSectors []spproof.SectorInfo
for j := 0; j < numSectors; j++{
sectorID := abi.SectorID{
Miner: 1000,
Number: abi.SectorNumber(i*numSectors+j),
}
comm_r, err := GetCommRFromDir(sb, ctx, sectorID)
comm_r, err := sb.GetCommRFromDir(sectorID)
if err != nil {
return err
}
sealedSectors = append(sealedSectors, saproof2.SectorInfo{
sealedSectors = append(sealedSectors, spproof.SectorInfo{
SealedCID: comm_r,
SectorNumber: sectorID.Number,
SealProof: sealProofType,
@@ -134,7 +130,7 @@ func TestAggregateWindowPoSt(
for i := 0; i < numAggregate; i++{
log.Infof("[%d] Generating Window-Post", i)
proof, _, err := GenProofForWindowPoSt(sb, ctx, 1000, sealedSectorsinfo[i], challenge[:])
proof, _, err := sb.GenProofForWindowPoSt(ctx, 1000, sealedSectorsinfo[i], challenge[:])
if err != nil {
return err
}
@@ -146,22 +142,55 @@ func TestAggregateWindowPoSt(
log.Infof("Generate [%d] Window-PoSt Done", numAggregate)
genWindowPoSt := time.Now()
aggregateProof, err := AggregateWindowPoStProofs(sb, ctx, DefaultAggregationType(), randomnesses, proofs, sectorCount)
aggregateProof1, err := sb.AggregateWindowPoStProofs(spproof.AggregateWindowPostInfos{
AggregateType: DefaultAggregationType(),
Randomnesses: randomnesses,
SectorCount: sectorCount,
}, proofs)
if err != nil {
return err
}
aggregateProofs := time.Now()
PoStType, _ := sealProofType.RegisteredWindowPoStProof()
ok, err := VerifyAggregateWindowPostProofs(
ctx,
PoStType,
DefaultAggregationType(),
abi.ActorID(1000),
aggregateProof,
randomnesses,
sealedSectorsinfo,
)
aggregateProofsCold := time.Now()
aggregateProof2, err := sb.AggregateWindowPoStProofs(spproof.AggregateWindowPostInfos{
AggregateType: DefaultAggregationType(),
Randomnesses: randomnesses,
SectorCount: sectorCount,
}, proofs)
if err != nil {
return err
}
aggregateProofsHot := time.Now()
poStType, _ := sealProofType.RegisteredWindowPoStProof()
svi1 := spproof.AggregateWindowPostInfos{
PoStType: poStType,
AggregateType: DefaultAggregationType(),
Miner: abi.ActorID(1000),
AggregationProof: aggregateProof1,
Randomnesses: randomnesses,
}
svi2 := spproof.AggregateWindowPostInfos{
PoStType: poStType,
AggregateType: DefaultAggregationType(),
Miner: abi.ActorID(1000),
AggregationProof: aggregateProof2,
Randomnesses: randomnesses,
}
ok, err := ProofVerifier.VerifyAggregateWindowPostProofs(svi1, sealedSectorsinfo)
if err != nil {
return err
}
if ok {
fmt.Println("Aggregate proof is true")
} else{
fmt.Println("Aggregate proof is false")
}
verifyProofsCold := time.Now()
ok, err = ProofVerifier.VerifyAggregateWindowPostProofs(svi2, sealedSectorsinfo)
if err != nil {
return err
}
@@ -170,13 +199,15 @@ func TestAggregateWindowPoSt(
} else{
fmt.Println("Aggregate proof is false")
}
verifyProofs := time.Now()
verifyProofsHot := time.Now()
fmt.Printf("Seal %d sectors using %s\n", numSectors*numAggregate, sealed.Sub(start))
fmt.Printf("Read %d comm_r using %s\n", numAggregate*numSectors, loadCommr.Sub(sealed))
fmt.Printf("Generate %d window-post using %s\n", numAggregate, genWindowPoSt.Sub(loadCommr))
fmt.Printf("Aggregate %d window-post Proofs using %s\n", numAggregate, aggregateProofs.Sub(genWindowPoSt))
fmt.Printf("Verify Proofs using %s\n", verifyProofs.Sub(aggregateProofs))
fmt.Printf("Aggregate %d window-post Proofs(cold) using %s\n", numAggregate, aggregateProofsCold.Sub(genWindowPoSt))
fmt.Printf("Aggregate %d window-post Proofs(hot) using %s\n", numAggregate, aggregateProofsHot.Sub(aggregateProofsCold))
fmt.Printf("Verify Aggregation Window-PoSt Proofs(cold) using %s\n", verifyProofsCold.Sub(aggregateProofsHot))
fmt.Printf("Verify Aggregation Window-PoSt Proofs(cold) using %s\n", verifyProofsHot.Sub(verifyProofsCold))
return nil
}
@@ -197,11 +228,11 @@ func TestSealAndUnseal() error {
if err != nil {
return err
}
// defer func() {
// if err := os.RemoveAll(tsdir); err != nil {
// log.Warn("remove all: ", err)
// }
// }()
defer func() {
if err := os.RemoveAll(tsdir); err != nil {
log.Warn("remove all: ", err)
}
}()
// TODO: pretty sure this isnt even needed?
if err := os.MkdirAll(tsdir, 0775); err != nil {
@@ -210,7 +241,7 @@ func TestSealAndUnseal() error {
sbfs := &basicfs.Provider{
Root: tsdir,
}
sb ,err := ffiwrapper.New(sbfs)
sb ,err := New(sbfs)
if err != nil{
return err
}
@@ -232,9 +263,9 @@ func TestSealAndUnseal() error {
//ADD PIECES
var existingPieceSizes []abi.UnpaddedPieceSize
var pieces []abi.PieceInfo
var sealedSectors []saproof2.SectorInfo
var sealedSectors []spproof.SectorInfo
piece, err := AddPiece(sb, ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/2).Unpadded(), file)
piece, err := sb.AddPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/2).Unpadded(), file)
if err != nil {
return err
}
@@ -242,7 +273,7 @@ func TestSealAndUnseal() error {
existingPieceSizes = append(existingPieceSizes, piece.Size.Unpadded())
pieces = append(pieces, piece)
piece, err = AddPiece(sb, ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/2).Unpadded(), file)
piece, err = sb.AddPiece(ctx, sid, existingPieceSizes, abi.PaddedPieceSize(sectorSize/2).Unpadded(), file)
if err != nil {
return err
}
@@ -251,23 +282,23 @@ func TestSealAndUnseal() error {
pieces = append(pieces, piece)
//SEAL
cids, proof1, c1o, err := Sealed(sb, ctx, sid, seed, ticket, pieces)
cids, proof1, err := sb.Sealed(ctx, sid, seed, ticket, pieces)
if err != nil {
return err
}
sealedSectors = append(sealedSectors, saproof2.SectorInfo{
sealedSectors = append(sealedSectors, spproof.SectorInfo{
SealedCID: cids.Sealed,
SectorNumber: sid.ID.Number,
SealProof: sid.ProofType,
})
proof2, err := GenProofForC2(sb , ctx, sid, c1o)
proof2, err := sb.GenProofForC2(ctx, sid, seed, ticket, pieces, cids)
if err != nil {
return err
}
ok, err := CheckPieceAndDataRoot(sid, cids.Unsealed, pieces)
ok, err := sb.CheckPieceAndDataRoot(sid, cids.Unsealed, pieces)
if err != nil {
return err
}
@@ -277,7 +308,7 @@ func TestSealAndUnseal() error {
}
//verify proof
svi1 := saproof2.SealVerifyInfo{
svi1 := spproof.SealVerifyInfo{
SectorID: sid.ID,
SealedCID: cids.Sealed,
SealProof: sid.ProofType,
@@ -288,7 +319,7 @@ func TestSealAndUnseal() error {
UnsealedCID: cids.Unsealed,
}
svi2 := saproof2.SealVerifyInfo{
svi2 := spproof.SealVerifyInfo{
SectorID: sid.ID,
SealedCID: cids.Sealed,
SealProof: sid.ProofType,
@@ -299,7 +330,7 @@ func TestSealAndUnseal() error {
UnsealedCID: cids.Unsealed,
}
ok, err = ffiwrapper.ProofVerifier.VerifySeal(svi1)
ok, err = ProofVerifier.VerifySeal(svi1)
if err != nil {
return err
}
@@ -307,7 +338,7 @@ func TestSealAndUnseal() error {
return xerrors.Errorf("porep proof for sector %d was invalid", sid.ID.Number)
}
ok, err = ffiwrapper.ProofVerifier.VerifySeal(svi2)
ok, err = ProofVerifier.VerifySeal(svi2)
if err != nil {
return err
}
@@ -315,16 +346,16 @@ func TestSealAndUnseal() error {
return xerrors.Errorf("porep proof for sector %d was invalid", sid.ID.Number)
}
proof, _, err := GenProofForWindowPoSt(sb, ctx, sid.ID.Miner, sealedSectors, challenge[:])
proof, _, err := sb.GenProofForWindowPoSt(ctx, sid.ID.Miner, sealedSectors, challenge[:])
wpvi := saproof2.WindowPoStVerifyInfo{
wpvi := spproof.WindowPoStVerifyInfo{
Randomness: challenge[:],
Proofs: proof,
ChallengedSectors: sealedSectors,
Prover: sid.ID.Miner,
}
ok, err = ffiwrapper.ProofVerifier.VerifyWindowPoSt(ctx, wpvi)
ok, err = ProofVerifier.VerifyWindowPoSt(wpvi)
if err != nil {
return err
}
......
package seal
import(
"context"
"io"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
spproof "fil_integrate/build/proof"
"fil_integrate/build/storage"
"fil_integrate/build/storiface"
"fil_integrate/seal/basicfs"
)
//interface
type SectorSealer interface{
AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data, numbers ...int32) (abi.PieceInfo, error)
CheckPieceAndDataRoot(sid storage.SectorRef, commd cid.Cid, pieces []abi.PieceInfo) (bool, error)
Sealed(ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.SectorCids, storage.Proof, error)
GenProofForC2( ctx context.Context, sid storage.SectorRef, seed abi.InteractiveSealRandomness, ticket abi.SealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Proof, error)
GenProofForWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []spproof.SectorInfo, randomness abi.PoStRandomness) ([]spproof.PoStProof, []abi.SectorID, error)
UnsealedRange(ctx context.Context, sid storage.SectorRef, sectorSize abi.SectorSize, ticket abi.SealRandomness, commd cid.Cid, out io.Writer, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error)
AggregateWindowPoStProofs(aggregateInfo spproof.AggregateWindowPostInfos, proofs []spproof.PoStProof) ([]byte, error)
}
type SectorVerifier interface{
VerifySeal(info spproof.SealVerifyInfo) (bool, error)
VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (bool, error)
VerifyWindowPoSt(info spproof.WindowPoStVerifyInfo) (bool, error)
VerifyAggregateWindowPostProofs(aggregateInfo spproof.AggregateWindowPostInfos, sealedSectors [][]spproof.SectorInfo) (bool, error)
}
type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
GetRoot() string
AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
\ No newline at end of file
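// End-to-end sketch (assumed wiring, based on the test code in this commit):
// the intended call order through SectorSealer and SectorVerifier.
//
//	sb, _ := New(&basicfs.Provider{Root: tsdir})
//	piece, _ := sb.AddPiece(ctx, sid, nil, abi.PaddedPieceSize(ssize).Unpadded(), data)
//	cids, proof, _ := sb.Sealed(ctx, sid, seed, ticket, []abi.PieceInfo{piece})
//	ok, _ := ProofVerifier.VerifySeal(spproof.SealVerifyInfo{ /* IDs, CIDs, randomness, proof */ })
//	posts, _, _ := sb.GenProofForWindowPoSt(ctx, minerID, sectors, challenge)
//	ok, _ = ProofVerifier.VerifyWindowPoSt(spproof.WindowPoStVerifyInfo{ /* ... */ })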
package seal
import (
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"fil_integrate/build/storiface"
)
// merge gaps between ranges which are close to each other
// TODO: more benchmarking to come up with more optimal number
const mergeGaps = 32 << 20
// TODO const expandRuns = 16 << 20 // unseal more than requested for future requests
func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) {
todo := pieceRun(offset.Padded(), size.Padded())
todo, err := rlepluslazy.Subtract(todo, unsealed)
if err != nil {
return nil, xerrors.Errorf("compute todo-unsealed: %w", err)
}
return rlepluslazy.JoinClose(todo, mergeGaps)
}
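// Worked example (illustrative numbers): if bytes [0, 4 MiB) and
// [40 MiB, 44 MiB) are already unsealed and the caller asks for [0, 64 MiB),
// the subtraction leaves [4 MiB, 40 MiB) and [44 MiB, 64 MiB) to unseal.
// Because the 4 MiB hole between them is smaller than mergeGaps (32 MiB),
// JoinClose merges the two runs and the FFI unseals one contiguous
// [4 MiB, 64 MiB) range instead of two fragments.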