Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
eb2d1687
Commit
eb2d1687
authored
Jul 09, 2015
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
eth, ethdb: fix a data race during startup/shutdown
parent
a2333bcb
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
83 additions
and
61 deletions
+83
-61
backend.go
eth/backend.go
+3
-25
database.go
ethdb/database.go
+80
-36
No files found.
eth/backend.go
View file @
eb2d1687
...
@@ -41,7 +41,6 @@ import (
...
@@ -41,7 +41,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discover"
...
@@ -267,42 +266,21 @@ func New(config *Config) (*Ethereum, error) {
...
@@ -267,42 +266,21 @@ func New(config *Config) (*Ethereum, error) {
return
nil
,
fmt
.
Errorf
(
"blockchain db err: %v"
,
err
)
return
nil
,
fmt
.
Errorf
(
"blockchain db err: %v"
,
err
)
}
}
if
db
,
ok
:=
blockDb
.
(
*
ethdb
.
LDBDatabase
);
ok
{
if
db
,
ok
:=
blockDb
.
(
*
ethdb
.
LDBDatabase
);
ok
{
db
.
GetTimer
=
metrics
.
NewTimer
(
"eth/db/block/user/gets"
)
db
.
Meter
(
"eth/db/block/"
)
db
.
PutTimer
=
metrics
.
NewTimer
(
"eth/db/block/user/puts"
)
db
.
MissMeter
=
metrics
.
NewMeter
(
"eth/db/block/user/misses"
)
db
.
ReadMeter
=
metrics
.
NewMeter
(
"eth/db/block/user/reads"
)
db
.
WriteMeter
=
metrics
.
NewMeter
(
"eth/db/block/user/writes"
)
db
.
CompTimeMeter
=
metrics
.
NewMeter
(
"eth/db/block/compact/time"
)
db
.
CompReadMeter
=
metrics
.
NewMeter
(
"eth/db/block/compact/input"
)
db
.
CompWriteMeter
=
metrics
.
NewMeter
(
"eth/db/block/compact/output"
)
}
}
stateDb
,
err
:=
newdb
(
filepath
.
Join
(
config
.
DataDir
,
"state"
))
stateDb
,
err
:=
newdb
(
filepath
.
Join
(
config
.
DataDir
,
"state"
))
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"state db err: %v"
,
err
)
return
nil
,
fmt
.
Errorf
(
"state db err: %v"
,
err
)
}
}
if
db
,
ok
:=
stateDb
.
(
*
ethdb
.
LDBDatabase
);
ok
{
if
db
,
ok
:=
stateDb
.
(
*
ethdb
.
LDBDatabase
);
ok
{
db
.
GetTimer
=
metrics
.
NewTimer
(
"eth/db/state/user/gets"
)
db
.
Meter
(
"eth/db/state/"
)
db
.
PutTimer
=
metrics
.
NewTimer
(
"eth/db/state/user/puts"
)
db
.
MissMeter
=
metrics
.
NewMeter
(
"eth/db/state/user/misses"
)
db
.
ReadMeter
=
metrics
.
NewMeter
(
"eth/db/state/user/reads"
)
db
.
WriteMeter
=
metrics
.
NewMeter
(
"eth/db/state/user/writes"
)
db
.
CompTimeMeter
=
metrics
.
NewMeter
(
"eth/db/state/compact/time"
)
db
.
CompReadMeter
=
metrics
.
NewMeter
(
"eth/db/state/compact/input"
)
db
.
CompWriteMeter
=
metrics
.
NewMeter
(
"eth/db/state/compact/output"
)
}
}
extraDb
,
err
:=
newdb
(
filepath
.
Join
(
config
.
DataDir
,
"extra"
))
extraDb
,
err
:=
newdb
(
filepath
.
Join
(
config
.
DataDir
,
"extra"
))
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"extra db err: %v"
,
err
)
return
nil
,
fmt
.
Errorf
(
"extra db err: %v"
,
err
)
}
}
if
db
,
ok
:=
extraDb
.
(
*
ethdb
.
LDBDatabase
);
ok
{
if
db
,
ok
:=
extraDb
.
(
*
ethdb
.
LDBDatabase
);
ok
{
db
.
GetTimer
=
metrics
.
NewTimer
(
"eth/db/extra/user/gets"
)
db
.
Meter
(
"eth/db/extra/"
)
db
.
PutTimer
=
metrics
.
NewTimer
(
"eth/db/extra/user/puts"
)
db
.
MissMeter
=
metrics
.
NewMeter
(
"eth/db/extra/user/misses"
)
db
.
ReadMeter
=
metrics
.
NewMeter
(
"eth/db/extra/user/reads"
)
db
.
WriteMeter
=
metrics
.
NewMeter
(
"eth/db/extra/user/writes"
)
db
.
CompTimeMeter
=
metrics
.
NewMeter
(
"eth/db/extra/compact/time"
)
db
.
CompReadMeter
=
metrics
.
NewMeter
(
"eth/db/extra/compact/input"
)
db
.
CompWriteMeter
=
metrics
.
NewMeter
(
"eth/db/extra/compact/output"
)
}
}
nodeDb
:=
filepath
.
Join
(
config
.
DataDir
,
"nodes"
)
nodeDb
:=
filepath
.
Join
(
config
.
DataDir
,
"nodes"
)
...
...
ethdb/database.go
View file @
eb2d1687
...
@@ -19,16 +19,19 @@ package ethdb
...
@@ -19,16 +19,19 @@ package ethdb
import
(
import
(
"strconv"
"strconv"
"strings"
"strings"
"sync"
"time"
"time"
"github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/
rcrowley/go-
metrics"
"github.com/
ethereum/go-ethereum/
metrics"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/opt"
gometrics
"github.com/rcrowley/go-metrics"
)
)
var
OpenFileLimit
=
64
var
OpenFileLimit
=
64
...
@@ -37,15 +40,18 @@ type LDBDatabase struct {
...
@@ -37,15 +40,18 @@ type LDBDatabase struct {
fn
string
// filename for reporting
fn
string
// filename for reporting
db
*
leveldb
.
DB
// LevelDB instance
db
*
leveldb
.
DB
// LevelDB instance
GetTimer
metrics
.
Timer
// Timer for measuring the database get request counts and latencies
getTimer
gometrics
.
Timer
// Timer for measuring the database get request counts and latencies
PutTimer
metrics
.
Timer
// Timer for measuring the database put request counts and latencies
putTimer
gometrics
.
Timer
// Timer for measuring the database put request counts and latencies
DelTimer
metrics
.
Timer
// Timer for measuring the database delete request counts and latencies
delTimer
gometrics
.
Timer
// Timer for measuring the database delete request counts and latencies
MissMeter
metrics
.
Meter
// Meter for measuring the missed database get requests
missMeter
gometrics
.
Meter
// Meter for measuring the missed database get requests
ReadMeter
metrics
.
Meter
// Meter for measuring the database get request data usage
readMeter
gometrics
.
Meter
// Meter for measuring the database get request data usage
WriteMeter
metrics
.
Meter
// Meter for measuring the database put request data usage
writeMeter
gometrics
.
Meter
// Meter for measuring the database put request data usage
CompTimeMeter
metrics
.
Meter
// Meter for measuring the total time spent in database compaction
compTimeMeter
gometrics
.
Meter
// Meter for measuring the total time spent in database compaction
CompReadMeter
metrics
.
Meter
// Meter for measuring the data read during compaction
compReadMeter
gometrics
.
Meter
// Meter for measuring the data read during compaction
CompWriteMeter
metrics
.
Meter
// Meter for measuring the data written during compaction
compWriteMeter
gometrics
.
Meter
// Meter for measuring the data written during compaction
quitLock
sync
.
Mutex
// Mutex protecting the quit channel access
quitChan
chan
chan
error
// Quit channel to stop the metrics collection before closing the database
}
}
// NewLDBDatabase returns a LevelDB wrapped object. LDBDatabase does not persist data by
// NewLDBDatabase returns a LevelDB wrapped object. LDBDatabase does not persist data by
...
@@ -54,7 +60,7 @@ type LDBDatabase struct {
...
@@ -54,7 +60,7 @@ type LDBDatabase struct {
func
NewLDBDatabase
(
file
string
)
(
*
LDBDatabase
,
error
)
{
func
NewLDBDatabase
(
file
string
)
(
*
LDBDatabase
,
error
)
{
// Open the db
// Open the db
db
,
err
:=
leveldb
.
OpenFile
(
file
,
&
opt
.
Options
{
OpenFilesCacheCapacity
:
OpenFileLimit
})
db
,
err
:=
leveldb
.
OpenFile
(
file
,
&
opt
.
Options
{
OpenFilesCacheCapacity
:
OpenFileLimit
})
// check for c
u
rruption and attempt to recover
// check for c
o
rruption and attempt to recover
if
_
,
iscorrupted
:=
err
.
(
*
errors
.
ErrCorrupted
);
iscorrupted
{
if
_
,
iscorrupted
:=
err
.
(
*
errors
.
ErrCorrupted
);
iscorrupted
{
db
,
err
=
leveldb
.
RecoverFile
(
file
,
nil
)
db
,
err
=
leveldb
.
RecoverFile
(
file
,
nil
)
}
}
...
@@ -62,26 +68,23 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
...
@@ -62,26 +68,23 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
database
:=
&
LDBDatabase
{
return
&
LDBDatabase
{
fn
:
file
,
fn
:
file
,
db
:
db
,
db
:
db
,
}
},
nil
go
database
.
meter
(
3
*
time
.
Second
)
return
database
,
nil
}
}
// Put puts the given key / value to the queue
// Put puts the given key / value to the queue
func
(
self
*
LDBDatabase
)
Put
(
key
[]
byte
,
value
[]
byte
)
error
{
func
(
self
*
LDBDatabase
)
Put
(
key
[]
byte
,
value
[]
byte
)
error
{
// Measure the database put latency, if requested
// Measure the database put latency, if requested
if
self
.
P
utTimer
!=
nil
{
if
self
.
p
utTimer
!=
nil
{
defer
self
.
P
utTimer
.
UpdateSince
(
time
.
Now
())
defer
self
.
p
utTimer
.
UpdateSince
(
time
.
Now
())
}
}
// Generate the data to write to disk, update the meter and write
// Generate the data to write to disk, update the meter and write
dat
:=
rle
.
Compress
(
value
)
dat
:=
rle
.
Compress
(
value
)
if
self
.
W
riteMeter
!=
nil
{
if
self
.
w
riteMeter
!=
nil
{
self
.
W
riteMeter
.
Mark
(
int64
(
len
(
dat
)))
self
.
w
riteMeter
.
Mark
(
int64
(
len
(
dat
)))
}
}
return
self
.
db
.
Put
(
key
,
dat
,
nil
)
return
self
.
db
.
Put
(
key
,
dat
,
nil
)
}
}
...
@@ -89,20 +92,20 @@ func (self *LDBDatabase) Put(key []byte, value []byte) error {
...
@@ -89,20 +92,20 @@ func (self *LDBDatabase) Put(key []byte, value []byte) error {
// Get returns the given key if it's present.
// Get returns the given key if it's present.
func
(
self
*
LDBDatabase
)
Get
(
key
[]
byte
)
([]
byte
,
error
)
{
func
(
self
*
LDBDatabase
)
Get
(
key
[]
byte
)
([]
byte
,
error
)
{
// Measure the database get latency, if requested
// Measure the database get latency, if requested
if
self
.
G
etTimer
!=
nil
{
if
self
.
g
etTimer
!=
nil
{
defer
self
.
G
etTimer
.
UpdateSince
(
time
.
Now
())
defer
self
.
g
etTimer
.
UpdateSince
(
time
.
Now
())
}
}
// Retrieve the key and increment the miss counter if not found
// Retrieve the key and increment the miss counter if not found
dat
,
err
:=
self
.
db
.
Get
(
key
,
nil
)
dat
,
err
:=
self
.
db
.
Get
(
key
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
if
self
.
M
issMeter
!=
nil
{
if
self
.
m
issMeter
!=
nil
{
self
.
M
issMeter
.
Mark
(
1
)
self
.
m
issMeter
.
Mark
(
1
)
}
}
return
nil
,
err
return
nil
,
err
}
}
// Otherwise update the actually retrieved amount of data
// Otherwise update the actually retrieved amount of data
if
self
.
R
eadMeter
!=
nil
{
if
self
.
r
eadMeter
!=
nil
{
self
.
R
eadMeter
.
Mark
(
int64
(
len
(
dat
)))
self
.
r
eadMeter
.
Mark
(
int64
(
len
(
dat
)))
}
}
return
rle
.
Decompress
(
dat
)
return
rle
.
Decompress
(
dat
)
}
}
...
@@ -110,8 +113,8 @@ func (self *LDBDatabase) Get(key []byte) ([]byte, error) {
...
@@ -110,8 +113,8 @@ func (self *LDBDatabase) Get(key []byte) ([]byte, error) {
// Delete deletes the key from the queue and database
// Delete deletes the key from the queue and database
func
(
self
*
LDBDatabase
)
Delete
(
key
[]
byte
)
error
{
func
(
self
*
LDBDatabase
)
Delete
(
key
[]
byte
)
error
{
// Measure the database delete latency, if requested
// Measure the database delete latency, if requested
if
self
.
D
elTimer
!=
nil
{
if
self
.
d
elTimer
!=
nil
{
defer
self
.
D
elTimer
.
UpdateSince
(
time
.
Now
())
defer
self
.
d
elTimer
.
UpdateSince
(
time
.
Now
())
}
}
// Execute the actual operation
// Execute the actual operation
return
self
.
db
.
Delete
(
key
,
nil
)
return
self
.
db
.
Delete
(
key
,
nil
)
...
@@ -127,8 +130,20 @@ func (self *LDBDatabase) Flush() error {
...
@@ -127,8 +130,20 @@ func (self *LDBDatabase) Flush() error {
}
}
func
(
self
*
LDBDatabase
)
Close
()
{
func
(
self
*
LDBDatabase
)
Close
()
{
// Stop the metrics collection to avoid internal database races
self
.
quitLock
.
Lock
()
defer
self
.
quitLock
.
Unlock
()
if
self
.
quitChan
!=
nil
{
errc
:=
make
(
chan
error
)
self
.
quitChan
<-
errc
if
err
:=
<-
errc
;
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"metrics failure in '%s': %v
\n
"
,
self
.
fn
,
err
)
}
}
// Flush and close the database
if
err
:=
self
.
Flush
();
err
!=
nil
{
if
err
:=
self
.
Flush
();
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"
error: flush '%s'
: %v
\n
"
,
self
.
fn
,
err
)
glog
.
V
(
logger
.
Error
)
.
Infof
(
"
flushing '%s' failed
: %v
\n
"
,
self
.
fn
,
err
)
}
}
self
.
db
.
Close
()
self
.
db
.
Close
()
glog
.
V
(
logger
.
Error
)
.
Infoln
(
"flushed and closed db:"
,
self
.
fn
)
glog
.
V
(
logger
.
Error
)
.
Infoln
(
"flushed and closed db:"
,
self
.
fn
)
...
@@ -138,6 +153,27 @@ func (self *LDBDatabase) LDB() *leveldb.DB {
...
@@ -138,6 +153,27 @@ func (self *LDBDatabase) LDB() *leveldb.DB {
return
self
.
db
return
self
.
db
}
}
// Meter configures the database metrics collectors and
func
(
self
*
LDBDatabase
)
Meter
(
prefix
string
)
{
// Initialize all the metrics collector at the requested prefix
self
.
getTimer
=
metrics
.
NewTimer
(
prefix
+
"user/gets"
)
self
.
putTimer
=
metrics
.
NewTimer
(
prefix
+
"user/puts"
)
self
.
delTimer
=
metrics
.
NewTimer
(
prefix
+
"user/dels"
)
self
.
missMeter
=
metrics
.
NewMeter
(
prefix
+
"user/misses"
)
self
.
readMeter
=
metrics
.
NewMeter
(
prefix
+
"user/reads"
)
self
.
writeMeter
=
metrics
.
NewMeter
(
prefix
+
"user/writes"
)
self
.
compTimeMeter
=
metrics
.
NewMeter
(
prefix
+
"compact/time"
)
self
.
compReadMeter
=
metrics
.
NewMeter
(
prefix
+
"compact/input"
)
self
.
compWriteMeter
=
metrics
.
NewMeter
(
prefix
+
"compact/output"
)
// Create a quit channel for the periodic collector and run it
self
.
quitLock
.
Lock
()
self
.
quitChan
=
make
(
chan
chan
error
)
self
.
quitLock
.
Unlock
()
go
self
.
meter
(
3
*
time
.
Second
)
}
// meter periodically retrieves internal leveldb counters and reports them to
// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
// the metrics subsystem.
//
//
...
@@ -193,16 +229,24 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
...
@@ -193,16 +229,24 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
}
}
}
}
// Update all the requested meters
// Update all the requested meters
if
self
.
C
ompTimeMeter
!=
nil
{
if
self
.
c
ompTimeMeter
!=
nil
{
self
.
C
ompTimeMeter
.
Mark
(
int64
((
counters
[
i
%
2
][
0
]
-
counters
[(
i
-
1
)
%
2
][
0
])
*
1000
*
1000
*
1000
))
self
.
c
ompTimeMeter
.
Mark
(
int64
((
counters
[
i
%
2
][
0
]
-
counters
[(
i
-
1
)
%
2
][
0
])
*
1000
*
1000
*
1000
))
}
}
if
self
.
C
ompReadMeter
!=
nil
{
if
self
.
c
ompReadMeter
!=
nil
{
self
.
C
ompReadMeter
.
Mark
(
int64
((
counters
[
i
%
2
][
1
]
-
counters
[(
i
-
1
)
%
2
][
1
])
*
1024
*
1024
))
self
.
c
ompReadMeter
.
Mark
(
int64
((
counters
[
i
%
2
][
1
]
-
counters
[(
i
-
1
)
%
2
][
1
])
*
1024
*
1024
))
}
}
if
self
.
C
ompWriteMeter
!=
nil
{
if
self
.
c
ompWriteMeter
!=
nil
{
self
.
C
ompWriteMeter
.
Mark
(
int64
((
counters
[
i
%
2
][
2
]
-
counters
[(
i
-
1
)
%
2
][
2
])
*
1024
*
1024
))
self
.
c
ompWriteMeter
.
Mark
(
int64
((
counters
[
i
%
2
][
2
]
-
counters
[(
i
-
1
)
%
2
][
2
])
*
1024
*
1024
))
}
}
// Sleep a bit, then repeat the stats collection
// Sleep a bit, then repeat the stats collection
time
.
Sleep
(
refresh
)
select
{
case
errc
:=
<-
self
.
quitChan
:
// Quit requesting, stop hammering the database
errc
<-
nil
return
case
<-
time
.
After
(
refresh
)
:
// Timeout, gather a new set of stats
}
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment