Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
374c49e0
Commit
374c49e0
authored
Aug 08, 2017
by
Felix Lange
Committed by
GitHub
Aug 08, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #14522 from ethereum/go-ethereum/chainproc2
parents
10ce8b0e
8edaaa22
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
630 additions
and
0 deletions
+630
-0
chain_indexer.go
core/chain_indexer.go
+396
-0
chain_indexer_test.go
core/chain_indexer_test.go
+234
-0
No files found.
core/chain_indexer.go
0 → 100644
View file @
374c49e0
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package
core
import
(
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
type
ChainIndexerBackend
interface
{
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset
(
section
uint64
)
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process
(
header
*
types
.
Header
)
// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
Commit
(
db
ethdb
.
Database
)
error
}
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainEventLoop in a goroutine.
//
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
type
ChainIndexer
struct
{
chainDb
ethdb
.
Database
// Chain database to index the data from
indexDb
ethdb
.
Database
// Prefixed table-view of the db to write index metadata into
backend
ChainIndexerBackend
// Background processor generating the index data content
children
[]
*
ChainIndexer
// Child indexers to cascade chain updates to
active
uint32
// Flag whether the event loop was started
update
chan
struct
{}
// Notification channel that headers should be processed
quit
chan
chan
error
// Quit channel to tear down running goroutines
sectionSize
uint64
// Number of blocks in a single chain segment to process
confirmsReq
uint64
// Number of confirmations before processing a completed segment
storedSections
uint64
// Number of sections successfully indexed into the database
knownSections
uint64
// Number of sections known to be complete (block wise)
cascadedHead
uint64
// Block number of the last completed section cascaded to subindexers
throttling
time
.
Duration
// Disk throttling to prevent a heavy upgrade from hogging resources
log
log
.
Logger
lock
sync
.
RWMutex
}
// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after certain number of confirmations passed.
// The throttling parameter might be used to prevent database thrashing.
func
NewChainIndexer
(
chainDb
,
indexDb
ethdb
.
Database
,
backend
ChainIndexerBackend
,
section
,
confirm
uint64
,
throttling
time
.
Duration
,
kind
string
)
*
ChainIndexer
{
c
:=
&
ChainIndexer
{
chainDb
:
chainDb
,
indexDb
:
indexDb
,
backend
:
backend
,
update
:
make
(
chan
struct
{},
1
),
quit
:
make
(
chan
chan
error
),
sectionSize
:
section
,
confirmsReq
:
confirm
,
throttling
:
throttling
,
log
:
log
.
New
(
"type"
,
kind
),
}
// Initialize database dependent fields and start the updater
c
.
loadValidSections
()
go
c
.
updateLoop
()
return
c
}
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing.
func
(
c
*
ChainIndexer
)
Start
(
currentHeader
*
types
.
Header
,
eventMux
*
event
.
TypeMux
)
{
go
c
.
eventLoop
(
currentHeader
,
eventMux
)
}
// Close tears down all goroutines belonging to the indexer and returns any error
// that might have occurred internally.
func
(
c
*
ChainIndexer
)
Close
()
error
{
var
errs
[]
error
// Tear down the primary update loop
errc
:=
make
(
chan
error
)
c
.
quit
<-
errc
if
err
:=
<-
errc
;
err
!=
nil
{
errs
=
append
(
errs
,
err
)
}
// If needed, tear down the secondary event loop
if
atomic
.
LoadUint32
(
&
c
.
active
)
!=
0
{
c
.
quit
<-
errc
if
err
:=
<-
errc
;
err
!=
nil
{
errs
=
append
(
errs
,
err
)
}
}
// Return any failures
switch
{
case
len
(
errs
)
==
0
:
return
nil
case
len
(
errs
)
==
1
:
return
errs
[
0
]
default
:
return
fmt
.
Errorf
(
"%v"
,
errs
)
}
}
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func
(
c
*
ChainIndexer
)
eventLoop
(
currentHeader
*
types
.
Header
,
eventMux
*
event
.
TypeMux
)
{
// Mark the chain indexer as active, requiring an additional teardown
atomic
.
StoreUint32
(
&
c
.
active
,
1
)
// Subscribe to chain head events
sub
:=
eventMux
.
Subscribe
(
ChainEvent
{})
defer
sub
.
Unsubscribe
()
// Fire the initial new head event to start any outstanding processing
c
.
newHead
(
currentHeader
.
Number
.
Uint64
(),
false
)
var
(
prevHeader
=
currentHeader
prevHash
=
currentHeader
.
Hash
()
)
for
{
select
{
case
errc
:=
<-
c
.
quit
:
// Chain indexer terminating, report no failure and abort
errc
<-
nil
return
case
ev
,
ok
:=
<-
sub
.
Chan
()
:
// Received a new event, ensure it's not nil (closing) and update
if
!
ok
{
errc
:=
<-
c
.
quit
errc
<-
nil
return
}
header
:=
ev
.
Data
.
(
ChainEvent
)
.
Block
.
Header
()
if
header
.
ParentHash
!=
prevHash
{
c
.
newHead
(
FindCommonAncestor
(
c
.
chainDb
,
prevHeader
,
header
)
.
Number
.
Uint64
(),
true
)
}
c
.
newHead
(
header
.
Number
.
Uint64
(),
false
)
prevHeader
,
prevHash
=
header
,
header
.
Hash
()
}
}
}
// newHead notifies the indexer about new chain heads and/or reorgs.
func
(
c
*
ChainIndexer
)
newHead
(
head
uint64
,
reorg
bool
)
{
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
// If a reorg happened, invalidate all sections until that point
if
reorg
{
// Revert the known section number to the reorg point
changed
:=
head
/
c
.
sectionSize
if
changed
<
c
.
knownSections
{
c
.
knownSections
=
changed
}
// Revert the stored sections from the database to the reorg point
if
changed
<
c
.
storedSections
{
c
.
setValidSections
(
changed
)
}
// Update the new head number to te finalized section end and notify children
head
=
changed
*
c
.
sectionSize
if
head
<
c
.
cascadedHead
{
c
.
cascadedHead
=
head
for
_
,
child
:=
range
c
.
children
{
child
.
newHead
(
c
.
cascadedHead
,
true
)
}
}
return
}
// No reorg, calculate the number of newly known sections and update if high enough
var
sections
uint64
if
head
>=
c
.
confirmsReq
{
sections
=
(
head
+
1
-
c
.
confirmsReq
)
/
c
.
sectionSize
if
sections
>
c
.
knownSections
{
c
.
knownSections
=
sections
select
{
case
c
.
update
<-
struct
{}{}
:
default
:
}
}
}
}
// updateLoop is the main event loop of the indexer which pushes chain segments
// down into the processing backend.
func
(
c
*
ChainIndexer
)
updateLoop
()
{
var
updated
time
.
Time
for
{
select
{
case
errc
:=
<-
c
.
quit
:
// Chain indexer terminating, report no failure and abort
errc
<-
nil
return
case
<-
c
.
update
:
// Section headers completed (or rolled back), update the index
c
.
lock
.
Lock
()
if
c
.
knownSections
>
c
.
storedSections
{
// Periodically print an upgrade log message to the user
if
time
.
Since
(
updated
)
>
8
*
time
.
Second
{
if
c
.
knownSections
>
c
.
storedSections
+
1
{
c
.
log
.
Info
(
"Upgrading chain index"
,
"percentage"
,
c
.
storedSections
*
100
/
c
.
knownSections
)
}
updated
=
time
.
Now
()
}
// Cache the current section count and head to allow unlocking the mutex
section
:=
c
.
storedSections
var
oldHead
common
.
Hash
if
section
>
0
{
oldHead
=
c
.
sectionHead
(
section
-
1
)
}
// Process the newly defined section in the background
c
.
lock
.
Unlock
()
newHead
,
err
:=
c
.
processSection
(
section
,
oldHead
)
c
.
lock
.
Lock
()
// If processing succeeded and no reorgs occcurred, mark the section completed
if
err
==
nil
&&
oldHead
==
c
.
sectionHead
(
section
-
1
)
{
c
.
setSectionHead
(
section
,
newHead
)
c
.
setValidSections
(
section
+
1
)
c
.
cascadedHead
=
c
.
storedSections
*
c
.
sectionSize
-
1
for
_
,
child
:=
range
c
.
children
{
c
.
log
.
Trace
(
"Cascading chain index update"
,
"head"
,
c
.
cascadedHead
)
child
.
newHead
(
c
.
cascadedHead
,
false
)
}
}
else
{
// If processing failed, don't retry until further notification
c
.
log
.
Debug
(
"Chain index processing failed"
,
"section"
,
section
,
"err"
,
err
)
c
.
knownSections
=
c
.
storedSections
}
}
// If there are still further sections to process, reschedule
if
c
.
knownSections
>
c
.
storedSections
{
time
.
AfterFunc
(
c
.
throttling
,
func
()
{
select
{
case
c
.
update
<-
struct
{}{}
:
default
:
}
})
}
c
.
lock
.
Unlock
()
}
}
}
// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.
func
(
c
*
ChainIndexer
)
processSection
(
section
uint64
,
lastHead
common
.
Hash
)
(
common
.
Hash
,
error
)
{
c
.
log
.
Trace
(
"Processing new chain section"
,
"section"
,
section
)
// Reset and partial processing
c
.
backend
.
Reset
(
section
)
for
number
:=
section
*
c
.
sectionSize
;
number
<
(
section
+
1
)
*
c
.
sectionSize
;
number
++
{
hash
:=
GetCanonicalHash
(
c
.
chainDb
,
number
)
if
hash
==
(
common
.
Hash
{})
{
return
common
.
Hash
{},
fmt
.
Errorf
(
"canonical block #%d unknown"
,
number
)
}
header
:=
GetHeader
(
c
.
chainDb
,
hash
,
number
)
if
header
==
nil
{
return
common
.
Hash
{},
fmt
.
Errorf
(
"block #%d [%x…] not found"
,
number
,
hash
[
:
4
])
}
else
if
header
.
ParentHash
!=
lastHead
{
return
common
.
Hash
{},
fmt
.
Errorf
(
"chain reorged during section processing"
)
}
c
.
backend
.
Process
(
header
)
lastHead
=
header
.
Hash
()
}
if
err
:=
c
.
backend
.
Commit
(
c
.
chainDb
);
err
!=
nil
{
return
common
.
Hash
{},
err
}
return
lastHead
,
nil
}
// Sections returns the number of processed sections maintained by the indexer
// and also the information about the last header indexed for potential canonical
// verifications.
func
(
c
*
ChainIndexer
)
Sections
()
(
uint64
,
uint64
,
common
.
Hash
)
{
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
return
c
.
storedSections
,
c
.
storedSections
*
c
.
sectionSize
-
1
,
c
.
sectionHead
(
c
.
storedSections
-
1
)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
func
(
c
*
ChainIndexer
)
AddChildIndexer
(
indexer
*
ChainIndexer
)
{
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
c
.
children
=
append
(
c
.
children
,
indexer
)
// Cascade any pending updates to new children too
if
c
.
storedSections
>
0
{
indexer
.
newHead
(
c
.
storedSections
*
c
.
sectionSize
-
1
,
false
)
}
}
// loadValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func
(
c
*
ChainIndexer
)
loadValidSections
()
{
data
,
_
:=
c
.
indexDb
.
Get
([]
byte
(
"count"
))
if
len
(
data
)
==
8
{
c
.
storedSections
=
binary
.
BigEndian
.
Uint64
(
data
[
:
])
}
}
// setValidSections writes the number of valid sections to the index database
func
(
c
*
ChainIndexer
)
setValidSections
(
sections
uint64
)
{
// Set the current number of valid sections in the database
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
sections
)
c
.
indexDb
.
Put
([]
byte
(
"count"
),
data
[
:
])
// Remove any reorged sections, caching the valids in the mean time
for
c
.
storedSections
>
sections
{
c
.
storedSections
--
c
.
removeSectionHead
(
c
.
storedSections
)
}
c
.
storedSections
=
sections
// needed if new > old
}
// sectionHead retrieves the last block hash of a processed section from the
// index database.
func
(
c
*
ChainIndexer
)
sectionHead
(
section
uint64
)
common
.
Hash
{
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
section
)
hash
,
_
:=
c
.
indexDb
.
Get
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
))
if
len
(
hash
)
==
len
(
common
.
Hash
{})
{
return
common
.
BytesToHash
(
hash
)
}
return
common
.
Hash
{}
}
// setSectionHead writes the last block hash of a processed section to the index
// database.
func
(
c
*
ChainIndexer
)
setSectionHead
(
section
uint64
,
hash
common
.
Hash
)
{
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
section
)
c
.
indexDb
.
Put
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
),
hash
.
Bytes
())
}
// removeSectionHead removes the reference to a processed section from the index
// database.
func
(
c
*
ChainIndexer
)
removeSectionHead
(
section
uint64
)
{
var
data
[
8
]
byte
binary
.
BigEndian
.
PutUint64
(
data
[
:
],
section
)
c
.
indexDb
.
Delete
(
append
([]
byte
(
"shead"
),
data
[
:
]
...
))
}
core/chain_indexer_test.go
0 → 100644
View file @
374c49e0
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package
core
import
(
"fmt"
"math/big"
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
// Runs multiple tests with randomized parameters.
func
TestChainIndexerSingle
(
t
*
testing
.
T
)
{
for
i
:=
0
;
i
<
10
;
i
++
{
testChainIndexer
(
t
,
1
)
}
}
// Runs multiple tests with randomized parameters and different number of
// chain backends.
func
TestChainIndexerWithChildren
(
t
*
testing
.
T
)
{
for
i
:=
2
;
i
<
8
;
i
++
{
testChainIndexer
(
t
,
i
)
}
}
// testChainIndexer runs a test with either a single chain indexer or a chain of
// multiple backends. The section size and required confirmation count parameters
// are randomized.
func
testChainIndexer
(
t
*
testing
.
T
,
count
int
)
{
db
,
_
:=
ethdb
.
NewMemDatabase
()
defer
db
.
Close
()
// Create a chain of indexers and ensure they all report empty
backends
:=
make
([]
*
testChainIndexBackend
,
count
)
for
i
:=
0
;
i
<
count
;
i
++
{
var
(
sectionSize
=
uint64
(
rand
.
Intn
(
100
)
+
1
)
confirmsReq
=
uint64
(
rand
.
Intn
(
10
))
)
backends
[
i
]
=
&
testChainIndexBackend
{
t
:
t
,
processCh
:
make
(
chan
uint64
)}
backends
[
i
]
.
indexer
=
NewChainIndexer
(
db
,
ethdb
.
NewTable
(
db
,
string
([]
byte
{
byte
(
i
)})),
backends
[
i
],
sectionSize
,
confirmsReq
,
0
,
fmt
.
Sprintf
(
"indexer-%d"
,
i
))
defer
backends
[
i
]
.
indexer
.
Close
()
if
sections
,
_
,
_
:=
backends
[
i
]
.
indexer
.
Sections
();
sections
!=
0
{
t
.
Fatalf
(
"Canonical section count mismatch: have %v, want %v"
,
sections
,
0
)
}
if
i
>
0
{
backends
[
i
-
1
]
.
indexer
.
AddChildIndexer
(
backends
[
i
]
.
indexer
)
}
}
// notify pings the root indexer about a new head or reorg, then expect
// processed blocks if a section is processable
notify
:=
func
(
headNum
,
failNum
uint64
,
reorg
bool
)
{
backends
[
0
]
.
indexer
.
newHead
(
headNum
,
reorg
)
if
reorg
{
for
_
,
backend
:=
range
backends
{
headNum
=
backend
.
reorg
(
headNum
)
backend
.
assertSections
()
}
return
}
var
cascade
bool
for
_
,
backend
:=
range
backends
{
headNum
,
cascade
=
backend
.
assertBlocks
(
headNum
,
failNum
)
if
!
cascade
{
break
}
backend
.
assertSections
()
}
}
// inject inserts a new random canonical header into the database directly
inject
:=
func
(
number
uint64
)
{
header
:=
&
types
.
Header
{
Number
:
big
.
NewInt
(
int64
(
number
)),
Extra
:
big
.
NewInt
(
rand
.
Int63
())
.
Bytes
()}
if
number
>
0
{
header
.
ParentHash
=
GetCanonicalHash
(
db
,
number
-
1
)
}
WriteHeader
(
db
,
header
)
WriteCanonicalHash
(
db
,
header
.
Hash
(),
number
)
}
// Start indexer with an already existing chain
for
i
:=
uint64
(
0
);
i
<=
100
;
i
++
{
inject
(
i
)
}
notify
(
100
,
100
,
false
)
// Add new blocks one by one
for
i
:=
uint64
(
101
);
i
<=
1000
;
i
++
{
inject
(
i
)
notify
(
i
,
i
,
false
)
}
// Do a reorg
notify
(
500
,
500
,
true
)
// Create new fork
for
i
:=
uint64
(
501
);
i
<=
1000
;
i
++
{
inject
(
i
)
notify
(
i
,
i
,
false
)
}
for
i
:=
uint64
(
1001
);
i
<=
1500
;
i
++
{
inject
(
i
)
}
// Failed processing scenario where less blocks are available than notified
notify
(
2000
,
1500
,
false
)
// Notify about a reorg (which could have caused the missing blocks if happened during processing)
notify
(
1500
,
1500
,
true
)
// Create new fork
for
i
:=
uint64
(
1501
);
i
<=
2000
;
i
++
{
inject
(
i
)
notify
(
i
,
i
,
false
)
}
}
// testChainIndexBackend implements ChainIndexerBackend
type
testChainIndexBackend
struct
{
t
*
testing
.
T
indexer
*
ChainIndexer
section
,
headerCnt
,
stored
uint64
processCh
chan
uint64
}
// assertSections verifies if a chain indexer has the correct number of section.
func
(
b
*
testChainIndexBackend
)
assertSections
()
{
// Keep trying for 3 seconds if it does not match
var
sections
uint64
for
i
:=
0
;
i
<
300
;
i
++
{
sections
,
_
,
_
=
b
.
indexer
.
Sections
()
if
sections
==
b
.
stored
{
return
}
time
.
Sleep
(
10
*
time
.
Millisecond
)
}
b
.
t
.
Fatalf
(
"Canonical section count mismatch: have %v, want %v"
,
sections
,
b
.
stored
)
}
// assertBlocks expects processing calls after new blocks have arrived. If the
// failNum < headNum then we are simulating a scenario where a reorg has happened
// after the processing has started and the processing of a section fails.
func
(
b
*
testChainIndexBackend
)
assertBlocks
(
headNum
,
failNum
uint64
)
(
uint64
,
bool
)
{
var
sections
uint64
if
headNum
>=
b
.
indexer
.
confirmsReq
{
sections
=
(
headNum
+
1
-
b
.
indexer
.
confirmsReq
)
/
b
.
indexer
.
sectionSize
if
sections
>
b
.
stored
{
// expect processed blocks
for
expectd
:=
b
.
stored
*
b
.
indexer
.
sectionSize
;
expectd
<
sections
*
b
.
indexer
.
sectionSize
;
expectd
++
{
if
expectd
>
failNum
{
// rolled back after processing started, no more process calls expected
// wait until updating is done to make sure that processing actually fails
var
updating
bool
for
i
:=
0
;
i
<
300
;
i
++
{
b
.
indexer
.
lock
.
Lock
()
updating
=
b
.
indexer
.
knownSections
>
b
.
indexer
.
storedSections
b
.
indexer
.
lock
.
Unlock
()
if
!
updating
{
break
}
time
.
Sleep
(
10
*
time
.
Millisecond
)
}
if
updating
{
b
.
t
.
Fatalf
(
"update did not finish"
)
}
sections
=
expectd
/
b
.
indexer
.
sectionSize
break
}
select
{
case
<-
time
.
After
(
10
*
time
.
Second
)
:
b
.
t
.
Fatalf
(
"Expected processed block #%d, got nothing"
,
expectd
)
case
processed
:=
<-
b
.
processCh
:
if
processed
!=
expectd
{
b
.
t
.
Errorf
(
"Expected processed block #%d, got #%d"
,
expectd
,
processed
)
}
}
}
b
.
stored
=
sections
}
}
if
b
.
stored
==
0
{
return
0
,
false
}
return
b
.
stored
*
b
.
indexer
.
sectionSize
-
1
,
true
}
func
(
b
*
testChainIndexBackend
)
reorg
(
headNum
uint64
)
uint64
{
firstChanged
:=
headNum
/
b
.
indexer
.
sectionSize
if
firstChanged
<
b
.
stored
{
b
.
stored
=
firstChanged
}
return
b
.
stored
*
b
.
indexer
.
sectionSize
}
func
(
b
*
testChainIndexBackend
)
Reset
(
section
uint64
)
{
b
.
section
=
section
b
.
headerCnt
=
0
}
func
(
b
*
testChainIndexBackend
)
Process
(
header
*
types
.
Header
)
{
b
.
headerCnt
++
if
b
.
headerCnt
>
b
.
indexer
.
sectionSize
{
b
.
t
.
Error
(
"Processing too many headers"
)
}
//t.processCh <- header.Number.Uint64()
select
{
case
<-
time
.
After
(
10
*
time
.
Second
)
:
b
.
t
.
Fatal
(
"Unexpected call to Process"
)
case
b
.
processCh
<-
header
.
Number
.
Uint64
()
:
}
}
func
(
b
*
testChainIndexBackend
)
Commit
(
db
ethdb
.
Database
)
error
{
if
b
.
headerCnt
!=
b
.
indexer
.
sectionSize
{
b
.
t
.
Error
(
"Not enough headers processed"
)
}
return
nil
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment