Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
12ca7053
Commit
12ca7053
authored
Mar 05, 2015
by
obscuren
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'fjl-p2p-handshake-2' into poc-9
parents
fabaf4f1
ba0c4143
Changes
15
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
763 additions
and
747 deletions
+763
-747
backend.go
eth/backend.go
+1
-8
protocol.go
eth/protocol.go
+21
-15
node.go
p2p/discover/node.go
+15
-0
node_test.go
p2p/discover/node_test.go
+18
-0
handshake.go
p2p/handshake.go
+239
-237
handshake_test.go
p2p/handshake_test.go
+66
-119
message.go
p2p/message.go
+20
-167
message_test.go
p2p/message_test.go
+13
-49
peer.go
p2p/peer.go
+16
-12
peer_test.go
p2p/peer_test.go
+19
-56
rlpx.go
p2p/rlpx.go
+174
-0
rlpx_test.go
p2p/rlpx_test.go
+124
-0
server.go
p2p/server.go
+18
-63
server_test.go
p2p/server_test.go
+9
-4
peer.go
whisper/peer.go
+10
-17
No files found.
eth/backend.go
View file @
12ca7053
...
@@ -107,11 +107,9 @@ func (cfg *Config) nodeKey() (*ecdsa.PrivateKey, error) {
...
@@ -107,11 +107,9 @@ func (cfg *Config) nodeKey() (*ecdsa.PrivateKey, error) {
type
Ethereum
struct
{
type
Ethereum
struct
{
// Channel for shutting down the ethereum
// Channel for shutting down the ethereum
shutdownChan
chan
bool
shutdownChan
chan
bool
quit
chan
bool
// DB interface
// DB interface
db
ethutil
.
Database
db
ethutil
.
Database
blacklist
p2p
.
Blacklist
//*** SERVICES ***
//*** SERVICES ***
// State manager for processing new blocks and managing the over all states
// State manager for processing new blocks and managing the over all states
...
@@ -169,10 +167,8 @@ func New(config *Config) (*Ethereum, error) {
...
@@ -169,10 +167,8 @@ func New(config *Config) (*Ethereum, error) {
eth
:=
&
Ethereum
{
eth
:=
&
Ethereum
{
shutdownChan
:
make
(
chan
bool
),
shutdownChan
:
make
(
chan
bool
),
quit
:
make
(
chan
bool
),
db
:
db
,
db
:
db
,
keyManager
:
keyManager
,
keyManager
:
keyManager
,
blacklist
:
p2p
.
NewBlacklist
(),
eventMux
:
&
event
.
TypeMux
{},
eventMux
:
&
event
.
TypeMux
{},
logger
:
ethlogger
,
logger
:
ethlogger
,
}
}
...
@@ -205,7 +201,6 @@ func New(config *Config) (*Ethereum, error) {
...
@@ -205,7 +201,6 @@ func New(config *Config) (*Ethereum, error) {
Name
:
config
.
Name
,
Name
:
config
.
Name
,
MaxPeers
:
config
.
MaxPeers
,
MaxPeers
:
config
.
MaxPeers
,
Protocols
:
protocols
,
Protocols
:
protocols
,
Blacklist
:
eth
.
blacklist
,
NAT
:
config
.
NAT
,
NAT
:
config
.
NAT
,
NoDial
:
!
config
.
Dial
,
NoDial
:
!
config
.
Dial
,
BootstrapNodes
:
config
.
parseBootNodes
(),
BootstrapNodes
:
config
.
parseBootNodes
(),
...
@@ -279,8 +274,6 @@ func (s *Ethereum) Stop() {
...
@@ -279,8 +274,6 @@ func (s *Ethereum) Stop() {
// Close the database
// Close the database
defer
s
.
db
.
Close
()
defer
s
.
db
.
Close
()
close
(
s
.
quit
)
s
.
txSub
.
Unsubscribe
()
// quits txBroadcastLoop
s
.
txSub
.
Unsubscribe
()
// quits txBroadcastLoop
s
.
blockSub
.
Unsubscribe
()
// quits blockBroadcastLoop
s
.
blockSub
.
Unsubscribe
()
// quits blockBroadcastLoop
...
...
eth/protocol.go
View file @
12ca7053
...
@@ -3,7 +3,6 @@ package eth
...
@@ -3,7 +3,6 @@ package eth
import
(
import
(
"bytes"
"bytes"
"fmt"
"fmt"
"io"
"math/big"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types"
...
@@ -188,33 +187,37 @@ func (self *ethProtocol) handle() error {
...
@@ -188,33 +187,37 @@ func (self *ethProtocol) handle() error {
case
BlockHashesMsg
:
case
BlockHashesMsg
:
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
)
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
)
var
err
error
if
_
,
err
:=
msgStream
.
List
();
err
!=
nil
{
var
i
int
return
err
}
var
i
int
iter
:=
func
()
(
hash
[]
byte
,
ok
bool
)
{
iter
:=
func
()
(
hash
[]
byte
,
ok
bool
)
{
hash
,
err
=
msgStream
.
Bytes
()
hash
,
err
:=
msgStream
.
Bytes
()
if
err
==
nil
{
if
err
==
rlp
.
EOL
{
i
++
return
nil
,
false
ok
=
true
}
else
if
err
!=
nil
{
}
else
{
if
err
!=
io
.
EOF
{
self
.
protoError
(
ErrDecode
,
"msg %v: after %v hashes : %v"
,
msg
,
i
,
err
)
self
.
protoError
(
ErrDecode
,
"msg %v: after %v hashes : %v"
,
msg
,
i
,
err
)
return
nil
,
false
}
}
i
++
return
hash
,
true
}
}
return
}
self
.
blockPool
.
AddBlockHashes
(
iter
,
self
.
id
)
self
.
blockPool
.
AddBlockHashes
(
iter
,
self
.
id
)
case
GetBlocksMsg
:
case
GetBlocksMsg
:
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
)
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
)
if
_
,
err
:=
msgStream
.
List
();
err
!=
nil
{
return
err
}
var
blocks
[]
interface
{}
var
blocks
[]
interface
{}
var
i
int
var
i
int
for
{
for
{
i
++
i
++
var
hash
[]
byte
var
hash
[]
byte
if
err
:=
msgStream
.
Decode
(
&
hash
);
err
!=
nil
{
if
err
:=
msgStream
.
Decode
(
&
hash
);
err
!=
nil
{
if
err
==
io
.
EOF
{
if
err
==
rlp
.
EOL
{
break
break
}
else
{
}
else
{
return
self
.
protoError
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
return
self
.
protoError
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
...
@@ -232,10 +235,13 @@ func (self *ethProtocol) handle() error {
...
@@ -232,10 +235,13 @@ func (self *ethProtocol) handle() error {
case
BlocksMsg
:
case
BlocksMsg
:
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
)
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
)
if
_
,
err
:=
msgStream
.
List
();
err
!=
nil
{
return
err
}
for
{
for
{
var
block
types
.
Block
var
block
types
.
Block
if
err
:=
msgStream
.
Decode
(
&
block
);
err
!=
nil
{
if
err
:=
msgStream
.
Decode
(
&
block
);
err
!=
nil
{
if
err
==
io
.
EOF
{
if
err
==
rlp
.
EOL
{
break
break
}
else
{
}
else
{
return
self
.
protoError
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
return
self
.
protoError
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
...
...
p2p/discover/node.go
View file @
12ca7053
...
@@ -7,6 +7,7 @@ import (
...
@@ -7,6 +7,7 @@ import (
"errors"
"errors"
"fmt"
"fmt"
"io"
"io"
"math/big"
"math/rand"
"math/rand"
"net"
"net"
"net/url"
"net/url"
...
@@ -14,6 +15,7 @@ import (
...
@@ -14,6 +15,7 @@ import (
"strings"
"strings"
"time"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rlp"
)
)
...
@@ -187,6 +189,19 @@ func PubkeyID(pub *ecdsa.PublicKey) NodeID {
...
@@ -187,6 +189,19 @@ func PubkeyID(pub *ecdsa.PublicKey) NodeID {
return
id
return
id
}
}
// Pubkey returns the public key represented by the node ID.
// It returns an error if the ID is not a point on the curve.
func
(
id
NodeID
)
Pubkey
()
(
*
ecdsa
.
PublicKey
,
error
)
{
p
:=
&
ecdsa
.
PublicKey
{
Curve
:
crypto
.
S256
(),
X
:
new
(
big
.
Int
),
Y
:
new
(
big
.
Int
)}
half
:=
len
(
id
)
/
2
p
.
X
.
SetBytes
(
id
[
:
half
])
p
.
Y
.
SetBytes
(
id
[
half
:
])
if
!
p
.
Curve
.
IsOnCurve
(
p
.
X
,
p
.
Y
)
{
return
nil
,
errors
.
New
(
"not a point on the S256 curve"
)
}
return
p
,
nil
}
// recoverNodeID computes the public key used to sign the
// recoverNodeID computes the public key used to sign the
// given hash from the signature.
// given hash from the signature.
func
recoverNodeID
(
hash
,
sig
[]
byte
)
(
id
NodeID
,
err
error
)
{
func
recoverNodeID
(
hash
,
sig
[]
byte
)
(
id
NodeID
,
err
error
)
{
...
...
p2p/discover/node_test.go
View file @
12ca7053
...
@@ -133,6 +133,24 @@ func TestNodeID_recover(t *testing.T) {
...
@@ -133,6 +133,24 @@ func TestNodeID_recover(t *testing.T) {
if
pub
!=
recpub
{
if
pub
!=
recpub
{
t
.
Errorf
(
"recovered wrong pubkey:
\n
got: %v
\n
want: %v"
,
recpub
,
pub
)
t
.
Errorf
(
"recovered wrong pubkey:
\n
got: %v
\n
want: %v"
,
recpub
,
pub
)
}
}
ecdsa
,
err
:=
pub
.
Pubkey
()
if
err
!=
nil
{
t
.
Errorf
(
"Pubkey error: %v"
,
err
)
}
if
!
reflect
.
DeepEqual
(
ecdsa
,
&
prv
.
PublicKey
)
{
t
.
Errorf
(
"Pubkey mismatch:
\n
got: %#v
\n
want: %#v"
,
ecdsa
,
&
prv
.
PublicKey
)
}
}
func
TestNodeID_pubkeyBad
(
t
*
testing
.
T
)
{
ecdsa
,
err
:=
NodeID
{}
.
Pubkey
()
if
err
==
nil
{
t
.
Error
(
"expected error for zero ID"
)
}
if
ecdsa
!=
nil
{
t
.
Error
(
"expected nil result"
)
}
}
}
func
TestNodeID_distcmp
(
t
*
testing
.
T
)
{
func
TestNodeID_distcmp
(
t
*
testing
.
T
)
{
...
...
p2p/handshake.go
View file @
12ca7053
This diff is collapsed.
Click to expand it.
p2p/handshake_test.go
View file @
12ca7053
...
@@ -2,53 +2,18 @@ package p2p
...
@@ -2,53 +2,18 @@ package p2p
import
(
import
(
"bytes"
"bytes"
"crypto/ecdsa"
"crypto/rand"
"crypto/rand"
"fmt"
"net"
"net"
"reflect"
"reflect"
"testing"
"testing"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discover"
)
)
func
TestPublicKeyEncoding
(
t
*
testing
.
T
)
{
prv0
,
_
:=
crypto
.
GenerateKey
()
// = ecdsa.GenerateKey(crypto.S256(), rand.Reader)
pub0
:=
&
prv0
.
PublicKey
pub0s
:=
crypto
.
FromECDSAPub
(
pub0
)
pub1
,
err
:=
importPublicKey
(
pub0s
)
if
err
!=
nil
{
t
.
Errorf
(
"%v"
,
err
)
}
eciesPub1
:=
ecies
.
ImportECDSAPublic
(
pub1
)
if
eciesPub1
==
nil
{
t
.
Errorf
(
"invalid ecdsa public key"
)
}
pub1s
,
err
:=
exportPublicKey
(
pub1
)
if
err
!=
nil
{
t
.
Errorf
(
"%v"
,
err
)
}
if
len
(
pub1s
)
!=
64
{
t
.
Errorf
(
"wrong length expect 64, got"
,
len
(
pub1s
))
}
pub2
,
err
:=
importPublicKey
(
pub1s
)
if
err
!=
nil
{
t
.
Errorf
(
"%v"
,
err
)
}
pub2s
,
err
:=
exportPublicKey
(
pub2
)
if
err
!=
nil
{
t
.
Errorf
(
"%v"
,
err
)
}
if
!
bytes
.
Equal
(
pub1s
,
pub2s
)
{
t
.
Errorf
(
"exports dont match"
)
}
pub2sEC
:=
crypto
.
FromECDSAPub
(
pub2
)
if
!
bytes
.
Equal
(
pub0s
,
pub2sEC
)
{
t
.
Errorf
(
"exports dont match"
)
}
}
func
TestSharedSecret
(
t
*
testing
.
T
)
{
func
TestSharedSecret
(
t
*
testing
.
T
)
{
prv0
,
_
:=
crypto
.
GenerateKey
()
// = ecdsa.GenerateKey(crypto.S256(), rand.Reader)
prv0
,
_
:=
crypto
.
GenerateKey
()
// = ecdsa.GenerateKey(crypto.S256(), rand.Reader)
pub0
:=
&
prv0
.
PublicKey
pub0
:=
&
prv0
.
PublicKey
...
@@ -69,103 +34,85 @@ func TestSharedSecret(t *testing.T) {
...
@@ -69,103 +34,85 @@ func TestSharedSecret(t *testing.T) {
}
}
}
}
func
TestCryptoHandshake
(
t
*
testing
.
T
)
{
func
TestEncHandshake
(
t
*
testing
.
T
)
{
testCryptoHandshake
(
newkey
(),
newkey
(),
nil
,
t
)
for
i
:=
0
;
i
<
20
;
i
++
{
}
start
:=
time
.
Now
()
if
err
:=
testEncHandshake
(
nil
);
err
!=
nil
{
func
TestCryptoHandshakeWithToken
(
t
*
testing
.
T
)
{
t
.
Fatalf
(
"i=%d %v"
,
i
,
err
)
sessionToken
:=
make
([]
byte
,
shaLen
)
rand
.
Read
(
sessionToken
)
testCryptoHandshake
(
newkey
(),
newkey
(),
sessionToken
,
t
)
}
func
testCryptoHandshake
(
prv0
,
prv1
*
ecdsa
.
PrivateKey
,
sessionToken
[]
byte
,
t
*
testing
.
T
)
{
var
err
error
// pub0 := &prv0.PublicKey
pub1
:=
&
prv1
.
PublicKey
// pub0s := crypto.FromECDSAPub(pub0)
pub1s
:=
crypto
.
FromECDSAPub
(
pub1
)
// simulate handshake by feeding output to input
// initiator sends handshake 'auth'
auth
,
initNonce
,
randomPrivKey
,
err
:=
authMsg
(
prv0
,
pub1s
,
sessionToken
)
if
err
!=
nil
{
t
.
Errorf
(
"%v"
,
err
)
}
}
// t.Logf("-> %v", hexkey(auth))
t
.
Logf
(
"(without token) %d %v
\n
"
,
i
+
1
,
time
.
Since
(
start
))
// receiver reads auth and responds with response
response
,
remoteRecNonce
,
remoteInitNonce
,
_
,
remoteRandomPrivKey
,
remoteInitRandomPubKey
,
err
:=
authResp
(
auth
,
sessionToken
,
prv1
)
if
err
!=
nil
{
t
.
Errorf
(
"%v"
,
err
)
}
}
// t.Logf("<- %v\n", hexkey(response))
// initiator reads receiver's response and the key exchange completes
for
i
:=
0
;
i
<
20
;
i
++
{
recNonce
,
remoteRandomPubKey
,
_
,
err
:=
completeHandshake
(
response
,
prv0
)
tok
:=
make
([]
byte
,
shaLen
)
if
err
!=
nil
{
rand
.
Reader
.
Read
(
tok
)
t
.
Errorf
(
"completeHandshake error: %v"
,
err
)
start
:=
time
.
Now
()
if
err
:=
testEncHandshake
(
tok
);
err
!=
nil
{
t
.
Fatalf
(
"i=%d %v"
,
i
,
err
)
}
}
t
.
Logf
(
"(with token) %d %v
\n
"
,
i
+
1
,
time
.
Since
(
start
))
// now both parties should have the same session parameters
initSessionToken
,
err
:=
newSession
(
initNonce
,
recNonce
,
randomPrivKey
,
remoteRandomPubKey
)
if
err
!=
nil
{
t
.
Errorf
(
"newSession error: %v"
,
err
)
}
}
}
recSessionToken
,
err
:=
newSession
(
remoteInitNonce
,
remoteRecNonce
,
remoteRandomPrivKey
,
remoteInitRandomPubKey
)
func
testEncHandshake
(
token
[]
byte
)
error
{
if
err
!=
nil
{
type
result
struct
{
t
.
Errorf
(
"newSession error: %v"
,
err
)
side
string
s
secrets
err
error
}
}
var
(
prv0
,
_
=
crypto
.
GenerateKey
()
prv1
,
_
=
crypto
.
GenerateKey
()
rw0
,
rw1
=
net
.
Pipe
()
output
=
make
(
chan
result
)
)
// fmt.Printf("\nauth (%v) %x\n\nresp (%v) %x\n\n", len(auth), auth, len(response), response)
go
func
()
{
r
:=
result
{
side
:
"initiator"
}
// fmt.Printf("\nauth %x\ninitNonce %x\nresponse%x\nremoteRecNonce %x\nremoteInitNonce %x\nremoteRandomPubKey %x\nrecNonce %x\nremoteInitRandomPubKey %x\ninitSessionToken %x\n\n", auth, initNonce, response, remoteRecNonce, remoteInitNonce, remoteRandomPubKey, recNonce, remoteInitRandomPubKey, initSessionToken
)
defer
func
()
{
output
<-
r
}(
)
if
!
bytes
.
Equal
(
initNonce
,
remoteInitNonce
)
{
pub1s
:=
discover
.
PubkeyID
(
&
prv1
.
PublicKey
)
t
.
Errorf
(
"nonces do not match"
)
r
.
s
,
r
.
err
=
initiatorEncHandshake
(
rw0
,
prv0
,
pub1s
,
token
)
}
if
r
.
err
!=
nil
{
if
!
bytes
.
Equal
(
recNonce
,
remoteRecNonce
)
{
return
t
.
Errorf
(
"receiver nonces do not match"
)
}
}
if
!
bytes
.
Equal
(
initSessionToken
,
recSessionToken
)
{
id1
:=
discover
.
PubkeyID
(
&
prv1
.
PublicKey
)
t
.
Errorf
(
"session tokens do not match"
)
if
r
.
s
.
RemoteID
!=
id1
{
r
.
err
=
fmt
.
Errorf
(
"remote ID mismatch: got %v, want: %v"
,
r
.
s
.
RemoteID
,
id1
)
}
}
}
func
TestEncHandshake
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
prv0
,
_
:=
crypto
.
GenerateKey
()
prv1
,
_
:=
crypto
.
GenerateKey
()
pub0s
,
_
:=
exportPublicKey
(
&
prv0
.
PublicKey
)
pub1s
,
_
:=
exportPublicKey
(
&
prv1
.
PublicKey
)
rw0
,
rw1
:=
net
.
Pipe
()
tokens
:=
make
(
chan
[]
byte
)
go
func
()
{
token
,
err
:=
outboundEncHandshake
(
rw0
,
prv0
,
pub1s
,
nil
)
if
err
!=
nil
{
t
.
Errorf
(
"outbound side error: %v"
,
err
)
}
tokens
<-
token
}()
}()
go
func
()
{
go
func
()
{
token
,
remotePubkey
,
err
:=
inboundEncHandshake
(
rw1
,
prv1
,
nil
)
r
:=
result
{
side
:
"receiver"
}
if
err
!=
nil
{
defer
func
()
{
output
<-
r
}()
t
.
Errorf
(
"inbound side error: %v"
,
err
)
r
.
s
,
r
.
err
=
receiverEncHandshake
(
rw1
,
prv1
,
token
)
if
r
.
err
!=
nil
{
return
}
}
if
!
bytes
.
Equal
(
remotePubkey
,
pub0s
)
{
id0
:=
discover
.
PubkeyID
(
&
prv0
.
PublicKey
)
t
.
Errorf
(
"inbound side returned wrong remote pubkey
\n
got: %x
\n
want: %x"
,
remotePubkey
,
pub0s
)
if
r
.
s
.
RemoteID
!=
id0
{
r
.
err
=
fmt
.
Errorf
(
"remote ID mismatch: got %v, want: %v"
,
r
.
s
.
RemoteID
,
id0
)
}
}
tokens
<-
token
}()
}()
t1
,
t2
:=
<-
tokens
,
<-
tokens
// wait for results from both sides
if
!
bytes
.
Equal
(
t1
,
t2
)
{
r1
,
r2
:=
<-
output
,
<-
output
t
.
Error
(
"session token mismatch"
)
if
r1
.
err
!=
nil
{
return
fmt
.
Errorf
(
"%s side error: %v"
,
r1
.
side
,
r1
.
err
)
}
if
r2
.
err
!=
nil
{
return
fmt
.
Errorf
(
"%s side error: %v"
,
r2
.
side
,
r2
.
err
)
}
// don't compare remote node IDs
r1
.
s
.
RemoteID
,
r2
.
s
.
RemoteID
=
discover
.
NodeID
{},
discover
.
NodeID
{}
// flip MACs on one of them so they compare equal
r1
.
s
.
EgressMAC
,
r1
.
s
.
IngressMAC
=
r1
.
s
.
IngressMAC
,
r1
.
s
.
EgressMAC
if
!
reflect
.
DeepEqual
(
r1
.
s
,
r2
.
s
)
{
return
fmt
.
Errorf
(
"secrets mismatch:
\n
t1: %#v
\n
t2: %#v"
,
r1
.
s
,
r2
.
s
)
}
}
return
nil
}
}
func
TestSetupConn
(
t
*
testing
.
T
)
{
func
TestSetupConn
(
t
*
testing
.
T
)
{
...
...
p2p/message.go
View file @
12ca7053
package
p2p
package
p2p
import
(
import
(
"bufio"
"bytes"
"bytes"
"encoding/binary"
"errors"
"errors"
"fmt"
"fmt"
"io"
"io"
"io/ioutil"
"io/ioutil"
"math/big"
"net"
"net"
"sync"
"sync"
"sync/atomic"
"sync/atomic"
...
@@ -18,28 +15,6 @@ import (
...
@@ -18,28 +15,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rlp"
)
)
// parameters for frameRW
const
(
// maximum time allowed for reading a message header.
// this is effectively the amount of time a connection can be idle.
frameReadTimeout
=
1
*
time
.
Minute
// maximum time allowed for reading the payload data of a message.
// this is shorter than (and distinct from) frameReadTimeout because
// the connection is not considered idle while a message is transferred.
// this also limits the payload size of messages to how much the connection
// can transfer within the timeout.
payloadReadTimeout
=
5
*
time
.
Second
// maximum amount of time allowed for writing a complete message.
msgWriteTimeout
=
5
*
time
.
Second
// messages smaller than this many bytes will be read at
// once before passing them to a protocol. this increases
// concurrency in the processing.
wholePayloadSize
=
64
*
1024
)
// Msg defines the structure of a p2p message.
// Msg defines the structure of a p2p message.
//
//
// Note that a Msg can only be sent once since the Payload reader is
// Note that a Msg can only be sent once since the Payload reader is
...
@@ -55,19 +30,8 @@ type Msg struct {
...
@@ -55,19 +30,8 @@ type Msg struct {
// NewMsg creates an RLP-encoded message with the given code.
// NewMsg creates an RLP-encoded message with the given code.
func
NewMsg
(
code
uint64
,
params
...
interface
{})
Msg
{
func
NewMsg
(
code
uint64
,
params
...
interface
{})
Msg
{
buf
:=
new
(
bytes
.
Buffer
)
p
:=
bytes
.
NewReader
(
ethutil
.
Encode
(
params
))
for
_
,
p
:=
range
params
{
return
Msg
{
Code
:
code
,
Size
:
uint32
(
p
.
Len
()),
Payload
:
p
}
buf
.
Write
(
ethutil
.
Encode
(
p
))
}
return
Msg
{
Code
:
code
,
Size
:
uint32
(
buf
.
Len
()),
Payload
:
buf
}
}
func
encodePayload
(
params
...
interface
{})
[]
byte
{
buf
:=
new
(
bytes
.
Buffer
)
for
_
,
p
:=
range
params
{
buf
.
Write
(
ethutil
.
Encode
(
p
))
}
return
buf
.
Bytes
()
}
}
// Decode parse the RLP content of a message into
// Decode parse the RLP content of a message into
...
@@ -75,8 +39,7 @@ func encodePayload(params ...interface{}) []byte {
...
@@ -75,8 +39,7 @@ func encodePayload(params ...interface{}) []byte {
//
//
// For the decoding rules, please see package rlp.
// For the decoding rules, please see package rlp.
func
(
msg
Msg
)
Decode
(
val
interface
{})
error
{
func
(
msg
Msg
)
Decode
(
val
interface
{})
error
{
s
:=
rlp
.
NewListStream
(
msg
.
Payload
,
uint64
(
msg
.
Size
))
if
err
:=
rlp
.
Decode
(
msg
.
Payload
,
val
);
err
!=
nil
{
if
err
:=
s
.
Decode
(
val
);
err
!=
nil
{
return
newPeerError
(
errInvalidMsg
,
"(code %#x) (size %d) %v"
,
msg
.
Code
,
msg
.
Size
,
err
)
return
newPeerError
(
errInvalidMsg
,
"(code %#x) (size %d) %v"
,
msg
.
Code
,
msg
.
Size
,
err
)
}
}
return
nil
return
nil
...
@@ -119,138 +82,28 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
...
@@ -119,138 +82,28 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
return
w
.
WriteMsg
(
NewMsg
(
code
,
data
...
))
return
w
.
WriteMsg
(
NewMsg
(
code
,
data
...
))
}
}
// frameRW is a MsgReadWriter that reads and writes devp2p message frames.
// netWrapper wrapsa MsgReadWriter with locks around
// As required by the interface, ReadMsg and WriteMsg can be called from
// ReadMsg/WriteMsg and applies read/write deadlines.
// multiple goroutines.
type
netWrapper
struct
{
type
frameRW
struct
{
rmu
,
wmu
sync
.
Mutex
net
.
Conn
// make Conn methods available. be careful.
bufconn
*
bufio
.
ReadWriter
// this channel is used to 'lend' bufconn to a caller of ReadMsg
// until the message payload has been consumed. the channel
// receives a value when EOF is reached on the payload, unblocking
// a pending call to ReadMsg.
rsync
chan
struct
{}
// this mutex guards writes to bufconn.
writeMu
sync
.
Mutex
}
func
newFrameRW
(
conn
net
.
Conn
,
timeout
time
.
Duration
)
*
frameRW
{
rsync
:=
make
(
chan
struct
{},
1
)
rsync
<-
struct
{}{}
return
&
frameRW
{
Conn
:
conn
,
bufconn
:
bufio
.
NewReadWriter
(
bufio
.
NewReader
(
conn
),
bufio
.
NewWriter
(
conn
)),
rsync
:
rsync
,
}
}
var
magicToken
=
[]
byte
{
34
,
64
,
8
,
145
}
func
(
rw
*
frameRW
)
WriteMsg
(
msg
Msg
)
error
{
rtimeout
,
wtimeout
time
.
Duration
rw
.
writeMu
.
Lock
()
conn
net
.
Conn
defer
rw
.
writeMu
.
Unlock
()
wrapped
MsgReadWriter
rw
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
msgWriteTimeout
))
if
err
:=
writeMsg
(
rw
.
bufconn
,
msg
);
err
!=
nil
{
return
err
}
return
rw
.
bufconn
.
Flush
()
}
}
func
writeMsg
(
w
io
.
Writer
,
msg
Msg
)
error
{
func
(
rw
*
netWrapper
)
ReadMsg
()
(
Msg
,
error
)
{
// TODO: handle case when Size + len(code) + len(listhdr) overflows uint32
rw
.
rmu
.
Lock
()
code
:=
ethutil
.
Encode
(
uint32
(
msg
.
Code
))
defer
rw
.
rmu
.
Unlock
()
listhdr
:=
makeListHeader
(
msg
.
Size
+
uint32
(
len
(
code
)))
rw
.
conn
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
rw
.
rtimeout
))
payloadLen
:=
uint32
(
len
(
listhdr
))
+
uint32
(
len
(
code
))
+
msg
.
Size
return
rw
.
wrapped
.
ReadMsg
()
start
:=
make
([]
byte
,
8
)
copy
(
start
,
magicToken
)
binary
.
BigEndian
.
PutUint32
(
start
[
4
:
],
payloadLen
)
for
_
,
b
:=
range
[][]
byte
{
start
,
listhdr
,
code
}
{
if
_
,
err
:=
w
.
Write
(
b
);
err
!=
nil
{
return
err
}
}
_
,
err
:=
io
.
CopyN
(
w
,
msg
.
Payload
,
int64
(
msg
.
Size
))
return
err
}
}
func
makeListHeader
(
length
uint32
)
[]
byte
{
func
(
rw
*
netWrapper
)
WriteMsg
(
msg
Msg
)
error
{
if
length
<
56
{
rw
.
wmu
.
Lock
()
return
[]
byte
{
byte
(
length
+
0xc0
)}
defer
rw
.
wmu
.
Unlock
()
}
rw
.
conn
.
SetWriteDeadline
(
time
.
Now
()
.
Add
(
rw
.
wtimeout
))
enc
:=
big
.
NewInt
(
int64
(
length
))
.
Bytes
()
return
rw
.
wrapped
.
WriteMsg
(
msg
)
lenb
:=
byte
(
len
(
enc
))
+
0xf7
return
append
([]
byte
{
lenb
},
enc
...
)
}
func
(
rw
*
frameRW
)
ReadMsg
()
(
msg
Msg
,
err
error
)
{
<-
rw
.
rsync
// wait until bufconn is ours
rw
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
frameReadTimeout
))
// read magic and payload size
start
:=
make
([]
byte
,
8
)
if
_
,
err
=
io
.
ReadFull
(
rw
.
bufconn
,
start
);
err
!=
nil
{
return
msg
,
err
}
if
!
bytes
.
HasPrefix
(
start
,
magicToken
)
{
return
msg
,
fmt
.
Errorf
(
"bad magic token %x"
,
start
[
:
4
])
}
size
:=
binary
.
BigEndian
.
Uint32
(
start
[
4
:
])
// decode start of RLP message to get the message code
posr
:=
&
postrack
{
rw
.
bufconn
,
0
}
s
:=
rlp
.
NewStream
(
posr
)
if
_
,
err
:=
s
.
List
();
err
!=
nil
{
return
msg
,
err
}
msg
.
Code
,
err
=
s
.
Uint
()
if
err
!=
nil
{
return
msg
,
err
}
msg
.
Size
=
size
-
posr
.
p
rw
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
payloadReadTimeout
))
if
msg
.
Size
<=
wholePayloadSize
{
// msg is small, read all of it and move on to the next message.
pbuf
:=
make
([]
byte
,
msg
.
Size
)
if
_
,
err
:=
io
.
ReadFull
(
rw
.
bufconn
,
pbuf
);
err
!=
nil
{
return
msg
,
err
}
rw
.
rsync
<-
struct
{}{}
// bufconn is available again
msg
.
Payload
=
bytes
.
NewReader
(
pbuf
)
}
else
{
// lend bufconn to the caller until it has
// consumed the payload. eofSignal will send a value
// on rw.rsync when EOF is reached.
pr
:=
&
eofSignal
{
rw
.
bufconn
,
msg
.
Size
,
rw
.
rsync
}
msg
.
Payload
=
pr
}
return
msg
,
nil
}
// postrack wraps an rlp.ByteReader with a position counter.
type
postrack
struct
{
r
rlp
.
ByteReader
p
uint32
}
func
(
r
*
postrack
)
Read
(
buf
[]
byte
)
(
int
,
error
)
{
n
,
err
:=
r
.
r
.
Read
(
buf
)
r
.
p
+=
uint32
(
n
)
return
n
,
err
}
func
(
r
*
postrack
)
ReadByte
()
(
byte
,
error
)
{
b
,
err
:=
r
.
r
.
ReadByte
()
if
err
==
nil
{
r
.
p
++
}
return
b
,
err
}
}
// eofSignal wraps a reader with eof signaling. the eof channel is
// eofSignal wraps a reader with eof signaling. the eof channel is
...
...
p2p/message_test.go
View file @
12ca7053
...
@@ -2,10 +2,12 @@ package p2p
...
@@ -2,10 +2,12 @@ package p2p
import
(
import
(
"bytes"
"bytes"
"encoding/hex"
"fmt"
"fmt"
"io"
"io"
"io/ioutil"
"io/ioutil"
"runtime"
"runtime"
"strings"
"testing"
"testing"
"time"
"time"
)
)
...
@@ -15,62 +17,16 @@ func TestNewMsg(t *testing.T) {
...
@@ -15,62 +17,16 @@ func TestNewMsg(t *testing.T) {
if
msg
.
Code
!=
3
{
if
msg
.
Code
!=
3
{
t
.
Errorf
(
"incorrect code %d, want %d"
,
msg
.
Code
)
t
.
Errorf
(
"incorrect code %d, want %d"
,
msg
.
Code
)
}
}
if
msg
.
Size
!=
5
{
expect
:=
unhex
(
"c50183303030"
)
t
.
Errorf
(
"incorrect size %d, want %d"
,
msg
.
Size
,
5
)
if
msg
.
Size
!=
uint32
(
len
(
expect
))
{
t
.
Errorf
(
"incorrect size %d, want %d"
,
msg
.
Size
,
len
(
expect
))
}
}
pl
,
_
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
pl
,
_
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
expect
:=
[]
byte
{
0x01
,
0x83
,
0x30
,
0x30
,
0x30
}
if
!
bytes
.
Equal
(
pl
,
expect
)
{
if
!
bytes
.
Equal
(
pl
,
expect
)
{
t
.
Errorf
(
"incorrect payload content, got %x, want %x"
,
pl
,
expect
)
t
.
Errorf
(
"incorrect payload content, got %x, want %x"
,
pl
,
expect
)
}
}
}
}
// func TestEncodeDecodeMsg(t *testing.T) {
// msg := NewMsg(3, 1, "000")
// buf := new(bytes.Buffer)
// if err := writeMsg(buf, msg); err != nil {
// t.Fatalf("encodeMsg error: %v", err)
// }
// // t.Logf("encoded: %x", buf.Bytes())
// decmsg, err := readMsg(buf)
// if err != nil {
// t.Fatalf("readMsg error: %v", err)
// }
// if decmsg.Code != 3 {
// t.Errorf("incorrect code %d, want %d", decmsg.Code, 3)
// }
// if decmsg.Size != 5 {
// t.Errorf("incorrect size %d, want %d", decmsg.Size, 5)
// }
// var data struct {
// I uint
// S string
// }
// if err := decmsg.Decode(&data); err != nil {
// t.Fatalf("Decode error: %v", err)
// }
// if data.I != 1 {
// t.Errorf("incorrect data.I: got %v, expected %d", data.I, 1)
// }
// if data.S != "000" {
// t.Errorf("incorrect data.S: got %q, expected %q", data.S, "000")
// }
// }
// func TestDecodeRealMsg(t *testing.T) {
// data := ethutil.Hex2Bytes("2240089100000080f87e8002b5457468657265756d282b2b292f5065657220536572766572204f6e652f76302e372e382f52656c656173652f4c696e75782f672b2bc082765fb84086dd80b7aefd6a6d2e3b93f4f300a86bfb6ef7bdc97cb03f793db6bb")
// msg, err := readMsg(bytes.NewReader(data))
// if err != nil {
// t.Fatalf("unexpected error: %v", err)
// }
// if msg.Code != 0 {
// t.Errorf("incorrect code %d, want %d", msg.Code, 0)
// }
// }
func
ExampleMsgPipe
()
{
func
ExampleMsgPipe
()
{
rw1
,
rw2
:=
MsgPipe
()
rw1
,
rw2
:=
MsgPipe
()
go
func
()
{
go
func
()
{
...
@@ -185,3 +141,11 @@ func TestEOFSignal(t *testing.T) {
...
@@ -185,3 +141,11 @@ func TestEOFSignal(t *testing.T) {
default
:
default
:
}
}
}
}
func
unhex
(
str
string
)
[]
byte
{
b
,
err
:=
hex
.
DecodeString
(
strings
.
Replace
(
str
,
"
\n
"
,
""
,
-
1
))
if
err
!=
nil
{
panic
(
fmt
.
Sprintf
(
"invalid hex string: %q"
,
str
))
}
return
b
}
p2p/peer.go
View file @
12ca7053
...
@@ -20,8 +20,8 @@ const (
...
@@ -20,8 +20,8 @@ const (
baseProtocolLength
=
uint64
(
16
)
baseProtocolLength
=
uint64
(
16
)
baseProtocolMaxMsgSize
=
10
*
1024
*
1024
baseProtocolMaxMsgSize
=
10
*
1024
*
1024
disconnectGracePeriod
=
2
*
time
.
Second
pingInterval
=
15
*
time
.
Second
pingInterval
=
15
*
time
.
Second
disconnectGracePeriod
=
2
*
time
.
Second
)
)
const
(
const
(
...
@@ -40,6 +40,7 @@ type Peer struct {
...
@@ -40,6 +40,7 @@ type Peer struct {
// Use them to display messages related to the peer.
// Use them to display messages related to the peer.
*
logger
.
Logger
*
logger
.
Logger
conn
net
.
Conn
rw
*
conn
rw
*
conn
running
map
[
string
]
*
protoRW
running
map
[
string
]
*
protoRW
...
@@ -52,8 +53,9 @@ type Peer struct {
...
@@ -52,8 +53,9 @@ type Peer struct {
// NewPeer returns a peer for testing purposes.
// NewPeer returns a peer for testing purposes.
func
NewPeer
(
id
discover
.
NodeID
,
name
string
,
caps
[]
Cap
)
*
Peer
{
func
NewPeer
(
id
discover
.
NodeID
,
name
string
,
caps
[]
Cap
)
*
Peer
{
pipe
,
_
:=
net
.
Pipe
()
pipe
,
_
:=
net
.
Pipe
()
conn
:=
newConn
(
pipe
,
&
protoHandshake
{
ID
:
id
,
Name
:
name
,
Caps
:
caps
})
msgpipe
,
_
:=
MsgPipe
()
peer
:=
newPeer
(
conn
,
nil
)
conn
:=
&
conn
{
msgpipe
,
&
protoHandshake
{
ID
:
id
,
Name
:
name
,
Caps
:
caps
}}
peer
:=
newPeer
(
pipe
,
conn
,
nil
)
close
(
peer
.
closed
)
// ensures Disconnect doesn't block
close
(
peer
.
closed
)
// ensures Disconnect doesn't block
return
peer
return
peer
}
}
...
@@ -76,12 +78,12 @@ func (p *Peer) Caps() []Cap {
...
@@ -76,12 +78,12 @@ func (p *Peer) Caps() []Cap {
// RemoteAddr returns the remote address of the network connection.
// RemoteAddr returns the remote address of the network connection.
func
(
p
*
Peer
)
RemoteAddr
()
net
.
Addr
{
func
(
p
*
Peer
)
RemoteAddr
()
net
.
Addr
{
return
p
.
rw
.
RemoteAddr
()
return
p
.
conn
.
RemoteAddr
()
}
}
// LocalAddr returns the local address of the network connection.
// LocalAddr returns the local address of the network connection.
func
(
p
*
Peer
)
LocalAddr
()
net
.
Addr
{
func
(
p
*
Peer
)
LocalAddr
()
net
.
Addr
{
return
p
.
rw
.
LocalAddr
()
return
p
.
conn
.
LocalAddr
()
}
}
// Disconnect terminates the peer connection with the given reason.
// Disconnect terminates the peer connection with the given reason.
...
@@ -98,10 +100,11 @@ func (p *Peer) String() string {
...
@@ -98,10 +100,11 @@ func (p *Peer) String() string {
return
fmt
.
Sprintf
(
"Peer %.8x %v"
,
p
.
rw
.
ID
[
:
],
p
.
RemoteAddr
())
return
fmt
.
Sprintf
(
"Peer %.8x %v"
,
p
.
rw
.
ID
[
:
],
p
.
RemoteAddr
())
}
}
func
newPeer
(
conn
*
conn
,
protocols
[]
Protocol
)
*
Peer
{
func
newPeer
(
fd
net
.
Conn
,
conn
*
conn
,
protocols
[]
Protocol
)
*
Peer
{
logtag
:=
fmt
.
Sprintf
(
"Peer %.8x %v"
,
conn
.
ID
[
:
],
conn
.
RemoteAddr
())
logtag
:=
fmt
.
Sprintf
(
"Peer %.8x %v"
,
conn
.
ID
[
:
],
fd
.
RemoteAddr
())
p
:=
&
Peer
{
p
:=
&
Peer
{
Logger
:
logger
.
NewLogger
(
logtag
),
Logger
:
logger
.
NewLogger
(
logtag
),
conn
:
fd
,
rw
:
conn
,
rw
:
conn
,
running
:
matchProtocols
(
protocols
,
conn
.
Caps
,
conn
),
running
:
matchProtocols
(
protocols
,
conn
.
Caps
,
conn
),
disc
:
make
(
chan
DiscReason
),
disc
:
make
(
chan
DiscReason
),
...
@@ -138,7 +141,7 @@ loop:
...
@@ -138,7 +141,7 @@ loop:
// We rely on protocols to abort if there is a write error. It
// We rely on protocols to abort if there is a write error. It
// might be more robust to handle them here as well.
// might be more robust to handle them here as well.
p
.
DebugDetailf
(
"Read error: %v
\n
"
,
err
)
p
.
DebugDetailf
(
"Read error: %v
\n
"
,
err
)
p
.
rw
.
Close
()
p
.
conn
.
Close
()
return
DiscNetworkError
return
DiscNetworkError
case
err
:=
<-
p
.
protoErr
:
case
err
:=
<-
p
.
protoErr
:
reason
=
discReasonForError
(
err
)
reason
=
discReasonForError
(
err
)
...
@@ -161,18 +164,19 @@ func (p *Peer) politeDisconnect(reason DiscReason) {
...
@@ -161,18 +164,19 @@ func (p *Peer) politeDisconnect(reason DiscReason) {
EncodeMsg
(
p
.
rw
,
discMsg
,
uint
(
reason
))
EncodeMsg
(
p
.
rw
,
discMsg
,
uint
(
reason
))
// Wait for the other side to close the connection.
// Wait for the other side to close the connection.
// Discard any data that they send until then.
// Discard any data that they send until then.
io
.
Copy
(
ioutil
.
Discard
,
p
.
rw
)
io
.
Copy
(
ioutil
.
Discard
,
p
.
conn
)
close
(
done
)
close
(
done
)
}()
}()
select
{
select
{
case
<-
done
:
case
<-
done
:
case
<-
time
.
After
(
disconnectGracePeriod
)
:
case
<-
time
.
After
(
disconnectGracePeriod
)
:
}
}
p
.
rw
.
Close
()
p
.
conn
.
Close
()
}
}
func
(
p
*
Peer
)
readLoop
()
error
{
func
(
p
*
Peer
)
readLoop
()
error
{
for
{
for
{
p
.
conn
.
SetDeadline
(
time
.
Now
()
.
Add
(
frameReadTimeout
))
msg
,
err
:=
p
.
rw
.
ReadMsg
()
msg
,
err
:=
p
.
rw
.
ReadMsg
()
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
...
@@ -190,12 +194,12 @@ func (p *Peer) handle(msg Msg) error {
...
@@ -190,12 +194,12 @@ func (p *Peer) handle(msg Msg) error {
msg
.
Discard
()
msg
.
Discard
()
go
EncodeMsg
(
p
.
rw
,
pongMsg
)
go
EncodeMsg
(
p
.
rw
,
pongMsg
)
case
msg
.
Code
==
discMsg
:
case
msg
.
Code
==
discMsg
:
var
reason
DiscReason
var
reason
[
1
]
DiscReason
// no need to discard or for error checking, we'll close the
// no need to discard or for error checking, we'll close the
// connection after this.
// connection after this.
rlp
.
Decode
(
msg
.
Payload
,
&
reason
)
rlp
.
Decode
(
msg
.
Payload
,
&
reason
)
p
.
Disconnect
(
DiscRequested
)
p
.
Disconnect
(
DiscRequested
)
return
discRequestedError
(
reason
)
return
discRequestedError
(
reason
[
0
]
)
case
msg
.
Code
<
baseProtocolLength
:
case
msg
.
Code
<
baseProtocolLength
:
// ignore other base protocol messages
// ignore other base protocol messages
return
msg
.
Discard
()
return
msg
.
Discard
()
...
...
p2p/peer_test.go
View file @
12ca7053
...
@@ -3,6 +3,7 @@ package p2p
...
@@ -3,6 +3,7 @@ package p2p
import
(
import
(
"bytes"
"bytes"
"fmt"
"fmt"
"io"
"io/ioutil"
"io/ioutil"
"net"
"net"
"reflect"
"reflect"
...
@@ -29,8 +30,8 @@ var discard = Protocol{
...
@@ -29,8 +30,8 @@ var discard = Protocol{
},
},
}
}
func
testPeer
(
protos
[]
Protocol
)
(
*
conn
,
*
Peer
,
<-
chan
DiscReason
)
{
func
testPeer
(
protos
[]
Protocol
)
(
io
.
Closer
,
*
conn
,
*
Peer
,
<-
chan
DiscReason
)
{
fd1
,
fd2
:=
net
.
Pipe
()
fd1
,
_
:=
net
.
Pipe
()
hs1
:=
&
protoHandshake
{
ID
:
randomID
(),
Version
:
baseProtocolVersion
}
hs1
:=
&
protoHandshake
{
ID
:
randomID
(),
Version
:
baseProtocolVersion
}
hs2
:=
&
protoHandshake
{
ID
:
randomID
(),
Version
:
baseProtocolVersion
}
hs2
:=
&
protoHandshake
{
ID
:
randomID
(),
Version
:
baseProtocolVersion
}
for
_
,
p
:=
range
protos
{
for
_
,
p
:=
range
protos
{
...
@@ -38,11 +39,12 @@ func testPeer(protos []Protocol) (*conn, *Peer, <-chan DiscReason) {
...
@@ -38,11 +39,12 @@ func testPeer(protos []Protocol) (*conn, *Peer, <-chan DiscReason) {
hs2
.
Caps
=
append
(
hs2
.
Caps
,
p
.
cap
())
hs2
.
Caps
=
append
(
hs2
.
Caps
,
p
.
cap
())
}
}
peer
:=
newPeer
(
newConn
(
fd1
,
hs1
),
protos
)
p1
,
p2
:=
MsgPipe
()
peer
:=
newPeer
(
fd1
,
&
conn
{
p1
,
hs1
},
protos
)
errc
:=
make
(
chan
DiscReason
,
1
)
errc
:=
make
(
chan
DiscReason
,
1
)
go
func
()
{
errc
<-
peer
.
run
()
}()
go
func
()
{
errc
<-
peer
.
run
()
}()
return
newConn
(
fd2
,
hs2
)
,
peer
,
errc
return
p1
,
&
conn
{
p2
,
hs2
}
,
peer
,
errc
}
}
func
TestPeerProtoReadMsg
(
t
*
testing
.
T
)
{
func
TestPeerProtoReadMsg
(
t
*
testing
.
T
)
{
...
@@ -67,8 +69,8 @@ func TestPeerProtoReadMsg(t *testing.T) {
...
@@ -67,8 +69,8 @@ func TestPeerProtoReadMsg(t *testing.T) {
},
},
}
}
rw
,
_
,
errc
:=
testPeer
([]
Protocol
{
proto
})
closer
,
rw
,
_
,
errc
:=
testPeer
([]
Protocol
{
proto
})
defer
rw
.
Close
()
defer
closer
.
Close
()
EncodeMsg
(
rw
,
baseProtocolLength
+
2
,
1
)
EncodeMsg
(
rw
,
baseProtocolLength
+
2
,
1
)
EncodeMsg
(
rw
,
baseProtocolLength
+
3
,
2
)
EncodeMsg
(
rw
,
baseProtocolLength
+
3
,
2
)
...
@@ -83,41 +85,6 @@ func TestPeerProtoReadMsg(t *testing.T) {
...
@@ -83,41 +85,6 @@ func TestPeerProtoReadMsg(t *testing.T) {
}
}
}
}
func
TestPeerProtoReadLargeMsg
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
msgsize
:=
uint32
(
10
*
1024
*
1024
)
done
:=
make
(
chan
struct
{})
proto
:=
Protocol
{
Name
:
"a"
,
Length
:
5
,
Run
:
func
(
peer
*
Peer
,
rw
MsgReadWriter
)
error
{
msg
,
err
:=
rw
.
ReadMsg
()
if
err
!=
nil
{
t
.
Errorf
(
"read error: %v"
,
err
)
}
if
msg
.
Size
!=
msgsize
+
4
{
t
.
Errorf
(
"incorrect msg.Size, got %d, expected %d"
,
msg
.
Size
,
msgsize
)
}
msg
.
Discard
()
close
(
done
)
return
nil
},
}
rw
,
_
,
errc
:=
testPeer
([]
Protocol
{
proto
})
defer
rw
.
Close
()
EncodeMsg
(
rw
,
18
,
make
([]
byte
,
msgsize
))
select
{
case
<-
done
:
case
err
:=
<-
errc
:
t
.
Errorf
(
"peer returned: %v"
,
err
)
case
<-
time
.
After
(
2
*
time
.
Second
)
:
t
.
Errorf
(
"receive timeout"
)
}
}
func
TestPeerProtoEncodeMsg
(
t
*
testing
.
T
)
{
func
TestPeerProtoEncodeMsg
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
defer
testlog
(
t
)
.
detach
()
...
@@ -134,8 +101,8 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
...
@@ -134,8 +101,8 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
return
nil
return
nil
},
},
}
}
rw
,
_
,
_
:=
testPeer
([]
Protocol
{
proto
})
closer
,
rw
,
_
,
_
:=
testPeer
([]
Protocol
{
proto
})
defer
rw
.
Close
()
defer
closer
.
Close
()
if
err
:=
expectMsg
(
rw
,
17
,
[]
string
{
"foo"
,
"bar"
});
err
!=
nil
{
if
err
:=
expectMsg
(
rw
,
17
,
[]
string
{
"foo"
,
"bar"
});
err
!=
nil
{
t
.
Error
(
err
)
t
.
Error
(
err
)
...
@@ -145,8 +112,8 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
...
@@ -145,8 +112,8 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
func
TestPeerWriteForBroadcast
(
t
*
testing
.
T
)
{
func
TestPeerWriteForBroadcast
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
defer
testlog
(
t
)
.
detach
()
rw
,
peer
,
peerErr
:=
testPeer
([]
Protocol
{
discard
})
closer
,
rw
,
peer
,
peerErr
:=
testPeer
([]
Protocol
{
discard
})
defer
rw
.
Close
()
defer
closer
.
Close
()
// test write errors
// test write errors
if
err
:=
peer
.
writeProtoMsg
(
"b"
,
NewMsg
(
3
));
err
==
nil
{
if
err
:=
peer
.
writeProtoMsg
(
"b"
,
NewMsg
(
3
));
err
==
nil
{
...
@@ -181,8 +148,8 @@ func TestPeerWriteForBroadcast(t *testing.T) {
...
@@ -181,8 +148,8 @@ func TestPeerWriteForBroadcast(t *testing.T) {
func
TestPeerPing
(
t
*
testing
.
T
)
{
func
TestPeerPing
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
defer
testlog
(
t
)
.
detach
()
rw
,
_
,
_
:=
testPeer
(
nil
)
closer
,
rw
,
_
,
_
:=
testPeer
(
nil
)
defer
rw
.
Close
()
defer
closer
.
Close
()
if
err
:=
EncodeMsg
(
rw
,
pingMsg
);
err
!=
nil
{
if
err
:=
EncodeMsg
(
rw
,
pingMsg
);
err
!=
nil
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
...
@@ -194,15 +161,15 @@ func TestPeerPing(t *testing.T) {
...
@@ -194,15 +161,15 @@ func TestPeerPing(t *testing.T) {
func
TestPeerDisconnect
(
t
*
testing
.
T
)
{
func
TestPeerDisconnect
(
t
*
testing
.
T
)
{
defer
testlog
(
t
)
.
detach
()
defer
testlog
(
t
)
.
detach
()
rw
,
_
,
disc
:=
testPeer
(
nil
)
closer
,
rw
,
_
,
disc
:=
testPeer
(
nil
)
defer
rw
.
Close
()
defer
closer
.
Close
()
if
err
:=
EncodeMsg
(
rw
,
discMsg
,
DiscQuitting
);
err
!=
nil
{
if
err
:=
EncodeMsg
(
rw
,
discMsg
,
DiscQuitting
);
err
!=
nil
{
t
.
Fatal
(
err
)
t
.
Fatal
(
err
)
}
}
if
err
:=
expectMsg
(
rw
,
discMsg
,
[]
interface
{}{
DiscRequested
});
err
!=
nil
{
if
err
:=
expectMsg
(
rw
,
discMsg
,
[]
interface
{}{
DiscRequested
});
err
!=
nil
{
t
.
Error
(
err
)
t
.
Error
(
err
)
}
}
rw
.
Close
()
// make test end faster
closer
.
Close
()
// make test end faster
if
reason
:=
<-
disc
;
reason
!=
DiscRequested
{
if
reason
:=
<-
disc
;
reason
!=
DiscRequested
{
t
.
Errorf
(
"run returned wrong reason: got %v, want %v"
,
reason
,
DiscRequested
)
t
.
Errorf
(
"run returned wrong reason: got %v, want %v"
,
reason
,
DiscRequested
)
}
}
...
@@ -244,13 +211,9 @@ func expectMsg(r MsgReader, code uint64, content interface{}) error {
...
@@ -244,13 +211,9 @@ func expectMsg(r MsgReader, code uint64, content interface{}) error {
if
err
!=
nil
{
if
err
!=
nil
{
panic
(
"content encode error: "
+
err
.
Error
())
panic
(
"content encode error: "
+
err
.
Error
())
}
}
// skip over list header in encoded value. this is temporary.
if
int
(
msg
.
Size
)
!=
len
(
contentEnc
)
{
contentEncR
:=
bytes
.
NewReader
(
contentEnc
)
return
fmt
.
Errorf
(
"message size mismatch: got %d, want %d"
,
msg
.
Size
,
len
(
contentEnc
))
if
k
,
_
,
err
:=
rlp
.
NewStream
(
contentEncR
)
.
Kind
();
k
!=
rlp
.
List
||
err
!=
nil
{
panic
(
"content must encode as RLP list"
)
}
}
contentEnc
=
contentEnc
[
len
(
contentEnc
)
-
contentEncR
.
Len
()
:
]
actualContent
,
err
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
actualContent
,
err
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
...
...
p2p/rlpx.go
0 → 100644
View file @
12ca7053
package
p2p
import
(
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/hmac"
"errors"
"hash"
"io"
"github.com/ethereum/go-ethereum/rlp"
)
var
(
// this is used in place of actual frame header data.
// TODO: replace this when Msg contains the protocol type code.
zeroHeader
=
[]
byte
{
0xC2
,
0x80
,
0x80
}
// sixteen zero bytes
zero16
=
make
([]
byte
,
16
)
maxUint24
=
^
uint32
(
0
)
>>
8
)
// rlpxFrameRW implements a simplified version of RLPx framing.
// chunked messages are not supported and all headers are equal to
// zeroHeader.
//
// rlpxFrameRW is not safe for concurrent use from multiple goroutines.
type
rlpxFrameRW
struct
{
conn
io
.
ReadWriter
enc
cipher
.
Stream
dec
cipher
.
Stream
macCipher
cipher
.
Block
egressMAC
hash
.
Hash
ingressMAC
hash
.
Hash
}
func
newRlpxFrameRW
(
conn
io
.
ReadWriter
,
s
secrets
)
*
rlpxFrameRW
{
macc
,
err
:=
aes
.
NewCipher
(
s
.
MAC
)
if
err
!=
nil
{
panic
(
"invalid MAC secret: "
+
err
.
Error
())
}
encc
,
err
:=
aes
.
NewCipher
(
s
.
AES
)
if
err
!=
nil
{
panic
(
"invalid AES secret: "
+
err
.
Error
())
}
// we use an all-zeroes IV for AES because the key used
// for encryption is ephemeral.
iv
:=
make
([]
byte
,
encc
.
BlockSize
())
return
&
rlpxFrameRW
{
conn
:
conn
,
enc
:
cipher
.
NewCTR
(
encc
,
iv
),
dec
:
cipher
.
NewCTR
(
encc
,
iv
),
macCipher
:
macc
,
egressMAC
:
s
.
EgressMAC
,
ingressMAC
:
s
.
IngressMAC
,
}
}
func
(
rw
*
rlpxFrameRW
)
WriteMsg
(
msg
Msg
)
error
{
ptype
,
_
:=
rlp
.
EncodeToBytes
(
msg
.
Code
)
// write header
headbuf
:=
make
([]
byte
,
32
)
fsize
:=
uint32
(
len
(
ptype
))
+
msg
.
Size
if
fsize
>
maxUint24
{
return
errors
.
New
(
"message size overflows uint24"
)
}
putInt24
(
fsize
,
headbuf
)
// TODO: check overflow
copy
(
headbuf
[
3
:
],
zeroHeader
)
rw
.
enc
.
XORKeyStream
(
headbuf
[
:
16
],
headbuf
[
:
16
])
// first half is now encrypted
// write header MAC
copy
(
headbuf
[
16
:
],
updateMAC
(
rw
.
egressMAC
,
rw
.
macCipher
,
headbuf
[
:
16
]))
if
_
,
err
:=
rw
.
conn
.
Write
(
headbuf
);
err
!=
nil
{
return
err
}
// write encrypted frame, updating the egress MAC hash with
// the data written to conn.
tee
:=
cipher
.
StreamWriter
{
S
:
rw
.
enc
,
W
:
io
.
MultiWriter
(
rw
.
conn
,
rw
.
egressMAC
)}
if
_
,
err
:=
tee
.
Write
(
ptype
);
err
!=
nil
{
return
err
}
if
_
,
err
:=
io
.
Copy
(
tee
,
msg
.
Payload
);
err
!=
nil
{
return
err
}
if
padding
:=
fsize
%
16
;
padding
>
0
{
if
_
,
err
:=
tee
.
Write
(
zero16
[
:
16
-
padding
]);
err
!=
nil
{
return
err
}
}
// write frame MAC. egress MAC hash is up to date because
// frame content was written to it as well.
fmacseed
:=
rw
.
egressMAC
.
Sum
(
nil
)
mac
:=
updateMAC
(
rw
.
egressMAC
,
rw
.
macCipher
,
fmacseed
)
_
,
err
:=
rw
.
conn
.
Write
(
mac
)
return
err
}
func
(
rw
*
rlpxFrameRW
)
ReadMsg
()
(
msg
Msg
,
err
error
)
{
// read the header
headbuf
:=
make
([]
byte
,
32
)
if
_
,
err
:=
io
.
ReadFull
(
rw
.
conn
,
headbuf
);
err
!=
nil
{
return
msg
,
err
}
// verify header mac
shouldMAC
:=
updateMAC
(
rw
.
ingressMAC
,
rw
.
macCipher
,
headbuf
[
:
16
])
if
!
hmac
.
Equal
(
shouldMAC
,
headbuf
[
16
:
])
{
return
msg
,
errors
.
New
(
"bad header MAC"
)
}
rw
.
dec
.
XORKeyStream
(
headbuf
[
:
16
],
headbuf
[
:
16
])
// first half is now decrypted
fsize
:=
readInt24
(
headbuf
)
// ignore protocol type for now
// read the frame content
var
rsize
=
fsize
// frame size rounded up to 16 byte boundary
if
padding
:=
fsize
%
16
;
padding
>
0
{
rsize
+=
16
-
padding
}
framebuf
:=
make
([]
byte
,
rsize
)
if
_
,
err
:=
io
.
ReadFull
(
rw
.
conn
,
framebuf
);
err
!=
nil
{
return
msg
,
err
}
// read and validate frame MAC. we can re-use headbuf for that.
rw
.
ingressMAC
.
Write
(
framebuf
)
fmacseed
:=
rw
.
ingressMAC
.
Sum
(
nil
)
if
_
,
err
:=
io
.
ReadFull
(
rw
.
conn
,
headbuf
[
:
16
]);
err
!=
nil
{
return
msg
,
err
}
shouldMAC
=
updateMAC
(
rw
.
ingressMAC
,
rw
.
macCipher
,
fmacseed
)
if
!
hmac
.
Equal
(
shouldMAC
,
headbuf
[
:
16
])
{
return
msg
,
errors
.
New
(
"bad frame MAC"
)
}
// decrypt frame content
rw
.
dec
.
XORKeyStream
(
framebuf
,
framebuf
)
// decode message code
content
:=
bytes
.
NewReader
(
framebuf
[
:
fsize
])
if
err
:=
rlp
.
Decode
(
content
,
&
msg
.
Code
);
err
!=
nil
{
return
msg
,
err
}
msg
.
Size
=
uint32
(
content
.
Len
())
msg
.
Payload
=
content
return
msg
,
nil
}
// updateMAC reseeds the given hash with encrypted seed.
// it returns the first 16 bytes of the hash sum after seeding.
func
updateMAC
(
mac
hash
.
Hash
,
block
cipher
.
Block
,
seed
[]
byte
)
[]
byte
{
aesbuf
:=
make
([]
byte
,
aes
.
BlockSize
)
block
.
Encrypt
(
aesbuf
,
mac
.
Sum
(
nil
))
for
i
:=
range
aesbuf
{
aesbuf
[
i
]
^=
seed
[
i
]
}
mac
.
Write
(
aesbuf
)
return
mac
.
Sum
(
nil
)[
:
16
]
}
func
readInt24
(
b
[]
byte
)
uint32
{
return
uint32
(
b
[
2
])
|
uint32
(
b
[
1
])
<<
8
|
uint32
(
b
[
0
])
<<
16
}
func
putInt24
(
v
uint32
,
b
[]
byte
)
{
b
[
0
]
=
byte
(
v
>>
16
)
b
[
1
]
=
byte
(
v
>>
8
)
b
[
2
]
=
byte
(
v
)
}
p2p/rlpx_test.go
0 → 100644
View file @
12ca7053
package
p2p
import
(
"bytes"
"crypto/rand"
"io/ioutil"
"strings"
"testing"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/rlp"
)
func
TestRlpxFrameFake
(
t
*
testing
.
T
)
{
buf
:=
new
(
bytes
.
Buffer
)
hash
:=
fakeHash
([]
byte
{
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
,
1
})
rw
:=
newRlpxFrameRW
(
buf
,
secrets
{
AES
:
crypto
.
Sha3
(),
MAC
:
crypto
.
Sha3
(),
IngressMAC
:
hash
,
EgressMAC
:
hash
,
})
golden
:=
unhex
(
`
00828ddae471818bb0bfa6b551d1cb42
01010101010101010101010101010101
ba628a4ba590cb43f7848f41c4382885
01010101010101010101010101010101
`
)
// Check WriteMsg. This puts a message into the buffer.
if
err
:=
EncodeMsg
(
rw
,
8
,
1
,
2
,
3
,
4
);
err
!=
nil
{
t
.
Fatalf
(
"WriteMsg error: %v"
,
err
)
}
written
:=
buf
.
Bytes
()
if
!
bytes
.
Equal
(
written
,
golden
)
{
t
.
Fatalf
(
"output mismatch:
\n
got: %x
\n
want: %x"
,
written
,
golden
)
}
// Check ReadMsg. It reads the message encoded by WriteMsg, which
// is equivalent to the golden message above.
msg
,
err
:=
rw
.
ReadMsg
()
if
err
!=
nil
{
t
.
Fatalf
(
"ReadMsg error: %v"
,
err
)
}
if
msg
.
Size
!=
5
{
t
.
Errorf
(
"msg size mismatch: got %d, want %d"
,
msg
.
Size
,
5
)
}
if
msg
.
Code
!=
8
{
t
.
Errorf
(
"msg code mismatch: got %d, want %d"
,
msg
.
Code
,
8
)
}
payload
,
_
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
wantPayload
:=
unhex
(
"C401020304"
)
if
!
bytes
.
Equal
(
payload
,
wantPayload
)
{
t
.
Errorf
(
"msg payload mismatch:
\n
got %x
\n
want %x"
,
payload
,
wantPayload
)
}
}
type
fakeHash
[]
byte
func
(
fakeHash
)
Write
(
p
[]
byte
)
(
int
,
error
)
{
return
len
(
p
),
nil
}
func
(
fakeHash
)
Reset
()
{}
func
(
fakeHash
)
BlockSize
()
int
{
return
0
}
func
(
h
fakeHash
)
Size
()
int
{
return
len
(
h
)
}
func
(
h
fakeHash
)
Sum
(
b
[]
byte
)
[]
byte
{
return
append
(
b
,
h
...
)
}
func
TestRlpxFrameRW
(
t
*
testing
.
T
)
{
var
(
aesSecret
=
make
([]
byte
,
16
)
macSecret
=
make
([]
byte
,
16
)
egressMACinit
=
make
([]
byte
,
32
)
ingressMACinit
=
make
([]
byte
,
32
)
)
for
_
,
s
:=
range
[][]
byte
{
aesSecret
,
macSecret
,
egressMACinit
,
ingressMACinit
}
{
rand
.
Read
(
s
)
}
conn
:=
new
(
bytes
.
Buffer
)
s1
:=
secrets
{
AES
:
aesSecret
,
MAC
:
macSecret
,
EgressMAC
:
sha3
.
NewKeccak256
(),
IngressMAC
:
sha3
.
NewKeccak256
(),
}
s1
.
EgressMAC
.
Write
(
egressMACinit
)
s1
.
IngressMAC
.
Write
(
ingressMACinit
)
rw1
:=
newRlpxFrameRW
(
conn
,
s1
)
s2
:=
secrets
{
AES
:
aesSecret
,
MAC
:
macSecret
,
EgressMAC
:
sha3
.
NewKeccak256
(),
IngressMAC
:
sha3
.
NewKeccak256
(),
}
s2
.
EgressMAC
.
Write
(
ingressMACinit
)
s2
.
IngressMAC
.
Write
(
egressMACinit
)
rw2
:=
newRlpxFrameRW
(
conn
,
s2
)
// send some messages
for
i
:=
0
;
i
<
10
;
i
++
{
// write message into conn buffer
wmsg
:=
[]
interface
{}{
"foo"
,
"bar"
,
strings
.
Repeat
(
"test"
,
i
)}
err
:=
EncodeMsg
(
rw1
,
uint64
(
i
),
wmsg
...
)
if
err
!=
nil
{
t
.
Fatalf
(
"WriteMsg error (i=%d): %v"
,
i
,
err
)
}
// read message that rw1 just wrote
msg
,
err
:=
rw2
.
ReadMsg
()
if
err
!=
nil
{
t
.
Fatalf
(
"ReadMsg error (i=%d): %v"
,
i
,
err
)
}
if
msg
.
Code
!=
uint64
(
i
)
{
t
.
Fatalf
(
"msg code mismatch: got %d, want %d"
,
msg
.
Code
,
i
)
}
payload
,
_
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
wantPayload
,
_
:=
rlp
.
EncodeToBytes
(
wmsg
)
if
!
bytes
.
Equal
(
payload
,
wantPayload
)
{
t
.
Fatalf
(
"msg payload mismatch:
\n
got %x
\n
want %x"
,
payload
,
wantPayload
)
}
}
}
p2p/server.go
View file @
12ca7053
...
@@ -10,15 +10,24 @@ import (
...
@@ -10,15 +10,24 @@ import (
"sync"
"sync"
"time"
"time"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/nat"
)
)
const
(
const
(
handshakeTimeout
=
5
*
time
.
Second
defaultDialTimeout
=
10
*
time
.
Second
defaultDialTimeout
=
10
*
time
.
Second
refreshPeersInterval
=
30
*
time
.
Second
refreshPeersInterval
=
30
*
time
.
Second
// total timeout for encryption handshake and protocol
// handshake in both directions.
handshakeTimeout
=
5
*
time
.
Second
// maximum time allowed for reading a complete message.
// this is effectively the amount of time a connection can be idle.
frameReadTimeout
=
1
*
time
.
Minute
// maximum amount of time allowed for writing a complete message.
frameWriteTimeout
=
5
*
time
.
Second
)
)
var
srvlog
=
logger
.
NewLogger
(
"P2P Server"
)
var
srvlog
=
logger
.
NewLogger
(
"P2P Server"
)
...
@@ -57,10 +66,6 @@ type Server struct {
...
@@ -57,10 +66,6 @@ type Server struct {
// each peer.
// each peer.
Protocols
[]
Protocol
Protocols
[]
Protocol
// If Blacklist is set to a non-nil value, the given Blacklist
// is used to verify peer connections.
Blacklist
Blacklist
// If ListenAddr is set to a non-nil address, the server
// If ListenAddr is set to a non-nil address, the server
// will listen for incoming connections.
// will listen for incoming connections.
//
//
...
@@ -135,7 +140,7 @@ func (srv *Server) SuggestPeer(n *discover.Node) {
...
@@ -135,7 +140,7 @@ func (srv *Server) SuggestPeer(n *discover.Node) {
func
(
srv
*
Server
)
Broadcast
(
protocol
string
,
code
uint64
,
data
...
interface
{})
{
func
(
srv
*
Server
)
Broadcast
(
protocol
string
,
code
uint64
,
data
...
interface
{})
{
var
payload
[]
byte
var
payload
[]
byte
if
data
!=
nil
{
if
data
!=
nil
{
payload
=
e
ncodePayload
(
data
...
)
payload
=
e
thutil
.
Encode
(
data
)
}
}
srv
.
lock
.
RLock
()
srv
.
lock
.
RLock
()
defer
srv
.
lock
.
RUnlock
()
defer
srv
.
lock
.
RUnlock
()
...
@@ -174,9 +179,6 @@ func (srv *Server) Start() (err error) {
...
@@ -174,9 +179,6 @@ func (srv *Server) Start() (err error) {
if
srv
.
setupFunc
==
nil
{
if
srv
.
setupFunc
==
nil
{
srv
.
setupFunc
=
setupConn
srv
.
setupFunc
=
setupConn
}
}
if
srv
.
Blacklist
==
nil
{
srv
.
Blacklist
=
NewBlacklist
()
}
// node table
// node table
ntab
,
err
:=
discover
.
ListenUDP
(
srv
.
PrivateKey
,
srv
.
ListenAddr
,
srv
.
NAT
)
ntab
,
err
:=
discover
.
ListenUDP
(
srv
.
PrivateKey
,
srv
.
ListenAddr
,
srv
.
NAT
)
...
@@ -365,7 +367,12 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
...
@@ -365,7 +367,12 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
srvlog
.
Debugf
(
"Handshake with %v failed: %v"
,
fd
.
RemoteAddr
(),
err
)
srvlog
.
Debugf
(
"Handshake with %v failed: %v"
,
fd
.
RemoteAddr
(),
err
)
return
return
}
}
p
:=
newPeer
(
conn
,
srv
.
Protocols
)
conn
.
MsgReadWriter
=
&
netWrapper
{
wrapped
:
conn
.
MsgReadWriter
,
conn
:
fd
,
rtimeout
:
frameReadTimeout
,
wtimeout
:
frameWriteTimeout
,
}
p
:=
newPeer
(
fd
,
conn
,
srv
.
Protocols
)
if
ok
,
reason
:=
srv
.
addPeer
(
conn
.
ID
,
p
);
!
ok
{
if
ok
,
reason
:=
srv
.
addPeer
(
conn
.
ID
,
p
);
!
ok
{
srvlog
.
DebugDetailf
(
"Not adding %v (%v)
\n
"
,
p
,
reason
)
srvlog
.
DebugDetailf
(
"Not adding %v (%v)
\n
"
,
p
,
reason
)
p
.
politeDisconnect
(
reason
)
p
.
politeDisconnect
(
reason
)
...
@@ -375,7 +382,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
...
@@ -375,7 +382,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
srvlog
.
Debugf
(
"Added %v
\n
"
,
p
)
srvlog
.
Debugf
(
"Added %v
\n
"
,
p
)
srvjslog
.
LogJson
(
&
logger
.
P2PConnected
{
srvjslog
.
LogJson
(
&
logger
.
P2PConnected
{
RemoteId
:
fmt
.
Sprintf
(
"%x"
,
conn
.
ID
[
:
]),
RemoteId
:
fmt
.
Sprintf
(
"%x"
,
conn
.
ID
[
:
]),
RemoteAddress
:
conn
.
RemoteAddr
()
.
String
(),
RemoteAddress
:
fd
.
RemoteAddr
()
.
String
(),
RemoteVersionString
:
conn
.
Name
,
RemoteVersionString
:
conn
.
Name
,
NumConnections
:
srv
.
PeerCount
(),
NumConnections
:
srv
.
PeerCount
(),
})
})
...
@@ -403,8 +410,6 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
...
@@ -403,8 +410,6 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
return
false
,
DiscTooManyPeers
return
false
,
DiscTooManyPeers
case
srv
.
peers
[
id
]
!=
nil
:
case
srv
.
peers
[
id
]
!=
nil
:
return
false
,
DiscAlreadyConnected
return
false
,
DiscAlreadyConnected
case
srv
.
Blacklist
.
Exists
(
id
[
:
])
:
return
false
,
DiscUselessPeer
case
id
==
srv
.
ntab
.
Self
()
:
case
id
==
srv
.
ntab
.
Self
()
:
return
false
,
DiscSelf
return
false
,
DiscSelf
}
}
...
@@ -418,53 +423,3 @@ func (srv *Server) removePeer(p *Peer) {
...
@@ -418,53 +423,3 @@ func (srv *Server) removePeer(p *Peer) {
srv
.
lock
.
Unlock
()
srv
.
lock
.
Unlock
()
srv
.
peerWG
.
Done
()
srv
.
peerWG
.
Done
()
}
}
type
Blacklist
interface
{
Get
([]
byte
)
(
bool
,
error
)
Put
([]
byte
)
error
Delete
([]
byte
)
error
Exists
(
pubkey
[]
byte
)
(
ok
bool
)
}
type
BlacklistMap
struct
{
blacklist
map
[
string
]
bool
lock
sync
.
RWMutex
}
func
NewBlacklist
()
*
BlacklistMap
{
return
&
BlacklistMap
{
blacklist
:
make
(
map
[
string
]
bool
),
}
}
func
(
self
*
BlacklistMap
)
Get
(
pubkey
[]
byte
)
(
bool
,
error
)
{
self
.
lock
.
RLock
()
defer
self
.
lock
.
RUnlock
()
v
,
ok
:=
self
.
blacklist
[
string
(
pubkey
)]
var
err
error
if
!
ok
{
err
=
fmt
.
Errorf
(
"not found"
)
}
return
v
,
err
}
func
(
self
*
BlacklistMap
)
Exists
(
pubkey
[]
byte
)
(
ok
bool
)
{
self
.
lock
.
RLock
()
defer
self
.
lock
.
RUnlock
()
_
,
ok
=
self
.
blacklist
[
string
(
pubkey
)]
return
}
func
(
self
*
BlacklistMap
)
Put
(
pubkey
[]
byte
)
error
{
self
.
lock
.
Lock
()
defer
self
.
lock
.
Unlock
()
self
.
blacklist
[
string
(
pubkey
)]
=
true
return
nil
}
func
(
self
*
BlacklistMap
)
Delete
(
pubkey
[]
byte
)
error
{
self
.
lock
.
Lock
()
defer
self
.
lock
.
Unlock
()
delete
(
self
.
blacklist
,
string
(
pubkey
))
return
nil
}
p2p/server_test.go
View file @
12ca7053
...
@@ -11,6 +11,7 @@ import (
...
@@ -11,6 +11,7 @@ import (
"time"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discover"
)
)
...
@@ -23,8 +24,14 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
...
@@ -23,8 +24,14 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
newPeerHook
:
pf
,
newPeerHook
:
pf
,
setupFunc
:
func
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
)
(
*
conn
,
error
)
{
setupFunc
:
func
(
fd
net
.
Conn
,
prv
*
ecdsa
.
PrivateKey
,
our
*
protoHandshake
,
dial
*
discover
.
Node
)
(
*
conn
,
error
)
{
id
:=
randomID
()
id
:=
randomID
()
rw
:=
newRlpxFrameRW
(
fd
,
secrets
{
MAC
:
zero16
,
AES
:
zero16
,
IngressMAC
:
sha3
.
NewKeccak256
(),
EgressMAC
:
sha3
.
NewKeccak256
(),
})
return
&
conn
{
return
&
conn
{
frameRW
:
newFrameRW
(
fd
,
msgWriteTimeout
)
,
MsgReadWriter
:
rw
,
protoHandshake
:
&
protoHandshake
{
ID
:
id
,
Version
:
baseProtocolVersion
},
protoHandshake
:
&
protoHandshake
{
ID
:
id
,
Version
:
baseProtocolVersion
},
},
nil
},
nil
},
},
...
@@ -143,9 +150,7 @@ func TestServerBroadcast(t *testing.T) {
...
@@ -143,9 +150,7 @@ func TestServerBroadcast(t *testing.T) {
// broadcast one message
// broadcast one message
srv
.
Broadcast
(
"discard"
,
0
,
"foo"
)
srv
.
Broadcast
(
"discard"
,
0
,
"foo"
)
goldbuf
:=
new
(
bytes
.
Buffer
)
golden
:=
unhex
(
"66e94d166f0a2c3b884cfa59ca34"
)
writeMsg
(
goldbuf
,
NewMsg
(
16
,
"foo"
))
golden
:=
goldbuf
.
Bytes
()
// check that the message has been written everywhere
// check that the message has been written everywhere
for
i
,
conn
:=
range
conns
{
for
i
,
conn
:=
range
conns
{
...
...
whisper/peer.go
View file @
12ca7053
...
@@ -2,10 +2,10 @@ package whisper
...
@@ -2,10 +2,10 @@ package whisper
import
(
import
(
"fmt"
"fmt"
"io/ioutil"
"time"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
"gopkg.in/fatih/set.v0"
)
)
...
@@ -77,8 +77,7 @@ func (self *peer) broadcast(envelopes []*Envelope) error {
...
@@ -77,8 +77,7 @@ func (self *peer) broadcast(envelopes []*Envelope) error {
}
}
if
i
>
0
{
if
i
>
0
{
msg
:=
p2p
.
NewMsg
(
envelopesMsg
,
envs
[
:
i
]
...
)
if
err
:=
p2p
.
EncodeMsg
(
self
.
ws
,
envelopesMsg
,
envs
[
:
i
]
...
);
err
!=
nil
{
if
err
:=
self
.
ws
.
WriteMsg
(
msg
);
err
!=
nil
{
return
err
return
err
}
}
self
.
peer
.
DebugDetailln
(
"broadcasted"
,
i
,
"message(s)"
)
self
.
peer
.
DebugDetailln
(
"broadcasted"
,
i
,
"message(s)"
)
...
@@ -93,34 +92,28 @@ func (self *peer) addKnown(envelope *Envelope) {
...
@@ -93,34 +92,28 @@ func (self *peer) addKnown(envelope *Envelope) {
func
(
self
*
peer
)
handleStatus
()
error
{
func
(
self
*
peer
)
handleStatus
()
error
{
ws
:=
self
.
ws
ws
:=
self
.
ws
if
err
:=
ws
.
WriteMsg
(
self
.
statusMsg
());
err
!=
nil
{
if
err
:=
ws
.
WriteMsg
(
self
.
statusMsg
());
err
!=
nil
{
return
err
return
err
}
}
msg
,
err
:=
ws
.
ReadMsg
()
msg
,
err
:=
ws
.
ReadMsg
()
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
if
msg
.
Code
!=
statusMsg
{
if
msg
.
Code
!=
statusMsg
{
return
fmt
.
Errorf
(
"peer send %x before status msg"
,
msg
.
Code
)
return
fmt
.
Errorf
(
"peer send %x before status msg"
,
msg
.
Code
)
}
}
s
:=
rlp
.
NewStream
(
msg
.
Payload
)
data
,
err
:=
ioutil
.
ReadAll
(
msg
.
Payload
)
if
_
,
err
:=
s
.
List
();
err
!=
nil
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"bad status message: %v"
,
err
)
return
err
}
}
pv
,
err
:=
s
.
Uint
()
if
len
(
data
)
==
0
{
if
err
!=
nil
{
return
fmt
.
Errorf
(
"
malformed status. data len = 0"
)
return
fmt
.
Errorf
(
"
bad status message: %v"
,
err
)
}
}
if
pv
!=
protocolVersion
{
if
pv
:=
data
[
0
];
pv
!=
protocolVersion
{
return
fmt
.
Errorf
(
"protocol version mismatch %d != %d"
,
pv
,
protocolVersion
)
return
fmt
.
Errorf
(
"protocol version mismatch %d != %d"
,
pv
,
protocolVersion
)
}
}
return
msg
.
Discard
()
// ignore anything after protocol version
return
nil
}
}
func
(
self
*
peer
)
statusMsg
()
p2p
.
Msg
{
func
(
self
*
peer
)
statusMsg
()
p2p
.
Msg
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment