Commit 79c7a69a authored by holisticode's avatar holisticode Committed by Viktor Trón

swarm: Better syncing and retrieval option definition (#17986)

* swarm: Better syncing and retrieval option definition

* swarm/network/stream: better comments

* swarm/network/stream: addressed PR comments
parent 53eb4e0b
...@@ -38,8 +38,13 @@ import ( ...@@ -38,8 +38,13 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
) )
//Tests initializing a retrieve request
func TestStreamerRetrieveRequest(t *testing.T) { func TestStreamerRetrieveRequest(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t, nil) regOpts := &RegistryOptions{
Retrieval: RetrievalClientOnly,
Syncing: SyncingDisabled,
}
tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
defer teardown() defer teardown()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -55,10 +60,21 @@ func TestStreamerRetrieveRequest(t *testing.T) { ...@@ -55,10 +60,21 @@ func TestStreamerRetrieveRequest(t *testing.T) {
) )
streamer.delivery.RequestFromPeers(ctx, req) streamer.delivery.RequestFromPeers(ctx, req)
stream := NewStream(swarmChunkServerStreamName, "", true)
err = tester.TestExchanges(p2ptest.Exchange{ err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg", Label: "RetrieveRequestMsg",
Expects: []p2ptest.Expect{ Expects: []p2ptest.Expect{
{ { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
History: nil,
Priority: Top,
},
Peer: node.ID(),
},
{ //expect a retrieve request message for the given hash
Code: 5, Code: 5,
Msg: &RetrieveRequestMsg{ Msg: &RetrieveRequestMsg{
Addr: hash0[:], Addr: hash0[:],
...@@ -74,9 +90,12 @@ func TestStreamerRetrieveRequest(t *testing.T) { ...@@ -74,9 +90,12 @@ func TestStreamerRetrieveRequest(t *testing.T) {
} }
} }
//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
DoServeRetrieve: true, Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled, //do no syncing
}) })
defer teardown() defer teardown()
if err != nil { if err != nil {
...@@ -89,16 +108,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { ...@@ -89,16 +108,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
peer := streamer.getPeer(node.ID()) peer := streamer.getPeer(node.ID())
stream := NewStream(swarmChunkServerStreamName, "", true)
//simulate pre-subscription to RETRIEVE_REQUEST stream on peer
peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
Stream: NewStream(swarmChunkServerStreamName, "", true), Stream: stream,
History: nil, History: nil,
Priority: Top, Priority: Top,
}) })
//test the exchange
err = tester.TestExchanges(p2ptest.Exchange{ err = tester.TestExchanges(p2ptest.Exchange{
Expects: []p2ptest.Expect{
{ //first expect a subscription to the RETRIEVE_REQUEST stream
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
History: nil,
Priority: Top,
},
Peer: node.ID(),
},
},
}, p2ptest.Exchange{
Label: "RetrieveRequestMsg", Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{ Triggers: []p2ptest.Trigger{
{ { //then the actual RETRIEVE_REQUEST....
Code: 5, Code: 5,
Msg: &RetrieveRequestMsg{ Msg: &RetrieveRequestMsg{
Addr: chunk.Address()[:], Addr: chunk.Address()[:],
...@@ -107,7 +141,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { ...@@ -107,7 +141,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
}, },
}, },
Expects: []p2ptest.Expect{ Expects: []p2ptest.Expect{
{ { //to which the peer responds with offered hashes
Code: 1, Code: 1,
Msg: &OfferedHashesMsg{ Msg: &OfferedHashesMsg{
HandoverProof: nil, HandoverProof: nil,
...@@ -120,7 +154,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { ...@@ -120,7 +154,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
}, },
}) })
expectedError := `exchange #0 "RetrieveRequestMsg": timed out` //should fail with a timeout as the peer we are requesting
//the chunk from does not have the chunk
expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
if err == nil || err.Error() != expectedError { if err == nil || err.Error() != expectedError {
t.Fatalf("Expected error %v, got %v", expectedError, err) t.Fatalf("Expected error %v, got %v", expectedError, err)
} }
...@@ -130,7 +166,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { ...@@ -130,7 +166,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// offered hashes or delivery if skipHash is set to true // offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
DoServeRetrieve: true, Retrieval: RetrievalEnabled,
Syncing: SyncingDisabled,
}) })
defer teardown() defer teardown()
if err != nil { if err != nil {
...@@ -138,6 +175,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { ...@@ -138,6 +175,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
} }
node := tester.Nodes[0] node := tester.Nodes[0]
peer := streamer.getPeer(node.ID()) peer := streamer.getPeer(node.ID())
stream := NewStream(swarmChunkServerStreamName, "", true) stream := NewStream(swarmChunkServerStreamName, "", true)
...@@ -156,6 +194,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { ...@@ -156,6 +194,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
} }
err = tester.TestExchanges(p2ptest.Exchange{ err = tester.TestExchanges(p2ptest.Exchange{
Expects: []p2ptest.Expect{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
History: nil,
Priority: Top,
},
Peer: node.ID(),
},
},
}, p2ptest.Exchange{
Label: "RetrieveRequestMsg", Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{ Triggers: []p2ptest.Trigger{
{ {
...@@ -226,7 +276,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { ...@@ -226,7 +276,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
DoServeRetrieve: true, Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
}) })
defer teardown() defer teardown()
if err != nil { if err != nil {
...@@ -241,6 +292,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { ...@@ -241,6 +292,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
node := tester.Nodes[0] node := tester.Nodes[0]
//subscribe to custom stream
stream := NewStream("foo", "", true) stream := NewStream("foo", "", true)
err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
if err != nil { if err != nil {
...@@ -253,7 +305,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { ...@@ -253,7 +305,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
err = tester.TestExchanges(p2ptest.Exchange{ err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message", Label: "Subscribe message",
Expects: []p2ptest.Expect{ Expects: []p2ptest.Expect{
{ { //first expect subscription to the custom stream...
Code: 4, Code: 4,
Msg: &SubscribeMsg{ Msg: &SubscribeMsg{
Stream: stream, Stream: stream,
...@@ -267,7 +319,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { ...@@ -267,7 +319,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
p2ptest.Exchange{ p2ptest.Exchange{
Label: "ChunkDelivery message", Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{ Triggers: []p2ptest.Trigger{
{ { //...then trigger a chunk delivery for the given chunk from peer in order for
//local node to get the chunk delivered
Code: 6, Code: 6,
Msg: &ChunkDeliveryMsg{ Msg: &ChunkDeliveryMsg{
Addr: chunkKey, Addr: chunkKey,
...@@ -343,7 +396,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck ...@@ -343,7 +396,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck, SkipCheck: skipCheck,
DoServeRetrieve: true, Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
}) })
bucket.Store(bucketKeyRegistry, r) bucket.Store(bucketKeyRegistry, r)
...@@ -408,20 +462,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck ...@@ -408,20 +462,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return err return err
} }
//each of the nodes (except pivot node) subscribes to the stream of the next node
for j, node := range nodeIDs[0 : nodes-1] {
sid := nodeIDs[j+1]
item, ok := sim.NodeItem(node, bucketKeyRegistry)
if !ok {
return fmt.Errorf("No registry")
}
registry := item.(*Registry)
err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil {
return err
}
}
//get the pivot node's filestore //get the pivot node's filestore
item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore)
if !ok { if !ok {
...@@ -530,7 +570,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip ...@@ -530,7 +570,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck, SkipCheck: skipCheck,
DoSync: true, Syncing: SyncingDisabled,
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0, SyncUpdateDelay: 0,
}) })
......
...@@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { ...@@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
SkipCheck: skipCheck, SkipCheck: skipCheck,
}) })
bucket.Store(bucketKeyRegistry, r) bucket.Store(bucketKeyRegistry, r)
......
...@@ -25,7 +25,8 @@ import ( ...@@ -25,7 +25,8 @@ import (
// when it is serving Retrieve requests. // when it is serving Retrieve requests.
func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
registryOptions := &RegistryOptions{ registryOptions := &RegistryOptions{
DoServeRetrieve: true, Retrieval: RetrievalClientOnly,
Syncing: SyncingDisabled,
} }
tester, _, _, teardown, err := newStreamerTester(t, registryOptions) tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown() defer teardown()
...@@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { ...@@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
// requests are disabled // requests are disabled
func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
registryOptions := &RegistryOptions{ registryOptions := &RegistryOptions{
DoServeRetrieve: false, Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
} }
tester, _, _, teardown, err := newStreamerTester(t, registryOptions) tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown() defer teardown()
...@@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { ...@@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
// when syncing is enabled. // when syncing is enabled.
func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
registryOptions := &RegistryOptions{ registryOptions := &RegistryOptions{
DoSync: true, Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
} }
tester, _, _, teardown, err := newStreamerTester(t, registryOptions) tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown() defer teardown()
...@@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { ...@@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
// when syncing is disabled. // when syncing is disabled.
func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
registryOptions := &RegistryOptions{ registryOptions := &RegistryOptions{
DoSync: false, Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
} }
tester, _, _, teardown, err := newStreamerTester(t, registryOptions) tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown() defer teardown()
......
...@@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no ...@@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true, Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second, SyncUpdateDelay: 3 * time.Second,
DoRetrieve: true,
DoServeRetrieve: true,
}) })
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
......
...@@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic ...@@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true, Retrieval: RetrievalDisabled,
DoServeRetrieve: true, Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second, SyncUpdateDelay: 3 * time.Second,
}) })
...@@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) ...@@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoServeRetrieve: true, Retrieval: RetrievalDisabled,
DoSync: true, Syncing: SyncingRegisterOnly,
}) })
bucket.Store(bucketKeyRegistry, r) bucket.Store(bucketKeyRegistry, r)
......
...@@ -47,6 +47,31 @@ const ( ...@@ -47,6 +47,31 @@ const (
HashSize = 32 HashSize = 32
) )
//Enumerate options for syncing and retrieval
type SyncingOption int
type RetrievalOption int
//Syncing options
const (
//Syncing disabled
SyncingDisabled SyncingOption = iota
//Register the client and the server but not subscribe
SyncingRegisterOnly
//Both client and server funcs are registered, subscribe sent automatically
SyncingAutoSubscribe
)
const (
//Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
RetrievalDisabled RetrievalOption = iota
//Only the client side of the retrieve request is registered.
//(light nodes do not serve retrieve requests)
//once the client is registered, subscription to retrieve request stream is always sent
RetrievalClientOnly
//Both client and server funcs are registered, subscribe sent automatically
RetrievalEnabled
)
// Registry registry for outgoing and incoming streamer constructors // Registry registry for outgoing and incoming streamer constructors
type Registry struct { type Registry struct {
addr enode.ID addr enode.ID
...@@ -60,16 +85,15 @@ type Registry struct { ...@@ -60,16 +85,15 @@ type Registry struct {
peers map[enode.ID]*Peer peers map[enode.ID]*Peer
delivery *Delivery delivery *Delivery
intervalsStore state.Store intervalsStore state.Store
doRetrieve bool autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int maxPeerServers int
} }
// RegistryOptions holds optional values for NewRegistry constructor. // RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct { type RegistryOptions struct {
SkipCheck bool SkipCheck bool
DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags. Syncing SyncingOption //Defines syncing behavior
DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true. Retrieval RetrievalOption //Defines retrieval behavior
DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag.
SyncUpdateDelay time.Duration SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry MaxPeerServers int // The limit of servers for each peer in registry
} }
...@@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy ...@@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 { if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second options.SyncUpdateDelay = 15 * time.Second
} }
//check if retriaval has been disabled
retrieval := options.Retrieval != RetrievalDisabled
streamer := &Registry{ streamer := &Registry{
addr: localID, addr: localID,
skipCheck: options.SkipCheck, skipCheck: options.SkipCheck,
...@@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy ...@@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
peers: make(map[enode.ID]*Peer), peers: make(map[enode.ID]*Peer),
delivery: delivery, delivery: delivery,
intervalsStore: intervalsStore, intervalsStore: intervalsStore,
doRetrieve: options.DoRetrieve, autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers, maxPeerServers: options.MaxPeerServers,
} }
streamer.api = NewAPI(streamer) streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer delivery.getPeer = streamer.getPeer
if options.DoServeRetrieve { //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live { if !live {
return nil, errors.New("only live retrieval requests supported") return nil, errors.New("only live retrieval requests supported")
...@@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy ...@@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
}) })
} }
//if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
if options.Retrieval != RetrievalDisabled {
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
}) })
}
if options.DoSync { //If syncing is not disabled, the syncing functions are registered (both client and server)
if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore)
} }
if options.DoSync { //if syncing is set to automatically subscribe to the syncing stream, start the subscription process
if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that // latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop // - receiving from the in chan is not blocked by processing inside the for loop
// - the latest int value is delivered to the loop after the processing is done // - the latest int value is delivered to the loop after the processing is done
...@@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { ...@@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer close(sp.quit) defer close(sp.quit)
defer sp.close() defer sp.close()
if r.doRetrieve { if r.autoRetrieval && !p.LightNode {
err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil { if err != nil {
return err return err
......
...@@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { ...@@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
func TestMaxPeerServersWithUnsubscribe(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6 var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers, MaxPeerServers: maxPeerServers,
}) })
defer teardown() defer teardown()
......
...@@ -114,6 +114,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck ...@@ -114,6 +114,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
bucket.Store(bucketKeyDelivery, delivery) bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck, SkipCheck: skipCheck,
}) })
......
...@@ -175,18 +175,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e ...@@ -175,18 +175,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
if err := nodeID.UnmarshalText([]byte(config.NodeID)); err != nil { if err := nodeID.UnmarshalText([]byte(config.NodeID)); err != nil {
return nil, err return nil, err
} }
syncing := stream.SyncingAutoSubscribe
if !config.SyncEnabled || config.LightNodeEnabled {
syncing = stream.SyncingDisabled
}
retrieval := stream.RetrievalEnabled
if config.LightNodeEnabled {
retrieval = stream.RetrievalClientOnly
}
registryOptions := &stream.RegistryOptions{ registryOptions := &stream.RegistryOptions{
SkipCheck: config.DeliverySkipCheck, SkipCheck: config.DeliverySkipCheck,
DoSync: config.SyncEnabled, Syncing: syncing,
DoRetrieve: true, Retrieval: retrieval,
DoServeRetrieve: true,
SyncUpdateDelay: config.SyncUpdateDelay, SyncUpdateDelay: config.SyncUpdateDelay,
MaxPeerServers: config.MaxStreamPeerServers, MaxPeerServers: config.MaxStreamPeerServers,
} }
if config.LightNodeEnabled {
registryOptions.DoSync = false
registryOptions.DoRetrieve = false
}
self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions) self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions)
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment