Commit 890ffa05 authored by Péter Szilágyi's avatar Péter Szilágyi Committed by GitHub

Merge pull request #3189 from fjl/leveldb-update

Update goleveldb, add debug.chaindbCompact
parents 89014b45 b930baa5
{
"ImportPath": "github.com/ethereum/go-ethereum",
"GoVersion": "go1.5.2",
"GoVersion": "go1.7",
"GodepVersion": "v74",
"Packages": [
"./..."
......@@ -110,6 +110,10 @@
"ImportPath": "github.com/rcrowley/go-metrics",
"Rev": "51425a2415d21afadfd55cd93432c0bc69e9598d"
},
{
"ImportPath": "github.com/rcrowley/go-metrics/exp",
"Rev": "51425a2415d21afadfd55cd93432c0bc69e9598d"
},
{
"ImportPath": "github.com/rjeczalik/notify",
"Rev": "f627deca7a510d96f0ef9388f2d0e8b16d21f87f"
......@@ -152,51 +156,51 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/cache",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/comparer",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/errors",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/filter",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/iterator",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/journal",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/memdb",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/opt",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/storage",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/table",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb/util",
"Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e"
"Rev": "6b4daa5362b502898ddf367c5c11deb9e7a5c727"
},
{
"ImportPath": "golang.org/x/crypto/pbkdf2",
......
......@@ -16,7 +16,7 @@ import (
)
// Cacher provides interface to implements a caching functionality.
// An implementation must be goroutine-safe.
// An implementation must be safe for concurrent use.
type Cacher interface {
// Capacity returns cache capacity.
Capacity() int
......@@ -511,18 +511,12 @@ func (r *Cache) EvictAll() {
}
}
// Close closes the 'cache map' and releases all 'cache node'.
// Close closes the 'cache map' and forcefully releases all 'cache node'.
func (r *Cache) Close() error {
r.mu.Lock()
if !r.closed {
r.closed = true
if r.cacher != nil {
if err := r.cacher.Close(); err != nil {
return err
}
}
h := (*mNode)(r.mHead)
h.initBuckets()
......@@ -541,10 +535,37 @@ func (r *Cache) Close() error {
for _, f := range n.onDel {
f()
}
n.onDel = nil
}
}
}
r.mu.Unlock()
// Avoid deadlock.
if r.cacher != nil {
if err := r.cacher.Close(); err != nil {
return err
}
}
return nil
}
// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
// unlike Close it doesn't forcefully releases 'cache node'.
func (r *Cache) CloseWeak() error {
r.mu.Lock()
if !r.closed {
r.closed = true
}
r.mu.Unlock()
// Avoid deadlock.
if r.cacher != nil {
r.cacher.EvictAll()
if err := r.cacher.Close(); err != nil {
return err
}
}
return nil
}
......
......@@ -6,7 +6,9 @@
package leveldb
import "github.com/syndtr/goleveldb/leveldb/comparer"
import (
"github.com/syndtr/goleveldb/leveldb/comparer"
)
type iComparer struct {
ucmp comparer.Comparer
......@@ -33,12 +35,12 @@ func (icmp *iComparer) Name() string {
}
func (icmp *iComparer) Compare(a, b []byte) int {
x := icmp.ucmp.Compare(internalKey(a).ukey(), internalKey(b).ukey())
x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey())
if x == 0 {
if m, n := internalKey(a).num(), internalKey(b).num(); m > n {
x = -1
return -1
} else if m < n {
x = 1
return 1
}
}
return x
......@@ -46,30 +48,20 @@ func (icmp *iComparer) Compare(a, b []byte) int {
func (icmp *iComparer) Separator(dst, a, b []byte) []byte {
ua, ub := internalKey(a).ukey(), internalKey(b).ukey()
dst = icmp.ucmp.Separator(dst, ua, ub)
if dst == nil {
return nil
dst = icmp.uSeparator(dst, ua, ub)
if dst != nil && len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
// Append earliest possible number.
return append(dst, keyMaxNumBytes...)
}
if len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
dst = append(dst, keyMaxNumBytes...)
} else {
// Did not close possibilities that n maybe longer than len(ub).
dst = append(dst, a[len(a)-8:]...)
}
return dst
return nil
}
func (icmp *iComparer) Successor(dst, b []byte) []byte {
ub := internalKey(b).ukey()
dst = icmp.ucmp.Successor(dst, ub)
if dst == nil {
return nil
}
if len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 {
dst = append(dst, keyMaxNumBytes...)
} else {
// Did not close possibilities that n maybe longer than len(ub).
dst = append(dst, b[len(b)-8:]...)
dst = icmp.uSuccessor(dst, ub)
if dst != nil && len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 {
// Append earliest possible number.
return append(dst, keyMaxNumBytes...)
}
return dst
return nil
}
......@@ -53,14 +53,13 @@ type DB struct {
aliveSnaps, aliveIters int32
// Write.
writeC chan *Batch
batchPool sync.Pool
writeMergeC chan writeMerge
writeMergedC chan bool
writeLockC chan struct{}
writeAckC chan error
writeDelay time.Duration
writeDelayN int
journalC chan *Batch
journalAckC chan error
tr *Transaction
// Compaction.
......@@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) {
// Snapshot
snapsList: list.New(),
// Write
writeC: make(chan *Batch),
batchPool: sync.Pool{New: newBatch},
writeMergeC: make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error),
journalC: make(chan *Batch),
journalAckC: make(chan error),
// Compaction
tcompCmdC: make(chan cCmd),
tcompPauseC: make(chan chan<- struct{}),
......@@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) {
if readOnly {
db.SetReadOnly()
} else {
db.closeW.Add(3)
db.closeW.Add(2)
go db.tCompaction()
go db.mCompaction()
go db.jWriter()
// go db.jWriter()
}
s.logf("db@open done T·%v", time.Since(start))
......@@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) {
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover
// function.
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is goroutine-safe.
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o)
......@@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
// desribed in the leveldb/storage package.
// described in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover
// function.
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is goroutine-safe.
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, o.GetReadOnly())
......@@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// The returned DB instance is goroutine-safe.
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o)
......@@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// RecoverFile uses standard file-system backed storage implementation as desribed
// RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package.
//
// The returned DB instance is goroutine-safe.
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, false)
......@@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error {
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
jr *journal.Reader
mdb = memdb.New(db.s.icmp, writeBuffer)
buf = &util.Buffer{}
batch = &Batch{}
jr *journal.Reader
mdb = memdb.New(db.s.icmp, writeBuffer)
buf = &util.Buffer{}
batchSeq uint64
batchLen int
)
for _, fd := range fds {
......@@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error {
}
// Flush memdb and remove obsolete journal file.
if !ofd.Nil() {
if !ofd.Zero() {
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
......@@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error {
fr.Close()
return errors.SetFd(err, fd)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
if err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
......@@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error {
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
db.seq = batchSeq + uint64(batchLen)
// Flush it if large enough.
if mdb.Size() >= writeBuffer {
......@@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error {
}
// Remove the last obsolete journal file.
if !ofd.Nil() {
if !ofd.Zero() {
db.s.stor.Remove(ofd)
}
......@@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error {
db.logf("journal@recovery RO·Mode F·%d", len(fds))
var (
jr *journal.Reader
buf = &util.Buffer{}
batch = &Batch{}
jr *journal.Reader
buf = &util.Buffer{}
batchSeq uint64
batchLen int
)
for _, fd := range fds {
......@@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error {
fr.Close()
return errors.SetFd(err, fd)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
if err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
......@@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error {
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
db.seq = batchSeq + uint64(batchLen)
}
fr.Close()
......@@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
......@@ -1062,6 +1064,8 @@ func (db *DB) Close() error {
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
db.journal = nil
db.journalWriter = nil
}
if db.writeDelayN > 0 {
......@@ -1077,15 +1081,11 @@ func (db *DB) Close() error {
if err1 := db.closer.Close(); err == nil {
err = err1
}
db.closer = nil
}
// NIL'ing pointers.
db.s = nil
db.mem = nil
db.frozenMem = nil
db.journal = nil
db.journalWriter = nil
db.closer = nil
// Clear memdbs.
db.clearMems()
return err
}
......@@ -96,7 +96,7 @@ noerr:
default:
goto haserr
}
case _, _ = <-db.closeC:
case <-db.closeC:
return
}
}
......@@ -113,7 +113,7 @@ haserr:
goto hasperr
default:
}
case _, _ = <-db.closeC:
case <-db.closeC:
return
}
}
......@@ -126,7 +126,7 @@ hasperr:
case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through.
db.compWriteLocking = true
case _, _ = <-db.closeC:
case <-db.closeC:
if db.compWriteLocking {
// We should release the lock or Close will hang.
<-db.writeLockC
......@@ -195,7 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
db.logf("%s exiting (persistent error %q)", name, perr)
db.compactionExitTransact()
}
case _, _ = <-db.closeC:
case <-db.closeC:
db.logf("%s exiting", name)
db.compactionExitTransact()
}
......@@ -224,7 +224,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
}
select {
case <-backoffT.C:
case _, _ = <-db.closeC:
case <-db.closeC:
db.logf("%s exiting", name)
db.compactionExitTransact()
}
......@@ -288,7 +288,7 @@ func (db *DB) memCompaction() {
case <-db.compPerErrC:
close(resumeC)
resumeC = nil
case _, _ = <-db.closeC:
case <-db.closeC:
return
}
......@@ -337,7 +337,7 @@ func (db *DB) memCompaction() {
select {
case <-resumeC:
close(resumeC)
case _, _ = <-db.closeC:
case <-db.closeC:
return
}
}
......@@ -378,7 +378,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
select {
case ch := <-b.db.tcompPauseC:
b.db.pauseCompaction(ch)
case _, _ = <-b.db.closeC:
case <-b.db.closeC:
b.db.compactionExitTransact()
default:
}
......@@ -643,7 +643,7 @@ func (db *DB) tableNeedCompaction() bool {
func (db *DB) pauseCompaction(ch chan<- struct{}) {
select {
case ch <- struct{}{}:
case _, _ = <-db.closeC:
case <-db.closeC:
db.compactionExitTransact()
}
}
......@@ -697,14 +697,14 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
case compC <- cAuto{ch}:
case err = <-db.compErrC:
return
case _, _ = <-db.closeC:
case <-db.closeC:
return ErrClosed
}
// Wait cmd.
select {
case err = <-ch:
case err = <-db.compErrC:
case _, _ = <-db.closeC:
case <-db.closeC:
return ErrClosed
}
return err
......@@ -719,14 +719,14 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e
case compC <- cRange{level, min, max, ch}:
case err := <-db.compErrC:
return err
case _, _ = <-db.closeC:
case <-db.closeC:
return ErrClosed
}
// Wait cmd.
select {
case err = <-ch:
case err = <-db.compErrC:
case _, _ = <-db.closeC:
case <-db.closeC:
return ErrClosed
}
return err
......@@ -758,7 +758,7 @@ func (db *DB) mCompaction() {
default:
panic("leveldb: unknown command")
}
case _, _ = <-db.closeC:
case <-db.closeC:
return
}
}
......@@ -791,7 +791,7 @@ func (db *DB) tCompaction() {
case ch := <-db.tcompPauseC:
db.pauseCompaction(ch)
continue
case _, _ = <-db.closeC:
case <-db.closeC:
return
default:
}
......@@ -806,7 +806,7 @@ func (db *DB) tCompaction() {
case ch := <-db.tcompPauseC:
db.pauseCompaction(ch)
continue
case _, _ = <-db.closeC:
case <-db.closeC:
return
}
}
......
......@@ -59,7 +59,7 @@ func (db *DB) releaseSnapshot(se *snapshotElement) {
}
}
// Gets minimum sequence that not being snapshoted.
// Gets minimum sequence that not being snapshotted.
func (db *DB) minSeq() uint64 {
db.snapsMu.Lock()
defer db.snapsMu.Unlock()
......@@ -131,7 +131,7 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
}
// NewIterator returns an iterator for the snapshot of the underlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
......
......@@ -67,12 +67,11 @@ func (db *DB) sampleSeek(ikey internalKey) {
}
func (db *DB) mpoolPut(mem *memdb.DB) {
defer func() {
recover()
}()
select {
case db.memPool <- mem:
default:
if !db.isClosed() {
select {
case db.memPool <- mem:
default:
}
}
}
......@@ -100,7 +99,13 @@ func (db *DB) mpoolDrain() {
case <-db.memPool:
default:
}
case _, _ = <-db.closeC:
case <-db.closeC:
ticker.Stop()
// Make sure the pool is drained.
select {
case <-db.memPool:
case <-time.After(time.Second):
}
close(db.memPool)
return
}
......@@ -148,24 +153,26 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
func (db *DB) getMems() (e, f *memDB) {
db.memMu.RLock()
defer db.memMu.RUnlock()
if db.mem == nil {
if db.mem != nil {
db.mem.incref()
} else if !db.isClosed() {
panic("nil effective mem")
}
db.mem.incref()
if db.frozenMem != nil {
db.frozenMem.incref()
}
return db.mem, db.frozenMem
}
// Get frozen memdb.
// Get effective memdb.
func (db *DB) getEffectiveMem() *memDB {
db.memMu.RLock()
defer db.memMu.RUnlock()
if db.mem == nil {
if db.mem != nil {
db.mem.incref()
} else if !db.isClosed() {
panic("nil effective mem")
}
db.mem.incref()
return db.mem
}
......@@ -200,6 +207,14 @@ func (db *DB) dropFrozenMem() {
db.memMu.Unlock()
}
// Clear mems ptr; used by DB.Close().
func (db *DB) clearMems() {
db.memMu.Lock()
db.mem = nil
db.frozenMem = nil
db.memMu.Unlock()
}
// Set closed flag; return true if not already closed.
func (db *DB) setClosed() bool {
return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
......
......@@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
}
// NewIterator returns an iterator for the latest snapshot of the transaction.
// The returned iterator is not goroutine-safe, but it is safe to use multiple
// iterators concurrently, with each in a dedicated goroutine.
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently while writes to the
// transaction. The resultant key/value pairs are guaranteed to be consistent.
//
......@@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
if tr.closed {
return errTransactionDone
}
return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
return tr.put(kt, key, value)
return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
return tr.put(kt, k, v)
})
}
......@@ -179,7 +179,8 @@ func (tr *Transaction) setDone() {
<-tr.db.writeLockC
}
// Commit commits the transaction.
// Commit commits the transaction. If error is not nil, then the transaction is
// not committed, it can then either be retried or discarded.
//
// Other methods should not be called after transaction has been committed.
func (tr *Transaction) Commit() error {
......@@ -192,24 +193,27 @@ func (tr *Transaction) Commit() error {
if tr.closed {
return errTransactionDone
}
defer tr.setDone()
if err := tr.flush(); err != nil {
tr.discard()
// Return error, lets user decide either to retry or discard
// transaction.
return err
}
if len(tr.tables) != 0 {
// Committing transaction.
tr.rec.setSeqNum(tr.seq)
tr.db.compCommitLk.Lock()
defer tr.db.compCommitLk.Unlock()
tr.stats.startTimer()
var cerr error
for retry := 0; retry < 3; retry++ {
if err := tr.db.s.commit(&tr.rec); err != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, err)
cerr = tr.db.s.commit(&tr.rec)
if cerr != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select {
case <-time.After(time.Second):
case _, _ = <-tr.db.closeC:
case <-tr.db.closeC:
tr.db.logf("transaction@commit exiting")
return err
tr.db.compCommitLk.Unlock()
return cerr
}
} else {
// Success. Set db.seq.
......@@ -217,9 +221,26 @@ func (tr *Transaction) Commit() error {
break
}
}
tr.stats.stopTimer()
if cerr != nil {
// Return error, lets user decide either to retry or discard
// transaction.
return cerr
}
// Update compaction stats. This is safe as long as we hold compCommitLk.
tr.db.compStats.addStat(0, &tr.stats)
// Trigger table auto-compaction.
tr.db.compTrigger(tr.db.tcompCmdC)
tr.db.compCommitLk.Unlock()
// Additionally, wait compaction when certain threshold reached.
// Ignore error, returns error only if transaction can't be committed.
tr.db.waitCompaction()
}
// Only mark as done if transaction committed successfully.
tr.setDone()
return nil
}
......@@ -245,10 +266,20 @@ func (tr *Transaction) Discard() {
tr.lk.Unlock()
}
func (db *DB) waitCompaction() error {
if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
return db.compTriggerWait(db.tcompCmdC)
}
return nil
}
// OpenTransaction opens an atomic DB transaction. Only one transaction can be
// opened at a time. Write will be blocked until the transaction is committed or
// discarded.
// The returned transaction handle is goroutine-safe.
// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
// until in-flight transaction is committed or discarded.
// The returned transaction handle is safe for concurrent use.
//
// Transaction is expensive and can overwhelm compaction, especially if
// transaction size is small. Use with caution.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction.
......@@ -263,7 +294,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC:
return nil, err
case _, _ = <-db.closeC:
case <-db.closeC:
return nil, ErrClosed
}
......@@ -278,6 +309,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
}
}
// Wait compaction when certain threshold reached.
if err := db.waitCompaction(); err != nil {
return nil, err
}
tr := &Transaction{
db: db,
seq: db.seq,
......
......@@ -62,7 +62,7 @@ func (db *DB) checkAndCleanFiles() error {
case storage.TypeManifest:
keep = fd.Num >= db.s.manifestFd.Num
case storage.TypeJournal:
if !db.frozenJournalFd.Nil() {
if !db.frozenJournalFd.Zero() {
keep = fd.Num >= db.frozenJournalFd.Num
} else {
keep = fd.Num >= db.journalFd.Num
......
......@@ -10,6 +10,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/errors"
)
// Common errors.
var (
ErrNotFound = errors.ErrNotFound
ErrReadOnly = errors.New("leveldb: read-only mode")
......
......@@ -15,6 +15,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
// Common errors.
var (
ErrNotFound = New("leveldb: not found")
ErrReleased = util.ErrReleased
......@@ -34,11 +35,10 @@ type ErrCorrupted struct {
}
func (e *ErrCorrupted) Error() string {
if !e.Fd.Nil() {
if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
} else {
return e.Err.Error()
}
return e.Err.Error()
}
// NewErrCorrupted creates new ErrCorrupted error.
......
......@@ -21,13 +21,13 @@ var (
// IteratorSeeker is the interface that wraps the 'seeks method'.
type IteratorSeeker interface {
// First moves the iterator to the first key/value pair. If the iterator
// only contains one key/value pair then First and Last whould moves
// only contains one key/value pair then First and Last would moves
// to the same key/value pair.
// It returns whether such pair exist.
First() bool
// Last moves the iterator to the last key/value pair. If the iterator
// only contains one key/value pair then First and Last whould moves
// only contains one key/value pair then First and Last would moves
// to the same key/value pair.
// It returns whether such pair exist.
Last() bool
......@@ -48,7 +48,7 @@ type IteratorSeeker interface {
Prev() bool
}
// CommonIterator is the interface that wraps common interator methods.
// CommonIterator is the interface that wraps common iterator methods.
type CommonIterator interface {
IteratorSeeker
......@@ -71,14 +71,15 @@ type CommonIterator interface {
// Iterator iterates over a DB's key/value pairs in key order.
//
// When encouter an error any 'seeks method' will return false and will
// When encounter an error any 'seeks method' will return false and will
// yield no key/value pairs. The error can be queried by calling the Error
// method. Calling Release is still necessary.
//
// An iterator must be released after use, but it is not necessary to read
// an iterator until exhaustion.
// Also, an iterator is not necessarily goroutine-safe, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// Also, an iterator is not necessarily safe for concurrent use, but it is
// safe to use multiple iterators concurrently, with each in a dedicated
// goroutine.
type Iterator interface {
CommonIterator
......@@ -98,7 +99,7 @@ type Iterator interface {
//
// ErrorCallbackSetter implemented by indexed and merged iterator.
type ErrorCallbackSetter interface {
// SetErrorCallback allows set an error callback of the coresponding
// SetErrorCallback allows set an error callback of the corresponding
// iterator. Use nil to clear the callback.
SetErrorCallback(f func(err error))
}
......
......@@ -180,34 +180,37 @@ func (r *Reader) nextChunk(first bool) error {
checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
chunkType := r.buf[r.j+6]
unprocBlock := r.n - r.j
if checksum == 0 && length == 0 && chunkType == 0 {
// Drop entire block.
m := r.n - r.j
r.i = r.n
r.j = r.n
return r.corrupt(m, "zero header", false)
} else {
m := r.n - r.j
r.i = r.j + headerSize
r.j = r.j + headerSize + int(length)
if r.j > r.n {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(m, "chunk length overflows block", false)
} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(m, "checksum mismatch", false)
}
return r.corrupt(unprocBlock, "zero header", false)
}
if chunkType < fullChunkType || chunkType > lastChunkType {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
}
r.i = r.j + headerSize
r.j = r.j + headerSize + int(length)
if r.j > r.n {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, "chunk length overflows block", false)
} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, "checksum mismatch", false)
}
if first && chunkType != fullChunkType && chunkType != firstChunkType {
m := r.j - r.i
chunkLength := (r.j - r.i) + headerSize
r.i = r.j
// Report the error, but skip it.
return r.corrupt(m+headerSize, "orphan chunk", true)
return r.corrupt(chunkLength, "orphan chunk", true)
}
r.last = chunkType == fullChunkType || chunkType == lastChunkType
return nil
......
......@@ -37,14 +37,14 @@ func (kt keyType) String() string {
case keyTypeVal:
return "v"
}
return "x"
return fmt.Sprintf("<invalid:%#x>", uint(kt))
}
// Value types encoded as the last component of internal keys.
// Don't modify; this value are saved to disk.
const (
keyTypeDel keyType = iota
keyTypeVal
keyTypeDel = keyType(0)
keyTypeVal = keyType(1)
)
// keyTypeSeek defines the keyType that should be passed when constructing an
......@@ -79,11 +79,7 @@ func makeInternalKey(dst, ukey []byte, seq uint64, kt keyType) internalKey {
panic("leveldb: invalid type")
}
if n := len(ukey) + 8; cap(dst) < n {
dst = make([]byte, n)
} else {
dst = dst[:n]
}
dst = ensureBuffer(dst, len(ukey)+8)
copy(dst, ukey)
binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt))
return internalKey(dst)
......@@ -143,5 +139,5 @@ func (ik internalKey) String() string {
if ukey, seq, kt, err := parseInternalKey(ik); err == nil {
return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq)
}
return "<invalid>"
return fmt.Sprintf("<invalid:%#x>", []byte(ik))
}
......@@ -17,6 +17,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
// Common errors.
var (
ErrNotFound = errors.ErrNotFound
ErrIterReleased = errors.New("leveldb/memdb: iterator released")
......@@ -385,7 +386,7 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
}
// NewIterator returns an iterator of the DB.
// The returned iterator is not goroutine-safe, but it is safe to use
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. However, the resultant key/value pairs are not guaranteed
......@@ -411,7 +412,7 @@ func (p *DB) Capacity() int {
}
// Size returns sum of keys and values length. Note that deleted
// key/value will not be accouted for, but it will still consume
// key/value will not be accounted for, but it will still consume
// the buffer, since the buffer is append only.
func (p *DB) Size() int {
p.mu.RLock()
......@@ -453,11 +454,14 @@ func (p *DB) Reset() {
p.mu.Unlock()
}
// New creates a new initalized in-memory key/value DB. The capacity
// New creates a new initialized in-memory key/value DB. The capacity
// is the initial key/value buffer capacity. The capacity is advisory,
// not enforced.
//
// The returned DB instance is goroutine-safe.
// This DB is append-only, deleting an entry would remove entry node but not
// reclaim KV buffer.
//
// The returned DB instance is safe for concurrent use.
func New(cmp comparer.BasicComparer, capacity int) *DB {
p := &DB{
cmp: cmp,
......
......@@ -312,6 +312,11 @@ type Options struct {
// The default is false.
NoSync bool
// NoWriteMerge allows disabling write merge.
//
// The default is false.
NoWriteMerge bool
// OpenFilesCacher provides cache algorithm for open files caching.
// Specify NoCacher to disable caching algorithm.
//
......@@ -543,6 +548,13 @@ func (o *Options) GetNoSync() bool {
return o.NoSync
}
func (o *Options) GetNoWriteMerge() bool {
if o == nil {
return false
}
return o.NoWriteMerge
}
func (o *Options) GetOpenFilesCacher() Cacher {
if o == nil || o.OpenFilesCacher == nil {
return DefaultOpenFilesCacher
......@@ -629,6 +641,11 @@ func (ro *ReadOptions) GetStrict(strict Strict) bool {
// WriteOptions holds the optional parameters for 'write operation'. The
// 'write operation' includes Write, Put and Delete.
type WriteOptions struct {
// NoWriteMerge allows disabling write merge.
//
// The default is false.
NoWriteMerge bool
// Sync is whether to sync underlying writes from the OS buffer cache
// through to actual disk, if applicable. Setting Sync can result in
// slower writes.
......@@ -644,6 +661,13 @@ type WriteOptions struct {
Sync bool
}
func (wo *WriteOptions) GetNoWriteMerge() bool {
if wo == nil {
return false
}
return wo.NoWriteMerge
}
func (wo *WriteOptions) GetSync() bool {
if wo == nil {
return false
......
......@@ -18,7 +18,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage"
)
// ErrManifestCorrupted records manifest corruption.
// ErrManifestCorrupted records manifest corruption. This error will be
// wrapped with errors.ErrCorrupted.
type ErrManifestCorrupted struct {
Field string
Reason string
......@@ -42,7 +43,7 @@ type session struct {
stSeqNum uint64 // last mem compacted seq; need external synchronization
stor storage.Storage
storLock storage.Lock
storLock storage.Locker
o *cachedOptions
icmp *iComparer
tops *tOps
......@@ -87,12 +88,12 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
s.stVersion = nil
s.setVersion(&version{s: s, closing: true})
}
// Release session lock.
func (s *session) release() {
s.storLock.Release()
s.storLock.Unlock()
}
// Create a new database session; need external synchronization.
......
......@@ -50,6 +50,12 @@ func (s *session) version() *version {
return s.stVersion
}
func (s *session) tLen(level int) int {
s.vmu.Lock()
defer s.vmu.Unlock()
return s.stVersion.tLen(level)
}
// Set current version to v.
func (s *session) setVersion(v *version) {
s.vmu.Lock()
......@@ -197,7 +203,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
if s.manifestWriter != nil {
s.manifestWriter.Close()
}
if !s.manifestFd.Nil() {
if !s.manifestFd.Zero() {
s.stor.Remove(s.manifestFd)
}
s.manifestFd = fd
......
......@@ -32,7 +32,7 @@ type fileStorageLock struct {
fs *fileStorage
}
func (lock *fileStorageLock) Release() {
func (lock *fileStorageLock) Unlock() {
if lock.fs != nil {
lock.fs.mu.Lock()
defer lock.fs.mu.Unlock()
......@@ -116,7 +116,7 @@ func OpenFile(path string, readOnly bool) (Storage, error) {
return fs, nil
}
func (fs *fileStorage) Lock() (Lock, error) {
func (fs *fileStorage) Lock() (Locker, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
......@@ -323,7 +323,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
}
}
// Don't remove any files if there is no valid CURRENT file.
if fd.Nil() {
if fd.Zero() {
if cerr != nil {
err = cerr
} else {
......
......@@ -18,7 +18,7 @@ type memStorageLock struct {
ms *memStorage
}
func (lock *memStorageLock) Release() {
func (lock *memStorageLock) Unlock() {
ms := lock.ms
ms.mu.Lock()
defer ms.mu.Unlock()
......@@ -43,7 +43,7 @@ func NewMemStorage() Storage {
}
}
func (ms *memStorage) Lock() (Lock, error) {
func (ms *memStorage) Lock() (Locker, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.slock != nil {
......@@ -69,7 +69,7 @@ func (ms *memStorage) SetMeta(fd FileDesc) error {
func (ms *memStorage) GetMeta() (FileDesc, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.meta.Nil() {
if ms.meta.Zero() {
return FileDesc{}, os.ErrNotExist
}
return ms.meta, nil
......@@ -78,7 +78,7 @@ func (ms *memStorage) GetMeta() (FileDesc, error) {
func (ms *memStorage) List(ft FileType) ([]FileDesc, error) {
ms.mu.Lock()
var fds []FileDesc
for x, _ := range ms.files {
for x := range ms.files {
fd := unpackFile(x)
if fd.Type&ft != 0 {
fds = append(fds, fd)
......
......@@ -11,12 +11,12 @@ import (
"errors"
"fmt"
"io"
"github.com/syndtr/goleveldb/leveldb/util"
)
// FileType represent a file type.
type FileType int
// File types.
const (
TypeManifest FileType = 1 << iota
TypeJournal
......@@ -40,6 +40,7 @@ func (t FileType) String() string {
return fmt.Sprintf("<unknown:%d>", t)
}
// Common error.
var (
ErrInvalidFile = errors.New("leveldb/storage: invalid file for argument")
ErrLocked = errors.New("leveldb/storage: already locked")
......@@ -55,11 +56,10 @@ type ErrCorrupted struct {
}
func (e *ErrCorrupted) Error() string {
if !e.Fd.Nil() {
if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
} else {
return e.Err.Error()
}
return e.Err.Error()
}
// Syncer is the interface that wraps basic Sync method.
......@@ -83,11 +83,12 @@ type Writer interface {
Syncer
}
type Lock interface {
util.Releaser
// Locker is the interface that wraps Unlock method.
type Locker interface {
Unlock()
}
// FileDesc is a file descriptor.
// FileDesc is a 'file descriptor'.
type FileDesc struct {
Type FileType
Num int64
......@@ -108,12 +109,12 @@ func (fd FileDesc) String() string {
}
}
// Nil returns true if fd == (FileDesc{}).
func (fd FileDesc) Nil() bool {
// Zero returns true if fd == (FileDesc{}).
func (fd FileDesc) Zero() bool {
return fd == (FileDesc{})
}
// FileDescOk returns true if fd is a valid file descriptor.
// FileDescOk returns true if fd is a valid 'file descriptor'.
func FileDescOk(fd FileDesc) bool {
switch fd.Type {
case TypeManifest:
......@@ -126,43 +127,44 @@ func FileDescOk(fd FileDesc) bool {
return fd.Num >= 0
}
// Storage is the storage. A storage instance must be goroutine-safe.
// Storage is the storage. A storage instance must be safe for concurrent use.
type Storage interface {
// Lock locks the storage. Any subsequent attempt to call Lock will fail
// until the last lock released.
// After use the caller should call the Release method.
Lock() (Lock, error)
// Caller should call Unlock method after use.
Lock() (Locker, error)
// Log logs a string. This is used for logging.
// An implementation may write to a file, stdout or simply do nothing.
Log(str string)
// SetMeta sets to point to the given fd, which then can be acquired using
// GetMeta method.
// SetMeta should be implemented in such way that changes should happened
// SetMeta store 'file descriptor' that can later be acquired using GetMeta
// method. The 'file descriptor' should point to a valid file.
// SetMeta should be implemented in such way that changes should happen
// atomically.
SetMeta(fd FileDesc) error
// GetManifest returns a manifest file.
// Returns os.ErrNotExist if meta doesn't point to any fd, or point to fd
// that doesn't exist.
// GetMeta returns 'file descriptor' stored in meta. The 'file descriptor'
// can be updated using SetMeta method.
// Returns os.ErrNotExist if meta doesn't store any 'file descriptor', or
// 'file descriptor' point to nonexistent file.
GetMeta() (FileDesc, error)
// List returns fds that match the given file types.
// List returns file descriptors that match the given file types.
// The file types may be OR'ed together.
List(ft FileType) ([]FileDesc, error)
// Open opens file with the given fd read-only.
// Open opens file with the given 'file descriptor' read-only.
// Returns os.ErrNotExist error if the file does not exist.
// Returns ErrClosed if the underlying storage is closed.
Open(fd FileDesc) (Reader, error)
// Create creates file with the given fd, truncate if already exist and
// opens write-only.
// Create creates file with the given 'file descriptor', truncate if already
// exist and opens write-only.
// Returns ErrClosed if the underlying storage is closed.
Create(fd FileDesc) (Writer, error)
// Remove removes file with the given fd.
// Remove removes file with the given 'file descriptor'.
// Returns ErrClosed if the underlying storage is closed.
Remove(fd FileDesc) error
......
......@@ -434,7 +434,7 @@ func (t *tOps) close() {
t.bpool.Close()
t.cache.Close()
if t.bcache != nil {
t.bcache.Close()
t.bcache.CloseWeak()
}
}
......
......@@ -26,12 +26,15 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
// Reader errors.
var (
ErrNotFound = errors.ErrNotFound
ErrReaderReleased = errors.New("leveldb/table: reader released")
ErrIterReleased = errors.New("leveldb/table: iterator released")
)
// ErrCorrupted describes error due to corruption. This error will be wrapped
// with errors.ErrCorrupted.
type ErrCorrupted struct {
Pos int64
Size int64
......@@ -61,7 +64,7 @@ type block struct {
func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
offset += 1 // shared always zero, since this is a restart point
offset++ // shared always zero, since this is a restart point
v1, n1 := binary.Uvarint(b.data[offset:]) // key length
_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2
......@@ -356,7 +359,7 @@ func (i *blockIter) Prev() bool {
i.value = nil
offset := i.block.restartOffset(ri)
if offset == i.offset {
ri -= 1
ri--
if ri < 0 {
i.dir = dirSOI
return false
......@@ -783,8 +786,8 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe
// table. And a nil Range.Limit is treated as a key after all keys in
// the table.
//
// The returned iterator is not goroutine-safe and should be released
// when not used.
// The returned iterator is not safe for concurrent use and should be released
// after use.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
......@@ -826,18 +829,21 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
index := r.newBlockIter(indexBlock, nil, nil, true)
defer index.Release()
if !index.Seek(key) {
err = index.Error()
if err == nil {
if err = index.Error(); err == nil {
err = ErrNotFound
}
return
}
dataBH, n := decodeBlockHandle(index.Value())
if n == 0 {
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return
return nil, nil, r.err
}
// The filter should only used for exact match.
if filtered && r.filter != nil {
filterBlock, frel, ferr := r.getFilterBlock(true)
if ferr == nil {
......@@ -847,30 +853,53 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
}
frel.Release()
} else if !errors.IsCorrupted(ferr) {
err = ferr
return
return nil, nil, ferr
}
}
data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
defer data.Release()
if !data.Seek(key) {
err = data.Error()
if err == nil {
err = ErrNotFound
data.Release()
if err = data.Error(); err != nil {
return
}
// The nearest greater-than key is the first key of the next block.
if !index.Next() {
if err = index.Error(); err == nil {
err = ErrNotFound
}
return
}
dataBH, n = decodeBlockHandle(index.Value())
if n == 0 {
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return nil, nil, r.err
}
data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
if !data.Next() {
data.Release()
if err = data.Error(); err == nil {
err = ErrNotFound
}
return
}
return
}
// Don't use block buffer, no need to copy the buffer.
// Key doesn't use block buffer, no need to copy the buffer.
rkey = data.Key()
if !noValue {
if r.bpool == nil {
value = data.Value()
} else {
// Use block buffer, and since the buffer will be recycled, the buffer
// need to be copied.
// Value does use block buffer, and since the buffer will be
// recycled, it need to be copied.
value = append([]byte{}, data.Value()...)
}
}
data.Release()
return
}
......@@ -888,7 +917,7 @@ func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, val
return r.find(key, filtered, ro, false)
}
// Find finds key that is greater than or equal to the given key.
// FindKey finds key that is greater than or equal to the given key.
// It returns ErrNotFound if the table doesn't contain such key.
// If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and will immediately return ErrNotFound if
......@@ -987,7 +1016,7 @@ func (r *Reader) Release() {
// NewReader creates a new initialized table reader for the file.
// The fi, cache and bpool is optional and can be nil.
//
// The returned table reader instance is goroutine-safe.
// The returned table reader instance is safe for concurrent use.
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
if f == nil {
return nil, errors.New("leveldb/table: nil file")
......@@ -1039,9 +1068,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
if errors.IsCorrupted(err) {
r.err = err
return r, nil
} else {
return nil, err
}
return nil, err
}
// Set data end.
......@@ -1086,9 +1114,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
if errors.IsCorrupted(err) {
r.err = err
return r, nil
} else {
return nil, err
}
return nil, err
}
if r.filter != nil {
r.filterBlock, err = r.readFilterBlock(r.filterBH)
......
......@@ -349,7 +349,7 @@ func (w *Writer) Close() error {
// NewWriter creates a new initialized table writer for the file.
//
// Table writer is not goroutine-safe.
// Table writer is not safe for concurrent use.
func NewWriter(f io.Writer, o *opt.Options) *Writer {
w := &Writer{
writer: f,
......
......@@ -89,3 +89,10 @@ func (p fdSorter) Swap(i, j int) {
func sortFds(fds []storage.FileDesc) {
sort.Sort(fdSorter(fds))
}
func ensureBuffer(b []byte, n int) []byte {
if cap(b) < n {
return make([]byte, n)
}
return b[:n]
}
......@@ -34,7 +34,8 @@ type version struct {
cSeek unsafe.Pointer
ref int
closing bool
ref int
// Succeeding version.
next *version
}
......@@ -131,6 +132,10 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int
}
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
if v.closing {
return nil, false, ErrClosed
}
ukey := ikey.ukey()
var (
......
......@@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"golang.org/x/net/context"
)
......@@ -1321,6 +1322,24 @@ func (api *PrivateDebugAPI) ChaindbProperty(property string) (string, error) {
return ldb.LDB().GetProperty(property)
}
func (api *PrivateDebugAPI) ChaindbCompact() error {
ldb, ok := api.b.ChainDb().(interface {
LDB() *leveldb.DB
})
if !ok {
return fmt.Errorf("chaindbCompact does not work for memory databases")
}
for b := byte(0); b < 255; b++ {
glog.V(logger.Info).Infof("compacting chain DB range 0x%0.2X-0x%0.2X", b, b+1)
err := ldb.LDB().CompactRange(util.Range{Start: []byte{b}, Limit: []byte{b + 1}})
if err != nil {
glog.Errorf("compaction error: %v", err)
return err
}
}
return nil
}
// SetHead rewinds the head of the blockchain to a previous block.
func (api *PrivateDebugAPI) SetHead(number rpc.HexNumber) {
api.b.SetHead(uint64(number.Int64()))
......
......@@ -337,6 +337,10 @@ web3._extend({
params: 1,
outputFormatter: console.log
}),
new web3._extend.Method({
name: 'chaindbCompact',
call: 'debug_chaindbCompact',
}),
new web3._extend.Method({
name: 'metrics',
call: 'debug_metrics',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment