Commit 1dc88c40 authored by 董子豪

aggregate window-post

parent 46a89ea2
......@@ -405,13 +405,15 @@ func FilGenerateWindowPost(randomness Fil32ByteArray, replicasPtr []FilPrivateRe
}
// FilAggregateWindowPoStProofs function as declared in filecoin-ffi/filcrypto.h:498
-func FilAggregateWindowPoStProofs(registeredAggregation FilRegisteredAggregationProof, randomnessesPtr []Fil32ByteArray, randomnessesLen uint, proofsPtr []FilPoStProof, proofsLen uint) *FilAggregateProof {
+func FilAggregateWindowPoStProofs(registeredAggregation FilRegisteredAggregationProof, randomnessesPtr []Fil32ByteArray, randomnessesLen uint, proofsPtr []FilPoStProof, proofsLen uint, sectorCount uint) *FilAggregateProof {
cregisteredAggregation, cregisteredAggregationAllocMap := (C.fil_RegisteredAggregationProof)(registeredAggregation), cgoAllocsUnknown
crandomnessesPtr, crandomnessesPtrAllocMap := unpackArgSFil32ByteArray(randomnessesPtr)
crandomnessesLen, crandomnessesLenAllocMap := (C.size_t)(randomnessesLen), cgoAllocsUnknown
cproofsPtr, cproofsPtrAllocMap := unpackArgSFilPoStProof(proofsPtr)
cproofsLen, cproofsLenAllocMap := (C.size_t)(proofsLen), cgoAllocsUnknown
-__ret := C.fil_aggregate_window_post_proofs(cregisteredAggregation, crandomnessesPtr, crandomnessesLen, cproofsPtr, cproofsLen)
+csectorCount, csectorCountAllocMap := (C.size_t)(sectorCount), cgoAllocsUnknown
+__ret := C.fil_aggregate_window_post_proofs(cregisteredAggregation, crandomnessesPtr, crandomnessesLen, cproofsPtr, cproofsLen, csectorCount)
+runtime.KeepAlive(csectorCountAllocMap)
runtime.KeepAlive(cproofsLenAllocMap)
packSFilPoStProof(proofsPtr, cproofsPtr)
runtime.KeepAlive(cproofsPtrAllocMap)
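For orientation (not part of the commit), a minimal Go sketch of how a caller might build the randomness array and drive the regenerated binding; the conversion loop, the variable names, and the assumption that Fil32ByteArray exposes an Inner [32]byte field are modelled on the other generated bindings and may differ here.

// Hedged caller-side sketch; names outside the diff are assumptions.
randomnesses := make([]generated.Fil32ByteArray, len(sealRandomnesses))
for i, r := range sealRandomnesses {
    copy(randomnesses[i].Inner[:], r) // abi.SealRandomness -> fixed 32-byte array
}
resp := generated.FilAggregateWindowPoStProofs(rap, randomnesses, uint(len(randomnesses)), filPoStProofs, filPoStProofsLen, sectorCount)
resp.Deref()
defer generated.FilDestroyAggregateProof(resp)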
......@@ -907,7 +909,7 @@ func FilVerifyWindowPoSt(randomness Fil32ByteArray, replicasPtr []FilPublicRepli
return __v
}
-func FilVerifyAggregateWindowPoStProofs(registeredProof FilRegisteredPoStProof, registeredAggregation FilRegisteredAggregationProof, proverId Fil32ByteArray, proofPtr []byte, proofLen uint, randomnessesPtr []Fil32ByteArray, randomnessesLen uint, replicasPtr []FilPublicReplicaInfo, arrPtr []uint64, arrLen uint) *FilVerifyAggregateSealProofResponse {
+func FilVerifyAggregateWindowPoStProofs(registeredProof FilRegisteredPoStProof, registeredAggregation FilRegisteredAggregationProof, proverId Fil32ByteArray, proofPtr []byte, proofLen uint, randomnessesPtr []Fil32ByteArray, randomnessesLen uint, replicasPtr []FilPublicReplicaInfo, arrPtr []uint, arrLen uint) *FilVerifyAggregateSealProofResponse {
cregisteredProof, cregisteredProofAllocMap := (C.fil_RegisteredPoStProof)(registeredProof), cgoAllocsUnknown
cregisteredAggregation, cregisteredAggregationAllocMap := (C.fil_RegisteredAggregationProof)(registeredAggregation), cgoAllocsUnknown
cproverId, cproverIdAllocMap := proverId.PassValue()
......
typedef unsigned char uint8_t;
typedef unsigned long long uint64_t;
typedef unsigned long int size_t;
#define bool _Bool
......@@ -775,7 +775,7 @@ func AggregateWindowPoStProofs(aggregateInfo AggregateWindowPostInfos) ([]byte,
}
defer free()
-resp := generated.FilAggregateWindowPoStProofs(rap, randomnesses, uint(len(randomnesses)), filPoStProofs, filPoStProofsLen)
+resp := generated.FilAggregateWindowPoStProofs(rap, randomnesses, uint(len(randomnesses)), filPoStProofs, filPoStProofsLen, aggregateInfo.SectorCount)
resp.Deref()
defer generated.FilDestroyAggregateProof(resp)
......
......@@ -1307,7 +1307,7 @@ pub unsafe extern "C" fn fil_verify_aggregate_window_post_proofs(
randomnesses_ptr: *mut fil_32ByteArray,
randomnesses_len: libc::size_t,
replicas_ptr: *const fil_PublicReplicaInfo,
-arr_ptr: *const u64,
+arr_ptr: *const libc::size_t,
arr_len: libc::size_t,
) -> *mut fil_VerifyAggregateSealProofResponse {
catch_panic_response(|| {
......
......@@ -57,7 +57,7 @@ pub unsafe fn to_public_replica_info_map(
#[allow(clippy::type_complexity)]
pub unsafe fn to_public_replica_infos_map(
replicas_ptr: *const fil_PublicReplicaInfo,
-arr_ptr: *const u64,
+arr_ptr: *const libc::size_t,
arr_len: libc::size_t,
) -> Result<Vec<BTreeMap<SectorId, PublicReplicaInfo>>> {
......@@ -65,7 +65,7 @@ pub unsafe fn to_public_replica_infos_map(
ensure!(!arr_ptr.is_null(), "arr_ptr must not be null");
let mut replicas = Vec::new();
-let mut replicas_len:u64 = 0u64;
+let mut replicas_len: usize = 0;
let mut public_replica_infos = Vec::new();
let arr = std::slice::from_raw_parts(arr_ptr, arr_len);
......@@ -74,7 +74,7 @@ pub unsafe fn to_public_replica_infos_map(
replicas_len += replica_len;
}
-for ffi_info in from_raw_parts(replicas_ptr, replicas_len as usize) {
+for ffi_info in from_raw_parts(replicas_ptr, replicas_len) {
replicas.push(PublicReplicaInfoTmp {
sector_id: ffi_info.sector_id,
registered_proof: ffi_info.registered_proof,
......@@ -102,6 +102,54 @@ pub unsafe fn to_public_replica_infos_map(
Ok(public_replica_infos)
}
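For reference, a hedged Go-side sketch of the layout this helper expects: replicas for all deadlines flattened into one array plus a parallel array of per-deadline lengths. The deadline grouping and the variable names below are illustrative assumptions, not code from this commit.

// Hedged layout sketch; 'deadlines' and its grouping are assumptions.
var flatReplicas []generated.FilPublicReplicaInfo
var arr []uint
for _, group := range deadlines { // deadlines: [][]generated.FilPublicReplicaInfo
    flatReplicas = append(flatReplicas, group...)
    arr = append(arr, uint(len(group))) // one length per group; the Rust side sums these into replicas_len
}
// flatReplicas and arr map to replicasPtr/arrPtr in FilVerifyAggregateWindowPoStProofs, with arrLen = uint(len(arr)).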
// #[allow(clippy::type_complexity)]
// pub unsafe fn get_public_replicas(
// replicas_ptr: *const *const fil_PublicReplicaInfo,
// arr_ptr: *const libc::size_t,
// arr_len: libc::size_t,
// ) -> Result<Vec<BTreeMap<SectorId, PublicReplicaInfo>>> {
// ensure!(!replicas_ptr.is_null(), "replicas_ptr must not be null");
// ensure!(!arr_ptr.is_null(), "arr_ptr must not be null");
// let mut public_replica_infos = Vec::new();
// let arr = std::slice::from_raw_parts(arr_ptr, arr_len);
// let replica_arr = std::slice::from_raw_parts(replicas_ptr, arr_len);
// for (index, replicas_len) in arr.iter().enumerate() {
// let mut replicas = Vec::new();
// for ffi_info in from_raw_parts(replica_arr[index], *replicas_len) {
// replicas.push(PublicReplicaInfoTmp {
// sector_id: ffi_info.sector_id,
// registered_proof: ffi_info.registered_proof,
// comm_r: ffi_info.comm_r,
// });
// }
// let map = replicas
// .into_par_iter()
// .map(|info| {
// let PublicReplicaInfoTmp {
// registered_proof,
// comm_r,
// sector_id,
// } = info;
// (
// SectorId::from(sector_id),
// PublicReplicaInfo::new(registered_proof.into(), comm_r),
// )
// })
// .collect();
// public_replica_infos.push(map);
// }
// Ok(public_replica_infos)
// }
#[derive(Debug, Clone)]
struct PrivateReplicaInfoTmp {
pub registered_proof: fil_RegisteredPoStProof,
......
......@@ -17,9 +17,10 @@ type AggregateWindowPostInfos struct{
Miner abi.ActorID
AggregationProof []byte
ChallengedSectors []proof5.SectorInfo
+Arr []uint
Proofs []proof5.PoStProof
-Arr []uint64
Randomnesses []abi.SealRandomness
+SectorCount uint
}
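A hedged sketch of filling in the extended struct and handing it to the AggregateWindowPoStProofs wrapper shown above; only the field names and the wrapper signature come from this commit, every right-hand-side value is an illustrative assumption.

// Hedged usage sketch; all right-hand-side variables are assumptions.
aggregateInfo := AggregateWindowPostInfos{
    Miner:             minerID,            // abi.ActorID
    ChallengedSectors: challengedSectors,  // []proof5.SectorInfo
    Proofs:            windowPoStProofs,   // []proof5.PoStProof, one per deadline
    Arr:               perDeadlineCounts,  // []uint, assumed: entry count per deadline
    Randomnesses:      randomnesses,       // []abi.SealRandomness, one per deadline
    SectorCount:       sectorsPerProof,    // new field, forwarded to fil_aggregate_window_post_proofs
}
proofBytes, err := AggregateWindowPoStProofs(aggregateInfo)
if err != nil {
    return nil, err
}
aggregateInfo.AggregationProof = proofBytes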
// BLS
......
use std::collections::BTreeMap;
-use std::time::Instant;
use anyhow::{ensure, Context, Result};
use filecoin_hashers::Hasher;
......@@ -310,13 +309,11 @@ pub fn aggregate_window_post_proofs<Tree: 'static + MerkleTreeTrait>(
};
let srs_prover_key = get_post_srs_key::<Tree>(&post_config, proofs.len())?;
-let start_time = Instant::now();
let aggregate_proof = FallbackPoStCompound::<Tree>::aggregate_proofs(
&srs_prover_key,
&hashed_randomness,
proofs.as_slice(),
)?;
-let aggregate_window_post_proofs_time = start_time.elapsed().as_millis();
let mut aggregate_proof_bytes = Vec::new();
aggregate_proof.write(&mut aggregate_proof_bytes)?;
......@@ -398,7 +395,6 @@ pub fn verify_aggregate_window_post_proofs<Tree: 'static + MerkleTreeTrait>(
};
info!("start verifying aggregate proof");
-let start_time = Instant::now();
let result = FallbackPoStCompound::<Tree>::verify_aggregate_proofs(
&srs_verifier_key,
&verifying_key,
......@@ -406,7 +402,6 @@ pub fn verify_aggregate_window_post_proofs<Tree: 'static + MerkleTreeTrait>(
commit_inputs.as_slice(),
&aggregate_proof,
)?;
-let verify_aggregate_proofs_time = start_time.elapsed().as_millis();
info!("end verifying aggregate proof");
info!("verify_aggregate_window_post_proofs:finish");
......
......@@ -124,17 +124,17 @@ lazy_static! {
// https://github.com/filecoin-project/specs-actors/blob/master/actors/abi/sector.go
pub static ref WINDOW_POST_SECTOR_COUNT: RwLock<HashMap<u64, usize>> = RwLock::new(
[
-(SECTOR_SIZE_2_KIB, 2),
-(SECTOR_SIZE_4_KIB, 2),
-(SECTOR_SIZE_16_KIB, 2),
-(SECTOR_SIZE_32_KIB, 2),
-(SECTOR_SIZE_8_MIB, 2),
-(SECTOR_SIZE_16_MIB, 2),
-(SECTOR_SIZE_32_MIB, 1),
-(SECTOR_SIZE_64_MIB, 1),
-(SECTOR_SIZE_128_MIB, 1),
-(SECTOR_SIZE_256_MIB, 1),
-(SECTOR_SIZE_512_MIB, 1),
+(SECTOR_SIZE_2_KIB, 512),
+(SECTOR_SIZE_4_KIB, 512),
+(SECTOR_SIZE_16_KIB, 512),
+(SECTOR_SIZE_32_KIB, 512),
+(SECTOR_SIZE_8_MIB, 512),
+(SECTOR_SIZE_16_MIB, 512),
+(SECTOR_SIZE_32_MIB, 512),
+(SECTOR_SIZE_64_MIB, 512),
+(SECTOR_SIZE_128_MIB, 512),
+(SECTOR_SIZE_256_MIB, 512),
+(SECTOR_SIZE_512_MIB, 512),
+(SECTOR_SIZE_1_GIB, 512),
(SECTOR_SIZE_32_GIB, 2349), // this gives 125,279,217 constraints, fitting in a single partition
(SECTOR_SIZE_64_GIB, 2300), // this gives 129,887,900 constraints, fitting in a single partition
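For context, WINDOW_POST_SECTOR_COUNT is the number of sectors challenged by a single window-post partition proof, so raising the small-sector values from 1-2 to 512 shrinks the number of partitions that have to be proven and aggregated. A rough Go sketch of that arithmetic, with illustrative numbers only:

// Illustrative arithmetic: partitions needed for 1024 2KiB sectors.
sectorsPerPartition := 512 // value for SECTOR_SIZE_2_KIB after this change (was 2)
numSectors := 1024
partitions := (numSectors + sectorsPerPartition - 1) / sectorsPerPartition
_ = partitions // == 2; with the old count of 2 it would have been 512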
......
......@@ -32,7 +32,6 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/chain/actors/policy"
"fil_integrate/extern/sector-storage/ffiwrapper/basicfs"
"fil_integrate/extern/sector-storage/storiface"
)
......@@ -537,7 +536,7 @@ func TestSealAndVerifyAggregate(t *testing.T) {
avi := proof5.AggregateSealVerifyProofAndInfos{
Miner: miner,
SealProof: sealProofType,
-AggregateProof: policy.GetDefaultAggregationProof(),
+AggregateProof: abi.RegisteredAggregationProof_SnarkPackV1,
Proof: nil,
Infos: make([]proof5.AggregateSealVerifyInfo, numAgg),
}
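Presumably the test then aggregates the individual seal proofs and verifies them against avi; a hedged sketch of that step using filecoin-ffi's seal-aggregation helpers as they exist upstream (the exact helpers and test assertions used in this repo may differ, and sealProofs is an assumed [][]byte collected from the earlier seals):

// Hedged continuation sketch; sealProofs and the require helpers are assumptions.
aggProof, err := ffi.AggregateSealProofs(avi, sealProofs)
require.NoError(t, err)
avi.Proof = aggProof
ok, err := ffi.VerifyAggregateSeals(avi)
require.NoError(t, err)
require.True(t, ok)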
......
......@@ -53,7 +53,7 @@ type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireUnsealed(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
-AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
+AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType, numbers ...int32) (storiface.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}
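A hedged sketch of how callers might use the new variadic parameter on AcquireSector; what the numbers select (assumed here: per-sector file indices) is an assumption, as is the surrounding setup.

// Existing call sites keep compiling because the new argument is variadic.
paths, done, err := provider.AcquireSector(ctx, sector, storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
    return err
}
defer done()
_ = paths
// A caller can now also pass optional numbers (their meaning is an assumption here):
//   provider.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage, 0, 1)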
package sectorstorage
import (
"context"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/metrics"
"fil_integrate/extern/sector-storage/sealtasks"
"fil_integrate/extern/sector-storage/storiface"
)
type trackedWork struct {
job storiface.WorkerJob
worker WorkerID
workerHostname string
}
type workTracker struct {
lk sync.Mutex
done map[storiface.CallID]struct{}
running map[storiface.CallID]trackedWork
// TODO: done, aggregate stats, queue stats, scheduler feedback
}
func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
wt.lk.Lock()
defer wt.lk.Unlock()
t, ok := wt.running[callID]
if !ok {
wt.done[callID] = struct{}{}
stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
return
}
took := metrics.SinceInMilliseconds(t.job.Start)
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(t.job.Task)),
tag.Upsert(metrics.WorkerHostname, t.workerHostname),
)
stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))
delete(wt.running, callID)
}
func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil {
return callID, err
}
wt.lk.Lock()
defer wt.lk.Unlock()
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
return callID, err
}
wt.running[callID] = trackedWork{
job: storiface.WorkerJob{
ID: callID,
Sector: sid.ID,
Task: task,
Start: time.Now(),
},
worker: wid,
workerHostname: wi.Hostname,
}
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))
return callID, err
}
}
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
return &trackedWorker{
Worker: w,
wid: wid,
workerInfo: wi,
tracker: wt,
}
}
func (wt *workTracker) Running() []trackedWork {
wt.lk.Lock()
defer wt.lk.Unlock()
out := make([]trackedWork, 0, len(wt.running))
for _, job := range wt.running {
out = append(out, job)
}
return out
}
type trackedWorker struct {
Worker
wid WorkerID
workerInfo storiface.WorkerInfo
tracker *workTracker
}
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
}
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
}
func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
}
func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
}
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
}
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
}
func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
}
func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
}
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
}
var _ Worker = &trackedWorker{}
......@@ -22,7 +22,7 @@ type Storage interface {
// storage are responsible for assigning sector IDs)
NewSector(ctx context.Context, sector SectorRef) error
// Add a piece to an existing *unsealed* sector
-AddPiece(ctx context.Context, sector SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData Data) (abi.PieceInfo, error)
+AddPiece(ctx context.Context, sector SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file Data, numbers ...int32) (abi.PieceInfo, error)
}
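Similarly, a hedged sketch of the revised AddPiece; the optional numbers argument is assumed to address specific unsealed files, and the surrounding variables are illustrative rather than taken from this commit.

// Unchanged callers still work; the trailing numbers are optional.
pieceInfo, err := sb.AddPiece(ctx, sector, existingPieceSizes, pieceSize, pieceData)
if err != nil {
    return abi.PieceInfo{}, err
}
_ = pieceInfo
// New callers may pin the piece to a particular unsealed file number (assumed semantics):
//   sb.AddPiece(ctx, sector, existingPieceSizes, pieceSize, pieceData, 0)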
type Prover interface {
......