Commit 4f3d22f0 authored by Janoš Guljaš's avatar Janoš Guljaš Committed by Anton Evangelatov

swarm/storage/localstore: new localstore package (#19015)

parent 41597c28
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
Package localstore provides disk storage layer for Swarm Chunk persistence.
It uses swarm/shed abstractions on top of github.com/syndtr/goleveldb LevelDB
implementation.
The main type is DB which manages the storage by providing methods to
access and add Chunks and to manage their status.
Modes are abstractions that do specific changes to Chunks. There are three
mode types:
- ModeGet, for Chunk access
- ModePut, for adding Chunks to the database
- ModeSet, for changing Chunk statuses
Every mode type has a corresponding type (Getter, Putter and Setter)
that provides adequate method to perform the opperation and that type
should be injected into localstore consumers instead the whole DB.
This provides more clear insight which operations consumer is performing
on the database.
Getters, Putters and Setters accept different get, put and set modes
to perform different actions. For example, ModeGet has two different
variables ModeGetRequest and ModeGetSync and two different Getters
can be constructed with them that are used when the chunk is requested
or when the chunk is synced as this two events are differently changing
the database.
Subscription methods are implemented for a specific purpose of
continuous iterations over Chunks that should be provided to
Push and Pull syncing.
DB implements an internal garbage collector that removes only synced
Chunks from the database based on their most recent access time.
Internally, DB stores Chunk data and any required information, such as
store and access timestamps in different shed indexes that can be
iterated on by garbage collector or subscriptions.
*/
package localstore
This diff is collapsed.
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"io/ioutil"
"math/rand"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// TestDB_collectGarbageWorker tests garbage collection runs
// by uploading and syncing a number of chunks.
func TestDB_collectGarbageWorker(t *testing.T) {
testDB_collectGarbageWorker(t)
}
// TestDB_collectGarbageWorker_multipleBatches tests garbage
// collection runs by uploading and syncing a number of
// chunks by having multiple smaller batches.
func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
// lower the maximal number of chunks in a single
// gc batch to ensure multiple batches.
defer func(s int64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2
testDB_collectGarbageWorker(t)
}
// testDB_collectGarbageWorker is a helper test function to test
// garbage collection runs by uploading and syncing a number of chunks.
func testDB_collectGarbageWorker(t *testing.T) {
chunkCount := 150
testHookCollectGarbageChan := make(chan int64)
defer setTestHookCollectGarbage(func(collectedCount int64) {
testHookCollectGarbageChan <- collectedCount
})()
db, cleanupFunc := newTestDB(t, &Options{
Capacity: 100,
})
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
addrs := make([]storage.Address, 0)
// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, chunk.Address())
}
gcTarget := db.gcTarget()
for {
select {
case <-testHookCollectGarbageChan:
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize := db.getGCSize()
if gcSize == gcTarget {
break
}
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
if err != storage.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound)
}
})
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
})
// cleanup: drain the last testHookCollectGarbageChan
// element before calling deferred functions not to block
// collectGarbageWorker loop, preventing the race in
// setting testHookCollectGarbage function
select {
case <-testHookCollectGarbageChan:
default:
}
}
// TestDB_collectGarbageWorker_withRequests is a helper test function
// to test garbage collection runs by uploading, syncing and
// requesting a number of chunks.
func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{
Capacity: 100,
})
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
testHookCollectGarbageChan := make(chan int64)
defer setTestHookCollectGarbage(func(collectedCount int64) {
testHookCollectGarbageChan <- collectedCount
})()
addrs := make([]storage.Address, 0)
// upload random chunks just up to the capacity
for i := 0; i < int(db.capacity)-1; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, chunk.Address())
}
// request the latest synced chunk
// to prioritize it in the gc index
// not to be collected
_, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
if err != nil {
t.Fatal(err)
}
// upload and sync another chunk to trigger
// garbage collection
chunk := generateRandomChunk()
err = uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, chunk.Address())
// wait for garbage collection
gcTarget := db.gcTarget()
var totalCollectedCount int64
for {
select {
case c := <-testHookCollectGarbageChan:
totalCollectedCount += c
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize := db.getGCSize()
if gcSize == gcTarget {
break
}
}
wantTotalCollectedCount := int64(len(addrs)) - gcTarget
if totalCollectedCount != wantTotalCollectedCount {
t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount)
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))
t.Run("gc size", newIndexGCSizeTest(db))
// requested chunk should not be removed
t.Run("get requested chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[0])
if err != nil {
t.Fatal(err)
}
})
// the second synced chunk should be removed
t.Run("get gc-ed chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[1])
if err != storage.ErrChunkNotFound {
t.Errorf("got error %v, want %v", err, storage.ErrChunkNotFound)
}
})
// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.NewGetter(ModeGetRequest).Get(addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
})
}
// TestDB_gcSize checks if gcSize has a correct value after
// database is initialized with existing data.
func TestDB_gcSize(t *testing.T) {
dir, err := ioutil.TempDir("", "localstore-stored-gc-size")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
baseKey := make([]byte, 32)
if _, err := rand.Read(baseKey); err != nil {
t.Fatal(err)
}
db, err := New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
count := 100
for i := 0; i < count; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
err = syncer.Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
}
// DB.Close writes gc size to disk, so
// Instead calling Close, simulate database shutdown
// without it.
close(db.close)
db.updateGCWG.Wait()
err = db.shed.Close()
if err != nil {
t.Fatal(err)
}
db, err = New(dir, baseKey, nil)
if err != nil {
t.Fatal(err)
}
t.Run("gc index size", newIndexGCSizeTest(db))
t.Run("gc uncounted hashes index count", newItemsCountTest(db.gcUncountedHashesIndex, 0))
}
// setTestHookCollectGarbage sets testHookCollectGarbage and
// returns a function that will reset it to the
// value before the change.
func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) {
current := testHookCollectGarbage
reset = func() { testHookCollectGarbage = current }
testHookCollectGarbage = h
return reset
}
// TestSetTestHookCollectGarbage tests if setTestHookCollectGarbage changes
// testHookCollectGarbage function correctly and if its reset function
// resets the original function.
func TestSetTestHookCollectGarbage(t *testing.T) {
// Set the current function after the test finishes.
defer func(h func(collectedCount int64)) { testHookCollectGarbage = h }(testHookCollectGarbage)
// expected value for the unchanged function
original := 1
// expected value for the changed function
changed := 2
// this variable will be set with two different functions
var got int
// define the original (unchanged) functions
testHookCollectGarbage = func(_ int64) {
got = original
}
// set got variable
testHookCollectGarbage(0)
// test if got variable is set correctly
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
// set the new function
reset := setTestHookCollectGarbage(func(_ int64) {
got = changed
})
// set got variable
testHookCollectGarbage(0)
// test if got variable is set correctly to changed value
if got != changed {
t.Errorf("got hook value %v, want %v", got, changed)
}
// set the function to the original one
reset()
// set got variable
testHookCollectGarbage(0)
// test if got variable is set correctly to original value
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// TestDB_pullIndex validates the ordering of keys in pull index.
// Pull index key contains PO prefix which is calculated from
// DB base key and chunk address. This is not an Item field
// which are checked in Mode tests.
// This test uploads chunks, sorts them in expected order and
// validates that pull index iterator will iterate it the same
// order.
func TestDB_pullIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
Chunk: chunk,
// this timestamp is not the same as in
// the index, but given that uploads
// are sequential and that only ordering
// of events matter, this information is
// sufficient
storeTimestamp: now(),
}
}
testItemsOrder(t, db.pullIndex, chunks, func(i, j int) (less bool) {
poi := storage.Proximity(db.baseKey, chunks[i].Address())
poj := storage.Proximity(db.baseKey, chunks[j].Address())
if poi < poj {
return true
}
if poi > poj {
return false
}
if chunks[i].storeTimestamp < chunks[j].storeTimestamp {
return true
}
if chunks[i].storeTimestamp > chunks[j].storeTimestamp {
return false
}
return bytes.Compare(chunks[i].Address(), chunks[j].Address()) == -1
})
}
// TestDB_gcIndex validates garbage collection index by uploading
// a chunk with and performing operations using synced, access and
// request modes.
func TestDB_gcIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunkCount := 50
chunks := make([]testIndexChunk, chunkCount)
// upload random chunks
for i := 0; i < chunkCount; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
chunks[i] = testIndexChunk{
Chunk: chunk,
}
}
// check if all chunks are stored
newItemsCountTest(db.pullIndex, chunkCount)(t)
// check that chunks are not collectable for garbage
newItemsCountTest(db.gcIndex, 0)(t)
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
// used to wait for indexes change verifications
testHookUpdateGCChan := make(chan struct{})
defer setTestHookUpdateGC(func() {
testHookUpdateGCChan <- struct{}{}
})()
t.Run("request unsynced", func(t *testing.T) {
chunk := chunks[1]
_, err := db.NewGetter(ModeGetRequest).Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
// the chunk is not synced
// should not be in the garbace collection index
newItemsCountTest(db.gcIndex, 0)(t)
newIndexGCSizeTest(db)(t)
})
t.Run("sync one chunk", func(t *testing.T) {
chunk := chunks[0]
err := db.NewSetter(ModeSetSync).Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
// the chunk is synced and should be in gc index
newItemsCountTest(db.gcIndex, 1)(t)
newIndexGCSizeTest(db)(t)
})
t.Run("sync all chunks", func(t *testing.T) {
setter := db.NewSetter(ModeSetSync)
for i := range chunks {
err := setter.Set(chunks[i].Address())
if err != nil {
t.Fatal(err)
}
}
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
t.Run("request one chunk", func(t *testing.T) {
i := 6
_, err := db.NewGetter(ModeGetRequest).Get(chunks[i].Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
// move the chunk to the end of the expected gc
c := chunks[i]
chunks = append(chunks[:i], chunks[i+1:]...)
chunks = append(chunks, c)
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
t.Run("random chunk request", func(t *testing.T) {
requester := db.NewGetter(ModeGetRequest)
rand.Shuffle(len(chunks), func(i, j int) {
chunks[i], chunks[j] = chunks[j], chunks[i]
})
for _, chunk := range chunks {
_, err := requester.Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
}
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
t.Run("remove one chunk", func(t *testing.T) {
i := 3
err := db.NewSetter(modeSetRemove).Set(chunks[i].Address())
if err != nil {
t.Fatal(err)
}
// remove the chunk from the expected chunks in gc index
chunks = append(chunks[:i], chunks[i+1:]...)
testItemsOrder(t, db.gcIndex, chunks, nil)
newIndexGCSizeTest(db)(t)
})
}
This diff is collapsed.
This diff is collapsed.
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
)
// ModeGet enumerates different Getter modes.
type ModeGet int
// Getter modes.
const (
// ModeGetRequest: when accessed for retrieval
ModeGetRequest ModeGet = iota
// ModeGetSync: when accessed for syncing or proof of custody request
ModeGetSync
)
// Getter provides Get method to retrieve Chunks
// from database.
type Getter struct {
db *DB
mode ModeGet
}
// NewGetter returns a new Getter on database
// with a specific Mode.
func (db *DB) NewGetter(mode ModeGet) *Getter {
return &Getter{
mode: mode,
db: db,
}
}
// Get returns a chunk from the database. If the chunk is
// not found storage.ErrChunkNotFound will be returned.
// All required indexes will be updated required by the
// Getter Mode.
func (g *Getter) Get(addr storage.Address) (chunk storage.Chunk, err error) {
out, err := g.db.get(g.mode, addr)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, storage.ErrChunkNotFound
}
return nil, err
}
return storage.NewChunk(out.Address, out.Data), nil
}
// get returns Item from the retrieval index
// and updates other indexes.
func (db *DB) get(mode ModeGet, addr storage.Address) (out shed.Item, err error) {
item := addressToItem(addr)
out, err = db.retrievalDataIndex.Get(item)
if err != nil {
return out, err
}
switch mode {
// update the access timestamp and gc index
case ModeGetRequest:
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer id full
db.updateGCSem <- struct{}{}
}
db.updateGCWG.Add(1)
go func() {
defer db.updateGCWG.Done()
if db.updateGCSem != nil {
// free a spot in updateGCSem buffer
// for a new goroutine
defer func() { <-db.updateGCSem }()
}
err := db.updateGC(out)
if err != nil {
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
if testHookUpdateGC != nil {
testHookUpdateGC()
}
}()
// no updates to indexes
case ModeGetSync:
default:
return out, ErrInvalidMode
}
return out, nil
}
// updateGC updates garbage collection index for
// a single item. Provided item is expected to have
// only Address and Data fields with non zero values,
// which is ensured by the get function.
func (db *DB) updateGC(item shed.Item) (err error) {
unlock, err := db.lockAddr(item.Address)
if err != nil {
return err
}
defer unlock()
batch := new(leveldb.Batch)
// update accessTimeStamp in retrieve, gc
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
// no chunk accesses
default:
return err
}
if item.AccessTimestamp == 0 {
// chunk is not yet synced
// do not add it to the gc index
return nil
}
// delete current entry from the gc index
db.gcIndex.DeleteInBatch(batch, item)
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
db.retrievalAccessIndex.PutInBatch(batch, item)
// add new entry to gc index
db.gcIndex.PutInBatch(batch, item)
return db.shed.WriteBatch(batch)
}
// testHookUpdateGC is a hook that can provide
// information when a garbage collection index is updated.
var testHookUpdateGC func()
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"testing"
"time"
)
// TestModeGetRequest validates ModeGetRequest index values on the provided DB.
func TestModeGetRequest(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploadTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return uploadTimestamp
})()
chunk := generateRandomChunk()
err := db.NewPutter(ModePutUpload).Put(chunk)
if err != nil {
t.Fatal(err)
}
requester := db.NewGetter(ModeGetRequest)
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
// used to wait for garbage colletion index
// changes
testHookUpdateGCChan := make(chan struct{})
defer setTestHookUpdateGC(func() {
testHookUpdateGCChan <- struct{}{}
})()
t.Run("get unsynced", func(t *testing.T) {
got, err := requester.Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), chunk.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
})
// set chunk to synced state
err = db.NewSetter(ModeSetSync).Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
t.Run("first get", func(t *testing.T) {
got, err := requester.Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), chunk.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, uploadTimestamp))
t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, uploadTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
t.Run("second get", func(t *testing.T) {
accessTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return accessTimestamp
})()
got, err := requester.Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
if !bytes.Equal(got.Address(), chunk.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, accessTimestamp))
t.Run("gc index", newGCIndexTest(db, chunk, uploadTimestamp, accessTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
}
// TestModeGetSync validates ModeGetSync index values on the provided DB.
func TestModeGetSync(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploadTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return uploadTimestamp
})()
chunk := generateRandomChunk()
err := db.NewPutter(ModePutUpload).Put(chunk)
if err != nil {
t.Fatal(err)
}
got, err := db.NewGetter(ModeGetSync).Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Address(), chunk.Address()) {
t.Errorf("got chunk address %x, want %x", got.Address(), chunk.Address())
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), chunk.Data())
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, uploadTimestamp, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
}
// setTestHookUpdateGC sets testHookUpdateGC and
// returns a function that will reset it to the
// value before the change.
func setTestHookUpdateGC(h func()) (reset func()) {
current := testHookUpdateGC
reset = func() { testHookUpdateGC = current }
testHookUpdateGC = h
return reset
}
// TestSetTestHookUpdateGC tests if setTestHookUpdateGC changes
// testHookUpdateGC function correctly and if its reset function
// resets the original function.
func TestSetTestHookUpdateGC(t *testing.T) {
// Set the current function after the test finishes.
defer func(h func()) { testHookUpdateGC = h }(testHookUpdateGC)
// expected value for the unchanged function
original := 1
// expected value for the changed function
changed := 2
// this variable will be set with two different functions
var got int
// define the original (unchanged) functions
testHookUpdateGC = func() {
got = original
}
// set got variable
testHookUpdateGC()
// test if got variable is set correctly
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
// set the new function
reset := setTestHookUpdateGC(func() {
got = changed
})
// set got variable
testHookUpdateGC()
// test if got variable is set correctly to changed value
if got != changed {
t.Errorf("got hook value %v, want %v", got, changed)
}
// set the function to the original one
reset()
// set got variable
testHookUpdateGC()
// test if got variable is set correctly to original value
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
)
// ModePut enumerates different Putter modes.
type ModePut int
// Putter modes.
const (
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
ModePutRequest ModePut = iota
// ModePutSync: when a chunk is received via syncing
ModePutSync
// ModePutUpload: when a chunk is created by local upload
ModePutUpload
)
// Putter provides Put method to store Chunks
// to database.
type Putter struct {
db *DB
mode ModePut
}
// NewPutter returns a new Putter on database
// with a specific Mode.
func (db *DB) NewPutter(mode ModePut) *Putter {
return &Putter{
mode: mode,
db: db,
}
}
// Put stores the Chunk to database and depending
// on the Putter mode, it updates required indexes.
func (p *Putter) Put(ch storage.Chunk) (err error) {
return p.db.put(p.mode, chunkToItem(ch))
}
// put stores Item to database and updates other
// indexes. It acquires lockAddr to protect two calls
// of this function for the same address in parallel.
// Item fields Address and Data must not be
// with their nil values.
func (db *DB) put(mode ModePut, item shed.Item) (err error) {
// protect parallel updates
unlock, err := db.lockAddr(item.Address)
if err != nil {
return err
}
defer unlock()
batch := new(leveldb.Batch)
// variables that provide information for operations
// to be done after write batch function successfully executes
var gcSizeChange int64 // number to add or subtract from gcSize
var triggerPullFeed bool // signal pull feed subscriptions to iterate
var triggerPushFeed bool // signal push feed subscriptions to iterate
switch mode {
case ModePutRequest:
// put to indexes: retrieve, gc; it does not enter the syncpool
// check if the chunk already is in the database
// as gc index is updated
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
// no chunk accesses
default:
return err
}
i, err = db.retrievalDataIndex.Get(item)
switch err {
case nil:
item.StoreTimestamp = i.StoreTimestamp
case leveldb.ErrNotFound:
// no chunk accesses
default:
return err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
}
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
db.retrievalAccessIndex.PutInBatch(batch, item)
// add new entry to gc index
db.gcIndex.PutInBatch(batch, item)
db.gcUncountedHashesIndex.PutInBatch(batch, item)
gcSizeChange++
db.retrievalDataIndex.PutInBatch(batch, item)
case ModePutUpload:
// put to indexes: retrieve, push, pull
item.StoreTimestamp = now()
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
triggerPullFeed = true
db.pushIndex.PutInBatch(batch, item)
triggerPushFeed = true
case ModePutSync:
// put to indexes: retrieve, pull
item.StoreTimestamp = now()
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
triggerPullFeed = true
default:
return ErrInvalidMode
}
err = db.shed.WriteBatch(batch)
if err != nil {
return err
}
if gcSizeChange != 0 {
db.incGCSize(gcSizeChange)
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
}
if triggerPushFeed {
db.triggerPushSubscriptions()
}
return nil
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// TestModePutRequest validates ModePutRequest index values on the provided DB.
func TestModePutRequest(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
putter := db.NewPutter(ModePutRequest)
chunk := generateRandomChunk()
// keep the record when the chunk is stored
var storeTimestamp int64
t.Run("first put", func(t *testing.T) {
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
storeTimestamp = wantTimestamp
err := putter.Put(chunk)
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
t.Run("second put", func(t *testing.T) {
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
err := putter.Put(chunk)
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, storeTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
})
}
// TestModePutSync validates ModePutSync index values on the provided DB.
func TestModePutSync(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
chunk := generateRandomChunk()
err := db.NewPutter(ModePutSync).Put(chunk)
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
}
// TestModePutUpload validates ModePutUpload index values on the provided DB.
func TestModePutUpload(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
chunk := generateRandomChunk()
err := db.NewPutter(ModePutUpload).Put(chunk)
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", newRetrieveIndexesTest(db, chunk, wantTimestamp, 0))
t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, nil))
}
// TestModePutUpload_parallel uploads chunks in parallel
// and validates if all chunks can be retrieved with correct data.
func TestModePutUpload_parallel(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunkCount := 1000
workerCount := 100
chunkChan := make(chan storage.Chunk)
errChan := make(chan error)
doneChan := make(chan struct{})
defer close(doneChan)
// start uploader workers
for i := 0; i < workerCount; i++ {
go func(i int) {
uploader := db.NewPutter(ModePutUpload)
for {
select {
case chunk, ok := <-chunkChan:
if !ok {
return
}
err := uploader.Put(chunk)
select {
case errChan <- err:
case <-doneChan:
}
case <-doneChan:
return
}
}
}(i)
}
chunks := make([]storage.Chunk, 0)
var chunksMu sync.Mutex
// send chunks to workers
go func() {
for i := 0; i < chunkCount; i++ {
chunk := generateRandomChunk()
select {
case chunkChan <- chunk:
case <-doneChan:
return
}
chunksMu.Lock()
chunks = append(chunks, chunk)
chunksMu.Unlock()
}
close(chunkChan)
}()
// validate every error from workers
for i := 0; i < chunkCount; i++ {
err := <-errChan
if err != nil {
t.Fatal(err)
}
}
// get every chunk and validate its data
getter := db.NewGetter(ModeGetRequest)
chunksMu.Lock()
defer chunksMu.Unlock()
for _, chunk := range chunks {
got, err := getter.Get(chunk.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), chunk.Data()) {
t.Fatalf("got chunk %s data %x, want %x", chunk.Address().Hex(), got.Data(), chunk.Data())
}
}
}
// BenchmarkPutUpload runs a series of benchmarks that upload
// a specific number of chunks in parallel.
//
// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014)
//
// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkPutUpload -v
//
// goos: darwin
// goarch: amd64
// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
// BenchmarkPutUpload/count_100_parallel_1-8 300 5107704 ns/op 2081461 B/op 2374 allocs/op
// BenchmarkPutUpload/count_100_parallel_2-8 300 5411742 ns/op 2081608 B/op 2364 allocs/op
// BenchmarkPutUpload/count_100_parallel_4-8 500 3704964 ns/op 2081696 B/op 2324 allocs/op
// BenchmarkPutUpload/count_100_parallel_8-8 500 2932663 ns/op 2082594 B/op 2295 allocs/op
// BenchmarkPutUpload/count_100_parallel_16-8 500 3117157 ns/op 2085438 B/op 2282 allocs/op
// BenchmarkPutUpload/count_100_parallel_32-8 500 3449122 ns/op 2089721 B/op 2286 allocs/op
// BenchmarkPutUpload/count_1000_parallel_1-8 20 79784470 ns/op 25211240 B/op 23225 allocs/op
// BenchmarkPutUpload/count_1000_parallel_2-8 20 75422164 ns/op 25210730 B/op 23187 allocs/op
// BenchmarkPutUpload/count_1000_parallel_4-8 20 70698378 ns/op 25206522 B/op 22692 allocs/op
// BenchmarkPutUpload/count_1000_parallel_8-8 20 71285528 ns/op 25213436 B/op 22345 allocs/op
// BenchmarkPutUpload/count_1000_parallel_16-8 20 71301826 ns/op 25205040 B/op 22090 allocs/op
// BenchmarkPutUpload/count_1000_parallel_32-8 30 57713506 ns/op 25219781 B/op 21848 allocs/op
// BenchmarkPutUpload/count_10000_parallel_1-8 2 656719345 ns/op 216792908 B/op 248940 allocs/op
// BenchmarkPutUpload/count_10000_parallel_2-8 2 646301962 ns/op 216730800 B/op 248270 allocs/op
// BenchmarkPutUpload/count_10000_parallel_4-8 2 532784228 ns/op 216667080 B/op 241910 allocs/op
// BenchmarkPutUpload/count_10000_parallel_8-8 3 494290188 ns/op 216297749 B/op 236247 allocs/op
// BenchmarkPutUpload/count_10000_parallel_16-8 3 483485315 ns/op 216060384 B/op 231090 allocs/op
// BenchmarkPutUpload/count_10000_parallel_32-8 3 434461294 ns/op 215371280 B/op 224800 allocs/op
// BenchmarkPutUpload/count_100000_parallel_1-8 1 22767894338 ns/op 2331372088 B/op 4049876 allocs/op
// BenchmarkPutUpload/count_100000_parallel_2-8 1 25347872677 ns/op 2344140160 B/op 4106763 allocs/op
// BenchmarkPutUpload/count_100000_parallel_4-8 1 23580460174 ns/op 2338582576 B/op 4027452 allocs/op
// BenchmarkPutUpload/count_100000_parallel_8-8 1 22197559193 ns/op 2321803496 B/op 3877553 allocs/op
// BenchmarkPutUpload/count_100000_parallel_16-8 1 22527046476 ns/op 2327854800 B/op 3885455 allocs/op
// BenchmarkPutUpload/count_100000_parallel_32-8 1 21332243613 ns/op 2299654568 B/op 3697181 allocs/op
// PASS
func BenchmarkPutUpload(b *testing.B) {
for _, count := range []int{
100,
1000,
10000,
100000,
} {
for _, maxParallelUploads := range []int{
1,
2,
4,
8,
16,
32,
} {
name := fmt.Sprintf("count %v parallel %v", count, maxParallelUploads)
b.Run(name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
benchmarkPutUpload(b, nil, count, maxParallelUploads)
}
})
}
}
}
// benchmarkPutUpload runs a benchmark by uploading a specific number
// of chunks with specified max parallel uploads.
func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunks := make([]storage.Chunk, count)
for i := 0; i < count; i++ {
chunks[i] = generateFakeRandomChunk()
}
errs := make(chan error)
b.StartTimer()
go func() {
sem := make(chan struct{}, maxParallelUploads)
for i := 0; i < count; i++ {
sem <- struct{}{}
go func(i int) {
defer func() { <-sem }()
errs <- uploader.Put(chunks[i])
}(i)
}
}()
for i := 0; i < count; i++ {
err := <-errs
if err != nil {
b.Fatal(err)
}
}
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
)
// ModeSet enumerates different Setter modes.
type ModeSet int
// Setter modes.
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
ModeSetAccess ModeSet = iota
// ModeSetSync: when push sync receipt is received
ModeSetSync
// modeSetRemove: when GC-d
// unexported as no external packages should remove chunks from database
modeSetRemove
)
// Setter sets the state of a particular
// Chunk in database by changing indexes.
type Setter struct {
db *DB
mode ModeSet
}
// NewSetter returns a new Setter on database
// with a specific Mode.
func (db *DB) NewSetter(mode ModeSet) *Setter {
return &Setter{
mode: mode,
db: db,
}
}
// Set updates database indexes for a specific
// chunk represented by the address.
func (s *Setter) Set(addr storage.Address) (err error) {
return s.db.set(s.mode, addr)
}
// set updates database indexes for a specific
// chunk represented by the address.
// It acquires lockAddr to protect two calls
// of this function for the same address in parallel.
func (db *DB) set(mode ModeSet, addr storage.Address) (err error) {
// protect parallel updates
unlock, err := db.lockAddr(addr)
if err != nil {
return err
}
defer unlock()
batch := new(leveldb.Batch)
// variables that provide information for operations
// to be done after write batch function successfully executes
var gcSizeChange int64 // number to add or subtract from gcSize
var triggerPullFeed bool // signal pull feed subscriptions to iterate
item := addressToItem(addr)
switch mode {
case ModeSetAccess:
// add to pull, insert to gc
// need to get access timestamp here as it is not
// provided by the access function, and it is not
// a property of a chunk provided to Accessor.Put.
i, err := db.retrievalDataIndex.Get(item)
switch err {
case nil:
item.StoreTimestamp = i.StoreTimestamp
case leveldb.ErrNotFound:
db.pushIndex.DeleteInBatch(batch, item)
item.StoreTimestamp = now()
default:
return err
}
i, err = db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
case leveldb.ErrNotFound:
// the chunk is not accessed before
default:
return err
}
item.AccessTimestamp = now()
db.retrievalAccessIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
triggerPullFeed = true
db.gcIndex.PutInBatch(batch, item)
db.gcUncountedHashesIndex.PutInBatch(batch, item)
gcSizeChange++
case ModeSetSync:
// delete from push, insert to gc
// need to get access timestamp here as it is not
// provided by the access function, and it is not
// a property of a chunk provided to Accessor.Put.
i, err := db.retrievalDataIndex.Get(item)
if err != nil {
if err == leveldb.ErrNotFound {
// chunk is not found,
// no need to update gc index
// just delete from the push index
// if it is there
db.pushIndex.DeleteInBatch(batch, item)
return nil
}
return err
}
item.StoreTimestamp = i.StoreTimestamp
i, err = db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
case leveldb.ErrNotFound:
// the chunk is not accessed before
default:
return err
}
item.AccessTimestamp = now()
db.retrievalAccessIndex.PutInBatch(batch, item)
db.pushIndex.DeleteInBatch(batch, item)
db.gcIndex.PutInBatch(batch, item)
db.gcUncountedHashesIndex.PutInBatch(batch, item)
gcSizeChange++
case modeSetRemove:
// delete from retrieve, pull, gc
// need to get access timestamp here as it is not
// provided by the access function, and it is not
// a property of a chunk provided to Accessor.Put.
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
default:
return err
}
i, err = db.retrievalDataIndex.Get(item)
if err != nil {
return err
}
item.StoreTimestamp = i.StoreTimestamp
db.retrievalDataIndex.DeleteInBatch(batch, item)
db.retrievalAccessIndex.DeleteInBatch(batch, item)
db.pullIndex.DeleteInBatch(batch, item)
db.gcIndex.DeleteInBatch(batch, item)
db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
// a check is needed for decrementing gcSize
// as delete is not reporting if the key/value pair
// is deleted or not
if _, err := db.gcIndex.Get(item); err == nil {
gcSizeChange = -1
}
default:
return ErrInvalidMode
}
err = db.shed.WriteBatch(batch)
if err != nil {
return err
}
if gcSizeChange != 0 {
db.incGCSize(gcSizeChange)
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
}
return nil
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"testing"
"time"
"github.com/syndtr/goleveldb/leveldb"
)
// TestModeSetAccess validates ModeSetAccess index values on the provided DB.
func TestModeSetAccess(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunk := generateRandomChunk()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
err := db.NewSetter(ModeSetAccess).Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
t.Run("pull index", newPullIndexTest(db, chunk, wantTimestamp, nil))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 1))
t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
}
// TestModeSetSync validates ModeSetSync index values on the provided DB.
func TestModeSetSync(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunk := generateRandomChunk()
wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()
err := db.NewPutter(ModePutUpload).Put(chunk)
if err != nil {
t.Fatal(err)
}
err = db.NewSetter(ModeSetSync).Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", newRetrieveIndexesTestWithAccess(db, chunk, wantTimestamp, wantTimestamp))
t.Run("push index", newPushIndexTest(db, chunk, wantTimestamp, leveldb.ErrNotFound))
t.Run("gc index", newGCIndexTest(db, chunk, wantTimestamp, wantTimestamp))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 1))
t.Run("gc size", newIndexGCSizeTest(db))
}
// TestModeSetRemove validates ModeSetRemove index values on the provided DB.
func TestModeSetRemove(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
chunk := generateRandomChunk()
err := db.NewPutter(ModePutUpload).Put(chunk)
if err != nil {
t.Fatal(err)
}
err = db.NewSetter(modeSetRemove).Set(chunk.Address())
if err != nil {
t.Fatal(err)
}
t.Run("retrieve indexes", func(t *testing.T) {
wantErr := leveldb.ErrNotFound
_, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
}
t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0))
// access index should not be set
_, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address()))
if err != wantErr {
t.Errorf("got error %v, want %v", err, wantErr)
}
t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0))
})
t.Run("pull index", newPullIndexTest(db, chunk, 0, leveldb.ErrNotFound))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
t.Run("gc index count", newItemsCountTest(db.gcIndex, 0))
t.Run("gc size", newIndexGCSizeTest(db))
}
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"strconv"
"testing"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// BenchmarkRetrievalIndexes uploads a number of chunks in order to measure
// total time of updating their retrieval indexes by setting them
// to synced state and requesting them.
//
// This benchmark takes significant amount of time.
//
// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show
// that two separated indexes perform better.
//
// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkRetrievalIndexes -v
// goos: darwin
// goarch: amd64
// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
// BenchmarkRetrievalIndexes/1000-8 20 75556686 ns/op 19033493 B/op 84500 allocs/op
// BenchmarkRetrievalIndexes/10000-8 1 1079084922 ns/op 382792064 B/op 1429644 allocs/op
// BenchmarkRetrievalIndexes/100000-8 1 16891305737 ns/op 2629165304 B/op 12465019 allocs/op
// PASS
func BenchmarkRetrievalIndexes(b *testing.B) {
for _, count := range []int{
1000,
10000,
100000,
} {
b.Run(strconv.Itoa(count)+"-split", func(b *testing.B) {
for n := 0; n < b.N; n++ {
benchmarkRetrievalIndexes(b, nil, count)
}
})
}
}
// benchmarkRetrievalIndexes is used in BenchmarkRetrievalIndexes
// to do benchmarks with a specific number of chunks and different
// database options.
func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
syncer := db.NewSetter(ModeSetSync)
requester := db.NewGetter(ModeGetRequest)
addrs := make([]storage.Address, count)
for i := 0; i < count; i++ {
chunk := generateFakeRandomChunk()
err := uploader.Put(chunk)
if err != nil {
b.Fatal(err)
}
addrs[i] = chunk.Address()
}
// set update gc test hook to signal when
// update gc goroutine is done by sending to
// testHookUpdateGCChan channel, which is
// used to wait for gc index updates to be
// included in the benchmark time
testHookUpdateGCChan := make(chan struct{})
defer setTestHookUpdateGC(func() {
testHookUpdateGCChan <- struct{}{}
})()
b.StartTimer()
for i := 0; i < count; i++ {
err := syncer.Set(addrs[i])
if err != nil {
b.Fatal(err)
}
_, err = requester.Get(addrs[i])
if err != nil {
b.Fatal(err)
}
// wait for update gc goroutine to be done
<-testHookUpdateGCChan
}
}
// BenchmarkUpload compares uploading speed for different
// retrieval indexes and various number of chunks.
//
// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014).
//
// go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkUpload -v
// goos: darwin
// goarch: amd64
// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
// BenchmarkUpload/1000-8 20 59437463 ns/op 25205193 B/op 23208 allocs/op
// BenchmarkUpload/10000-8 2 580646362 ns/op 216532932 B/op 248090 allocs/op
// BenchmarkUpload/100000-8 1 22373390892 ns/op 2323055312 B/op 3995903 allocs/op
// PASS
func BenchmarkUpload(b *testing.B) {
for _, count := range []int{
1000,
10000,
100000,
} {
b.Run(strconv.Itoa(count), func(b *testing.B) {
for n := 0; n < b.N; n++ {
benchmarkUpload(b, nil, count)
}
})
}
}
// benchmarkUpload is used in BenchmarkUpload
// to do benchmarks with a specific number of chunks and different
// database options.
func benchmarkUpload(b *testing.B, o *Options, count int) {
b.StopTimer()
db, cleanupFunc := newTestDB(b, o)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunks := make([]storage.Chunk, count)
for i := 0; i < count; i++ {
chunk := generateFakeRandomChunk()
chunks[i] = chunk
}
b.StartTimer()
for i := 0; i < count; i++ {
err := uploader.Put(chunks[i])
if err != nil {
b.Fatal(err)
}
}
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
// only chunks stored up to this timestamp will be send to the channel, and the returned channel will be
// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
// function will terminate current and further iterations without errors, and also close the returned channel.
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false.
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
chunkDescriptors := make(chan ChunkDescriptor)
trigger := make(chan struct{}, 1)
db.pullTriggersMu.Lock()
if _, ok := db.pullTriggers[bin]; !ok {
db.pullTriggers[bin] = make([]chan struct{}, 0)
}
db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
db.pullTriggersMu.Unlock()
// send signal for the initial iteration
trigger <- struct{}{}
stopChan := make(chan struct{})
var stopChanOnce sync.Once
// used to provide information from the iterator to
// stop subscription when until chunk descriptor is reached
var errStopSubscription = errors.New("stop subscription")
go func() {
// close the returned ChunkDescriptor channel at the end to
// signal that the subscription is done
defer close(chunkDescriptors)
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
if since != nil {
sinceItem = &shed.Item{
Address: since.Address,
StoreTimestamp: since.StoreTimestamp,
}
}
for {
select {
case <-trigger:
// iterate until:
// - last index Item is reached
// - subscription stop is called
// - context is done
err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
select {
case chunkDescriptors <- ChunkDescriptor{
Address: item.Address,
StoreTimestamp: item.StoreTimestamp,
}:
// until chunk descriptor is sent
// break the iteration
if until != nil &&
(item.StoreTimestamp >= until.StoreTimestamp ||
bytes.Equal(item.Address, until.Address)) {
return true, errStopSubscription
}
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
return false, nil
case <-stopChan:
// gracefully stop the iteration
// on stop
return true, nil
case <-db.close:
// gracefully stop the iteration
// on database close
return true, nil
case <-ctx.Done():
return true, ctx.Err()
}
}, &shed.IterateOptions{
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
// iterator call, skip it in this one
SkipStartFromItem: true,
Prefix: []byte{bin},
})
if err != nil {
if err == errStopSubscription {
// stop subscription without any errors
// if until is reached
return
}
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
return
}
case <-stopChan:
// terminate the subscription
// on stop
return
case <-db.close:
// terminate the subscription
// on database close
return
case <-ctx.Done():
err := ctx.Err()
if err != nil {
log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
}
return
}
}
}()
stop = func() {
stopChanOnce.Do(func() {
close(stopChan)
})
db.pullTriggersMu.Lock()
defer db.pullTriggersMu.Unlock()
for i, t := range db.pullTriggers[bin] {
if t == trigger {
db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
break
}
}
}
return chunkDescriptors, stop
}
// ChunkDescriptor holds information required for Pull syncing. This struct
// is provided by subscribing to pull index.
type ChunkDescriptor struct {
Address storage.Address
StoreTimestamp int64
}
func (c *ChunkDescriptor) String() string {
if c == nil {
return "none"
}
return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
}
// triggerPullSubscriptions is used internally for starting iterations
// on Pull subscriptions for a particular bin. When new item with address
// that is in particular bin for DB's baseKey is added to pull index
// this function should be called.
func (db *DB) triggerPullSubscriptions(bin uint8) {
db.pullTriggersMu.RLock()
triggers, ok := db.pullTriggers[bin]
db.pullTriggersMu.RUnlock()
if !ok {
return
}
for _, t := range triggers {
select {
case t <- struct{}{}:
default:
}
}
}
This diff is collapsed.
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
// Returned stop function will terminate current and further iterations, and also it will close
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan storage.Chunk, stop func()) {
chunks := make(chan storage.Chunk)
trigger := make(chan struct{}, 1)
db.pushTriggersMu.Lock()
db.pushTriggers = append(db.pushTriggers, trigger)
db.pushTriggersMu.Unlock()
// send signal for the initial iteration
trigger <- struct{}{}
stopChan := make(chan struct{})
var stopChanOnce sync.Once
go func() {
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
for {
select {
case <-trigger:
// iterate until:
// - last index Item is reached
// - subscription stop is called
// - context is done
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// get chunk data
dataItem, err := db.retrievalDataIndex.Get(item)
if err != nil {
return true, err
}
select {
case chunks <- storage.NewChunk(dataItem.Address, dataItem.Data):
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
return false, nil
case <-stopChan:
// gracefully stop the iteration
// on stop
return true, nil
case <-db.close:
// gracefully stop the iteration
// on database close
return true, nil
case <-ctx.Done():
return true, ctx.Err()
}
}, &shed.IterateOptions{
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
// iterator call, skip it in this one
SkipStartFromItem: true,
})
if err != nil {
log.Error("localstore push subscription iteration", "err", err)
return
}
case <-stopChan:
// terminate the subscription
// on stop
return
case <-db.close:
// terminate the subscription
// on database close
return
case <-ctx.Done():
err := ctx.Err()
if err != nil {
log.Error("localstore push subscription", "err", err)
}
return
}
}
}()
stop = func() {
stopChanOnce.Do(func() {
close(stopChan)
})
db.pushTriggersMu.Lock()
defer db.pushTriggersMu.Unlock()
for i, t := range db.pushTriggers {
if t == trigger {
db.pushTriggers = append(db.pushTriggers[:i], db.pushTriggers[i+1:]...)
break
}
}
}
return chunks, stop
}
// triggerPushSubscriptions is used internally for starting iterations
// on Push subscriptions. Whenever new item is added to the push index,
// this function should be called.
func (db *DB) triggerPushSubscriptions() {
db.pushTriggersMu.RLock()
triggers := db.pushTriggers
db.pushTriggersMu.RUnlock()
for _, t := range triggers {
select {
case t <- struct{}{}:
default:
}
}
}
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"bytes"
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/swarm/storage"
)
// TestDB_SubscribePush uploads some chunks before and after
// push syncing subscription is created and validates if
// all addresses are received in the right order.
func TestDB_SubscribePush(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
chunks := make([]storage.Chunk, 0)
var chunksMu sync.Mutex
uploadRandomChunks := func(count int) {
for i := 0; i < count; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
chunksMu.Lock()
chunks = append(chunks, chunk)
chunksMu.Unlock()
}
}
// prepopulate database with some chunks
// before the subscription
uploadRandomChunks(10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// collect all errors from validating addresses, even nil ones
// to validate the number of addresses received by the subscription
errChan := make(chan error)
ch, stop := db.SubscribePush(ctx)
defer stop()
// receive and validate addresses from the subscription
go func() {
var i int // address index
for {
select {
case got, ok := <-ch:
if !ok {
return
}
chunksMu.Lock()
want := chunks[i]
chunksMu.Unlock()
var err error
if !bytes.Equal(got.Data(), want.Data()) {
err = fmt.Errorf("got chunk %v data %x, want %x", i, got.Data(), want.Data())
}
if !bytes.Equal(got.Address(), want.Address()) {
err = fmt.Errorf("got chunk %v address %s, want %s", i, got.Address().Hex(), want.Address().Hex())
}
i++
// send one and only one error per received address
errChan <- err
case <-ctx.Done():
return
}
}
}()
// upload some chunks just after subscribe
uploadRandomChunks(5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after some short time
// to ensure that subscription will include them
// in a dynamic environment
uploadRandomChunks(3)
checkErrChan(ctx, t, errChan, len(chunks))
}
// TestDB_SubscribePush_multiple uploads chunks before and after
// multiple push syncing subscriptions are created and
// validates if all addresses are received in the right order.
func TestDB_SubscribePush_multiple(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()
uploader := db.NewPutter(ModePutUpload)
addrs := make([]storage.Address, 0)
var addrsMu sync.Mutex
uploadRandomChunks := func(count int) {
for i := 0; i < count; i++ {
chunk := generateRandomChunk()
err := uploader.Put(chunk)
if err != nil {
t.Fatal(err)
}
addrsMu.Lock()
addrs = append(addrs, chunk.Address())
addrsMu.Unlock()
}
}
// prepopulate database with some chunks
// before the subscription
uploadRandomChunks(10)
// set a timeout on subscription
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// collect all errors from validating addresses, even nil ones
// to validate the number of addresses received by the subscription
errChan := make(chan error)
subsCount := 10
// start a number of subscriptions
// that all of them will write every addresses error to errChan
for j := 0; j < subsCount; j++ {
ch, stop := db.SubscribePush(ctx)
defer stop()
// receive and validate addresses from the subscription
go func(j int) {
var i int // address index
for {
select {
case got, ok := <-ch:
if !ok {
return
}
addrsMu.Lock()
want := addrs[i]
addrsMu.Unlock()
var err error
if !bytes.Equal(got.Address(), want) {
err = fmt.Errorf("got chunk %v address on subscription %v %s, want %s", i, j, got, want)
}
i++
// send one and only one error per received address
errChan <- err
case <-ctx.Done():
return
}
}
}(j)
}
// upload some chunks just after subscribe
uploadRandomChunks(5)
time.Sleep(200 * time.Millisecond)
// upload some chunks after some short time
// to ensure that subscription will include them
// in a dynamic environment
uploadRandomChunks(3)
// number of addresses received by all subscriptions
wantedChunksCount := len(addrs) * subsCount
checkErrChan(ctx, t, errChan, wantedChunksCount)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment