Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
e64625aa
Commit
e64625aa
authored
Jun 25, 2015
by
Jeffrey Wilcke
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1332 from bas-vk/ipcbatch
IPC interface improvements
parents
6b2a03fa
66228507
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
131 additions
and
63 deletions
+131
-63
codec.go
rpc/codec/codec.go
+1
-1
json.go
rpc/codec/json.go
+81
-22
comms.go
rpc/comms/comms.go
+31
-11
ipc.go
rpc/comms/ipc.go
+6
-0
ipc_unix.go
rpc/comms/ipc_unix.go
+5
-2
ipc_windows.go
rpc/comms/ipc_windows.go
+7
-27
No files found.
rpc/codec/codec.go
View file @
e64625aa
...
...
@@ -12,7 +12,7 @@ type Codec int
// (de)serialization support for rpc interface
type
ApiCoder
interface
{
// Parse message to request from underlying stream
ReadRequest
()
(
*
shared
.
Request
,
error
)
ReadRequest
()
(
[]
*
shared
.
Request
,
bool
,
error
)
// Parse response message from underlying stream
ReadResponse
()
(
interface
{},
error
)
// Encode response to encoded form in underlying stream
...
...
rpc/codec/json.go
View file @
e64625aa
...
...
@@ -2,71 +2,130 @@ package codec
import
(
"encoding/json"
"fmt"
"net"
"time"
"github.com/ethereum/go-ethereum/rpc/shared"
)
const
(
MAX_RESPONSE_SIZE
=
64
*
1024
READ_TIMEOUT
=
15
// read timeout in seconds
MAX_REQUEST_SIZE
=
1024
*
1024
MAX_RESPONSE_SIZE
=
1024
*
1024
)
// Json serialization support
type
JsonCodec
struct
{
c
net
.
Conn
d
*
json
.
Decoder
e
*
json
.
Encoder
}
// Create new JSON coder instance
func
NewJsonCoder
(
conn
net
.
Conn
)
ApiCoder
{
return
&
JsonCodec
{
c
:
conn
,
d
:
json
.
NewDecoder
(
conn
),
e
:
json
.
NewEncoder
(
conn
),
}
}
// Serialize obj to JSON and write it to conn
func
(
self
*
JsonCodec
)
ReadRequest
()
(
*
shared
.
Request
,
error
)
{
req
:=
shared
.
Request
{}
err
:=
self
.
d
.
Decode
(
&
req
)
if
err
==
nil
{
return
&
req
,
nil
func
(
self
*
JsonCodec
)
ReadRequest
()
(
requests
[]
*
shared
.
Request
,
isBatch
bool
,
err
error
)
{
bytesInBuffer
:=
0
buf
:=
make
([]
byte
,
MAX_REQUEST_SIZE
)
deadline
:=
time
.
Now
()
.
Add
(
READ_TIMEOUT
*
time
.
Second
)
if
err
:=
self
.
c
.
SetDeadline
(
deadline
);
err
!=
nil
{
return
nil
,
false
,
err
}
for
{
n
,
err
:=
self
.
c
.
Read
(
buf
[
bytesInBuffer
:
])
if
err
!=
nil
{
self
.
c
.
Close
()
return
nil
,
false
,
err
}
bytesInBuffer
+=
n
singleRequest
:=
shared
.
Request
{}
err
=
json
.
Unmarshal
(
buf
[
:
bytesInBuffer
],
&
singleRequest
)
if
err
==
nil
{
requests
:=
make
([]
*
shared
.
Request
,
1
)
requests
[
0
]
=
&
singleRequest
return
requests
,
false
,
nil
}
requests
=
make
([]
*
shared
.
Request
,
0
)
err
=
json
.
Unmarshal
(
buf
[
:
bytesInBuffer
],
&
requests
)
if
err
==
nil
{
return
requests
,
true
,
nil
}
}
return
nil
,
err
self
.
c
.
Close
()
// timeout
return
nil
,
false
,
fmt
.
Errorf
(
"Unable to read response"
)
}
func
(
self
*
JsonCodec
)
ReadResponse
()
(
interface
{},
error
)
{
var
err
error
bytesInBuffer
:=
0
buf
:=
make
([]
byte
,
MAX_RESPONSE_SIZE
)
n
,
_
:=
self
.
c
.
Read
(
buf
)
var
failure
shared
.
ErrorResponse
if
err
=
json
.
Unmarshal
(
buf
[
:
n
],
&
failure
);
err
==
nil
&&
failure
.
Erro
r
!=
nil
{
return
failure
,
nil
deadline
:=
time
.
Now
()
.
Add
(
READ_TIMEOUT
*
time
.
Second
)
if
err
:=
self
.
c
.
SetDeadline
(
deadline
);
er
r
!=
nil
{
return
nil
,
err
}
var
success
shared
.
SuccessResponse
if
err
=
json
.
Unmarshal
(
buf
[
:
n
],
&
success
);
err
==
nil
{
return
success
,
nil
for
{
n
,
err
:=
self
.
c
.
Read
(
buf
[
bytesInBuffer
:
])
if
err
!=
nil
{
return
nil
,
err
}
bytesInBuffer
+=
n
var
success
shared
.
SuccessResponse
if
err
=
json
.
Unmarshal
(
buf
[
:
bytesInBuffer
],
&
success
);
err
==
nil
{
return
success
,
nil
}
var
failure
shared
.
ErrorResponse
if
err
=
json
.
Unmarshal
(
buf
[
:
bytesInBuffer
],
&
failure
);
err
==
nil
&&
failure
.
Error
!=
nil
{
return
failure
,
nil
}
}
return
nil
,
err
self
.
c
.
Close
()
return
nil
,
fmt
.
Errorf
(
"Unable to read response"
)
}
//
Encode response to encoded form in underlying stream
//
Decode data
func
(
self
*
JsonCodec
)
Decode
(
data
[]
byte
,
msg
interface
{})
error
{
return
json
.
Unmarshal
(
data
,
msg
)
}
// Encode message
func
(
self
*
JsonCodec
)
Encode
(
msg
interface
{})
([]
byte
,
error
)
{
return
json
.
Marshal
(
msg
)
}
// Parse JSON data from conn to obj
func
(
self
*
JsonCodec
)
WriteResponse
(
res
interface
{})
error
{
return
self
.
e
.
Encode
(
&
res
)
data
,
err
:=
json
.
Marshal
(
res
)
if
err
!=
nil
{
self
.
c
.
Close
()
return
err
}
bytesWritten
:=
0
for
bytesWritten
<
len
(
data
)
{
n
,
err
:=
self
.
c
.
Write
(
data
[
bytesWritten
:
])
if
err
!=
nil
{
self
.
c
.
Close
()
return
err
}
bytesWritten
+=
n
}
return
nil
}
// Close decoder and encoder
...
...
rpc/comms/comms.go
View file @
e64625aa
...
...
@@ -43,29 +43,49 @@ type EthereumClient interface {
SupportedModules
()
(
map
[
string
]
string
,
error
)
}
func
handle
(
conn
net
.
Conn
,
api
shared
.
EthereumApi
,
c
codec
.
Codec
)
{
func
handle
(
id
int
,
conn
net
.
Conn
,
api
shared
.
EthereumApi
,
c
codec
.
Codec
)
{
codec
:=
c
.
New
(
conn
)
for
{
req
,
err
:=
codec
.
ReadRequest
()
req
uests
,
isBatch
,
err
:=
codec
.
ReadRequest
()
if
err
==
io
.
EOF
{
codec
.
Close
()
return
}
else
if
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"comms recv err - %v
\n
"
,
err
)
codec
.
Close
()
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Closed IPC Conn %06d recv err - %v
\n
"
,
id
,
err
)
return
}
var
rpcResponse
interface
{}
res
,
err
:=
api
.
Execute
(
req
)
if
isBatch
{
responses
:=
make
([]
*
interface
{},
len
(
requests
))
responseCount
:=
0
for
_
,
req
:=
range
requests
{
res
,
err
:=
api
.
Execute
(
req
)
if
req
.
Id
!=
nil
{
rpcResponse
:=
shared
.
NewRpcResponse
(
req
.
Id
,
req
.
Jsonrpc
,
res
,
err
)
responses
[
responseCount
]
=
rpcResponse
responseCount
+=
1
}
}
rpcResponse
=
shared
.
NewRpcResponse
(
req
.
Id
,
req
.
Jsonrpc
,
res
,
err
)
err
=
codec
.
WriteResponse
(
rpcResponse
)
if
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"comms send err - %v
\n
"
,
err
)
codec
.
Close
()
return
err
=
codec
.
WriteResponse
(
responses
[
:
responseCount
])
if
err
!=
nil
{
codec
.
Close
()
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Closed IPC Conn %06d send err - %v
\n
"
,
id
,
err
)
return
}
}
else
{
var
rpcResponse
interface
{}
res
,
err
:=
api
.
Execute
(
requests
[
0
])
rpcResponse
=
shared
.
NewRpcResponse
(
requests
[
0
]
.
Id
,
requests
[
0
]
.
Jsonrpc
,
res
,
err
)
err
=
codec
.
WriteResponse
(
rpcResponse
)
if
err
!=
nil
{
codec
.
Close
()
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Closed IPC Conn %06d send err - %v
\n
"
,
id
,
err
)
return
}
}
}
}
...
...
rpc/comms/ipc.go
View file @
e64625aa
...
...
@@ -2,6 +2,7 @@ package comms
import
(
"fmt"
"math/rand"
"net"
"encoding/json"
...
...
@@ -16,6 +17,7 @@ type IpcConfig struct {
type
ipcClient
struct
{
endpoint
string
c
net
.
Conn
codec
codec
.
Codec
coder
codec
.
ApiCoder
}
...
...
@@ -94,3 +96,7 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
func
StartIpc
(
cfg
IpcConfig
,
codec
codec
.
Codec
,
offeredApi
shared
.
EthereumApi
)
error
{
return
startIpc
(
cfg
,
codec
,
offeredApi
)
}
func
newIpcConnId
()
int
{
return
rand
.
Int
()
%
1000000
}
rpc/comms/ipc_unix.go
View file @
e64625aa
...
...
@@ -18,7 +18,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
return
nil
,
err
}
return
&
ipcClient
{
cfg
.
Endpoint
,
codec
,
codec
.
New
(
c
)},
nil
return
&
ipcClient
{
cfg
.
Endpoint
,
c
,
c
odec
,
codec
.
New
(
c
)},
nil
}
func
(
self
*
ipcClient
)
reconnect
()
error
{
...
...
@@ -48,7 +48,10 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error {
continue
}
go
handle
(
conn
,
api
,
codec
)
id
:=
newIpcConnId
()
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"New IPC connection with id %06d started
\n
"
,
id
)
go
handle
(
id
,
conn
,
api
,
codec
)
}
os
.
Remove
(
cfg
.
Endpoint
)
...
...
rpc/comms/ipc_windows.go
View file @
e64625aa
...
...
@@ -640,7 +640,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
return
nil
,
err
}
return
&
ipcClient
{
cfg
.
Endpoint
,
codec
,
codec
.
New
(
c
)},
nil
return
&
ipcClient
{
cfg
.
Endpoint
,
c
,
c
odec
,
codec
.
New
(
c
)},
nil
}
func
(
self
*
ipcClient
)
reconnect
()
error
{
...
...
@@ -668,33 +668,13 @@ func startIpc(cfg IpcConfig, codec codec.Codec, api shared.EthereumApi) error {
continue
}
go
func
(
conn
net
.
Conn
)
{
codec
:=
codec
.
New
(
conn
)
for
{
req
,
err
:=
codec
.
ReadRequest
()
if
err
==
io
.
EOF
{
codec
.
Close
()
return
}
else
if
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"IPC recv err - %v
\n
"
,
err
)
codec
.
Close
()
return
}
var
rpcResponse
interface
{}
res
,
err
:=
api
.
Execute
(
req
)
rpcResponse
=
shared
.
NewRpcResponse
(
req
.
Id
,
req
.
Jsonrpc
,
res
,
err
)
err
=
codec
.
WriteResponse
(
rpcResponse
)
if
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"IPC send err - %v
\n
"
,
err
)
codec
.
Close
()
return
}
}
}(
conn
)
id
:=
newIpcConnId
()
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"New IPC connection with id %06d started
\n
"
,
id
)
go
handle
(
id
,
conn
,
api
,
codec
)
}
os
.
Remove
(
cfg
.
Endpoint
)
}()
glog
.
V
(
logger
.
Info
)
.
Infof
(
"IPC service started (%s)
\n
"
,
cfg
.
Endpoint
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment