Commit cb62f112 authored by 董子豪's avatar 董子豪

add actor-user interface

parent 9c99406d
package user package user
import( import (
"context" "context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"time"
"github.com/minio/md5-simd"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage"
"fil_integrate/seal"
) )
// Range describes a byte span to read from a decoded piece:
// `size` bytes starting at byte `offset`. The zero Range (0,0)
// is treated by ReadPieceRange as "the whole piece".
type Range struct {
offset uint64
size uint64
}
// pieceRead is one read test case: read pieceRange out of the piece
// tree rooted at `piece`, which encodes input file number `idx`.
type pieceRead struct {
piece storage.Piece
idx int
pieceRange Range
}
func TestUser() error { func TestUser() error {
sectorSize := storage.SectorSize32MiB
} sdir, err := homedir.Expand("~/tmp/bench")
\ No newline at end of file if err != nil {
return err
}
err = os.MkdirAll(sdir, 0775) //nolint:gosec
if err != nil {
return xerrors.Errorf("creating sectorbuilder dir: %w", err)
}
tsdir, err := ioutil.TempDir(sdir, "bench")
if err != nil {
return err
}
defer func() {
if err := os.RemoveAll(tsdir); err != nil {
log.Warn("remove all: ", err)
}
}()
// TODO: pretty sure this isnt even needed?
if err := os.MkdirAll(tsdir, 0775); err != nil {
return err
}
sp := &seal.Encoder{
Root: tsdir,
}
ctx := context.TODO()
u := New(sp)
b := []byte("random data")
var numFile = 4
var PiecesRange []pieceRead
var range2Read []Range = []Range{Range{0,0}, Range{1024, sectorSize}, Range{1024, 3*sectorSize+1024}, Range{2*sectorSize+2048, sectorSize+4096}}
for i := 0; i < numFile; i++ {
filename := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", i))
r := rand.New(rand.NewSource(time.Now().UnixNano()))
dataSize := uint64(r.Int63n(int64(sectorSize / 8))) * 32 + 4 * sectorSize
b, err = seal.GenerateRandomData(filename, dataSize, b)
if err != nil {
return err
}
in, err := os.OpenFile(filename, os.O_RDONLY, 0644)
if err != nil {
return err
}
defer in.Close()
finalPiece, _, err := sp.EncodeDataToPieces(ctx, abi.SectorSize(sectorSize), in)
if err != nil {
return err
}
for _, r := range(range2Read) {
PiecesRange = append(PiecesRange, pieceRead{
piece: finalPiece,
idx: i,
pieceRange: r,
})
}
PiecesRange = append(PiecesRange, pieceRead{
piece: finalPiece,
idx: i,
pieceRange: Range{
offset: dataSize - 1024,
size: 1024,
},
})
}
for _, r := range(PiecesRange) {
input := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", r.idx))
output := filepath.Join(tsdir, "output")
if _, err = os.Stat(output); !os.IsNotExist(err) {
os.Remove(output)
}
f, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
err = u.ReadPieceRange(ctx, f, r.piece, r.pieceRange.offset, r.pieceRange.size)
if err != nil {
return err
}
f.Close()
ok, err := checkDecodedFile(input, output, r.pieceRange.offset, r.pieceRange.size)
if err != nil {
return err
}
if !ok {
fmt.Println("decode pieces failed")
} else {
fmt.Println("decode pieces success")
}
}
return nil
}
// checkDecodedFile reports whether file2 matches the [offset, offset+size)
// byte range of file1, comparing streaming MD5 digests. When offset and
// size are both zero the whole of file1 is compared. Returns true on a
// match; a digest mismatch returns (false, error).
func checkDecodedFile(file1 string, file2 string, offset uint64, size uint64) (bool, error) {
	in, err := os.Open(file1)
	if err != nil {
		return false, err
	}
	defer in.Close()

	var f1 io.Reader = in
	if offset != 0 || size != 0 {
		if _, err := in.Seek(int64(offset), io.SeekStart); err != nil {
			return false, xerrors.Errorf("seek to trailer start: %w", err)
		}
		f1 = io.LimitReader(in, int64(size))
	}
	f2, err := os.Open(file2)
	if err != nil {
		return false, err
	}
	defer f2.Close()

	inBuf := make([]byte, 2<<20)
	outBuf := make([]byte, 2<<20)
	server1 := md5simd.NewServer()
	defer server1.Close()
	server2 := md5simd.NewServer()
	defer server2.Close()
	h1 := server1.NewHash()
	defer h1.Close()
	h2 := server2.NewHash()
	defer h2.Close()

	for {
		// BUG FIX: the loop previously tested the stale outer `err`
		// instead of the per-read errors, and hashed the *entire* buffers
		// rather than only the bytes just read, so leftover bytes from
		// earlier, shorter reads corrupted the digests.
		n1, inerr := f1.Read(inBuf)
		if inerr != nil && inerr != io.EOF {
			return false, inerr
		}
		n2, outerr := f2.Read(outBuf)
		if outerr != nil && outerr != io.EOF {
			return false, outerr
		}
		h1.Write(inBuf[:n1])
		h2.Write(outBuf[:n2])
		if inerr == io.EOF && outerr == io.EOF {
			hash1 := h1.Sum(nil)
			hash2 := h2.Sum(nil)
			if string(hash1) != string(hash2) {
				return false, xerrors.Errorf("the output can't match input file")
			}
			break
		}
	}
	return true, nil
}
package user package user
import( import (
"io"
"context" "context"
"io"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
) )
type UserAPI interface { type UserAPI interface {
EncodeDataToPieces(ctx context.Context, file storage.Data) (storage.Piece, []storage.Piece, error) EncodeDataToPieces(ctx context.Context, file storage.Data) (storage.Piece, []storage.Piece, error)
ReadPieceRange(ctx context.Context, out io.Writer, piece storage.Piece, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error ReadPieceRange(ctx context.Context, out io.Writer, piece storage.Piece, offset uint64, size uint64) error
} }
\ No newline at end of file
package user package user
import( import (
"context" "context"
"io"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"fil_integrate/build/state-types/abi"
"fil_integrate/build/storage" "fil_integrate/build/storage"
"fil_integrate/seal" "fil_integrate/seal"
) )
type User struct{ var log = logging.Logger("user")
type User struct {
sectorSize abi.SectorSize sectorSize abi.SectorSize
encoder seal.PieceEncoder encoder seal.PieceEncoder
} }
var _ UserAPI = &User{} var _ UserAPI = &User{}
// New constructs a User backed by the given piece encoder, using the
// package default 32 MiB sector size.
func New(encoder seal.PieceEncoder) *User {
	return &User{
		sectorSize: abi.SectorSize(storage.SectorSize32MiB),
		encoder:    encoder,
	}
}
func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (storage.Piece, []storage.Piece, error) { func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (storage.Piece, []storage.Piece, error) {
finalPiece, pieces, err := u.encoder.EncodeDataToPieces(ctx, u.sectorSize, file) finalPiece, pieces, err := u.encoder.EncodeDataToPieces(ctx, u.sectorSize, file)
// map(file) -> finalPiece ... // map(file) -> finalPiece ...
...@@ -22,16 +37,17 @@ func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (stora ...@@ -22,16 +37,17 @@ func (u *User) EncodeDataToPieces(ctx context.Context, file storage.Data) (stora
} }
func (u *User) ReadPieceRange( func (u *User) ReadPieceRange(
ctx context.Context, ctx context.Context,
out io.Writer, out io.Writer,
piece storage.Piece, piece storage.Piece,
offset uint64, offset uint64,
size uint64, size uint64,
) error { ) error {
log.Infof("Reading Piece [%d:%d]", offset, offset + size)
UnpaddedSectorSize := abi.PaddedPieceSize(u.sectorSize).Unpadded() UnpaddedSectorSize := abi.PaddedPieceSize(u.sectorSize).Unpadded()
DataLen := uint32(UnpaddedSectorSize) - seal.TagLen DataLen := uint32(UnpaddedSectorSize) - seal.TagLen
data, err := u.getPiece(ctx, piece.Hash) data, err := u.getPiece(ctx, piece.Commitment)
if err != nil { if err != nil {
return err return err
} }
...@@ -45,26 +61,27 @@ func (u *User) ReadPieceRange( ...@@ -45,26 +61,27 @@ func (u *User) ReadPieceRange(
piecesHash = append(data.PieceHash, piecesHash...) piecesHash = append(data.PieceHash, piecesHash...)
} }
buf := data.Data[:] buf := data.Data[:]
maxSize := uint64(len(piecesHash)) * uint64(DataLen) + uint64(len(buf)) maxSize := uint64(len(piecesHash))*uint64(DataLen) + uint64(len(buf))
if offset == 0 && size == 0{ if offset == 0 && size == 0 {
size = maxSize size = maxSize
} }
if size + offset > maxSize { if size+offset > maxSize {
return xerrors.Errorf("Piece Size is Out of Range [offset: %w, size:%w, max_size:%w]", offset, size, max_size) return xerrors.Errorf("Piece Size is Out of Range [offset: %w, size:%w, max_size:%w]", offset, size, maxSize)
} }
piecesHash = piecesHash[offset/uint64(DataLen):] piecesHash = piecesHash[offset/uint64(DataLen):]
rangePiece := RangePiece{ rangePiece := &RangePiece{
offset: offset, offset: offset,
size: size, size: size,
lenth: uint64(DataLen), lenth: uint64(DataLen),
} }
for{ for {
rstart, rsize := rangePiece.nextRange() rstart, rsize := rangePiece.nextRange()
if rsize == 0 { if rsize == 0 {
break break
} }
var wbuf []byte
if len(piecesHash) != 0 { if len(piecesHash) != 0 {
data, err := u.getPiece(ctx, piecesHash[0]) data, err := u.getPiece(ctx, piecesHash[0])
if err != nil { if err != nil {
...@@ -85,31 +102,29 @@ func (u *User) ReadPieceRange( ...@@ -85,31 +102,29 @@ func (u *User) ReadPieceRange(
} }
func (u *User) getPiece(ctx context.Context, pieceHash storage.Hash) (*storage.DecodedData, error) { func (u *User) getPiece(ctx context.Context, pieceHash storage.Hash) (*storage.DecodedData, error) {
// GET from chian/provider // todo: GET from chian/provider
// buf, err := GetPieceFromProvider(pieceHash) // buf, err := GetPieceFromProvider(pieceHash)
unpaddedSectorSize := abi.PaddedPieceSize(u.sectorSize).Unpadded() data, err := u.encoder.DecodePiece(ctx, u.sectorSize, pieceHash)
buf := make([]byte, unpaddedSectorSize)
read, err := in.Read(buf[:])
if err != nil && err != io.EOF {
return storage.DecodedData{}, err
}
var data *storage.DecodedData = &storage.DecodedData{}
err = data.Deserialize(buf[:read])
return data, err return data, err
} }
// RangePiece tracks the remaining [offset, offset+size) read window over a
// chain of pieces whose per-piece payload length is `lenth` bytes.
// (Field name `lenth` [sic] is kept as-is: it is referenced elsewhere.)
type RangePiece struct {
	offset uint64
	size   uint64
	lenth  uint64
}
func (rp RangePiece) nextRange() (uint64, uint64) { func (rp *RangePiece) nextRange() (uint64, uint64) {
start := rp.offset % rp.lenth start := rp.offset % rp.lenth
size := min(rp.size, rp.lenth-rp.start) size := min(rp.size, rp.lenth-start)
rp.offset = rp.offset + size rp.offset = rp.offset + size
rp.size = rp.size - size rp.size = rp.size - size
return start, size return start, size
} }
\ No newline at end of file
// min returns the smaller of two uint64 values.
func min(x, y uint64) uint64 {
	if y < x {
		return y
	}
	return x
}
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"fil_integrate/build/storiface" "fil_integrate/build/storiface"
) )
const SectorSize32MiB uint64 = 32*1024*1024
type Data = io.Reader type Data = io.Reader
......
package main
import (
"os"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"fil_integrate/actor/user"
)
var log = logging.Logger("bench")
// main sets up logging, registers the actor test subcommands, and runs
// the CLI; failures are logged rather than exiting non-zero.
func main() {
	logging.SetLogLevel("*", "INFO")
	log.Info("Starting test")

	commands := []*cli.Command{testUser}
	app := &cli.App{
		Name:     "test",
		Usage:    "test the actor(user, provider, keeper)",
		Version:  "1.0.1",
		Commands: commands,
	}

	err := app.Run(os.Args)
	if err != nil {
		log.Warnf("%+v", err)
	}
}
// testUser is the "test-all" subcommand: it runs the full user-actor
// encode/read round-trip test.
var testUser = &cli.Command{
	Name:  "test-all",
	Usage: "Test Seal the sectors and generate window post",
	// Flags: []cli.Flag{
	// 	&cli.StringFlag{
	// 		Name:  "sector-size",
	// 		Value: "8MiB",
	// 		Usage: "size of the sectors in bytes",
	// 	},
	// 	&cli.IntFlag{
	// 		Name:  "num-agg",
	// 		Value: 8,
	// 		Usage: "How many window-post proofs used to aggregate",
	// 	},
	// },
	Action: func(c *cli.Context) error {
		return user.TestUser()
	},
}
\ No newline at end of file
...@@ -152,22 +152,28 @@ func (sp *Encoder) EncodeData( ...@@ -152,22 +152,28 @@ func (sp *Encoder) EncodeData(
return pieces, nil return pieces, nil
} }
func DecodePiece( func (sp *Encoder) DecodePiece(
ctx context.Context, ctx context.Context,
sectorSize abi.SectorSize, sectorSize abi.SectorSize,
in io.Reader, pieceHash storage.Hash,
) (storage.DecodedData, error) { ) (*storage.DecodedData, error) {
filename := filepath.Join(sp.Root, "pieces", fmt.Sprintf("%x.dat", pieceHash[:]))
in, err := os.Open(filename)
if err != nil {
return nil, err
}
defer in.Close()
unpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded() unpaddedSectorSize := abi.PaddedPieceSize(sectorSize).Unpadded()
buf := make([]byte, unpaddedSectorSize) buf := make([]byte, unpaddedSectorSize)
read, err := in.Read(buf[:]) read, err := in.Read(buf[:])
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return storage.DecodedData{}, err return nil, err
} }
var data *storage.DecodedData = &storage.DecodedData{} var data *storage.DecodedData = &storage.DecodedData{}
err = data.Deserialize(buf[:read]) err = data.Deserialize(buf[:read])
return *data, err return data, err
} }
func min(x, y uint32) uint32 { func min(x, y uint32) uint32 {
......
...@@ -81,7 +81,7 @@ func TestSealAndUnseal() error { ...@@ -81,7 +81,7 @@ func TestSealAndUnseal() error {
// var sectors []storage.SectorRef // var sectors []storage.SectorRef
file := filepath.Join(tsdir, "input-0.dat") file := filepath.Join(tsdir, "input-0.dat")
generateRandomData(file, uint64(abi.PaddedPieceSize(sectorSize).Unpadded()), []byte("sectorSize")) GenerateRandomData(file, uint64(abi.PaddedPieceSize(sectorSize).Unpadded()), []byte("sectorSize"))
in, err := os.Open(file) in, err := os.Open(file)
if err != nil { if err != nil {
return err return err
...@@ -230,7 +230,7 @@ func TestSplitDataInToPieces(sectorSize abi.SectorSize, dataSize uint64) error { ...@@ -230,7 +230,7 @@ func TestSplitDataInToPieces(sectorSize abi.SectorSize, dataSize uint64) error {
for i := 0; i < numFile; i++ { for i := 0; i < numFile; i++ {
filename := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", i)) filename := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", i))
start := time.Now() start := time.Now()
b, err = generateRandomData(filename, dataSize, b) b, err = GenerateRandomData(filename, dataSize, b)
if err != nil { if err != nil {
return err return err
} }
...@@ -309,7 +309,7 @@ func TestSplitDataInToPieces(sectorSize abi.SectorSize, dataSize uint64) error { ...@@ -309,7 +309,7 @@ func TestSplitDataInToPieces(sectorSize abi.SectorSize, dataSize uint64) error {
} }
defer out.Close() defer out.Close()
err = decodePiecesToData(sb, ctx, tsdir, sectorSize, finalPiece.Commitment, out) err = decodePiecesToData(sb, sp, ctx, tsdir, sectorSize, finalPiece.Commitment, out)
if err != nil { if err != nil {
return err return err
} }
...@@ -378,7 +378,7 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { ...@@ -378,7 +378,7 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error {
filename := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", i)) filename := filepath.Join(tsdir, fmt.Sprintf("input-%d.dat", i))
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
Datasize := (r.Intn(1024*1024) + 1024*1024) * 32 Datasize := (r.Intn(1024*1024) + 1024*1024) * 32
b, err = generateRandomData(filename, uint64(Datasize), b) b, err = GenerateRandomData(filename, uint64(Datasize), b)
if err != nil { if err != nil {
return err return err
} }
...@@ -533,7 +533,7 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error { ...@@ -533,7 +533,7 @@ func TestSealAndWindowPoSt(sectorSize abi.SectorSize, numAggregate int) error {
} }
defer out.Close() defer out.Close()
err = decodePiecesToData(sb, ctx, tsdir, sectorSize, finalPieces[i], out) err = decodePiecesToData(sb, sp, ctx, tsdir, sectorSize, finalPieces[i], out)
if err != nil { if err != nil {
return err return err
} }
...@@ -565,7 +565,7 @@ func Insert(sortedPieces []storage.Piece, pieces []storage.Piece, finalPiece sto ...@@ -565,7 +565,7 @@ func Insert(sortedPieces []storage.Piece, pieces []storage.Piece, finalPiece sto
return append(res, sortedPieces[i+1:]...) return append(res, sortedPieces[i+1:]...)
} }
func generateRandomData(filename string, dataSize uint64, b []byte) ([]byte, error) { func GenerateRandomData(filename string, dataSize uint64, b []byte) ([]byte, error) {
if _, err := os.Stat(filename); !os.IsNotExist(err) { if _, err := os.Stat(filename); !os.IsNotExist(err) {
os.Remove(filename) os.Remove(filename)
} }
...@@ -590,26 +590,24 @@ func generateRandomData(filename string, dataSize uint64, b []byte) ([]byte, err ...@@ -590,26 +590,24 @@ func generateRandomData(filename string, dataSize uint64, b []byte) ([]byte, err
return b, nil return b, nil
} }
func decodePiecesToData(sb *Sealer, ctx context.Context, tsdir string, sectorSize abi.SectorSize, finalHash storage.Hash, out io.Writer) error { func decodePiecesToData(sb *Sealer, sp *Encoder, ctx context.Context, tsdir string, sectorSize abi.SectorSize, finalHash storage.Hash, out io.Writer) error {
// var piecesHash []storage.Hash // var piecesHash []storage.Hash
file, err := unseal(sb, ctx, finalHash) err := unseal(sb, ctx, finalHash)
if err != nil { if err != nil {
return err return err
} }
data, err := DecodePiece(ctx, sectorSize, file) data, err := sp.DecodePiece(ctx, sectorSize, finalHash)
file.Close()
if err != nil { if err != nil {
return err return err
} }
piecesHash := data.PieceHash piecesHash := data.PieceHash
for data.HasPre { for data.HasPre {
file, err = unseal(sb, ctx, data.PreHash) err = unseal(sb, ctx, data.PreHash)
if err != nil { if err != nil {
return err return err
} }
data, err = DecodePiece(ctx, sectorSize, file) data, err = sp.DecodePiece(ctx, sectorSize, data.PreHash)
file.Close()
if err != nil { if err != nil {
return err return err
} }
...@@ -618,12 +616,11 @@ func decodePiecesToData(sb *Sealer, ctx context.Context, tsdir string, sectorSiz ...@@ -618,12 +616,11 @@ func decodePiecesToData(sb *Sealer, ctx context.Context, tsdir string, sectorSiz
buf := data.Data[:] buf := data.Data[:]
for _, pieceHash := range piecesHash { for _, pieceHash := range piecesHash {
file, err = unseal(sb, ctx, pieceHash) err = unseal(sb, ctx, pieceHash)
if err != nil { if err != nil {
return err return err
} }
data, err := DecodePiece(ctx, sectorSize, file) data, err := sp.DecodePiece(ctx, sectorSize, pieceHash)
file.Close()
if err != nil { if err != nil {
return err return err
} }
...@@ -640,25 +637,21 @@ func decodePiecesToData(sb *Sealer, ctx context.Context, tsdir string, sectorSiz ...@@ -640,25 +637,21 @@ func decodePiecesToData(sb *Sealer, ctx context.Context, tsdir string, sectorSiz
return nil return nil
} }
func unseal(sb *Sealer, ctx context.Context, fileHash storage.Hash) (*os.File, error) { func unseal(sb *Sealer, ctx context.Context, fileHash storage.Hash) error {
rangeSector, ok := hashMap[fileHash] rangeSector, ok := hashMap[fileHash]
filename := filepath.Join(sb.sectors.GetRoot(), "pieces", fmt.Sprintf("%x.dat", fileHash[:])) filename := filepath.Join(sb.sectors.GetRoot(), "pieces", fmt.Sprintf("%x.dat", fileHash[:]))
if ok { if ok {
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil { if err != nil {
return nil, err return err
} }
defer file.Close()
err = sb.UnsealedRange(ctx, file, rangeSector.Sector, rangeSector.Unsealed, rangeSector.Offset, rangeSector.Size) err = sb.UnsealedRange(ctx, file, rangeSector.Sector, rangeSector.Unsealed, rangeSector.Offset, rangeSector.Size)
if err != nil { if err != nil {
return nil, err return err
} }
file.Close()
}
file, err := os.Open(filename)
if err != nil {
return nil, err
} }
return file, nil return nil
} }
func checkDecodedFile(root string, i int) (bool, error) { func checkDecodedFile(root string, i int) (bool, error) {
......
...@@ -21,6 +21,7 @@ type PieceEncoder interface { ...@@ -21,6 +21,7 @@ type PieceEncoder interface {
// Split and encode data into pieces // Split and encode data into pieces
// Pieces structure is [ Tag | MetaData | HashData ] or [ Tag | PreHash | HashData] // Pieces structure is [ Tag | MetaData | HashData ] or [ Tag | PreHash | HashData]
EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Piece, []storage.Piece, error) EncodeDataToPieces(ctx context.Context, sectorSize abi.SectorSize, file storage.Data) (storage.Piece, []storage.Piece, error)
DecodePiece(ctx context.Context, sectorSize abi.SectorSize, pieceHash storage.Hash) (*storage.DecodedData, error)
} }
//interface //interface
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment