Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
8edaaa22
Unverified
Commit
8edaaa22
authored
Aug 03, 2017
by
Péter Szilágyi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
core: polish chain indexer a bit
parent
bd74882d
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
403 additions
and
302 deletions
+403
-302
chain_indexer.go
core/chain_indexer.go
+278
-176
chain_indexer_test.go
core/chain_indexer_test.go
+125
-126
No files found.
core/chain_indexer.go
View file @
8edaaa22
...
...
@@ -14,261 +14,361 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package core implements the Ethereum consensus protocol.
package
core
import
(
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// ChainIndexer does a post-processing job for equally sized sections of the canonical
// chain (like BlooomBits and CHT structures). A ChainIndexer is connected to the blockchain
// through the event system by starting a ChainEventLoop in a goroutine.
// Further child ChainIndexers can be added which use the output of the parent section
// indexer. These child indexers receive new head notifications only after an entire section
// has been finished or in case of rollbacks that might affect already finished sections.
type
ChainIndexer
struct
{
chainDb
,
indexDb
ethdb
.
Database
backend
ChainIndexerBackend
sectionSize
,
confirmReq
uint64
stop
chan
struct
{}
lock
sync
.
Mutex
procWait
time
.
Duration
tryUpdate
chan
struct
{}
stored
,
targetCount
,
calcIdx
,
lastForwarded
uint64
updating
bool
children
[]
*
ChainIndexer
// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
type
ChainIndexerBackend
interface
{
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset
(
section
uint64
)
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process
(
header
*
types
.
Header
)
// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
Commit
(
db
ethdb
.
Database
)
error
}
// ChainIndexerBackend interface is a backend for the indexer doing the actual post-processing job
type
ChainIndexerBackend
interface
{
Reset
(
section
uint64
)
// start processing a new section
Process
(
header
*
types
.
Header
)
// process a single block (called for each block in the section)
Commit
(
db
ethdb
.
Database
)
error
// do some more processing if necessary and store the results in the database
UpdateMsg
(
done
,
all
uint64
)
// print a progress update message if necessary (only called when multiple sections need to be processed)
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainEventLoop in a goroutine.
//
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
type
ChainIndexer
struct
{
chainDb
ethdb
.
Database
// Chain database to index the data from
indexDb
ethdb
.
Database
// Prefixed table-view of the db to write index metadata into
backend
ChainIndexerBackend
// Background processor generating the index data content
children
[]
*
ChainIndexer
// Child indexers to cascade chain updates to
active
uint32
// Flag whether the event loop was started
update
chan
struct
{}
// Notification channel that headers should be processed
quit
chan
chan
error
// Quit channel to tear down running goroutines
sectionSize
uint64
// Number of blocks in a single chain segment to process
confirmsReq
uint64
// Number of confirmations before processing a completed segment
storedSections
uint64
// Number of sections successfully indexed into the database
knownSections
uint64
// Number of sections known to be complete (block wise)
cascadedHead
uint64
// Block number of the last completed section cascaded to subindexers
throttling
time
.
Duration
// Disk throttling to prevent a heavy upgrade from hogging resources
log
log
.
Logger
lock
sync
.
RWMutex
}
// NewChainIndexer creates a new ChainIndexer
// db: database where the index of available processed sections is stored (the index is stored by the
// indexer, the actual processed chain data is stored by the backend)
// dbKey: key prefix where the index is stored
// backend: an implementation of ChainIndexerBackend
// sectionSize: the size of processable sections
// confirmReq: required number of confirmation blocks before a new section is being processed
// procWait: waiting time between processing sections (simple way of limiting the resource usage of a db upgrade)
// stop: quit channel
func
NewChainIndexer
(
chainDb
,
indexDb
ethdb
.
Database
,
backend
ChainIndexerBackend
,
sectionSize
,
confirmReq
uint64
,
procWait
time
.
Duration
,
stop
chan
struct
{})
*
ChainIndexer
{
// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after certain number of confirmations passed.
// The throttling parameter might be used to prevent database thrashing.
func
NewChainIndexer
(
chainDb
,
indexDb
ethdb
.
Database
,
backend
ChainIndexerBackend
,
section
,
confirm
uint64
,
throttling
time
.
Duration
,
kind
string
)
*
ChainIndexer
{
c
:=
&
ChainIndexer
{
chainDb
:
chainDb
,
indexDb
:
indexDb
,
backend
:
backend
,
sectionSize
:
sectionSize
,
confirmReq
:
confirmReq
,
tryUpdate
:
make
(
chan
struct
{},
1
),
stop
:
stop
,
procWait
:
procWait
,
update
:
make
(
chan
struct
{},
1
),
quit
:
make
(
chan
chan
error
),
sectionSize
:
section
,
confirmsReq
:
confirm
,
throttling
:
throttling
,
log
:
log
.
New
(
"type"
,
kind
),
}
c
.
stored
=
c
.
getValidSections
()
// Initialize database dependent fields and start the updater
c
.
loadValidSections
()
go
c
.
updateLoop
()
return
c
}
// updateLoop is the main event loop of the indexer
func
(
c
*
ChainIndexer
)
updateLoop
()
{
updateMsg
:=
false
for
{
select
{
case
<-
c
.
stop
:
return
case
<-
c
.
tryUpdate
:
c
.
lock
.
Lock
()
if
c
.
targetCount
>
c
.
stored
{
if
!
updateMsg
&&
c
.
targetCount
>
c
.
stored
+
1
{
updateMsg
=
true
c
.
backend
.
UpdateMsg
(
c
.
stored
,
c
.
targetCount
)
}
c
.
calcIdx
=
c
.
stored
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing.
func
(
c
*
ChainIndexer
)
Start
(
currentHeader
*
types
.
Header
,
eventMux
*
event
.
TypeMux
)
{
go
c
.
eventLoop
(
currentHeader
,
eventMux
)
}
var
lastSectionHead
common
.
Hash
if
c
.
calcIdx
>
0
{
lastSectionHead
=
c
.
getSectionHead
(
c
.
calcIdx
-
1
)
}
// Close tears down all goroutines belonging to the indexer and returns any error
// that might have occurred internally.
func
(
c
*
ChainIndexer
)
Close
()
error
{
var
errs
[]
error
c
.
lock
.
Unlock
()
sectionHead
,
ok
:=
c
.
processSection
(
c
.
calcIdx
,
lastSectionHead
)
c
.
lock
.
Lock
()
// Tear down the primary update loop
errc
:=
make
(
chan
error
)
c
.
quit
<-
errc
if
err
:=
<-
errc
;
err
!=
nil
{
errs
=
append
(
errs
,
err
)
}
// If needed, tear down the secondary event loop
if
atomic
.
LoadUint32
(
&
c
.
active
)
!=
0
{
c
.
quit
<-
errc
if
err
:=
<-
errc
;
err
!=
nil
{
errs
=
append
(
errs
,
err
)
}
}
// Return any failures
switch
{
case
len
(
errs
)
==
0
:
return
nil
if
ok
&&
lastSectionHead
==
c
.
getSectionHead
(
c
.
calcIdx
-
1
)
{
c
.
stored
=
c
.
calcIdx
+
1
c
.
setSectionHead
(
c
.
calcIdx
,
sectionHead
)
c
.
setValidSections
(
c
.
stored
)
if
updateMsg
{
c
.
backend
.
UpdateMsg
(
c
.
stored
,
c
.
targetCount
)
if
c
.
stored
>=
c
.
targetCount
{
updateMsg
=
false
}
}
c
.
lastForwarded
=
c
.
stored
*
c
.
sectionSize
-
1
for
_
,
cp
:=
range
c
.
children
{
cp
.
newHead
(
c
.
lastForwarded
,
false
)
}
}
else
{
// if processing has failed, do not retry until further notification
c
.
targetCount
=
c
.
stored
}
}
case
len
(
errs
)
==
1
:
return
errs
[
0
]
if
c
.
targetCount
>
c
.
stored
{
go
func
()
{
time
.
Sleep
(
c
.
procWait
)
c
.
tryUpdate
<-
struct
{}{}
}()
}
else
{
c
.
updating
=
false
}
c
.
lock
.
Unlock
()
}
default
:
return
fmt
.
Errorf
(
"%v"
,
errs
)
}
}
// ChainEventLoop runs in a goroutine and feeds blockchain events to the indexer by calling newHead
// (not needed for child indexers where the parent calls newHead)
func
(
c
*
ChainIndexer
)
ChainEventLoop
(
currentHeader
*
types
.
Header
,
eventMux
*
event
.
TypeMux
)
{
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func
(
c
*
ChainIndexer
)
eventLoop
(
currentHeader
*
types
.
Header
,
eventMux
*
event
.
TypeMux
)
{
// Mark the chain indexer as active, requiring an additional teardown
atomic
.
StoreUint32
(
&
c
.
active
,
1
)
// Subscribe to chain head events
sub
:=
eventMux
.
Subscribe
(
ChainEvent
{})
defer
sub
.
Unsubscribe
()
// Fire the initial new head event to start any outstanding processing
c
.
newHead
(
currentHeader
.
Number
.
Uint64
(),
false
)
lastHead
:=
currentHeader
.
Hash
()
var
(
prevHeader
=
currentHeader
prevHash
=
currentHeader
.
Hash
()
)
for
{
select
{
case
<-
c
.
stop
:
case
errc
:=
<-
c
.
quit
:
// Chain indexer terminating, report no failure and abort
errc
<-
nil
return
case
ev
:=
<-
sub
.
Chan
()
:
case
ev
,
ok
:=
<-
sub
.
Chan
()
:
// Received a new event, ensure it's not nil (closing) and update
if
!
ok
{
errc
:=
<-
c
.
quit
errc
<-
nil
return
}
header
:=
ev
.
Data
.
(
ChainEvent
)
.
Block
.
Header
()
c
.
newHead
(
header
.
Number
.
Uint64
(),
header
.
ParentHash
!=
lastHead
)
lastHead
=
header
.
Hash
()
if
header
.
ParentHash
!=
prevHash
{
c
.
newHead
(
FindCommonAncestor
(
c
.
chainDb
,
prevHeader
,
header
)
.
Number
.
Uint64
(),
true
)
}
c
.
newHead
(
header
.
Number
.
Uint64
(),
false
)
prevHeader
,
prevHash
=
header
,
header
.
Hash
()
}
}
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
func
(
c
*
ChainIndexer
)
AddChildIndexer
(
ci
*
ChainIndexer
)
{
c
.
children
=
append
(
c
.
children
,
ci
)
}
// newHead notifies the indexer about new chain heads or rollbacks
func
(
c
*
ChainIndexer
)
newHead
(
headNum
uint64
,
rollback
bool
)
{
// newHead notifies the indexer about new chain heads and/or reorgs.
func
(
c
*
ChainIndexer
)
newHead
(
head
uint64
,
reorg
bool
)
{
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
if
rollback
{
firstChanged
:=
headNum
/
c
.
sectionSize
if
firstChanged
<
c
.
targetCount
{
c
.
targetCount
=
firstChanged
// If a reorg happened, invalidate all sections until that point
if
reorg
{
// Revert the known section number to the reorg point
changed
:=
head
/
c
.
sectionSize
if
changed
<
c
.
knownSections
{
c
.
knownSections
=
changed
}
if
firstChanged
<
c
.
stored
{
c
.
stored
=
firstChanged
c
.
setValidSections
(
c
.
stor
ed
)
// Revert the stored sections from the database to the reorg point
if
changed
<
c
.
storedSections
{
c
.
setValidSections
(
c
hang
ed
)
}
headNum
=
firstChanged
*
c
.
sectionSize
// Update the new head number to te finalized section end and notify children
head
=
changed
*
c
.
sectionSize
if
head
Num
<
c
.
lastForwarde
d
{
c
.
lastForwarded
=
headNum
for
_
,
c
p
:=
range
c
.
children
{
c
p
.
newHead
(
c
.
lastForwarde
d
,
true
)
if
head
<
c
.
cascadedHea
d
{
c
.
cascadedHead
=
head
for
_
,
c
hild
:=
range
c
.
children
{
c
hild
.
newHead
(
c
.
cascadedHea
d
,
true
)
}
}
return
}
// No reorg, calculate the number of newly known sections and update if high enough
var
sections
uint64
if
head
>=
c
.
confirmsReq
{
sections
=
(
head
+
1
-
c
.
confirmsReq
)
/
c
.
sectionSize
if
sections
>
c
.
knownSections
{
c
.
knownSections
=
sections
select
{
case
c
.
update
<-
struct
{}{}
:
default
:
}
}
}
}
// updateLoop is the main event loop of the indexer which pushes chain segments
// down into the processing backend.
func
(
c
*
ChainIndexer
)
updateLoop
()
{
var
updated
time
.
Time
for
{
select
{
case
errc
:=
<-
c
.
quit
:
// Chain indexer terminating, report no failure and abort
errc
<-
nil
return
case
<-
c
.
update
:
// Section headers completed (or rolled back), update the index
c
.
lock
.
Lock
()
if
c
.
knownSections
>
c
.
storedSections
{
// Periodically print an upgrade log message to the user
if
time
.
Since
(
updated
)
>
8
*
time
.
Second
{
if
c
.
knownSections
>
c
.
storedSections
+
1
{
c
.
log
.
Info
(
"Upgrading chain index"
,
"percentage"
,
c
.
storedSections
*
100
/
c
.
knownSections
)
}
updated
=
time
.
Now
()
}
// Cache the current section count and head to allow unlocking the mutex
section
:=
c
.
storedSections
var
oldHead
common
.
Hash
if
section
>
0
{
oldHead
=
c
.
sectionHead
(
section
-
1
)
}
// Process the newly defined section in the background
c
.
lock
.
Unlock
()
newHead
,
err
:=
c
.
processSection
(
section
,
oldHead
)
c
.
lock
.
Lock
()
// If processing succeeded and no reorgs occcurred, mark the section completed
if
err
==
nil
&&
oldHead
==
c
.
sectionHead
(
section
-
1
)
{
c
.
setSectionHead
(
section
,
newHead
)
c
.
setValidSections
(
section
+
1
)
}
else
{
var
newCount
uint64
if
headNum
>=
c
.
confirmReq
{
newCount
=
(
headNum
+
1
-
c
.
confirmReq
)
/
c
.
sectionSize
if
newCount
>
c
.
targetCount
{
c
.
targetCount
=
newCount
if
!
c
.
updating
{
c
.
updating
=
true
c
.
tryUpdate
<-
struct
{}{}
c
.
cascadedHead
=
c
.
storedSections
*
c
.
sectionSize
-
1
for
_
,
child
:=
range
c
.
children
{
c
.
log
.
Trace
(
"Cascading chain index update"
,
"head"
,
c
.
cascadedHead
)
child
.
newHead
(
c
.
cascadedHead
,
false
)
}
}
else
{
// If processing failed, don't retry until further notification
c
.
log
.
Debug
(
"Chain index processing failed"
,
"section"
,
section
,
"err"
,
err
)
c
.
knownSections
=
c
.
storedSections
}
}
// If there are still further sections to process, reschedule
if
c
.
knownSections
>
c
.
storedSections
{
time
.
AfterFunc
(
c
.
throttling
,
func
()
{
select
{
case
c
.
update
<-
struct
{}{}
:
default
:
}
})
}
c
.
lock
.
Unlock
()
}
}
}
// processSection processes an entire section by calling backend functions while ensuring
// the continuity of the passed headers. Since the chain mutex is not held while processing,
// the continuity can be broken by a long reorg, in which case the function returns with ok == false.
func
(
c
*
ChainIndexer
)
processSection
(
section
uint64
,
lastSectionHead
common
.
Hash
)
(
sectionHead
common
.
Hash
,
ok
bool
)
{
// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.
func
(
c
*
ChainIndexer
)
processSection
(
section
uint64
,
lastHead
common
.
Hash
)
(
common
.
Hash
,
error
)
{
c
.
log
.
Trace
(
"Processing new chain section"
,
"section"
,
section
)
// Reset and partial processing
c
.
backend
.
Reset
(
section
)
head
:=
lastSectionHead
for
i
:=
section
*
c
.
sectionSize
;
i
<
(
section
+
1
)
*
c
.
sectionSize
;
i
++
{
hash
:=
GetCanonicalHash
(
c
.
chainDb
,
i
)
for
number
:=
section
*
c
.
sectionSize
;
number
<
(
section
+
1
)
*
c
.
sectionSize
;
number
++
{
hash
:=
GetCanonicalHash
(
c
.
chainDb
,
number
)
if
hash
==
(
common
.
Hash
{})
{
return
common
.
Hash
{},
f
alse
return
common
.
Hash
{},
f
mt
.
Errorf
(
"canonical block #%d unknown"
,
number
)
}
header
:=
GetHeader
(
c
.
chainDb
,
hash
,
i
)
if
header
==
nil
||
header
.
ParentHash
!=
head
{
return
common
.
Hash
{},
false
header
:=
GetHeader
(
c
.
chainDb
,
hash
,
number
)
if
header
==
nil
{
return
common
.
Hash
{},
fmt
.
Errorf
(
"block #%d [%x…] not found"
,
number
,
hash
[
:
4
])
}
else
if
header
.
ParentHash
!=
lastHead
{
return
common
.
Hash
{},
fmt
.
Errorf
(
"chain reorged during section processing"
)
}
c
.
backend
.
Process
(
header
)
h
ead
=
header
.
Hash
()
lastH
ead
=
header
.
Hash
()
}
if
err
:=
c
.
backend
.
Commit
(
c
.
chainDb
);
err
!=
nil
{
return
common
.
Hash
{},
false
return
common
.
Hash
{},
err
}
return
head
,
true
return
lastHead
,
nil
}
// CanonicalSections returns the number of processed sections that are consistent with
// the current canonical chain
func
(
c
*
ChainIndexer
)
CanonicalSections
()
uint64
{
// Sections returns the number of processed sections maintained by the indexer
// and also the information about the last header indexed for potential canonical
// verifications.
func
(
c
*
ChainIndexer
)
Sections
()
(
uint64
,
uint64
,
common
.
Hash
)
{
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
cnt
:=
c
.
getValidSections
()
for
cnt
>
0
{
if
c
.
getSectionHead
(
cnt
-
1
)
==
GetCanonicalHash
(
c
.
chainDb
,
cnt
*
c
.
sectionSize
-
1
)
{
break
}
cnt
--
c
.
setValidSections
(
cnt
)
return
c
.
storedSections
,
c
.
storedSections
*
c
.
sectionSize
-
1
,
c
.
sectionHead
(
c
.
storedSections
-
1
)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
func
(
c
*
ChainIndexer
)
AddChildIndexer
(
indexer
*
ChainIndexer
)
{
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
c
.
children
=
append
(
c
.
children
,
indexer
)
// Cascade any pending updates to new children too
if
c
.
storedSections
>
0
{
indexer
.
newHead
(
c
.
storedSections
*
c
.
sectionSize
-
1
,
false
)
}
return
cnt
}
// getValidSections reads the number of valid sections from the index database
func
(
c
*
ChainIndexer
)
getValidSections
()
uint64
{
// loadValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func
(
c
*
ChainIndexer
)
loadValidSections
()
{
data
,
_
:=
c
.
indexDb
.
Get
([]
byte
(
"count"
))
if
len
(
data
)
==
8
{
return
binary
.
BigEndian
.
Uint64
(
data
[
:
])
c
.
storedSections
=
binary
.
BigEndian
.
Uint64
(
data
[
:
])
}
return
0
}
// setValidSections writes the number of valid sections to the index database
func
(
c
*
ChainIndexer
)
setValidSections
(
cnt
uint64
)
{
oldCnt
:=
c
.
getValidSections
()
if
cnt
<
oldCnt
{
for
i
:=
cnt
;
i
<
oldCnt
;
i
++
{
c
.
removeSectionHead
(
i
)
}
}
func
(
c
*
ChainIndexer
)
setValidSections
(
sections
uint64
)
{
// Set the current number of valid sections in the database
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
cnt
)
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
sections
)
c
.
indexDb
.
Put
([]
byte
(
"count"
),
data
[
:
])
// Remove any reorged sections, caching the valids in the mean time
for
c
.
storedSections
>
sections
{
c
.
storedSections
--
c
.
removeSectionHead
(
c
.
storedSections
)
}
c
.
storedSections
=
sections
// needed if new > old
}
// getSectionHead reads the last block hash of a processed section from the index database
func
(
c
*
ChainIndexer
)
getSectionHead
(
idx
uint64
)
common
.
Hash
{
// sectionHead retrieves the last block hash of a processed section from the
// index database.
func
(
c
*
ChainIndexer
)
sectionHead
(
section
uint64
)
common
.
Hash
{
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
idx
)
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
section
)
hash
,
_
:=
c
.
indexDb
.
Get
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
))
if
len
(
hash
)
==
len
(
common
.
Hash
{})
{
...
...
@@ -277,18 +377,20 @@ func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash {
return
common
.
Hash
{}
}
// setSectionHead writes the last block hash of a processed section to the index database
func
(
c
*
ChainIndexer
)
setSectionHead
(
idx
uint64
,
shead
common
.
Hash
)
{
// setSectionHead writes the last block hash of a processed section to the index
// database.
func
(
c
*
ChainIndexer
)
setSectionHead
(
section
uint64
,
hash
common
.
Hash
)
{
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
idx
)
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
section
)
c
.
indexDb
.
Put
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
),
shead
.
Bytes
())
c
.
indexDb
.
Put
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
),
hash
.
Bytes
())
}
// removeSectionHead removes the reference to a processed section from the index database
func
(
c
*
ChainIndexer
)
removeSectionHead
(
idx
uint64
)
{
// removeSectionHead removes the reference to a processed section from the index
// database.
func
(
c
*
ChainIndexer
)
removeSectionHead
(
section
uint64
)
{
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
idx
)
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
section
)
c
.
indexDb
.
Delete
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
))
}
core/chain_indexer_test.go
View file @
8edaaa22
...
...
@@ -14,11 +14,10 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package core implements the Ethereum consensus protocol.
package
core
import
(
"
encoding/binary
"
"
fmt
"
"math/big"
"math/rand"
"testing"
...
...
@@ -28,208 +27,208 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
)
// Runs multiple tests with randomized parameters.
func
TestChainIndexerSingle
(
t
*
testing
.
T
)
{
// run multiple tests with randomized parameters
for
i
:=
0
;
i
<
10
;
i
++
{
testChainIndexer
(
t
,
1
)
}
}
// Runs multiple tests with randomized parameters and different number of
// chain backends.
func
TestChainIndexerWithChildren
(
t
*
testing
.
T
)
{
// run multiple tests with randomized parameters and different number of
// chained indexers
for
i
:=
2
;
i
<
8
;
i
++
{
testChainIndexer
(
t
,
i
)
}
}
// testChainIndexer runs a test with either a single ChainIndexer or a chain of multiple indexers
// sectionSize and confirmReq parameters are randomized
func
testChainIndexer
(
t
*
testing
.
T
,
tciCount
int
)
{
// testChainIndexer runs a test with either a single chain indexer or a chain of
// multiple backends. The section size and required confirmation count parameters
// are randomized.
func
testChainIndexer
(
t
*
testing
.
T
,
count
int
)
{
db
,
_
:=
ethdb
.
NewMemDatabase
()
stop
:=
make
(
chan
struct
{})
tciList
:=
make
([]
*
testChainIndex
,
tciCount
)
var
lastIndexer
*
ChainIndexer
for
i
,
_
:=
range
tciList
{
tci
:=
&
testChainIndex
{
t
:
t
,
sectionSize
:
uint64
(
rand
.
Intn
(
100
)
+
1
),
confirmReq
:
uint64
(
rand
.
Intn
(
10
)),
processCh
:
make
(
chan
uint64
)}
tciList
[
i
]
=
tci
tci
.
indexer
=
NewChainIndexer
(
db
,
ethdb
.
NewTable
(
db
,
string
([]
byte
{
byte
(
i
)})),
tci
,
tci
.
sectionSize
,
tci
.
confirmReq
,
0
,
stop
)
if
cs
:=
tci
.
indexer
.
CanonicalSections
();
cs
!=
0
{
t
.
Errorf
(
"Expected 0 canonical sections, got %d"
,
cs
)
defer
db
.
Close
()
// Create a chain of indexers and ensure they all report empty
backends
:=
make
([]
*
testChainIndexBackend
,
count
)
for
i
:=
0
;
i
<
count
;
i
++
{
var
(
sectionSize
=
uint64
(
rand
.
Intn
(
100
)
+
1
)
confirmsReq
=
uint64
(
rand
.
Intn
(
10
))
)
backends
[
i
]
=
&
testChainIndexBackend
{
t
:
t
,
processCh
:
make
(
chan
uint64
)}
backends
[
i
]
.
indexer
=
NewChainIndexer
(
db
,
ethdb
.
NewTable
(
db
,
string
([]
byte
{
byte
(
i
)})),
backends
[
i
],
sectionSize
,
confirmsReq
,
0
,
fmt
.
Sprintf
(
"indexer-%d"
,
i
))
defer
backends
[
i
]
.
indexer
.
Close
()
if
sections
,
_
,
_
:=
backends
[
i
]
.
indexer
.
Sections
();
sections
!=
0
{
t
.
Fatalf
(
"Canonical section count mismatch: have %v, want %v"
,
sections
,
0
)
}
if
lastIndexer
!=
nil
{
lastIndexer
.
AddChildIndexer
(
tci
.
indexer
)
if
i
>
0
{
backends
[
i
-
1
]
.
indexer
.
AddChildIndexer
(
backends
[
i
]
.
indexer
)
}
lastIndexer
=
tci
.
indexer
}
//
expectCs expects a certain number of available canonical sections
expectCs
:=
func
(
indexer
*
ChainIndexer
,
expCs
uint64
)
{
cnt
:=
0
for
{
cs
:=
indexer
.
CanonicalSections
()
if
cs
==
expCs
{
return
// notify pings the root indexer about a new head or reorg, then expect
//
processed blocks if a section is processable
notify
:=
func
(
headNum
,
failNum
uint64
,
reorg
bool
)
{
backends
[
0
]
.
indexer
.
newHead
(
headNum
,
reorg
)
if
reorg
{
for
_
,
backend
:=
range
backends
{
headNum
=
backend
.
reorg
(
headNum
)
backend
.
assertSections
()
}
// keep trying for 10 seconds if it does not match
cnt
++
if
cnt
==
10000
{
t
.
Fatalf
(
"Expected %d canonical sections, got %d"
,
expCs
,
cs
)
return
}
var
cascade
bool
for
_
,
backend
:=
range
backends
{
headNum
,
cascade
=
backend
.
assertBlocks
(
headNum
,
failNum
)
if
!
cascade
{
break
}
time
.
Sleep
(
time
.
Millisecond
)
backend
.
assertSections
(
)
}
}
// notify the indexer about a new head or rollback, then expect processed blocks if a section is processable
notify
:=
func
(
headNum
,
expFailAfter
uint64
,
rollback
bool
)
{
tciList
[
0
]
.
indexer
.
newHead
(
headNum
,
rollback
)
if
rollback
{
for
_
,
tci
:=
range
tciList
{
headNum
=
tci
.
rollback
(
headNum
)
expectCs
(
tci
.
indexer
,
tci
.
stored
)
}
}
else
{
for
_
,
tci
:=
range
tciList
{
var
more
bool
headNum
,
more
=
tci
.
newBlocks
(
headNum
,
expFailAfter
)
if
!
more
{
break
}
expectCs
(
tci
.
indexer
,
tci
.
stored
)
}
// inject inserts a new random canonical header into the database directly
inject
:=
func
(
number
uint64
)
{
header
:=
&
types
.
Header
{
Number
:
big
.
NewInt
(
int64
(
number
)),
Extra
:
big
.
NewInt
(
rand
.
Int63
())
.
Bytes
()}
if
number
>
0
{
header
.
ParentHash
=
GetCanonicalHash
(
db
,
number
-
1
)
}
WriteHeader
(
db
,
header
)
WriteCanonicalHash
(
db
,
header
.
Hash
(),
number
)
}
// Start indexer with an already existing chain
for
i
:=
uint64
(
0
);
i
<=
100
;
i
++
{
testCanonicalHeader
(
db
,
i
)
inject
(
i
)
}
// start indexer with an already existing chain
notify
(
100
,
100
,
false
)
// add new blocks one by one
// Add new blocks one by one
for
i
:=
uint64
(
101
);
i
<=
1000
;
i
++
{
testCanonicalHeader
(
db
,
i
)
inject
(
i
)
notify
(
i
,
i
,
false
)
}
//
do a rollback
//
Do a reorg
notify
(
500
,
500
,
true
)
// create new fork
// Create new fork
for
i
:=
uint64
(
501
);
i
<=
1000
;
i
++
{
testCanonicalHeader
(
db
,
i
)
inject
(
i
)
notify
(
i
,
i
,
false
)
}
for
i
:=
uint64
(
1001
);
i
<=
1500
;
i
++
{
testCanonicalHeader
(
db
,
i
)
inject
(
i
)
}
//
create a failed processing scenario where less blocks are available at processing tim
e than notified
//
Failed processing scenario where less blocks are availabl
e than notified
notify
(
2000
,
1500
,
false
)
// notify about a rollback (which could have caused the missing blocks if happened during processing)
// Notify about a reorg (which could have caused the missing blocks if happened during processing)
notify
(
1500
,
1500
,
true
)
//
c
reate new fork
//
C
reate new fork
for
i
:=
uint64
(
1501
);
i
<=
2000
;
i
++
{
testCanonicalHeader
(
db
,
i
)
inject
(
i
)
notify
(
i
,
i
,
false
)
}
close
(
stop
)
db
.
Close
()
}
func
testCanonicalHeader
(
db
ethdb
.
Database
,
idx
uint64
)
{
var
rnd
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
rnd
[
:
],
uint64
(
rand
.
Int63
()))
header
:=
&
types
.
Header
{
Number
:
big
.
NewInt
(
int64
(
idx
)),
Extra
:
rnd
[
:
]}
if
idx
>
0
{
header
.
ParentHash
=
GetCanonicalHash
(
db
,
idx
-
1
)
}
WriteHeader
(
db
,
header
)
WriteCanonicalHash
(
db
,
header
.
Hash
(),
idx
)
}
// testChainIndex implements ChainIndexerBackend
type
testChainIndex
struct
{
// testChainIndexBackend implements ChainIndexerBackend
type
testChainIndexBackend
struct
{
t
*
testing
.
T
sectionSize
,
confirmReq
uint64
section
,
headerCnt
,
stored
uint64
indexer
*
ChainIndexer
section
,
headerCnt
,
stored
uint64
processCh
chan
uint64
}
// newBlocks expects process calls after new blocks have arrived. If expFailAfter < headNum then
// we are simulating a scenario where a rollback has happened after the processing has started and
// the processing of a section fails.
func
(
t
*
testChainIndex
)
newBlocks
(
headNum
,
expFailAfter
uint64
)
(
uint64
,
bool
)
{
var
newCount
uint64
if
headNum
>=
t
.
confirmReq
{
newCount
=
(
headNum
+
1
-
t
.
confirmReq
)
/
t
.
sectionSize
if
newCount
>
t
.
stored
{
// assertSections verifies if a chain indexer has the correct number of section.
func
(
b
*
testChainIndexBackend
)
assertSections
()
{
// Keep trying for 3 seconds if it does not match
var
sections
uint64
for
i
:=
0
;
i
<
300
;
i
++
{
sections
,
_
,
_
=
b
.
indexer
.
Sections
()
if
sections
==
b
.
stored
{
return
}
time
.
Sleep
(
10
*
time
.
Millisecond
)
}
b
.
t
.
Fatalf
(
"Canonical section count mismatch: have %v, want %v"
,
sections
,
b
.
stored
)
}
// assertBlocks expects processing calls after new blocks have arrived. If the
// failNum < headNum then we are simulating a scenario where a reorg has happened
// after the processing has started and the processing of a section fails.
func
(
b
*
testChainIndexBackend
)
assertBlocks
(
headNum
,
failNum
uint64
)
(
uint64
,
bool
)
{
var
sections
uint64
if
headNum
>=
b
.
indexer
.
confirmsReq
{
sections
=
(
headNum
+
1
-
b
.
indexer
.
confirmsReq
)
/
b
.
indexer
.
sectionSize
if
sections
>
b
.
stored
{
// expect processed blocks
for
exp
:=
t
.
stored
*
t
.
sectionSize
;
exp
<
newCount
*
t
.
sectionSize
;
exp
++
{
if
exp
>
expFailAfter
{
for
exp
ectd
:=
b
.
stored
*
b
.
indexer
.
sectionSize
;
expectd
<
sections
*
b
.
indexer
.
sectionSize
;
expectd
++
{
if
exp
ectd
>
failNum
{
// rolled back after processing started, no more process calls expected
// wait until updating is done to make sure that processing actually fails
for
{
t
.
indexer
.
lock
.
Lock
()
u
:=
t
.
indexer
.
updating
t
.
indexer
.
lock
.
Unlock
()
if
!
u
{
var
updating
bool
for
i
:=
0
;
i
<
300
;
i
++
{
b
.
indexer
.
lock
.
Lock
()
updating
=
b
.
indexer
.
knownSections
>
b
.
indexer
.
storedSections
b
.
indexer
.
lock
.
Unlock
()
if
!
updating
{
break
}
time
.
Sleep
(
time
.
Millisecond
)
time
.
Sleep
(
10
*
time
.
Millisecond
)
}
newCount
=
exp
/
t
.
sectionSize
if
updating
{
b
.
t
.
Fatalf
(
"update did not finish"
)
}
sections
=
expectd
/
b
.
indexer
.
sectionSize
break
}
select
{
case
<-
time
.
After
(
10
*
time
.
Second
)
:
t
.
t
.
Fatalf
(
"Expected processed block #%d, got nothing"
,
exp
)
case
proc
:=
<-
t
.
processCh
:
if
proc
!=
exp
{
t
.
t
.
Errorf
(
"Expected processed block #%d, got #%d"
,
exp
,
proc
)
b
.
t
.
Fatalf
(
"Expected processed block #%d, got nothing"
,
expectd
)
case
proc
essed
:=
<-
b
.
processCh
:
if
proc
essed
!=
expectd
{
b
.
t
.
Errorf
(
"Expected processed block #%d, got #%d"
,
expectd
,
processed
)
}
}
}
t
.
stored
=
newCount
b
.
stored
=
sections
}
}
if
t
.
stored
==
0
{
if
b
.
stored
==
0
{
return
0
,
false
}
return
t
.
stored
*
t
.
sectionSize
-
1
,
true
return
b
.
stored
*
b
.
indexer
.
sectionSize
-
1
,
true
}
func
(
t
*
testChainIndex
)
rollback
(
headNum
uint64
)
uint64
{
firstChanged
:=
headNum
/
t
.
sectionSize
if
firstChanged
<
t
.
stored
{
t
.
stored
=
firstChanged
func
(
b
*
testChainIndexBackend
)
reorg
(
headNum
uint64
)
uint64
{
firstChanged
:=
headNum
/
b
.
indexer
.
sectionSize
if
firstChanged
<
b
.
stored
{
b
.
stored
=
firstChanged
}
return
t
.
stored
*
t
.
sectionSize
return
b
.
stored
*
b
.
indexer
.
sectionSize
}
func
(
t
*
testChainIndex
)
Reset
(
section
uint64
)
{
t
.
section
=
section
t
.
headerCnt
=
0
func
(
b
*
testChainIndexBackend
)
Reset
(
section
uint64
)
{
b
.
section
=
section
b
.
headerCnt
=
0
}
func
(
t
*
testChainIndex
)
Process
(
header
*
types
.
Header
)
{
t
.
headerCnt
++
if
t
.
headerCnt
>
t
.
sectionSize
{
t
.
t
.
Error
(
"Processing too many headers"
)
func
(
b
*
testChainIndexBackend
)
Process
(
header
*
types
.
Header
)
{
b
.
headerCnt
++
if
b
.
headerCnt
>
b
.
indexer
.
sectionSize
{
b
.
t
.
Error
(
"Processing too many headers"
)
}
//t.processCh <- header.Number.Uint64()
select
{
case
<-
time
.
After
(
10
*
time
.
Second
)
:
t
.
t
.
Fatal
(
"Unexpected call to Process"
)
case
t
.
processCh
<-
header
.
Number
.
Uint64
()
:
b
.
t
.
Fatal
(
"Unexpected call to Process"
)
case
b
.
processCh
<-
header
.
Number
.
Uint64
()
:
}
}
func
(
t
*
testChainIndex
)
Commit
(
db
ethdb
.
Database
)
error
{
if
t
.
headerCnt
!=
t
.
sectionSize
{
t
.
t
.
Error
(
"Not enough headers processed"
)
func
(
b
*
testChainIndexBackend
)
Commit
(
db
ethdb
.
Database
)
error
{
if
b
.
headerCnt
!=
b
.
indexer
.
sectionSize
{
b
.
t
.
Error
(
"Not enough headers processed"
)
}
return
nil
}
func
(
t
*
testChainIndex
)
UpdateMsg
(
done
,
all
uint64
)
{}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment