Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
c3ba4ace
Commit
c3ba4ace
authored
Dec 15, 2014
by
obscuren
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'poc8' into develop
parents
f8061fcb
15e46b97
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
277 additions
and
8 deletions
+277
-8
message.go
p2p/message.go
+77
-0
message_test.go
p2p/message_test.go
+63
-0
peer.go
p2p/peer.go
+12
-5
peer_error.go
p2p/peer_error.go
+9
-0
peer_test.go
p2p/peer_test.go
+56
-0
protocol.go
p2p/protocol.go
+2
-3
protocol_test.go
p2p/protocol_test.go
+58
-0
No files found.
p2p/message.go
View file @
c3ba4ace
...
...
@@ -3,9 +3,11 @@ package p2p
import
(
"bytes"
"encoding/binary"
"errors"
"io"
"io/ioutil"
"math/big"
"sync/atomic"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/rlp"
...
...
@@ -153,3 +155,78 @@ func (r *postrack) ReadByte() (byte, error) {
}
return
b
,
err
}
// MsgPipe creates a message pipe. Reads on one end are matched
// with writes on the other. The pipe is full-duplex, both ends
// implement MsgReadWriter.
func
MsgPipe
()
(
*
MsgPipeRW
,
*
MsgPipeRW
)
{
var
(
c1
,
c2
=
make
(
chan
Msg
),
make
(
chan
Msg
)
closing
=
make
(
chan
struct
{})
closed
=
new
(
int32
)
rw1
=
&
MsgPipeRW
{
c1
,
c2
,
closing
,
closed
}
rw2
=
&
MsgPipeRW
{
c2
,
c1
,
closing
,
closed
}
)
return
rw1
,
rw2
}
// ErrPipeClosed is returned from pipe operations after the
// pipe has been closed.
var
ErrPipeClosed
=
errors
.
New
(
"p2p: read or write on closed message pipe"
)
// MsgPipeRW is an endpoint of a MsgReadWriter pipe.
type
MsgPipeRW
struct
{
w
chan
<-
Msg
r
<-
chan
Msg
closing
chan
struct
{}
closed
*
int32
}
// WriteMsg sends a messsage on the pipe.
// It blocks until the receiver has consumed the message payload.
func
(
p
*
MsgPipeRW
)
WriteMsg
(
msg
Msg
)
error
{
if
atomic
.
LoadInt32
(
p
.
closed
)
==
0
{
consumed
:=
make
(
chan
struct
{},
1
)
msg
.
Payload
=
&
eofSignal
{
msg
.
Payload
,
int64
(
msg
.
Size
),
consumed
}
select
{
case
p
.
w
<-
msg
:
if
msg
.
Size
>
0
{
// wait for payload read or discard
<-
consumed
}
return
nil
case
<-
p
.
closing
:
}
}
return
ErrPipeClosed
}
// EncodeMsg is a convenient shorthand for sending an RLP-encoded message.
func
(
p
*
MsgPipeRW
)
EncodeMsg
(
code
uint64
,
data
...
interface
{})
error
{
return
p
.
WriteMsg
(
NewMsg
(
code
,
data
...
))
}
// ReadMsg returns a message sent on the other end of the pipe.
func
(
p
*
MsgPipeRW
)
ReadMsg
()
(
Msg
,
error
)
{
if
atomic
.
LoadInt32
(
p
.
closed
)
==
0
{
select
{
case
msg
:=
<-
p
.
r
:
return
msg
,
nil
case
<-
p
.
closing
:
}
}
return
Msg
{},
ErrPipeClosed
}
// Close unblocks any pending ReadMsg and WriteMsg calls on both ends
// of the pipe. They will return ErrPipeClosed. Note that Close does
// not interrupt any reads from a message payload.
func
(
p
*
MsgPipeRW
)
Close
()
error
{
if
atomic
.
AddInt32
(
p
.
closed
,
1
)
!=
1
{
// someone else is already closing
atomic
.
StoreInt32
(
p
.
closed
,
1
)
// avoid overflow
return
nil
}
close
(
p
.
closing
)
return
nil
}
p2p/message_test.go
View file @
c3ba4ace
...
...
@@ -2,8 +2,11 @@ package p2p
import
(
"bytes"
"fmt"
"io/ioutil"
"runtime"
"testing"
"time"
"github.com/ethereum/go-ethereum/ethutil"
)
...
...
@@ -68,3 +71,63 @@ func TestDecodeRealMsg(t *testing.T) {
t
.
Errorf
(
"incorrect code %d, want %d"
,
msg
.
Code
,
0
)
}
}
func
ExampleMsgPipe
()
{
rw1
,
rw2
:=
MsgPipe
()
go
func
()
{
rw1
.
EncodeMsg
(
8
,
[]
byte
{
0
,
0
})
rw1
.
EncodeMsg
(
5
,
[]
byte
{
1
,
1
})
rw1
.
Close
()
}()
for
{
msg
,
err
:=
rw2
.
ReadMsg
()
if
err
!=
nil
{
break
}
var
data
[
1
][]
byte
msg
.
Decode
(
&
data
)
fmt
.
Printf
(
"msg: %d, %x
\n
"
,
msg
.
Code
,
data
[
0
])
}
// Output:
// msg: 8, 0000
// msg: 5, 0101
}
func
TestMsgPipeUnblockWrite
(
t
*
testing
.
T
)
{
loop
:
for
i
:=
0
;
i
<
100
;
i
++
{
rw1
,
rw2
:=
MsgPipe
()
done
:=
make
(
chan
struct
{})
go
func
()
{
if
err
:=
rw1
.
EncodeMsg
(
1
);
err
==
nil
{
t
.
Error
(
"EncodeMsg returned nil error"
)
}
else
if
err
!=
ErrPipeClosed
{
t
.
Error
(
"EncodeMsg returned wrong error: got %v, want %v"
,
err
,
ErrPipeClosed
)
}
close
(
done
)
}()
// this call should ensure that EncodeMsg is waiting to
// deliver sometimes. if this isn't done, Close is likely to
// be executed before EncodeMsg starts and then we won't test
// all the cases.
runtime
.
Gosched
()
rw2
.
Close
()
select
{
case
<-
done
:
case
<-
time
.
After
(
200
*
time
.
Millisecond
)
:
t
.
Errorf
(
"write didn't unblock"
)
break
loop
}
}
}
// This test should panic if concurrent close isn't implemented correctly.
func
TestMsgPipeConcurrentClose
(
t
*
testing
.
T
)
{
rw1
,
_
:=
MsgPipe
()
for
i
:=
0
;
i
<
10
;
i
++
{
go
rw1
.
Close
()
}
}
p2p/peer.go
View file @
c3ba4ace
...
...
@@ -300,7 +300,7 @@ func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error)
proto
.
in
<-
msg
}
else
{
wait
=
true
pr
:=
&
eofSignal
{
msg
.
Payload
,
protoDone
}
pr
:=
&
eofSignal
{
msg
.
Payload
,
int64
(
msg
.
Size
),
protoDone
}
msg
.
Payload
=
pr
proto
.
in
<-
msg
}
...
...
@@ -438,18 +438,25 @@ func (rw *proto) ReadMsg() (Msg, error) {
return
msg
,
nil
}
// eofSignal wraps a reader with eof signaling.
// the eof channel is closed when the wrapped reader
// reaches EOF.
// eofSignal wraps a reader with eof signaling. the eof channel is
// closed when the wrapped reader returns an error or when count bytes
// have been read.
//
type
eofSignal
struct
{
wrapped
io
.
Reader
count
int64
eof
chan
<-
struct
{}
}
// note: when using eofSignal to detect whether a message payload
// has been read, Read might not be called for zero sized messages.
func
(
r
*
eofSignal
)
Read
(
buf
[]
byte
)
(
int
,
error
)
{
n
,
err
:=
r
.
wrapped
.
Read
(
buf
)
if
err
!=
nil
{
r
.
count
-=
int64
(
n
)
if
(
err
!=
nil
||
r
.
count
<=
0
)
&&
r
.
eof
!=
nil
{
r
.
eof
<-
struct
{}{}
// tell Peer that msg has been consumed
r
.
eof
=
nil
}
return
n
,
err
}
p2p/peer_error.go
View file @
c3ba4ace
...
...
@@ -100,7 +100,16 @@ func (d DiscReason) String() string {
return
discReasonToString
[
d
]
}
type
discRequestedError
DiscReason
func
(
err
discRequestedError
)
Error
()
string
{
return
fmt
.
Sprintf
(
"disconnect requested: %v"
,
DiscReason
(
err
))
}
func
discReasonForError
(
err
error
)
DiscReason
{
if
reason
,
ok
:=
err
.
(
discRequestedError
);
ok
{
return
DiscReason
(
reason
)
}
peerError
,
ok
:=
err
.
(
*
peerError
)
if
!
ok
{
return
DiscSubprotocolError
...
...
p2p/peer_test.go
View file @
c3ba4ace
...
...
@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"encoding/hex"
"io"
"io/ioutil"
"net"
"reflect"
...
...
@@ -237,3 +238,58 @@ func TestNewPeer(t *testing.T) {
// Should not hang.
p
.
Disconnect
(
DiscAlreadyConnected
)
}
func
TestEOFSignal
(
t
*
testing
.
T
)
{
rb
:=
make
([]
byte
,
10
)
// empty reader
eof
:=
make
(
chan
struct
{},
1
)
sig
:=
&
eofSignal
{
new
(
bytes
.
Buffer
),
0
,
eof
}
if
n
,
err
:=
sig
.
Read
(
rb
);
n
!=
0
||
err
!=
io
.
EOF
{
t
.
Errorf
(
"Read returned unexpected values: (%v, %v)"
,
n
,
err
)
}
select
{
case
<-
eof
:
default
:
t
.
Error
(
"EOF chan not signaled"
)
}
// count before error
eof
=
make
(
chan
struct
{},
1
)
sig
=
&
eofSignal
{
bytes
.
NewBufferString
(
"aaaaaaaa"
),
4
,
eof
}
if
n
,
err
:=
sig
.
Read
(
rb
);
n
!=
8
||
err
!=
nil
{
t
.
Errorf
(
"Read returned unexpected values: (%v, %v)"
,
n
,
err
)
}
select
{
case
<-
eof
:
default
:
t
.
Error
(
"EOF chan not signaled"
)
}
// error before count
eof
=
make
(
chan
struct
{},
1
)
sig
=
&
eofSignal
{
bytes
.
NewBufferString
(
"aaaa"
),
999
,
eof
}
if
n
,
err
:=
sig
.
Read
(
rb
);
n
!=
4
||
err
!=
nil
{
t
.
Errorf
(
"Read returned unexpected values: (%v, %v)"
,
n
,
err
)
}
if
n
,
err
:=
sig
.
Read
(
rb
);
n
!=
0
||
err
!=
io
.
EOF
{
t
.
Errorf
(
"Read returned unexpected values: (%v, %v)"
,
n
,
err
)
}
select
{
case
<-
eof
:
default
:
t
.
Error
(
"EOF chan not signaled"
)
}
// no signal if neither occurs
eof
=
make
(
chan
struct
{},
1
)
sig
=
&
eofSignal
{
bytes
.
NewBufferString
(
"aaaaaaaaaaaaaaaaaaaaa"
),
999
,
eof
}
if
n
,
err
:=
sig
.
Read
(
rb
);
n
!=
10
||
err
!=
nil
{
t
.
Errorf
(
"Read returned unexpected values: (%v, %v)"
,
n
,
err
)
}
select
{
case
<-
eof
:
t
.
Error
(
"unexpected EOF signal"
)
default
:
}
}
p2p/protocol.go
View file @
c3ba4ace
...
...
@@ -154,12 +154,11 @@ func (bp *baseProtocol) handle(rw MsgReadWriter) error {
return
newPeerError
(
errProtocolBreach
,
"extra handshake received"
)
case
discMsg
:
var
reason
DiscReason
var
reason
[
1
]
DiscReason
if
err
:=
msg
.
Decode
(
&
reason
);
err
!=
nil
{
return
err
}
bp
.
peer
.
Disconnect
(
reason
)
return
nil
return
discRequestedError
(
reason
[
0
])
case
pingMsg
:
return
bp
.
rw
.
EncodeMsg
(
pongMsg
)
...
...
p2p/protocol_test.go
0 → 100644
View file @
c3ba4ace
package
p2p
import
(
"fmt"
"testing"
)
func
TestBaseProtocolDisconnect
(
t
*
testing
.
T
)
{
peer
:=
NewPeer
(
NewSimpleClientIdentity
(
"p1"
,
""
,
""
,
"foo"
),
nil
)
peer
.
ourID
=
NewSimpleClientIdentity
(
"p2"
,
""
,
""
,
"bar"
)
peer
.
pubkeyHook
=
func
(
*
peerAddr
)
error
{
return
nil
}
rw1
,
rw2
:=
MsgPipe
()
done
:=
make
(
chan
struct
{})
go
func
()
{
if
err
:=
expectMsg
(
rw2
,
handshakeMsg
);
err
!=
nil
{
t
.
Error
(
err
)
}
err
:=
rw2
.
EncodeMsg
(
handshakeMsg
,
baseProtocolVersion
,
""
,
[]
interface
{}{},
0
,
make
([]
byte
,
64
),
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
if
err
:=
expectMsg
(
rw2
,
getPeersMsg
);
err
!=
nil
{
t
.
Error
(
err
)
}
if
err
:=
rw2
.
EncodeMsg
(
discMsg
,
DiscQuitting
);
err
!=
nil
{
t
.
Error
(
err
)
}
close
(
done
)
}()
if
err
:=
runBaseProtocol
(
peer
,
rw1
);
err
==
nil
{
t
.
Errorf
(
"base protocol returned without error"
)
}
else
if
reason
,
ok
:=
err
.
(
discRequestedError
);
!
ok
||
reason
!=
DiscQuitting
{
t
.
Errorf
(
"base protocol returned wrong error: %v"
,
err
)
}
<-
done
}
func
expectMsg
(
r
MsgReader
,
code
uint64
)
error
{
msg
,
err
:=
r
.
ReadMsg
()
if
err
!=
nil
{
return
err
}
if
err
:=
msg
.
Discard
();
err
!=
nil
{
return
err
}
if
msg
.
Code
!=
code
{
return
fmt
.
Errorf
(
"wrong message code: got %d, expected %d"
,
msg
.
Code
,
code
)
}
return
nil
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment