Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
92fbb616
Commit
92fbb616
authored
Apr 10, 2015
by
Jeffrey Wilcke
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #691 from fjl/discovery-fixes
p2p: a bunch of fixes
parents
fc1d1f9a
c5332537
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
354 additions
and
183 deletions
+354
-183
udp.go
p2p/discover/udp.go
+7
-8
handshake.go
p2p/handshake.go
+42
-26
handshake_test.go
p2p/handshake_test.go
+2
-2
peer.go
p2p/peer.go
+69
-63
peer_error.go
p2p/peer_error.go
+4
-6
peer_test.go
p2p/peer_test.go
+65
-9
server.go
p2p/server.go
+108
-68
server_test.go
p2p/server_test.go
+57
-1
No files found.
p2p/discover/udp.go
View file @
92fbb616
...
...
@@ -335,7 +335,7 @@ func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error {
if
err
!=
nil
{
return
err
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
">>> %v %T
%v
\n
"
,
toaddr
,
req
,
req
)
glog
.
V
(
logger
.
Detail
)
.
Infof
(
">>> %v %T
\n
"
,
toaddr
,
req
)
if
_
,
err
=
t
.
conn
.
WriteToUDP
(
packet
,
toaddr
);
err
!=
nil
{
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"UDP send failed:"
,
err
)
}
...
...
@@ -378,12 +378,11 @@ func (t *udp) readLoop() {
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Bad packet from %v: %v
\n
"
,
from
,
err
)
continue
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"<<< %v %T %v
\n
"
,
from
,
packet
,
packet
)
go
func
()
{
if
err
:=
packet
.
handle
(
t
,
from
,
fromID
,
hash
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"error handling %T from %v: %v"
,
packet
,
from
,
err
)
}
}()
status
:=
"ok"
if
err
:=
packet
.
handle
(
t
,
from
,
fromID
,
hash
);
err
!=
nil
{
status
=
err
.
Error
()
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"<<< %v %T: %s
\n
"
,
from
,
packet
,
status
)
}
}
...
...
@@ -430,7 +429,7 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
})
if
!
t
.
handleReply
(
fromID
,
pingPacket
,
req
)
{
// Note: we're ignoring the provided IP address right now
t
.
bond
(
true
,
fromID
,
from
,
req
.
Port
)
go
t
.
bond
(
true
,
fromID
,
from
,
req
.
Port
)
}
return
nil
}
...
...
p2p/handshake.go
View file @
92fbb616
...
...
@@ -68,50 +68,61 @@ type protoHandshake struct {
// setupConn starts a protocol session on the given connection.
// It runs the encryption handshake and the protocol handshake.
// If dial is non-nil, the connection the local node is the initiator.
func
setupConn
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
)
(
*
conn
,
error
)
{
// If atcap is true, the connection will be disconnected with DiscTooManyPeers
// after the key exchange.
func
setupConn
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
,
atcap
bool
)
(
*
conn
,
error
)
{
if
dial
==
nil
{
return
setupInboundConn
(
fd
,
prv
,
our
)
return
setupInboundConn
(
fd
,
prv
,
our
,
atcap
)
}
else
{
return
setupOutboundConn
(
fd
,
prv
,
our
,
dial
)
return
setupOutboundConn
(
fd
,
prv
,
our
,
dial
,
atcap
)
}
}
func
setupInboundConn
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
)
(
*
conn
,
error
)
{
func
setupInboundConn
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
atcap
bool
)
(
*
conn
,
error
)
{
secrets
,
err
:=
receiverEncHandshake
(
fd
,
prv
,
nil
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"encryption handshake failed: %v"
,
err
)
}
// Run the protocol handshake using authenticated messages.
rw
:=
newRlpxFrameRW
(
fd
,
secrets
)
rhs
,
err
:=
readProtocolHandshake
(
rw
,
our
)
if
atcap
{
SendItems
(
rw
,
discMsg
,
DiscTooManyPeers
)
return
nil
,
errors
.
New
(
"we have too many peers"
)
}
// Run the protocol handshake using authenticated messages.
rhs
,
err
:=
readProtocolHandshake
(
rw
,
secrets
.
RemoteID
,
our
)
if
err
!=
nil
{
return
nil
,
err
}
if
rhs
.
ID
!=
secrets
.
RemoteID
{
return
nil
,
errors
.
New
(
"node ID in protocol handshake does not match encryption handshake"
)
}
// TODO: validate that handshake node ID matches
if
err
:=
Send
(
rw
,
handshakeMsg
,
our
);
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"protocol write error: %v"
,
err
)
return
nil
,
fmt
.
Errorf
(
"protocol
handshake
write error: %v"
,
err
)
}
return
&
conn
{
rw
,
rhs
},
nil
}
func
setupOutboundConn
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
)
(
*
conn
,
error
)
{
func
setupOutboundConn
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
,
atcap
bool
)
(
*
conn
,
error
)
{
secrets
,
err
:=
initiatorEncHandshake
(
fd
,
prv
,
dial
.
ID
,
nil
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"encryption handshake failed: %v"
,
err
)
}
// Run the protocol handshake using authenticated messages.
rw
:=
newRlpxFrameRW
(
fd
,
secrets
)
if
err
:=
Send
(
rw
,
handshakeMsg
,
our
);
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"protocol write error: %v"
,
err
)
if
atcap
{
SendItems
(
rw
,
discMsg
,
DiscTooManyPeers
)
return
nil
,
errors
.
New
(
"we have too many peers"
)
}
rhs
,
err
:=
readProtocolHandshake
(
rw
,
our
)
// Run the protocol handshake using authenticated messages.
//
// Note that even though writing the handshake is first, we prefer
// returning the handshake read error. If the remote side
// disconnects us early with a valid reason, we should return it
// as the error so it can be tracked elsewhere.
werr
:=
make
(
chan
error
)
go
func
()
{
werr
<-
Send
(
rw
,
handshakeMsg
,
our
)
}()
rhs
,
err
:=
readProtocolHandshake
(
rw
,
secrets
.
RemoteID
,
our
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"protocol handshake read error: %v"
,
err
)
return
nil
,
err
}
if
err
:=
<-
werr
;
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"protocol handshake write error: %v"
,
err
)
}
if
rhs
.
ID
!=
dial
.
ID
{
return
nil
,
errors
.
New
(
"dialed node id mismatch"
)
...
...
@@ -398,18 +409,17 @@ func xor(one, other []byte) (xor []byte) {
return
xor
}
func
readProtocolHandshake
(
r
MsgReader
,
our
*
protoHandshake
)
(
*
protoHandshake
,
error
)
{
// read and handle remote handshake
msg
,
err
:=
r
.
ReadMsg
()
func
readProtocolHandshake
(
rw
MsgReadWriter
,
wantID
discover
.
NodeID
,
our
*
protoHandshake
)
(
*
protoHandshake
,
error
)
{
msg
,
err
:=
rw
.
ReadMsg
()
if
err
!=
nil
{
return
nil
,
err
}
if
msg
.
Code
==
discMsg
{
// disconnect before protocol handshake is valid according to the
// spec and we send it ourself if Server.addPeer fails.
var
reason
DiscReason
var
reason
[
1
]
DiscReason
rlp
.
Decode
(
msg
.
Payload
,
&
reason
)
return
nil
,
discRequestedError
(
reason
)
return
nil
,
reason
[
0
]
}
if
msg
.
Code
!=
handshakeMsg
{
return
nil
,
fmt
.
Errorf
(
"expected handshake, got %x"
,
msg
.
Code
)
...
...
@@ -423,10 +433,16 @@ func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, e
}
// validate handshake info
if
hs
.
Version
!=
our
.
Version
{
return
nil
,
newPeerError
(
errP2PVersionMismatch
,
"required version %d, received %d
\n
"
,
baseProtocolVersion
,
hs
.
Version
)
SendItems
(
rw
,
discMsg
,
DiscIncompatibleVersion
)
return
nil
,
fmt
.
Errorf
(
"required version %d, received %d
\n
"
,
baseProtocolVersion
,
hs
.
Version
)
}
if
(
hs
.
ID
==
discover
.
NodeID
{})
{
return
nil
,
newPeerError
(
errPubkeyInvalid
,
"missing"
)
SendItems
(
rw
,
discMsg
,
DiscInvalidIdentity
)
return
nil
,
errors
.
New
(
"invalid public key in handshake"
)
}
if
hs
.
ID
!=
wantID
{
SendItems
(
rw
,
discMsg
,
DiscUnexpectedIdentity
)
return
nil
,
errors
.
New
(
"handshake node ID does not match encryption handshake"
)
}
return
&
hs
,
nil
}
p2p/handshake_test.go
View file @
92fbb616
...
...
@@ -143,7 +143,7 @@ func TestSetupConn(t *testing.T) {
done
:=
make
(
chan
struct
{})
go
func
()
{
defer
close
(
done
)
conn0
,
err
:=
setupConn
(
fd0
,
prv0
,
hs0
,
node1
)
conn0
,
err
:=
setupConn
(
fd0
,
prv0
,
hs0
,
node1
,
false
)
if
err
!=
nil
{
t
.
Errorf
(
"outbound side error: %v"
,
err
)
return
...
...
@@ -156,7 +156,7 @@ func TestSetupConn(t *testing.T) {
}
}()
conn1
,
err
:=
setupConn
(
fd1
,
prv1
,
hs1
,
nil
)
conn1
,
err
:=
setupConn
(
fd1
,
prv1
,
hs1
,
nil
,
false
)
if
err
!=
nil
{
t
.
Fatalf
(
"inbound side error: %v"
,
err
)
}
...
...
p2p/peer.go
View file @
92fbb616
...
...
@@ -44,7 +44,7 @@ type Peer struct {
rw
*
conn
running
map
[
string
]
*
protoRW
protoWG
sync
.
WaitGroup
wg
sync
.
WaitGroup
protoErr
chan
error
closed
chan
struct
{}
disc
chan
DiscReason
...
...
@@ -102,58 +102,50 @@ func (p *Peer) String() string {
func
newPeer
(
fd
net
.
Conn
,
conn
*
conn
,
protocols
[]
Protocol
)
*
Peer
{
logtag
:=
fmt
.
Sprintf
(
"Peer %.8x %v"
,
conn
.
ID
[
:
],
fd
.
RemoteAddr
())
protomap
:=
matchProtocols
(
protocols
,
conn
.
Caps
,
conn
)
p
:=
&
Peer
{
Logger
:
logger
.
NewLogger
(
logtag
),
conn
:
fd
,
rw
:
conn
,
running
:
matchProtocols
(
protocols
,
conn
.
Caps
,
conn
)
,
running
:
protomap
,
disc
:
make
(
chan
DiscReason
),
protoErr
:
make
(
chan
error
),
protoErr
:
make
(
chan
error
,
len
(
protomap
)
+
1
),
// protocols + pingLoop
closed
:
make
(
chan
struct
{}),
}
return
p
}
func
(
p
*
Peer
)
run
()
DiscReason
{
var
readErr
=
make
(
chan
error
,
1
)
defer
p
.
closeProtocols
()
defer
close
(
p
.
closed
)
readErr
:=
make
(
chan
error
,
1
)
p
.
wg
.
Add
(
2
)
go
p
.
readLoop
(
readErr
)
go
p
.
pingLoop
()
p
.
startProtocols
()
go
func
()
{
readErr
<-
p
.
readLoop
()
}()
ping
:=
time
.
NewTicker
(
pingInterval
)
defer
ping
.
Stop
()
// Wait for an error or disconnect.
var
reason
DiscReason
loop
:
for
{
select
{
case
<-
ping
.
C
:
go
func
()
{
if
err
:=
SendItems
(
p
.
rw
,
pingMsg
);
err
!=
nil
{
p
.
protoErr
<-
err
return
}
}()
case
err
:=
<-
readErr
:
// We rely on protocols to abort if there is a write error. It
// might be more robust to handle them here as well.
p
.
DebugDetailf
(
"Read error: %v
\n
"
,
err
)
p
.
conn
.
Close
()
return
DiscNetworkError
case
err
:=
<-
p
.
protoErr
:
reason
=
discReasonForError
(
err
)
break
loop
case
reason
=
<-
p
.
disc
:
break
loop
select
{
case
err
:=
<-
readErr
:
if
r
,
ok
:=
err
.
(
DiscReason
);
ok
{
reason
=
r
break
}
// Note: We rely on protocols to abort if there is a write
// error. It might be more robust to handle them here as well.
p
.
DebugDetailf
(
"Read error: %v
\n
"
,
err
)
p
.
conn
.
Close
()
reason
=
DiscNetworkError
case
err
:=
<-
p
.
protoErr
:
reason
=
discReasonForError
(
err
)
case
reason
=
<-
p
.
disc
:
}
p
.
politeDisconnect
(
reason
)
// Wait for readLoop. It will end because conn is now closed.
<-
readErr
close
(
p
.
closed
)
p
.
wg
.
Wait
()
if
reason
!=
DiscNetworkError
{
p
.
politeDisconnect
(
reason
)
}
p
.
Debugf
(
"Disconnected: %v
\n
"
,
reason
)
return
reason
}
...
...
@@ -174,18 +166,36 @@ func (p *Peer) politeDisconnect(reason DiscReason) {
p
.
conn
.
Close
()
}
func
(
p
*
Peer
)
readLoop
()
error
{
func
(
p
*
Peer
)
pingLoop
()
{
ping
:=
time
.
NewTicker
(
pingInterval
)
defer
p
.
wg
.
Done
()
defer
ping
.
Stop
()
for
{
select
{
case
<-
ping
.
C
:
if
err
:=
SendItems
(
p
.
rw
,
pingMsg
);
err
!=
nil
{
p
.
protoErr
<-
err
return
}
case
<-
p
.
closed
:
return
}
}
}
func
(
p
*
Peer
)
readLoop
(
errc
chan
<-
error
)
{
defer
p
.
wg
.
Done
()
for
{
p
.
conn
.
SetDeadline
(
time
.
Now
()
.
Add
(
frameReadTimeout
))
msg
,
err
:=
p
.
rw
.
ReadMsg
()
if
err
!=
nil
{
return
err
errc
<-
err
return
}
if
err
=
p
.
handle
(
msg
);
err
!=
nil
{
return
err
errc
<-
err
return
}
}
return
nil
}
func
(
p
*
Peer
)
handle
(
msg
Msg
)
error
{
...
...
@@ -195,12 +205,11 @@ func (p *Peer) handle(msg Msg) error {
go
SendItems
(
p
.
rw
,
pongMsg
)
case
msg
.
Code
==
discMsg
:
var
reason
[
1
]
DiscReason
//
no need to discard or for error checking, we'll close the
// c
onnection after this
.
//
This is the last message. We don't need to discard or
// c
heck errors because, the connection will be closed after it
.
rlp
.
Decode
(
msg
.
Payload
,
&
reason
)
p
.
Debugf
(
"Disconnect requested: %v
\n
"
,
reason
[
0
])
p
.
Disconnect
(
DiscRequested
)
return
discRequestedError
(
reason
[
0
])
return
DiscRequested
case
msg
.
Code
<
baseProtocolLength
:
// ignore other base protocol messages
return
msg
.
Discard
()
...
...
@@ -210,7 +219,12 @@ func (p *Peer) handle(msg Msg) error {
if
err
!=
nil
{
return
fmt
.
Errorf
(
"msg code out of range: %v"
,
msg
.
Code
)
}
proto
.
in
<-
msg
select
{
case
proto
.
in
<-
msg
:
return
nil
case
<-
p
.
closed
:
return
io
.
EOF
}
}
return
nil
}
...
...
@@ -234,10 +248,11 @@ outer:
}
func
(
p
*
Peer
)
startProtocols
()
{
p
.
wg
.
Add
(
len
(
p
.
running
))
for
_
,
proto
:=
range
p
.
running
{
proto
:=
proto
proto
.
closed
=
p
.
closed
p
.
DebugDetailf
(
"Starting protocol %s/%d
\n
"
,
proto
.
Name
,
proto
.
Version
)
p
.
protoWG
.
Add
(
1
)
go
func
()
{
err
:=
proto
.
Run
(
p
,
proto
)
if
err
==
nil
{
...
...
@@ -246,11 +261,8 @@ func (p *Peer) startProtocols() {
}
else
{
p
.
DebugDetailf
(
"Protocol %s/%d error: %v
\n
"
,
proto
.
Name
,
proto
.
Version
,
err
)
}
select
{
case
p
.
protoErr
<-
err
:
case
<-
p
.
closed
:
}
p
.
protoWG
.
Done
()
p
.
protoErr
<-
err
p
.
wg
.
Done
()
}()
}
}
...
...
@@ -266,13 +278,6 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) {
return
nil
,
newPeerError
(
errInvalidMsgCode
,
"%d"
,
code
)
}
func
(
p
*
Peer
)
closeProtocols
()
{
for
_
,
p
:=
range
p
.
running
{
close
(
p
.
in
)
}
p
.
protoWG
.
Wait
()
}
// writeProtoMsg sends the given message on behalf of the given named protocol.
// this exists because of Server.Broadcast.
func
(
p
*
Peer
)
writeProtoMsg
(
protoName
string
,
msg
Msg
)
error
{
...
...
@@ -289,8 +294,8 @@ func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
type
protoRW
struct
{
Protocol
in
chan
Msg
closed
<-
chan
struct
{}
offset
uint64
w
MsgWriter
}
...
...
@@ -304,10 +309,11 @@ func (rw *protoRW) WriteMsg(msg Msg) error {
}
func
(
rw
*
protoRW
)
ReadMsg
()
(
Msg
,
error
)
{
msg
,
ok
:=
<-
rw
.
in
if
!
ok
{
return
msg
,
io
.
EOF
select
{
case
msg
:=
<-
rw
.
in
:
msg
.
Code
-=
rw
.
offset
return
msg
,
nil
case
<-
rw
.
closed
:
return
Msg
{},
io
.
EOF
}
msg
.
Code
-=
rw
.
offset
return
msg
,
nil
}
p2p/peer_error.go
View file @
92fbb616
...
...
@@ -98,15 +98,13 @@ func (d DiscReason) String() string {
return
discReasonToString
[
d
]
}
type
discRequestedError
DiscReason
func
(
err
discRequestedError
)
Error
()
string
{
return
fmt
.
Sprintf
(
"disconnect requested: %v"
,
DiscReason
(
err
))
func
(
d
DiscReason
)
Error
()
string
{
return
d
.
String
()
}
func
discReasonForError
(
err
error
)
DiscReason
{
if
reason
,
ok
:=
err
.
(
discRequestedError
);
ok
{
return
DiscReason
(
reason
)
if
reason
,
ok
:=
err
.
(
DiscReason
);
ok
{
return
reason
}
peerError
,
ok
:=
err
.
(
*
peerError
)
if
!
ok
{
...
...
p2p/peer_test.go
View file @
92fbb616
...
...
@@ -2,8 +2,9 @@ package p2p
import
(
"bytes"
"errors"
"fmt"
"
io
"
"
math/rand
"
"net"
"reflect"
"testing"
...
...
@@ -27,7 +28,7 @@ var discard = Protocol{
},
}
func
testPeer
(
protos
[]
Protocol
)
(
io
.
Closer
,
*
conn
,
*
Peer
,
<-
chan
DiscReason
)
{
func
testPeer
(
protos
[]
Protocol
)
(
func
()
,
*
conn
,
*
Peer
,
<-
chan
DiscReason
)
{
fd1
,
_
:=
net
.
Pipe
()
hs1
:=
&
protoHandshake
{
ID
:
randomID
(),
Version
:
baseProtocolVersion
}
hs2
:=
&
protoHandshake
{
ID
:
randomID
(),
Version
:
baseProtocolVersion
}
...
...
@@ -41,7 +42,11 @@ func testPeer(protos []Protocol) (io.Closer, *conn, *Peer, <-chan DiscReason) {
errc
:=
make
(
chan
DiscReason
,
1
)
go
func
()
{
errc
<-
peer
.
run
()
}()
return
p1
,
&
conn
{
p2
,
hs2
},
peer
,
errc
closer
:=
func
()
{
p1
.
Close
()
fd1
.
Close
()
}
return
closer
,
&
conn
{
p2
,
hs2
},
peer
,
errc
}
func
TestPeerProtoReadMsg
(
t
*
testing
.
T
)
{
...
...
@@ -67,7 +72,7 @@ func TestPeerProtoReadMsg(t *testing.T) {
}
closer
,
rw
,
_
,
errc
:=
testPeer
([]
Protocol
{
proto
})
defer
closer
.
Close
()
defer
closer
()
Send
(
rw
,
baseProtocolLength
+
2
,
[]
uint
{
1
})
Send
(
rw
,
baseProtocolLength
+
3
,
[]
uint
{
2
})
...
...
@@ -99,7 +104,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
},
}
closer
,
rw
,
_
,
_
:=
testPeer
([]
Protocol
{
proto
})
defer
closer
.
Close
()
defer
closer
()
if
err
:=
ExpectMsg
(
rw
,
17
,
[]
string
{
"foo"
,
"bar"
});
err
!=
nil
{
t
.
Error
(
err
)
...
...
@@ -110,7 +115,7 @@ func TestPeerWriteForBroadcast(t *testing.T) {
defer
testlog
(
t
)
.
detach
()
closer
,
rw
,
peer
,
peerErr
:=
testPeer
([]
Protocol
{
discard
})
defer
closer
.
Close
()
defer
closer
()
emptymsg
:=
func
(
code
uint64
)
Msg
{
return
Msg
{
Code
:
code
,
Size
:
0
,
Payload
:
bytes
.
NewReader
(
nil
)}
...
...
@@ -150,7 +155,7 @@ func TestPeerPing(t *testing.T) {
defer
testlog
(
t
)
.
detach
()
closer
,
rw
,
_
,
_
:=
testPeer
(
nil
)
defer
closer
.
Close
()
defer
closer
()
if
err
:=
SendItems
(
rw
,
pingMsg
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -163,19 +168,70 @@ func TestPeerDisconnect(t *testing.T) {
defer
testlog
(
t
)
.
detach
()
closer
,
rw
,
_
,
disc
:=
testPeer
(
nil
)
defer
closer
.
Close
()
defer
closer
()
if
err
:=
SendItems
(
rw
,
discMsg
,
DiscQuitting
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
err
:=
ExpectMsg
(
rw
,
discMsg
,
[]
interface
{}{
DiscRequested
});
err
!=
nil
{
t
.
Error
(
err
)
}
closer
.
Close
()
// make test end faster
closer
()
if
reason
:=
<-
disc
;
reason
!=
DiscRequested
{
t
.
Errorf
(
"run returned wrong reason: got %v, want %v"
,
reason
,
DiscRequested
)
}
}
// This test is supposed to verify that Peer can reliably handle
// multiple causes of disconnection occurring at the same time.
func
TestPeerDisconnectRace
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
maybe
:=
func
()
bool
{
return
rand
.
Intn
(
1
)
==
1
}
for
i
:=
0
;
i
<
1000
;
i
++
{
protoclose
:=
make
(
chan
error
)
protodisc
:=
make
(
chan
DiscReason
)
closer
,
rw
,
p
,
disc
:=
testPeer
([]
Protocol
{
{
Name
:
"closereq"
,
Run
:
func
(
p
*
Peer
,
rw
MsgReadWriter
)
error
{
return
<-
protoclose
},
Length
:
1
,
},
{
Name
:
"disconnect"
,
Run
:
func
(
p
*
Peer
,
rw
MsgReadWriter
)
error
{
p
.
Disconnect
(
<-
protodisc
);
return
nil
},
Length
:
1
,
},
})
// Simulate incoming messages.
go
SendItems
(
rw
,
baseProtocolLength
+
1
)
go
SendItems
(
rw
,
baseProtocolLength
+
2
)
// Close the network connection.
go
closer
()
// Make protocol "closereq" return.
protoclose
<-
errors
.
New
(
"protocol closed"
)
// Make protocol "disconnect" call peer.Disconnect
protodisc
<-
DiscAlreadyConnected
// In some cases, simulate something else calling peer.Disconnect.
if
maybe
()
{
go
p
.
Disconnect
(
DiscInvalidIdentity
)
}
// In some cases, simulate remote requesting a disconnect.
if
maybe
()
{
go
SendItems
(
rw
,
discMsg
,
DiscQuitting
)
}
select
{
case
<-
disc
:
case
<-
time
.
After
(
2
*
time
.
Second
)
:
// Peer.run should return quickly. If it doesn't the Peer
// goroutines are probably deadlocked. Call panic in order to
// show the stacks.
panic
(
"Peer.run took to long to return."
)
}
}
}
func
TestNewPeer
(
t
*
testing
.
T
)
{
name
:=
"nodename"
caps
:=
[]
Cap
{{
"foo"
,
2
},
{
"bar"
,
3
}}
...
...
p2p/server.go
View file @
92fbb616
...
...
@@ -3,6 +3,7 @@ package p2p
import
(
"bytes"
"crypto/ecdsa"
"crypto/rand"
"errors"
"fmt"
"net"
...
...
@@ -20,6 +21,11 @@ const (
defaultDialTimeout
=
10
*
time
.
Second
refreshPeersInterval
=
30
*
time
.
Second
// This is the maximum number of inbound connection
// that are allowed to linger between 'accepted' and
// 'added as peer'.
maxAcceptConns
=
50
// total timeout for encryption handshake and protocol
// handshake in both directions.
handshakeTimeout
=
5
*
time
.
Second
...
...
@@ -85,12 +91,12 @@ type Server struct {
ourHandshake
*
protoHandshake
lock
sync
.
RWMutex
running
bool
listener
net
.
Listener
peers
map
[
discover
.
NodeID
]
*
Peer
lock
sync
.
RWMutex
// protects running and peers
running
bool
peers
map
[
discover
.
NodeID
]
*
Peer
ntab
*
discover
.
Table
ntab
*
discover
.
Table
listener
net
.
Listener
quit
chan
struct
{}
loopWG
sync
.
WaitGroup
// {dial,listen,nat}Loop
...
...
@@ -98,7 +104,7 @@ type Server struct {
peerConnect
chan
*
discover
.
Node
}
type
setupFunc
func
(
net
.
Conn
,
*
ecdsa
.
PrivateKey
,
*
protoHandshake
,
*
discover
.
Node
)
(
*
conn
,
error
)
type
setupFunc
func
(
net
.
Conn
,
*
ecdsa
.
PrivateKey
,
*
protoHandshake
,
*
discover
.
Node
,
bool
)
(
*
conn
,
error
)
type
newPeerHook
func
(
*
Peer
)
// Peers returns all connected peers.
...
...
@@ -260,62 +266,94 @@ func (srv *Server) Stop() {
srv
.
peerWG
.
Wait
()
}
// Self returns the local node's endpoint information.
func
(
srv
*
Server
)
Self
()
*
discover
.
Node
{
return
srv
.
ntab
.
Self
()
}
// main loop for adding connections via listening
func
(
srv
*
Server
)
listenLoop
()
{
defer
srv
.
loopWG
.
Done
()
// This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake.
// If all slots are taken, no further connections are accepted.
slots
:=
make
(
chan
struct
{},
maxAcceptConns
)
for
i
:=
0
;
i
<
maxAcceptConns
;
i
++
{
slots
<-
struct
{}{}
}
glog
.
V
(
logger
.
Info
)
.
Infoln
(
"Listening on"
,
srv
.
listener
.
Addr
())
for
{
<-
slots
conn
,
err
:=
srv
.
listener
.
Accept
()
if
err
!=
nil
{
return
}
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Accepted conn %v
\n
"
,
conn
.
RemoteAddr
())
srv
.
peerWG
.
Add
(
1
)
go
srv
.
startPeer
(
conn
,
nil
)
go
func
()
{
srv
.
startPeer
(
conn
,
nil
)
slots
<-
struct
{}{}
}()
}
}
func
(
srv
*
Server
)
dialLoop
()
{
var
(
dialed
=
make
(
chan
*
discover
.
Node
)
dialing
=
make
(
map
[
discover
.
NodeID
]
bool
)
findresults
=
make
(
chan
[]
*
discover
.
Node
)
refresh
=
time
.
NewTimer
(
0
)
)
defer
srv
.
loopWG
.
Done
()
refresh
:=
time
.
NewTicker
(
refreshPeersInterval
)
defer
refresh
.
Stop
()
srv
.
ntab
.
Bootstrap
(
srv
.
BootstrapNodes
)
go
srv
.
findPeers
()
dialed
:=
make
(
chan
*
discover
.
Node
)
dialing
:=
make
(
map
[
discover
.
NodeID
]
bool
)
// TODO: maybe limit number of active dials
dial
:=
func
(
dest
*
discover
.
Node
)
{
// Don't dial nodes that would fail the checks in addPeer.
// This is important because the connection handshake is a lot
// of work and we'd rather avoid doing that work for peers
// that can't be added.
srv
.
lock
.
RLock
()
ok
,
_
:=
srv
.
checkPeer
(
dest
.
ID
)
srv
.
lock
.
RUnlock
()
if
!
ok
||
dialing
[
dest
.
ID
]
{
return
}
// TODO: limit number of active dials
// TODO: ensure only one findPeers goroutine is running
// TODO: pause findPeers when we're at capacity
dialing
[
dest
.
ID
]
=
true
srv
.
peerWG
.
Add
(
1
)
go
func
()
{
srv
.
dialNode
(
dest
)
dialed
<-
dest
}()
}
srv
.
ntab
.
Bootstrap
(
srv
.
BootstrapNodes
)
for
{
select
{
case
<-
refresh
.
C
:
go
srv
.
findPeers
()
// Grab some nodes to connect to if we're not at capacity.
srv
.
lock
.
RLock
()
needpeers
:=
len
(
srv
.
peers
)
<
srv
.
MaxPeers
srv
.
lock
.
RUnlock
()
if
needpeers
{
go
func
()
{
var
target
discover
.
NodeID
rand
.
Read
(
target
[
:
])
findresults
<-
srv
.
ntab
.
Lookup
(
target
)
}()
refresh
.
Stop
()
}
case
dest
:=
<-
srv
.
peerConnect
:
// avoid dialing nodes that are already connected.
// there is another check for this in addPeer,
// which runs after the handshake.
srv
.
lock
.
Lock
()
_
,
isconnected
:=
srv
.
peers
[
dest
.
ID
]
srv
.
lock
.
Unlock
()
if
isconnected
||
dialing
[
dest
.
ID
]
||
dest
.
ID
==
srv
.
Self
()
.
ID
{
continue
dial
(
dest
)
case
dests
:=
<-
findresults
:
for
_
,
dest
:=
range
dests
{
dial
(
dest
)
}
dialing
[
dest
.
ID
]
=
true
srv
.
peerWG
.
Add
(
1
)
go
func
()
{
srv
.
dialNode
(
dest
)
// at this point, the peer has been added
// or discarded. either way, we're not dialing it anymore.
dialed
<-
dest
}()
refresh
.
Reset
(
refreshPeersInterval
)
case
dest
:=
<-
dialed
:
delete
(
dialing
,
dest
.
ID
)
...
...
@@ -331,44 +369,34 @@ func (srv *Server) dialNode(dest *discover.Node) {
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Dialing %v
\n
"
,
dest
)
conn
,
err
:=
srv
.
Dialer
.
Dial
(
"tcp"
,
addr
.
String
())
if
err
!=
nil
{
// dialLoop adds to the wait group counter when launching
// dialNode, so we need to count it down again. startPeer also
// does that when an error occurs.
srv
.
peerWG
.
Done
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"dial error: %v"
,
err
)
return
}
srv
.
startPeer
(
conn
,
dest
)
}
func
(
srv
*
Server
)
Self
()
*
discover
.
Node
{
return
srv
.
ntab
.
Self
()
}
func
(
srv
*
Server
)
findPeers
()
{
far
:=
srv
.
Self
()
.
ID
for
i
:=
range
far
{
far
[
i
]
=
^
far
[
i
]
}
closeToSelf
:=
srv
.
ntab
.
Lookup
(
srv
.
Self
()
.
ID
)
farFromSelf
:=
srv
.
ntab
.
Lookup
(
far
)
for
i
:=
0
;
i
<
len
(
closeToSelf
)
||
i
<
len
(
farFromSelf
);
i
++
{
if
i
<
len
(
closeToSelf
)
{
srv
.
peerConnect
<-
closeToSelf
[
i
]
}
if
i
<
len
(
farFromSelf
)
{
srv
.
peerConnect
<-
farFromSelf
[
i
]
}
}
}
func
(
srv
*
Server
)
startPeer
(
fd
net
.
Conn
,
dest
*
discover
.
Node
)
{
// TODO: handle/store session token
// Run setupFunc, which should create an authenticated connection
// and run the capability exchange. Note that any early error
// returns during that exchange need to call peerWG.Done because
// the callers of startPeer added the peer to the wait group already.
fd
.
SetDeadline
(
time
.
Now
()
.
Add
(
handshakeTimeout
))
conn
,
err
:=
srv
.
setupFunc
(
fd
,
srv
.
PrivateKey
,
srv
.
ourHandshake
,
dest
)
srv
.
lock
.
RLock
()
atcap
:=
len
(
srv
.
peers
)
==
srv
.
MaxPeers
srv
.
lock
.
RUnlock
()
conn
,
err
:=
srv
.
setupFunc
(
fd
,
srv
.
PrivateKey
,
srv
.
ourHandshake
,
dest
,
atcap
)
if
err
!=
nil
{
fd
.
Close
()
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Handshake with %v failed: %v"
,
fd
.
RemoteAddr
(),
err
)
srv
.
peerWG
.
Done
()
return
}
conn
.
MsgReadWriter
=
&
netWrapper
{
wrapped
:
conn
.
MsgReadWriter
,
conn
:
fd
,
rtimeout
:
frameReadTimeout
,
wtimeout
:
frameWriteTimeout
,
...
...
@@ -377,26 +405,30 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
if
ok
,
reason
:=
srv
.
addPeer
(
conn
.
ID
,
p
);
!
ok
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"Not adding %v (%v)
\n
"
,
p
,
reason
)
p
.
politeDisconnect
(
reason
)
srv
.
peerWG
.
Done
()
return
}
// The handshakes are done and it passed all checks.
// Spawn the Peer loops.
go
srv
.
runPeer
(
p
)
}
func
(
srv
*
Server
)
runPeer
(
p
*
Peer
)
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Added %v
\n
"
,
p
)
srvjslog
.
LogJson
(
&
logger
.
P2PConnected
{
RemoteId
:
fmt
.
Sprintf
(
"%x"
,
conn
.
ID
[
:
]
),
RemoteAddress
:
fd
.
RemoteAddr
()
.
String
(),
RemoteVersionString
:
conn
.
Name
,
RemoteId
:
p
.
ID
()
.
String
(
),
RemoteAddress
:
p
.
RemoteAddr
()
.
String
(),
RemoteVersionString
:
p
.
Name
()
,
NumConnections
:
srv
.
PeerCount
(),
})
if
srv
.
newPeerHook
!=
nil
{
srv
.
newPeerHook
(
p
)
}
discreason
:=
p
.
run
()
srv
.
removePeer
(
p
)
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Removed %v (%v)
\n
"
,
p
,
discreason
)
srvjslog
.
LogJson
(
&
logger
.
P2PDisconnected
{
RemoteId
:
fmt
.
Sprintf
(
"%x"
,
conn
.
ID
[
:
]
),
RemoteId
:
p
.
ID
()
.
String
(
),
NumConnections
:
srv
.
PeerCount
(),
})
}
...
...
@@ -404,6 +436,14 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
func
(
srv
*
Server
)
addPeer
(
id
discover
.
NodeID
,
p
*
Peer
)
(
bool
,
DiscReason
)
{
srv
.
lock
.
Lock
()
defer
srv
.
lock
.
Unlock
()
if
ok
,
reason
:=
srv
.
checkPeer
(
id
);
!
ok
{
return
false
,
reason
}
srv
.
peers
[
id
]
=
p
return
true
,
0
}
func
(
srv
*
Server
)
checkPeer
(
id
discover
.
NodeID
)
(
bool
,
DiscReason
)
{
switch
{
case
!
srv
.
running
:
return
false
,
DiscQuitting
...
...
@@ -413,9 +453,9 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
return
false
,
DiscAlreadyConnected
case
id
==
srv
.
Self
()
.
ID
:
return
false
,
DiscSelf
default
:
return
true
,
0
}
srv
.
peers
[
id
]
=
p
return
true
,
0
}
func
(
srv
*
Server
)
removePeer
(
p
*
Peer
)
{
...
...
p2p/server_test.go
View file @
92fbb616
...
...
@@ -22,7 +22,7 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
ListenAddr
:
"127.0.0.1:0"
,
PrivateKey
:
newkey
(),
newPeerHook
:
pf
,
setupFunc
:
func
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
)
(
*
conn
,
error
)
{
setupFunc
:
func
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
,
atcap
bool
)
(
*
conn
,
error
)
{
id
:=
randomID
()
rw
:=
newRlpxFrameRW
(
fd
,
secrets
{
MAC
:
zero16
,
...
...
@@ -163,6 +163,62 @@ func TestServerBroadcast(t *testing.T) {
}
}
// This test checks that connections are disconnected
// just after the encryption handshake when the server is
// at capacity.
//
// It also serves as a light-weight integration test.
func
TestServerDisconnectAtCap
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
started
:=
make
(
chan
*
Peer
)
srv
:=
&
Server
{
ListenAddr
:
"127.0.0.1:0"
,
PrivateKey
:
newkey
(),
MaxPeers
:
10
,
NoDial
:
true
,
// This hook signals that the peer was actually started. We
// need to wait for the peer to be started before dialing the
// next connection to get a deterministic peer count.
newPeerHook
:
func
(
p
*
Peer
)
{
started
<-
p
},
}
if
err
:=
srv
.
Start
();
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
srv
.
Stop
()
nconns
:=
srv
.
MaxPeers
+
1
dialer
:=
&
net
.
Dialer
{
Deadline
:
time
.
Now
()
.
Add
(
3
*
time
.
Second
)}
for
i
:=
0
;
i
<
nconns
;
i
++
{
conn
,
err
:=
dialer
.
Dial
(
"tcp"
,
srv
.
ListenAddr
)
if
err
!=
nil
{
t
.
Fatalf
(
"conn %d: dial error: %v"
,
i
,
err
)
}
// Close the connection when the test ends, before
// shutting down the server.
defer
conn
.
Close
()
// Run the handshakes just like a real peer would.
key
:=
newkey
()
hs
:=
&
protoHandshake
{
Version
:
baseProtocolVersion
,
ID
:
discover
.
PubkeyID
(
&
key
.
PublicKey
)}
_
,
err
=
setupConn
(
conn
,
key
,
hs
,
srv
.
Self
(),
false
)
if
i
==
nconns
-
1
{
// When handling the last connection, the server should
// disconnect immediately instead of running the protocol
// handshake.
if
err
!=
DiscTooManyPeers
{
t
.
Errorf
(
"conn %d: got error %q, expected %q"
,
i
,
err
,
DiscTooManyPeers
)
}
}
else
{
// For all earlier connections, the handshake should go through.
if
err
!=
nil
{
t
.
Fatalf
(
"conn %d: unexpected error: %v"
,
i
,
err
)
}
// Wait for runPeer to be started.
<-
started
}
}
}
func
newkey
()
*
ecdsa
.
PrivateKey
{
key
,
err
:=
crypto
.
GenerateKey
()
if
err
!=
nil
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment