Commit 9a58a9b9 authored by Janoš Guljaš, committed by Viktor Trón

swarm/storage/localstore: global batch write lock (#19245)

* swarm/storage/localstore: most basic database

* swarm/storage/localstore: fix typos and comments

* swarm/shed: add uint64 field Dec and DecInBatch methods

* swarm/storage/localstore: decrement size counter on ModeRemoval update

* swarm/storage/localstore: unexport modeAccess and modeRemoval

* swarm/storage/localstore: add WithRetrievalCompositeIndex

* swarm/storage/localstore: add TestModeSyncing

* swarm/storage/localstore: fix test name

* swarm/storage/localstore: add TestModeUpload

* swarm/storage/localstore: add TestModeRequest

* swarm/storage/localstore: add TestModeSynced

* swarm/storage/localstore: add TestModeAccess

* swarm/storage/localstore: add TestModeRemoval

* swarm/storage/localstore: add mock store option for chunk data

* swarm/storage/localstore: add TestDB_pullIndex

* swarm/storage/localstore: add TestDB_gcIndex

* swarm/storage/localstore: change how batches are written

* swarm/storage/localstore: add updateOnAccess function

* swarm/storage/localstore: add DB.gcSize

* swarm/storage/localstore: update comments

* swarm/storage/localstore: add BenchmarkNew

* swarm/storage/localstore: add retrieval tests benchmarks

* swarm/storage/localstore: accessors redesign

* swarm/storage/localstore: add semaphore for updateGC goroutine

* swarm/storage/localstore: implement basic garbage collection

* swarm/storage/localstore: optimize collectGarbage

* swarm/storage/localstore: add more garbage collection test cases

* swarm/shed, swarm/storage/localstore: rename IndexItem to Item

* swarm/shed: add Index.CountFrom

* swarm/storage/localstore: persist gcSize

* swarm/storage/localstore: remove composite retrieval index

* swarm/shed: IterateWithPrefix and IterateWithPrefixFrom Index functions

* swarm/storage/localstore: writeGCSize function with leveldb batch

* swarm/storage/localstore: unexport modeSetRemove

* swarm/storage/localstore: update writeGCSizeWorker comment

* swarm/storage/localstore: add triggerGarbageCollection function

* swarm/storage/localstore: call writeGCSize on DB Close

* swarm/storage/localstore: additional comment in writeGCSizeWorker

* swarm/storage/localstore: add MetricsPrefix option

* swarm/storage/localstore: fix a typo

* swarm/shed: only one Index Iterate function

* swarm/storage/localstore: use shed Iterate function

* swarm/shed: pass a new byte slice copy to index decode functions

* swarm/storage/localstore: implement feed subscriptions

* swarm/storage/localstore: add more subscriptions tests

* swarm/storage/localstore: add parallel upload test

* swarm/storage/localstore: use storage.MaxPO in subscription tests

* swarm/storage/localstore: subscription of addresses instead of chunks

* swarm/storage/localstore: lock item address in collectGarbage iterator

* swarm/storage/localstore: fix TestSubscribePull to include MaxPO

* swarm/storage/localstore: improve subscriptions

* swarm/storage/localstore: add TestDB_SubscribePull_sinceAndUntil test

* swarm/storage/localstore: adjust pull sync tests

* swarm/storage/localstore: remove writeGCSizeDelay and use literal

* swarm/storage/localstore: adjust subscriptions tests delays and comments

* swarm/storage/localstore: add godoc package overview

* swarm/storage/localstore: fix a typo

* swarm/storage/localstore: update package overview

* swarm/storage/localstore: remove repeated index change

* swarm/storage/localstore: rename ChunkInfo to ChunkDescriptor

* swarm/storage/localstore: add comment in collectGarbageWorker

* swarm/storage/localstore: replace atomics with mutexes for gcSize and tests

* swarm/storage/localstore: protect addrs map in pull subs tests

* swarm/storage/localstore: protect slices in push subs test

* swarm/storage/localstore: protect chunks in TestModePutUpload_parallel

* swarm/storage/localstore: fix a race in TestDB_updateGCSem defers

* swarm/storage/localstore: remove parallel flag from tests

* swarm/storage/localstore: fix a race in testDB_collectGarbageWorker

* swarm/storage/localstore: remove unused code

* swarm/storage/localstore: add more context to pull sub log messages

* swarm/storage/localstore: BenchmarkPutUpload and global lock option

* swarm/storage/localstore: pre-generate chunks in BenchmarkPutUpload

* swarm/storage/localstore: correct useGlobalLock in collectGarbage

* swarm/storage/localstore: fix typos and update comments

* swarm/storage/localstore: update writeGCSize comment

* swarm/storage/localstore: global batch write lock

* swarm/storage/localstore: remove global lock option

* swarm/storage/localstore: simplify DB.Close
parent f82185a4
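
The heart of this change: the per-address locks (the lockAddr helper backed by a sync.Map, deleted in the diff below) give way to a single batchMu mutex that serializes every batch write. With writes serialized, the gc size counter can be persisted in the same leveldb batch as the index updates it counts, which removes the gcUncountedHashesIndex and writeGCSizeWorker reconciliation machinery. A minimal, self-contained sketch of the pattern, using goleveldb directly; the store type, countKey, put, and count here are illustrative stand-ins, not localstore API:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

// store is an illustrative stand-in for localstore's DB: one mutex
// serializes all batch writes, so a persisted counter can be updated
// in the same atomic batch as the data it counts.
type store struct {
	db      *leveldb.DB
	batchMu sync.Mutex // counterpart of DB.batchMu in the diff
}

var countKey = []byte("item-count")

// put stages the key/value pair and the incremented counter in one
// leveldb batch; the global lock makes the read-modify-write of the
// counter safe without any separate reconciliation index.
func (s *store) put(key, value []byte) error {
	s.batchMu.Lock()
	defer s.batchMu.Unlock()

	count, err := s.count()
	if err != nil {
		return err
	}
	batch := new(leveldb.Batch)
	batch.Put(key, value)
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, count+1)
	batch.Put(countKey, buf)
	// both writes commit together, or not at all
	return s.db.Write(batch, nil)
}

// count reads the persisted counter, treating a missing key as zero.
func (s *store) count() (uint64, error) {
	data, err := s.db.Get(countKey, nil)
	if err != nil {
		if err == leveldb.ErrNotFound {
			return 0, nil
		}
		return 0, err
	}
	return binary.BigEndian.Uint64(data), nil
}

func main() {
	mdb, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer mdb.Close()

	s := &store{db: mdb}
	if err := s.put([]byte("addr"), []byte("chunk data")); err != nil {
		panic(err)
	}
	n, _ := s.count()
	fmt.Println("items:", n) // items: 1
}
```

The trade-off is reduced write concurrency in exchange for a counter that is crash-consistent by construction; that is the simplification the diff below applies to put, set, and updateGC.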
@@ -38,7 +38,7 @@ func TestDB_collectGarbageWorker(t *testing.T) {
 func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
 	// lower the maximal number of chunks in a single
 	// gc batch to ensure multiple batches.
-	defer func(s int64) { gcBatchSize = s }(gcBatchSize)
+	defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
 	gcBatchSize = 2
 	testDB_collectGarbageWorker(t)
@@ -54,8 +54,8 @@ func testDB_collectGarbageWorker(t *testing.T) {
 	db, cleanupFunc := newTestDB(t, &Options{
 		Capacity: 100,
 	})
-	testHookCollectGarbageChan := make(chan int64)
-	defer setTestHookCollectGarbage(func(collectedCount int64) {
+	testHookCollectGarbageChan := make(chan uint64)
+	defer setTestHookCollectGarbage(func(collectedCount uint64) {
 		select {
 		case testHookCollectGarbageChan <- collectedCount:
 		case <-db.close:
@@ -93,7 +93,10 @@ func testDB_collectGarbageWorker(t *testing.T) {
 		case <-time.After(10 * time.Second):
 			t.Error("collect garbage timeout")
 		}
-		gcSize := db.getGCSize()
+		gcSize, err := db.gcSize.Get()
+		if err != nil {
+			t.Fatal(err)
+		}
 		if gcSize == gcTarget {
 			break
 		}
@@ -134,8 +137,8 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
 	uploader := db.NewPutter(ModePutUpload)
 	syncer := db.NewSetter(ModeSetSync)
-	testHookCollectGarbageChan := make(chan int64)
-	defer setTestHookCollectGarbage(func(collectedCount int64) {
+	testHookCollectGarbageChan := make(chan uint64)
+	defer setTestHookCollectGarbage(func(collectedCount uint64) {
 		testHookCollectGarbageChan <- collectedCount
 	})()
@@ -202,7 +205,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
 	gcTarget := db.gcTarget()
-	var totalCollectedCount int64
+	var totalCollectedCount uint64
 	for {
 		select {
 		case c := <-testHookCollectGarbageChan:
@@ -210,13 +213,16 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
 		case <-time.After(10 * time.Second):
 			t.Error("collect garbage timeout")
 		}
-		gcSize := db.getGCSize()
+		gcSize, err := db.gcSize.Get()
+		if err != nil {
+			t.Fatal(err)
+		}
 		if gcSize == gcTarget {
 			break
 		}
 	}
-	wantTotalCollectedCount := int64(len(addrs)) - gcTarget
+	wantTotalCollectedCount := uint64(len(addrs)) - gcTarget
 	if totalCollectedCount != wantTotalCollectedCount {
 		t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount)
 	}
@@ -288,10 +294,7 @@ func TestDB_gcSize(t *testing.T) {
 		}
 	}
-	// DB.Close writes gc size to disk, so
-	// Instead calling Close, close the database
-	// without it.
-	if err := db.closeWithOptions(false); err != nil {
+	if err := db.Close(); err != nil {
 		t.Fatal(err)
 	}
@@ -302,14 +305,12 @@ func TestDB_gcSize(t *testing.T) {
 	defer db.Close()
 	t.Run("gc index size", newIndexGCSizeTest(db))
-	t.Run("gc uncounted hashes index count", newItemsCountTest(db.gcUncountedHashesIndex, 0))
 }
 // setTestHookCollectGarbage sets testHookCollectGarbage and
 // returns a function that will reset it to the
 // value before the change.
-func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) {
+func setTestHookCollectGarbage(h func(collectedCount uint64)) (reset func()) {
 	current := testHookCollectGarbage
 	reset = func() { testHookCollectGarbage = current }
 	testHookCollectGarbage = h
@@ -321,7 +322,7 @@ func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) {
 // resets the original function.
 func TestSetTestHookCollectGarbage(t *testing.T) {
 	// Set the current function after the test finishes.
-	defer func(h func(collectedCount int64)) { testHookCollectGarbage = h }(testHookCollectGarbage)
+	defer func(h func(collectedCount uint64)) { testHookCollectGarbage = h }(testHookCollectGarbage)
 	// expected value for the unchanged function
 	original := 1
@@ -332,7 +333,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
 	var got int
 	// define the original (unchanged) functions
-	testHookCollectGarbage = func(_ int64) {
+	testHookCollectGarbage = func(_ uint64) {
 		got = original
 	}
@@ -345,7 +346,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
 	}
 	// set the new function
-	reset := setTestHookCollectGarbage(func(_ int64) {
+	reset := setTestHookCollectGarbage(func(_ uint64) {
 		got = changed
 	})
...
@@ -18,7 +18,6 @@ package localstore
 import (
 	"encoding/binary"
-	"encoding/hex"
 	"errors"
 	"sync"
 	"time"
@@ -41,7 +40,7 @@ var (
 var (
 	// Default value for Capacity DB option.
-	defaultCapacity int64 = 5000000
+	defaultCapacity uint64 = 5000000
 	// Limit the number of goroutines created by Getters
 	// that call updateGC function. Value 0 sets no limit.
 	maxParallelUpdateGC = 1000
@@ -54,8 +53,6 @@ type DB struct {
 	// schema name of loaded data
 	schemaName shed.StringField
-	// field that stores number of items in gc index
-	storedGCSize shed.Uint64Field
 	// retrieval indexes
 	retrievalDataIndex shed.Index
@@ -74,23 +71,16 @@ type DB struct {
 	// garbage collection index
 	gcIndex shed.Index
-	// index that stores hashes that are not
-	// counted in and saved to storedGCSize
-	gcUncountedHashesIndex shed.Index
-	// number of elements in garbage collection index
-	// it must be always read by getGCSize and
-	// set with incGCSize which are locking gcSizeMu
-	gcSize int64
-	gcSizeMu sync.RWMutex
+	// field that stores number of items in gc index
+	gcSize shed.Uint64Field
 	// garbage collection is triggered when gcSize exceeds
 	// the capacity value
-	capacity int64
+	capacity uint64
 	// triggers garbage collection event loop
 	collectGarbageTrigger chan struct{}
-	// triggers write gc size event loop
-	writeGCSizeTrigger chan struct{}
 	// a buffered channel acting as a semaphore
 	// to limit the maximal number of goroutines
@@ -102,7 +92,7 @@ type DB struct {
 	baseKey []byte
-	addressLocks sync.Map
+	batchMu sync.Mutex
 	// this channel is closed when close function is called
 	// to terminate other goroutines
@@ -112,7 +102,6 @@ type DB struct {
 	// garbage collection and gc size write workers
 	// are done
 	collectGarbageWorkerDone chan struct{}
-	writeGCSizeWorkerDone chan struct{}
 }
 // Options struct holds optional parameters for configuring DB.
@@ -125,7 +114,7 @@ type Options struct {
 	MockStore *mock.NodeStore
 	// Capacity is a limit that triggers garbage collection when
 	// number of items in gcIndex equals or exceeds it.
-	Capacity int64
+	Capacity uint64
 	// MetricsPrefix defines a prefix for metrics names.
 	MetricsPrefix string
 }
@@ -140,15 +129,13 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 	db = &DB{
 		capacity: o.Capacity,
 		baseKey:  baseKey,
-		// channels collectGarbageTrigger and writeGCSizeTrigger
-		// need to be buffered with the size of 1
+		// channel collectGarbageTrigger
+		// needs to be buffered with the size of 1
 		// to signal another event if it
 		// is triggered during already running function
 		collectGarbageTrigger: make(chan struct{}, 1),
-		writeGCSizeTrigger: make(chan struct{}, 1),
 		close: make(chan struct{}),
 		collectGarbageWorkerDone: make(chan struct{}),
-		writeGCSizeWorkerDone: make(chan struct{}),
 	}
 	if db.capacity <= 0 {
 		db.capacity = defaultCapacity
@@ -167,7 +154,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 		return nil, err
 	}
 	// Persist gc size.
-	db.storedGCSize, err = db.shed.NewUint64Field("gc-size")
+	db.gcSize, err = db.shed.NewUint64Field("gc-size")
 	if err != nil {
 		return nil, err
 	}
@@ -318,48 +305,6 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 	if err != nil {
 		return nil, err
 	}
-	// gc uncounted hashes index keeps hashes that are in gc index
-	// but not counted in and saved to storedGCSize
-	db.gcUncountedHashesIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{
-		EncodeKey: func(fields shed.Item) (key []byte, err error) {
-			return fields.Address, nil
-		},
-		DecodeKey: func(key []byte) (e shed.Item, err error) {
-			e.Address = key
-			return e, nil
-		},
-		EncodeValue: func(fields shed.Item) (value []byte, err error) {
-			return nil, nil
-		},
-		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
-			return e, nil
-		},
-	})
-	if err != nil {
-		return nil, err
-	}
-	// count number of elements in garbage collection index
-	gcSize, err := db.storedGCSize.Get()
-	if err != nil {
-		return nil, err
-	}
-	// get number of uncounted hashes
-	gcUncountedSize, err := db.gcUncountedHashesIndex.Count()
-	if err != nil {
-		return nil, err
-	}
-	gcSize += uint64(gcUncountedSize)
-	// remove uncounted hashes from the index and
-	// save the total gcSize after uncounted hashes are removed
-	err = db.writeGCSize(int64(gcSize))
-	if err != nil {
-		return nil, err
-	}
-	db.incGCSize(int64(gcSize))
-	// start worker to write gc size
-	go db.writeGCSizeWorker()
 	// start garbage collection worker
 	go db.collectGarbageWorker()
 	return db, nil
@@ -367,34 +312,16 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 // Close closes the underlying database.
 func (db *DB) Close() (err error) {
-	return db.closeWithOptions(true)
-}
-// closeWithOptions provides a more control which part of closing
-// is done for tests.
-func (db *DB) closeWithOptions(writeGCSize bool) (err error) {
 	close(db.close)
 	db.updateGCWG.Wait()
-	// wait for gc worker and gc size write workers to
+	// wait for gc worker to
 	// return before closing the shed
-	timeout := time.After(5 * time.Second)
 	select {
 	case <-db.collectGarbageWorkerDone:
-	case <-timeout:
+	case <-time.After(5 * time.Second):
 		log.Error("localstore: collect garbage worker did not return after db close")
 	}
-	select {
-	case <-db.writeGCSizeWorkerDone:
-	case <-timeout:
-		log.Error("localstore: write gc size worker did not return after db close")
-	}
-	if writeGCSize {
-		if err := db.writeGCSize(db.getGCSize()); err != nil {
-			log.Error("localstore: write gc size", "err", err)
-		}
-	}
 	return db.shed.Close()
 }
@@ -404,35 +331,6 @@ func (db *DB) po(addr chunk.Address) (bin uint8) {
 	return uint8(chunk.Proximity(db.baseKey, addr))
 }
-var (
-	// Maximal time for lockAddr to wait until it
-	// returns error.
-	addressLockTimeout = 3 * time.Second
-	// duration between two lock checks in lockAddr.
-	addressLockCheckDelay = 30 * time.Microsecond
-)
-// lockAddr sets the lock on a particular address
-// using addressLocks sync.Map and returns unlock function.
-// If the address is locked this function will check it
-// in a for loop for addressLockTimeout time, after which
-// it will return ErrAddressLockTimeout error.
-func (db *DB) lockAddr(addr chunk.Address) (unlock func(), err error) {
-	start := time.Now()
-	lockKey := hex.EncodeToString(addr)
-	for {
-		_, loaded := db.addressLocks.LoadOrStore(lockKey, struct{}{})
-		if !loaded {
-			break
-		}
-		time.Sleep(addressLockCheckDelay)
-		if time.Since(start) > addressLockTimeout {
-			return nil, ErrAddressLockTimeout
-		}
-	}
-	return func() { db.addressLocks.Delete(lockKey) }, nil
-}
 // chunkToItem creates new Item with data provided by the Chunk.
 func chunkToItem(ch chunk.Chunk) shed.Item {
 	return shed.Item{
...
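
The put and set hunks below stage the counter change with db.incGCSizeInBatch before committing the batch, but that function's own body is not among the hunks shown. A plausible reconstruction, assuming only the shed.Uint64Field methods used elsewhere in this diff (Get, PutInBatch) and the triggerGarbageCollection helper named in the commit message; treat it as a sketch, not the committed code:

```go
// incGCSizeInBatch stages a gcSize update into the same batch as the
// index changes, so the persisted count cannot drift from gcIndex.
// Reconstruction under the assumptions stated above.
func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
	if change == 0 {
		return nil
	}
	gcSize, err := db.gcSize.Get()
	if err != nil {
		return err
	}
	var newSize uint64
	if change > 0 {
		newSize = gcSize + uint64(change)
	} else {
		// negate before converting: uint64(change) would wrap around
		newSize = gcSize - uint64(-change)
	}
	db.gcSize.PutInBatch(batch, newSize)

	// assumed: wake the gc worker once capacity is reached
	if newSize >= db.capacity {
		db.triggerGarbageCollection()
	}
	return nil
}
```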
@@ -24,7 +24,6 @@ import (
 	"os"
 	"runtime"
 	"sort"
-	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -137,89 +136,6 @@ func TestDB_updateGCSem(t *testing.T) {
 	}
 }
-// BenchmarkNew measures the time that New function
-// needs to initialize and count the number of key/value
-// pairs in GC index.
-// This benchmark generates a number of chunks, uploads them,
-// sets them to synced state for them to enter the GC index,
-// and measures the execution time of New function by creating
-// new databases with the same data directory.
-//
-// This benchmark takes significant amount of time.
-//
-// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show
-// that New function executes around 1s for database with 1M chunks.
-//
-// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkNew -v -timeout 20m
-// goos: darwin
-// goarch: amd64
-// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore
-// BenchmarkNew/1000-8          200   11672414 ns/op   9570960 B/op   10008 allocs/op
-// BenchmarkNew/10000-8         100   14890609 ns/op  10490118 B/op    7759 allocs/op
-// BenchmarkNew/100000-8         20   58334080 ns/op  17763157 B/op   22978 allocs/op
-// BenchmarkNew/1000000-8         2  748595153 ns/op  45297404 B/op  253242 allocs/op
-// PASS
-func BenchmarkNew(b *testing.B) {
-	if testing.Short() {
-		b.Skip("skipping benchmark in short mode")
-	}
-	for _, count := range []int{
-		1000,
-		10000,
-		100000,
-		1000000,
-	} {
-		b.Run(strconv.Itoa(count), func(b *testing.B) {
-			dir, err := ioutil.TempDir("", "localstore-new-benchmark")
-			if err != nil {
-				b.Fatal(err)
-			}
-			defer os.RemoveAll(dir)
-			baseKey := make([]byte, 32)
-			if _, err := rand.Read(baseKey); err != nil {
-				b.Fatal(err)
-			}
-			db, err := New(dir, baseKey, nil)
-			if err != nil {
-				b.Fatal(err)
-			}
-			defer db.Close()
-			uploader := db.NewPutter(ModePutUpload)
-			syncer := db.NewSetter(ModeSetSync)
-			for i := 0; i < count; i++ {
-				chunk := generateTestRandomChunk()
-				err := uploader.Put(chunk)
-				if err != nil {
-					b.Fatal(err)
-				}
-				err = syncer.Set(chunk.Address())
-				if err != nil {
-					b.Fatal(err)
-				}
-			}
-			err = db.Close()
-			if err != nil {
-				b.Fatal(err)
-			}
-			b.ResetTimer()
-			for n := 0; n < b.N; n++ {
-				b.StartTimer()
-				db, err := New(dir, baseKey, nil)
-				b.StopTimer()
-				if err != nil {
-					b.Fatal(err)
-				}
-				err = db.Close()
-				if err != nil {
-					b.Fatal(err)
-				}
-			}
-		})
-	}
-}
 // newTestDB is a helper function that constructs a
 // temporary database and returns a cleanup function that must
 // be called to remove the data.
@@ -411,7 +327,7 @@ func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
 // value is the same as the number of items in DB.gcIndex.
 func newIndexGCSizeTest(db *DB) func(t *testing.T) {
 	return func(t *testing.T) {
-		var want int64
+		var want uint64
 		err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
 			want++
 			return
@@ -419,7 +335,10 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		got := db.getGCSize()
+		got, err := db.gcSize.Get()
+		if err != nil {
+			t.Fatal(err)
+		}
 		if got != want {
 			t.Errorf("got gc size %v, want %v", got, want)
 		}
...
@@ -113,11 +113,8 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) {
 // only Address and Data fields with non zero values,
 // which is ensured by the get function.
 func (db *DB) updateGC(item shed.Item) (err error) {
-	unlock, err := db.lockAddr(item.Address)
-	if err != nil {
-		return err
-	}
-	defer unlock()
+	db.batchMu.Lock()
+	defer db.batchMu.Unlock()
 	batch := new(leveldb.Batch)
...
@@ -64,11 +64,8 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) {
 // with their nil values.
 func (db *DB) put(mode ModePut, item shed.Item) (err error) {
 	// protect parallel updates
-	unlock, err := db.lockAddr(item.Address)
-	if err != nil {
-		return err
-	}
-	defer unlock()
+	db.batchMu.Lock()
+	defer db.batchMu.Unlock()
 	batch := new(leveldb.Batch)
@@ -116,7 +113,6 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
 		db.retrievalAccessIndex.PutInBatch(batch, item)
 		// add new entry to gc index
 		db.gcIndex.PutInBatch(batch, item)
-		db.gcUncountedHashesIndex.PutInBatch(batch, item)
 		gcSizeChange++
 		db.retrievalDataIndex.PutInBatch(batch, item)
@@ -143,12 +139,14 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) {
 		return ErrInvalidMode
 	}
-	err = db.shed.WriteBatch(batch)
+	err = db.incGCSizeInBatch(batch, gcSizeChange)
 	if err != nil {
 		return err
 	}
-	if gcSizeChange != 0 {
-		db.incGCSize(gcSizeChange)
+	err = db.shed.WriteBatch(batch)
+	if err != nil {
+		return err
 	}
 	if triggerPullFeed {
 		db.triggerPullSubscriptions(db.po(item.Address))
...
@@ -63,11 +63,8 @@ func (s *Setter) Set(addr chunk.Address) (err error) {
 // of this function for the same address in parallel.
 func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 	// protect parallel updates
-	unlock, err := db.lockAddr(addr)
-	if err != nil {
-		return err
-	}
-	defer unlock()
+	db.batchMu.Lock()
+	defer db.batchMu.Unlock()
 	batch := new(leveldb.Batch)
@@ -113,7 +110,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 		db.pullIndex.PutInBatch(batch, item)
 		triggerPullFeed = true
 		db.gcIndex.PutInBatch(batch, item)
-		db.gcUncountedHashesIndex.PutInBatch(batch, item)
 		gcSizeChange++
 	case ModeSetSync:
@@ -151,7 +147,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 		db.retrievalAccessIndex.PutInBatch(batch, item)
 		db.pushIndex.DeleteInBatch(batch, item)
 		db.gcIndex.PutInBatch(batch, item)
-		db.gcUncountedHashesIndex.PutInBatch(batch, item)
 		gcSizeChange++
 	case modeSetRemove:
@@ -179,7 +174,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 		db.retrievalAccessIndex.DeleteInBatch(batch, item)
 		db.pullIndex.DeleteInBatch(batch, item)
 		db.gcIndex.DeleteInBatch(batch, item)
-		db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
 		// a check is needed for decrementing gcSize
 		// as delete is not reporting if the key/value pair
 		// is deleted or not
@@ -191,12 +185,14 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) {
 		return ErrInvalidMode
 	}
-	err = db.shed.WriteBatch(batch)
+	err = db.incGCSizeInBatch(batch, gcSizeChange)
 	if err != nil {
 		return err
 	}
-	if gcSizeChange != 0 {
-		db.incGCSize(gcSizeChange)
+	err = db.shed.WriteBatch(batch)
+	if err != nil {
+		return err
 	}
 	if triggerPullFeed {
 		db.triggerPullSubscriptions(db.po(item.Address))
...
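
The constructor comment above notes that collectGarbageTrigger "needs to be buffered with the size of 1 to signal another event if it is triggered during already running function". A self-contained sketch of that coalescing-trigger pattern; the triggerGarbageCollection select shape follows the helper named in the commit message, while the worker body here is a placeholder:

```go
package main

import (
	"fmt"
	"time"
)

// db is a stand-in holding just the two channels the pattern needs.
type db struct {
	collectGarbageTrigger chan struct{} // buffered with size 1
	close                 chan struct{}
}

// triggerGarbageCollection requests a gc run without ever blocking:
// the buffer stores one pending request, and the default case drops
// duplicates that arrive while a run is already queued or active.
func (d *db) triggerGarbageCollection() {
	select {
	case d.collectGarbageTrigger <- struct{}{}:
	case <-d.close:
	default:
	}
}

// collectGarbageWorker drains the trigger channel until close.
func (d *db) collectGarbageWorker() {
	for {
		select {
		case <-d.collectGarbageTrigger:
			fmt.Println("collecting garbage")
			time.Sleep(10 * time.Millisecond) // stand-in for a gc batch
		case <-d.close:
			return
		}
	}
}

func main() {
	d := &db{
		collectGarbageTrigger: make(chan struct{}, 1),
		close:                 make(chan struct{}),
	}
	go d.collectGarbageWorker()
	for i := 0; i < 5; i++ {
		d.triggerGarbageCollection() // bursts coalesce into few runs
	}
	time.Sleep(100 * time.Millisecond)
	close(d.close)
}
```

A burst of five triggers typically collapses into one or two runs: one signal sits in the buffer while a run is active, and the rest are dropped by the default case.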