// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
	"bytes"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb/memorydb"
)

// makeTestTrie creates a sample test trie to test node-wise reconstruction.
func makeTestTrie() (*Database, *SecureTrie, map[string][]byte) {
	// Create an empty trie
	triedb := NewDatabase(memorydb.New())
	trie, _ := NewSecure(common.Hash{}, triedb)

	// Fill it with some arbitrary data
	content := make(map[string][]byte)
	for i := byte(0); i < 255; i++ {
		// Map the same data under multiple keys
		key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i}
		content[string(key)] = val
		trie.Update(key, val)

		key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i}
		content[string(key)] = val
		trie.Update(key, val)

		// Add some other data to inflate the trie
		for j := byte(3); j < 13; j++ {
			key, val = common.LeftPadBytes([]byte{j, i}, 32), []byte{j, i}
			content[string(key)] = val
			trie.Update(key, val)
		}
	}
	trie.Commit(nil)

	// Return the generated trie
	return triedb, trie, content
}

// checkTrieContents cross-references a reconstructed trie with an expected data
// content map.
func checkTrieContents(t *testing.T, db *Database, root []byte, content map[string][]byte) {
	// Check root availability and trie contents
	trie, err := NewSecure(common.BytesToHash(root), db)
	if err != nil {
		t.Fatalf("failed to create trie at %x: %v", root, err)
	}
	if err := checkTrieConsistency(db, common.BytesToHash(root)); err != nil {
		t.Fatalf("inconsistent trie at %x: %v", root, err)
	}
	for key, val := range content {
		if have := trie.Get([]byte(key)); !bytes.Equal(have, val) {
			t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val)
		}
	}
}

// checkTrieConsistency checks that all nodes in a trie are indeed present.
func checkTrieConsistency(db *Database, root common.Hash) error {
	// Create and iterate a trie rooted in a subnode
	trie, err := NewSecure(root, db)
	if err != nil {
		return nil // Consider a non-existent state consistent
	}
	it := trie.NodeIterator(nil)
	for it.Next(true) {
	}
	return it.Error()
}

// Tests that an empty trie is not scheduled for syncing.
func TestEmptySync(t *testing.T) {
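	// Create two empty tries: one rooted at the zero hash and one at the canonical empty root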
	dbA := NewDatabase(memorydb.New())
	dbB := NewDatabase(memorydb.New())
	emptyA, _ := New(common.Hash{}, dbA)
	emptyB, _ := New(emptyRoot, dbB)

	for i, trie := range []*Trie{emptyA, emptyB} {
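		// A fresh scheduler over an empty trie should have nothing to request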
		sync := NewSync(trie.Hash(), memorydb.New(), nil, NewSyncBloom(1, memorydb.New()))
		if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 {
			t.Errorf("test %d: content requested for empty trie: %v, %v, %v", i, nodes, paths, codes)
		}
	}
}

// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeSyncIndividual(t *testing.T)       { testIterativeSync(t, 1, false) }
func TestIterativeSyncBatched(t *testing.T)          { testIterativeSync(t, 100, false) }
func TestIterativeSyncIndividualByPath(t *testing.T) { testIterativeSync(t, 1, true) }
func TestIterativeSyncBatchedByPath(t *testing.T)    { testIterativeSync(t, 100, true) }

func testIterativeSync(t *testing.T, count int, bypath bool) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, paths, codes := sched.Missing(count)
	var (
		hashQueue []common.Hash
		pathQueue []SyncPath
	)
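	// Depending on the sync mode, schedule the retrievals either by node hash or
	// by node path (bytecodes are always requested by hash)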
	if !bypath {
		hashQueue = append(append(hashQueue[:0], nodes...), codes...)
	} else {
		hashQueue = append(hashQueue[:0], codes...)
		pathQueue = append(pathQueue[:0], paths...)
	}
	for len(hashQueue)+len(pathQueue) > 0 {
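		// Fetch all the queued nodes from the source trie, by hash and by path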
		results := make([]SyncResult, len(hashQueue)+len(pathQueue))
		for i, hash := range hashQueue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for hash %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		for i, path := range pathQueue {
			data, _, err := srcTrie.TryGetNode(path[0])
			if err != nil {
				t.Fatalf("failed to retrieve node data for path %x: %v", path, err)
			}
			results[len(hashQueue)+i] = SyncResult{crypto.Keccak256Hash(data), data}
		}
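		// Feed the retrieved results back and queue new tasks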
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, paths, codes = sched.Missing(count)
		if !bypath {
			hashQueue = append(append(hashQueue[:0], nodes...), codes...)
		} else {
			hashQueue = append(hashQueue[:0], codes...)
			pathQueue = append(pathQueue[:0], paths...)
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned, and the others are sent only later.
func TestIterativeDelayedSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, _, codes := sched.Missing(10000)
	queue := append(append([]common.Hash{}, nodes...), codes...)

	for len(queue) > 0 {
		// Sync only half of the scheduled nodes
		results := make([]SyncResult, len(queue)/2+1)
		for i, hash := range queue[:len(results)] {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
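		// Feed the retrieved results back and queue new tasks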
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, _, codes = sched.Missing(10000)
		queue = append(append(queue[len(results):], nodes...), codes...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, however in a
// random order.
func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) }
func TestIterativeRandomSyncBatched(t *testing.T)    { testIterativeRandomSync(t, 100) }

func testIterativeRandomSync(t *testing.T, count int) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	queue := make(map[common.Hash]struct{})
	nodes, _, codes := sched.Missing(count)
	for _, hash := range append(nodes, codes...) {
		queue[hash] = struct{}{}
	}
	for len(queue) > 0 {
		// Fetch all the queued nodes in a random order
		results := make([]SyncResult, 0, len(queue))
		for hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results = append(results, SyncResult{hash, data})
		}
		// Feed the retrieved results back and queue new tasks
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		queue = make(map[common.Hash]struct{})
		nodes, _, codes = sched.Missing(count)
		for _, hash := range append(nodes, codes...) {
			queue[hash] = struct{}{}
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned (even those in random order), and others are sent only later.
func TestIterativeRandomDelayedSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	queue := make(map[common.Hash]struct{})
	nodes, _, codes := sched.Missing(10000)
	for _, hash := range append(nodes, codes...) {
		queue[hash] = struct{}{}
	}
	for len(queue) > 0 {
		// Sync only half of the scheduled nodes, even those in random order
		results := make([]SyncResult, 0, len(queue)/2+1)
		for hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results = append(results, SyncResult{hash, data})

			if len(results) >= cap(results) {
				break
			}
		}
		// Feed the retrieved results back and queue new tasks
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()
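		// Drop the processed results from the pending queue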
		for _, result := range results {
			delete(queue, result.Hash)
		}
		nodes, _, codes = sched.Missing(10000)
		for _, hash := range append(nodes, codes...) {
			queue[hash] = struct{}{}
		}
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that a trie sync will not request nodes multiple times, even if they
// are referenced multiple times.
func TestDuplicateAvoidanceSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, _, codes := sched.Missing(0)
	queue := append(append([]common.Hash{}, nodes...), codes...)
	requested := make(map[common.Hash]struct{})

	for len(queue) > 0 {
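		// Fetch all the queued nodes, making sure no hash is ever requested twice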
		results := make([]SyncResult, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			if _, ok := requested[hash]; ok {
				t.Errorf("hash %x already requested once", hash)
			}
			requested[hash] = struct{}{}

			results[i] = SyncResult{hash, data}
		}
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, _, codes = sched.Missing(0)
		queue = append(append(queue[:0], nodes...), codes...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)
}

// Tests that at any point in time during a sync, only complete sub-tries are in
// the database.
func TestIncompleteSync(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, _ := makeTestTrie()

	// Create a destination trie and sync with the scheduler
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	var added []common.Hash

	nodes, _, codes := sched.Missing(1)
	queue := append(append([]common.Hash{}, nodes...), codes...)
	for len(queue) > 0 {
		// Fetch a batch of trie nodes
		results := make([]SyncResult, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		// Process each of the trie nodes
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()
		for _, result := range results {
			added = append(added, result.Hash)
			// Check that all known sub-tries in the synced trie are complete
			if err := checkTrieConsistency(triedb, result.Hash); err != nil {
				t.Fatalf("trie inconsistent: %v", err)
			}
		}
		// Fetch the next batch to retrieve
		nodes, _, codes = sched.Missing(1)
		queue = append(append(queue[:0], nodes...), codes...)
	}
	// Sanity check that removing any node from the database is detected
	for _, node := range added[1:] {
		key := node.Bytes()
		value, _ := diskdb.Get(key)

		diskdb.Delete(key)
		if err := checkTrieConsistency(triedb, added[0]); err == nil {
			t.Fatalf("trie inconsistency not caught, missing: %x", key)
		}
		diskdb.Put(key, value)
	}
}

// Tests that trie nodes get scheduled lexicographically when they have the
// same depth.
func TestSyncOrdering(t *testing.T) {
	// Create a random trie to copy
	srcDb, srcTrie, srcData := makeTestTrie()

	// Create a destination trie and sync with the scheduler, tracking the requests
	diskdb := memorydb.New()
	triedb := NewDatabase(diskdb)
	sched := NewSync(srcTrie.Hash(), diskdb, nil, NewSyncBloom(1, diskdb))

	nodes, paths, _ := sched.Missing(1)
	queue := append([]common.Hash{}, nodes...)
	reqs := append([]SyncPath{}, paths...)

	for len(queue) > 0 {
		results := make([]SyncResult, len(queue))
		for i, hash := range queue {
			data, err := srcDb.Node(hash)
			if err != nil {
				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
			}
			results[i] = SyncResult{hash, data}
		}
		for _, result := range results {
			if err := sched.Process(result); err != nil {
				t.Fatalf("failed to process result %v", err)
			}
		}
		batch := diskdb.NewBatch()
		if err := sched.Commit(batch); err != nil {
			t.Fatalf("failed to commit data: %v", err)
		}
		batch.Write()

		nodes, paths, _ = sched.Missing(1)
		queue = append(queue[:0], nodes...)
		reqs = append(reqs, paths...)
	}
	// Cross check that the two tries are in sync
	checkTrieContents(t, triedb, srcTrie.Hash().Bytes(), srcData)

	// Check that the trie nodes have been requested path-ordered
	for i := 0; i < len(reqs)-1; i++ {
		if len(reqs[i]) > 1 || len(reqs[i+1]) > 1 {
			// In the case of the trie tests, there's no storage so the tuples
			// must always be single items. 2-tuples should be tested in state.
			t.Errorf("Invalid request tuples: len(%v) or len(%v) > 1", reqs[i], reqs[i+1])
		}
		if bytes.Compare(compactToHex(reqs[i][0]), compactToHex(reqs[i+1][0])) > 0 {
			t.Errorf("Invalid request order: %v before %v", compactToHex(reqs[i][0]), compactToHex(reqs[i+1][0]))
		}
	}
}