eth: pre-process downloader responses on the peer reader thread

parent 721c5723
This diff is collapsed.
...@@ -177,6 +177,10 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i ...@@ -177,6 +177,10 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
} }
} }
} }
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader // Deliver the headers to the downloader
req := &eth.Request{ req := &eth.Request{
Peer: dlp.id, Peer: dlp.id,
...@@ -184,6 +188,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i ...@@ -184,6 +188,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
res := &eth.Response{ res := &eth.Response{
Req: req, Req: req,
Res: (*eth.BlockHeadersPacket)(&headers), Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1, Time: 1,
Done: make(chan error, 1), // Ignore the returned status Done: make(chan error, 1), // Ignore the returned status
} }
...@@ -216,6 +221,10 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, ...@@ -216,6 +221,10 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int,
} }
} }
} }
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader // Deliver the headers to the downloader
req := &eth.Request{ req := &eth.Request{
Peer: dlp.id, Peer: dlp.id,
...@@ -223,6 +232,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, ...@@ -223,6 +232,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int,
res := &eth.Response{ res := &eth.Response{
Req: req, Req: req,
Res: (*eth.BlockHeadersPacket)(&headers), Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1, Time: 1,
Done: make(chan error, 1), // Ignore the returned status Done: make(chan error, 1), // Ignore the returned status
} }
...@@ -243,12 +253,22 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et ...@@ -243,12 +253,22 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
bodies[i] = new(eth.BlockBody) bodies[i] = new(eth.BlockBody)
rlp.DecodeBytes(blob, bodies[i]) rlp.DecodeBytes(blob, bodies[i])
} }
var (
txsHashes = make([]common.Hash, len(bodies))
uncleHashes = make([]common.Hash, len(bodies))
)
hasher := trie.NewStackTrie(nil)
for i, body := range bodies {
txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher)
uncleHashes[i] = types.CalcUncleHash(body.Uncles)
}
req := &eth.Request{ req := &eth.Request{
Peer: dlp.id, Peer: dlp.id,
} }
res := &eth.Response{ res := &eth.Response{
Req: req, Req: req,
Res: (*eth.BlockBodiesPacket)(&bodies), Res: (*eth.BlockBodiesPacket)(&bodies),
Meta: [][]common.Hash{txsHashes, uncleHashes},
Time: 1, Time: 1,
Done: make(chan error, 1), // Ignore the returned status Done: make(chan error, 1), // Ignore the returned status
} }
...@@ -268,12 +288,18 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan * ...@@ -268,12 +288,18 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *
for i, blob := range blobs { for i, blob := range blobs {
rlp.DecodeBytes(blob, &receipts[i]) rlp.DecodeBytes(blob, &receipts[i])
} }
hasher := trie.NewStackTrie(nil)
hashes = make([]common.Hash, len(receipts))
for i, receipt := range receipts {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
}
req := &eth.Request{ req := &eth.Request{
Peer: dlp.id, Peer: dlp.id,
} }
res := &eth.Response{ res := &eth.Response{
Req: req, Req: req,
Res: (*eth.ReceiptsPacket)(&receipts), Res: (*eth.ReceiptsPacket)(&receipts),
Meta: hashes,
Time: 1, Time: 1,
Done: make(chan error, 1), // Ignore the returned status Done: make(chan error, 1), // Ignore the returned status
} }
......
...@@ -27,14 +27,14 @@ import ( ...@@ -27,14 +27,14 @@ import (
// fetchHeadersByHash is a blocking version of Peer.RequestHeadersByHash which // fetchHeadersByHash is a blocking version of Peer.RequestHeadersByHash which
// handles all the cancellation, interruption and timeout mechanisms of a data // handles all the cancellation, interruption and timeout mechanisms of a data
// retrieval to allow blocking API calls. // retrieval to allow blocking API calls.
func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amount int, skip int, reverse bool) ([]*types.Header, error) { func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) {
// Create the response sink and send the network request // Create the response sink and send the network request
start := time.Now() start := time.Now()
resCh := make(chan *eth.Response) resCh := make(chan *eth.Response)
req, err := p.peer.RequestHeadersByHash(hash, amount, skip, reverse, resCh) req, err := p.peer.RequestHeadersByHash(hash, amount, skip, reverse, resCh)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
defer req.Close() defer req.Close()
...@@ -46,14 +46,14 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo ...@@ -46,14 +46,14 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return nil, errCanceled return nil, nil, errCanceled
case <-timeoutTimer.C: case <-timeoutTimer.C:
// Header retrieval timed out, update the metrics // Header retrieval timed out, update the metrics
p.log.Debug("Header request timed out", "elapsed", ttl) p.log.Debug("Header request timed out", "elapsed", ttl)
headerTimeoutMeter.Mark(1) headerTimeoutMeter.Mark(1)
return nil, errTimeout return nil, nil, errTimeout
case res := <-resCh: case res := <-resCh:
// Headers successfully retrieved, update the metrics // Headers successfully retrieved, update the metrics
...@@ -65,21 +65,21 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo ...@@ -65,21 +65,21 @@ func (d *Downloader) fetchHeadersByHash(p *peerConnection, hash common.Hash, amo
// be processed by the caller // be processed by the caller
res.Done <- nil res.Done <- nil
return *res.Res.(*eth.BlockHeadersPacket), nil return *res.Res.(*eth.BlockHeadersPacket), res.Meta.([]common.Hash), nil
} }
} }
// fetchHeadersByNumber is a blocking version of Peer.RequestHeadersByNumber which // fetchHeadersByNumber is a blocking version of Peer.RequestHeadersByNumber which
// handles all the cancellation, interruption and timeout mechanisms of a data // handles all the cancellation, interruption and timeout mechanisms of a data
// retrieval to allow blocking API calls. // retrieval to allow blocking API calls.
func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amount int, skip int, reverse bool) ([]*types.Header, error) { func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error) {
// Create the response sink and send the network request // Create the response sink and send the network request
start := time.Now() start := time.Now()
resCh := make(chan *eth.Response) resCh := make(chan *eth.Response)
req, err := p.peer.RequestHeadersByNumber(number, amount, skip, reverse, resCh) req, err := p.peer.RequestHeadersByNumber(number, amount, skip, reverse, resCh)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
defer req.Close() defer req.Close()
...@@ -91,14 +91,14 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou ...@@ -91,14 +91,14 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return nil, errCanceled return nil, nil, errCanceled
case <-timeoutTimer.C: case <-timeoutTimer.C:
// Header retrieval timed out, update the metrics // Header retrieval timed out, update the metrics
p.log.Debug("Header request timed out", "elapsed", ttl) p.log.Debug("Header request timed out", "elapsed", ttl)
headerTimeoutMeter.Mark(1) headerTimeoutMeter.Mark(1)
return nil, errTimeout return nil, nil, errTimeout
case res := <-resCh: case res := <-resCh:
// Headers successfully retrieved, update the metrics // Headers successfully retrieved, update the metrics
...@@ -110,6 +110,6 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou ...@@ -110,6 +110,6 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou
// be processed by the caller // be processed by the caller
res.Done <- nil res.Done <- nil
return *res.Res.(*eth.BlockHeadersPacket), nil return *res.Res.(*eth.BlockHeadersPacket), res.Meta.([]common.Hash), nil
} }
} }
...@@ -90,8 +90,9 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan ...@@ -90,8 +90,9 @@ func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan
// fetcher, unpacking the body data and delivering it to the downloader's queue. // fetcher, unpacking the body data and delivering it to the downloader's queue.
func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { func (q *bodyQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
txs, uncles := packet.Res.(*eth.BlockBodiesPacket).Unpack() txs, uncles := packet.Res.(*eth.BlockBodiesPacket).Unpack()
hashsets := packet.Meta.([][]common.Hash) // {txs hashes, uncle hashes}
accepted, err := q.queue.DeliverBodies(peer.id, txs, uncles) accepted, err := q.queue.DeliverBodies(peer.id, txs, hashsets[0], uncles, hashsets[1])
switch { switch {
case err == nil && len(txs) == 0: case err == nil && len(txs) == 0:
peer.log.Trace("Requested bodies delivered") peer.log.Trace("Requested bodies delivered")
......
...@@ -19,6 +19,7 @@ package downloader ...@@ -19,6 +19,7 @@ package downloader
import ( import (
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -81,8 +82,9 @@ func (q *headerQueue) request(peer *peerConnection, req *fetchRequest, resCh cha ...@@ -81,8 +82,9 @@ func (q *headerQueue) request(peer *peerConnection, req *fetchRequest, resCh cha
// fetcher, unpacking the header data and delivering it to the downloader's queue. // fetcher, unpacking the header data and delivering it to the downloader's queue.
func (q *headerQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { func (q *headerQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
headers := *packet.Res.(*eth.BlockHeadersPacket) headers := *packet.Res.(*eth.BlockHeadersPacket)
hashes := packet.Meta.([]common.Hash)
accepted, err := q.queue.DeliverHeaders(peer.id, headers, q.headerProcCh) accepted, err := q.queue.DeliverHeaders(peer.id, headers, hashes, q.headerProcCh)
switch { switch {
case err == nil && len(headers) == 0: case err == nil && len(headers) == 0:
peer.log.Trace("Requested headers delivered") peer.log.Trace("Requested headers delivered")
......
...@@ -89,8 +89,9 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch ...@@ -89,8 +89,9 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
// fetcher, unpacking the receipt data and delivering it to the downloader's queue. // fetcher, unpacking the receipt data and delivering it to the downloader's queue.
func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
receipts := *packet.Res.(*eth.ReceiptsPacket) receipts := *packet.Res.(*eth.ReceiptsPacket)
hashes := packet.Meta.([]common.Hash) // {receipt hashes}
accepted, err := q.queue.DeliverReceipts(peer.id, receipts) accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
switch { switch {
case err == nil && len(receipts) == 0: case err == nil && len(receipts) == 0:
peer.log.Trace("Requested receipts delivered") peer.log.Trace("Requested receipts delivered")
......
...@@ -31,7 +31,6 @@ import ( ...@@ -31,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/trie"
) )
const ( const (
...@@ -119,6 +118,7 @@ type queue struct { ...@@ -119,6 +118,7 @@ type queue struct {
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
headerResults []*types.Header // Result cache accumulating the completed headers headerResults []*types.Header // Result cache accumulating the completed headers
headerHashes []common.Hash // Result cache accumulating the completed header hashes
headerProced int // Number of headers already processed from the results headerProced int // Number of headers already processed from the results
headerOffset uint64 // Number of the first header in the result cache headerOffset uint64 // Number of the first header in the result cache
headerContCh chan bool // Channel to notify when header download finishes headerContCh chan bool // Channel to notify when header download finishes
...@@ -260,6 +260,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { ...@@ -260,6 +260,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
q.headerTaskQueue = prque.New(nil) q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0 q.headerProced = 0
q.headerOffset = from q.headerOffset = from
q.headerContCh = make(chan bool, 1) q.headerContCh = make(chan bool, 1)
...@@ -274,27 +275,27 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { ...@@ -274,27 +275,27 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
// RetrieveHeaders retrieves the header chain assemble based on the scheduled // RetrieveHeaders retrieves the header chain assemble based on the scheduled
// skeleton. // skeleton.
func (q *queue) RetrieveHeaders() ([]*types.Header, int) { func (q *queue) RetrieveHeaders() ([]*types.Header, []common.Hash, int) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
headers, proced := q.headerResults, q.headerProced headers, hashes, proced := q.headerResults, q.headerHashes, q.headerProced
q.headerResults, q.headerProced = nil, 0 q.headerResults, q.headerHashes, q.headerProced = nil, nil, 0
return headers, proced return headers, hashes, proced
} }
// Schedule adds a set of headers for the download queue for scheduling, returning // Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered. // the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) []*types.Header {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
// Insert all the headers prioritised by the contained block number // Insert all the headers prioritised by the contained block number
inserts := make([]*types.Header, 0, len(headers)) inserts := make([]*types.Header, 0, len(headers))
for _, header := range headers { for i, header := range headers {
// Make sure chain order is honoured and preserved throughout // Make sure chain order is honoured and preserved throughout
hash := header.Hash() hash := hashes[i]
if header.Number == nil || header.Number.Uint64() != from { if header.Number == nil || header.Number.Uint64() != from {
log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from) log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
break break
...@@ -656,7 +657,7 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue ...@@ -656,7 +657,7 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue
// If the headers are accepted, the method makes an attempt to deliver the set // If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However, it will // of ready headers to the processor to keep the pipeline full. However, it will
// not block to prevent stalling other pending deliveries. // not block to prevent stalling other pending deliveries.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []common.Hash, headerProcCh chan *headerTask) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
...@@ -684,17 +685,17 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh ...@@ -684,17 +685,17 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
accepted := len(headers) == MaxHeaderFetch accepted := len(headers) == MaxHeaderFetch
if accepted { if accepted {
if headers[0].Number.Uint64() != request.From { if headers[0].Number.Uint64() != request.From {
logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", headers[0].Hash(), "expected", request.From) logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", hashes[0], "expected", request.From)
accepted = false accepted = false
} else if headers[len(headers)-1].Hash() != target { } else if hashes[len(headers)-1] != target {
logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target) logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", hashes[len(headers)-1], "expected", target)
accepted = false accepted = false
} }
} }
if accepted { if accepted {
parentHash := headers[0].Hash() parentHash := hashes[0]
for i, header := range headers[1:] { for i, header := range headers[1:] {
hash := header.Hash() hash := hashes[i+1]
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want) logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want)
accepted = false accepted = false
...@@ -726,6 +727,8 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh ...@@ -726,6 +727,8 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
} }
// Clean up a successful fetch and try to deliver any sub-results // Clean up a successful fetch and try to deliver any sub-results
copy(q.headerResults[request.From-q.headerOffset:], headers) copy(q.headerResults[request.From-q.headerOffset:], headers)
copy(q.headerHashes[request.From-q.headerOffset:], hashes)
delete(q.headerTaskPool, request.From) delete(q.headerTaskPool, request.From)
ready := 0 ready := 0
...@@ -734,13 +737,19 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh ...@@ -734,13 +737,19 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
} }
if ready > 0 { if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking) // Headers are ready for delivery, gather them and push forward (non blocking)
process := make([]*types.Header, ready) processHeaders := make([]*types.Header, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) copy(processHeaders, q.headerResults[q.headerProced:q.headerProced+ready])
processHashes := make([]common.Hash, ready)
copy(processHashes, q.headerHashes[q.headerProced:q.headerProced+ready])
select { select {
case headerProcCh <- process: case headerProcCh <- &headerTask{
logger.Trace("Pre-scheduled new headers", "count", len(process), "from", process[0].Number) headers: processHeaders,
q.headerProced += len(process) hashes: processHashes,
}:
logger.Trace("Pre-scheduled new headers", "count", len(processHeaders), "from", processHeaders[0].Number)
q.headerProced += len(processHeaders)
default: default:
} }
} }
...@@ -754,16 +763,15 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh ...@@ -754,16 +763,15 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
// DeliverBodies injects a block body retrieval response into the results queue. // DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of blocks bodies accepted from the delivery and // The method returns the number of blocks bodies accepted from the delivery and
// also wakes any threads waiting for data delivery. // also wakes any threads waiting for data delivery.
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListHashes []common.Hash, uncleLists [][]*types.Header, uncleListHashes []common.Hash) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
trieHasher := trie.NewStackTrie(nil)
validate := func(index int, header *types.Header) error { validate := func(index int, header *types.Header) error {
if types.DeriveSha(types.Transactions(txLists[index]), trieHasher) != header.TxHash { if txListHashes[index] != header.TxHash {
return errInvalidBody return errInvalidBody
} }
if types.CalcUncleHash(uncleLists[index]) != header.UncleHash { if uncleListHashes[index] != header.UncleHash {
return errInvalidBody return errInvalidBody
} }
return nil return nil
...@@ -781,13 +789,12 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi ...@@ -781,13 +789,12 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
// DeliverReceipts injects a receipt retrieval response into the results queue. // DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery // The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery. // and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, receiptListHashes []common.Hash) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
trieHasher := trie.NewStackTrie(nil)
validate := func(index int, header *types.Header) error { validate := func(index int, header *types.Header) error {
if types.DeriveSha(types.Receipts(receiptList[index]), trieHasher) != header.ReceiptHash { if receiptListHashes[index] != header.ReceiptHash {
return errInvalidReceipt return errInvalidReceipt
} }
return nil return nil
......
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
) )
var ( var (
...@@ -110,7 +111,12 @@ func TestBasics(t *testing.T) { ...@@ -110,7 +111,12 @@ func TestBasics(t *testing.T) {
} }
// Schedule a batch of headers // Schedule a batch of headers
q.Schedule(chain.headers(), 1) headers := chain.headers()
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
q.Schedule(headers, hashes, 1)
if q.Idle() { if q.Idle() {
t.Errorf("queue should not be idle") t.Errorf("queue should not be idle")
} }
...@@ -198,8 +204,14 @@ func TestEmptyBlocks(t *testing.T) { ...@@ -198,8 +204,14 @@ func TestEmptyBlocks(t *testing.T) {
q := newQueue(10, 10) q := newQueue(10, 10)
q.Prepare(1, SnapSync) q.Prepare(1, SnapSync)
// Schedule a batch of headers // Schedule a batch of headers
q.Schedule(emptyChain.headers(), 1) headers := emptyChain.headers()
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
q.Schedule(headers, hashes, 1)
if q.Idle() { if q.Idle() {
t.Errorf("queue should not be idle") t.Errorf("queue should not be idle")
} }
...@@ -280,11 +292,15 @@ func XTestDelivery(t *testing.T) { ...@@ -280,11 +292,15 @@ func XTestDelivery(t *testing.T) {
c := 1 c := 1
for { for {
//fmt.Printf("getting headers from %d\n", c) //fmt.Printf("getting headers from %d\n", c)
hdrs := world.headers(c) headers := world.headers(c)
l := len(hdrs) hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
l := len(headers)
//fmt.Printf("scheduling %d headers, first %d last %d\n", //fmt.Printf("scheduling %d headers, first %d last %d\n",
// l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64()) // l, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64())
q.Schedule(hdrs, uint64(c)) q.Schedule(headers, hashes, uint64(c))
c += l c += l
} }
}() }()
...@@ -311,18 +327,31 @@ func XTestDelivery(t *testing.T) { ...@@ -311,18 +327,31 @@ func XTestDelivery(t *testing.T) {
peer := dummyPeer(fmt.Sprintf("peer-%d", i)) peer := dummyPeer(fmt.Sprintf("peer-%d", i))
f, _, _ := q.ReserveBodies(peer, rand.Intn(30)) f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
if f != nil { if f != nil {
var emptyList []*types.Header var (
var txs [][]*types.Transaction emptyList []*types.Header
var uncles [][]*types.Header txset [][]*types.Transaction
uncleset [][]*types.Header
)
numToSkip := rand.Intn(len(f.Headers)) numToSkip := rand.Intn(len(f.Headers))
for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] { for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] {
txs = append(txs, world.getTransactions(hdr.Number.Uint64())) txset = append(txset, world.getTransactions(hdr.Number.Uint64()))
uncles = append(uncles, emptyList) uncleset = append(uncleset, emptyList)
}
var (
txsHashes = make([]common.Hash, len(txset))
uncleHashes = make([]common.Hash, len(uncleset))
)
hasher := trie.NewStackTrie(nil)
for i, txs := range txset {
txsHashes[i] = types.DeriveSha(types.Transactions(txs), hasher)
}
for i, uncles := range uncleset {
uncleHashes[i] = types.CalcUncleHash(uncles)
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
_, err := q.DeliverBodies(peer.id, txs, uncles) _, err := q.DeliverBodies(peer.id, txset, txsHashes, uncleset, uncleHashes)
if err != nil { if err != nil {
fmt.Printf("delivered %d bodies %v\n", len(txs), err) fmt.Printf("delivered %d bodies %v\n", len(txset), err)
} }
} else { } else {
i++ i++
...@@ -341,7 +370,12 @@ func XTestDelivery(t *testing.T) { ...@@ -341,7 +370,12 @@ func XTestDelivery(t *testing.T) {
for _, hdr := range f.Headers { for _, hdr := range f.Headers {
rcs = append(rcs, world.getReceipts(hdr.Number.Uint64())) rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
} }
_, err := q.DeliverReceipts(peer.id, rcs) hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(rcs))
for i, receipt := range rcs {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
}
_, err := q.DeliverReceipts(peer.id, rcs, hashes)
if err != nil { if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err) fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
} }
......
...@@ -102,6 +102,7 @@ type Response struct { ...@@ -102,6 +102,7 @@ type Response struct {
Req *Request // Original request to cross-reference with Req *Request // Original request to cross-reference with
Res interface{} // Remote response for the request query Res interface{} // Remote response for the request query
Meta interface{} // Metadata generated locally on the receiver thread
Time time.Duration // Time it took for the request to be served Time time.Duration // Time it took for the request to be served
Done chan error // Channel to signal message handling to the reader Done chan error // Channel to signal message handling to the reader
} }
...@@ -137,7 +138,7 @@ func (p *Peer) dispatchRequest(req *Request) error { ...@@ -137,7 +138,7 @@ func (p *Peer) dispatchRequest(req *Request) error {
// dispatchRequest fulfils a pending request and delivers it to the requested // dispatchRequest fulfils a pending request and delivers it to the requested
// sink. // sink.
func (p *Peer) dispatchResponse(res *Response) error { func (p *Peer) dispatchResponse(res *Response, metadata func() interface{}) error {
resOp := &response{ resOp := &response{
res: res, res: res,
fail: make(chan error), fail: make(chan error),
...@@ -151,6 +152,11 @@ func (p *Peer) dispatchResponse(res *Response) error { ...@@ -151,6 +152,11 @@ func (p *Peer) dispatchResponse(res *Response) error {
if err := <-resOp.fail; err != nil { if err := <-resOp.fail; err != nil {
return nil return nil
} }
// Request was accepted, run any postprocessing step to generate metadata
// on the receiver thread, not the sink thread
if metadata != nil {
res.Meta = metadata()
}
// Deliver the filled out response and wait until it's handled. This // Deliver the filled out response and wait until it's handled. This
// path is a bit funky as Go's select has no order, so if a response // path is a bit funky as Go's select has no order, so if a response
// arrives to an already cancelled request, there's a 50-50% changes // arrives to an already cancelled request, there's a 50-50% changes
......
...@@ -286,11 +286,18 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -286,11 +286,18 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
metadata := func() interface{} {
hashes := make([]common.Hash, len(res.BlockHeadersPacket))
for i, header := range res.BlockHeadersPacket {
hashes[i] = header.Hash()
}
return hashes
}
return peer.dispatchResponse(&Response{ return peer.dispatchResponse(&Response{
id: res.RequestId, id: res.RequestId,
code: BlockHeadersMsg, code: BlockHeadersMsg,
Res: &res.BlockHeadersPacket, Res: &res.BlockHeadersPacket,
}) }, metadata)
} }
func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
...@@ -299,11 +306,23 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -299,11 +306,23 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
metadata := func() interface{} {
var (
txsHashes = make([]common.Hash, len(res.BlockBodiesPacket))
uncleHashes = make([]common.Hash, len(res.BlockBodiesPacket))
)
hasher := trie.NewStackTrie(nil)
for i, body := range res.BlockBodiesPacket {
txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher)
uncleHashes[i] = types.CalcUncleHash(body.Uncles)
}
return [][]common.Hash{txsHashes, uncleHashes}
}
return peer.dispatchResponse(&Response{ return peer.dispatchResponse(&Response{
id: res.RequestId, id: res.RequestId,
code: BlockBodiesMsg, code: BlockBodiesMsg,
Res: &res.BlockBodiesPacket, Res: &res.BlockBodiesPacket,
}) }, metadata)
} }
func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
...@@ -316,7 +335,7 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -316,7 +335,7 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
id: res.RequestId, id: res.RequestId,
code: NodeDataMsg, code: NodeDataMsg,
Res: &res.NodeDataPacket, Res: &res.NodeDataPacket,
}) }, nil) // No post-processing, we're not using this packet anymore
} }
func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
...@@ -325,11 +344,19 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { ...@@ -325,11 +344,19 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil { if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
} }
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(res.ReceiptsPacket))
for i, receipt := range res.ReceiptsPacket {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
}
return hashes
}
return peer.dispatchResponse(&Response{ return peer.dispatchResponse(&Response{
id: res.RequestId, id: res.RequestId,
code: ReceiptsMsg, code: ReceiptsMsg,
Res: &res.ReceiptsPacket, Res: &res.ReceiptsPacket,
}) }, metadata)
} }
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment