Commit 356c49fa authored by Janoš Guljaš's avatar Janoš Guljaš Committed by Anton Evangelatov

swarm: Shed Index and Uint64Field additions (#18398)

parent 428eabe2
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
// more complex operations on storage data organized in fields and indexes. // more complex operations on storage data organized in fields and indexes.
// //
// Only type which holds logical information about swarm storage chunks data // Only type which holds logical information about swarm storage chunks data
// and metadata is IndexItem. This part is not generalized mostly for // and metadata is Item. This part is not generalized mostly for
// performance reasons. // performance reasons.
package shed package shed
......
...@@ -71,20 +71,20 @@ func New(path string) (s *Store, err error) { ...@@ -71,20 +71,20 @@ func New(path string) (s *Store, err error) {
} }
// Index storing actual chunk address, data and store timestamp. // Index storing actual chunk address, data and store timestamp.
s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil return fields.Address, nil
}, },
DecodeKey: func(key []byte) (e shed.IndexItem, err error) { DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key e.Address = key
return e, nil return e, nil
}, },
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8) b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
value = append(b, fields.Data...) value = append(b, fields.Data...)
return value, nil return value, nil
}, },
DecodeValue: func(value []byte) (e shed.IndexItem, err error) { DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:] e.Data = value[8:]
return e, nil return e, nil
...@@ -96,19 +96,19 @@ func New(path string) (s *Store, err error) { ...@@ -96,19 +96,19 @@ func New(path string) (s *Store, err error) {
// Index storing access timestamp for a particular address. // Index storing access timestamp for a particular address.
// It is needed in order to update gc index keys for iteration order. // It is needed in order to update gc index keys for iteration order.
s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil return fields.Address, nil
}, },
DecodeKey: func(key []byte) (e shed.IndexItem, err error) { DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key e.Address = key
return e, nil return e, nil
}, },
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8) b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
return b, nil return b, nil
}, },
DecodeValue: func(value []byte) (e shed.IndexItem, err error) { DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
return e, nil return e, nil
}, },
...@@ -118,23 +118,23 @@ func New(path string) (s *Store, err error) { ...@@ -118,23 +118,23 @@ func New(path string) (s *Store, err error) {
} }
// Index with keys ordered by access timestamp for garbage collection prioritization. // Index with keys ordered by access timestamp for garbage collection prioritization.
s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{ s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address)) b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
key = append(b, fields.Address...) key = append(b, fields.Address...)
return key, nil return key, nil
}, },
DecodeKey: func(key []byte) (e shed.IndexItem, err error) { DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
e.Address = key[16:] e.Address = key[16:]
return e, nil return e, nil
}, },
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { EncodeValue: func(fields shed.Item) (value []byte, err error) {
return nil, nil return nil, nil
}, },
DecodeValue: func(value []byte) (e shed.IndexItem, err error) { DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
return e, nil return e, nil
}, },
}) })
...@@ -146,7 +146,7 @@ func New(path string) (s *Store, err error) { ...@@ -146,7 +146,7 @@ func New(path string) (s *Store, err error) {
// Put stores the chunk and sets it store timestamp. // Put stores the chunk and sets it store timestamp.
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) { func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
return s.retrievalIndex.Put(shed.IndexItem{ return s.retrievalIndex.Put(shed.Item{
Address: ch.Address(), Address: ch.Address(),
Data: ch.Data(), Data: ch.Data(),
StoreTimestamp: time.Now().UTC().UnixNano(), StoreTimestamp: time.Now().UTC().UnixNano(),
...@@ -161,7 +161,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e ...@@ -161,7 +161,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
batch := new(leveldb.Batch) batch := new(leveldb.Batch)
// Get the chunk data and storage timestamp. // Get the chunk data and storage timestamp.
item, err := s.retrievalIndex.Get(shed.IndexItem{ item, err := s.retrievalIndex.Get(shed.Item{
Address: addr, Address: addr,
}) })
if err != nil { if err != nil {
...@@ -172,13 +172,13 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e ...@@ -172,13 +172,13 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
} }
// Get the chunk access timestamp. // Get the chunk access timestamp.
accessItem, err := s.accessIndex.Get(shed.IndexItem{ accessItem, err := s.accessIndex.Get(shed.Item{
Address: addr, Address: addr,
}) })
switch err { switch err {
case nil: case nil:
// Remove gc index entry if access timestamp is found. // Remove gc index entry if access timestamp is found.
err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{ err = s.gcIndex.DeleteInBatch(batch, shed.Item{
Address: item.Address, Address: item.Address,
StoreTimestamp: accessItem.AccessTimestamp, StoreTimestamp: accessItem.AccessTimestamp,
AccessTimestamp: item.StoreTimestamp, AccessTimestamp: item.StoreTimestamp,
...@@ -197,7 +197,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e ...@@ -197,7 +197,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
accessTimestamp := time.Now().UTC().UnixNano() accessTimestamp := time.Now().UTC().UnixNano()
// Put new access timestamp in access index. // Put new access timestamp in access index.
err = s.accessIndex.PutInBatch(batch, shed.IndexItem{ err = s.accessIndex.PutInBatch(batch, shed.Item{
Address: addr, Address: addr,
AccessTimestamp: accessTimestamp, AccessTimestamp: accessTimestamp,
}) })
...@@ -206,7 +206,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e ...@@ -206,7 +206,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
} }
// Put new access timestamp in gc index. // Put new access timestamp in gc index.
err = s.gcIndex.PutInBatch(batch, shed.IndexItem{ err = s.gcIndex.PutInBatch(batch, shed.Item{
Address: item.Address, Address: item.Address,
AccessTimestamp: accessTimestamp, AccessTimestamp: accessTimestamp,
StoreTimestamp: item.StoreTimestamp, StoreTimestamp: item.StoreTimestamp,
...@@ -244,7 +244,7 @@ func (s *Store) CollectGarbage() (err error) { ...@@ -244,7 +244,7 @@ func (s *Store) CollectGarbage() (err error) {
// New batch for a new cg round. // New batch for a new cg round.
trash := new(leveldb.Batch) trash := new(leveldb.Batch)
// Iterate through all index items and break when needed. // Iterate through all index items and break when needed.
err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) { err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// Remove the chunk. // Remove the chunk.
err = s.retrievalIndex.DeleteInBatch(trash, item) err = s.retrievalIndex.DeleteInBatch(trash, item)
if err != nil { if err != nil {
...@@ -265,7 +265,7 @@ func (s *Store) CollectGarbage() (err error) { ...@@ -265,7 +265,7 @@ func (s *Store) CollectGarbage() (err error) {
return true, nil return true, nil
} }
return false, nil return false, nil
}) }, nil)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -99,6 +99,44 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) { ...@@ -99,6 +99,44 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
return val, nil return val, nil
} }
// Dec decrements a uint64 value in the database.
// This operation is not goroutine save.
// The field is protected from overflow to a negative value.
func (f Uint64Field) Dec() (val uint64, err error) {
val, err = f.Get()
if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
return 0, err
}
}
if val != 0 {
val--
}
return val, f.Put(val)
}
// DecInBatch decrements a uint64 value in the batch
// by retreiving a value from the database, not the same batch.
// This operation is not goroutine save.
// The field is protected from overflow to a negative value.
func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error) {
val, err = f.Get()
if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
return 0, err
}
}
if val != 0 {
val--
}
f.PutInBatch(batch, val)
return val, nil
}
// encode transforms uint64 to 8 byte long // encode transforms uint64 to 8 byte long
// slice in big endian encoding. // slice in big endian encoding.
func encodeUint64(val uint64) (b []byte) { func encodeUint64(val uint64) (b []byte) {
......
...@@ -192,3 +192,109 @@ func TestUint64Field_IncInBatch(t *testing.T) { ...@@ -192,3 +192,109 @@ func TestUint64Field_IncInBatch(t *testing.T) {
t.Errorf("got uint64 %v, want %v", got, want) t.Errorf("got uint64 %v, want %v", got, want)
} }
} }
// TestUint64Field_Dec validates Dec operation
// of the Uint64Field.
func TestUint64Field_Dec(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()
counter, err := db.NewUint64Field("counter")
if err != nil {
t.Fatal(err)
}
// test overflow protection
var want uint64
got, err := counter.Dec()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
want = 32
err = counter.Put(want)
if err != nil {
t.Fatal(err)
}
want = 31
got, err = counter.Dec()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
}
// TestUint64Field_DecInBatch validates DecInBatch operation
// of the Uint64Field.
func TestUint64Field_DecInBatch(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()
counter, err := db.NewUint64Field("counter")
if err != nil {
t.Fatal(err)
}
batch := new(leveldb.Batch)
var want uint64
got, err := counter.DecInBatch(batch)
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
err = db.WriteBatch(batch)
if err != nil {
t.Fatal(err)
}
got, err = counter.Get()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
batch2 := new(leveldb.Batch)
want = 42
counter.PutInBatch(batch2, want)
err = db.WriteBatch(batch2)
if err != nil {
t.Fatal(err)
}
got, err = counter.Get()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
batch3 := new(leveldb.Batch)
want = 41
got, err = counter.DecInBatch(batch3)
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
err = db.WriteBatch(batch3)
if err != nil {
t.Fatal(err)
}
got, err = counter.Get()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
}
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment