Commit a5114286 authored by 董子豪's avatar 董子豪

remove sector-storage

parent 677f5539
......@@ -5,8 +5,9 @@ package ffi
import (
"github.com/filecoin-project/filecoin-ffi/generated"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/pkg/errors"
"fil_integrate/build/proof"
)
type FallbackChallenges struct {
......
......@@ -6,8 +6,6 @@ require (
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48
github.com/filecoin-project/specs-actors v0.9.13
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57
github.com/ipfs/go-cid v0.0.7
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
......
......@@ -21,13 +21,6 @@ github.com/filecoin-project/go-state-types v0.0.0-20201102161440-c8033295a1fc/go
github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48 h1:Jc4OprDp3bRDxbsrXNHPwJabZJM3iDy+ri8/1e0ZnX4=
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/specs-actors v0.9.13 h1:rUEOQouefi9fuVY/2HOroROJlZbOzWYXXeIh41KF2M4=
github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc=
github.com/filecoin-project/specs-actors/v3 v3.1.0/go.mod h1:mpynccOLlIRy0QnR008BwYBwT9fen+sPR13MA1VmMww=
github.com/filecoin-project/specs-actors/v4 v4.0.0/go.mod h1:TkHXf/l7Wyw4ZejyXIPS2rK8bBO0rdwhTZyQQgaglng=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57 h1:N6IBsnGXfAMXd677G6EiOKewFwQ7Ulcuupi4U6wYmXE=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57/go.mod h1:283yBMMUSDB2abcjP/hhrwTkhb9h3sfM6KGrep/ZlBI=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
......
......@@ -15,17 +15,17 @@ import (
"github.com/filecoin-project/go-address"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
"github.com/pkg/errors"
"golang.org/x/xerrors"
"github.com/filecoin-project/filecoin-ffi/generated"
spproof "fil_integrate/build/proof"
)
// VerifySeal returns true if the sealing operation from which its inputs were
// derived was valid, and false if not.
func VerifySeal(info proof5.SealVerifyInfo) (bool, error) {
func VerifySeal(info spproof.SealVerifyInfo) (bool, error) {
sp, err := toFilRegisteredSealProof(info.SealProof)
if err != nil {
return false, err
......@@ -41,7 +41,7 @@ func VerifySeal(info proof5.SealVerifyInfo) (bool, error) {
return false, err
}
proverID, err := toProverID(info.Miner)
proverID, err := toProverID(info.SectorID.Miner)
if err != nil {
return false, err
}
......@@ -58,7 +58,7 @@ func VerifySeal(info proof5.SealVerifyInfo) (bool, error) {
return resp.IsValid, nil
}
func VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error) {
func VerifyAggregateSeals(aggregate spproof.AggregateSealVerifyProofAndInfos) (bool, error) {
if len(aggregate.Infos) == 0 {
return false, xerrors.New("no seal verify infos")
}
......@@ -115,7 +115,7 @@ func VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bo
// VerifyWinningPoSt returns true if the Winning PoSt-generation operation from which its
// inputs were derived was valid, and false if not.
func VerifyWinningPoSt(info proof5.WinningPoStVerifyInfo) (bool, error) {
func VerifyWinningPoSt(info spproof.WinningPoStVerifyInfo) (bool, error) {
filPublicReplicaInfos, filPublicReplicaInfosLen, err := toFilPublicReplicaInfos(info.ChallengedSectors, "winning")
if err != nil {
return false, errors.Wrap(err, "failed to create public replica info array for FFI")
......@@ -153,7 +153,7 @@ func VerifyWinningPoSt(info proof5.WinningPoStVerifyInfo) (bool, error) {
// VerifyWindowPoSt returns true if the Winning PoSt-generation operation from which its
// inputs were derived was valid, and false if not.
func VerifyWindowPoSt(info proof5.WindowPoStVerifyInfo) (bool, error) {
func VerifyWindowPoSt(info spproof.WindowPoStVerifyInfo) (bool, error) {
filPublicReplicaInfos, filPublicReplicaInfosLen, err := toFilPublicReplicaInfos(info.ChallengedSectors, "window")
if err != nil {
return false, errors.Wrap(err, "failed to create public replica info array for FFI")
......@@ -187,7 +187,7 @@ func VerifyWindowPoSt(info proof5.WindowPoStVerifyInfo) (bool, error) {
return resp.IsValid, nil
}
func VerifyAggregateWindowPostProofs(aggregateInfo AggregateWindowPostInfos)(bool, error) {
func VerifyAggregateWindowPostProofs(aggregateInfo spproof.AggregateWindowPostInfos)(bool, error) {
if len(aggregateInfo.ChallengedSectors) == 0 {
return false, xerrors.New("no seal verify infos")
}
......@@ -527,7 +527,7 @@ func SealCommitPhase2(
// infos [commRs, seeds],
// }
// TODO AggregateSealProofs it only needs InteractiveRandomness out of the aggregateInfo.Infos
func AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) (out []byte, err error) {
func AggregateSealProofs(aggregateInfo spproof.AggregateSealVerifyProofAndInfos, proofs [][]byte) (out []byte, err error) {
sp, err := toFilRegisteredSealProof(aggregateInfo.SealProof)
if err != nil {
return nil, err
......@@ -680,7 +680,7 @@ func GenerateWinningPoSt(
minerID abi.ActorID,
privateSectorInfo SortedPrivateSectorInfo,
randomness abi.PoStRandomness,
) ([]proof5.PoStProof, error) {
) ([]spproof.PoStProof, error) {
filReplicas, filReplicasLen, free, err := toFilPrivateReplicaInfos(privateSectorInfo.Values(), "winning")
if err != nil {
return nil, errors.Wrap(err, "failed to create private replica info array for FFI")
......@@ -720,7 +720,7 @@ func GenerateWindowPoSt(
minerID abi.ActorID,
privateSectorInfo SortedPrivateSectorInfo,
randomness abi.PoStRandomness,
) ([]proof5.PoStProof, []abi.SectorNumber, error) {
) ([]spproof.PoStProof, []abi.SectorNumber, error) {
filReplicas, filReplicasLen, free, err := toFilPrivateReplicaInfos(privateSectorInfo.Values(), "window")
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create private replica info array for FFI")
......@@ -757,7 +757,7 @@ func GenerateWindowPoSt(
return proofs, faultySectors, nil
}
// GenerateWindowPoSt
func AggregateWindowPoStProofs(aggregateInfo AggregateWindowPostInfos) ([]byte, error) {
func AggregateWindowPoStProofs(aggregateInfo spproof.AggregateWindowPostInfos, proofs []spproof.PoStProof) ([]byte, error) {
rap, err := toFilRegisteredAggregationProof(aggregateInfo.AggregateType)
if err != nil {
return nil, err
......@@ -769,13 +769,13 @@ func AggregateWindowPoStProofs(aggregateInfo AggregateWindowPostInfos) ([]byte,
randomnesses[i] = to32ByteArray(randomness)
}
filPoStProofs, filPoStProofsLen, free, err := toFilPoStProofs(aggregateInfo.Proofs)
filPoStProofs, filPoStProofsLen, free, err := toFilPoStProofs(proofs)
if err != nil {
return nil, errors.Wrap(err, "failed to create PoSt proofs array for FFI")
}
defer free()
resp := generated.FilAggregateWindowPoStProofs(rap, randomnesses, uint(len(randomnesses)), filPoStProofs, filPoStProofsLen, aggregateInfo.SectorCount)
resp := generated.FilAggregateWindowPoStProofs(rap, randomnesses, uint(len(randomnesses)), filPoStProofs, filPoStProofsLen, aggregateInfo.SectorCount[0])
resp.Deref()
defer generated.FilDestroyAggregateProof(resp)
......@@ -921,7 +921,7 @@ func toFilPublicPieceInfos(src []abi.PieceInfo) ([]generated.FilPublicPieceInfo,
return out, uint(len(out)), nil
}
func toFilPublicReplicaInfos(src []proof5.SectorInfo, typ string) ([]generated.FilPublicReplicaInfo, uint, error) {
func toFilPublicReplicaInfos(src []spproof.SectorInfo, typ string) ([]generated.FilPublicReplicaInfo, uint, error) {
out := make([]generated.FilPublicReplicaInfo, len(src))
for idx := range out {
......@@ -1039,8 +1039,8 @@ func fromFilPoStFaultySectors(ptr []uint64, l uint) ([]abi.SectorNumber, error)
return snums, nil
}
func fromFilPoStProofs(src []generated.FilPoStProof) ([]proof5.PoStProof, error) {
out := make([]proof5.PoStProof, len(src))
func fromFilPoStProofs(src []generated.FilPoStProof) ([]spproof.PoStProof, error) {
out := make([]spproof.PoStProof, len(src))
for idx := range out {
src[idx].Deref()
......@@ -1050,7 +1050,7 @@ func fromFilPoStProofs(src []generated.FilPoStProof) ([]proof5.PoStProof, error)
return nil, err
}
out[idx] = proof5.PoStProof{
out[idx] = spproof.PoStProof{
PoStProof: pp,
ProofBytes: copyBytes(src[idx].ProofPtr, src[idx].ProofLen),
}
......@@ -1059,7 +1059,7 @@ func fromFilPoStProofs(src []generated.FilPoStProof) ([]proof5.PoStProof, error)
return out, nil
}
func toFilPoStProofs(src []proof5.PoStProof) ([]generated.FilPoStProof, uint, func(), error) {
func toFilPoStProofs(src []spproof.PoStProof) ([]generated.FilPoStProof, uint, func(), error) {
allocs := make([]AllocationManager, len(src))
out := make([]generated.FilPoStProof, len(src))
......
......@@ -6,9 +6,8 @@ import (
"sort"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/ipfs/go-cid"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
spproof "fil_integrate/build/proof"
)
type AggregateWindowPostInfos struct{
......@@ -16,9 +15,9 @@ type AggregateWindowPostInfos struct{
AggregateType abi.RegisteredAggregationProof
Miner abi.ActorID
AggregationProof []byte
ChallengedSectors []proof5.SectorInfo
ChallengedSectors []spproof.SectorInfo
Arr []uint
Proofs []proof5.PoStProof
Proofs []spproof.PoStProof
Randomnesses []abi.PoStRandomness
SectorCount uint
}
......@@ -134,7 +133,7 @@ type publicSectorInfo struct {
}
type PrivateSectorInfo struct {
proof.SectorInfo
spproof.SectorInfo
CacheDirPath string
PoStProofType abi.RegisteredPoStProof
SealedSectorPath string
......
......@@ -15,8 +15,9 @@ import (
"path/filepath"
"github.com/filecoin-project/go-state-types/abi"
prf "github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/ipfs/go-cid"
spproof "fil_integrate/build/proof"
)
func WorkflowProofsLifecycle(t TestHelper) {
......@@ -137,7 +138,7 @@ func WorkflowProofsLifecycle(t TestHelper) {
t.RequireNoError(err)
// verify the 'ole proofy
isValid, err := VerifySeal(prf.SealVerifyInfo{
isValid, err := VerifySeal(spproof.SealVerifyInfo{
SectorID: abi.SectorID{
Miner: minerID,
Number: sectorNum,
......@@ -217,7 +218,7 @@ func WorkflowProofsLifecycle(t TestHelper) {
// generate a PoSt over the proving set before importing, just to exercise
// the new API
privateInfo := NewSortedPrivateSectorInfo(PrivateSectorInfo{
SectorInfo: prf.SectorInfo{
SectorInfo: spproof.SectorInfo{
SectorNumber: sectorNum,
SealedCID: sealedCID,
},
......@@ -226,7 +227,7 @@ func WorkflowProofsLifecycle(t TestHelper) {
SealedSectorPath: sealedSectorFile.Name(),
})
provingSet := []prf.SectorInfo{{
provingSet := []spproof.SectorInfo{{
SealProof: sealProofType,
SectorNumber: sectorNum,
SealedCID: sealedCID,
......@@ -236,7 +237,7 @@ func WorkflowProofsLifecycle(t TestHelper) {
indicesInProvingSet, err := GenerateWinningPoStSectorChallenge(winningPostProofType, minerID, randomness[:], uint64(len(provingSet)))
t.RequireNoError(err)
var challengedSectors []prf.SectorInfo
var challengedSectors []spproof.SectorInfo
for idx := range indicesInProvingSet {
challengedSectors = append(challengedSectors, provingSet[indicesInProvingSet[idx]])
}
......@@ -244,7 +245,7 @@ func WorkflowProofsLifecycle(t TestHelper) {
proofs, err := GenerateWinningPoSt(minerID, privateInfo, randomness[:])
t.RequireNoError(err)
isValid, err = VerifyWinningPoSt(prf.WinningPoStVerifyInfo{
isValid, err = VerifyWinningPoSt(spproof.WinningPoStVerifyInfo{
Randomness: randomness[:],
Proofs: proofs,
ChallengedSectors: challengedSectors,
......
# sector-storage
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![CircleCI](https://circleci.com/gh/filecoin-project/sector-storage.svg?style=svg)](https://circleci.com/gh/filecoin-project/sector-storage)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
> a concrete implementation of the [specs-storage](https://github.com/filecoin-project/specs-storage) interface
The sector-storage project provides a implementation-nonspecific reference implementation of the [specs-storage](https://github.com/filecoin-project/specs-storage) interface.
## Disclaimer
Please report your issues with regards to sector-storage at the [lotus issue tracker](https://github.com/filecoin-project/lotus/issues)
## Architecture
![high-level architecture](docs/sector-storage.svg)
### `Manager`
Manages is the top-level piece of the storage system gluing all the other pieces
together. It also implements scheduling logic.
### `package stores`
This package implements the sector storage subsystem. Fundamentally the storage
is divided into `path`s, each path has it's UUID, and stores a set of sector
'files'. There are currently 3 types of sector files - `unsealed`, `sealed`,
and `cache`.
Paths can be shared between nodes by sharing the underlying filesystem.
### `stores.Local`
The Local store implements SectorProvider for paths mounted in the local
filesystem. Paths can be shared between nodes, and support shared filesystems
such as NFS.
stores.Local implements all native filesystem-related operations
### `stores.Remote`
The Remote store extends Local store, handles fetching sector files into a local
store if needed, and handles removing sectors from non-local stores.
### `stores.Index`
The Index is a singleton holding metadata about storage paths, and a mapping of
sector files to paths
### `LocalWorker`
LocalWorker implements the Worker interface with ffiwrapper.Sealer and a
store.Store instance
## License
The Filecoin Project is dual-licensed under Apache 2.0 and MIT terms:
- Apache License, Version 2.0, ([LICENSE-APACHE](https://github.com/filecoin-project/sector-storage/blob/master/LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT](https://github.com/filecoin-project/sector-storage/blob/master/LICENSE-MIT) or http://opensource.org/licenses/MIT)
This diff is collapsed.
This diff is collapsed.
package sectorstorage
import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"fil_integrate/extern/sector-storage/storiface"
)
// FaultTracker TODO: Track things more actively
type FaultTracker interface {
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)
}
// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
var bad = make(map[abi.SectorID]string)
ssize, err := pp.SectorSize()
if err != nil {
return nil, err
}
// TODO: More better checks
for _, sector := range sectors {
err := func() error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
locked, err := m.index.StorageTryLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTNone)
if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
if !locked {
log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
bad[sector.ID] = fmt.Sprint("can't acquire read lock")
return nil
}
lp, _, err := m.localStore.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad[sector.ID] = fmt.Sprintf("acquire sector failed: %s", err)
return nil
}
if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache and/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)
bad[sector.ID] = fmt.Sprintf("cache and/or sealed paths not found, cache %q, sealed %q", lp.Cache, lp.Sealed)
return nil
}
toCheck := map[string]int64{
lp.Sealed: 1,
filepath.Join(lp.Cache, "t_aux"): 0,
filepath.Join(lp.Cache, "p_aux"): 0,
}
addCachePathsForSectorSize(toCheck, lp.Cache, ssize)
for p, sz := range toCheck {
st, err := os.Stat(p)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: sector file stat error", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "err", err)
bad[sector.ID] = fmt.Sprintf("%s", err)
return nil
}
if sz != 0 {
if st.Size() != int64(ssize)*sz {
log.Warnw("CheckProvable Sector FAULT: sector file is wrong size", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p, "size", st.Size(), "expectSize", int64(ssize)*sz)
bad[sector.ID] = fmt.Sprintf("%s is wrong size (got %d, expect %d)", p, st.Size(), int64(ssize)*sz)
return nil
}
}
}
if rg != nil {
wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
return err
}
var pr abi.PoStRandomness = make([]byte, abi.RandomnessLength)
_, _ = rand.Read(pr)
pr[31] &= 0x3f
ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, pr, []abi.SectorNumber{
sector.ID.Number,
})
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
return nil
}
commr, err := rg(ctx, sector.ID)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
return nil
}
_, err = ffi.GenerateSingleVanillaProof(ffi.PrivateSectorInfo{
SectorInfo: proof.SectorInfo{
SealProof: sector.ProofType,
SectorNumber: sector.ID.Number,
SealedCID: commr,
},
CacheDirPath: lp.Cache,
PoStProofType: wpp,
SealedSectorPath: lp.Sealed,
}, ch.Challenges[sector.ID.Number])
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "err", err)
bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
return nil
}
}
return nil
}()
if err != nil {
return nil, err
}
}
return bad, nil
}
func addCachePathsForSectorSize(chk map[string]int64, cacheDir string, ssize abi.SectorSize) {
switch ssize {
case 2 << 10:
fallthrough
case 8 << 20:
fallthrough
case 512 << 20:
chk[filepath.Join(cacheDir, "sc-02-data-tree-r-last.dat")] = 0
case 32 << 30:
for i := 0; i < 8; i++ {
chk[filepath.Join(cacheDir, fmt.Sprintf("sc-02-data-tree-r-last-%d.dat", i))] = 0
}
case 64 << 30:
for i := 0; i < 16; i++ {
chk[filepath.Join(cacheDir, fmt.Sprintf("sc-02-data-tree-r-last-%d.dat", i))] = 0
}
default:
log.Warnf("not checking cache files of %s sectors for faults", ssize)
}
}
var _ FaultTracker = &Manager{}
package basicfs
import (
"context"
"os"
"path/filepath"
"sync"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"fil_integrate/extern/sector-storage/storiface"
)
type sectorFile struct {
abi.SectorID
storiface.SectorFileType
}
type Provider struct {
Root string
lk sync.Mutex
waitSector map[sectorFile]chan struct{}
}
func (b *Provider) GetRoot() string {
return b.Root
}
func (b *Provider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTSealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
done := func() {}
out := storiface.SectorPaths{
ID: id.ID,
}
for _, fileType := range storiface.PathTypes {
if !existing.Has(fileType) && !allocate.Has(fileType) {
continue
}
b.lk.Lock()
if b.waitSector == nil {
b.waitSector = map[sectorFile]chan struct{}{}
}
ch, found := b.waitSector[sectorFile{id.ID, fileType}]
if !found {
ch = make(chan struct{}, 1)
b.waitSector[sectorFile{id.ID, fileType}] = ch
}
b.lk.Unlock()
select {
case ch <- struct{}{}:
case <-ctx.Done():
done()
return storiface.SectorPaths{}, nil, ctx.Err()
}
path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id.ID, numbers...))
prevDone := done
done = func() {
prevDone()
<-ch
}
if !allocate.Has(fileType) {
if _, err := os.Stat(path); os.IsNotExist(err) {
done()
return storiface.SectorPaths{}, nil, storiface.ErrSectorNotFound
}
}
storiface.SetPathByType(&out, fileType, path)
}
return out, done, nil
}
func (b *Provider) AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTSealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
done := func() {}
out := storiface.SectorPaths{
ID: id.ID,
}
for _, fileType := range storiface.PathTypes {
if !existing.Has(fileType) && !allocate.Has(fileType) {
continue
}
b.lk.Lock()
if b.waitSector == nil {
b.waitSector = map[sectorFile]chan struct{}{}
}
ch, found := b.waitSector[sectorFile{id.ID, fileType}]
if !found {
ch = make(chan struct{}, 1)
b.waitSector[sectorFile{id.ID, fileType}] = ch
}
b.lk.Unlock()
select {
case ch <- struct{}{}:
case <-ctx.Done():
done()
return storiface.SectorPaths{}, nil, ctx.Err()
}
path := filepath.Join(b.Root, fileType.String(), "unsealed.dat")
prevDone := done
done = func() {
prevDone()
<-ch
}
if !allocate.Has(fileType) {
if _, err := os.Stat(path); os.IsNotExist(err) {
done()
return storiface.SectorPaths{}, nil, storiface.ErrSectorNotFound
}
}
storiface.SetPathByType(&out, fileType, path)
}
return out, done, nil
}
package ffiwrapper
import (
"encoding/binary"
"io"
"os"
"syscall"
"github.com/detailyang/go-fallocate"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"fil_integrate/extern/sector-storage/fsutil"
"fil_integrate/extern/sector-storage/storiface"
)
const veryLargeRle = 1 << 20
// Sectors can be partially unsealed. We support this by appending a small
// trailer to each unsealed sector file containing an RLE+ marking which bytes
// in a sector are unsealed, and which are not (holes)
// unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
type partialFile struct {
maxPiece abi.PaddedPieceSize
path string
allocated rlepluslazy.RLE
file *os.File
}
func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) error {
trailer, err := rlepluslazy.EncodeRuns(r, nil)
if err != nil {
return xerrors.Errorf("encoding trailer: %w", err)
}
// maxPieceSize == unpadded(sectorSize) == trailer start
if _, err := w.Seek(maxPieceSize, io.SeekStart); err != nil {
return xerrors.Errorf("seek to trailer start: %w", err)
}
rb, err := w.Write(trailer)
if err != nil {
return xerrors.Errorf("writing trailer data: %w", err)
}
if err := binary.Write(w, binary.LittleEndian, uint32(len(trailer))); err != nil {
return xerrors.Errorf("writing trailer length: %w", err)
}
return w.Truncate(maxPieceSize + int64(rb) + 4)
}
func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
}
err = func() error {
err := fallocate.Fallocate(f, 0, int64(maxPieceSize))
if errno, ok := err.(syscall.Errno); ok {
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
log.Warnf("could not allocated space, ignoring: %v", errno)
err = nil // log and ignore
}
}
if err != nil {
return xerrors.Errorf("fallocate '%s': %w", path, err)
}
if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}()
if err != nil {
_ = f.Close()
return nil, err
}
if err := f.Close(); err != nil {
return nil, xerrors.Errorf("close empty partial file: %w", err)
}
return openPartialFile(maxPieceSize, path)
}
func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
}
var rle rlepluslazy.RLE
err = func() error {
st, err := f.Stat()
if err != nil {
return xerrors.Errorf("stat '%s': %w", path, err)
}
if st.Size() < int64(maxPieceSize) {
return xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize)
}
// read trailer
var tlen [4]byte
_, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen)))
if err != nil {
return xerrors.Errorf("reading trailer length: %w", err)
}
// sanity-check the length
trailerLen := binary.LittleEndian.Uint32(tlen[:])
expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize)
if expectLen != st.Size() {
return xerrors.Errorf("file '%s' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize)
}
if trailerLen > veryLargeRle {
log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen)
}
trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen)
if trailerStart != int64(maxPieceSize) {
return xerrors.Errorf("expected sector size to equal trailer start index")
}
trailerBytes := make([]byte, trailerLen)
_, err = f.ReadAt(trailerBytes, trailerStart)
if err != nil {
return xerrors.Errorf("reading trailer: %w", err)
}
rle, err = rlepluslazy.FromBuf(trailerBytes)
if err != nil {
return xerrors.Errorf("decoding trailer: %w", err)
}
it, err := rle.RunIterator()
if err != nil {
return xerrors.Errorf("getting trailer run iterator: %w", err)
}
f, err := rlepluslazy.Fill(it)
if err != nil {
return xerrors.Errorf("filling bitfield: %w", err)
}
lastSet, err := rlepluslazy.Count(f)
if err != nil {
return xerrors.Errorf("finding last set byte index: %w", err)
}
if lastSet > uint64(maxPieceSize) {
return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize)
}
return nil
}()
if err != nil {
_ = f.Close()
return nil, err
}
return &partialFile{
maxPiece: maxPieceSize,
path: path,
allocated: rle,
file: f,
}, nil
}
func (pf *partialFile) Close() error {
return pf.file.Close()
}
func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c > 0 {
log.Warnf("getting partial file writer overwriting %d allocated bytes", c)
}
}
return pf.file, nil
}
func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
ored, err := rlepluslazy.Or(have, pieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, ored); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
if err := fsutil.Deallocate(pf.file, int64(offset), int64(size)); err != nil {
return xerrors.Errorf("deallocating: %w", err)
}
s, err := rlepluslazy.Subtract(have, pieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, s); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c != uint64(size) {
log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-c)
}
}
return pf.file, nil
}
func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
return pf.allocated.RunIterator()
}
func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
have, err := pf.Allocated()
if err != nil {
return false, err
}
u, err := rlepluslazy.And(have, pieceRun(offset.Padded(), size.Padded()))
if err != nil {
return false, err
}
uc, err := rlepluslazy.Count(u)
if err != nil {
return false, err
}
return abi.PaddedPieceSize(uc) == size.Padded(), nil
}
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
var runs []rlepluslazy.Run
if offset > 0 {
runs = append(runs, rlepluslazy.Run{
Val: false,
Len: uint64(offset),
})
}
runs = append(runs, rlepluslazy.Run{
Val: true,
Len: uint64(size),
})
return &rlepluslazy.RunSliceIterator{Runs: runs}
}
//+build cgo
package ffiwrapper
import (
ffi "github.com/filecoin-project/filecoin-ffi"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
)
var ProofProver = proofProver{}
var _ Prover = ProofProver
type proofProver struct{}
func (v proofProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
return ffi.AggregateSealProofs(aggregateInfo, proofs)
}
package ffiwrapper
import (
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("ffiwrapper")
type Sealer struct {
sectors SectorProvider
stopping chan struct{}
}
func (sb *Sealer) Stop() {
close(sb.stopping)
}
This diff is collapsed.
This diff is collapsed.
package ffiwrapper
import (
"context"
"io"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"fil_integrate/extern/sector-storage/ffiwrapper/basicfs"
"fil_integrate/extern/sector-storage/storiface"
)
type Validator interface {
CanCommit(sector storiface.SectorPaths) (bool, error)
CanProve(sector storiface.SectorPaths) (bool, error)
}
type StorageSealer interface {
storage.Sealer
storage.Storage
}
type Storage interface {
storage.Prover
StorageSealer
UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}
type Verifier interface {
VerifySeal(proof5.SealVerifyInfo) (bool, error)
VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error)
VerifyWinningPoSt(ctx context.Context, info proof5.WinningPoStVerifyInfo) (bool, error)
VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoStVerifyInfo) (bool, error)
GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredPoStProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error)
}
// Prover contains cheap proving-related methods
type Prover interface {
// TODO: move GenerateWinningPoStSectorChallenge from the Verifier interface to here
AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error)
}
type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
GetRoot() (string)
AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
package ffiwrapper
import (
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"fil_integrate/extern/sector-storage/storiface"
)
// merge gaps between ranges which are close to each other
// TODO: more benchmarking to come up with more optimal number
const mergeGaps = 32 << 20
// TODO const expandRuns = 16 << 20 // unseal more than requested for future requests
func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) {
todo := pieceRun(offset.Padded(), size.Padded())
todo, err := rlepluslazy.Subtract(todo, unsealed)
if err != nil {
return nil, xerrors.Errorf("compute todo-unsealed: %w", err)
}
return rlepluslazy.JoinClose(todo, mergeGaps)
}
//+build cgo
package ffiwrapper
import (
"context"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"fil_integrate/extern/sector-storage/storiface"
)
func (sb *Sealer) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) ([]proof5.PoStProof, error) {
randomness[31] &= 0x3f
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWinningPoStProof) // TODO: FAULTS?
if err != nil {
return nil, err
}
defer done()
if len(skipped) > 0 {
return nil, xerrors.Errorf("pubSectorToPriv skipped sectors: %+v", skipped)
}
return ffi.GenerateWinningPoSt(minerID, privsectors, randomness)
}
func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) ([]proof5.PoStProof, []abi.SectorID, error) {
randomness[31] &= 0x3f
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
if err != nil {
return nil, nil, xerrors.Errorf("gathering sector info: %w", err)
}
defer done()
if len(skipped) > 0 {
return nil, skipped, xerrors.Errorf("pubSectorToPriv skipped some sectors")
}
proof, faulty, err := ffi.GenerateWindowPoSt(minerID, privsectors, randomness)
var faultyIDs []abi.SectorID
for _, f := range faulty {
faultyIDs = append(faultyIDs, abi.SectorID{
Miner: minerID,
Number: f,
})
}
return proof, faultyIDs, err
}
func (sb *Sealer) AggregateWindowPoStProofs(ctx context.Context, aggregateInfo ffi.AggregateWindowPostInfos) ([]byte, error) {
for i, random := range(aggregateInfo.Randomnesses) {
aggregateInfo.Randomnesses[i][31] = random[31] & 0x3f
}
return ffi.AggregateWindowPoStProofs(aggregateInfo)
}
func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var doneFuncs []func()
done := func() {
for _, df := range doneFuncs {
df()
}
}
var skipped []abi.SectorID
var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo {
if _, faulty := fmap[s.SectorNumber]; faulty {
continue
}
sid := storage.SectorRef{
ID: abi.SectorID{Miner: mid, Number: s.SectorNumber},
ProofType: s.SealProof,
}
paths, d, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTCache|storiface.FTSealed, 0, storiface.PathStorage)
if err != nil {
log.Warnw("failed to acquire sector, skipping", "sector", sid.ID, "error", err)
skipped = append(skipped, sid.ID)
continue
}
doneFuncs = append(doneFuncs, d)
postProofType, err := rpt(s.SealProof)
if err != nil {
done()
return ffi.SortedPrivateSectorInfo{}, nil, nil, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err)
}
out = append(out, ffi.PrivateSectorInfo{
CacheDirPath: paths.Cache,
PoStProofType: postProofType,
SealedSectorPath: paths.Sealed,
SectorInfo: s,
})
}
return ffi.NewSortedPrivateSectorInfo(out...), skipped, done, nil
}
var _ Verifier = ProofVerifier
type proofVerifier struct{}
var ProofVerifier = proofVerifier{}
func (proofVerifier) VerifySeal(info proof5.SealVerifyInfo) (bool, error) {
return ffi.VerifySeal(info)
}
func (proofVerifier) VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error) {
return ffi.VerifyAggregateSeals(aggregate)
}
func (proofVerifier) VerifyWinningPoSt(ctx context.Context, info proof5.WinningPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
_, span := trace.StartSpan(ctx, "VerifyWinningPoSt")
defer span.End()
return ffi.VerifyWinningPoSt(info)
}
func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
_, span := trace.StartSpan(ctx, "VerifyWindowPoSt")
defer span.End()
return ffi.VerifyWindowPoSt(info)
}
func (proofVerifier) VerifyAggregateWindowPostProofs(ctx context.Context, aggregateInfo ffi.AggregateWindowPostInfos) (bool, error){
for i, random := range(aggregateInfo.Randomnesses) {
aggregateInfo.Randomnesses[i][31] = random[31] & 0x3f
}
return ffi.VerifyAggregateWindowPostProofs(aggregateInfo)
}
func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
randomness[31] &= 0x3f
return ffi.GenerateWinningPoStSectorChallenge(proofType, minerID, randomness, eligibleSectorCount)
}
package fr32
import (
"math/bits"
"runtime"
"sync"
"github.com/filecoin-project/go-state-types/abi"
)
var MTTresh = uint64(32 << 20)
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / MTTresh
if threads > uint64(runtime.NumCPU()) {
threads = 1 << (bits.Len32(uint32(runtime.NumCPU())))
}
if threads == 0 {
return 1
}
if threads > 32 {
return 32 // avoid too large buffers
}
return threads
}
func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
threads := mtChunkCount(abi.PaddedPieceSize(padLen))
threadBytes := abi.PaddedPieceSize(padLen / int(threads))
var wg sync.WaitGroup
wg.Add(int(threads))
for i := 0; i < int(threads); i++ {
go func(thread int) {
defer wg.Done()
start := threadBytes * abi.PaddedPieceSize(thread)
end := start + threadBytes
op(in[start.Unpadded():end.Unpadded()], out[start:end])
}(i)
}
wg.Wait()
}
func Pad(in, out []byte) {
// Assumes len(in)%127==0 and len(out)%128==0
if len(out) > int(MTTresh) {
mt(in, out, len(out), pad)
return
}
pad(in, out)
}
func pad(in, out []byte) {
chunks := len(out) / 128
for chunk := 0; chunk < chunks; chunk++ {
inOff := chunk * 127
outOff := chunk * 128
copy(out[outOff:outOff+31], in[inOff:inOff+31])
t := in[inOff+31] >> 6
out[outOff+31] = in[inOff+31] & 0x3f
var v byte
for i := 32; i < 64; i++ {
v = in[inOff+i]
out[outOff+i] = (v << 2) | t
t = v >> 6
}
t = v >> 4
out[outOff+63] &= 0x3f
for i := 64; i < 96; i++ {
v = in[inOff+i]
out[outOff+i] = (v << 4) | t
t = v >> 4
}
t = v >> 2
out[outOff+95] &= 0x3f
for i := 96; i < 127; i++ {
v = in[inOff+i]
out[outOff+i] = (v << 6) | t
t = v >> 2
}
out[outOff+127] = t & 0x3f
}
}
func Unpad(in []byte, out []byte) {
// Assumes len(in)%128==0 and len(out)%127==0
if len(in) > int(MTTresh) {
mt(out, in, len(in), unpad)
return
}
unpad(out, in)
}
func unpad(out, in []byte) {
chunks := len(in) / 128
for chunk := 0; chunk < chunks; chunk++ {
inOffNext := chunk*128 + 1
outOff := chunk * 127
at := in[chunk*128]
for i := 0; i < 32; i++ {
next := in[i+inOffNext]
out[outOff+i] = at
//out[i] |= next << 8
at = next
}
out[outOff+31] |= at << 6
for i := 32; i < 64; i++ {
next := in[i+inOffNext]
out[outOff+i] = at >> 2
out[outOff+i] |= next << 6
at = next
}
out[outOff+63] ^= (at << 6) ^ (at << 4)
for i := 64; i < 96; i++ {
next := in[i+inOffNext]
out[outOff+i] = at >> 4
out[outOff+i] |= next << 4
at = next
}
out[outOff+95] ^= (at << 4) ^ (at << 2)
for i := 96; i < 127; i++ {
next := in[i+inOffNext]
out[outOff+i] = at >> 6
out[outOff+i] |= next << 2
at = next
}
}
}
package fr32_test
import (
"bytes"
"io"
"io/ioutil"
"os"
"testing"
"fil_integrate/extern/sector-storage/fr32"
ffi "github.com/filecoin-project/filecoin-ffi"
commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-state-types/abi"
"github.com/stretchr/testify/require"
)
func TestWriteTwoPcs(t *testing.T) {
tf, _ := ioutil.TempFile("/tmp/", "scrb-")
paddedSize := abi.PaddedPieceSize(16 << 20)
n := 2
var rawBytes []byte
for i := 0; i < n; i++ {
buf := bytes.Repeat([]byte{0xab * byte(i)}, int(paddedSize.Unpadded()))
rawBytes = append(rawBytes, buf...)
rf, w, _ := commpffi.ToReadableFile(bytes.NewReader(buf), int64(len(buf)))
_, _, _, err := ffi.WriteWithAlignment(abi.RegisteredSealProof_StackedDrg32GiBV1, rf, abi.UnpaddedPieceSize(len(buf)), tf, nil)
if err != nil {
panic(err)
}
if err := w(); err != nil {
panic(err)
}
}
if _, err := tf.Seek(io.SeekStart, 0); err != nil { // nolint:staticcheck
panic(err)
}
ffiBytes, err := ioutil.ReadAll(tf)
if err != nil {
panic(err)
}
if err := tf.Close(); err != nil {
panic(err)
}
if err := os.Remove(tf.Name()); err != nil {
panic(err)
}
outBytes := make([]byte, int(paddedSize)*n)
fr32.Pad(rawBytes, outBytes)
require.Equal(t, ffiBytes, outBytes)
unpadBytes := make([]byte, int(paddedSize.Unpadded())*n)
fr32.Unpad(ffiBytes, unpadBytes)
require.Equal(t, rawBytes, unpadBytes)
}
package fr32_test
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
ffi "github.com/filecoin-project/filecoin-ffi"
commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-state-types/abi"
"github.com/stretchr/testify/require"
"fil_integrate/extern/sector-storage/fr32"
)
func padFFI(buf []byte) []byte {
rf, w, _ := commpffi.ToReadableFile(bytes.NewReader(buf), int64(len(buf)))
tf, _ := ioutil.TempFile("/tmp/", "scrb-")
_, _, _, err := ffi.WriteWithAlignment(abi.RegisteredSealProof_StackedDrg32GiBV1, rf, abi.UnpaddedPieceSize(len(buf)), tf, nil)
if err != nil {
panic(err)
}
if err := w(); err != nil {
panic(err)
}
if _, err := tf.Seek(io.SeekStart, 0); err != nil { // nolint:staticcheck
panic(err)
}
padded, err := ioutil.ReadAll(tf)
if err != nil {
panic(err)
}
if err := tf.Close(); err != nil {
panic(err)
}
if err := os.Remove(tf.Name()); err != nil {
panic(err)
}
return padded
}
func TestPadChunkFFI(t *testing.T) {
testByteChunk := func(b byte) func(*testing.T) {
return func(t *testing.T) {
var buf [128]byte
copy(buf[:], bytes.Repeat([]byte{b}, 127))
fr32.Pad(buf[:], buf[:])
expect := padFFI(bytes.Repeat([]byte{b}, 127))
require.Equal(t, expect, buf[:])
}
}
t.Run("ones", testByteChunk(0xff))
t.Run("lsb1", testByteChunk(0x01))
t.Run("msb1", testByteChunk(0x80))
t.Run("zero", testByteChunk(0x0))
t.Run("mid", testByteChunk(0x3c))
}
func TestPadChunkRandEqFFI(t *testing.T) {
for i := 0; i < 200; i++ {
var input [127]byte
rand.Read(input[:])
var buf [128]byte
fr32.Pad(input[:], buf[:])
expect := padFFI(input[:])
require.Equal(t, expect, buf[:])
}
}
func TestRoundtrip(t *testing.T) {
testByteChunk := func(b byte) func(*testing.T) {
return func(t *testing.T) {
var buf [128]byte
input := bytes.Repeat([]byte{0x01}, 127)
fr32.Pad(input, buf[:])
var out [127]byte
fr32.Unpad(buf[:], out[:])
require.Equal(t, input, out[:])
}
}
t.Run("ones", testByteChunk(0xff))
t.Run("lsb1", testByteChunk(0x01))
t.Run("msb1", testByteChunk(0x80))
t.Run("zero", testByteChunk(0x0))
t.Run("mid", testByteChunk(0x3c))
}
func TestRoundtripChunkRand(t *testing.T) {
for i := 0; i < 200; i++ {
var input [127]byte
rand.Read(input[:])
var buf [128]byte
copy(buf[:], input[:])
fr32.Pad(buf[:], buf[:])
var out [127]byte
fr32.Unpad(buf[:], out[:])
require.Equal(t, input[:], out[:])
}
}
func TestRoundtrip16MRand(t *testing.T) {
up := abi.PaddedPieceSize(16 << 20).Unpadded()
input := make([]byte, up)
rand.Read(input[:])
buf := make([]byte, 16<<20)
fr32.Pad(input, buf)
out := make([]byte, up)
fr32.Unpad(buf, out)
require.Equal(t, input, out)
ffi := padFFI(input)
require.Equal(t, ffi, buf)
}
func BenchmarkPadChunk(b *testing.B) {
var buf [128]byte
in := bytes.Repeat([]byte{0xff}, 127)
b.SetBytes(127)
for i := 0; i < b.N; i++ {
fr32.Pad(in, buf[:])
}
}
func BenchmarkChunkRoundtrip(b *testing.B) {
var buf [128]byte
copy(buf[:], bytes.Repeat([]byte{0xff}, 127))
var out [127]byte
b.SetBytes(127)
for i := 0; i < b.N; i++ {
fr32.Pad(buf[:], buf[:])
fr32.Unpad(buf[:], out[:])
}
}
func BenchmarkUnpadChunk(b *testing.B) {
var buf [128]byte
copy(buf[:], bytes.Repeat([]byte{0xff}, 127))
fr32.Pad(buf[:], buf[:])
var out [127]byte
b.SetBytes(127)
b.ReportAllocs()
bs := buf[:]
for i := 0; i < b.N; i++ {
fr32.Unpad(bs, out[:])
}
}
func BenchmarkUnpad16MChunk(b *testing.B) {
up := abi.PaddedPieceSize(16 << 20).Unpadded()
var buf [16 << 20]byte
fr32.Pad(bytes.Repeat([]byte{0xff}, int(up)), buf[:])
var out [16 << 20]byte
b.SetBytes(16 << 20)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
fr32.Unpad(buf[:], out[:])
}
}
func BenchmarkPad16MChunk(b *testing.B) {
up := abi.PaddedPieceSize(16 << 20).Unpadded()
var buf [16 << 20]byte
in := bytes.Repeat([]byte{0xff}, int(up))
b.SetBytes(16 << 20)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
fr32.Pad(in, buf[:])
}
}
func BenchmarkPad1GChunk(b *testing.B) {
up := abi.PaddedPieceSize(1 << 30).Unpadded()
var buf [1 << 30]byte
in := bytes.Repeat([]byte{0xff}, int(up))
b.SetBytes(1 << 30)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
fr32.Pad(in, buf[:])
}
}
func BenchmarkUnpad1GChunk(b *testing.B) {
up := abi.PaddedPieceSize(1 << 30).Unpadded()
var buf [1 << 30]byte
fr32.Pad(bytes.Repeat([]byte{0xff}, int(up)), buf[:])
var out [1 << 30]byte
b.SetBytes(1 << 30)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
fr32.Unpad(buf[:], out[:])
}
}
package fr32
import (
"io"
"math/bits"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
)
type unpadReader struct {
src io.Reader
left uint64
work []byte
}
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, MTTresh*mtChunkCount(sz))
return &unpadReader{
src: src,
left: uint64(sz),
work: buf,
}, nil
}
func (r *unpadReader) Read(out []byte) (int, error) {
if r.left == 0 {
return 0, io.EOF
}
chunks := len(out) / 127
outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(chunks*128)))
if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
}
todo := abi.PaddedPieceSize(outTwoPow)
if r.left < uint64(todo) {
todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left)))
}
r.left -= uint64(todo)
n, err := r.src.Read(r.work[:todo])
if err != nil && err != io.EOF {
return n, err
}
if n != int(todo) {
return 0, xerrors.Errorf("didn't read enough: %w", err)
}
Unpad(r.work[:todo], out[:todo.Unpadded()])
return int(todo.Unpadded()), err
}
type padWriter struct {
dst io.Writer
stash []byte
work []byte
}
func NewPadWriter(dst io.Writer) io.WriteCloser {
return &padWriter{
dst: dst,
}
}
func (w *padWriter) Write(p []byte) (int, error) {
in := p
if len(p)+len(w.stash) < 127 {
w.stash = append(w.stash, p...)
return len(p), nil
}
if len(w.stash) != 0 {
in = append(w.stash, in...)
}
for {
pieces := subPieces(abi.UnpaddedPieceSize(len(in)))
biggest := pieces[len(pieces)-1]
if abi.PaddedPieceSize(cap(w.work)) < biggest.Padded() {
w.work = make([]byte, 0, biggest.Padded())
}
Pad(in[:int(biggest)], w.work[:int(biggest.Padded())])
n, err := w.dst.Write(w.work[:int(biggest.Padded())])
if err != nil {
return int(abi.PaddedPieceSize(n).Unpadded()), err
}
in = in[biggest:]
if len(in) < 127 {
if cap(w.stash) < len(in) {
w.stash = make([]byte, 0, len(in))
}
w.stash = w.stash[:len(in)]
copy(w.stash, in)
return len(p), nil
}
}
}
func (w *padWriter) Close() error {
if len(w.stash) > 0 {
return xerrors.Errorf("still have %d unprocessed bytes", len(w.stash))
}
// allow gc
w.stash = nil
w.work = nil
w.dst = nil
return nil
}
package fr32_test
import (
"bufio"
"bytes"
"io/ioutil"
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"fil_integrate/extern/sector-storage/fr32"
)
func TestUnpadReader(t *testing.T) {
ps := abi.PaddedPieceSize(64 << 20).Unpadded()
raw := bytes.Repeat([]byte{0x77}, int(ps))
padOut := make([]byte, ps.Padded())
fr32.Pad(raw, padOut)
r, err := fr32.NewUnpadReader(bytes.NewReader(padOut), ps.Padded())
if err != nil {
t.Fatal(err)
}
// using bufio reader to make sure reads are big enough for the padreader - it can't handle small reads right now
readered, err := ioutil.ReadAll(bufio.NewReaderSize(r, 512))
if err != nil {
t.Fatal(err)
}
require.Equal(t, raw, readered)
}
package fr32
import (
"math/bits"
"github.com/filecoin-project/go-state-types/abi"
)
func subPieces(in abi.UnpaddedPieceSize) []abi.UnpaddedPieceSize {
// Convert to in-sector bytes for easier math:
//
// (we convert to sector bytes as they are nice round binary numbers)
w := uint64(in.Padded())
out := make([]abi.UnpaddedPieceSize, bits.OnesCount64(w))
for i := range out {
// Extract the next lowest non-zero bit
next := bits.TrailingZeros64(w)
psize := uint64(1) << next
// e.g: if the number is 0b010100, psize will be 0b000100
// set that bit to 0 by XORing it, so the next iteration looks at the
// next bit
w ^= psize
// Add the piece size to the list of pieces we need to create
out[i] = abi.PaddedPieceSize(psize).Unpadded()
}
return out
}
package fsutil
import (
"os"
"syscall"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("fsutil")
const FallocFlPunchHole = 0x02 // linux/falloc.h
func Deallocate(file *os.File, offset int64, length int64) error {
if length == 0 {
return nil
}
err := syscall.Fallocate(int(file.Fd()), FallocFlPunchHole, offset, length)
if errno, ok := err.(syscall.Errno); ok {
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
log.Warnf("could not deallocate space, ignoring: %v", errno)
err = nil // log and ignore
}
}
return err
}
// +build !linux
package fsutil
import (
"os"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("fsutil")
func Deallocate(file *os.File, offset int64, length int64) error {
log.Warnf("deallocating space not supported")
return nil
}
package fsutil
import (
"os"
"path/filepath"
"syscall"
"golang.org/x/xerrors"
)
type SizeInfo struct {
OnDisk int64
}
// FileSize returns bytes used by a file or directory on disk
// NOTE: We care about the allocated bytes, not file or directory size
func FileSize(path string) (SizeInfo, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
stat, ok := info.Sys().(*syscall.Stat_t)
if !ok {
return xerrors.New("FileInfo.Sys of wrong type")
}
// NOTE: stat.Blocks is in 512B blocks, NOT in stat.Blksize return SizeInfo{size}, nil
// See https://www.gnu.org/software/libc/manual/html_node/Attribute-Meanings.html
size += int64(stat.Blocks) * 512 // nolint NOTE: int64 cast is needed on osx
}
return err
})
if err != nil {
if os.IsNotExist(err) {
return SizeInfo{}, os.ErrNotExist
}
return SizeInfo{}, xerrors.Errorf("filepath.Walk err: %w", err)
}
return SizeInfo{size}, nil
}
package fsutil
type FsStat struct {
Capacity int64
Available int64 // Available to use for sector storage
FSAvailable int64 // Available in the filesystem
Reserved int64
// non-zero when storage has configured MaxStorage
Max int64
Used int64
}
package fsutil
import (
"syscall"
"golang.org/x/xerrors"
)
func Statfs(path string) (FsStat, error) {
var stat syscall.Statfs_t
if err := syscall.Statfs(path, &stat); err != nil {
return FsStat{}, xerrors.Errorf("statfs: %w", err)
}
// force int64 to handle platform specific differences
//nolint:unconvert
return FsStat{
Capacity: int64(stat.Blocks) * int64(stat.Bsize),
Available: int64(stat.Bavail) * int64(stat.Bsize),
FSAvailable: int64(stat.Bavail) * int64(stat.Bsize),
}, nil
}
package fsutil
import (
"syscall"
"unsafe"
)
func Statfs(volumePath string) (FsStat, error) {
// From https://github.com/ricochet2200/go-disk-usage/blob/master/du/diskusage_windows.go
h := syscall.MustLoadDLL("kernel32.dll")
c := h.MustFindProc("GetDiskFreeSpaceExW")
var freeBytes int64
var totalBytes int64
var availBytes int64
c.Call(
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(volumePath))),
uintptr(unsafe.Pointer(&freeBytes)),
uintptr(unsafe.Pointer(&totalBytes)),
uintptr(unsafe.Pointer(&availBytes)))
return FsStat{
Capacity: totalBytes,
Available: availBytes,
FSAvailable: availBytes,
}, nil
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package mock
func CommDR(in []byte) (out [32]byte) {
for i, b := range in {
out[i] = ^b
}
return out
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package storiface
type PathType string
const (
PathStorage PathType = "storage"
PathSealing PathType = "sealing"
)
type AcquireMode string
const (
AcquireMove AcquireMode = "move"
AcquireCopy AcquireMode = "copy"
)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment