// Copyright 2019 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

package main

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/enode"
)

type crawler struct {
	input     nodeSet
	output    nodeSet
31
	disc      resolver
32 33 34 35 36 37 38
	iters     []enode.Iterator
	inputIter enode.Iterator
	ch        chan *enode.Node
	closed    chan struct{}

	// settings
	revalidateInterval time.Duration
39
	mu                 sync.RWMutex
40 41
}

// Status codes returned by crawler.updateNode, describing how a node was
// handled during revalidation.
const (
	nodeRemoved      = iota // node's score dropped to zero; removed from output
	nodeSkipRecent          // node was checked too recently; skipped
	nodeSkipIncompat        // node does not implement EIP-868; skipped
	nodeAdded               // node responded for the first time
	nodeUpdated             // existing node's record was refreshed
)

50 51 52 53 54
type resolver interface {
	RequestENR(*enode.Node) (*enode.Node, error)
}

func newCrawler(input nodeSet, disc resolver, iters ...enode.Iterator) *crawler {
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
	c := &crawler{
		input:     input,
		output:    make(nodeSet, len(input)),
		disc:      disc,
		iters:     iters,
		inputIter: enode.IterNodes(input.nodes()),
		ch:        make(chan *enode.Node),
		closed:    make(chan struct{}),
	}
	c.iters = append(c.iters, c.inputIter)
	// Copy input to output initially. Any nodes that fail validation
	// will be dropped from output during the run.
	for id, n := range input {
		c.output[id] = n
	}
	return c
}

73
func (c *crawler) run(timeout time.Duration, nthreads int) nodeSet {
74 75 76
	var (
		timeoutTimer = time.NewTimer(timeout)
		timeoutCh    <-chan time.Time
77
		statusTicker = time.NewTicker(time.Second * 8)
78 79 80
		doneCh       = make(chan enode.Iterator, len(c.iters))
		liveIters    = len(c.iters)
	)
81 82 83
	if nthreads < 1 {
		nthreads = 1
	}
84
	defer timeoutTimer.Stop()
85
	defer statusTicker.Stop()
86 87 88
	for _, it := range c.iters {
		go c.runIterator(doneCh, it)
	}
89
	var (
90 91 92 93 94
		added   atomic.Uint64
		updated atomic.Uint64
		skipped atomic.Uint64
		recent  atomic.Uint64
		removed atomic.Uint64
95
		wg      sync.WaitGroup
96
	)
97 98 99 100 101 102 103 104 105
	wg.Add(nthreads)
	for i := 0; i < nthreads; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case n := <-c.ch:
					switch c.updateNode(n) {
					case nodeSkipIncompat:
106
						skipped.Add(1)
107
					case nodeSkipRecent:
108
						recent.Add(1)
109
					case nodeRemoved:
110
						removed.Add(1)
111
					case nodeAdded:
112
						added.Add(1)
113
					default:
114
						updated.Add(1)
115 116 117 118 119 120 121 122
					}
				case <-c.closed:
					return
				}
			}
		}()
	}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
loop:
	for {
		select {
		case it := <-doneCh:
			if it == c.inputIter {
				// Enable timeout when we're done revalidating the input nodes.
				log.Info("Revalidation of input set is done", "len", len(c.input))
				if timeout > 0 {
					timeoutCh = timeoutTimer.C
				}
			}
			if liveIters--; liveIters == 0 {
				break loop
			}
		case <-timeoutCh:
			break loop
139 140
		case <-statusTicker.C:
			log.Info("Crawling in progress",
141 142 143 144 145
				"added", added.Load(),
				"updated", updated.Load(),
				"removed", removed.Load(),
				"ignored(recent)", recent.Load(),
				"ignored(incompatible)", skipped.Load())
146 147 148 149 150 151 152 153 154 155
		}
	}

	close(c.closed)
	for _, it := range c.iters {
		it.Close()
	}
	for ; liveIters > 0; liveIters-- {
		<-doneCh
	}
156
	wg.Wait()
157 158 159 160 161 162 163 164 165 166 167 168 169 170
	return c.output
}

// runIterator feeds nodes from it into c.ch until the iterator is
// exhausted or the crawler shuts down, then signals completion on done.
func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) {
	defer func() { done <- it }()
	for it.Next() {
		select {
		case <-c.closed:
			return
		case c.ch <- it.Node():
		}
	}
}

171 172 173
// updateNode updates the info about the given node, and returns a status
// about what changed
func (c *crawler) updateNode(n *enode.Node) int {
174
	c.mu.RLock()
175
	node, ok := c.output[n.ID()]
176
	c.mu.RUnlock()
177 178 179

	// Skip validation of recently-seen nodes.
	if ok && time.Since(node.LastCheck) < c.revalidateInterval {
180
		return nodeSkipRecent
181 182 183
	}

	// Request the node record.
184
	status := nodeUpdated
185 186
	node.LastCheck = truncNow()
	if nn, err := c.disc.RequestENR(n); err != nil {
187 188 189
		if node.Score == 0 {
			// Node doesn't implement EIP-868.
			log.Debug("Skipping node", "id", n.ID())
190
			return nodeSkipIncompat
191 192 193 194 195 196 197 198
		}
		node.Score /= 2
	} else {
		node.N = nn
		node.Seq = nn.Seq()
		node.Score++
		if node.FirstResponse.IsZero() {
			node.FirstResponse = node.LastCheck
199
			status = nodeAdded
200 201 202 203
		}
		node.LastResponse = node.LastCheck
	}
	// Store/update node in output set.
204 205
	c.mu.Lock()
	defer c.mu.Unlock()
206
	if node.Score <= 0 {
207
		log.Debug("Removing node", "id", n.ID())
208
		delete(c.output, n.ID())
209
		return nodeRemoved
210
	}
211 212 213
	log.Debug("Updating node", "id", n.ID(), "seq", n.Seq(), "score", node.Score)
	c.output[n.ID()] = node
	return status
214 215 216 217 218
}

func truncNow() time.Time {
	return time.Now().UTC().Truncate(1 * time.Second)
}