Commit 996755c4 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by Anton Evangelatov

cmd/swarm, swarm: LocalStore storage integration

parent c94d582a
......@@ -252,15 +252,15 @@ func cmdLineOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Con
}
if storePath := ctx.GlobalString(SwarmStorePath.Name); storePath != "" {
currentConfig.LocalStoreParams.ChunkDbPath = storePath
currentConfig.ChunkDbPath = storePath
}
if storeCapacity := ctx.GlobalUint64(SwarmStoreCapacity.Name); storeCapacity != 0 {
currentConfig.LocalStoreParams.DbCapacity = storeCapacity
currentConfig.DbCapacity = storeCapacity
}
if ctx.GlobalIsSet(SwarmStoreCacheCapacity.Name) {
currentConfig.LocalStoreParams.CacheCapacity = ctx.GlobalUint(SwarmStoreCacheCapacity.Name)
currentConfig.CacheCapacity = ctx.GlobalUint(SwarmStoreCacheCapacity.Name)
}
if ctx.GlobalIsSet(SwarmBootnodeModeFlag.Name) {
......
......@@ -447,8 +447,8 @@ func TestConfigCmdLineOverridesFile(t *testing.T) {
t.Fatal("Expected Sync to be disabled, but is true")
}
if info.LocalStoreParams.DbCapacity != 9000000 {
t.Fatalf("Expected Capacity to be %d, got %d", 9000000, info.LocalStoreParams.DbCapacity)
if info.DbCapacity != 9000000 {
t.Fatalf("Expected Capacity to be %d, got %d", 9000000, info.DbCapacity)
}
if info.HiveParams.KeepAliveInterval != 6000000000 {
......
......@@ -17,6 +17,10 @@
package main
import (
"archive/tar"
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"os"
......@@ -25,10 +29,22 @@ import (
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"gopkg.in/urfave/cli.v1"
)
var legacyKeyIndex = byte(0)
var keyData = byte(6)
type dpaDBIndex struct {
Idx uint64
Access uint64
}
var dbCommand = cli.Command{
Name: "db",
CustomHelpTemplate: helpTemplate,
......@@ -67,6 +83,9 @@ The import may be quite large, consider piping the input through the Unix
pv(1) tool to get a progress bar:
pv chunks.tar | swarm db import ~/.ethereum/swarm/bzz-KEY/chunks -`,
Flags: []cli.Flag{
SwarmLegacyFlag,
},
},
},
}
......@@ -77,12 +96,6 @@ func dbExport(ctx *cli.Context) {
utils.Fatalf("invalid arguments, please specify both <chunkdb> (path to a local chunk database), <file> (path to write the tar archive to, - for stdout) and the base key")
}
store, err := openLDBStore(args[0], common.Hex2Bytes(args[2]))
if err != nil {
utils.Fatalf("error opening local chunk database: %s", err)
}
defer store.Close()
var out io.Writer
if args[1] == "-" {
out = os.Stdout
......@@ -95,6 +108,23 @@ func dbExport(ctx *cli.Context) {
out = f
}
isLegacy := localstore.IsLegacyDatabase(args[0])
if isLegacy {
count, err := exportLegacy(args[0], common.Hex2Bytes(args[2]), out)
if err != nil {
utils.Fatalf("error exporting legacy local chunk database: %s", err)
}
log.Info(fmt.Sprintf("successfully exported %d chunks from legacy db", count))
return
}
store, err := openLDBStore(args[0], common.Hex2Bytes(args[2]))
if err != nil {
utils.Fatalf("error opening local chunk database: %s", err)
}
defer store.Close()
count, err := store.Export(out)
if err != nil {
utils.Fatalf("error exporting local chunk database: %s", err)
......@@ -109,6 +139,8 @@ func dbImport(ctx *cli.Context) {
utils.Fatalf("invalid arguments, please specify both <chunkdb> (path to a local chunk database), <file> (path to read the tar archive from, - for stdin) and the base key")
}
legacy := ctx.IsSet(SwarmLegacyFlag.Name)
store, err := openLDBStore(args[0], common.Hex2Bytes(args[2]))
if err != nil {
utils.Fatalf("error opening local chunk database: %s", err)
......@@ -127,7 +159,7 @@ func dbImport(ctx *cli.Context) {
in = f
}
count, err := store.Import(in)
count, err := store.Import(in, legacy)
if err != nil {
utils.Fatalf("error importing local chunk database: %s", err)
}
......@@ -135,13 +167,73 @@ func dbImport(ctx *cli.Context) {
log.Info(fmt.Sprintf("successfully imported %d chunks", count))
}
func openLDBStore(path string, basekey []byte) (*storage.LDBStore, error) {
func openLDBStore(path string, basekey []byte) (*localstore.DB, error) {
if _, err := os.Stat(filepath.Join(path, "CURRENT")); err != nil {
return nil, fmt.Errorf("invalid chunkdb path: %s", err)
}
storeparams := storage.NewDefaultStoreParams()
ldbparams := storage.NewLDBStoreParams(storeparams, path)
ldbparams.BaseKey = basekey
return storage.NewLDBStore(ldbparams)
return localstore.New(path, basekey, nil)
}
func decodeIndex(data []byte, index *dpaDBIndex) error {
dec := rlp.NewStream(bytes.NewReader(data), 0)
return dec.Decode(index)
}
func getDataKey(idx uint64, po uint8) []byte {
key := make([]byte, 10)
key[0] = keyData
key[1] = po
binary.BigEndian.PutUint64(key[2:], idx)
return key
}
func exportLegacy(path string, basekey []byte, out io.Writer) (int64, error) {
tw := tar.NewWriter(out)
defer tw.Close()
db, err := leveldb.OpenFile(path, &opt.Options{OpenFilesCacheCapacity: 128})
if err != nil {
return 0, err
}
defer db.Close()
it := db.NewIterator(nil, nil)
defer it.Release()
var count int64
for ok := it.Seek([]byte{legacyKeyIndex}); ok; ok = it.Next() {
key := it.Key()
if (key == nil) || (key[0] != legacyKeyIndex) {
break
}
var index dpaDBIndex
hash := key[1:]
decodeIndex(it.Value(), &index)
po := uint8(chunk.Proximity(basekey, hash))
datakey := getDataKey(index.Idx, po)
data, err := db.Get(datakey, nil)
if err != nil {
log.Crit(fmt.Sprintf("Chunk %x found but could not be accessed: %v, %x", key, err, datakey))
continue
}
hdr := &tar.Header{
Name: hex.EncodeToString(hash),
Mode: 0644,
Size: int64(len(data)),
}
if err := tw.WriteHeader(hdr); err != nil {
return count, err
}
if _, err := tw.Write(data); err != nil {
return count, err
}
count++
}
return count, nil
}
......@@ -17,19 +17,34 @@
package main
import (
"archive/tar"
"bytes"
"compress/gzip"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"runtime"
"strings"
"testing"
"github.com/ethereum/go-ethereum/cmd/swarm/testdata"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
const (
DATABASE_FIXTURE_BZZ_ACCOUNT = "0aa159029fa13ffa8fa1c6fff6ebceface99d6a4"
DATABASE_FIXTURE_PASSWORD = "pass"
FIXTURE_DATADIR_PREFIX = "swarm/bzz-0aa159029fa13ffa8fa1c6fff6ebceface99d6a4"
FixtureBaseKey = "a9f22b3d77b4bdf5f3eefce995d6c8e7cecf2636f20956f08a0d1ed95adb52ad"
)
// TestCLISwarmExportImport perform the following test:
// 1. runs swarm node
// 2. uploads a random file
......@@ -99,6 +114,112 @@ func TestCLISwarmExportImport(t *testing.T) {
mustEqualFiles(t, bytes.NewReader(content), res.Body)
}
// TestExportLegacyToNew checks that an old database gets imported correctly into the new localstore structure
// The test sequence is as follows:
// 1. unpack database fixture to tmp dir
// 2. try to open with new swarm binary that should complain about old database
// 3. export from old database
// 4. remove the chunks folder
// 5. import the dump
// 6. file should be accessible
func TestExportLegacyToNew(t *testing.T) {
/*
fixture bzz account 0aa159029fa13ffa8fa1c6fff6ebceface99d6a4
*/
const UPLOADED_FILE_MD5_HASH = "a001fdae53ba50cae584b8b02b06f821"
const UPLOADED_HASH = "67a86082ee0ea1bc7dd8d955bb1e14d04f61d55ae6a4b37b3d0296a3a95e454a"
tmpdir, err := ioutil.TempDir("", "swarm-test")
log.Trace("running legacy datastore migration test", "temp dir", tmpdir)
defer os.RemoveAll(tmpdir)
if err != nil {
t.Fatal(err)
}
inflateBase64Gzip(t, testdata.DATADIR_MIGRATION_FIXTURE, tmpdir)
tmpPassword := testutil.TempFileWithContent(t, DATABASE_FIXTURE_PASSWORD)
defer os.Remove(tmpPassword)
flags := []string{
"--datadir", tmpdir,
"--bzzaccount", DATABASE_FIXTURE_BZZ_ACCOUNT,
"--password", tmpPassword,
}
newSwarmOldDb := runSwarm(t, flags...)
_, matches := newSwarmOldDb.ExpectRegexp(".+")
newSwarmOldDb.ExpectExit()
if len(matches) == 0 {
t.Fatalf("stdout not matched")
}
if newSwarmOldDb.ExitStatus() == 0 {
t.Fatal("should error")
}
t.Log("exporting legacy database")
actualDataDir := path.Join(tmpdir, FIXTURE_DATADIR_PREFIX)
exportCmd := runSwarm(t, "--verbosity", "5", "db", "export", actualDataDir+"/chunks", tmpdir+"/export.tar", FixtureBaseKey)
exportCmd.ExpectExit()
stat, err := os.Stat(tmpdir + "/export.tar")
if err != nil {
t.Fatal(err)
}
// make some silly size assumption
if stat.Size() < 90000 {
t.Fatal("export size too small")
}
t.Log("removing chunk datadir")
err = os.RemoveAll(path.Join(actualDataDir, "chunks"))
if err != nil {
t.Fatal(err)
}
// start second cluster
cluster2 := newTestCluster(t, 1)
var info2 swarm.Info
if err := cluster2.Nodes[0].Client.Call(&info2, "bzz_info"); err != nil {
t.Fatal(err)
}
// stop second cluster, so that we close LevelDB
cluster2.Stop()
defer cluster2.Cleanup()
// import the export.tar
importCmd := runSwarm(t, "db", "import", "--legacy", info2.Path+"/chunks", tmpdir+"/export.tar", strings.TrimPrefix(info2.BzzKey, "0x"))
importCmd.ExpectExit()
// spin second cluster back up
cluster2.StartExistingNodes(t, 1, strings.TrimPrefix(info2.BzzAccount, "0x"))
t.Log("trying to http get the file")
// try to fetch imported file
res, err := http.Get(cluster2.Nodes[0].URL + "/bzz:/" + UPLOADED_HASH)
if err != nil {
t.Fatal(err)
}
if res.StatusCode != 200 {
t.Fatalf("expected HTTP status %d, got %s", 200, res.Status)
}
h := md5.New()
if _, err := io.Copy(h, res.Body); err != nil {
t.Fatal(err)
}
sum := h.Sum(nil)
b, err := hex.DecodeString(UPLOADED_FILE_MD5_HASH)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(sum, b) {
t.Fatal("should be equal")
}
}
func mustEqualFiles(t *testing.T, up io.Reader, down io.Reader) {
h := md5.New()
upLen, err := io.Copy(h, up)
......@@ -117,3 +238,46 @@ func mustEqualFiles(t *testing.T, up io.Reader, down io.Reader) {
t.Fatalf("downloaded imported file md5=%x (length %v) is not the same as the generated one mp5=%x (length %v)", downHash, downLen, upHash, upLen)
}
}
func inflateBase64Gzip(t *testing.T, base64File, directory string) {
t.Helper()
f := base64.NewDecoder(base64.StdEncoding, strings.NewReader(base64File))
gzf, err := gzip.NewReader(f)
if err != nil {
t.Fatal(err)
}
tarReader := tar.NewReader(gzf)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
name := header.Name
switch header.Typeflag {
case tar.TypeDir:
err := os.Mkdir(path.Join(directory, name), os.ModePerm)
if err != nil {
t.Fatal(err)
}
case tar.TypeReg:
file, err := os.Create(path.Join(directory, name))
if err != nil {
t.Fatal(err)
}
if _, err := io.Copy(file, tarReader); err != nil {
t.Fatal(err)
}
default:
t.Fatal("shouldn't happen")
}
}
}
......@@ -182,4 +182,8 @@ var (
Usage: "URL of the Global Store API provider (only for testing)",
EnvVar: SwarmGlobalstoreAPI,
}
SwarmLegacyFlag = cli.BoolFlag{
Name: "legacy",
Usage: "Use this flag when importing a db export from a legacy local store database dump (for schemas older than 'sanctuary')",
}
)
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -45,7 +45,13 @@ const (
type Config struct {
// serialised/persisted fields
*storage.FileStoreParams
*storage.LocalStoreParams
// LocalStore
ChunkDbPath string
DbCapacity uint64
CacheCapacity uint
BaseKey []byte
*network.HiveParams
Swap *swap.LocalProfile
Pss *pss.PssParams
......@@ -78,7 +84,6 @@ type Config struct {
func NewConfig() (c *Config) {
c = &Config{
LocalStoreParams: storage.NewDefaultLocalStoreParams(),
FileStoreParams: storage.NewFileStoreParams(),
HiveParams: network.NewHiveParams(),
Swap: swap.NewDefaultSwapParams(),
......@@ -130,8 +135,9 @@ func (c *Config) Init(prvKey *ecdsa.PrivateKey, nodeKey *ecdsa.PrivateKey) error
c.Swap.Init(c.Contract, prvKey)
}
c.LocalStoreParams.Init(c.Path)
c.LocalStoreParams.BaseKey = common.FromHex(c.BzzKey)
c.privateKey = prvKey
c.ChunkDbPath = filepath.Join(c.Path, "chunks")
c.BaseKey = common.FromHex(c.BzzKey)
c.Pss = c.Pss.WithPrivateKey(c.privateKey)
return nil
......
......@@ -41,7 +41,6 @@ func TestConfig(t *testing.T) {
one := NewConfig()
two := NewConfig()
one.LocalStoreParams = two.LocalStoreParams
if equal := reflect.DeepEqual(one, two); !equal {
t.Fatal("Two default configs are not equal")
}
......
......@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/api"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
type TestServer interface {
......@@ -37,16 +38,12 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
if err != nil {
t.Fatal(err)
}
storeParams := storage.NewDefaultLocalStoreParams()
storeParams.DbCapacity = 5000000
storeParams.CacheCapacity = 5000
storeParams.Init(swarmDir)
localStore, err := storage.NewLocalStore(storeParams, nil)
localStore, err := localstore.New(dir, make([]byte, 32), nil)
if err != nil {
os.RemoveAll(swarmDir)
t.Fatal(err)
}
fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
// Swarm feeds test setup
feedsDir, err := ioutil.TempDir("", "swarm-feeds-test")
......
......@@ -60,7 +60,11 @@ func (inspector *Inspector) Has(chunkAddresses []storage.Address) []HasInfo {
for _, addr := range chunkAddresses {
res := HasInfo{}
res.Addr = addr.String()
res.Has = inspector.netStore.Has(context.Background(), addr)
has, err := inspector.netStore.Has(context.Background(), addr)
if err != nil {
has = false
}
res.Has = has
results = append(results, res)
}
return results
......
......@@ -235,7 +235,6 @@ func loadManifest(ctx context.Context, fileStore *storage.FileStore, addr storag
}
func readManifest(mr storage.LazySectionReader, addr storage.Address, fileStore *storage.FileStore, isEncrypted bool, quitC chan bool, decrypt DecryptFunc) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand
// TODO check size for oversized manifests
size, err := mr.Size(mr.Context(), quitC)
if err != nil { // size == 0
......
package chunk
import (
"context"
"errors"
"fmt"
......@@ -28,7 +29,7 @@ type chunk struct {
sdata []byte
}
func NewChunk(addr Address, data []byte) *chunk {
func NewChunk(addr Address, data []byte) Chunk {
return &chunk{
addr: addr,
sdata: data,
......@@ -107,3 +108,105 @@ func Proximity(one, other []byte) (ret int) {
}
return MaxPO
}
// ModeGet enumerates different Getter modes.
type ModeGet int
// Getter modes.
const (
// ModeGetRequest: when accessed for retrieval
ModeGetRequest ModeGet = iota
// ModeGetSync: when accessed for syncing or proof of custody request
ModeGetSync
// ModeGetLookup: when accessed to lookup a a chunk in feeds or other places
ModeGetLookup
)
// ModePut enumerates different Putter modes.
type ModePut int
// Putter modes.
const (
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
ModePutRequest ModePut = iota
// ModePutSync: when a chunk is received via syncing
ModePutSync
// ModePutUpload: when a chunk is created by local upload
ModePutUpload
)
// ModeSet enumerates different Setter modes.
type ModeSet int
// Setter modes.
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
ModeSetAccess ModeSet = iota
// ModeSetSync: when push sync receipt is received
ModeSetSync
// ModeSetRemove: when a chunk is removed
ModeSetRemove
)
// Descriptor holds information required for Pull syncing. This struct
// is provided by subscribing to pull index.
type Descriptor struct {
Address Address
BinID uint64
}
func (d *Descriptor) String() string {
if d == nil {
return ""
}
return fmt.Sprintf("%s bin id %v", d.Address.Hex(), d.BinID)
}
type Store interface {
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func())
Close() (err error)
}
// FetchStore is a Store which supports syncing
type FetchStore interface {
Store
FetchFunc(ctx context.Context, addr Address) func(context.Context) error
}
// Validator validates a chunk.
type Validator interface {
Validate(ch Chunk) bool
}
// ValidatorStore encapsulates Store by decorting the Put method
// with validators check.
type ValidatorStore struct {
Store
validators []Validator
}
// NewValidatorStore returns a new ValidatorStore which uses
// provided validators to validate chunks on Put.
func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore) {
return &ValidatorStore{
Store: store,
validators: validators,
}
}
// Put overrides Store put method with validators check. If one of the validators
// return true, the chunk is considered valid and Store Put method is called.
// If all validators return false, ErrChunkInvalid is returned.
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) {
for _, v := range s.validators {
if v.Validate(ch) {
return s.Store.Put(ctx, mode, ch)
}
}
return false, ErrChunkInvalid
}
......@@ -30,16 +30,19 @@ import (
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
......@@ -51,7 +54,6 @@ var (
useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)")
longrunning = flag.Bool("longrunning", false, "do run long-running tests")
bucketKeyDB = simulation.BucketKey("db")
bucketKeyStore = simulation.BucketKey("store")
bucketKeyFileStore = simulation.BucketKey("filestore")
bucketKeyNetStore = simulation.BucketKey("netstore")
......@@ -113,16 +115,15 @@ func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket
func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) {
n := ctx.Config.Node()
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
if *useMockStore {
store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr)
}
localStore, localStoreCleanup, err := newTestLocalStore(n.ID(), addr, nil)
if err != nil {
return nil, nil, nil, err
}
localStore := store.(*storage.LocalStore)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
localStore.Close()
localStoreCleanup()
return nil, nil, nil, err
}
......@@ -131,8 +132,7 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map,
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
bucket.Store(bucketKeyStore, store)
bucket.Store(bucketKeyDB, netStore)
bucket.Store(bucketKeyStore, localStore)
bucket.Store(bucketKeyDelivery, delivery)
bucket.Store(bucketKeyFileStore, fileStore)
// for the kademlia object, we use the global key from the simulation package,
......@@ -141,13 +141,13 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map,
cleanup := func() {
netStore.Close()
os.RemoveAll(datadir)
localStoreCleanup()
}
return netStore, delivery, cleanup, nil
}
func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *localstore.DB, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
......@@ -161,11 +161,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
os.RemoveAll(datadir)
}
params := storage.NewDefaultLocalStoreParams()
params.Init(datadir)
params.BaseKey = addr.Over()
localStore, err := storage.NewTestLocalStoreForAddr(params)
localStore, err := localstore.New(datadir, addr.Over(), nil)
if err != nil {
removeDataDir()
return nil, nil, nil, nil, err
......@@ -173,15 +169,19 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
localStore.Close()
removeDataDir()
return nil, nil, nil, nil, err
}
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions, nil)
intervalsStore := state.NewInmemoryStore()
streamer := NewRegistry(addr.ID(), delivery, netStore, intervalsStore, registryOptions, nil)
teardown := func() {
streamer.Close()
intervalsStore.Close()
netStore.Close()
removeDataDir()
}
prvkey, err := crypto.GenerateKey()
......@@ -228,24 +228,37 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
// not used in this context, only to fulfill ChunkStore interface
func (rrs *roundRobinStore) Has(ctx context.Context, addr storage.Address) bool {
panic("RoundRobinStor doesn't support HasChunk")
func (rrs *roundRobinStore) Has(_ context.Context, _ storage.Address) (bool, error) {
return false, errors.New("roundRobinStore doesn't support Has")
}
func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Address) (storage.Chunk, error) {
return nil, errors.New("roundRobinStore doesn't support Get")
}
func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error {
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
return rrs.stores[idx].Put(ctx, chunk)
return rrs.stores[idx].Put(ctx, mode, ch)
}
func (rrs *roundRobinStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
return errors.New("roundRobinStore doesn't support Set")
}
func (rrs *roundRobinStore) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
return 0, errors.New("roundRobinStore doesn't support LastPullSubscriptionBinID")
}
func (rrs *roundRobinStore) Close() {
func (rrs *roundRobinStore) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
return nil, nil
}
func (rrs *roundRobinStore) Close() error {
for _, store := range rrs.stores {
store.Close()
}
return nil
}
func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) {
......@@ -311,24 +324,28 @@ func generateRandomFile() (string, error) {
return string(b), nil
}
//create a local store for the given node
func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.ChunkStore, string, error) {
var datadir string
var err error
datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString()))
func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) {
dir, err := ioutil.TempDir("", "swarm-stream-")
if err != nil {
return nil, "", err
return nil, nil, err
}
cleanup = func() {
os.RemoveAll(dir)
}
var store storage.ChunkStore
params := storage.NewDefaultLocalStoreParams()
params.ChunkDbPath = datadir
params.BaseKey = addr.Over()
store, err = storage.NewTestLocalStoreForAddr(params)
var mockStore *mock.NodeStore
if globalStore != nil {
mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes()))
}
localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{
MockStore: mockStore,
})
if err != nil {
os.RemoveAll(datadir)
return nil, "", err
cleanup()
return nil, nil, err
}
return store, datadir, nil
return localStore, cleanup, nil
}
// watchDisconnections receives simulation peer events in a new goroutine and sets atomic value
......
......@@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
......@@ -47,12 +48,12 @@ var (
)
type Delivery struct {
chunkStore storage.SyncChunkStore
chunkStore chunk.FetchStore
kad *network.Kademlia
getPeer func(enode.ID) *Peer
}
func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery {
return &Delivery{
chunkStore: chunkStore,
kad: kad,
......@@ -122,13 +123,13 @@ func (s *SwarmChunkServer) Close() {
close(s.quit)
}
// GetData retrives chunk data from db store
// GetData retrieves chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
ch, err := s.chunkStore.Get(ctx, chunk.ModeGetRequest, storage.Address(key))
if err != nil {
return nil, err
}
return chunk.Data(), nil
return ch.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
......@@ -171,7 +172,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
go func() {
defer osp.Finish()
chunk, err := d.chunkStore.Get(ctx, req.Addr)
ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
......@@ -181,7 +182,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
syncing := false
osp.LogFields(olog.Bool("skipCheck", true))
err = sp.Deliver(ctx, chunk, s.priority, syncing)
err = sp.Deliver(ctx, ch, s.priority, syncing)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
......@@ -190,7 +191,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
}
osp.LogFields(olog.Bool("skipCheck", false))
select {
case streamer.deliveryC <- chunk.Address()[:]:
case streamer.deliveryC <- ch.Address()[:]:
case <-streamer.quit:
}
......@@ -216,7 +217,7 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
// chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req interface{}) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
......@@ -224,11 +225,32 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
processReceivedChunksCount.Inc(1)
var msg *ChunkDeliveryMsg
var mode chunk.ModePut
switch r := req.(type) {
case *ChunkDeliveryMsgRetrieval:
msg = (*ChunkDeliveryMsg)(r)
peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr)
po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr)
depth := d.kad.NeighbourhoodDepth()
// chunks within the area of responsibility should always sync
// https://github.com/ethersphere/go-ethereum/pull/1282#discussion_r269406125
if po >= depth || peerPO < po {
mode = chunk.ModePutSync
} else {
// do not sync if peer that is sending us a chunk is closer to the chunk then we are
mode = chunk.ModePutRequest
}
case *ChunkDeliveryMsgSyncing:
msg = (*ChunkDeliveryMsg)(r)
mode = chunk.ModePutSync
}
// retrieve the span for the originating retrieverequest
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
span := tracing.ShiftSpanByKey(spanId)
spanID := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), msg.Addr)
span := tracing.ShiftSpanByKey(spanID)
log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())
log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID())
go func() {
defer osp.Finish()
......@@ -238,18 +260,18 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
defer span.Finish()
}
req.peer = sp
log.Trace("handle.chunk.delivery", "put", req.Addr)
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
msg.peer = sp
log.Trace("handle.chunk.delivery", "put", msg.Addr)
_, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
// TODO: Enable this log line
// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
req.peer.Drop(err)
// log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, )
msg.peer.Drop(err)
}
}
log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err)
}()
return nil
}
......
......@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
......@@ -189,8 +190,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
})
hash := storage.Address(hash0[:])
chunk := storage.NewChunk(hash, hash)
err = localStore.Put(context.TODO(), chunk)
ch := storage.NewChunk(hash, hash)
_, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
......@@ -241,8 +242,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
hash = storage.Address(hash1[:])
chunk = storage.NewChunk(hash, hash1[:])
err = localStore.Put(context.TODO(), chunk)
ch = storage.NewChunk(hash, hash1[:])
_, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
......@@ -420,14 +421,14 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
defer cancel()
// wait for the chunk to get stored
storedChunk, err := localStore.Get(ctx, chunkKey)
storedChunk, err := localStore.Get(ctx, chunk.ModeGetRequest, chunkKey)
for err != nil {
select {
case <-ctx.Done():
t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
default:
}
storedChunk, err = localStore.Get(ctx, chunkKey)
storedChunk, err = localStore.Get(ctx, chunk.ModeGetRequest, chunkKey)
time.Sleep(50 * time.Millisecond)
}
......@@ -700,7 +701,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
errs := make(chan error)
for _, hash := range hashes {
go func(h storage.Address) {
_, err := netStore.Get(ctx, h)
_, err := netStore.Get(ctx, chunk.ModeGetRequest, h)
log.Warn("test check netstore get", "hash", h, "err", err)
errs <- err
}(hash)
......
......@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
......@@ -287,11 +288,11 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
store storage.SyncChunkStore
store chunk.FetchStore
enableNotificationsC chan struct{}
}
func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
func newTestExternalClient(store chunk.FetchStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
store: store,
......
......@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
......@@ -278,8 +279,8 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
if !ok {
return fmt.Errorf("No localstore")
}
lstore := item.(*storage.LocalStore)
conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
store := item.(chunk.Store)
conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, store)
if err != nil {
return err
}
......
......@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/pot"
......@@ -190,10 +191,10 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
node := sim.Net.GetRandomUpNode()
item, ok := sim.NodeItem(node.ID(), bucketKeyStore)
if !ok {
return fmt.Errorf("No localstore")
return errors.New("no store in simulation bucket")
}
lstore := item.(*storage.LocalStore)
hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
store := item.(chunk.Store)
hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, store)
if err != nil {
return err
}
......@@ -221,25 +222,25 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
localChunks := conf.idToChunksMap[id]
for _, ch := range localChunks {
//get the real chunk by the index in the index array
chunk := conf.hashes[ch]
log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
ch := conf.hashes[ch]
log.Trace("node has chunk", "address", ch)
//check if the expected chunk is indeed in the localstore
var err error
if *useMockStore {
//use the globalStore if the mockStore should be used; in that case,
//the complete localStore stack is bypassed for getting the chunk
_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), ch)
} else {
//use the actual localstore
item, ok := sim.NodeItem(id, bucketKeyStore)
if !ok {
return fmt.Errorf("Error accessing localstore")
return errors.New("no store in simulation bucket")
}
lstore := item.(*storage.LocalStore)
_, err = lstore.Get(ctx, chunk)
store := item.(chunk.Store)
_, err = store.Get(ctx, chunk.ModeGetLookup, ch)
}
if err != nil {
log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
log.Debug("chunk not found", "address", ch.Hex(), "node", id)
// Do not get crazy with logging the warn message
time.Sleep(500 * time.Millisecond)
continue REPEAT
......@@ -247,10 +248,10 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
evt := &simulations.Event{
Type: EventTypeChunkArrived,
Node: sim.Net.GetNode(id),
Data: chunk.String(),
Data: ch.String(),
}
sim.Net.Events().Send(evt)
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
log.Trace("chunk found", "address", ch.Hex(), "node", id)
}
}
return nil
......@@ -296,9 +297,9 @@ func mapKeysToNodes(conf *synctestConfig) {
}
//upload a file(chunks) to a single local node store
func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) {
func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, store chunk.Store) ([]storage.Address, error) {
log.Debug(fmt.Sprintf("Uploading to node id: %s", id))
fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams())
fileStore := storage.NewFileStore(store, storage.NewFileStoreParams())
size := chunkSize
var rootAddrs []storage.Address
for i := 0; i < chunkCount; i++ {
......
......@@ -30,11 +30,11 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
const (
......@@ -108,7 +108,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.FetchStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
if options == nil {
options = &RegistryOptions{}
}
......@@ -627,13 +627,8 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
case *WantedHashesMsg:
return p.handleWantedHashesMsg(ctx, msg)
case *ChunkDeliveryMsgRetrieval:
// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *ChunkDeliveryMsgSyncing:
// handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *ChunkDeliveryMsgRetrieval, *ChunkDeliveryMsgSyncing:
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg)
case *RetrieveRequestMsg:
return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg)
......
......@@ -21,8 +21,7 @@ import (
"strconv"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
)
......@@ -36,12 +35,12 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
store storage.SyncChunkStore
store chunk.FetchStore
quit chan struct{}
}
// NewSwarmSyncerServer is constructor for SwarmSyncerServer
func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) {
return &SwarmSyncerServer{
po: po,
store: syncChunkStore,
......@@ -49,7 +48,7 @@ func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*Swa
}, nil
}
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) {
streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
......@@ -69,76 +68,103 @@ func (s *SwarmSyncerServer) Close() {
// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
chunk, err := s.store.Get(ctx, storage.Address(key))
ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
if err != nil {
return nil, err
}
return chunk.Data(), nil
return ch.Data(), nil
}
// SessionIndex returns current storage bin (po) index.
func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
return s.store.BinIndex(s.po), nil
return s.store.LastPullSubscriptionBinID(s.po)
}
// GetBatch retrieves the next batch of hashes from the dbstore
// SetNextBatch retrieves the next batch of hashes from the localstore.
// It expects a range of bin IDs, both ends inclusive in syncing, and returns
// concatenated byte slice of chunk addresses and bin IDs of the first and
// the last one in that slice. The batch may have up to BatchSize number of
// chunk addresses. If at least one chunk is added to the batch and no new chunks
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
var ticker *time.Ticker
descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to)
defer stop()
const batchTimeout = 2 * time.Second
var (
batch []byte
batchSize int
batchStartID *uint64
batchEndID uint64
timer *time.Timer
timerC <-chan time.Time
)
defer func() {
if ticker != nil {
ticker.Stop()
if timer != nil {
timer.Stop()
}
}()
var wait bool
for {
if wait {
if ticker == nil {
ticker = time.NewTicker(1000 * time.Millisecond)
for iterate := true; iterate; {
select {
case d, ok := <-descriptors:
if !ok {
iterate = false
break
}
select {
case <-ticker.C:
case <-s.quit:
return nil, 0, 0, nil, nil
batch = append(batch, d.Address[:]...)
// This is the most naive approach to label the chunk as synced
// allowing it to be garbage collected. A proper way requires
// validating that the chunk is successfully stored by the peer.
err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address)
if err != nil {
return nil, 0, 0, nil, err
}
}
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
select {
case <-s.quit:
return false
default:
batchSize++
if batchStartID == nil {
// set batch start id only if
// this is the first iteration
batchStartID = &d.BinID
}
batch = append(batch, key[:]...)
i++
to = idx
return i < BatchSize
})
if err != nil {
return nil, 0, 0, nil, err
}
if len(batch) > 0 {
break
batchEndID = d.BinID
if batchSize >= BatchSize {
iterate = false
}
if timer == nil {
timer = time.NewTimer(batchTimeout)
} else {
if !timer.Stop() {
<-timer.C
}
timer.Reset(batchTimeout)
}
timerC = timer.C
case <-timerC:
// return batch if new chunks are not
// received after some time
iterate = false
case <-s.quit:
iterate = false
}
wait = true
}
log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
return batch, from, to, nil, nil
if batchStartID == nil {
// if batch start id is not set, return 0
batchStartID = new(uint64)
}
return batch, *batchStartID, batchEndID, nil, nil
}
// SwarmSyncerClient
type SwarmSyncerClient struct {
store storage.SyncChunkStore
store chunk.FetchStore
peer *Peer
stream Stream
}
// NewSwarmSyncerClient is a contructor for provable data exchange syncer
func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
store: store,
peer: p,
......@@ -184,7 +210,7 @@ func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream)
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
......
......@@ -21,22 +21,20 @@ import (
"errors"
"fmt"
"io/ioutil"
"math"
"os"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
......@@ -55,24 +53,6 @@ func TestSyncerSimulation(t *testing.T) {
}
}
func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) {
address := common.BytesToAddress(id.Bytes())
mockStore := globalStore.NewNodeStore(address)
params := storage.NewDefaultLocalStoreParams()
datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString())
if err != nil {
return nil, "", err
}
params.Init(datadir)
params.BaseKey = addr.Over()
lstore, err = storage.NewLocalStore(params, mockStore)
if err != nil {
return nil, "", err
}
return lstore, datadir, nil
}
func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, po uint8) {
sim := simulation.New(map[string]simulation.ServiceFunc{
......@@ -181,17 +161,32 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
if i < nodes-1 {
hashCounts[i] = hashCounts[i+1]
}
item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB)
item, ok := sim.NodeItem(nodeIDs[i], bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
netStore := item.(*storage.NetStore)
netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
return true
})
store := item.(chunk.Store)
until, err := store.LastPullSubscriptionBinID(po)
if err != nil {
return err
}
if until > 0 {
c, _ := store.SubscribePull(ctx, po, 0, until)
for iterate := true; iterate; {
select {
case cd, ok := <-c:
if !ok {
iterate = false
break
}
hashes[i] = append(hashes[i], cd.Address)
totalHashes++
hashCounts[i]++
case <-ctx.Done():
return ctx.Err()
}
}
}
}
var total, found int
for _, node := range nodeIDs {
......@@ -200,12 +195,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
for j := i; j < nodes; j++ {
total += len(hashes[j])
for _, key := range hashes[j] {
item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB)
item, ok := sim.NodeItem(nodeIDs[j], bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
db := item.(*storage.NetStore)
_, err := db.Get(ctx, key)
db := item.(chunk.Store)
_, err := db.Get(ctx, chunk.ModeGetRequest, key)
if err == nil {
found++
}
......@@ -216,7 +211,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
if total == found && total > 0 {
return nil
}
return fmt.Errorf("Total not equallying found: total is %d", total)
return fmt.Errorf("Total not equallying found %v: total is %d", found, total)
})
if result.Error != nil {
......
......@@ -40,9 +40,7 @@ type Item struct {
Data []byte
AccessTimestamp int64
StoreTimestamp int64
// UseMockStore is a pointer to identify
// an unset state of the field in Join function.
UseMockStore *bool
BinID uint64
}
// Merge is a helper method to construct a new
......@@ -61,8 +59,8 @@ func (i Item) Merge(i2 Item) (new Item) {
if i.StoreTimestamp == 0 {
i.StoreTimestamp = i2.StoreTimestamp
}
if i.UseMockStore == nil {
i.UseMockStore = i2.UseMockStore
if i.BinID == 0 {
i.BinID = i2.BinID
}
return i
}
......
......@@ -52,7 +52,7 @@ type indexSpec struct {
Name string `json:"name"`
}
// schemaFieldKey retrives the complete LevelDB key for
// schemaFieldKey retrieves the complete LevelDB key for
// a particular field form the schema definition.
func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) {
if name == "" {
......
......@@ -22,8 +22,6 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"testing"
"time"
......@@ -59,30 +57,6 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader
}
}
func newLDBStore(t *testing.T) (*LDBStore, func()) {
dir, err := ioutil.TempDir("", "bzz-storage-test")
if err != nil {
t.Fatal(err)
}
log.Trace("memstore.tempdir", "dir", dir)
ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir)
db, err := NewLDBStore(ldbparams)
if err != nil {
t.Fatal(err)
}
cleanup := func() {
db.Close()
err := os.RemoveAll(dir)
if err != nil {
t.Fatal(err)
}
}
return db, cleanup
}
func mputRandomChunks(store ChunkStore, n int) ([]Chunk, error) {
return mput(store, n, GenerateRandomChunk)
}
......@@ -94,14 +68,15 @@ func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
for i := int64(0); i < int64(n); i++ {
chunk := f(chunk.DefaultSize)
ch := f(chunk.DefaultSize)
go func() {
_, err := store.Put(ctx, chunk.ModePutUpload, ch)
select {
case errc <- store.Put(ctx, chunk):
case errc <- err:
case <-ctx.Done():
}
}()
hs = append(hs, chunk)
hs = append(hs, ch)
}
// wait for all chunks to be stored
......@@ -123,13 +98,13 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error)
go func(h Address) {
defer wg.Done()
// TODO: write timeout with context
chunk, err := store.Get(context.TODO(), h)
ch, err := store.Get(context.TODO(), chunk.ModeGetRequest, h)
if err != nil {
errc <- err
return
}
if f != nil {
err = f(h, chunk)
err = f(h, ch)
if err != nil {
errc <- err
return
......@@ -250,14 +225,15 @@ func NewMapChunkStore() *MapChunkStore {
}
}
func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error {
func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
_, exists := m.chunks[ch.Address().Hex()]
m.chunks[ch.Address().Hex()] = ch
return nil
return exists, nil
}
func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
m.mu.RLock()
defer m.mu.RUnlock()
chunk := m.chunks[ref.Hex()]
......@@ -268,15 +244,28 @@ func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
}
// Need to implement Has from SyncChunkStore
func (m *MapChunkStore) Has(ctx context.Context, ref Address) bool {
func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err error) {
m.mu.RLock()
defer m.mu.RUnlock()
_, has := m.chunks[ref.Hex()]
return has
_, has = m.chunks[ref.Hex()]
return has, nil
}
func (m *MapChunkStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
return nil
}
func (m *MapChunkStore) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
return 0, nil
}
func (m *MapChunkStore) Close() {
func (m *MapChunkStore) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
return nil, nil
}
func (m *MapChunkStore) Close() error {
return nil
}
func chunkAddresses(chunks []Chunk) []Address {
......
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package storage
// this is a clone of an earlier state of the ethereum ethdb/database
// no need for queueing/caching
import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
)
const openFileLimit = 128
type LDBDatabase struct {
db *leveldb.DB
}
func NewLDBDatabase(file string) (*LDBDatabase, error) {
// Open the db
db, err := leveldb.OpenFile(file, &opt.Options{OpenFilesCacheCapacity: openFileLimit})
if err != nil {
return nil, err
}
database := &LDBDatabase{db: db}
return database, nil
}
func (db *LDBDatabase) Put(key []byte, value []byte) error {
metrics.GetOrRegisterCounter("ldbdatabase.put", nil).Inc(1)
return db.db.Put(key, value, nil)
}
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
metrics.GetOrRegisterCounter("ldbdatabase.get", nil).Inc(1)
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
}
return dat, nil
}
func (db *LDBDatabase) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
func (db *LDBDatabase) NewIterator() iterator.Iterator {
metrics.GetOrRegisterCounter("ldbdatabase.newiterator", nil).Inc(1)
return db.db.NewIterator(nil, nil)
}
func (db *LDBDatabase) Write(batch *leveldb.Batch) error {
metrics.GetOrRegisterCounter("ldbdatabase.write", nil).Inc(1)
return db.db.Write(batch, nil)
}
func (db *LDBDatabase) Close() {
// Close the leveldb database
db.db.Close()
}
......@@ -24,6 +24,8 @@ import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
"github.com/ethereum/go-ethereum/swarm/log"
......@@ -189,7 +191,7 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
defer cancel()
chunk, err := h.chunkStore.Get(ctx, id.Addr())
ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, id.Addr())
if err != nil {
if err == context.DeadlineExceeded { // chunk not found
return nil, nil
......@@ -198,7 +200,7 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
}
var request Request
if err := request.fromChunk(chunk); err != nil {
if err := request.fromChunk(ch); err != nil {
return nil, nil
}
if request.Time <= timeLimit {
......@@ -257,14 +259,14 @@ func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Ad
return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
}
chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
ch, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big
if err != nil {
return nil, err
}
// send the chunk
h.chunkStore.Put(ctx, chunk)
log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
h.chunkStore.Put(ctx, chunk.ModePutUpload, ch)
log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", ch.Data())
// update our feed updates map cache entry if the new update is older than the one we have, if we have it.
if feedUpdate != nil && r.Epoch.After(feedUpdate.Epoch) {
feedUpdate.Epoch = r.Epoch
......
......@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
var (
......@@ -400,9 +401,7 @@ func TestValidatorInStore(t *testing.T) {
}
defer os.RemoveAll(datadir)
handlerParams := storage.NewDefaultLocalStoreParams()
handlerParams.Init(datadir)
store, err := storage.NewLocalStore(handlerParams, nil)
localstore, err := localstore.New(datadir, make([]byte, 32), nil)
if err != nil {
t.Fatal(err)
}
......@@ -410,7 +409,7 @@ func TestValidatorInStore(t *testing.T) {
// set up Swarm feeds handler and add is as a validator to the localstore
fhParams := &HandlerParams{}
fh := NewHandler(fhParams)
store.Validators = append(store.Validators, fh)
store := chunk.NewValidatorStore(localstore, fh)
// create content addressed chunks, one good, one faulty
chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2)
......@@ -447,15 +446,15 @@ func TestValidatorInStore(t *testing.T) {
}
// put the chunks in the store and check their error status
err = store.Put(context.Background(), goodChunk)
_, err = store.Put(context.Background(), chunk.ModePutUpload, goodChunk)
if err == nil {
t.Fatal("expected error on good content address chunk with feed update validator only, but got nil")
}
err = store.Put(context.Background(), badChunk)
_, err = store.Put(context.Background(), chunk.ModePutUpload, badChunk)
if err == nil {
t.Fatal("expected error on bad content address chunk with feed update validator only, but got nil")
}
err = store.Put(context.Background(), uglyChunk)
_, err = store.Put(context.Background(), chunk.ModePutUpload, uglyChunk)
if err != nil {
t.Fatalf("expected no error on feed update chunk with feed update validator only, but got: %s", err)
}
......
......@@ -18,12 +18,13 @@ package feed
import (
"context"
"fmt"
"path/filepath"
"sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
const (
......@@ -53,14 +54,14 @@ func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetF
func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) {
path := filepath.Join(datadir, testDbDirName)
fh := NewHandler(params)
localstoreparams := storage.NewDefaultLocalStoreParams()
localstoreparams.Init(path)
localStore, err := storage.NewLocalStore(localstoreparams, nil)
db, err := localstore.New(filepath.Join(path, "chunks"), make([]byte, 32), nil)
if err != nil {
return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err)
return nil, err
}
localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)))
localStore.Validators = append(localStore.Validators, fh)
localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
return nil, err
......
......@@ -21,6 +21,9 @@ import (
"io"
"sort"
"sync"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
)
/*
......@@ -58,14 +61,11 @@ func NewFileStoreParams() *FileStoreParams {
// for testing locally
func NewLocalFileStore(datadir string, basekey []byte) (*FileStore, error) {
params := NewDefaultLocalStoreParams()
params.Init(datadir)
localStore, err := NewLocalStore(params, nil)
localStore, err := localstore.New(datadir, basekey, nil)
if err != nil {
return nil, err
}
localStore.Validators = append(localStore.Validators, NewContentAddressValidator(MakeHashFunc(DefaultHash)))
return NewFileStore(localStore, NewFileStoreParams()), nil
return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams()), nil
}
func NewFileStore(store ChunkStore, params *FileStoreParams) *FileStore {
......
......@@ -22,8 +22,10 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
"github.com/ethereum/go-ethereum/swarm/testutil"
)
......@@ -35,21 +37,18 @@ func TestFileStorerandom(t *testing.T) {
}
func testFileStoreRandom(toEncrypt bool, t *testing.T) {
tdb, cleanup, err := newTestDbStore(false, false)
defer cleanup()
dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
t.Fatalf("init dbStore failed: %v", err)
t.Fatal(err)
}
db := tdb.LDBStore
db.setCapacity(50000)
memStore := NewMemStore(NewDefaultStoreParams(), db)
localStore := &LocalStore{
memStore: memStore,
DbStore: db,
defer os.RemoveAll(dir)
localStore, err := localstore.New(dir, make([]byte, 32), nil)
if err != nil {
t.Fatal(err)
}
defer localStore.Close()
fileStore := NewFileStore(localStore, NewFileStoreParams())
defer os.RemoveAll("/tmp/bzz")
slice := testutil.RandomBytes(1, testDataSize)
ctx := context.TODO()
......@@ -76,9 +75,8 @@ func testFileStoreRandom(toEncrypt bool, t *testing.T) {
if !bytes.Equal(slice, resultSlice) {
t.Fatalf("Comparison error.")
}
ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666)
ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666)
localStore.memStore = NewMemStore(NewDefaultStoreParams(), db)
ioutil.WriteFile(filepath.Join(dir, "slice.bzz.16M"), slice, 0666)
ioutil.WriteFile(filepath.Join(dir, "result.bzz.16M"), resultSlice, 0666)
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
......@@ -104,17 +102,17 @@ func TestFileStoreCapacity(t *testing.T) {
}
func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
tdb, cleanup, err := newTestDbStore(false, false)
defer cleanup()
dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
t.Fatalf("init dbStore failed: %v", err)
t.Fatal(err)
}
db := tdb.LDBStore
memStore := NewMemStore(NewDefaultStoreParams(), db)
localStore := &LocalStore{
memStore: memStore,
DbStore: db,
defer os.RemoveAll(dir)
localStore, err := localstore.New(dir, make([]byte, 32), nil)
if err != nil {
t.Fatal(err)
}
defer localStore.Close()
fileStore := NewFileStore(localStore, NewFileStoreParams())
slice := testutil.RandomBytes(1, testDataSize)
ctx := context.TODO()
......@@ -141,10 +139,6 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
if !bytes.Equal(slice, resultSlice) {
t.Fatalf("Comparison error.")
}
// Clear memStore
memStore.setCapacity(0)
// check whether it is, indeed, empty
fileStore.ChunkStore = memStore
resultReader, isEncrypted = fileStore.Retrieve(context.TODO(), key)
if isEncrypted != toEncrypt {
t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted)
......@@ -177,17 +171,17 @@ func testFileStoreCapacity(toEncrypt bool, t *testing.T) {
// TestGetAllReferences only tests that GetAllReferences returns an expected
// number of references for a given file
func TestGetAllReferences(t *testing.T) {
tdb, cleanup, err := newTestDbStore(false, false)
defer cleanup()
dir, err := ioutil.TempDir("", "swarm-storage-")
if err != nil {
t.Fatalf("init dbStore failed: %v", err)
t.Fatal(err)
}
db := tdb.LDBStore
memStore := NewMemStore(NewDefaultStoreParams(), db)
localStore := &LocalStore{
memStore: memStore,
DbStore: db,
defer os.RemoveAll(dir)
localStore, err := localstore.New(dir, make([]byte, 32), nil)
if err != nil {
t.Fatal(err)
}
defer localStore.Close()
fileStore := NewFileStore(localStore, NewFileStoreParams())
// testRuns[i] and expectedLen[i] are dataSize and expected length respectively
......
......@@ -93,7 +93,7 @@ func (h *hasherStore) Get(ctx context.Context, ref Reference) (ChunkData, error)
return nil, err
}
chunk, err := h.store.Get(ctx, addr)
chunk, err := h.store.Get(ctx, chunk.ModeGetRequest, addr)
if err != nil {
return nil, err
}
......@@ -239,11 +239,12 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio
return encryption.New(key, int(chunk.DefaultSize), 0, sha3.NewLegacyKeccak256)
}
func (h *hasherStore) storeChunk(ctx context.Context, chunk Chunk) {
func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
atomic.AddUint64(&h.nrChunks, 1)
go func() {
_, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
select {
case h.errC <- h.store.Put(ctx, chunk):
case h.errC <- err:
case <-h.quitC:
}
}()
......
......@@ -21,9 +21,9 @@ import (
"context"
"testing"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage/encryption"
)
func TestHasherStore(t *testing.T) {
......@@ -107,7 +107,7 @@ func TestHasherStore(t *testing.T) {
}
// Check if chunk data in store is encrypted or not
chunkInStore, err := chunkStore.Get(ctx, hash1)
chunkInStore, err := chunkStore.Get(ctx, chunk.ModeGetRequest, hash1)
if err != nil {
t.Fatalf("Expected no error got \"%v\"", err)
}
......
This diff is collapsed.
This diff is collapsed.
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"context"
"path/filepath"
"sync"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
)
type LocalStoreParams struct {
*StoreParams
ChunkDbPath string
Validators []ChunkValidator `toml:"-"`
}
func NewDefaultLocalStoreParams() *LocalStoreParams {
return &LocalStoreParams{
StoreParams: NewDefaultStoreParams(),
}
}
//this can only finally be set after all config options (file, cmd line, env vars)
//have been evaluated
func (p *LocalStoreParams) Init(path string) {
if p.ChunkDbPath == "" {
p.ChunkDbPath = filepath.Join(path, "chunks")
}
}
// LocalStore is a combination of inmemory db over a disk persisted db
// implements a Get/Put with fallback (caching) logic using any 2 ChunkStores
type LocalStore struct {
Validators []ChunkValidator
memStore *MemStore
DbStore *LDBStore
mu sync.Mutex
}
// This constructor uses MemStore and DbStore as components
func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalStore, error) {
ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath)
dbStore, err := NewMockDbStore(ldbparams, mockStore)
if err != nil {
return nil, err
}
return &LocalStore{
memStore: NewMemStore(params.StoreParams, dbStore),
DbStore: dbStore,
Validators: params.Validators,
}, nil
}
func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) {
ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath)
dbStore, err := NewLDBStore(ldbparams)
if err != nil {
return nil, err
}
localStore := &LocalStore{
memStore: NewMemStore(params.StoreParams, dbStore),
DbStore: dbStore,
Validators: params.Validators,
}
return localStore, nil
}
// isValid returns true if chunk passes any of the LocalStore Validators.
// isValid also returns true if LocalStore has no Validators.
func (ls *LocalStore) isValid(chunk Chunk) bool {
// by default chunks are valid. if we have 0 validators, then all chunks are valid.
valid := true
// ls.Validators contains a list of one validator per chunk type.
// if one validator succeeds, then the chunk is valid
for _, v := range ls.Validators {
if valid = v.Validate(chunk); valid {
break
}
}
return valid
}
// Put is responsible for doing validation and storage of the chunk
// by using configured ChunkValidators, MemStore and LDBStore.
// If the chunk is not valid, its GetErrored function will
// return ErrChunkInvalid.
// This method will check if the chunk is already in the MemStore
// and it will return it if it is. If there is an error from
// the MemStore.Get, it will be returned by calling GetErrored
// on the chunk.
// This method is responsible for closing Chunk.ReqC channel
// when the chunk is stored in memstore.
// After the LDBStore.Put, it is ensured that the MemStore
// contains the chunk with the same data, but nil ReqC channel.
func (ls *LocalStore) Put(ctx context.Context, chunk Chunk) error {
if !ls.isValid(chunk) {
return ErrChunkInvalid
}
log.Trace("localstore.put", "key", chunk.Address())
ls.mu.Lock()
defer ls.mu.Unlock()
_, err := ls.memStore.Get(ctx, chunk.Address())
if err == nil {
return nil
}
if err != nil && err != ErrChunkNotFound {
return err
}
ls.memStore.Put(ctx, chunk)
err = ls.DbStore.Put(ctx, chunk)
return err
}
// Has queries the underlying DbStore if a chunk with the given address
// is being stored there.
// Returns true if it is stored, false if not
func (ls *LocalStore) Has(ctx context.Context, addr Address) bool {
return ls.DbStore.Has(ctx, addr)
}
// Get(chunk *Chunk) looks up a chunk in the local stores
// This method is blocking until the chunk is retrieved
// so additional timeout may be needed to wrap this call if
// ChunkStores are remote and can have long latency
func (ls *LocalStore) Get(ctx context.Context, addr Address) (chunk Chunk, err error) {
ls.mu.Lock()
defer ls.mu.Unlock()
return ls.get(ctx, addr)
}
func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err error) {
chunk, err = ls.memStore.Get(ctx, addr)
if err != nil && err != ErrChunkNotFound {
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
return nil, err
}
if err == nil {
metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
go ls.DbStore.MarkAccessed(addr)
return chunk, nil
}
metrics.GetOrRegisterCounter("localstore.get.cachemiss", nil).Inc(1)
chunk, err = ls.DbStore.Get(ctx, addr)
if err != nil {
metrics.GetOrRegisterCounter("localstore.get.error", nil).Inc(1)
return nil, err
}
ls.memStore.Put(ctx, chunk)
return chunk, nil
}
func (ls *LocalStore) FetchFunc(ctx context.Context, addr Address) func(context.Context) error {
ls.mu.Lock()
defer ls.mu.Unlock()
_, err := ls.get(ctx, addr)
if err == nil {
return nil
}
return func(context.Context) error {
return err
}
}
func (ls *LocalStore) BinIndex(po uint8) uint64 {
return ls.DbStore.BinIndex(po)
}
func (ls *LocalStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
return ls.DbStore.SyncIterator(from, to, po, f)
}
// Close the local store
func (ls *LocalStore) Close() {
ls.DbStore.Close()
}
// Migrate checks the datastore schema vs the runtime schema and runs
// migrations if they don't match
func (ls *LocalStore) Migrate() error {
actualDbSchema, err := ls.DbStore.GetSchema()
if err != nil {
log.Error(err.Error())
return err
}
if actualDbSchema == CurrentDbSchema {
return nil
}
log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema)
if actualDbSchema == DbSchemaNone {
ls.migrateFromNoneToPurity()
actualDbSchema = DbSchemaPurity
}
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
return err
}
if actualDbSchema == DbSchemaPurity {
if err := ls.migrateFromPurityToHalloween(); err != nil {
return err
}
actualDbSchema = DbSchemaHalloween
}
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
return err
}
return nil
}
func (ls *LocalStore) migrateFromNoneToPurity() {
// delete chunks that are not valid, i.e. chunks that do not pass
// any of the ls.Validators
ls.DbStore.Cleanup(func(c Chunk) bool {
return !ls.isValid(c)
})
}
func (ls *LocalStore) migrateFromPurityToHalloween() error {
return ls.DbStore.CleanGCIndex()
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"archive/tar"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/shed"
)
const (
// filename in tar archive that holds the information
// about exported data format version
exportVersionFilename = ".swarm-export-version"
// legacy version for previous LDBStore
legacyExportVersion = "1"
// current export format version
currentExportVersion = "2"
)
// Export writes a tar structured data to the writer of
// all chunks in the retrieval data index. It returns the
// number of chunks exported.
func (db *DB) Export(w io.Writer) (count int64, err error) {
tw := tar.NewWriter(w)
defer tw.Close()
if err := tw.WriteHeader(&tar.Header{
Name: exportVersionFilename,
Mode: 0644,
Size: int64(len(currentExportVersion)),
}); err != nil {
return 0, err
}
if _, err := tw.Write([]byte(currentExportVersion)); err != nil {
return 0, err
}
err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {
hdr := &tar.Header{
Name: hex.EncodeToString(item.Address),
Mode: 0644,
Size: int64(len(item.Data)),
}
if err := tw.WriteHeader(hdr); err != nil {
return false, err
}
if _, err := tw.Write(item.Data); err != nil {
return false, err
}
count++
return false, nil
}, nil)
return count, err
}
// Import reads a tar structured data from the reader and
// stores chunks in the database. It returns the number of
// chunks imported.
func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) {
tr := tar.NewReader(r)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errC := make(chan error)
doneC := make(chan struct{})
tokenPool := make(chan struct{}, 100)
var wg sync.WaitGroup
go func() {
var (
firstFile = true
// if exportVersionFilename file is not present
// assume legacy version
version = legacyExportVersion
)
for {
hdr, err := tr.Next()
if err != nil {
if err == io.EOF {
break
}
select {
case errC <- err:
case <-ctx.Done():
}
}
if firstFile {
firstFile = false
if hdr.Name == exportVersionFilename {
data, err := ioutil.ReadAll(tr)
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
version = string(data)
continue
}
}
if len(hdr.Name) != 64 {
log.Warn("ignoring non-chunk file", "name", hdr.Name)
continue
}
keybytes, err := hex.DecodeString(hdr.Name)
if err != nil {
log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
continue
}
data, err := ioutil.ReadAll(tr)
if err != nil {
select {
case errC <- err:
case <-ctx.Done():
}
}
key := chunk.Address(keybytes)
var ch chunk.Chunk
switch version {
case legacyExportVersion:
// LDBStore Export exported chunk data prefixed with the chunk key.
// That is not necessary, as the key is in the chunk filename,
// but backward compatibility needs to be preserved.
ch = chunk.NewChunk(key, data[32:])
case currentExportVersion:
ch = chunk.NewChunk(key, data)
default:
select {
case errC <- fmt.Errorf("unsupported export data version %q", version):
case <-ctx.Done():
}
}
tokenPool <- struct{}{}
wg.Add(1)
go func() {
_, err := db.Put(ctx, chunk.ModePutUpload, ch)
select {
case errC <- err:
case <-ctx.Done():
wg.Done()
<-tokenPool
default:
err := db.Put(ctx, chunk.ModePutUpload, ch)
if err != nil {
errC <- err
}
wg.Done()
<-tokenPool
}
}()
count++
}
wg.Wait()
close(doneC)
}()
// wait for all chunks to be stored
for {
select {
case err := <-errC:
if err != nil {
return count, err
}
case <-ctx.Done():
return count, ctx.Err()
default:
select {
case <-doneC:
return count, nil
default:
}
}
}
}
// Copyright 2018 The go-ethereum Authors
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
......@@ -14,79 +14,67 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// memory storage layer for the package blockhash
package storage
package localstore
import (
"bytes"
"context"
"testing"
lru "github.com/hashicorp/golang-lru"
"github.com/ethereum/go-ethereum/swarm/chunk"
)
type MemStore struct {
cache *lru.Cache
disabled bool
}
// TestExportImport constructs two databases, one to put and export
// chunks and another one to import and validate that all chunks are
// imported.
func TestExportImport(t *testing.T) {
db1, cleanup1 := newTestDB(t, nil)
defer cleanup1()
var chunkCount = 100
//NewMemStore is instantiating a MemStore cache keeping all frequently requested
//chunks in the `cache` LRU cache.
func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) {
if params.CacheCapacity == 0 {
return &MemStore{
disabled: true,
chunks := make(map[string][]byte, chunkCount)
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()
_, err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[string(ch.Address())] = ch.Data()
}
c, err := lru.New(int(params.CacheCapacity))
var buf bytes.Buffer
c, err := db1.Export(&buf)
if err != nil {
panic(err)
t.Fatal(err)
}
return &MemStore{
cache: c,
wantChunksCount := int64(len(chunks))
if c != wantChunksCount {
t.Errorf("got export count %v, want %v", c, wantChunksCount)
}
}
// Has needed to implement SyncChunkStore
func (m *MemStore) Has(_ context.Context, addr Address) bool {
return m.cache.Contains(addr)
}
func (m *MemStore) Get(_ context.Context, addr Address) (Chunk, error) {
if m.disabled {
return nil, ErrChunkNotFound
}
db2, cleanup2 := newTestDB(t, nil)
defer cleanup2()
c, ok := m.cache.Get(string(addr))
if !ok {
return nil, ErrChunkNotFound
c, err = db2.Import(&buf, false)
if err != nil {
t.Fatal(err)
}
return c.(Chunk), nil
}
func (m *MemStore) Put(_ context.Context, c Chunk) error {
if m.disabled {
return nil
if c != wantChunksCount {
t.Errorf("got import count %v, want %v", c, wantChunksCount)
}
m.cache.Add(string(c.Address()), c)
return nil
}
func (m *MemStore) setCapacity(n int) {
if n <= 0 {
m.disabled = true
} else {
c, err := lru.New(n)
for a, want := range chunks {
addr := chunk.Address([]byte(a))
ch, err := db2.Get(context.Background(), chunk.ModeGetRequest, addr)
if err != nil {
panic(err)
t.Fatal(err)
}
*m = MemStore{
cache: c,
got := ch.Data()
if !bytes.Equal(got, want) {
t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want)
}
}
}
func (s *MemStore) Close() {}
......@@ -17,6 +17,7 @@
package localstore
import (
"context"
"io/ioutil"
"math/rand"
"os"
......@@ -63,26 +64,23 @@ func testDB_collectGarbageWorker(t *testing.T) {
})()
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
addrs := make([]chunk.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := uploader.Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, chunk.Address())
addrs = append(addrs, ch.Address())
}
gcTarget := db.gcTarget()
......@@ -110,7 +108,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
......@@ -118,7 +116,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
......@@ -134,9 +132,6 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
})
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
testHookCollectGarbageChan := make(chan uint64)
defer setTestHookCollectGarbage(func(collectedCount uint64) {
testHookCollectGarbageChan <- collectedCount
......@@ -146,19 +141,19 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload random chunks just up to the capacity
for i := 0; i < int(db.capacity)-1; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := uploader.Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, chunk.Address())
addrs = append(addrs, ch.Address())
}
// set update gc test hook to signal when
......@@ -172,7 +167,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// request the latest synced chunk
// to prioritize it in the gc index
// not to be collected
_, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
......@@ -191,11 +186,11 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload and sync another chunk to trigger
// garbage collection
ch := generateTestRandomChunk()
err = uploader.Put(ch)
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -235,7 +230,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// requested chunk should not be removed
t.Run("get requested chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[0])
if err != nil {
t.Fatal(err)
}
......@@ -243,7 +238,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// the second synced chunk should be removed
t.Run("get gc-ed chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[1])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[1])
if err != chunk.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)
}
......@@ -251,7 +246,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
_, err := db.Get(context.Background(), chunk.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
......@@ -275,20 +270,17 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err)
}
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
count := 100
for i := 0; i < count; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := uploader.Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
......
......@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
"context"
"math/rand"
"testing"
......@@ -35,29 +36,22 @@ func TestDB_pullIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := uploader.Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
Chunk: chunk,
// this timestamp is not the same as in
// the index, but given that uploads
// are sequential and that only ordering
// of events matter, this information is
// sufficient
storeTimestamp: now(),
Chunk: ch,
binID: uint64(i),
}
}
......@@ -70,10 +64,10 @@ func TestDB_pullIndex(t *testing.T) {
if poi > poj {
return false
}
if chunks[i].storeTimestamp < chunks[j].storeTimestamp {
if chunks[i].binID < chunks[j].binID {
return true
}
if chunks[i].storeTimestamp > chunks[j].storeTimestamp {
if chunks[i].binID > chunks[j].binID {
return false
}
return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
......@@ -87,23 +81,21 @@ func TestDB_gcIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := uploader.Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
Chunk: chunk,
Chunk: ch,
}
}
......@@ -123,9 +115,9 @@ func TestDB_gcIndex(t *testing.T) {
})()
t.Run("request unsynced", func(t *testing.T) {
chunk := chunks[1]
ch := chunks[1]
_, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -140,9 +132,9 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("sync one chunk", func(t *testing.T) {
chunk := chunks[0]
ch := chunks[0]
err := db.NewSetter(ModeSetSync).Set(chunk.Address())
err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -154,10 +146,8 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("sync all chunks", func(t *testing.T) {
setter := db.NewSetter(ModeSetSync)
for i := range chunks {
err := setter.Set(chunks[i].Address())
err := db.Set(context.Background(), chunk.ModeSetSync, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
......@@ -171,7 +161,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("request one chunk", func(t *testing.T) {
i := 6
_, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address())
_, err := db.Get(context.Background(), chunk.ModeGetRequest, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
......@@ -189,14 +179,13 @@ func TestDB_gcIndex(t *testing.T) {
})
t.Run("random chunk request", func(t *testing.T) {
requester := db.NewGetter(ModeGetRequest)
rand.Shuffle(len(chunks), func(i, j int) {
chunks[i], chunks[j] = chunks[j], chunks[i]
})
for _, chunk := range chunks {
_, err := requester.Get(chunk.Address())
for _, ch := range chunks {
_, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -212,7 +201,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("remove one chunk", func(t *testing.T) {
i := 3
err := db.NewSetter(modeSetRemove).Set(chunks[i].Address())
err := db.Set(context.Background(), chunk.ModeSetRemove, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
......
......@@ -28,6 +28,9 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage/mock"
)
// DB implements chunk.Store.
var _ chunk.Store = &DB{}
var (
// ErrInvalidMode is retuned when an unknown Mode
// is provided to the function.
......@@ -69,6 +72,10 @@ type DB struct {
pullTriggers map[uint8][]chan struct{}
pullTriggersMu sync.RWMutex
// binIDs stores the latest chunk serial ID for very
// proximity order bin
binIDs shed.Uint64Vector
// garbage collection index
gcIndex shed.Index
......@@ -124,7 +131,10 @@ type Options struct {
// One goroutine for writing batches is created.
func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if o == nil {
o = new(Options)
// default options
o = &Options{
Capacity: 5000000,
}
}
db = &DB{
capacity: o.Capacity,
......@@ -148,11 +158,23 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if err != nil {
return nil, err
}
// Identify current storage schema by arbitrary name.
db.schemaName, err = db.shed.NewStringField("schema-name")
if err != nil {
return nil, err
}
schemaName, err := db.schemaName.Get()
if err != nil {
return nil, err
}
if schemaName == "" {
// initial new localstore run
err := db.schemaName.Put(DbSchemaSanctuary)
if err != nil {
return nil, err
}
}
// Persist gc size.
db.gcSize, err = db.shed.NewUint64Field("gc-size")
if err != nil {
......@@ -165,8 +187,9 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
)
if o.MockStore != nil {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
b := make([]byte, 16)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
err = o.MockStore.Put(fields.Address, fields.Data)
if err != nil {
return nil, err
......@@ -174,25 +197,28 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return b, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.BinID = binary.BigEndian.Uint64(value[:8])
e.Data, err = o.MockStore.Get(keyItem.Address)
return e, err
}
} else {
encodeValueFunc = func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
b := make([]byte, 16)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
}
decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:]
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.BinID = binary.BigEndian.Uint64(value[:8])
e.Data = value[16:]
return e, nil
}
}
// Index storing actual chunk address, data and store timestamp.
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
// Index storing actual chunk address, data and bin id.
db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
......@@ -230,33 +256,37 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return nil, err
}
// pull index allows history and live syncing per po bin
db.pullIndex, err = db.shed.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{
db.pullIndex, err = db.shed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 41)
key[0] = db.po(fields.Address)
binary.BigEndian.PutUint64(key[1:9], uint64(fields.StoreTimestamp))
copy(key[9:], fields.Address[:])
binary.BigEndian.PutUint64(key[1:9], fields.BinID)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key[9:]
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[1:9]))
e.BinID = binary.BigEndian.Uint64(key[1:9])
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
return nil, nil
return fields.Address, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.Address = value
return e, nil
},
})
if err != nil {
return nil, err
}
// create a vector for bin IDs
db.binIDs, err = db.shed.NewUint64Vector("bin-ids")
if err != nil {
return nil, err
}
// create a pull syncing triggers used by SubscribePull function
db.pullTriggers = make(map[uint8][]chan struct{})
// push index contains as yet unsynced chunks
db.pushIndex, err = db.shed.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{
db.pushIndex, err = db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
key = make([]byte, 40)
binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
......@@ -281,17 +311,17 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// create a push syncing triggers used by SubscribePush function
db.pushTriggers = make([]chan struct{}, 0)
// gc index for removable chunk ordered by ascending last access time
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{
db.gcIndex, err = db.shed.NewIndex("AccessTimestamp|BinID|Hash->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
binary.BigEndian.PutUint64(b[8:16], fields.BinID)
key = append(b, fields.Address...)
return key, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
e.BinID = binary.BigEndian.Uint64(key[8:16])
e.Address = key[16:]
return e, nil
},
......
......@@ -18,6 +18,7 @@ package localstore
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math/rand"
......@@ -59,23 +60,23 @@ func TestDB(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := db.NewPutter(ModePutUpload).Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
got, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
got, err := db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Address(), chunk.Address()) {
t.Errorf("got address %x, want %x", got.Address(), chunk.Address())
if !bytes.Equal(got.Address(), ch.Address()) {
t.Errorf("got address %x, want %x", got.Address(), ch.Address())
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Errorf("got data %x, want %x", got.Data(), chunk.Data())
if !bytes.Equal(got.Data(), ch.Data()) {
t.Errorf("got data %x, want %x", got.Data(), ch.Data())
}
}
......@@ -113,19 +114,17 @@ func TestDB_updateGCSem(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunk := generateTestRandomChunk()
ch := generateTestRandomChunk()
err := db.NewPutter(ModePutUpload).Put(chunk)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
getter := db.NewGetter(ModeGetRequest)
// get more chunks then maxParallelUpdateGC
// in time shorter then updateGCSleep
for i := 0; i < 5; i++ {
_, err = getter.Get(chunk.Address())
_, err = db.Get(context.Background(), chunk.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
......@@ -237,71 +236,71 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
// newRetrieveIndexesTestWithAccess returns a test function that validates if the right
// chunk values are in the retrieval indexes when access time must be stored.
func newRetrieveIndexesTestWithAccess(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0)
validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0)
if accessTimestamp > 0 {
item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address()))
if err != nil {
t.Fatal(err)
}
validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
validateItem(t, item, ch.Address(), nil, 0, accessTimestamp)
}
}
}
// newPullIndexTest returns a test function that validates if the right
// chunk values are in the pull index.
func newPullIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.pullIndex.Get(shed.Item{
Address: chunk.Address(),
StoreTimestamp: storeTimestamp,
Address: ch.Address(),
BinID: binID,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
validateItem(t, item, ch.Address(), nil, 0, 0)
}
}
}
// newPushIndexTest returns a test function that validates if the right
// chunk values are in the push index.
func newPushIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.pushIndex.Get(shed.Item{
Address: chunk.Address(),
Address: ch.Address(),
StoreTimestamp: storeTimestamp,
})
if err != wantError {
t.Errorf("got error %v, want %v", err, wantError)
}
if err == nil {
validateItem(t, item, chunk.Address(), nil, storeTimestamp, 0)
validateItem(t, item, ch.Address(), nil, storeTimestamp, 0)
}
}
}
// newGCIndexTest returns a test function that validates if the right
// chunk values are in the push index.
func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
return func(t *testing.T) {
item, err := db.gcIndex.Get(shed.Item{
Address: chunk.Address(),
StoreTimestamp: storeTimestamp,
BinID: binID,
AccessTimestamp: accessTimestamp,
})
if err != nil {
t.Fatal(err)
}
validateItem(t, item, chunk.Address(), nil, storeTimestamp, accessTimestamp)
validateItem(t, item, chunk.Address(), nil, 0, accessTimestamp)
}
}
......@@ -349,7 +348,7 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
// in database. It is used for index values validations.
type testIndexChunk struct {
chunk.Chunk
storeTimestamp int64
binID uint64
}
// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment