Unverified Commit 3ff2f756 authored by Balint Gabor's avatar Balint Gabor Committed by GitHub

swarm: Chunk refactor (#17659)

Co-authored-by: 's avatarJanos Guljas <janos@resenje.org>
Co-authored-by: 's avatarBalint Gabor <balint.g@gmail.com>
Co-authored-by: 's avatarAnton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: 's avatarViktor Trón <viktor.tron@gmail.com>
parent ff3a5d24
......@@ -39,7 +39,7 @@ func hash(ctx *cli.Context) {
defer f.Close()
stat, _ := f.Stat()
fileStore := storage.NewFileStore(storage.NewMapChunkStore(), storage.NewFileStoreParams())
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams())
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
if err != nil {
utils.Fatalf("%v\n", err)
......
......@@ -48,7 +48,7 @@ func main() {
cli.StringFlag{
Name: "cluster-endpoint",
Value: "testing",
Usage: "cluster to point to (open, or testing)",
Usage: "cluster to point to (local, open or testing)",
Destination: &cluster,
},
cli.IntFlag{
......@@ -76,8 +76,8 @@ func main() {
},
cli.IntFlag{
Name: "filesize",
Value: 1,
Usage: "file size for generated random file in MB",
Value: 1024,
Usage: "file size for generated random file in KB",
Destination: &filesize,
},
}
......
......@@ -39,6 +39,11 @@ import (
func generateEndpoints(scheme string, cluster string, from int, to int) {
if cluster == "prod" {
cluster = ""
} else if cluster == "local" {
for port := from; port <= to; port++ {
endpoints = append(endpoints, fmt.Sprintf("%s://localhost:%v", scheme, port))
}
return
} else {
cluster = cluster + "."
}
......@@ -53,13 +58,13 @@ func generateEndpoints(scheme string, cluster string, from int, to int) {
}
func cliUploadAndSync(c *cli.Context) error {
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size", filesize) }(time.Now())
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
generateEndpoints(scheme, cluster, from, to)
log.Info("uploading to " + endpoints[0] + " and syncing")
f, cleanup := generateRandomFile(filesize * 1000000)
f, cleanup := generateRandomFile(filesize * 1000)
defer cleanup()
hash, err := upload(f, endpoints[0])
......@@ -76,12 +81,7 @@ func cliUploadAndSync(c *cli.Context) error {
log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash))
if filesize < 10 {
time.Sleep(35 * time.Second)
} else {
time.Sleep(15 * time.Second)
time.Sleep(2 * time.Duration(filesize) * time.Second)
}
time.Sleep(3 * time.Second)
wg := sync.WaitGroup{}
for _, endpoint := range endpoints {
......@@ -109,7 +109,7 @@ func cliUploadAndSync(c *cli.Context) error {
// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file
func fetch(hash string, endpoint string, original []byte, ruid string) error {
log.Trace("sleeping", "ruid", ruid)
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash)
res, err := http.Get(endpoint + "/bzz:/" + hash + "/")
......
......@@ -250,13 +250,6 @@ func NewAPI(fileStore *storage.FileStore, dns Resolver, resourceHandler *mru.Han
return
}
// Upload to be used only in TEST
func (a *API) Upload(ctx context.Context, uploadDir, index string, toEncrypt bool) (hash string, err error) {
fs := NewFileSystem(a)
hash, err = fs.Upload(uploadDir, index, toEncrypt)
return hash, err
}
// Retrieve FileStore reader API
func (a *API) Retrieve(ctx context.Context, addr storage.Address) (reader storage.LazySectionReader, isEncrypted bool) {
return a.fileStore.Retrieve(ctx, addr)
......
......@@ -62,6 +62,7 @@ type Config struct {
NetworkID uint64
SwapEnabled bool
SyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
LightNodeEnabled bool
SyncUpdateDelay time.Duration
......@@ -89,7 +90,8 @@ func NewConfig() (c *Config) {
NetworkID: network.DefaultNetworkID,
SwapEnabled: false,
SyncEnabled: true,
DeliverySkipCheck: false,
SyncingSkipCheck: false,
DeliverySkipCheck: true,
SyncUpdateDelay: 15 * time.Second,
SwapAPI: "",
}
......
......@@ -477,12 +477,12 @@ func testBzzGetPath(encrypted bool, t *testing.T) {
var wait func(context.Context) error
ctx := context.TODO()
addr[i], wait, err = srv.FileStore.Store(ctx, reader[i], int64(len(mf)), encrypted)
for j := i + 1; j < len(testmanifest); j++ {
testmanifest[j] = strings.Replace(testmanifest[j], fmt.Sprintf("<key%v>", i), addr[i].Hex(), -1)
}
if err != nil {
t.Fatal(err)
}
for j := i + 1; j < len(testmanifest); j++ {
testmanifest[j] = strings.Replace(testmanifest[j], fmt.Sprintf("<key%v>", i), addr[i].Hex(), -1)
}
err = wait(ctx)
if err != nil {
t.Fatal(err)
......
......@@ -69,9 +69,12 @@ func (a *API) NewManifest(ctx context.Context, toEncrypt bool) (storage.Address,
if err != nil {
return nil, err
}
key, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), toEncrypt)
wait(ctx)
return key, err
addr, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), toEncrypt)
if err != nil {
return nil, err
}
err = wait(ctx)
return addr, err
}
// Manifest hack for supporting Mutable Resource Updates from the bzz: scheme
......@@ -87,8 +90,12 @@ func (a *API) NewResourceManifest(ctx context.Context, resourceAddr string) (sto
if err != nil {
return nil, err
}
key, _, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), false)
return key, err
addr, wait, err := a.Store(ctx, bytes.NewReader(data), int64(len(data)), false)
if err != nil {
return nil, err
}
err = wait(ctx)
return addr, err
}
// ManifestWriter is used to add and remove entries from an underlying manifest
......@@ -106,21 +113,26 @@ func (a *API) NewManifestWriter(ctx context.Context, addr storage.Address, quitC
return &ManifestWriter{a, trie, quitC}, nil
}
// AddEntry stores the given data and adds the resulting key to the manifest
func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (key storage.Address, err error) {
// AddEntry stores the given data and adds the resulting address to the manifest
func (m *ManifestWriter) AddEntry(ctx context.Context, data io.Reader, e *ManifestEntry) (addr storage.Address, err error) {
entry := newManifestTrieEntry(e, nil)
if data != nil {
key, _, err = m.api.Store(ctx, data, e.Size, m.trie.encrypted)
var wait func(context.Context) error
addr, wait, err = m.api.Store(ctx, data, e.Size, m.trie.encrypted)
if err != nil {
return nil, err
}
err = wait(ctx)
if err != nil {
return nil, err
}
entry.Hash = key.Hex()
entry.Hash = addr.Hex()
}
if entry.Hash == "" {
return key, errors.New("missing entry hash")
return addr, errors.New("missing entry hash")
}
m.trie.addEntry(entry, m.quitC)
return key, nil
return addr, nil
}
// RemoveEntry removes the given path from the manifest
......@@ -129,7 +141,7 @@ func (m *ManifestWriter) RemoveEntry(path string) error {
return nil
}
// Store stores the manifest, returning the resulting storage key
// Store stores the manifest, returning the resulting storage address
func (m *ManifestWriter) Store() (storage.Address, error) {
return m.trie.ref, m.trie.recalcAndStore()
}
......@@ -211,51 +223,51 @@ type manifestTrieEntry struct {
subtrie *manifestTrie
}
func loadManifest(ctx context.Context, fileStore *storage.FileStore, hash storage.Address, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
log.Trace("manifest lookup", "key", hash)
func loadManifest(ctx context.Context, fileStore *storage.FileStore, addr storage.Address, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
log.Trace("manifest lookup", "addr", addr)
// retrieve manifest via FileStore
manifestReader, isEncrypted := fileStore.Retrieve(ctx, hash)
log.Trace("reader retrieved", "key", hash)
return readManifest(manifestReader, hash, fileStore, isEncrypted, quitC, decrypt)
manifestReader, isEncrypted := fileStore.Retrieve(ctx, addr)
log.Trace("reader retrieved", "addr", addr)
return readManifest(manifestReader, addr, fileStore, isEncrypted, quitC, decrypt)
}
func readManifest(mr storage.LazySectionReader, hash storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
func readManifest(mr storage.LazySectionReader, addr storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
// TODO check size for oversized manifests
size, err := mr.Size(mr.Context(), quitC)
if err != nil { // size == 0
// can't determine size means we don't have the root chunk
log.Trace("manifest not found", "key", hash)
log.Trace("manifest not found", "addr", addr)
err = fmt.Errorf("Manifest not Found")
return
}
if size > manifestSizeLimit {
log.Warn("manifest exceeds size limit", "key", hash, "size", size, "limit", manifestSizeLimit)
log.Warn("manifest exceeds size limit", "addr", addr, "size", size, "limit", manifestSizeLimit)
err = fmt.Errorf("Manifest size of %v bytes exceeds the %v byte limit", size, manifestSizeLimit)
return
}
manifestData := make([]byte, size)
read, err := mr.Read(manifestData)
if int64(read) < size {
log.Trace("manifest not found", "key", hash)
log.Trace("manifest not found", "addr", addr)
if err == nil {
err = fmt.Errorf("Manifest retrieval cut short: read %v, expect %v", read, size)
}
return
}
log.Debug("manifest retrieved", "key", hash)
log.Debug("manifest retrieved", "addr", addr)
var man struct {
Entries []*manifestTrieEntry `json:"entries"`
}
err = json.Unmarshal(manifestData, &man)
if err != nil {
err = fmt.Errorf("Manifest %v is malformed: %v", hash.Log(), err)
log.Trace("malformed manifest", "key", hash)
err = fmt.Errorf("Manifest %v is malformed: %v", addr.Log(), err)
log.Trace("malformed manifest", "addr", addr)
return
}
log.Trace("manifest entries", "key", hash, "len", len(man.Entries))
log.Trace("manifest entries", "addr", addr, "len", len(man.Entries))
trie = &manifestTrie{
fileStore: fileStore,
......@@ -406,12 +418,12 @@ func (mt *manifestTrie) recalcAndStore() error {
sr := bytes.NewReader(manifest)
ctx := context.TODO()
key, wait, err2 := mt.fileStore.Store(ctx, sr, int64(len(manifest)), mt.encrypted)
addr, wait, err2 := mt.fileStore.Store(ctx, sr, int64(len(manifest)), mt.encrypted)
if err2 != nil {
return err2
}
err2 = wait(ctx)
mt.ref = key
mt.ref = addr
return err2
}
......
......@@ -20,7 +20,6 @@ package fuse
import (
"bytes"
"context"
"crypto/rand"
"flag"
"fmt"
......@@ -111,7 +110,7 @@ func createTestFilesAndUploadToSwarm(t *testing.T, api *api.API, files map[strin
}
//upload directory to swarm and return hash
bzzhash, err := api.Upload(context.TODO(), uploadDir, "", toEncrypt)
bzzhash, err := Upload(uploadDir, "", api, toEncrypt)
if err != nil {
t.Fatalf("Error uploading directory %v: %vm encryption: %v", uploadDir, err, toEncrypt)
}
......@@ -1695,3 +1694,9 @@ func TestFUSE(t *testing.T) {
t.Run("appendFileContentsToEndNonEncrypted", ta.appendFileContentsToEndNonEncrypted)
}
}
func Upload(uploadDir, index string, a *api.API, toEncrypt bool) (hash string, err error) {
fs := api.NewFileSystem(a)
hash, err = fs.Upload(uploadDir, index, toEncrypt)
return hash, err
}
This diff is collapsed.
This diff is collapsed.
......@@ -28,10 +28,13 @@ package priorityqueue
import (
"context"
"errors"
"github.com/ethereum/go-ethereum/log"
)
var (
errContention = errors.New("queue contention")
ErrContention = errors.New("contention")
errBadPriority = errors.New("bad priority")
wakey = struct{}{}
......@@ -39,7 +42,7 @@ var (
// PriorityQueue is the basic structure
type PriorityQueue struct {
queues []chan interface{}
Queues []chan interface{}
wakeup chan struct{}
}
......@@ -50,27 +53,29 @@ func New(n int, l int) *PriorityQueue {
queues[i] = make(chan interface{}, l)
}
return &PriorityQueue{
queues: queues,
Queues: queues,
wakeup: make(chan struct{}, 1),
}
}
// Run is a forever loop popping items from the queues
func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) {
top := len(pq.queues) - 1
top := len(pq.Queues) - 1
p := top
READ:
for {
q := pq.queues[p]
q := pq.Queues[p]
select {
case <-ctx.Done():
return
case x := <-q:
log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
f(x)
p = top
default:
if p > 0 {
p--
log.Trace("priority.queue p > 0", "p", p)
continue READ
}
p = top
......@@ -78,6 +83,7 @@ READ:
case <-ctx.Done():
return
case <-pq.wakeup:
log.Trace("priority.queue wakeup", "p", p)
}
}
}
......@@ -85,23 +91,15 @@ READ:
// Push pushes an item to the appropriate queue specified in the priority argument
// if context is given it waits until either the item is pushed or the Context aborts
// otherwise returns errContention if the queue is full
func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error {
if p < 0 || p >= len(pq.queues) {
func (pq *PriorityQueue) Push(x interface{}, p int) error {
if p < 0 || p >= len(pq.Queues) {
return errBadPriority
}
if ctx == nil {
select {
case pq.queues[p] <- x:
default:
return errContention
}
} else {
select {
case pq.queues[p] <- x:
case <-ctx.Done():
return ctx.Err()
}
log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
select {
case pq.Queues[p] <- x:
default:
return ErrContention
}
select {
case pq.wakeup <- wakey:
......
......@@ -30,7 +30,7 @@ func TestPriorityQueue(t *testing.T) {
results = append(results, v.(string))
wg.Done()
})
pq.Push(context.Background(), "2.0", 2)
pq.Push("2.0", 2)
wg.Wait()
if results[0] != "2.0" {
t.Errorf("expected first result %q, got %q", "2.0", results[0])
......@@ -66,7 +66,7 @@ Loop:
{
priorities: []int{0, 0, 0},
values: []string{"0.0", "0.0", "0.1"},
errors: []error{nil, nil, errContention},
errors: []error{nil, nil, ErrContention},
},
} {
var results []string
......@@ -74,7 +74,7 @@ Loop:
pq := New(3, 2)
wg.Add(len(tc.values))
for j, value := range tc.values {
err := pq.Push(nil, value, tc.priorities[j])
err := pq.Push(value, tc.priorities[j])
if tc.errors != nil && err != tc.errors[j] {
t.Errorf("expected push error %v, got %v", tc.errors[j], err)
continue Loop
......
......@@ -94,7 +94,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) {
}
s.Net = simulations.NewNetwork(
adapters.NewSimAdapter(adapterServices),
adapters.NewTCPAdapter(adapterServices),
&simulations.NetworkConfig{ID: "0"},
)
......@@ -164,17 +164,6 @@ var maxParallelCleanups = 10
func (s *Simulation) Close() {
close(s.done)
// Close all connections before calling the Network Shutdown.
// It is possible that p2p.Server.Stop will block if there are
// existing connections.
for _, c := range s.Net.Conns {
if c.Up {
s.Net.Disconnect(c.One, c.Other)
}
}
s.shutdownWG.Wait()
s.Net.Shutdown()
sem := make(chan struct{}, maxParallelCleanups)
s.mu.RLock()
cleanupFuncs := make([]func(), len(s.cleanupFuncs))
......@@ -206,6 +195,9 @@ func (s *Simulation) Close() {
}
close(s.runC)
}
s.shutdownWG.Wait()
s.Net.Shutdown()
}
// Done returns a channel that is closed when the simulation
......
......@@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
return nil, nil, nil, removeDataDir, err
}
db := storage.NewDBAPI(localStore)
delivery := NewDelivery(to, db)
streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, nil, removeDataDir, err
}
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
......@@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) {
func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) {
func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
rrs.stores[idx].Put(ctx, chunk)
return rrs.stores[idx].Put(ctx, chunk)
}
func (rrs *roundRobinStore) Close() {
......
......@@ -19,12 +19,11 @@ package stream
import (
"context"
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
"fmt"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/discover"
cp "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
......@@ -46,39 +45,34 @@ var (
)
type Delivery struct {
db *storage.DBAPI
kad *network.Kademlia
receiveC chan *ChunkDeliveryMsg
getPeer func(discover.NodeID) *Peer
chunkStore storage.SyncChunkStore
kad *network.Kademlia
getPeer func(discover.NodeID) *Peer
}
func NewDelivery(kad *network.Kademlia, db *storage.DBAPI) *Delivery {
d := &Delivery{
db: db,
kad: kad,
receiveC: make(chan *ChunkDeliveryMsg, deliveryCap),
func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
return &Delivery{
chunkStore: chunkStore,
kad: kad,
}
go d.processReceivedChunks()
return d
}
// SwarmChunkServer implements Server
type SwarmChunkServer struct {
deliveryC chan []byte
batchC chan []byte
db *storage.DBAPI
chunkStore storage.ChunkStore
currentLen uint64
quit chan struct{}
}
// NewSwarmChunkServer is SwarmChunkServer constructor
func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer {
func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
s := &SwarmChunkServer{
deliveryC: make(chan []byte, deliveryCap),
batchC: make(chan []byte),
db: db,
quit: make(chan struct{}),
deliveryC: make(chan []byte, deliveryCap),
batchC: make(chan []byte),
chunkStore: chunkStore,
quit: make(chan struct{}),
}
go s.processDeliveries()
return s
......@@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() {
// GetData retrives chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
chunk, err := s.db.Get(ctx, storage.Address(key))
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
if err != nil {
return nil, err
}
return chunk.SData, nil
return chunk.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
......@@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return err
}
streamer := s.Server.(*SwarmChunkServer)
chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
if chunk.ReqC != nil {
if created {
if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
chunk.SetErrored(storage.ErrChunkForward)
return nil
}
var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
go func() {
select {
case <-ctx.Done():
case <-streamer.quit:
}
go func() {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
"waiting.delivery")
defer osp.Finish()
t := time.NewTimer(10 * time.Minute)
defer t.Stop()
log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.kad.BaseAddr()), "created", created)
start := time.Now()
select {
case <-chunk.ReqC:
log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start))
case <-t.C:
log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr)
chunk.SetErrored(storage.ErrChunkTimeout)
return
}
chunk.SetErrored(nil)
if req.SkipCheck {
err := sp.Deliver(ctx, chunk, s.priority)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
sp.Drop(err)
}
cancel()
}()
go func() {
chunk, err := d.chunkStore.Get(ctx, req.Addr)
if err != nil {
log.Warn("ChunkStore.Get can not retrieve chunk", "err", err)
return
}
if req.SkipCheck {
err = sp.Deliver(ctx, chunk, s.priority)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
streamer.deliveryC <- chunk.Addr[:]
}()
return nil
}
// TODO: call the retrieve function of the outgoing syncer
if req.SkipCheck {
log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr)
if length := len(chunk.SData); length < 9 {
log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
return
}
return sp.Deliver(ctx, chunk, s.priority)
}
streamer.deliveryC <- chunk.Addr[:]
select {
case streamer.deliveryC <- chunk.Address()[:]:
case <-streamer.quit:
}
}()
return nil
}
......@@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
// TODO: Fix context SNAFU
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
......@@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
"chunk.delivery")
defer osp.Finish()
req.peer = sp
d.receiveC <- req
return nil
}
processReceivedChunksCount.Inc(1)
func (d *Delivery) processReceivedChunks() {
R:
for req := range d.receiveC {
processReceivedChunksCount.Inc(1)
if len(req.SData) > cp.DefaultSize+8 {
log.Warn("received chunk is bigger than expected", "len", len(req.SData))
continue R
}
// this should be has locally
chunk, err := d.db.Get(context.TODO(), req.Addr)
if err == nil {
continue R
}
if err != storage.ErrFetching {
log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk)
continue R
}
select {
case <-chunk.ReqC:
log.Error("someone else delivered?", "hash", chunk.Addr.Hex())
continue R
default:
}
chunk.SData = req.SData
d.db.Put(context.TODO(), chunk)
go func(req *ChunkDeliveryMsg) {
err := chunk.WaitToStore()
go func() {
req.peer = sp
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
// TODO: Enable this log line
// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
req.peer.Drop(err)
}
}(req)
}
}
}()
return nil
}
// RequestFromPeers sends a chunk retrieve request to
func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
var success bool
var err error
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
var sp *Peer
spID := req.Source
d.kad.EachConn(hash, 255, func(p *network.Peer, po int, nn bool) bool {
spId := p.ID()
for _, p := range peersToSkip {
if p == spId {
log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId)
if spID != nil {
sp = d.getPeer(*spID)
if sp == nil {
return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
}
} else {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
id := p.ID()
// TODO: skip light nodes that do not accept retrieve requests
if req.SkipPeer(id.String()) {
log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
}
}
sp := d.getPeer(spId)
sp = d.getPeer(id)
if sp == nil {
log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
return true
}
spID = &id
return false
})
if sp == nil {
log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
return true
return nil, nil, errors.New("no peer found")
}
err = sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: hash,
SkipCheck: skipCheck,
}, Top)
if err != nil {
return true
}
requestFromPeersEachCount.Inc(1)
success = true
return false
})
if success {
return nil
}
return errors.New("no peer found")
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
}, Top)
if err != nil {
return nil, nil, err
}
requestFromPeersEachCount.Inc(1)
return spID, sp.quit, nil
}
......@@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) {
peerID := tester.IDs[0]
streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true)
ctx := context.Background()
req := network.NewRequest(
storage.Address(hash0[:]),
true,
&sync.Map{},
)
streamer.delivery.RequestFromPeers(ctx, req)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
......@@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: chunk.Addr[:],
Addr: chunk.Address()[:],
},
Peer: peerID,
},
......@@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
})
hash := storage.Address(hash0[:])
chunk := storage.NewChunk(hash, nil)
chunk.SData = hash
localStore.Put(context.TODO(), chunk)
chunk.WaitToStore()
chunk := storage.NewChunk(hash, hash)
err = localStore.Put(context.TODO(), chunk)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
......@@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
hash = storage.Address(hash1[:])
chunk = storage.NewChunk(hash, nil)
chunk.SData = hash1[:]
localStore.Put(context.TODO(), chunk)
chunk.WaitToStore()
chunk = storage.NewChunk(hash, hash1[:])
err = localStore.Put(context.TODO(), chunk)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
......@@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
chunkKey := hash0[:]
chunkData := hash1[:]
chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey)
if !created {
t.Fatal("chunk already exists")
}
select {
case <-chunk.ReqC:
t.Fatal("chunk is already received")
default:
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
......@@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
},
},
p2ptest.Exchange{
Label: "ChunkDeliveryRequest message",
Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
{
Code: 6,
......@@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
timeout := time.NewTimer(1 * time.Second)
select {
case <-timeout.C:
t.Fatal("timeout receiving chunk")
case <-chunk.ReqC:
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// wait for the chunk to get stored
storedChunk, err := localStore.Get(ctx, chunkKey)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
default:
}
storedChunk, err = localStore.Get(ctx, chunkKey)
time.Sleep(50 * time.Millisecond)
}
storedChunk, err := localStore.Get(context.TODO(), chunkKey)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if !bytes.Equal(storedChunk.SData, chunkData) {
if !bytes.Equal(storedChunk.Data(), chunkData) {
t.Fatal("Retrieved chunk has different data than original")
}
......@@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
......@@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
......@@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
DoSync: true,
SyncUpdateDelay: 0,
})
retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
}
netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
......
......@@ -38,13 +38,18 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
func TestIntervals(t *testing.T) {
func TestIntervalsLive(t *testing.T) {
testIntervals(t, true, nil, false)
testIntervals(t, false, NewRange(9, 26), false)
testIntervals(t, true, NewRange(9, 26), false)
testIntervals(t, true, nil, true)
}
func TestIntervalsHistory(t *testing.T) {
testIntervals(t, false, NewRange(9, 26), false)
testIntervals(t, false, NewRange(9, 26), true)
}
func TestIntervalsLiveAndHistory(t *testing.T) {
testIntervals(t, true, NewRange(9, 26), false)
testIntervals(t, true, NewRange(9, 26), true)
}
......@@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
return newTestExternalClient(db), nil
return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
......@@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
t.Fatal(err)
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
......@@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
log.Error("WaitKademlia error: %v", "err", err)
return err
}
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
......@@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
if err != nil {
return err
}
go func() {
for d := range disconnections {
if d.Error != nil {
......@@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var liveHashesChan chan []byte
liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
log.Error("Subscription error: %v", "err", err)
log.Error("get hashes", "err", err)
return
}
i := externalStreamSessionAt
......@@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var historyHashesChan chan []byte
historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
log.Error("get hashes", "err", err)
return
}
......@@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
}()
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
if err != nil {
return err
}
if err := <-liveErrC; err != nil {
return err
}
......@@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
db *storage.DBAPI
store storage.SyncChunkStore
enableNotificationsC chan struct{}
}
func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
db: db,
store: store,
enableNotificationsC: make(chan struct{}),
}
}
func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
if chunk.ReqC == nil {
func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
wait := c.store.FetchFunc(ctx, storage.Address(hash))
if wait == nil {
return nil
}
c.hashes <- hash
//NOTE: This was failing on go1.9.x with a deadlock.
//Sometimes this function would just block
//It is commented now, but it may be well worth after the chunk refactor
//to re-enable this and see if the problem has been addressed
/*
return func() {
return chunk.WaitToStore()
select {
case c.hashes <- hash:
case <-ctx.Done():
log.Warn("testExternalClient NeedData context", "err", ctx.Err())
return func(_ context.Context) error {
return ctx.Err()
}
*/
return nil
}
return wait
}
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
......
......@@ -18,9 +18,7 @@ package stream
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
......@@ -31,6 +29,8 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
var syncBatchTimeout = 30 * time.Second
// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
......@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
log.Warn("SendOfferedHashes dropping peer", "err", err)
p.Drop(err)
log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
......@@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
log.Warn("SendOfferedHashes dropping peer", "err", err)
p.Drop(err)
log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
}
......@@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if err != nil {
return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err)
}
wg := sync.WaitGroup{}
ctr := 0
errC := make(chan error)
ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
ctx = context.WithValue(ctx, "source", p.ID().String())
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
ctr++
want.Set(i/HashSize, true)
wg.Add(1)
// create request and wait until the chunk data arrives and is stored
go func(w func()) {
w()
wg.Done()
go func(w func(context.Context) error) {
select {
case errC <- w(ctx):
case <-ctx.Done():
}
}(wait)
}
}
// done := make(chan bool)
// go func() {
// wg.Wait()
// close(done)
// }()
// go func() {
// select {
// case <-done:
// s.next <- s.batchDone(p, req, hashes)
// case <-time.After(1 * time.Second):
// p.Drop(errors.New("timeout waiting for batch to be delivered"))
// }
// }()
go func() {
wg.Wait()
defer cancel()
for i := 0; i < ctr; i++ {
select {
case err := <-errC:
if err != nil {
log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
p.Drop(err)
return
}
case <-ctx.Done():
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
return
}
}
select {
case c.next <- c.batchDone(p, req, hashes):
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
case <-ctx.Done():
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
}
}()
// only send wantedKeysMsg if all missing chunks of the previous batch arrived
......@@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
c.sessionAt = req.From
}
from, to := c.nextBatch(req.To + 1)
log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID())
if from == to {
return nil
}
......@@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
To: to,
}
go func() {
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
case <-time.After(120 * time.Second):
log.Warn("handleOfferedHashesMsg timeout, so dropping peer")
p.Drop(errors.New("handle offered hashes timeout"))
return
case err := <-c.next:
if err != nil {
log.Warn("c.next dropping peer", "err", err)
log.Warn("c.next error dropping peer", "err", err)
p.Drop(err)
return
}
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
return
case <-ctx.Done():
log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
log.Warn("SendPriority err, so dropping peer", "err", err)
p.Drop(err)
log.Warn("SendPriority error", "err", err)
}
}()
return nil
......@@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
log.Warn("SendOfferedHashes dropping peer", "err", err)
p.Drop(err)
log.Warn("SendOfferedHashes error", "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
......@@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
chunk := storage.NewChunk(hash, nil)
chunk.SData = data
if length := len(chunk.SData); length < 9 {
log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
}
chunk := storage.NewChunk(hash, data)
if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}
......
......@@ -33,8 +33,6 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
var sendTimeout = 30 * time.Second
type notFoundError struct {
t string
s Stream
......@@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
p.Send(wmsg.Context, wmsg.Msg)
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
p.Drop(err)
}
})
// basic monitoring for pq contention
go func(pq *pq.PriorityQueue) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
var len_maxi int
var cap_maxi int
for k := range pq.Queues {
if len_maxi < len(pq.Queues[k]) {
len_maxi = len(pq.Queues[k])
}
if cap_maxi < cap(pq.Queues[k]) {
cap_maxi = cap(pq.Queues[k])
}
}
metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
case <-p.quit:
return
}
}
}(p.pq)
go func() {
<-p.quit
cancel()
......@@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
......@@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
defer sp.Finish()
msg := &ChunkDeliveryMsg{
Addr: chunk.Addr,
SData: chunk.SData,
Addr: chunk.Address(),
SData: chunk.Data(),
}
return p.SendPriority(ctx, msg, priority)
}
......@@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
defer cancel()
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
}
return p.pq.Push(cctx, wmsg, int(priority))
err := p.pq.Push(wmsg, int(priority))
if err == pq.ErrContention {
log.Warn("dropping peer on priority queue contention", "peer", p.ID())
p.Drop(err)
}
return err
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
......
......@@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
return r, cleanup, nil
},
......@@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
cleanup = func() {
os.RemoveAll(datadir)
store.Close()
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 0,
})
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucketKeyFileStore = simulation.BucketKey("filestore")
bucket.Store(bucketKeyFileStore, fileStore)
cleanup = func() {
os.RemoveAll(datadir)
netStore.Close()
r.Close()
}
return r, cleanup, nil
},
......
This diff is collapsed.
......@@ -32,10 +32,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
opentracing "github.com/opentracing/opentracing-go"
)
const (
......@@ -43,8 +41,8 @@ const (
Mid
High
Top
PriorityQueue // number of queues
PriorityQueueCap = 32 // queue capacity
PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
PriorityQueueCap = 128 // queue capacity
HashSize = 32
)
......@@ -73,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
......@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
return NewSwarmChunkServer(delivery.db), nil
return NewSwarmChunkServer(delivery.chunkStore), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
RegisterSwarmSyncerServer(streamer, db)
RegisterSwarmSyncerClient(streamer, db)
RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore)
if options.DoSync {
// latestIntC function ensures that
......@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
return peer.Send(context.TODO(), msg)
}
func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
"registry.retrieve")
defer sp.Finish()
return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
}
func (r *Registry) NodeInfo() interface{} {
return nil
}
......@@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
NeedData(context.Context, []byte) func()
NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}
......
......@@ -80,15 +80,17 @@ func newTestClient(t string) *testClient {
}
}
func (self *testClient) NeedData(ctx context.Context, hash []byte) func() {
func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
return func() {
return func(context.Context) error {
<-self.wait0
return nil
}
} else if bytes.Equal(hash, hash2[:]) {
return func() {
return func(context.Context) error {
<-self.wait2
return nil
}
}
return nil
......
......@@ -28,7 +28,6 @@ import (
)
const (
// BatchSize = 2
BatchSize = 128
)
......@@ -38,35 +37,37 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
db *storage.DBAPI
store storage.SyncChunkStore
sessionAt uint64
start uint64
live bool
quit chan struct{}
}
// NewSwarmSyncerServer is contructor for SwarmSyncerServer
func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
sessionAt := db.CurrentBucketStorageIndex(po)
func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
sessionAt := syncChunkStore.BinIndex(po)
var start uint64
if live {
start = sessionAt
}
return &SwarmSyncerServer{
po: po,
db: db,
store: syncChunkStore,
sessionAt: sessionAt,
start: start,
live: live,
quit: make(chan struct{}),
}, nil
}
func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
return NewSwarmSyncerServer(live, po, db)
return NewSwarmSyncerServer(live, po, syncChunkStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
......@@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() {
close(s.quit)
}
// GetSection retrieves the actual chunk from localstore
// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
chunk, err := s.db.Get(ctx, storage.Address(key))
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
chunk, err := s.store.Get(ctx, storage.Address(key))
if err != nil {
return nil, err
}
return chunk.SData, nil
return chunk.Data(), nil
}
// GetBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
if from == 0 {
from = s.start
}
if to <= from || from >= s.sessionAt {
to = math.MaxUint64
if s.live {
if from == 0 {
from = s.start
}
if to <= from || from >= s.sessionAt {
to = math.MaxUint64
}
} else {
if (to < from && to != 0) || from > s.sessionAt {
return nil, 0, 0, nil, nil
}
if to == 0 || to > s.sessionAt {
to = s.sessionAt
}
}
var ticker *time.Ticker
defer func() {
if ticker != nil {
......@@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
}
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
batch = append(batch, addr[:]...)
err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
batch = append(batch, key[:]...)
i++
to = idx
return i < BatchSize
......@@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
wait = true
}
log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
return batch, from, to, nil, nil
}
......@@ -146,28 +155,26 @@ type SwarmSyncerClient struct {
sessionReader storage.LazySectionReader
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
db *storage.DBAPI
store storage.SyncChunkStore
// chunker storage.Chunker
currentRoot storage.Address
requestFunc func(chunk *storage.Chunk)
end, start uint64
peer *Peer
ignoreExistingRequest bool
stream Stream
currentRoot storage.Address
requestFunc func(chunk *storage.Chunk)
end, start uint64
peer *Peer
stream Stream
}
// NewSwarmSyncerClient is a contructor for provable data exchange syncer
func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
db: db,
peer: p,
ignoreExistingRequest: ignoreExistingRequest,
stream: stream,
store: store,
peer: p,
stream: stream,
}, nil
}
// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
// retrieveC := make(storage.Chunk, chunksCap)
// RunChunkRequestor(p, retrieveC)
// storeC := make(storage.Chunk, chunksCap)
......@@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
}
// NeedData
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
chunk, _ := s.db.GetOrCreateRequest(ctx, key)
// TODO: we may want to request from this peer anyway even if the request exists
// ignoreExistingRequest is temporary commented out until its functionality is verified.
// For now, this optimization can be disabled.
if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
return nil
}
// create request and wait until the chunk data arrives and is stored
return func() {
chunk.WaitToStore()
}
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
return s.store.FetchFunc(ctx, key)
}
// BatchDone
......
......@@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}
}
localStore := store.(*storage.LocalStore)
db := storage.NewDBAPI(localStore)
bucket.Store(bucketKeyDB, db)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, nil, err
}
bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, db)
delivery := NewDelivery(kad, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
......@@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
db := item.(*storage.DBAPI)
db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
netStore := item.(*storage.NetStore)
netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
......@@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
db := item.(*storage.DBAPI)
chunk, err := db.Get(ctx, key)
if err == storage.ErrFetching {
<-chunk.ReqC
} else if err != nil {
continue
db := item.(*storage.NetStore)
_, err := db.Get(ctx, key)
if err == nil {
found++
}
// needed for leveldb not to be closed?
// chunk.WaitToStore()
found++
}
}
log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
......
......@@ -87,10 +87,10 @@ func TestSwarmNetwork(t *testing.T) {
},
},
{
name: "100_nodes",
name: "50_nodes",
steps: []testSwarmNetworkStep{
{
nodeCount: 100,
nodeCount: 50,
},
},
options: &testSwarmNetworkOptions{
......@@ -99,10 +99,10 @@ func TestSwarmNetwork(t *testing.T) {
disabled: !*longrunning,
},
{
name: "100_nodes_skip_check",
name: "50_nodes_skip_check",
steps: []testSwarmNetworkStep{
{
nodeCount: 100,
nodeCount: 50,
},
},
options: &testSwarmNetworkOptions{
......@@ -287,6 +287,7 @@ func testSwarmNetwork(t *testing.T, o *testSwarmNetworkOptions, steps ...testSwa
config.Init(privkey)
config.DeliverySkipCheck = o.SkipCheck
config.Port = ""
swarm, err := NewSwarm(config, nil)
if err != nil {
......
This diff is collapsed.
......@@ -21,7 +21,6 @@ import (
"context"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"testing"
......@@ -43,27 +42,8 @@ type chunkerTester struct {
t test
}
// fakeChunkStore doesn't store anything, just implements the ChunkStore interface
// It can be used to inject into a hasherStore if you don't want to actually store data just do the
// hashing
type fakeChunkStore struct {
}
// Put doesn't store anything it is just here to implement ChunkStore
func (f *fakeChunkStore) Put(context.Context, *Chunk) {
}
// Gut doesn't store anything it is just here to implement ChunkStore
func (f *fakeChunkStore) Get(context.Context, Address) (*Chunk, error) {
return nil, errors.New("FakeChunkStore doesn't support Get")
}
// Close doesn't store anything it is just here to implement ChunkStore
func (f *fakeChunkStore) Close() {
}
func newTestHasherStore(chunkStore ChunkStore, hash string) *hasherStore {
return NewHasherStore(chunkStore, MakeHashFunc(hash), false)
func newTestHasherStore(store ChunkStore, hash string) *hasherStore {
return NewHasherStore(store, MakeHashFunc(hash), false)
}
func testRandomBrokenData(n int, tester *chunkerTester) {
......@@ -82,11 +62,12 @@ func testRandomBrokenData(n int, tester *chunkerTester) {
putGetter := newTestHasherStore(NewMapChunkStore(), SHA3Hash)
expectedError := fmt.Errorf("Broken reader")
addr, _, err := TreeSplit(context.TODO(), brokendata, int64(n), putGetter)
ctx := context.Background()
key, _, err := TreeSplit(ctx, brokendata, int64(n), putGetter)
if err == nil || err.Error() != expectedError.Error() {
tester.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
}
tester.t.Logf(" Key = %v\n", addr)
tester.t.Logf(" Address = %v\n", key)
}
func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester) Address {
......@@ -96,7 +77,7 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester)
input, found := tester.inputs[uint64(n)]
var data io.Reader
if !found {
data, input = generateRandomData(n)
data, input = GenerateRandomData(n)
tester.inputs[uint64(n)] = input
} else {
data = io.LimitReader(bytes.NewReader(input), int64(n))
......@@ -116,13 +97,13 @@ func testRandomData(usePyramid bool, hash string, n int, tester *chunkerTester)
if err != nil {
tester.t.Fatalf(err.Error())
}
tester.t.Logf(" Key = %v\n", addr)
tester.t.Logf(" Address = %v\n", addr)
err = wait(ctx)
if err != nil {
tester.t.Fatalf(err.Error())
}
reader := TreeJoin(context.TODO(), addr, putGetter, 0)
reader := TreeJoin(ctx, addr, putGetter, 0)
output := make([]byte, n)
r, err := reader.Read(output)
if r != n || err != io.EOF {
......@@ -196,14 +177,14 @@ func TestDataAppend(t *testing.T) {
input, found := tester.inputs[uint64(n)]
var data io.Reader
if !found {
data, input = generateRandomData(n)
data, input = GenerateRandomData(n)
tester.inputs[uint64(n)] = input
} else {
data = io.LimitReader(bytes.NewReader(input), int64(n))
}
chunkStore := NewMapChunkStore()
putGetter := newTestHasherStore(chunkStore, SHA3Hash)
store := NewMapChunkStore()
putGetter := newTestHasherStore(store, SHA3Hash)
ctx := context.TODO()
addr, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
......@@ -214,18 +195,17 @@ func TestDataAppend(t *testing.T) {
if err != nil {
tester.t.Fatalf(err.Error())
}
//create a append data stream
appendInput, found := tester.inputs[uint64(m)]
var appendData io.Reader
if !found {
appendData, appendInput = generateRandomData(m)
appendData, appendInput = GenerateRandomData(m)
tester.inputs[uint64(m)] = appendInput
} else {
appendData = io.LimitReader(bytes.NewReader(appendInput), int64(m))
}
putGetter = newTestHasherStore(chunkStore, SHA3Hash)
putGetter = newTestHasherStore(store, SHA3Hash)
newAddr, wait, err := PyramidAppend(ctx, addr, appendData, putGetter, putGetter)
if err != nil {
tester.t.Fatalf(err.Error())
......@@ -256,18 +236,18 @@ func TestRandomData(t *testing.T) {
tester := &chunkerTester{t: t}
for _, s := range sizes {
treeChunkerKey := testRandomData(false, SHA3Hash, s, tester)
pyramidChunkerKey := testRandomData(true, SHA3Hash, s, tester)
if treeChunkerKey.String() != pyramidChunkerKey.String() {
tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String())
treeChunkerAddress := testRandomData(false, SHA3Hash, s, tester)
pyramidChunkerAddress := testRandomData(true, SHA3Hash, s, tester)
if treeChunkerAddress.String() != pyramidChunkerAddress.String() {
tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerAddress.String(), pyramidChunkerAddress.String())
}
}
for _, s := range sizes {
treeChunkerKey := testRandomData(false, BMTHash, s, tester)
pyramidChunkerKey := testRandomData(true, BMTHash, s, tester)
if treeChunkerKey.String() != pyramidChunkerKey.String() {
tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String())
treeChunkerAddress := testRandomData(false, BMTHash, s, tester)
pyramidChunkerAddress := testRandomData(true, BMTHash, s, tester)
if treeChunkerAddress.String() != pyramidChunkerAddress.String() {
tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerAddress.String(), pyramidChunkerAddress.String())
}
}
}
......@@ -312,12 +292,18 @@ func benchmarkSplitTreeSHA3(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash)
putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash)
_, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter)
ctx := context.Background()
_, wait, err := TreeSplit(ctx, data, int64(n), putGetter)
if err != nil {
t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
}
}
......@@ -325,36 +311,50 @@ func benchmarkSplitTreeBMT(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash)
putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash)
_, _, err := TreeSplit(context.TODO(), data, int64(n), putGetter)
ctx := context.Background()
_, wait, err := TreeSplit(ctx, data, int64(n), putGetter)
if err != nil {
t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
}
}
func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
func benchmarkSplitPyramidBMT(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, SHA3Hash)
putGetter := newTestHasherStore(&FakeChunkStore{}, BMTHash)
_, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter)
ctx := context.Background()
_, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
}
}
func benchmarkSplitPyramidBMT(n int, t *testing.B) {
func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
data := testDataReader(n)
putGetter := newTestHasherStore(&fakeChunkStore{}, BMTHash)
putGetter := newTestHasherStore(&FakeChunkStore{}, SHA3Hash)
_, _, err := PyramidSplit(context.TODO(), data, putGetter, putGetter)
ctx := context.Background()
_, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
}
err = wait(ctx)
if err != nil {
t.Fatalf(err.Error())
}
......@@ -367,10 +367,10 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
data := testDataReader(n)
data1 := testDataReader(m)
chunkStore := NewMapChunkStore()
putGetter := newTestHasherStore(chunkStore, SHA3Hash)
store := NewMapChunkStore()
putGetter := newTestHasherStore(store, SHA3Hash)
ctx := context.TODO()
ctx := context.Background()
key, wait, err := PyramidSplit(ctx, data, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
......@@ -380,7 +380,7 @@ func benchmarkSplitAppendPyramid(n, m int, t *testing.B) {
t.Fatalf(err.Error())
}
putGetter = newTestHasherStore(chunkStore, SHA3Hash)
putGetter = newTestHasherStore(store, SHA3Hash)
_, wait, err = PyramidAppend(ctx, key, data1, putGetter, putGetter)
if err != nil {
t.Fatalf(err.Error())
......
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"context"
"sync"
)
/*
ChunkStore interface is implemented by :
- MemStore: a memory cache
- DbStore: local disk/db store
- LocalStore: a combination (sequence of) memStore and dbStore
- NetStore: cloud storage abstraction layer
- FakeChunkStore: dummy store which doesn't store anything just implements the interface
*/
type ChunkStore interface {
Put(context.Context, *Chunk) // effectively there is no error even if there is an error
Get(context.Context, Address) (*Chunk, error)
Close()
}
// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory.
type MapChunkStore struct {
chunks map[string]*Chunk
mu sync.RWMutex
}
func NewMapChunkStore() *MapChunkStore {
return &MapChunkStore{
chunks: make(map[string]*Chunk),
}
}
func (m *MapChunkStore) Put(ctx context.Context, chunk *Chunk) {
m.mu.Lock()
defer m.mu.Unlock()
m.chunks[chunk.Addr.Hex()] = chunk
chunk.markAsStored()
}
func (m *MapChunkStore) Get(ctx context.Context, addr Address) (*Chunk, error) {
m.mu.RLock()
defer m.mu.RUnlock()
chunk := m.chunks[addr.Hex()]
if chunk == nil {
return nil, ErrChunkNotFound
}
return chunk, nil
}
func (m *MapChunkStore) Close() {
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/swarm/log"
)
// PutChunks adds chunks to localstore
// It waits for receive on the stored channel
// It logs but does not fail on delivery error
func PutChunks(store *LocalStore, chunks ...*Chunk) {
wg := sync.WaitGroup{}
wg.Add(len(chunks))
go func() {
for _, c := range chunks {
<-c.dbStoredC
if err := c.GetErrored(); err != nil {
log.Error("chunk store fail", "err", err, "key", c.Addr)
}
wg.Done()
}
}()
for _, c := range chunks {
go store.Put(context.TODO(), c)
}
wg.Wait()
}
......@@ -23,16 +23,20 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/log"
ch "github.com/ethereum/go-ethereum/swarm/chunk"
colorable "github.com/mattn/go-colorable"
)
var (
loglevel = flag.Int("loglevel", 3, "verbosity of logs")
loglevel = flag.Int("loglevel", 3, "verbosity of logs")
getTimeout = 30 * time.Second
)
func init() {
......@@ -56,47 +60,73 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader
}
}
func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) {
return mput(store, processors, n, GenerateRandomChunk)
func newLDBStore(t *testing.T) (*LDBStore, func()) {
dir, err := ioutil.TempDir("", "bzz-storage-test")
if err != nil {
t.Fatal(err)
}
log.Trace("memstore.tempdir", "dir", dir)
ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir)
db, err := NewLDBStore(ldbparams)
if err != nil {
t.Fatal(err)
}
cleanup := func() {
db.Close()
err := os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
}
return db, cleanup
}
func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) {
wg := sync.WaitGroup{}
wg.Add(processors)
c := make(chan *Chunk)
for i := 0; i < processors; i++ {
func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) {
return mput(store, n, GenerateRandomChunk)
}
func mputChunks(store ChunkStore, chunks ...Chunk) error {
i := 0
f := func(n int64) Chunk {
chunk := chunks[i]
i++
return chunk
}
_, err := mput(store, len(chunks), f)
return err
}
func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) {
// put to localstore and wait for stored channel
// does not check delivery error state
errc := make(chan error)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for i := int64(0); i < int64(n); i++ {
chunk := f(ch.DefaultSize)
go func() {
defer wg.Done()
for chunk := range c {
wg.Add(1)
chunk := chunk
store.Put(context.TODO(), chunk)
go func() {
defer wg.Done()
<-chunk.dbStoredC
}()
select {
case errc <- store.Put(ctx, chunk):
case <-ctx.Done():
}
}()
hs = append(hs, chunk)
}
fa := f
if _, ok := store.(*MemStore); ok {
fa = func(i int64) *Chunk {
chunk := f(i)
chunk.markAsStored()
return chunk
}
}
// wait for all chunks to be stored
for i := 0; i < n; i++ {
chunk := fa(int64(i))
hs = append(hs, chunk.Addr)
c <- chunk
err := <-errc
if err != nil {
return nil, err
}
}
close(c)
wg.Wait()
return hs
return hs, nil
}
func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error {
func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error) error {
wg := sync.WaitGroup{}
wg.Add(len(hs))
errc := make(chan error)
......@@ -104,6 +134,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error)
for _, k := range hs {
go func(h Address) {
defer wg.Done()
// TODO: write timeout with context
chunk, err := store.Get(context.TODO(), h)
if err != nil {
errc <- err
......@@ -143,57 +174,54 @@ func (r *brokenLimitedReader) Read(buf []byte) (int, error) {
return r.lr.Read(buf)
}
func generateRandomData(l int) (r io.Reader, slice []byte) {
slice = make([]byte, l)
if _, err := rand.Read(slice); err != nil {
panic("rand error")
func testStoreRandom(m ChunkStore, n int, chunksize int64, t *testing.T) {
chunks, err := mputRandomChunks(m, n, chunksize)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
r = io.LimitReader(bytes.NewReader(slice), int64(l))
return
}
func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
hs := mputRandomChunks(m, processors, n, chunksize)
err := mget(m, hs, nil)
err = mget(m, chunkAddresses(chunks), nil)
if err != nil {
t.Fatalf("testStore failed: %v", err)
}
}
func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) {
hs := mputRandomChunks(m, processors, n, chunksize)
f := func(h Address, chunk *Chunk) error {
if !bytes.Equal(h, chunk.Addr) {
return fmt.Errorf("key does not match retrieved chunk Key")
func testStoreCorrect(m ChunkStore, n int, chunksize int64, t *testing.T) {
chunks, err := mputRandomChunks(m, n, chunksize)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
f := func(h Address, chunk Chunk) error {
if !bytes.Equal(h, chunk.Address()) {
return fmt.Errorf("key does not match retrieved chunk Address")
}
hasher := MakeHashFunc(DefaultHash)()
hasher.ResetWithLength(chunk.SData[:8])
hasher.Write(chunk.SData[8:])
hasher.ResetWithLength(chunk.SpanBytes())
hasher.Write(chunk.Payload())
exp := hasher.Sum(nil)
if !bytes.Equal(h, exp) {
return fmt.Errorf("key is not hash of chunk data")
}
return nil
}
err := mget(m, hs, f)
err = mget(m, chunkAddresses(chunks), f)
if err != nil {
t.Fatalf("testStore failed: %v", err)
}
}
func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
chunks := make([]*Chunk, n)
func benchmarkStorePut(store ChunkStore, n int, chunksize int64, b *testing.B) {
chunks := make([]Chunk, n)
i := 0
f := func(dataSize int64) *Chunk {
f := func(dataSize int64) Chunk {
chunk := GenerateRandomChunk(dataSize)
chunks[i] = chunk
i++
return chunk
}
mput(store, processors, n, f)
mput(store, n, f)
f = func(dataSize int64) *Chunk {
f = func(dataSize int64) Chunk {
chunk := chunks[i]
i++
return chunk
......@@ -204,18 +232,62 @@ func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64,
for j := 0; j < b.N; j++ {
i = 0
mput(store, processors, n, f)
mput(store, n, f)
}
}
func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) {
hs := mputRandomChunks(store, processors, n, chunksize)
func benchmarkStoreGet(store ChunkStore, n int, chunksize int64, b *testing.B) {
chunks, err := mputRandomChunks(store, n, chunksize)
if err != nil {
b.Fatalf("expected no error, got %v", err)
}
b.ReportAllocs()
b.ResetTimer()
addrs := chunkAddresses(chunks)
for i := 0; i < b.N; i++ {
err := mget(store, hs, nil)
err := mget(store, addrs, nil)
if err != nil {
b.Fatalf("mget failed: %v", err)
}
}
}
// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory.
type MapChunkStore struct {
chunks map[string]Chunk
mu sync.RWMutex
}
func NewMapChunkStore() *MapChunkStore {
return &MapChunkStore{
chunks: make(map[string]Chunk),
}
}
func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error {
m.mu.Lock()
defer m.mu.Unlock()
m.chunks[ch.Address().Hex()] = ch
return nil
}
func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
m.mu.RLock()
defer m.mu.RUnlock()
chunk := m.chunks[ref.Hex()]
if chunk == nil {
return nil, ErrChunkNotFound
}
return chunk, nil
}
func (m *MapChunkStore) Close() {
}
func chunkAddresses(chunks []Chunk) []Address {
addrs := make([]Address, len(chunks))
for i, ch := range chunks {
addrs[i] = ch.Address()
}
return addrs
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package storage
import "context"
// wrapper of db-s to provide mockable custom local chunk store access to syncer
type DBAPI struct {
db *LDBStore
loc *LocalStore
}
func NewDBAPI(loc *LocalStore) *DBAPI {
return &DBAPI{loc.DbStore, loc}
}
// to obtain the chunks from address or request db entry only
func (d *DBAPI) Get(ctx context.Context, addr Address) (*Chunk, error) {
return d.loc.Get(ctx, addr)
}
// current storage counter of chunk db
func (d *DBAPI) CurrentBucketStorageIndex(po uint8) uint64 {
return d.db.CurrentBucketStorageIndex(po)
}
// iteration storage counter and proximity order
func (d *DBAPI) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
return d.db.SyncIterator(from, to, po, f)
}
// to obtain the chunks from address or request db entry only
func (d *DBAPI) GetOrCreateRequest(ctx context.Context, addr Address) (*Chunk, bool) {
return d.loc.GetOrCreateRequest(ctx, addr)
}
// to obtain the chunks from key or request db entry only
func (d *DBAPI) Put(ctx context.Context, chunk *Chunk) {
d.loc.Put(ctx, chunk)
}
......@@ -49,11 +49,11 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
fileStore := NewFileStore(localStore, NewFileStoreParams())
defer os.RemoveAll("/tmp/bzz")
reader, slice := generateRandomData(testDataSize)
reader, slice := GenerateRandomData(testDataSize)
ctx := context.TODO()
key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt)
if err != nil {
t.Errorf("Store error: %v", err)
t.Fatalf("Store error: %v", err)
}
err = wait(ctx)
if err != nil {
......@@ -66,13 +66,13 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
resultSlice := make([]byte, len(slice))
n, err := resultReader.ReadAt(resultSlice, 0)
if err != io.EOF {
t.Errorf("Retrieve error: %v", err)
t.Fatalf("Retrieve error: %v", err)
}
if n != len(slice) {
t.Errorf("Slice size error got %d, expected %d.", n, len(slice))
t.Fatalf("Slice size error got %d, expected %d.", n, len(slice))
}
if !bytes.Equal(slice, resultSlice) {
t.Errorf("Comparison error.")
t.Fatalf("Comparison error.")
}
ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666)
ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666)
......@@ -86,13 +86,13 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
}
n, err = resultReader.ReadAt(resultSlice, 0)
if err != io.EOF {
t.Errorf("Retrieve error after removing memStore: %v", err)
t.Fatalf("Retrieve error after removing memStore: %v", err)
}
if n != len(slice) {
t.Errorf("Slice size error after removing memStore got %d, expected %d.", n, len(slice))
t.Fatalf("Slice size error after removing memStore got %d, expected %d.", n, len(slice))
}
if !bytes.Equal(slice, resultSlice) {
t.Errorf("Comparison error after removing memStore.")
t.Fatalf("Comparison error after removing memStore.")
}
}
......@@ -114,7 +114,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
DbStore: db,
}
fileStore := NewFileStore(localStore, NewFileStoreParams())
reader, slice := generateRandomData(testDataSize)
reader, slice := GenerateRandomData(testDataSize)
ctx := context.TODO()
key, wait, err := fileStore.Store(ctx, reader, testDataSize, toEncrypt)
if err != nil {
......@@ -122,7 +122,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
}
err = wait(ctx)
if err != nil {
t.Errorf("Store error: %v", err)
t.Fatalf("Store error: %v", err)
}
resultReader, isEncrypted := fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
......@@ -131,13 +131,13 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
resultSlice := make([]byte, len(slice))
n, err := resultReader.ReadAt(resultSlice, 0)
if err != io.EOF {
t.Errorf("Retrieve error: %v", err)
t.Fatalf("Retrieve error: %v", err)
}
if n != len(slice) {
t.Errorf("Slice size error got %d, expected %d.", n, len(slice))
t.Fatalf("Slice size error got %d, expected %d.", n, len(slice))
}
if !bytes.Equal(slice, resultSlice) {
t.Errorf("Comparison error.")
t.Fatalf("Comparison error.")
}
// Clear memStore
memStore.setCapacity(0)
......@@ -148,7 +148,7 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
}
if _, err = resultReader.ReadAt(resultSlice, 0); err == nil {
t.Errorf("Was able to read %d bytes from an empty memStore.", len(slice))
t.Fatalf("Was able to read %d bytes from an empty memStore.", len(slice))
}
// check how it works with localStore
fileStore.ChunkStore = localStore
......@@ -162,12 +162,12 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
}
n, err = resultReader.ReadAt(resultSlice, 0)
if err != io.EOF {
t.Errorf("Retrieve error after clearing memStore: %v", err)
t.Fatalf("Retrieve error after clearing memStore: %v", err)
}
if n != len(slice) {
t.Errorf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice))
t.Fatalf("Slice size error after clearing memStore got %d, expected %d.", n, len(slice))
}
if !bytes.Equal(slice, resultSlice) {
t.Errorf("Comparison error after clearing memStore.")
t.Fatalf("Comparison error after clearing memStore.")
}
}
......@@ -19,10 +19,10 @@ package storage
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/swarm/chunk"
ch "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
)
......@@ -30,31 +30,36 @@ type hasherStore struct {
store ChunkStore
toEncrypt bool
hashFunc SwarmHasher
hashSize int // content hash size
refSize int64 // reference size (content hash + possibly encryption key)
wg *sync.WaitGroup
closed chan struct{}
hashSize int // content hash size
refSize int64 // reference size (content hash + possibly encryption key)
nrChunks uint64 // number of chunks to store
errC chan error // global error channel
doneC chan struct{} // closed by Close() call to indicate that count is the final number of chunks
quitC chan struct{} // closed to quit unterminated routines
}
// NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces.
// With the HasherStore you can put and get chunk data (which is just []byte) into a ChunkStore
// and the hasherStore will take core of encryption/decryption of data if necessary
func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore {
func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool) *hasherStore {
hashSize := hashFunc().Size()
refSize := int64(hashSize)
if toEncrypt {
refSize += encryption.KeyLength
}
return &hasherStore{
store: chunkStore,
h := &hasherStore{
store: store,
toEncrypt: toEncrypt,
hashFunc: hashFunc,
hashSize: hashSize,
refSize: refSize,
wg: &sync.WaitGroup{},
closed: make(chan struct{}),
errC: make(chan error),
doneC: make(chan struct{}),
quitC: make(chan struct{}),
}
return h
}
// Put stores the chunkData into the ChunkStore of the hasherStore and returns the reference.
......@@ -62,7 +67,6 @@ func NewHasherStore(chunkStore ChunkStore, hashFunc SwarmHasher, toEncrypt bool)
// Asynchronous function, the data will not necessarily be stored when it returns.
func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference, error) {
c := chunkData
size := chunkData.Size()
var encryptionKey encryption.Key
if h.toEncrypt {
var err error
......@@ -71,29 +75,28 @@ func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference,
return nil, err
}
}
chunk := h.createChunk(c, size)
chunk := h.createChunk(c)
h.storeChunk(ctx, chunk)
return Reference(append(chunk.Addr, encryptionKey...)), nil
return Reference(append(chunk.Address(), encryptionKey...)), nil
}
// Get returns data of the chunk with the given reference (retrieved from the ChunkStore of hasherStore).
// If the data is encrypted and the reference contains an encryption key, it will be decrypted before
// return.
func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error) {
key, encryptionKey, err := parseReference(ref, h.hashSize)
addr, encryptionKey, err := parseReference(ref, h.hashSize)
if err != nil {
return nil, err
}
toDecrypt := (encryptionKey != nil)
chunk, err := h.store.Get(ctx, key)
chunk, err := h.store.Get(ctx, addr)
if err != nil {
return nil, err
}
chunkData := chunk.SData
chunkData := ChunkData(chunk.Data())
toDecrypt := (encryptionKey != nil)
if toDecrypt {
var err error
chunkData, err = h.decryptChunkData(chunkData, encryptionKey)
......@@ -107,16 +110,40 @@ func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error)
// Close indicates that no more chunks will be put with the hasherStore, so the Wait
// function can return when all the previously put chunks has been stored.
func (h *hasherStore) Close() {
close(h.closed)
close(h.doneC)
}
// Wait returns when
// 1) the Close() function has been called and
// 2) all the chunks which has been Put has been stored
func (h *hasherStore) Wait(ctx context.Context) error {
<-h.closed
h.wg.Wait()
return nil
defer close(h.quitC)
var nrStoredChunks uint64 // number of stored chunks
var done bool
doneC := h.doneC
for {
select {
// if context is done earlier, just return with the error
case <-ctx.Done():
return ctx.Err()
// doneC is closed if all chunks have been submitted, from then we just wait until all of them are also stored
case <-doneC:
done = true
doneC = nil
// a chunk has been stored, if err is nil, then successfully, so increase the stored chunk counter
case err := <-h.errC:
if err != nil {
return err
}
nrStoredChunks++
}
// if all the chunks have been submitted and all of them are stored, then we can return
if done {
if nrStoredChunks >= atomic.LoadUint64(&h.nrChunks) {
return nil
}
}
}
}
func (h *hasherStore) createHash(chunkData ChunkData) Address {
......@@ -126,12 +153,9 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address {
return hasher.Sum(nil)
}
func (h *hasherStore) createChunk(chunkData ChunkData, chunkSize int64) *Chunk {
func (h *hasherStore) createChunk(chunkData ChunkData) *chunk {
hash := h.createHash(chunkData)
chunk := NewChunk(hash, nil)
chunk.SData = chunkData
chunk.Size = chunkSize
chunk := NewChunk(hash, chunkData)
return chunk
}
......@@ -162,10 +186,10 @@ func (h *hasherStore) decryptChunkData(chunkData ChunkData, encryptionKey encryp
// removing extra bytes which were just added for padding
length := ChunkData(decryptedSpan).Size()
for length > chunk.DefaultSize {
length = length + (chunk.DefaultSize - 1)
length = length / chunk.DefaultSize
length *= h.refSize
for length > ch.DefaultSize {
length = length + (ch.DefaultSize - 1)
length = length / ch.DefaultSize
length *= uint64(h.refSize)
}
c := make(ChunkData, length+8)
......@@ -205,32 +229,32 @@ func (h *hasherStore) decrypt(chunkData ChunkData, key encryption.Key) ([]byte,
}
func (h *hasherStore) newSpanEncryption(key encryption.Key) encryption.Encryption {
return encryption.New(key, 0, uint32(chunk.DefaultSize/h.refSize), sha3.NewKeccak256)
return encryption.New(key, 0, uint32(ch.DefaultSize/h.refSize), sha3.NewKeccak256)
}
func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryption {
return encryption.New(key, int(chunk.DefaultSize), 0, sha3.NewKeccak256)
return encryption.New(key, int(ch.DefaultSize), 0, sha3.NewKeccak256)
}
func (h *hasherStore) storeChunk(ctx context.Context, chunk *Chunk) {
h.wg.Add(1)
func (h *hasherStore) storeChunk(ctx context.Context, chunk *chunk) {
atomic.AddUint64(&h.nrChunks, 1)
go func() {
<-chunk.dbStoredC
h.wg.Done()
select {
case h.errC <- h.store.Put(ctx, chunk):
case <-h.quitC:
}
}()
h.store.Put(ctx, chunk)
}
func parseReference(ref Reference, hashSize int) (Address, encryption.Key, error) {
encryptedKeyLength := hashSize + encryption.KeyLength
encryptedRefLength := hashSize + encryption.KeyLength
switch len(ref) {
case KeyLength:
case AddressLength:
return Address(ref), nil, nil
case encryptedKeyLength:
case encryptedRefLength:
encKeyIdx := len(ref) - encryption.KeyLength
return Address(ref[:encKeyIdx]), encryption.Key(ref[encKeyIdx:]), nil
default:
return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, encryptedKeyLength, len(ref))
return nil, nil, fmt.Errorf("Invalid reference length, expected %v or %v got %v", hashSize, encryptedRefLength, len(ref))
}
}
......@@ -46,14 +46,16 @@ func TestHasherStore(t *testing.T) {
hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt)
// Put two random chunks into the hasherStore
chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).SData
key1, err := hasherStore.Put(context.TODO(), chunkData1)
chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
ctx, cancel := context.WithTimeout(context.Background(), getTimeout)
defer cancel()
key1, err := hasherStore.Put(ctx, chunkData1)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).SData
key2, err := hasherStore.Put(context.TODO(), chunkData2)
chunkData2 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
key2, err := hasherStore.Put(ctx, chunkData2)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
......@@ -61,13 +63,13 @@ func TestHasherStore(t *testing.T) {
hasherStore.Close()
// Wait until chunks are really stored
err = hasherStore.Wait(context.TODO())
err = hasherStore.Wait(ctx)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
// Get the first chunk
retrievedChunkData1, err := hasherStore.Get(context.TODO(), key1)
retrievedChunkData1, err := hasherStore.Get(ctx, key1)
if err != nil {
t.Fatalf("Expected no error, got \"%v\"", err)
}
......@@ -78,7 +80,7 @@ func TestHasherStore(t *testing.T) {
}
// Get the second chunk
retrievedChunkData2, err := hasherStore.Get(context.TODO(), key2)
retrievedChunkData2, err := hasherStore.Get(ctx, key2)
if err != nil {
t.Fatalf("Expected no error, got \"%v\"", err)
}
......@@ -105,12 +107,12 @@ func TestHasherStore(t *testing.T) {
}
// Check if chunk data in store is encrypted or not
chunkInStore, err := chunkStore.Get(context.TODO(), hash1)
chunkInStore, err := chunkStore.Get(ctx, hash1)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
chunkDataInStore := chunkInStore.SData
chunkDataInStore := chunkInStore.Data()
if tt.toEncrypt && bytes.Equal(chunkData1, chunkDataInStore) {
t.Fatalf("Chunk expected to be encrypted but it is stored without encryption")
......
This diff is collapsed.
This diff is collapsed.
......@@ -18,8 +18,6 @@ package storage
import (
"context"
"encoding/binary"
"fmt"
"path/filepath"
"sync"
......@@ -97,123 +95,89 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) {
// when the chunk is stored in memstore.
// After the LDBStore.Put, it is ensured that the MemStore
// contains the chunk with the same data, but nil ReqC channel.
func (ls *LocalStore) Put(ctx context.Context, chunk *Chunk) {
func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error {
valid := true
// ls.Validators contains a list of one validator per chunk type.
// if one validator succeeds, then the chunk is valid
for _, v := range ls.Validators {
if valid = v.Validate(chunk.Addr, chunk.SData); valid {
if valid = v.Validate(chunk.Address(), chunk.Data()); valid {
break
}
}
if !valid {
log.Trace("invalid chunk", "addr", chunk.Addr, "len", len(chunk.SData))
chunk.SetErrored(ErrChunkInvalid)
chunk.markAsStored()
return
return ErrChunkInvalid
}
log.Trace("localstore.put", "addr", chunk.Addr)
log.Trace("localstore.put", "key", chunk.Address())
ls.mu.Lock()
defer ls.mu.Unlock()
chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
memChunk, err := ls.memStore.Get(ctx, chunk.Addr)
switch err {
case nil:
if memChunk.ReqC == nil {
chunk.markAsStored()
return
}
case ErrChunkNotFound:
default:
chunk.SetErrored(err)
return
_, err := ls.memStore.Get(ctx, chunk.Address())
if err == nil {
return nil
}
ls.DbStore.Put(ctx, chunk)
// chunk is no longer a request, but a chunk with data, so replace it in memStore
newc := NewChunk(chunk.Addr, nil)
newc.SData = chunk.SData
newc.Size = chunk.Size
newc.dbStoredC = chunk.dbStoredC
ls.memStore.Put(ctx, newc)
if memChunk != nil && memChunk.ReqC != nil {
close(memChunk.ReqC)
if err != nil && err != ErrChunkNotFound {
return err
}
ls.memStore.Put(ctx, chunk)
err = ls.DbStore.Put(ctx, chunk)
return err
}
// Get(chunk *Chunk) looks up a chunk in the local stores
// This method is blocking until the chunk is retrieved
// so additional timeout may be needed to wrap this call if
// ChunkStores are remote and can have long latency
func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) {
ls.mu.Lock()
defer ls.mu.Unlock()
return ls.get(ctx, addr)
}
func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) {
chunk, err = ls.memStore.Get(ctx, addr)
if err != nil && err != ErrChunkNotFound {
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
return nil, err
}
if err == nil {
if chunk.ReqC != nil {
select {
case <-chunk.ReqC:
default:
metrics.GetOrRegisterCounter("localstore.get.errfetching", nil).Inc(1)
return chunk, ErrFetching
}
}
metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
return
return chunk, nil
}
metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1)
chunk, err = ls.DbStore.Get(ctx, addr)
if err != nil {
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
return
return nil, err
}
chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
ls.memStore.Put(ctx, chunk)
return
return chunk, nil
}
// retrieve logic common for local and network chunk retrieval requests
func (ls *LocalStore) GetOrCreateRequest(ctx context.Context, addr Address) (chunk *Chunk, created bool) {
metrics.GetOrRegisterCounter("localstore.getorcreaterequest", nil).Inc(1)
func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error {
ls.mu.Lock()
defer ls.mu.Unlock()
var err error
chunk, err = ls.get(ctx, addr)
if err == nil && chunk.GetErrored() == nil {
metrics.GetOrRegisterCounter("localstore.getorcreaterequest.hit", nil).Inc(1)
log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v found locally", addr))
return chunk, false
_, err := ls.get(ctx, addr)
if err == nil {
return nil
}
if err == ErrFetching && chunk.GetErrored() == nil {
metrics.GetOrRegisterCounter("localstore.getorcreaterequest.errfetching", nil).Inc(1)
log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v hit on an existing request %v", addr, chunk.ReqC))
return chunk, false
return func(context.Context) error {
return err
}
// no data and no request status
metrics.GetOrRegisterCounter("localstore.getorcreaterequest.miss", nil).Inc(1)
log.Trace(fmt.Sprintf("LocalStore.GetOrRetrieve: %v not found locally. open new request", addr))
chunk = NewChunk(addr, make(chan bool))
ls.memStore.Put(ctx, chunk)
return chunk, true
}
// RequestsCacheLen returns the current number of outgoing requests stored in the cache
func (ls *LocalStore) RequestsCacheLen() int {
return ls.memStore.requests.Len()
func (ls *LocalStore) BinIndex(po uint8) uint64 {
return ls.DbStore.BinIndex(po)
}
func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
return ls.DbStore.SyncIterator(from, to, po, f)
}
// Close the local store
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -72,7 +72,7 @@ type UpdateLookup struct {
// 4 bytes period
// 4 bytes version
// storage.Keylength for rootAddr
const updateLookupLength = 4 + 4 + storage.KeyLength
const updateLookupLength = 4 + 4 + storage.AddressLength
// UpdateAddr calculates the resource update chunk address corresponding to this lookup key
func (u *UpdateLookup) UpdateAddr() (updateAddr storage.Address) {
......@@ -90,7 +90,7 @@ func (u *UpdateLookup) binaryPut(serializedData []byte) error {
if len(serializedData) != updateLookupLength {
return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize UpdateLookup. Expected %d, got %d", updateLookupLength, len(serializedData))
}
if len(u.rootAddr) != storage.KeyLength {
if len(u.rootAddr) != storage.AddressLength {
return NewError(ErrInvalidValue, "UpdateLookup.binaryPut called without rootAddr set")
}
binary.LittleEndian.PutUint32(serializedData[:4], u.period)
......@@ -111,7 +111,7 @@ func (u *UpdateLookup) binaryGet(serializedData []byte) error {
}
u.period = binary.LittleEndian.Uint32(serializedData[:4])
u.version = binary.LittleEndian.Uint32(serializedData[4:8])
u.rootAddr = storage.Address(make([]byte, storage.KeyLength))
u.rootAddr = storage.Address(make([]byte, storage.AddressLength))
copy(u.rootAddr[:], serializedData[8:])
return nil
}
......@@ -142,7 +142,7 @@ func (r *ResourceMetadata) serializeAndHash() (rootAddr, metaHash []byte, chunkD
}
// creates a metadata chunk out of a resourceMetadata structure
func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []byte, err error) {
func (metadata *ResourceMetadata) newChunk() (chunk storage.Chunk, metaHash []byte, err error) {
// the metadata chunk contains a timestamp of when the resource starts to be valid
// and also how frequently it is expected to be updated
// from this we know at what time we should look for updates, and how often
......@@ -157,9 +157,7 @@ func (metadata *ResourceMetadata) newChunk() (chunk *storage.Chunk, metaHash []b
}
// make the chunk and send it to swarm
chunk = storage.NewChunk(rootAddr, nil)
chunk.SData = chunkData
chunk.Size = int64(len(chunkData))
chunk = storage.NewChunk(rootAddr, chunkData)
return chunk, metaHash, nil
}
......
......@@ -182,7 +182,7 @@ func (r *Request) fromJSON(j *updateRequestJSON) error {
var declaredRootAddr storage.Address
var declaredMetaHash []byte
declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.KeyLength, "rootAddr")
declaredRootAddr, err = decodeHexSlice(j.RootAddr, storage.AddressLength, "rootAddr")
if err != nil {
return err
}
......
This diff is collapsed.
......@@ -96,7 +96,7 @@ func (r *SignedResourceUpdate) Sign(signer Signer) error {
}
// create an update chunk.
func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) {
func (r *SignedResourceUpdate) toChunk() (storage.Chunk, error) {
// Check that the update is signed and serialized
// For efficiency, data is serialized during signature and cached in
......@@ -105,14 +105,11 @@ func (r *SignedResourceUpdate) toChunk() (*storage.Chunk, error) {
return nil, NewError(ErrInvalidSignature, "newUpdateChunk called without a valid signature or payload data. Call .Sign() first.")
}
chunk := storage.NewChunk(r.updateAddr, nil)
resourceUpdateLength := r.resourceUpdate.binaryLength()
chunk.SData = r.binaryData
// signature is the last item in the chunk data
copy(chunk.SData[resourceUpdateLength:], r.signature[:])
copy(r.binaryData[resourceUpdateLength:], r.signature[:])
chunk.Size = int64(len(chunk.SData))
chunk := storage.NewChunk(r.updateAddr, r.binaryData)
return chunk, nil
}
......
This diff is collapsed.
......@@ -27,7 +27,7 @@ type updateHeader struct {
metaHash []byte // SHA3 hash of the metadata chunk (less ownerAddr). Used to prove ownerhsip of the resource.
}
const metaHashLength = storage.KeyLength
const metaHashLength = storage.AddressLength
// updateLookupLength bytes
// 1 byte flags (multihash bool for now)
......@@ -76,7 +76,7 @@ func (h *updateHeader) binaryGet(serializedData []byte) error {
}
cursor := updateLookupLength
h.metaHash = make([]byte, metaHashLength)
copy(h.metaHash[:storage.KeyLength], serializedData[cursor:cursor+storage.KeyLength])
copy(h.metaHash[:storage.AddressLength], serializedData[cursor:cursor+storage.AddressLength])
cursor += metaHashLength
flags := serializedData[cursor]
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment