Commit 8d041546 authored by gluk256's avatar gluk256 Committed by Viktor Trón

p2p/simulations: wait until all connections are recreated when uploading snapshot (#19312)

* swarm/network/simulation: test cases refactored

* swarm/pss: minor refactoring

* swarm/simulation: UploadSnapshot updated

* swarm/network: style fix

* swarm/pss: bugfix
parent 09924cbc
...@@ -103,7 +103,7 @@ func (s *Simulation) kademlias() (ks map[enode.ID]*network.Kademlia) { ...@@ -103,7 +103,7 @@ func (s *Simulation) kademlias() (ks map[enode.ID]*network.Kademlia) {
// in the snapshot are registered in the kademlia. // in the snapshot are registered in the kademlia.
// It differs from WaitTillHealthy, which waits only until all the kademlias are // It differs from WaitTillHealthy, which waits only until all the kademlias are
// healthy (it might happen even before all the connections are established). // healthy (it might happen even before all the connections are established).
func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap simulations.Snapshot) error { func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap *simulations.Snapshot) error {
expected := getSnapshotConnections(snap.Conns) expected := getSnapshotConnections(snap.Conns)
ticker := time.NewTicker(150 * time.Millisecond) ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
......
...@@ -182,7 +182,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) { ...@@ -182,7 +182,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = controlSim.WaitTillSnapshotRecreated(ctx, *snap) err = controlSim.WaitTillSnapshotRecreated(ctx, snap)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package simulation package simulation
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
...@@ -24,7 +25,6 @@ import ( ...@@ -24,7 +25,6 @@ import (
"os" "os"
"time" "time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
...@@ -217,30 +217,24 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i ...@@ -217,30 +217,24 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i
// UploadSnapshot uploads a snapshot to the simulation // UploadSnapshot uploads a snapshot to the simulation
// This method tries to open the json file provided, applies the config to all nodes // This method tries to open the json file provided, applies the config to all nodes
// and then loads the snapshot into the Simulation network // and then loads the snapshot into the Simulation network
func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error { func (s *Simulation) UploadSnapshot(ctx context.Context, snapshotFile string, opts ...AddNodeOption) error {
f, err := os.Open(snapshotFile) f, err := os.Open(snapshotFile)
if err != nil { if err != nil {
return err return err
} }
defer func() { defer f.Close()
err := f.Close()
if err != nil {
log.Error("Error closing snapshot file", "err", err)
}
}()
jsonbyte, err := ioutil.ReadAll(f) jsonbyte, err := ioutil.ReadAll(f)
if err != nil { if err != nil {
return err return err
} }
var snap simulations.Snapshot var snap simulations.Snapshot
err = json.Unmarshal(jsonbyte, &snap) if err := json.Unmarshal(jsonbyte, &snap); err != nil {
if err != nil {
return err return err
} }
//the snapshot probably has the property EnableMsgEvents not set //the snapshot probably has the property EnableMsgEvents not set
//just in case, set it to true! //set it to true (we need this to wait for messages before uploading)
//(we need this to wait for messages before uploading)
for i := range snap.Nodes { for i := range snap.Nodes {
snap.Nodes[i].Node.Config.EnableMsgEvents = true snap.Nodes[i].Node.Config.EnableMsgEvents = true
snap.Nodes[i].Node.Config.Services = s.serviceNames snap.Nodes[i].Node.Config.Services = s.serviceNames
...@@ -249,15 +243,10 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) ...@@ -249,15 +243,10 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption)
} }
} }
log.Info("Waiting for p2p connections to be established...") if err := s.Net.Load(&snap); err != nil {
//now we can load the snapshot
err = s.Net.Load(&snap)
if err != nil {
return err return err
} }
log.Info("Snapshot loaded") return s.WaitTillSnapshotRecreated(ctx, &snap)
return nil
} }
// StartNode starts a node by NodeID. // StartNode starts a node by NodeID.
......
...@@ -289,6 +289,7 @@ func TestUploadSnapshot(t *testing.T) { ...@@ -289,6 +289,7 @@ func TestUploadSnapshot(t *testing.T) {
HiveParams: hp, HiveParams: hp,
} }
kad := network.NewKademlia(addr.Over(), network.NewKadParams()) kad := network.NewKademlia(addr.Over(), network.NewKadParams())
b.Store(BucketKeyKademlia, kad)
return network.NewBzz(config, kad, nil, nil, nil), nil, nil return network.NewBzz(config, kad, nil, nil, nil), nil, nil
}, },
}) })
...@@ -296,12 +297,13 @@ func TestUploadSnapshot(t *testing.T) { ...@@ -296,12 +297,13 @@ func TestUploadSnapshot(t *testing.T) {
nodeCount := 16 nodeCount := 16
log.Debug("Uploading snapshot") log.Debug("Uploading snapshot")
err := s.UploadSnapshot(fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount)) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := s.UploadSnapshot(ctx, fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount))
if err != nil { if err != nil {
t.Fatalf("Error uploading snapshot to simulation network: %v", err) t.Fatalf("Error uploading snapshot to simulation network: %v", err)
} }
ctx := context.Background()
log.Debug("Starting simulation...") log.Debug("Starting simulation...")
s.Run(ctx, func(ctx context.Context, sim *Simulation) error { s.Run(ctx, func(ctx context.Context, sim *Simulation) error {
log.Debug("Checking") log.Debug("Checking")
......
...@@ -22,8 +22,6 @@ import ( ...@@ -22,8 +22,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/swarm/testutil"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
...@@ -31,6 +29,7 @@ import ( ...@@ -31,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/testutil"
) )
//constants for random file generation //constants for random file generation
...@@ -155,14 +154,15 @@ func runFileRetrievalTest(nodeCount int) error { ...@@ -155,14 +154,15 @@ func runFileRetrievalTest(nodeCount int) error {
//array where the generated chunk hashes will be stored //array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0) conf.hashes = make([]storage.Address, 0)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelSimRun()
filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
err := sim.UploadSnapshot(ctx, filename)
if err != nil { if err != nil {
return err return err
} }
ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancelSimRun()
log.Info("Starting simulation") log.Info("Starting simulation")
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
...@@ -188,9 +188,6 @@ func runFileRetrievalTest(nodeCount int) error { ...@@ -188,9 +188,6 @@ func runFileRetrievalTest(nodeCount int) error {
if err != nil { if err != nil {
return err return err
} }
if _, err := sim.WaitTillHealthy(ctx); err != nil {
return err
}
log.Info("network healthy, start file checks") log.Info("network healthy, start file checks")
...@@ -253,12 +250,15 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { ...@@ -253,12 +250,15 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
//array where the generated chunk hashes will be stored //array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0) conf.hashes = make([]storage.Address, 0)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
err := sim.UploadSnapshot(ctx, filename)
if err != nil { if err != nil {
return err return err
} }
ctx := context.Background()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs() nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs { for _, n := range nodeIDs {
...@@ -283,9 +283,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { ...@@ -283,9 +283,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error {
if err != nil { if err != nil {
return err return err
} }
if _, err := sim.WaitTillHealthy(ctx); err != nil {
return err
}
// File retrieval check is repeated until all uploaded files are retrieved from all nodes // File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached. // or until the timeout is reached.
......
...@@ -147,20 +147,16 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { ...@@ -147,20 +147,16 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
//array where the generated chunk hashes will be stored //array where the generated chunk hashes will be stored
conf.hashes = make([]storage.Address, 0) conf.hashes = make([]storage.Address, 0)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute)
if err != nil {
t.Fatal(err)
}
ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun() defer cancelSimRun()
if _, err := sim.WaitTillHealthy(ctx); err != nil { filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
err := sim.UploadSnapshot(ctx, filename)
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
result := runSim(conf, ctx, sim, chunkCount) result := runSim(conf, ctx, sim, chunkCount)
if result.Error != nil { if result.Error != nil {
t.Fatal(result.Error) t.Fatal(result.Error)
} }
......
...@@ -1257,9 +1257,10 @@ func TestGetSubscriptionsRPC(t *testing.T) { ...@@ -1257,9 +1257,10 @@ func TestGetSubscriptionsRPC(t *testing.T) {
simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode), simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
) )
// upload a snapshot ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) defer cancel()
if err != nil { filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount)
if err := sim.UploadSnapshot(ctx, filename); err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -3,11 +3,8 @@ package pss ...@@ -3,11 +3,8 @@ package pss
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
...@@ -20,7 +17,6 @@ import ( ...@@ -20,7 +17,6 @@ import (
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network"
...@@ -105,24 +101,6 @@ func getCmdParams(t *testing.T) (int, int) { ...@@ -105,24 +101,6 @@ func getCmdParams(t *testing.T) (int, int) {
return int(msgCount), int(nodeCount) return int(msgCount), int(nodeCount)
} }
func readSnapshot(t *testing.T, nodeCount int) simulations.Snapshot {
f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount))
if err != nil {
t.Fatal(err)
}
defer f.Close()
jsonbyte, err := ioutil.ReadAll(f)
if err != nil {
t.Fatal(err)
}
var snap simulations.Snapshot
err = json.Unmarshal(jsonbyte, &snap)
if err != nil {
t.Fatal(err)
}
return snap
}
func newTestData() *testData { func newTestData() *testData {
return &testData{ return &testData{
kademlias: make(map[enode.ID]*network.Kademlia), kademlias: make(map[enode.ID]*network.Kademlia),
...@@ -235,16 +213,12 @@ func testProxNetwork(t *testing.T) { ...@@ -235,16 +213,12 @@ func testProxNetwork(t *testing.T) {
services := newProxServices(tstdata, true, handlerContextFuncs, tstdata.kademlias) services := newProxServices(tstdata, true, handlerContextFuncs, tstdata.kademlias)
tstdata.sim = simulation.New(services) tstdata.sim = simulation.New(services)
defer tstdata.sim.Close() defer tstdata.sim.Close()
err := tstdata.sim.UploadSnapshot(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)) ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
defer cancel() defer cancel()
snap := readSnapshot(t, nodeCount) filename := fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)
err = tstdata.sim.WaitTillSnapshotRecreated(ctx, snap) err := tstdata.sim.UploadSnapshot(ctx, filename)
if err != nil { if err != nil {
t.Fatalf("failed to recreate snapshot: %s", err) t.Fatal(err)
} }
tstdata.init(msgCount) // initialize the test data tstdata.init(msgCount) // initialize the test data
wrapper := func(c context.Context, _ *simulation.Simulation) error { wrapper := func(c context.Context, _ *simulation.Simulation) error {
...@@ -426,7 +400,6 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T ...@@ -426,7 +400,6 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
b.Store(simulation.BucketKeyKademlia, pskad)
// register the handlers we've been passed // register the handlers we've been passed
var deregisters []func() var deregisters []func()
...@@ -448,6 +421,8 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T ...@@ -448,6 +421,8 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T
Public: false, Public: false,
}) })
b.Store(simulation.BucketKeyKademlia, pskad)
// return Pss and cleanups // return Pss and cleanups
return ps, func() { return ps, func() {
// run the handler deregister functions in reverse order // run the handler deregister functions in reverse order
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment