Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
c3cfdfac
Unverified
Commit
c3cfdfac
authored
Sep 27, 2018
by
Viktor Trón
Committed by
GitHub
Sep 27, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #17757 from ethersphere/retrieve-request-ttl-pr
swarm: prevent forever running retrieve request loops
parents
4b6824e0
3f7acbbe
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
79 additions
and
27 deletions
+79
-27
fetcher.go
swarm/network/fetcher.go
+17
-7
fetcher_test.go
swarm/network/fetcher_test.go
+41
-13
delivery.go
swarm/network/stream/delivery.go
+5
-1
stream.go
swarm/network/stream/stream.go
+1
-1
testutil.go
swarm/storage/mru/testutil.go
+1
-1
netstore.go
swarm/storage/netstore.go
+5
-2
netstore_test.go
swarm/storage/netstore_test.go
+9
-2
No files found.
swarm/network/fetcher.go
View file @
c3cfdfac
...
...
@@ -32,6 +32,8 @@ var searchTimeout = 1 * time.Second
// Also used in stream delivery.
var
RequestTimeout
=
10
*
time
.
Second
var
maxHopCount
uint8
=
20
// maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
type
RequestFunc
func
(
context
.
Context
,
*
Request
)
(
*
enode
.
ID
,
chan
struct
{},
error
)
// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
...
...
@@ -44,7 +46,7 @@ type Fetcher struct {
protoRequestFunc
RequestFunc
// request function fetcher calls to issue retrieve request for a chunk
addr
storage
.
Address
// the address of the chunk to be fetched
offerC
chan
*
enode
.
ID
// channel of sources (peer node id strings)
requestC
chan
struct
{}
requestC
chan
uint8
// channel for incoming requests (with the hopCount value in it)
skipCheck
bool
}
...
...
@@ -53,6 +55,7 @@ type Request struct {
Source
*
enode
.
ID
// nodeID of peer to request from (can be nil)
SkipCheck
bool
// whether to offer the chunk first or deliver directly
peersToSkip
*
sync
.
Map
// peers not to request chunk from (only makes sense if source is nil)
HopCount
uint8
// number of forwarded requests (hops)
}
// NewRequest returns a new instance of Request based on chunk address skip check and
...
...
@@ -113,7 +116,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
addr
:
addr
,
protoRequestFunc
:
rf
,
offerC
:
make
(
chan
*
enode
.
ID
),
requestC
:
make
(
chan
struct
{}
),
requestC
:
make
(
chan
uint8
),
skipCheck
:
skipCheck
,
}
}
...
...
@@ -136,7 +139,7 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
}
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
func
(
f
*
Fetcher
)
Request
(
ctx
context
.
Context
)
{
func
(
f
*
Fetcher
)
Request
(
ctx
context
.
Context
,
hopCount
uint8
)
{
// First we need to have this select to make sure that we return if context is done
select
{
case
<-
ctx
.
Done
()
:
...
...
@@ -144,10 +147,15 @@ func (f *Fetcher) Request(ctx context.Context) {
default
:
}
if
hopCount
>=
maxHopCount
{
log
.
Debug
(
"fetcher request hop count limit reached"
,
"hops"
,
hopCount
)
return
}
// This select alone would not guarantee that we return of context is done, it could potentially
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select
{
case
f
.
requestC
<-
struct
{}{}
:
case
f
.
requestC
<-
hopCount
+
1
:
case
<-
ctx
.
Done
()
:
}
}
...
...
@@ -161,6 +169,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
waitC
<-
chan
time
.
Time
// timer channel
sources
[]
*
enode
.
ID
// known sources, ie. peers that offered the chunk
requested
bool
// true if the chunk was actually requested
hopCount
uint8
)
gone
:=
make
(
chan
*
enode
.
ID
)
// channel to signal that a peer we requested from disconnected
...
...
@@ -183,7 +192,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest
=
requested
// incoming request
case
<-
f
.
requestC
:
case
hopCount
=
<-
f
.
requestC
:
log
.
Trace
(
"new request"
,
"request addr"
,
f
.
addr
)
// 2) chunk is requested, set requested flag
// launch a request iff none been launched yet
...
...
@@ -213,7 +222,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request
if
doRequest
{
var
err
error
sources
,
err
=
f
.
doRequest
(
ctx
,
gone
,
peers
,
sources
)
sources
,
err
=
f
.
doRequest
(
ctx
,
gone
,
peers
,
sources
,
hopCount
)
if
err
!=
nil
{
log
.
Info
(
"unable to request"
,
"request addr"
,
f
.
addr
,
"err"
,
err
)
}
...
...
@@ -251,7 +260,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
func
(
f
*
Fetcher
)
doRequest
(
ctx
context
.
Context
,
gone
chan
*
enode
.
ID
,
peersToSkip
*
sync
.
Map
,
sources
[]
*
enode
.
ID
)
([]
*
enode
.
ID
,
error
)
{
func
(
f
*
Fetcher
)
doRequest
(
ctx
context
.
Context
,
gone
chan
*
enode
.
ID
,
peersToSkip
*
sync
.
Map
,
sources
[]
*
enode
.
ID
,
hopCount
uint8
)
([]
*
enode
.
ID
,
error
)
{
var
i
int
var
sourceID
*
enode
.
ID
var
quit
chan
struct
{}
...
...
@@ -260,6 +269,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
Addr
:
f
.
addr
,
SkipCheck
:
f
.
skipCheck
,
peersToSkip
:
peersToSkip
,
HopCount
:
hopCount
,
}
foundSource
:=
false
...
...
swarm/network/fetcher_test.go
View file @
c3cfdfac
...
...
@@ -33,7 +33,7 @@ type mockRequester struct {
// requests []Request
requestC
chan
*
Request
// when a request is coming it is pushed to requestC
waitTimes
[]
time
.
Duration
// with waitTimes[i] you can define how much to wait on the ith request (optional)
c
tr
int
//counts the number of requests
c
ount
int
//counts the number of requests
quitC
chan
struct
{}
}
...
...
@@ -47,9 +47,9 @@ func newMockRequester(waitTimes ...time.Duration) *mockRequester {
func
(
m
*
mockRequester
)
doRequest
(
ctx
context
.
Context
,
request
*
Request
)
(
*
enode
.
ID
,
chan
struct
{},
error
)
{
waitTime
:=
time
.
Duration
(
0
)
if
m
.
c
tr
<
len
(
m
.
waitTimes
)
{
waitTime
=
m
.
waitTimes
[
m
.
c
tr
]
m
.
c
tr
++
if
m
.
c
ount
<
len
(
m
.
waitTimes
)
{
waitTime
=
m
.
waitTimes
[
m
.
c
ount
]
m
.
c
ount
++
}
time
.
Sleep
(
waitTime
)
m
.
requestC
<-
request
...
...
@@ -83,7 +83,7 @@ func TestFetcherSingleRequest(t *testing.T) {
go
fetcher
.
run
(
ctx
,
peersToSkip
)
rctx
:=
context
.
Background
()
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
select
{
case
request
:=
<-
requester
.
requestC
:
...
...
@@ -100,6 +100,11 @@ func TestFetcherSingleRequest(t *testing.T) {
t
.
Fatalf
(
"request.peersToSkip does not contain peer returned by the request function"
)
}
// hopCount in the forwarded request should be incremented
if
request
.
HopCount
!=
1
{
t
.
Fatalf
(
"Expected request.HopCount 1 got %v"
,
request
.
HopCount
)
}
// fetch should trigger a request, if it doesn't happen in time, test should fail
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
t
.
Fatalf
(
"fetch timeout"
)
...
...
@@ -123,7 +128,7 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
rctx
,
rcancel
:=
context
.
WithTimeout
(
ctx
,
100
*
time
.
Millisecond
)
defer
rcancel
()
// we call Request with an active context
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select
{
...
...
@@ -151,7 +156,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
rcancel
()
// we call Request with a cancelled context
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select
{
...
...
@@ -162,7 +167,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
rctx
=
context
.
Background
()
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
select
{
case
<-
requester
.
requestC
:
...
...
@@ -200,7 +205,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
// call Request after the Offer
rctx
=
context
.
Background
()
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
// there should be exactly 1 request coming from fetcher
var
request
*
Request
...
...
@@ -241,7 +246,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
// call Request first
rctx
:=
context
.
Background
()
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
// there should be a request coming from fetcher
var
request
*
Request
...
...
@@ -296,7 +301,7 @@ func TestFetcherRetryOnTimeout(t *testing.T) {
// call the fetch function with an active context
rctx
:=
context
.
Background
()
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
// after 100ms the first request should be initiated
time
.
Sleep
(
100
*
time
.
Millisecond
)
...
...
@@ -338,7 +343,7 @@ func TestFetcherFactory(t *testing.T) {
fetcher
:=
fetcherFactory
.
New
(
context
.
Background
(),
addr
,
peersToSkip
)
fetcher
.
Request
(
context
.
Background
())
fetcher
.
Request
(
context
.
Background
()
,
0
)
// check if the created fetchFunction really starts a fetcher and initiates a request
select
{
...
...
@@ -368,7 +373,7 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
go
fetcher
.
run
(
ctx
,
peersToSkip
)
rctx
:=
context
.
Background
()
fetcher
.
Request
(
rctx
)
fetcher
.
Request
(
rctx
,
0
)
select
{
case
<-
requester
.
requestC
:
...
...
@@ -457,3 +462,26 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
t
.
Errorf
(
"peer not skipped"
)
}
}
func
TestFetcherMaxHopCount
(
t
*
testing
.
T
)
{
requester
:=
newMockRequester
()
addr
:=
make
([]
byte
,
32
)
fetcher
:=
NewFetcher
(
addr
,
requester
.
doRequest
,
true
)
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
peersToSkip
:=
&
sync
.
Map
{}
go
fetcher
.
run
(
ctx
,
peersToSkip
)
rctx
:=
context
.
Background
()
fetcher
.
Request
(
rctx
,
maxHopCount
)
// if hopCount is already at max no request should be initiated
select
{
case
<-
requester
.
requestC
:
t
.
Fatalf
(
"cancelled fetcher initiated request"
)
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
}
}
swarm/network/stream/delivery.go
View file @
c3cfdfac
...
...
@@ -128,6 +128,7 @@ func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, err
type
RetrieveRequestMsg
struct
{
Addr
storage
.
Address
SkipCheck
bool
HopCount
uint8
}
func
(
d
*
Delivery
)
handleRetrieveRequestMsg
(
ctx
context
.
Context
,
sp
*
Peer
,
req
*
RetrieveRequestMsg
)
error
{
...
...
@@ -148,7 +149,9 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
var
cancel
func
()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx
,
cancel
=
context
.
WithTimeout
(
context
.
WithValue
(
ctx
,
"peer"
,
sp
.
ID
()
.
String
()),
network
.
RequestTimeout
)
ctx
=
context
.
WithValue
(
ctx
,
"peer"
,
sp
.
ID
()
.
String
())
ctx
=
context
.
WithValue
(
ctx
,
"hopcount"
,
req
.
HopCount
)
ctx
,
cancel
=
context
.
WithTimeout
(
ctx
,
network
.
RequestTimeout
)
go
func
()
{
select
{
...
...
@@ -247,6 +250,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
err
:=
sp
.
SendPriority
(
ctx
,
&
RetrieveRequestMsg
{
Addr
:
req
.
Addr
,
SkipCheck
:
req
.
SkipCheck
,
HopCount
:
req
.
HopCount
,
},
Top
)
if
err
!=
nil
{
return
nil
,
nil
,
err
...
...
swarm/network/stream/stream.go
View file @
c3cfdfac
...
...
@@ -639,7 +639,7 @@ func (c *clientParams) clientCreated() {
// Spec is the spec of the streamer protocol
var
Spec
=
&
protocols
.
Spec
{
Name
:
"stream"
,
Version
:
6
,
Version
:
7
,
MaxMsgSize
:
10
*
1024
*
1024
,
Messages
:
[]
interface
{}{
UnsubscribeMsg
{},
...
...
swarm/storage/mru/testutil.go
View file @
c3cfdfac
...
...
@@ -40,7 +40,7 @@ func (t *TestHandler) Close() {
type
mockNetFetcher
struct
{}
func
(
m
*
mockNetFetcher
)
Request
(
ctx
context
.
Context
)
{
func
(
m
*
mockNetFetcher
)
Request
(
ctx
context
.
Context
,
hopCount
uint8
)
{
}
func
(
m
*
mockNetFetcher
)
Offer
(
ctx
context
.
Context
,
source
*
enode
.
ID
)
{
}
...
...
swarm/storage/netstore.go
View file @
c3cfdfac
...
...
@@ -34,7 +34,7 @@ type (
)
type
NetFetcher
interface
{
Request
(
ctx
context
.
Context
)
Request
(
ctx
context
.
Context
,
hopCount
uint8
)
Offer
(
ctx
context
.
Context
,
source
*
enode
.
ID
)
}
...
...
@@ -263,6 +263,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
// If there is a source in the context then it is an offer, otherwise a request
sourceIF
:=
rctx
.
Value
(
"source"
)
hopCount
,
_
:=
rctx
.
Value
(
"hopcount"
)
.
(
uint8
)
if
sourceIF
!=
nil
{
var
source
enode
.
ID
if
err
:=
source
.
UnmarshalText
([]
byte
(
sourceIF
.
(
string
)));
err
!=
nil
{
...
...
@@ -270,7 +273,7 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
}
f
.
netFetcher
.
Offer
(
rctx
,
&
source
)
}
else
{
f
.
netFetcher
.
Request
(
rctx
)
f
.
netFetcher
.
Request
(
rctx
,
hopCount
)
}
// wait until either the chunk is delivered or the context is done
...
...
swarm/storage/netstore_test.go
View file @
c3cfdfac
...
...
@@ -40,6 +40,7 @@ type mockNetFetcher struct {
offerCalled
bool
quit
<-
chan
struct
{}
ctx
context
.
Context
hopCounts
[]
uint8
}
func
(
m
*
mockNetFetcher
)
Offer
(
ctx
context
.
Context
,
source
*
enode
.
ID
)
{
...
...
@@ -47,7 +48,7 @@ func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
m
.
sources
=
append
(
m
.
sources
,
source
)
}
func
(
m
*
mockNetFetcher
)
Request
(
ctx
context
.
Context
)
{
func
(
m
*
mockNetFetcher
)
Request
(
ctx
context
.
Context
,
hopCount
uint8
)
{
m
.
requestCalled
=
true
var
peers
[]
Address
m
.
peers
.
Range
(
func
(
key
interface
{},
_
interface
{})
bool
{
...
...
@@ -55,6 +56,7 @@ func (m *mockNetFetcher) Request(ctx context.Context) {
return
true
})
m
.
peersPerRequest
=
append
(
m
.
peersPerRequest
,
peers
)
m
.
hopCounts
=
append
(
m
.
hopCounts
,
hopCount
)
}
type
mockNetFetchFuncFactory
struct
{
...
...
@@ -412,7 +414,8 @@ func TestNetStoreGetCallsRequest(t *testing.T) {
chunk
:=
GenerateRandomChunk
(
ch
.
DefaultSize
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
200
*
time
.
Millisecond
)
ctx
:=
context
.
WithValue
(
context
.
Background
(),
"hopcount"
,
uint8
(
5
))
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
200
*
time
.
Millisecond
)
defer
cancel
()
// We call get for a not available chunk, it will timeout because the chunk is not delivered
...
...
@@ -426,6 +429,10 @@ func TestNetStoreGetCallsRequest(t *testing.T) {
if
!
fetcher
.
requestCalled
{
t
.
Fatal
(
"Expected NetFetcher.Request to be called"
)
}
if
fetcher
.
hopCounts
[
0
]
!=
5
{
t
.
Fatalf
(
"Expected NetFetcher.Request be called with hopCount 5, got %v"
,
fetcher
.
hopCounts
[
0
])
}
}
// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment