v4_udp_test.go 19.9 KB
Newer Older
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16

17 18 19
package discover

import (
20 21
	"bytes"
	"crypto/ecdsa"
22
	crand "crypto/rand"
23
	"encoding/binary"
24
	"errors"
25
	"fmt"
26
	"io"
27
	"math/rand"
28
	"net"
29 30
	"reflect"
	"sync"
31 32 33
	"testing"
	"time"

34 35
	"github.com/ethereum/go-ethereum/internal/testlog"
	"github.com/ethereum/go-ethereum/log"
36
	"github.com/ethereum/go-ethereum/p2p/discover/v4wire"
37
	"github.com/ethereum/go-ethereum/p2p/enode"
38
	"github.com/ethereum/go-ethereum/p2p/enr"
39 40
)

41 42 43
// shared test variables
var (
	futureExp          = uint64(time.Now().Add(10 * time.Hour).Unix())
44 45 46 47
	testTarget         = v4wire.Pubkey{0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1}
	testRemote         = v4wire.Endpoint{IP: net.ParseIP("1.1.1.1").To4(), UDP: 1, TCP: 2}
	testLocalAnnounced = v4wire.Endpoint{IP: net.ParseIP("2.2.2.2").To4(), UDP: 3, TCP: 4}
	testLocal          = v4wire.Endpoint{IP: net.ParseIP("3.3.3.3").To4(), UDP: 5, TCP: 6}
48 49
)

50 51 52 53
type udpTest struct {
	t                   *testing.T
	pipe                *dgramPipe
	table               *Table
54
	db                  *enode.DB
55
	udp                 *UDPv4
56 57 58 59
	sent                [][]byte
	localkey, remotekey *ecdsa.PrivateKey
	remoteaddr          *net.UDPAddr
}
60

61 62 63 64 65 66
func newUDPTest(t *testing.T) *udpTest {
	test := &udpTest{
		t:          t,
		pipe:       newpipe(),
		localkey:   newkey(),
		remotekey:  newkey(),
67
		remoteaddr: &net.UDPAddr{IP: net.IP{10, 0, 1, 99}, Port: 30303},
68
	}
69

70 71
	test.db, _ = enode.OpenDB("")
	ln := enode.NewLocalNode(test.db, test.localkey)
72 73 74 75 76
	test.udp, _ = ListenV4(test.pipe, ln, Config{
		PrivateKey: test.localkey,
		Log:        testlog.Logger(t, log.LvlTrace),
	})
	test.table = test.udp.tab
77 78
	// Wait for initial refresh so the table doesn't send unexpected findnode.
	<-test.table.initDone
79 80 81
	return test
}

82
func (test *udpTest) close() {
83
	test.udp.Close()
84 85 86
	test.db.Close()
}

87
// handles a packet as if it had been sent to the transport.
88
func (test *udpTest) packetIn(wantError error, data v4wire.Packet) {
89 90
	test.t.Helper()

91
	test.packetInFrom(wantError, test.remotekey, test.remoteaddr, data)
92 93 94
}

// handles a packet as if it had been sent to the transport by the key/endpoint.
95
func (test *udpTest) packetInFrom(wantError error, key *ecdsa.PrivateKey, addr *net.UDPAddr, data v4wire.Packet) {
96 97
	test.t.Helper()

98
	enc, _, err := v4wire.Encode(key, data)
99
	if err != nil {
100
		test.t.Errorf("%s encode error: %v", data.Name(), err)
101
	}
102
	test.sent = append(test.sent, enc)
103
	if err = test.udp.handlePacket(addr, enc); err != wantError {
104
		test.t.Errorf("error mismatch: got %q, want %q", err, wantError)
105 106 107
	}
}

108
// waits for a packet to be sent by the transport.
109 110 111 112
// validate should have type func(X, *net.UDPAddr, []byte), where X is a packet type.
func (test *udpTest) waitPacketOut(validate interface{}) (closed bool) {
	test.t.Helper()

113 114
	dgram, err := test.pipe.receive()
	if err == errClosed {
115
		return true
116 117 118
	} else if err != nil {
		test.t.Error("packet receive error:", err)
		return false
119
	}
120
	p, _, hash, err := v4wire.Decode(dgram.data)
121
	if err != nil {
122 123
		test.t.Errorf("sent packet decode error: %v", err)
		return false
124
	}
125 126
	fn := reflect.ValueOf(validate)
	exptype := fn.Type().In(0)
127 128 129
	if !reflect.TypeOf(p).AssignableTo(exptype) {
		test.t.Errorf("sent packet type mismatch, got: %v, want: %v", reflect.TypeOf(p), exptype)
		return false
130
	}
131 132
	fn.Call([]reflect.Value{reflect.ValueOf(p), reflect.ValueOf(&dgram.to), reflect.ValueOf(hash)})
	return false
133 134
}

135
func TestUDPv4_packetErrors(t *testing.T) {
136
	test := newUDPTest(t)
137
	defer test.close()
138

139 140 141 142
	test.packetIn(errExpired, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4})
	test.packetIn(errUnsolicitedReply, &v4wire.Pong{ReplyTok: []byte{}, Expiration: futureExp})
	test.packetIn(errUnknownNode, &v4wire.Findnode{Expiration: futureExp})
	test.packetIn(errUnsolicitedReply, &v4wire.Neighbors{Expiration: futureExp})
143 144
}

145
func TestUDPv4_pingTimeout(t *testing.T) {
146 147
	t.Parallel()
	test := newUDPTest(t)
148
	defer test.close()
149

150
	key := newkey()
151
	toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
152
	node := enode.NewV4(&key.PublicKey, toaddr.IP, 0, toaddr.Port)
153
	if _, err := test.udp.ping(node); err != errTimeout {
154 155 156 157
		t.Error("expected timeout error, got", err)
	}
}

158 159
type testPacket byte

160 161
func (req testPacket) Kind() byte   { return byte(req) }
func (req testPacket) Name() string { return "" }
162

163
func TestUDPv4_responseTimeouts(t *testing.T) {
164 165
	t.Parallel()
	test := newUDPTest(t)
166
	defer test.close()
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183

	rand.Seed(time.Now().UnixNano())
	randomDuration := func(max time.Duration) time.Duration {
		return time.Duration(rand.Int63n(int64(max)))
	}

	var (
		nReqs      = 200
		nTimeouts  = 0                       // number of requests with ptype > 128
		nilErr     = make(chan error, nReqs) // for requests that get a reply
		timeoutErr = make(chan error, nReqs) // for requests that time out
	)
	for i := 0; i < nReqs; i++ {
		// Create a matcher for a random request in udp.loop. Requests
		// with ptype <= 128 will not get a reply and should time out.
		// For all other requests, a reply is scheduled to arrive
		// within the timeout window.
184
		p := &replyMatcher{
185
			ptype:    byte(rand.Intn(255)),
186
			callback: func(v4wire.Packet) (bool, bool) { return true, true },
187 188 189 190
		}
		binary.BigEndian.PutUint64(p.from[:], uint64(i))
		if p.ptype <= 128 {
			p.errc = timeoutErr
191
			test.udp.addReplyMatcher <- p
192 193 194
			nTimeouts++
		} else {
			p.errc = nilErr
195
			test.udp.addReplyMatcher <- p
196
			time.AfterFunc(randomDuration(60*time.Millisecond), func() {
197
				if !test.udp.handleReply(p.from, p.ip, testPacket(p.ptype)) {
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
					t.Logf("not matched: %v", p)
				}
			})
		}
		time.Sleep(randomDuration(30 * time.Millisecond))
	}

	// Check that all timeouts were delivered and that the rest got nil errors.
	// The replies must be delivered.
	var (
		recvDeadline        = time.After(20 * time.Second)
		nTimeoutsRecv, nNil = 0, 0
	)
	for i := 0; i < nReqs; i++ {
		select {
		case err := <-timeoutErr:
			if err != errTimeout {
				t.Fatalf("got non-timeout error on timeoutErr %d: %v", i, err)
			}
			nTimeoutsRecv++
		case err := <-nilErr:
			if err != nil {
				t.Fatalf("got non-nil error on nilErr %d: %v", i, err)
			}
			nNil++
		case <-recvDeadline:
			t.Fatalf("exceeded recv deadline")
		}
	}
	if nTimeoutsRecv != nTimeouts {
		t.Errorf("wrong number of timeout errors received: got %d, want %d", nTimeoutsRecv, nTimeouts)
	}
	if nNil != nReqs-nTimeouts {
		t.Errorf("wrong number of successful replies: got %d, want %d", nNil, nReqs-nTimeouts)
	}
}

235
func TestUDPv4_findnodeTimeout(t *testing.T) {
236
	t.Parallel()
237
	test := newUDPTest(t)
238
	defer test.close()
239

240
	toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
241
	toid := enode.ID{1, 2, 3, 4}
242
	target := v4wire.Pubkey{4, 5, 6, 7}
243 244 245 246 247 248 249 250
	result, err := test.udp.findnode(toid, toaddr, target)
	if err != errTimeout {
		t.Error("expected timeout error, got", err)
	}
	if len(result) > 0 {
		t.Error("expected empty result, got", result)
	}
}
251

252
func TestUDPv4_findnode(t *testing.T) {
253
	test := newUDPTest(t)
254
	defer test.close()
255 256

	// put a few nodes into the table. their exact
257
	// distribution shouldn't matter much, although we need to
258
	// take care not to overflow any bucket.
259
	nodes := &nodesByDistance{target: testTarget.ID()}
260 261 262
	live := make(map[enode.ID]bool)
	numCandidates := 2 * bucketSize
	for i := 0; i < numCandidates; i++ {
263
		key := newkey()
264 265 266 267 268 269 270 271
		ip := net.IP{10, 13, 0, byte(i)}
		n := wrapNode(enode.NewV4(&key.PublicKey, ip, 0, 2000))
		// Ensure half of table content isn't verified live yet.
		if i > numCandidates/2 {
			n.livenessChecks = 1
			live[n.ID()] = true
		}
		nodes.push(n, numCandidates)
272
	}
273
	fillTable(test.table, nodes.entries)
274 275 276

	// ensure there's a bond with the test node,
	// findnode won't be accepted otherwise.
277
	remoteID := v4wire.EncodePubkey(&test.remotekey.PublicKey).ID()
278
	test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())
279

280
	// check that closest neighbors are returned.
281
	expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
282
	test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
283
	waitNeighbors := func(want []*node) {
284
		test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
285 286
			if len(p.Nodes) != len(want) {
				t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
287
			}
288
			for i, n := range p.Nodes {
289
				if n.ID.ID() != want[i].ID() {
290 291
					t.Errorf("result mismatch at %d:\n  got:  %v\n  want: %v", i, n, expected.entries[i])
				}
292 293
				if !live[n.ID.ID()] {
					t.Errorf("result includes dead node %v", n.ID.ID())
294
				}
295
			}
296 297
		})
	}
298 299
	// Receive replies.
	want := expected.entries
300 301 302
	if len(want) > v4wire.MaxNeighbors {
		waitNeighbors(want[:v4wire.MaxNeighbors])
		want = want[v4wire.MaxNeighbors:]
303 304
	}
	waitNeighbors(want)
305 306
}

307
func TestUDPv4_findnodeMultiReply(t *testing.T) {
308
	test := newUDPTest(t)
309
	defer test.close()
310

311
	rid := enode.PubkeyToIDV4(&test.remotekey.PublicKey)
312
	test.table.db.UpdateLastPingReceived(rid, test.remoteaddr.IP, time.Now())
313

314
	// queue a pending findnode request
315
	resultc, errc := make(chan []*node), make(chan error)
316
	go func() {
317
		rid := encodePubkey(&test.remotekey.PublicKey).id()
318 319 320 321 322 323 324
		ns, err := test.udp.findnode(rid, test.remoteaddr, testTarget)
		if err != nil && len(ns) == 0 {
			errc <- err
		} else {
			resultc <- ns
		}
	}()
325

326 327
	// wait for the findnode to be sent.
	// after it is sent, the transport is waiting for a reply
328
	test.waitPacketOut(func(p *v4wire.Findnode, to *net.UDPAddr, hash []byte) {
329 330 331 332
		if p.Target != testTarget {
			t.Errorf("wrong target: got %v, want %v", p.Target, testTarget)
		}
	})
333

334
	// send the reply as two packets.
335
	list := []*node{
336 337 338 339
		wrapNode(enode.MustParse("enode://ba85011c70bcc5c04d8607d3a0ed29aa6179c092cbdda10d5d32684fb33ed01bd94f588ca8f91ac48318087dcb02eaf36773a7a453f0eedd6742af668097b29c@10.0.1.16:30303?discport=30304")),
		wrapNode(enode.MustParse("enode://81fa361d25f157cd421c60dcc28d8dac5ef6a89476633339c5df30287474520caca09627da18543d9079b5b288698b542d56167aa5c09111e55acdbbdf2ef799@10.0.1.16:30303")),
		wrapNode(enode.MustParse("enode://9bffefd833d53fac8e652415f4973bee289e8b1a5c6c4cbe70abf817ce8a64cee11b823b66a987f51aaa9fba0d6a91b3e6bf0d5a5d1042de8e9eeea057b217f8@10.0.1.36:30301?discport=17")),
		wrapNode(enode.MustParse("enode://1b5b4aa662d7cb44a7221bfba67302590b643028197a7d5214790f3bac7aaa4a3241be9e83c09cf1f6c69d007c634faae3dc1b1221793e8446c0b3a09de65960@10.0.1.16:30303")),
340
	}
341
	rpclist := make([]v4wire.Node, len(list))
342 343 344
	for i := range list {
		rpclist[i] = nodeToRPC(list[i])
	}
345 346
	test.packetIn(nil, &v4wire.Neighbors{Expiration: futureExp, Nodes: rpclist[:2]})
	test.packetIn(nil, &v4wire.Neighbors{Expiration: futureExp, Nodes: rpclist[2:]})
347

348 349 350
	// check that the sent neighbors are all returned by findnode
	select {
	case result := <-resultc:
351 352 353
		want := append(list[:2], list[3:]...)
		if !reflect.DeepEqual(result, want) {
			t.Errorf("neighbors mismatch:\n  got:  %v\n  want: %v", result, want)
354 355 356 357 358
		}
	case err := <-errc:
		t.Errorf("findnode error: %v", err)
	case <-time.After(5 * time.Second):
		t.Error("findnode did not return within 5 seconds")
359 360 361
	}
}

362
// This test checks that reply matching of pong verifies the ping hash.
363
func TestUDPv4_pingMatch(t *testing.T) {
364 365 366 367 368 369
	test := newUDPTest(t)
	defer test.close()

	randToken := make([]byte, 32)
	crand.Read(randToken)

370 371 372 373
	test.packetIn(nil, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
	test.waitPacketOut(func(*v4wire.Pong, *net.UDPAddr, []byte) {})
	test.waitPacketOut(func(*v4wire.Ping, *net.UDPAddr, []byte) {})
	test.packetIn(errUnsolicitedReply, &v4wire.Pong{ReplyTok: randToken, To: testLocalAnnounced, Expiration: futureExp})
374 375
}

376
// This test checks that reply matching of pong verifies the sender IP address.
377
func TestUDPv4_pingMatchIP(t *testing.T) {
378 379 380
	test := newUDPTest(t)
	defer test.close()

381 382
	test.packetIn(nil, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
	test.waitPacketOut(func(*v4wire.Pong, *net.UDPAddr, []byte) {})
383

384
	test.waitPacketOut(func(p *v4wire.Ping, to *net.UDPAddr, hash []byte) {
385
		wrongAddr := &net.UDPAddr{IP: net.IP{33, 44, 1, 2}, Port: 30000}
386
		test.packetInFrom(errUnsolicitedReply, test.remotekey, wrongAddr, &v4wire.Pong{
387 388 389 390
			ReplyTok:   hash,
			To:         testLocalAnnounced,
			Expiration: futureExp,
		})
391 392 393
	})
}

394
func TestUDPv4_successfulPing(t *testing.T) {
395
	test := newUDPTest(t)
396 397
	added := make(chan *node, 1)
	test.table.nodeAddedHook = func(n *node) { added <- n }
398
	defer test.close()
399

400
	// The remote side sends a ping packet to initiate the exchange.
401
	go test.packetIn(nil, &v4wire.Ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
402

403
	// The ping is replied to.
404 405
	test.waitPacketOut(func(p *v4wire.Pong, to *net.UDPAddr, hash []byte) {
		pinghash := test.sent[0][:32]
406
		if !bytes.Equal(p.ReplyTok, pinghash) {
407 408
			t.Errorf("got pong.ReplyTok %x, want %x", p.ReplyTok, pinghash)
		}
409
		wantTo := v4wire.Endpoint{
410 411 412 413 414 415 416
			// The mirrored UDP address is the UDP packet sender
			IP: test.remoteaddr.IP, UDP: uint16(test.remoteaddr.Port),
			// The mirrored TCP port is the one from the ping packet
			TCP: testRemote.TCP,
		}
		if !reflect.DeepEqual(p.To, wantTo) {
			t.Errorf("got pong.To %v, want %v", p.To, wantTo)
417
		}
418
	})
419

420
	// Remote is unknown, the table pings back.
421
	test.waitPacketOut(func(p *v4wire.Ping, to *net.UDPAddr, hash []byte) {
422 423
		if !reflect.DeepEqual(p.From, test.udp.ourEndpoint()) {
			t.Errorf("got ping.From %#v, want %#v", p.From, test.udp.ourEndpoint())
424
		}
425
		wantTo := v4wire.Endpoint{
426
			// The mirrored UDP address is the UDP packet sender.
427 428
			IP:  test.remoteaddr.IP,
			UDP: uint16(test.remoteaddr.Port),
429 430 431 432 433
			TCP: 0,
		}
		if !reflect.DeepEqual(p.To, wantTo) {
			t.Errorf("got ping.To %v, want %v", p.To, wantTo)
		}
434
		test.packetIn(nil, &v4wire.Pong{ReplyTok: hash, Expiration: futureExp})
435
	})
436

437
	// The node should be added to the table shortly after getting the
438 439 440
	// pong packet.
	select {
	case n := <-added:
441 442 443
		rid := encodePubkey(&test.remotekey.PublicKey).id()
		if n.ID() != rid {
			t.Errorf("node has wrong ID: got %v, want %v", n.ID(), rid)
444
		}
445 446
		if !n.IP().Equal(test.remoteaddr.IP) {
			t.Errorf("node has wrong IP: got %v, want: %v", n.IP(), test.remoteaddr.IP)
447
		}
448
		if n.UDP() != test.remoteaddr.Port {
449
			t.Errorf("node has wrong UDP port: got %v, want: %v", n.UDP(), test.remoteaddr.Port)
450
		}
451 452
		if n.TCP() != int(testRemote.TCP) {
			t.Errorf("node has wrong TCP port: got %v, want: %v", n.TCP(), testRemote.TCP)
453 454 455
		}
	case <-time.After(2 * time.Second):
		t.Errorf("node was not added within 2 seconds")
456
	}
457
}
458

459 460 461 462 463 464 465 466 467
// This test checks that EIP-868 requests work.
func TestUDPv4_EIP868(t *testing.T) {
	test := newUDPTest(t)
	defer test.close()

	test.udp.localNode.Set(enr.WithEntry("foo", "bar"))
	wantNode := test.udp.localNode.Node()

	// ENR requests aren't allowed before endpoint proof.
468
	test.packetIn(errUnknownNode, &v4wire.ENRRequest{Expiration: futureExp})
469 470

	// Perform endpoint proof and check for sequence number in packet tail.
471 472
	test.packetIn(nil, &v4wire.Ping{Expiration: futureExp})
	test.waitPacketOut(func(p *v4wire.Pong, addr *net.UDPAddr, hash []byte) {
473 474
		if p.ENRSeq != wantNode.Seq() {
			t.Errorf("wrong sequence number in pong: %d, want %d", p.ENRSeq, wantNode.Seq())
475 476
		}
	})
477
	test.waitPacketOut(func(p *v4wire.Ping, addr *net.UDPAddr, hash []byte) {
478 479
		if p.ENRSeq != wantNode.Seq() {
			t.Errorf("wrong sequence number in ping: %d, want %d", p.ENRSeq, wantNode.Seq())
480
		}
481
		test.packetIn(nil, &v4wire.Pong{Expiration: futureExp, ReplyTok: hash})
482 483 484
	})

	// Request should work now.
485 486
	test.packetIn(nil, &v4wire.ENRRequest{Expiration: futureExp})
	test.waitPacketOut(func(p *v4wire.ENRResponse, addr *net.UDPAddr, hash []byte) {
487 488 489 490 491 492 493 494 495 496
		n, err := enode.New(enode.ValidSchemes, &p.Record)
		if err != nil {
			t.Fatalf("invalid record: %v", err)
		}
		if !reflect.DeepEqual(n, wantNode) {
			t.Fatalf("wrong node in enrResponse: %v", n)
		}
	})
}

497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
// This test verifies that a small network of nodes can boot up into a healthy state.
func TestUDPv4_smallNetConvergence(t *testing.T) {
	t.Parallel()

	// Start the network.
	nodes := make([]*UDPv4, 4)
	for i := range nodes {
		var cfg Config
		if i > 0 {
			bn := nodes[0].Self()
			cfg.Bootnodes = []*enode.Node{bn}
		}
		nodes[i] = startLocalhostV4(t, cfg)
		defer nodes[i].Close()
	}

	// Run through the iterator on all nodes until
	// they have all found each other.
	status := make(chan error, len(nodes))
	for i := range nodes {
		node := nodes[i]
		go func() {
			found := make(map[enode.ID]bool, len(nodes))
			it := node.RandomNodes()
			for it.Next() {
				found[it.Node().ID()] = true
				if len(found) == len(nodes) {
					status <- nil
					return
				}
			}
			status <- fmt.Errorf("node %s didn't find all nodes", node.Self().ID().TerminalString())
		}()
	}

	// Wait for all status reports.
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()
	for received := 0; received < len(nodes); {
		select {
		case <-timeout.C:
			for _, node := range nodes {
				node.Close()
			}
		case err := <-status:
			received++
			if err != nil {
				t.Error("ERROR:", err)
				return
			}
		}
	}
}

func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
	t.Helper()

	cfg.PrivateKey = newkey()
	db, _ := enode.OpenDB("")
	ln := enode.NewLocalNode(db, cfg.PrivateKey)

	// Prefix logs with node ID.
	lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
	lfmt := log.TerminalFormat(false)
	cfg.Log = testlog.Logger(t, log.LvlTrace)
	cfg.Log.SetHandler(log.FuncHandler(func(r *log.Record) error {
		t.Logf("%s %s", lprefix, lfmt.Format(r))
		return nil
	}))

	// Listen.
	socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
	if err != nil {
		t.Fatal(err)
	}
	realaddr := socket.LocalAddr().(*net.UDPAddr)
	ln.SetStaticIP(realaddr.IP)
	ln.SetFallbackUDP(realaddr.Port)
	udp, err := ListenV4(socket, ln, cfg)
	if err != nil {
		t.Fatal(err)
	}
	return udp
}

582 583 584 585 586 587
// dgramPipe is a fake UDP socket. It queues all sent datagrams.
type dgramPipe struct {
	mu      *sync.Mutex
	cond    *sync.Cond
	closing chan struct{}
	closed  bool
588 589 590 591 592 593
	queue   []dgram
}

type dgram struct {
	to   net.UDPAddr
	data []byte
594 595 596 597 598 599 600 601
}

func newpipe() *dgramPipe {
	mu := new(sync.Mutex)
	return &dgramPipe{
		closing: make(chan struct{}),
		cond:    &sync.Cond{L: mu},
		mu:      mu,
602
	}
603 604 605 606 607 608 609 610 611 612 613
}

// WriteToUDP queues a datagram.
func (c *dgramPipe) WriteToUDP(b []byte, to *net.UDPAddr) (n int, err error) {
	msg := make([]byte, len(b))
	copy(msg, b)
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return 0, errors.New("closed")
	}
614
	c.queue = append(c.queue, dgram{*to, b})
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
	c.cond.Signal()
	return len(b), nil
}

// ReadFromUDP just hangs until the pipe is closed.
func (c *dgramPipe) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
	<-c.closing
	return 0, nil, io.EOF
}

func (c *dgramPipe) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.closed {
		close(c.closing)
		c.closed = true
631
	}
632
	c.cond.Broadcast()
633
	return nil
634
}
635 636

func (c *dgramPipe) LocalAddr() net.Addr {
637
	return &net.UDPAddr{IP: testLocal.IP, Port: int(testLocal.UDP)}
638 639
}

640
func (c *dgramPipe) receive() (dgram, error) {
641 642
	c.mu.Lock()
	defer c.mu.Unlock()
643 644 645 646 647 648 649 650 651 652 653

	var timedOut bool
	timer := time.AfterFunc(3*time.Second, func() {
		c.mu.Lock()
		timedOut = true
		c.mu.Unlock()
		c.cond.Broadcast()
	})
	defer timer.Stop()

	for len(c.queue) == 0 && !c.closed && !timedOut {
654 655
		c.cond.Wait()
	}
656
	if c.closed {
657 658 659 660
		return dgram{}, errClosed
	}
	if timedOut {
		return dgram{}, errTimeout
661
	}
662 663 664
	p := c.queue[0]
	copy(c.queue, c.queue[1:])
	c.queue = c.queue[:len(c.queue)-1]
665
	return p, nil
666
}