Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
G
Geth-Modification
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
张蕾
Geth-Modification
Commits
ed92f116
Commit
ed92f116
authored
Apr 05, 2016
by
Jeffrey Wilcke
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2407 from bas-vk/rpc-notifications
RPC pub sub
parents
6a185531
f7328c5e
Changes
16
Show whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
844 additions
and
310 deletions
+844
-310
api.go
eth/api.go
+145
-61
backend.go
eth/backend.go
+1
-1
api.go
eth/downloader/api.go
+59
-16
api.go
eth/filters/api.go
+66
-13
node.go
node/node.go
+1
-1
doc.go
rpc/doc.go
+6
-22
http.go
rpc/http.go
+1
-1
inproc.go
rpc/inproc.go
+1
-1
json.go
rpc/json.go
+14
-10
notification.go
rpc/notification.go
+288
-0
notification_test.go
rpc/notification_test.go
+119
-0
server.go
rpc/server.go
+107
-121
server_test.go
rpc/server_test.go
+10
-6
types.go
rpc/types.go
+4
-50
utils.go
rpc/utils.go
+20
-6
websocket.go
rpc/websocket.go
+2
-1
No files found.
eth/api.go
View file @
ed92f116
...
...
@@ -28,6 +28,8 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
...
...
@@ -460,13 +462,43 @@ type PublicBlockChainAPI struct {
bc
*
core
.
BlockChain
chainDb
ethdb
.
Database
eventMux
*
event
.
TypeMux
muNewBlockSubscriptions
sync
.
Mutex
// protects newBlocksSubscriptions
newBlockSubscriptions
map
[
string
]
func
(
core
.
ChainEvent
)
error
// callbacks for new block subscriptions
am
*
accounts
.
Manager
miner
*
miner
.
Miner
}
// NewPublicBlockChainAPI creates a new Etheruem blockchain API.
func
NewPublicBlockChainAPI
(
config
*
core
.
ChainConfig
,
bc
*
core
.
BlockChain
,
m
*
miner
.
Miner
,
chainDb
ethdb
.
Database
,
eventMux
*
event
.
TypeMux
,
am
*
accounts
.
Manager
)
*
PublicBlockChainAPI
{
return
&
PublicBlockChainAPI
{
config
:
config
,
bc
:
bc
,
miner
:
m
,
chainDb
:
chainDb
,
eventMux
:
eventMux
,
am
:
am
}
api
:=
&
PublicBlockChainAPI
{
config
:
config
,
bc
:
bc
,
miner
:
m
,
chainDb
:
chainDb
,
eventMux
:
eventMux
,
am
:
am
,
newBlockSubscriptions
:
make
(
map
[
string
]
func
(
core
.
ChainEvent
)
error
),
}
go
api
.
subscriptionLoop
()
return
api
}
// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions.
func
(
s
*
PublicBlockChainAPI
)
subscriptionLoop
()
{
sub
:=
s
.
eventMux
.
Subscribe
(
core
.
ChainEvent
{})
for
event
:=
range
sub
.
Chan
()
{
if
chainEvent
,
ok
:=
event
.
Data
.
(
core
.
ChainEvent
);
ok
{
s
.
muNewBlockSubscriptions
.
Lock
()
for
id
,
notifyOf
:=
range
s
.
newBlockSubscriptions
{
if
notifyOf
(
chainEvent
)
==
rpc
.
ErrNotificationNotFound
{
delete
(
s
.
newBlockSubscriptions
,
id
)
}
}
s
.
muNewBlockSubscriptions
.
Unlock
()
}
}
}
// BlockNumber returns the block number of the chain head.
...
...
@@ -564,20 +596,36 @@ type NewBlocksArgs struct {
// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
// the caller to specify whether the output should contain transactions and in what format.
func
(
s
*
PublicBlockChainAPI
)
NewBlocks
(
args
NewBlocksArgs
)
(
rpc
.
Subscription
,
error
)
{
sub
:=
s
.
eventMux
.
Subscribe
(
core
.
ChainEvent
{})
func
(
s
*
PublicBlockChainAPI
)
NewBlocks
(
ctx
context
.
Context
,
args
NewBlocksArgs
)
(
rpc
.
Subscription
,
error
)
{
notifier
,
supported
:=
ctx
.
Value
(
rpc
.
NotifierContextKey
)
.
(
rpc
.
Notifier
)
if
!
supported
{
return
nil
,
rpc
.
ErrNotificationsUnsupported
}
output
:=
func
(
rawBlock
interface
{})
interface
{}
{
if
event
,
ok
:=
rawBlock
.
(
core
.
ChainEvent
);
ok
{
notification
,
err
:=
s
.
rpcOutputBlock
(
event
.
Block
,
args
.
IncludeTransactions
,
args
.
TransactionDetails
)
if
err
==
nil
{
return
notification
// create a subscription that will remove itself when unsubscribed/cancelled
subscription
,
err
:=
notifier
.
NewSubscription
(
func
(
subId
string
)
{
s
.
muNewBlockSubscriptions
.
Lock
()
delete
(
s
.
newBlockSubscriptions
,
subId
)
s
.
muNewBlockSubscriptions
.
Unlock
()
})
if
err
!=
nil
{
return
nil
,
err
}
// add a callback that is called on chain events which will format the block and notify the client
s
.
muNewBlockSubscriptions
.
Lock
()
s
.
newBlockSubscriptions
[
subscription
.
ID
()]
=
func
(
e
core
.
ChainEvent
)
error
{
if
notification
,
err
:=
s
.
rpcOutputBlock
(
e
.
Block
,
args
.
IncludeTransactions
,
args
.
TransactionDetails
);
err
==
nil
{
return
subscription
.
Notify
(
notification
)
}
else
{
glog
.
V
(
logger
.
Warn
)
.
Info
(
"unable to format block %v
\n
"
,
err
)
}
return
rawBlock
return
nil
}
s
.
muNewBlockSubscriptions
.
Unlock
()
return
rpc
.
NewSubscriptionWithOutputFormat
(
sub
,
output
)
,
nil
return
subscription
,
nil
}
// GetCode returns the code stored at the given address in the state for the given block number.
...
...
@@ -829,11 +877,13 @@ type PublicTransactionPoolAPI struct {
am
*
accounts
.
Manager
txPool
*
core
.
TxPool
txMu
sync
.
Mutex
muPendingTxSubs
sync
.
Mutex
pendingTxSubs
map
[
string
]
rpc
.
Subscription
}
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func
NewPublicTransactionPoolAPI
(
e
*
Ethereum
)
*
PublicTransactionPoolAPI
{
return
&
PublicTransactionPoolAPI
{
api
:=
&
PublicTransactionPoolAPI
{
eventMux
:
e
.
EventMux
(),
gpo
:
NewGasPriceOracle
(
e
),
chainDb
:
e
.
ChainDb
(),
...
...
@@ -841,6 +891,53 @@ func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI {
am
:
e
.
AccountManager
(),
txPool
:
e
.
TxPool
(),
miner
:
e
.
Miner
(),
pendingTxSubs
:
make
(
map
[
string
]
rpc
.
Subscription
),
}
go
api
.
subscriptionLoop
()
return
api
}
// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions.
func
(
s
*
PublicTransactionPoolAPI
)
subscriptionLoop
()
{
sub
:=
s
.
eventMux
.
Subscribe
(
core
.
TxPreEvent
{})
accountTimeout
:=
time
.
NewTicker
(
10
*
time
.
Second
)
// only publish pending tx signed by one of the accounts in the node
accountSet
:=
set
.
New
()
accounts
,
_
:=
s
.
am
.
Accounts
()
for
_
,
acc
:=
range
accounts
{
accountSet
.
Add
(
acc
.
Address
)
}
for
{
select
{
case
event
:=
<-
sub
.
Chan
()
:
if
event
==
nil
{
continue
}
tx
:=
event
.
Data
.
(
core
.
TxPreEvent
)
if
from
,
err
:=
tx
.
Tx
.
FromFrontier
();
err
==
nil
{
if
accountSet
.
Has
(
from
)
{
s
.
muPendingTxSubs
.
Lock
()
for
id
,
sub
:=
range
s
.
pendingTxSubs
{
if
sub
.
Notify
(
tx
.
Tx
.
Hash
())
==
rpc
.
ErrNotificationNotFound
{
delete
(
s
.
pendingTxSubs
,
id
)
}
}
s
.
muPendingTxSubs
.
Unlock
()
}
}
case
<-
accountTimeout
.
C
:
// refresh account list when accounts are added/removed from the node.
if
accounts
,
err
:=
s
.
am
.
Accounts
();
err
==
nil
{
accountSet
.
Clear
()
for
_
,
acc
:=
range
accounts
{
accountSet
.
Add
(
acc
.
Address
)
}
}
}
}
}
...
...
@@ -1275,40 +1372,27 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err
// NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool
// and is send from one of the transactions this nodes manages.
func
(
s
*
PublicTransactionPoolAPI
)
NewPendingTransactions
()
(
rpc
.
Subscription
,
error
)
{
sub
:=
s
.
eventMux
.
Subscribe
(
core
.
TxPreEvent
{})
accounts
,
err
:=
s
.
am
.
Accounts
()
if
err
!=
nil
{
return
rpc
.
Subscription
{},
err
func
(
s
*
PublicTransactionPoolAPI
)
NewPendingTransactions
(
ctx
context
.
Context
)
(
rpc
.
Subscription
,
error
)
{
notifier
,
supported
:=
ctx
.
Value
(
rpc
.
NotifierContextKey
)
.
(
rpc
.
Notifier
)
if
!
supported
{
return
nil
,
rpc
.
ErrNotificationsUnsupported
}
accountSet
:=
set
.
New
()
for
_
,
account
:=
range
accounts
{
accountSet
.
Add
(
account
.
Address
)
}
accountSetLastUpdates
:=
time
.
Now
()
output
:=
func
(
transaction
interface
{})
interface
{}
{
if
time
.
Since
(
accountSetLastUpdates
)
>
(
time
.
Duration
(
2
)
*
time
.
Second
)
{
if
accounts
,
err
=
s
.
am
.
Accounts
();
err
!=
nil
{
accountSet
.
Clear
()
for
_
,
account
:=
range
accounts
{
accountSet
.
Add
(
account
.
Address
)
}
accountSetLastUpdates
=
time
.
Now
()
}
}
subscription
,
err
:=
notifier
.
NewSubscription
(
func
(
id
string
)
{
s
.
muPendingTxSubs
.
Lock
()
delete
(
s
.
pendingTxSubs
,
id
)
s
.
muPendingTxSubs
.
Unlock
()
})
tx
:=
transaction
.
(
core
.
TxPreEvent
)
if
from
,
err
:=
tx
.
Tx
.
FromFrontier
();
err
==
nil
{
if
accountSet
.
Has
(
from
)
{
return
tx
.
Tx
.
Hash
()
}
}
return
nil
if
err
!=
nil
{
return
nil
,
err
}
return
rpc
.
NewSubscriptionWithOutputFormat
(
sub
,
output
),
nil
s
.
muPendingTxSubs
.
Lock
()
s
.
pendingTxSubs
[
subscription
.
ID
()]
=
subscription
s
.
muPendingTxSubs
.
Unlock
()
return
subscription
,
nil
}
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
...
...
eth/backend.go
View file @
ed92f116
...
...
@@ -306,7 +306,7 @@ func (s *Ethereum) APIs() []rpc.API {
},
{
Namespace
:
"eth"
,
Version
:
"1.0"
,
Service
:
downloader
.
NewPublicDownloaderAPI
(
s
.
Downloader
()),
Service
:
downloader
.
NewPublicDownloaderAPI
(
s
.
Downloader
()
,
s
.
EventMux
()
),
Public
:
true
,
},
{
Namespace
:
"miner"
,
...
...
eth/downloader/api.go
View file @
ed92f116
...
...
@@ -17,6 +17,11 @@
package
downloader
import
(
"sync"
"golang.org/x/net/context"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
...
...
@@ -24,11 +29,43 @@ import (
// It offers only methods that operates on data that can be available to anyone without security risks.
type
PublicDownloaderAPI
struct
{
d
*
Downloader
mux
*
event
.
TypeMux
muSyncSubscriptions
sync
.
Mutex
syncSubscriptions
map
[
string
]
rpc
.
Subscription
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
func
NewPublicDownloaderAPI
(
d
*
Downloader
)
*
PublicDownloaderAPI
{
return
&
PublicDownloaderAPI
{
d
}
func
NewPublicDownloaderAPI
(
d
*
Downloader
,
m
*
event
.
TypeMux
)
*
PublicDownloaderAPI
{
api
:=
&
PublicDownloaderAPI
{
d
:
d
,
mux
:
m
,
syncSubscriptions
:
make
(
map
[
string
]
rpc
.
Subscription
)}
go
api
.
run
()
return
api
}
func
(
api
*
PublicDownloaderAPI
)
run
()
{
sub
:=
api
.
mux
.
Subscribe
(
StartEvent
{},
DoneEvent
{},
FailedEvent
{})
for
event
:=
range
sub
.
Chan
()
{
var
notification
interface
{}
switch
event
.
Data
.
(
type
)
{
case
StartEvent
:
result
:=
&
SyncingResult
{
Syncing
:
true
}
result
.
Status
.
Origin
,
result
.
Status
.
Current
,
result
.
Status
.
Height
,
result
.
Status
.
Pulled
,
result
.
Status
.
Known
=
api
.
d
.
Progress
()
notification
=
result
case
DoneEvent
,
FailedEvent
:
notification
=
false
}
api
.
muSyncSubscriptions
.
Lock
()
for
id
,
sub
:=
range
api
.
syncSubscriptions
{
if
sub
.
Notify
(
notification
)
==
rpc
.
ErrNotificationNotFound
{
delete
(
api
.
syncSubscriptions
,
id
)
}
}
api
.
muSyncSubscriptions
.
Unlock
()
}
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
...
...
@@ -47,19 +84,25 @@ type SyncingResult struct {
}
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
func
(
s
*
PublicDownloaderAPI
)
Syncing
()
(
rpc
.
Subscription
,
error
)
{
sub
:=
s
.
d
.
mux
.
Subscribe
(
StartEvent
{},
DoneEvent
{},
FailedEvent
{})
output
:=
func
(
event
interface
{})
interface
{}
{
switch
event
.
(
type
)
{
case
StartEvent
:
result
:=
&
SyncingResult
{
Syncing
:
true
}
result
.
Status
.
Origin
,
result
.
Status
.
Current
,
result
.
Status
.
Height
,
result
.
Status
.
Pulled
,
result
.
Status
.
Known
=
s
.
d
.
Progress
()
return
result
case
DoneEvent
,
FailedEvent
:
return
false
func
(
api
*
PublicDownloaderAPI
)
Syncing
(
ctx
context
.
Context
)
(
rpc
.
Subscription
,
error
)
{
notifier
,
supported
:=
ctx
.
Value
(
rpc
.
NotifierContextKey
)
.
(
rpc
.
Notifier
)
if
!
supported
{
return
nil
,
rpc
.
ErrNotificationsUnsupported
}
return
nil
subscription
,
err
:=
notifier
.
NewSubscription
(
func
(
id
string
)
{
api
.
muSyncSubscriptions
.
Lock
()
delete
(
api
.
syncSubscriptions
,
id
)
api
.
muSyncSubscriptions
.
Unlock
()
})
if
err
!=
nil
{
return
nil
,
err
}
return
rpc
.
NewSubscriptionWithOutputFormat
(
sub
,
output
),
nil
api
.
muSyncSubscriptions
.
Lock
()
api
.
syncSubscriptions
[
subscription
.
ID
()]
=
subscription
api
.
muSyncSubscriptions
.
Unlock
()
return
subscription
,
nil
}
eth/filters/api.go
View file @
ed92f116
...
...
@@ -17,15 +17,13 @@
package
filters
import
(
"sync"
"time"
"crypto/rand"
"encoding/hex"
"errors"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
...
...
@@ -33,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
var
(
...
...
@@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
}
// newLogFilter creates a new log filter.
func
(
s
*
PublicFilterAPI
)
newLogFilter
(
earliest
,
latest
int64
,
addresses
[]
common
.
Address
,
topics
[][]
common
.
Hash
)
(
int
,
error
)
{
func
(
s
*
PublicFilterAPI
)
newLogFilter
(
earliest
,
latest
int64
,
addresses
[]
common
.
Address
,
topics
[][]
common
.
Hash
,
callback
func
(
log
*
vm
.
Log
,
removed
bool
)
)
(
int
,
error
)
{
s
.
logMu
.
Lock
()
defer
s
.
logMu
.
Unlock
()
...
...
@@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
filter
.
SetAddresses
(
addresses
)
filter
.
SetTopics
(
topics
)
filter
.
LogCallback
=
func
(
log
*
vm
.
Log
,
removed
bool
)
{
if
callback
!=
nil
{
callback
(
log
,
removed
)
}
else
{
s
.
logMu
.
Lock
()
defer
s
.
logMu
.
Unlock
()
if
queue
:=
s
.
logQueue
[
id
];
queue
!=
nil
{
queue
.
add
(
vmlog
{
log
,
removed
})
}
}
}
return
id
,
nil
}
func
(
s
*
PublicFilterAPI
)
Logs
(
ctx
context
.
Context
,
args
NewFilterArgs
)
(
rpc
.
Subscription
,
error
)
{
notifier
,
supported
:=
ctx
.
Value
(
rpc
.
NotifierContextKey
)
.
(
rpc
.
Notifier
)
if
!
supported
{
return
nil
,
rpc
.
ErrNotificationsUnsupported
}
var
(
externalId
string
subscription
rpc
.
Subscription
err
error
)
if
externalId
,
err
=
newFilterId
();
err
!=
nil
{
return
nil
,
err
}
// uninstall filter when subscription is unsubscribed/cancelled
if
subscription
,
err
=
notifier
.
NewSubscription
(
func
(
string
)
{
s
.
UninstallFilter
(
externalId
)
});
err
!=
nil
{
return
nil
,
err
}
notifySubscriber
:=
func
(
log
*
vm
.
Log
,
removed
bool
)
{
rpcLog
:=
toRPCLogs
(
vm
.
Logs
{
log
},
removed
)
if
err
:=
subscription
.
Notify
(
rpcLog
);
err
!=
nil
{
subscription
.
Cancel
()
}
}
// from and to block number are not used since subscriptions don't allow you to travel to "time"
var
id
int
if
len
(
args
.
Addresses
)
>
0
{
id
,
err
=
s
.
newLogFilter
(
-
1
,
-
1
,
args
.
Addresses
,
args
.
Topics
,
notifySubscriber
)
}
else
{
id
,
err
=
s
.
newLogFilter
(
-
1
,
-
1
,
nil
,
args
.
Topics
,
notifySubscriber
)
}
if
err
!=
nil
{
subscription
.
Cancel
()
return
nil
,
err
}
s
.
filterMapMu
.
Lock
()
s
.
filterMapping
[
externalId
]
=
id
s
.
filterMapMu
.
Unlock
()
return
subscription
,
err
}
// NewFilterArgs represents a request to create a new filter.
type
NewFilterArgs
struct
{
FromBlock
rpc
.
BlockNumber
...
...
@@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var
id
int
if
len
(
args
.
Addresses
)
>
0
{
id
,
err
=
s
.
newLogFilter
(
args
.
FromBlock
.
Int64
(),
args
.
ToBlock
.
Int64
(),
args
.
Addresses
,
args
.
Topics
)
id
,
err
=
s
.
newLogFilter
(
args
.
FromBlock
.
Int64
(),
args
.
ToBlock
.
Int64
(),
args
.
Addresses
,
args
.
Topics
,
nil
)
}
else
{
id
,
err
=
s
.
newLogFilter
(
args
.
FromBlock
.
Int64
(),
args
.
ToBlock
.
Int64
(),
nil
,
args
.
Topics
)
id
,
err
=
s
.
newLogFilter
(
args
.
FromBlock
.
Int64
(),
args
.
ToBlock
.
Int64
(),
nil
,
args
.
Topics
,
nil
)
}
if
err
!=
nil
{
return
""
,
err
...
...
node/node.go
View file @
ed92f116
...
...
@@ -303,7 +303,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
glog
.
V
(
logger
.
Error
)
.
Infof
(
"IPC accept failed: %v"
,
err
)
continue
}
go
handler
.
ServeCodec
(
rpc
.
NewJSONCodec
(
conn
))
go
handler
.
ServeCodec
(
rpc
.
NewJSONCodec
(
conn
)
,
rpc
.
OptionMethodInvocation
|
rpc
.
OptionSubscriptions
)
}
}()
// All listeners booted successfully
...
...
rpc/doc.go
View file @
ed92f116
...
...
@@ -68,35 +68,19 @@ The package also supports the publish subscribe pattern through the use of subsc
A method that is considered eligible for notifications must satisfy the following criteria:
- object must be exported
- method must be exported
- first method argument type must be context.Context
- method argument(s) must be exported or builtin types
- method must return the tuple Subscription, error
An example method:
func (s *BlockChainService) Head() (Subscription, error) {
sub := s.bc.eventMux.Subscribe(ChainHeadEvent{})
return v2.NewSubscription(sub), nil
}
This method will push all raised ChainHeadEvents to subscribed clients. If the client is only
interested in every N'th block it is possible to add a criteria.
func (s *BlockChainService) HeadFiltered(nth uint64) (Subscription, error) {
sub := s.bc.eventMux.Subscribe(ChainHeadEvent{})
criteria := func(event interface{}) bool {
chainHeadEvent := event.(ChainHeadEvent)
if chainHeadEvent.Block.NumberU64() % nth == 0 {
return true
}
return false
}
return v2.NewSubscriptionFiltered(sub, criteria), nil
func (s *BlockChainService) NewBlocks(ctx context.Context) (Subscription, error) {
...
}
Subscriptions are deleted when:
- the user sends an unsubscribe request
- the connection which was used to create the subscription is closed
- the connection which was used to create the subscription is closed. This can be initiated
by the client and server. The server will close the connection on an write error or when
the queue of buffered notifications gets too big.
*/
package
rpc
rpc/http.go
View file @
ed92f116
...
...
@@ -126,7 +126,7 @@ func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
// a single request.
codec
:=
NewJSONCodec
(
&
httpReadWriteNopCloser
{
r
.
Body
,
w
})
defer
codec
.
Close
()
srv
.
ServeSingleRequest
(
codec
)
srv
.
ServeSingleRequest
(
codec
,
OptionMethodInvocation
)
}
}
...
...
rpc/inproc.go
View file @
ed92f116
...
...
@@ -39,7 +39,7 @@ func (c *inProcClient) Close() {
// RPC server.
func
NewInProcRPCClient
(
handler
*
Server
)
Client
{
p1
,
p2
:=
net
.
Pipe
()
go
handler
.
ServeCodec
(
NewJSONCodec
(
p1
))
go
handler
.
ServeCodec
(
NewJSONCodec
(
p1
)
,
OptionMethodInvocation
|
OptionSubscriptions
)
return
&
inProcClient
{
handler
,
p2
,
json
.
NewEncoder
(
p2
),
json
.
NewDecoder
(
p2
)}
}
...
...
rpc/json.go
View file @
ed92f116
...
...
@@ -22,7 +22,7 @@ import (
"io"
"reflect"
"strings"
"sync
/atomic
"
"sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
...
...
@@ -82,8 +82,9 @@ type jsonNotification struct {
// and serializing (result) objects.
type
jsonCodec
struct
{
closed
chan
interface
{}
isClosed
int32
closer
sync
.
Once
d
*
json
.
Decoder
muEncoder
sync
.
Mutex
e
*
json
.
Encoder
req
JSONRequest
rw
io
.
ReadWriteCloser
...
...
@@ -93,7 +94,7 @@ type jsonCodec struct {
func
NewJSONCodec
(
rwc
io
.
ReadWriteCloser
)
ServerCodec
{
d
:=
json
.
NewDecoder
(
rwc
)
d
.
UseNumber
()
return
&
jsonCodec
{
closed
:
make
(
chan
interface
{}),
d
:
d
,
e
:
json
.
NewEncoder
(
rwc
),
rw
:
rwc
,
isClosed
:
0
}
return
&
jsonCodec
{
closed
:
make
(
chan
interface
{}),
d
:
d
,
e
:
json
.
NewEncoder
(
rwc
),
rw
:
rwc
}
}
// isBatch returns true when the first non-whitespace characters is '['
...
...
@@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac
// Write message to client
func
(
c
*
jsonCodec
)
Write
(
res
interface
{})
error
{
c
.
muEncoder
.
Lock
()
defer
c
.
muEncoder
.
Unlock
()
return
c
.
e
.
Encode
(
res
)
}
// Close the underlying connection
func
(
c
*
jsonCodec
)
Close
()
{
if
atomic
.
CompareAndSwapInt32
(
&
c
.
isClosed
,
0
,
1
)
{
c
.
closer
.
Do
(
func
(
)
{
close
(
c
.
closed
)
c
.
rw
.
Close
()
}
}
)
}
// Closed returns a channel which will be closed when Close is called
...
...
rpc/notification.go
0 → 100644
View file @
ed92f116
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package
rpc
import
(
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
var
(
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported
=
errors
.
New
(
"notifications not supported"
)
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound
=
errors
.
New
(
"notification not found"
)
// errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
errNotifierStopped
=
errors
.
New
(
"unable to send notification"
)
// errNotificationQueueFull is returns when there are too many notifications in the queue
errNotificationQueueFull
=
errors
.
New
(
"too many pending notifications"
)
)
// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
// notifications that might be pending in the internal queue.
var
unsubSignal
=
new
(
struct
{})
// UnsubscribeCallback defines a callback that is called when a subcription ends.
// It receives the subscription id as argument.
type
UnsubscribeCallback
func
(
id
string
)
// notification is a helper object that holds event data for a subscription
type
notification
struct
{
sub
*
bufferedSubscription
// subscription id
data
interface
{}
// event data
}
// A Notifier type describes the interface for objects that can send create subscriptions
type
Notifier
interface
{
// Create a new subscription. The given callback is called when this subscription
// is cancelled (e.g. client send an unsubscribe, connection closed).
NewSubscription
(
UnsubscribeCallback
)
(
Subscription
,
error
)
// Cancel subscription
Unsubscribe
(
id
string
)
error
}
// Subscription defines the interface for objects that can notify subscribers
type
Subscription
interface
{
// Inform client of an event
Notify
(
data
interface
{})
error
// Unique identifier
ID
()
string
// Cancel subscription
Cancel
()
error
}
// bufferedSubscription is a subscription that uses a bufferedNotifier to send
// notifications to subscribers.
type
bufferedSubscription
struct
{
id
string
unsubOnce
sync
.
Once
// call unsub method once
unsub
UnsubscribeCallback
// called on Unsubscribed
notifier
*
bufferedNotifier
// forward notifications to
pending
chan
interface
{}
// closed when active
flushed
chan
interface
{}
// closed when all buffered notifications are send
lastNotification
time
.
Time
// last time a notification was send
}
// ID returns the subscription identifier that the client uses to refer to this instance.
func
(
s
*
bufferedSubscription
)
ID
()
string
{
return
s
.
id
}
// Cancel informs the notifier that this subscription is cancelled by the API
func
(
s
*
bufferedSubscription
)
Cancel
()
error
{
return
s
.
notifier
.
Unsubscribe
(
s
.
id
)
}
// Notify the subscriber of a particular event.
func
(
s
*
bufferedSubscription
)
Notify
(
data
interface
{})
error
{
return
s
.
notifier
.
send
(
s
.
id
,
data
)
}
// bufferedNotifier is a notifier that queues notifications in an internal queue and
// send them as fast as possible to the client from this queue. It will stop if the
// queue grows past a given size.
type
bufferedNotifier
struct
{
codec
ServerCodec
// underlying connection
mu
sync
.
Mutex
// guard internal state
subscriptions
map
[
string
]
*
bufferedSubscription
// keep track of subscriptions associated with codec
queueSize
int
// max number of items in queue
queue
chan
*
notification
// notification queue
stopped
bool
// indication if this notifier is ordered to stop
}
// newBufferedNotifier returns a notifier that queues notifications in an internal queue
// from which notifications are send as fast as possible to the client. If the queue size
// limit is reached (client is unable to keep up) it will stop and closes the codec.
func
newBufferedNotifier
(
codec
ServerCodec
,
size
int
)
*
bufferedNotifier
{
notifier
:=
&
bufferedNotifier
{
codec
:
codec
,
subscriptions
:
make
(
map
[
string
]
*
bufferedSubscription
),
queue
:
make
(
chan
*
notification
,
size
),
queueSize
:
size
,
}
go
notifier
.
run
()
return
notifier
}
// NewSubscription creates a new subscription that forwards events to this instance internal
// queue. The given callback is called when the subscription is unsubscribed/cancelled.
func
(
n
*
bufferedNotifier
)
NewSubscription
(
callback
UnsubscribeCallback
)
(
Subscription
,
error
)
{
id
,
err
:=
newSubscriptionID
()
if
err
!=
nil
{
return
nil
,
err
}
n
.
mu
.
Lock
()
defer
n
.
mu
.
Unlock
()
if
n
.
stopped
{
return
nil
,
errNotifierStopped
}
sub
:=
&
bufferedSubscription
{
id
:
id
,
unsub
:
callback
,
notifier
:
n
,
pending
:
make
(
chan
interface
{}),
flushed
:
make
(
chan
interface
{}),
lastNotification
:
time
.
Now
(),
}
n
.
subscriptions
[
id
]
=
sub
return
sub
,
nil
}
// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
func
(
n
*
bufferedNotifier
)
Unsubscribe
(
subid
string
)
error
{
n
.
mu
.
Lock
()
sub
,
found
:=
n
.
subscriptions
[
subid
]
n
.
mu
.
Unlock
()
if
found
{
// send the unsubscribe signal, this will cause the notifier not to accept new events
// for this subscription and will close the flushed channel after the last (buffered)
// notification was send to the client.
if
err
:=
n
.
send
(
subid
,
unsubSignal
);
err
!=
nil
{
return
err
}
// wait for confirmation that all (buffered) events are send for this subscription.
// this ensures that the unsubscribe method response is not send before all buffered
// events for this subscription are send.
<-
sub
.
flushed
return
nil
}
return
ErrNotificationNotFound
}
// Send enques the given data for the subscription with public ID on the internal queue. t returns
// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
// will remove the subscription with the given id from the subscription collection.
func
(
n
*
bufferedNotifier
)
send
(
id
string
,
data
interface
{})
error
{
n
.
mu
.
Lock
()
defer
n
.
mu
.
Unlock
()
if
n
.
stopped
{
return
errNotifierStopped
}
var
(
subscription
*
bufferedSubscription
found
bool
)
// check if subscription is associated with this connection, it might be cancelled
// (subscribe/connection closed)
if
subscription
,
found
=
n
.
subscriptions
[
id
];
!
found
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"received notification for unknown subscription %s
\n
"
,
id
)
return
ErrNotificationNotFound
}
// received the unsubscribe signal. Add it to the queue to make sure any pending notifications
// for this subscription are send. When the run loop receives this singal it will signal that
// all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
// send to the user. Remove the subscriptions to make sure new notifications are not accepted.
if
data
==
unsubSignal
{
delete
(
n
.
subscriptions
,
id
)
if
subscription
.
unsub
!=
nil
{
subscription
.
unsubOnce
.
Do
(
func
()
{
subscription
.
unsub
(
id
)
})
}
}
subscription
.
lastNotification
=
time
.
Now
()
if
len
(
n
.
queue
)
>=
n
.
queueSize
{
glog
.
V
(
logger
.
Warn
)
.
Infoln
(
"too many buffered notifications -> close connection"
)
n
.
codec
.
Close
()
return
errNotificationQueueFull
}
n
.
queue
<-
&
notification
{
subscription
,
data
}
return
nil
}
// run reads notifications from the internal queue and sends them to the client. In case of an
// error, or when the codec is closed it will cancel all active subscriptions and returns.
func
(
n
*
bufferedNotifier
)
run
()
{
defer
func
()
{
n
.
mu
.
Lock
()
defer
n
.
mu
.
Unlock
()
n
.
stopped
=
true
close
(
n
.
queue
)
// on exit call unsubscribe callback
for
id
,
sub
:=
range
n
.
subscriptions
{
if
sub
.
unsub
!=
nil
{
sub
.
unsubOnce
.
Do
(
func
()
{
sub
.
unsub
(
id
)
})
}
close
(
sub
.
flushed
)
delete
(
n
.
subscriptions
,
id
)
}
}()
for
{
select
{
case
notification
:=
<-
n
.
queue
:
// It can happen that an event is raised before the RPC server was able to send the sub
// id to the client. Therefore subscriptions are marked as pending until the sub id was
// send. The RPC server will activate the subscription by closing the pending chan.
<-
notification
.
sub
.
pending
if
notification
.
data
==
unsubSignal
{
// unsubSignal is the last accepted message for this subscription. Raise the signal
// that all buffered notifications are sent by closing the flushed channel. This
// indicates that the response for the unsubscribe can be send to the client.
close
(
notification
.
sub
.
flushed
)
}
else
{
msg
:=
n
.
codec
.
CreateNotification
(
notification
.
sub
.
id
,
notification
.
data
)
if
err
:=
n
.
codec
.
Write
(
msg
);
err
!=
nil
{
n
.
codec
.
Close
()
// unable to send notification to client, unsubscribe all subscriptions
glog
.
V
(
logger
.
Warn
)
.
Infof
(
"unable to send notification - %v
\n
"
,
err
)
return
}
}
case
<-
n
.
codec
.
Closed
()
:
// connection was closed
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"codec closed, stop subscriptions"
)
return
}
}
}
// Marks the subscription as active. This will causes the notifications for this subscription to be
// forwarded to the client.
func
(
n
*
bufferedNotifier
)
activate
(
subid
string
)
{
n
.
mu
.
Lock
()
defer
n
.
mu
.
Unlock
()
if
sub
,
found
:=
n
.
subscriptions
[
subid
];
found
{
close
(
sub
.
pending
)
}
}
rpc/notification_test.go
0 → 100644
View file @
ed92f116
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package
rpc
import
(
"encoding/json"
"net"
"testing"
"time"
"golang.org/x/net/context"
)
type
NotificationTestService
struct
{}
var
(
unsubCallbackCalled
=
false
)
func
(
s
*
NotificationTestService
)
Unsubscribe
(
subid
string
)
{
unsubCallbackCalled
=
true
}
func
(
s
*
NotificationTestService
)
SomeSubscription
(
ctx
context
.
Context
,
n
,
val
int
)
(
Subscription
,
error
)
{
notifier
,
supported
:=
ctx
.
Value
(
NotifierContextKey
)
.
(
Notifier
)
if
!
supported
{
return
nil
,
ErrNotificationsUnsupported
}
// by explicitly creating an subscription we make sure that the subscription id is send back to the client
// before the first subscription.Notify is called. Otherwise the events might be send before the response
// for the eth_subscribe method.
subscription
,
err
:=
notifier
.
NewSubscription
(
s
.
Unsubscribe
)
if
err
!=
nil
{
return
nil
,
err
}
go
func
()
{
for
i
:=
0
;
i
<
n
;
i
++
{
if
err
:=
subscription
.
Notify
(
val
+
i
);
err
!=
nil
{
return
}
}
}()
return
subscription
,
nil
}
func
TestNotifications
(
t
*
testing
.
T
)
{
server
:=
NewServer
()
service
:=
&
NotificationTestService
{}
if
err
:=
server
.
RegisterName
(
"eth"
,
service
);
err
!=
nil
{
t
.
Fatalf
(
"unable to register test service %v"
,
err
)
}
clientConn
,
serverConn
:=
net
.
Pipe
()
go
server
.
ServeCodec
(
NewJSONCodec
(
serverConn
),
OptionMethodInvocation
|
OptionSubscriptions
)
out
:=
json
.
NewEncoder
(
clientConn
)
in
:=
json
.
NewDecoder
(
clientConn
)
n
:=
5
val
:=
12345
request
:=
map
[
string
]
interface
{}{
"id"
:
1
,
"method"
:
"eth_subscribe"
,
"version"
:
"2.0"
,
"params"
:
[]
interface
{}{
"someSubscription"
,
n
,
val
},
}
// create subscription
if
err
:=
out
.
Encode
(
request
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
var
subid
string
response
:=
JSONSuccessResponse
{
Result
:
subid
}
if
err
:=
in
.
Decode
(
&
response
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
var
ok
bool
if
subid
,
ok
=
response
.
Result
.
(
string
);
!
ok
{
t
.
Fatalf
(
"expected subscription id, got %T"
,
response
.
Result
)
}
for
i
:=
0
;
i
<
n
;
i
++
{
var
notification
jsonNotification
if
err
:=
in
.
Decode
(
&
notification
);
err
!=
nil
{
t
.
Fatalf
(
"%v"
,
err
)
}
if
int
(
notification
.
Params
.
Result
.
(
float64
))
!=
val
+
i
{
t
.
Fatalf
(
"expected %d, got %d"
,
val
+
i
,
notification
.
Params
.
Result
)
}
}
clientConn
.
Close
()
// causes notification unsubscribe callback to be called
time
.
Sleep
(
1
*
time
.
Second
)
if
!
unsubCallbackCalled
{
t
.
Error
(
"unsubscribe callback not called after closing connection"
)
}
}
rpc/server.go
View file @
ed92f116
...
...
@@ -23,7 +23,6 @@ import (
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
...
...
@@ -33,10 +32,26 @@ import (
const
(
stopPendingRequestTimeout
=
3
*
time
.
Second
// give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
// NotifierContextKey is the key where the notifier associated with the codec is stored in the context
NotifierContextKey
=
1
notificationBufferSize
=
10000
// max buffered notifications before codec is closed
DefaultIPCApis
=
"admin,eth,debug,miner,net,shh,txpool,personal,web3"
DefaultHTTPApis
=
"eth,net,web3"
)
// CodecOption specifies which type of messages this codec supports
type
CodecOption
int
const
(
// OptionMethodInvocation is an indication that the codec supports RPC method calls
OptionMethodInvocation
CodecOption
=
1
<<
iota
// OptionSubscriptions is an indication that the codec suports RPC notifications
OptionSubscriptions
=
1
<<
iota
// support pub sub
)
// NewServer will create a new server instance with no registered handlers.
func
NewServer
()
*
Server
{
server
:=
&
Server
{
...
...
@@ -63,7 +78,7 @@ type RPCService struct {
// Modules returns the list of RPC services with their version number
func
(
s
*
RPCService
)
Modules
()
map
[
string
]
string
{
modules
:=
make
(
map
[
string
]
string
)
for
name
,
_
:=
range
s
.
server
.
services
{
for
name
:=
range
s
.
server
.
services
{
modules
[
name
]
=
"1.0"
}
return
modules
...
...
@@ -92,7 +107,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
if
regsvc
,
present
:=
s
.
services
[
name
];
present
{
methods
,
subscriptions
:=
suitableCallbacks
(
rcvrVal
,
svc
.
typ
)
if
len
(
methods
)
==
0
&&
len
(
subscriptions
)
==
0
{
return
fmt
.
Errorf
(
"Service
doesn't have any suitable methods/subscriptions to expose"
)
return
fmt
.
Errorf
(
"Service
%T doesn't have any suitable methods/subscriptions to expose"
,
rcvr
)
}
for
_
,
m
:=
range
methods
{
...
...
@@ -109,7 +124,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
svc
.
callbacks
,
svc
.
subscriptions
=
suitableCallbacks
(
rcvrVal
,
svc
.
typ
)
if
len
(
svc
.
callbacks
)
==
0
&&
len
(
svc
.
subscriptions
)
==
0
{
return
fmt
.
Errorf
(
"Service
doesn't have any suitable methods/subscriptions to expose"
)
return
fmt
.
Errorf
(
"Service
%T doesn't have any suitable methods/subscriptions to expose"
,
rcvr
)
}
s
.
services
[
svc
.
name
]
=
svc
...
...
@@ -117,12 +132,23 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return
nil
}
// hasOption returns true if option is included in options, otherwise false
func
hasOption
(
option
CodecOption
,
options
[]
CodecOption
)
bool
{
for
_
,
o
:=
range
options
{
if
option
==
o
{
return
true
}
}
return
false
}
// serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec.
//
// If singleShot is true it will process a single request, otherwise it will handle
// requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false.
func
(
s
*
Server
)
serveRequest
(
codec
ServerCodec
,
singleShot
bool
)
error
{
func
(
s
*
Server
)
serveRequest
(
codec
ServerCodec
,
singleShot
bool
,
options
CodecOption
)
error
{
defer
func
()
{
if
err
:=
recover
();
err
!=
nil
{
const
size
=
64
<<
10
...
...
@@ -141,6 +167,12 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
// if the codec supports notification include a notifier that callbacks can use
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if
options
&
OptionSubscriptions
==
OptionSubscriptions
{
ctx
=
context
.
WithValue
(
ctx
,
NotifierContextKey
,
newBufferedNotifier
(
codec
,
notificationBufferSize
))
}
s
.
codecsMu
.
Lock
()
if
atomic
.
LoadInt32
(
&
s
.
run
)
!=
1
{
// server stopped
s
.
codecsMu
.
Unlock
()
...
...
@@ -193,20 +225,16 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
// response back using the given codec. It will block until the codec is closed or the server is
// stopped. In either case the codec is closed.
//
// This server will:
// 1. allow for asynchronous and parallel request execution
// 2. supports notifications (pub/sub)
// 3. supports request batches
func
(
s
*
Server
)
ServeCodec
(
codec
ServerCodec
)
{
func
(
s
*
Server
)
ServeCodec
(
codec
ServerCodec
,
options
CodecOption
)
{
defer
codec
.
Close
()
s
.
serveRequest
(
codec
,
false
)
s
.
serveRequest
(
codec
,
false
,
options
)
}
// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not
// close the codec unless a non-recoverable error has occurred.
func
(
s
*
Server
)
ServeSingleRequest
(
codec
ServerCodec
)
{
s
.
serveRequest
(
codec
,
true
)
// close the codec unless a non-recoverable error has occurred. Note, this method will return after
// a single request has been processed!
func
(
s
*
Server
)
ServeSingleRequest
(
codec
ServerCodec
,
options
CodecOption
)
{
s
.
serveRequest
(
codec
,
true
,
options
)
}
// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish,
...
...
@@ -225,122 +253,64 @@ func (s *Server) Stop() {
}
}
// sendNotification will create a notification from the given event by serializing member fields of the event.
// It will then send the notification to the client, when it fails the codec is closed. When the event has multiple
// fields an array of values is returned.
func
sendNotification
(
codec
ServerCodec
,
subid
string
,
event
interface
{})
{
notification
:=
codec
.
CreateNotification
(
subid
,
event
)
if
err
:=
codec
.
Write
(
notification
);
err
!=
nil
{
codec
.
Close
()
}
}
// createSubscription will register a new subscription and waits for raised events. When an event is raised it will:
// 1. test if the event is raised matches the criteria the user has (optionally) specified
// 2. create a notification of the event and send it the client when it matches the criteria
// It will unsubscribe the subscription when the socket is closed or the subscription is unsubscribed by the user.
func
(
s
*
Server
)
createSubscription
(
c
ServerCodec
,
req
*
serverRequest
)
(
string
,
error
)
{
args
:=
[]
reflect
.
Value
{
req
.
callb
.
rcvr
}
if
len
(
req
.
args
)
>
0
{
// createSubscription will call the subscription callback and returns the subscription id or error.
func
(
s
*
Server
)
createSubscription
(
ctx
context
.
Context
,
c
ServerCodec
,
req
*
serverRequest
)
(
string
,
error
)
{
// subscription have as first argument the context following optional arguments
args
:=
[]
reflect
.
Value
{
req
.
callb
.
rcvr
,
reflect
.
ValueOf
(
ctx
)}
args
=
append
(
args
,
req
.
args
...
)
}
subid
,
err
:=
newSubscriptionId
()
if
err
!=
nil
{
return
""
,
err
}
reply
:=
req
.
callb
.
method
.
Func
.
Call
(
args
)
if
reply
[
1
]
.
IsNil
()
{
// no error
if
subscription
,
ok
:=
reply
[
0
]
.
Interface
()
.
(
Subscription
);
ok
{
s
.
muSubcriptions
.
Lock
()
s
.
subscriptions
[
subid
]
=
subscription
s
.
muSubcriptions
.
Unlock
()
go
func
()
{
cases
:=
[]
reflect
.
SelectCase
{
reflect
.
SelectCase
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
subscription
.
Chan
())},
// new event
reflect
.
SelectCase
{
Dir
:
reflect
.
SelectRecv
,
Chan
:
reflect
.
ValueOf
(
c
.
Closed
())},
// connection closed
if
!
reply
[
1
]
.
IsNil
()
{
// subscription creation failed
return
""
,
reply
[
1
]
.
Interface
()
.
(
error
)
}
for
{
idx
,
notification
,
recvOk
:=
reflect
.
Select
(
cases
)
switch
idx
{
case
0
:
// new event, or channel closed
if
recvOk
{
// send notification
if
event
,
ok
:=
notification
.
Interface
()
.
(
*
event
.
Event
);
ok
{
if
subscription
.
match
==
nil
||
subscription
.
match
(
event
.
Data
)
{
sendNotification
(
c
,
subid
,
subscription
.
format
(
event
.
Data
))
}
}
}
else
{
// user send an eth_unsubscribe request
return
}
case
1
:
// connection closed
s
.
unsubscribe
(
subid
)
return
}
}
}()
}
else
{
// unable to create subscription
s
.
muSubcriptions
.
Lock
()
delete
(
s
.
subscriptions
,
subid
)
s
.
muSubcriptions
.
Unlock
()
}
}
else
{
return
""
,
fmt
.
Errorf
(
"Unable to create subscription"
)
}
return
subid
,
nil
}
// unsubscribe calls the Unsubscribe method on the subscription and removes a subscription from the subscription
// registry.
func
(
s
*
Server
)
unsubscribe
(
subid
string
)
bool
{
s
.
muSubcriptions
.
Lock
()
defer
s
.
muSubcriptions
.
Unlock
()
if
sub
,
ok
:=
s
.
subscriptions
[
subid
];
ok
{
sub
.
Unsubscribe
()
delete
(
s
.
subscriptions
,
subid
)
return
true
}
return
false
return
reply
[
0
]
.
Interface
()
.
(
Subscription
)
.
ID
(),
nil
}
// handle executes a request and returns the response from the callback.
func
(
s
*
Server
)
handle
(
ctx
context
.
Context
,
codec
ServerCodec
,
req
*
serverRequest
)
interface
{}
{
func
(
s
*
Server
)
handle
(
ctx
context
.
Context
,
codec
ServerCodec
,
req
*
serverRequest
)
(
interface
{},
func
())
{
if
req
.
err
!=
nil
{
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
req
.
err
)
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
req
.
err
)
,
nil
}
if
req
.
isUnsubscribe
{
// first param must be the subscription id
if
req
.
isUnsubscribe
{
//
cancel subscription,
first param must be the subscription id
if
len
(
req
.
args
)
>=
1
&&
req
.
args
[
0
]
.
Kind
()
==
reflect
.
String
{
notifier
,
supported
:=
ctx
.
Value
(
NotifierContextKey
)
.
(
*
bufferedNotifier
)
if
!
supported
{
// interface doesn't support subscriptions (e.g. http)
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
callbackError
{
ErrNotificationsUnsupported
.
Error
()}),
nil
}
subid
:=
req
.
args
[
0
]
.
String
()
if
s
.
unsubscribe
(
subid
)
{
return
codec
.
CreateResponse
(
req
.
id
,
true
)
}
else
{
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
callbackError
{
fmt
.
Sprintf
(
"subscription '%s' not found"
,
subid
)})
if
err
:=
notifier
.
Unsubscribe
(
subid
);
err
!=
nil
{
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
callbackError
{
err
.
Error
()}),
nil
}
return
codec
.
CreateResponse
(
req
.
id
,
true
),
nil
}
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
invalidParamsError
{
"Expected subscription id as
argument"
})
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
invalidParamsError
{
"Expected subscription id as
first argument"
}),
nil
}
if
req
.
callb
.
isSubscribe
{
subid
,
err
:=
s
.
createSubscription
(
codec
,
req
)
subid
,
err
:=
s
.
createSubscription
(
c
tx
,
c
odec
,
req
)
if
err
!=
nil
{
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
callbackError
{
err
.
Error
()})
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
callbackError
{
err
.
Error
()})
,
nil
}
return
codec
.
CreateResponse
(
req
.
id
,
subid
)
// active the subscription after the sub id was successful sent to the client
activateSub
:=
func
()
{
notifier
,
_
:=
ctx
.
Value
(
NotifierContextKey
)
.
(
*
bufferedNotifier
)
notifier
.
activate
(
subid
)
}
// regular RPC call
return
codec
.
CreateResponse
(
req
.
id
,
subid
),
activateSub
}
// regular RPC call, prepare arguments
if
len
(
req
.
args
)
!=
len
(
req
.
callb
.
argTypes
)
{
rpcErr
:=
&
invalidParamsError
{
fmt
.
Sprintf
(
"%s%s%s expects %d parameters, got %d"
,
req
.
svcname
,
serviceMethodSeparator
,
req
.
callb
.
method
.
Name
,
len
(
req
.
callb
.
argTypes
),
len
(
req
.
args
))}
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
rpcErr
)
return
codec
.
CreateErrorResponse
(
&
req
.
id
,
rpcErr
)
,
nil
}
arguments
:=
[]
reflect
.
Value
{
req
.
callb
.
rcvr
}
...
...
@@ -351,45 +321,56 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
arguments
=
append
(
arguments
,
req
.
args
...
)
}
// execute RPC method and return result
reply
:=
req
.
callb
.
method
.
Func
.
Call
(
arguments
)
if
len
(
reply
)
==
0
{
return
codec
.
CreateResponse
(
req
.
id
,
nil
)
return
codec
.
CreateResponse
(
req
.
id
,
nil
)
,
nil
}
if
req
.
callb
.
errPos
>=
0
{
// test if method returned an error
if
!
reply
[
req
.
callb
.
errPos
]
.
IsNil
()
{
e
:=
reply
[
req
.
callb
.
errPos
]
.
Interface
()
.
(
error
)
res
:=
codec
.
CreateErrorResponse
(
&
req
.
id
,
&
callbackError
{
e
.
Error
()})
return
res
return
res
,
nil
}
}
return
codec
.
CreateResponse
(
req
.
id
,
reply
[
0
]
.
Interface
())
return
codec
.
CreateResponse
(
req
.
id
,
reply
[
0
]
.
Interface
())
,
nil
}
// exec executes the given request and writes the result back using the codec.
func
(
s
*
Server
)
exec
(
ctx
context
.
Context
,
codec
ServerCodec
,
req
*
serverRequest
)
{
var
response
interface
{}
var
callback
func
()
if
req
.
err
!=
nil
{
response
=
codec
.
CreateErrorResponse
(
&
req
.
id
,
req
.
err
)
}
else
{
response
=
s
.
handle
(
ctx
,
codec
,
req
)
response
,
callback
=
s
.
handle
(
ctx
,
codec
,
req
)
}
if
err
:=
codec
.
Write
(
response
);
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"%v
\n
"
,
err
)
codec
.
Close
()
}
// when request was a subscribe request this allows these subscriptions to be actived
if
callback
!=
nil
{
callback
()
}
}
// execBatch executes the given requests and writes the result back using the codec.
It will only write the response
// back when the last request is processed.
// execBatch executes the given requests and writes the result back using the codec.
//
It will only write the response
back when the last request is processed.
func
(
s
*
Server
)
execBatch
(
ctx
context
.
Context
,
codec
ServerCodec
,
requests
[]
*
serverRequest
)
{
responses
:=
make
([]
interface
{},
len
(
requests
))
var
callbacks
[]
func
()
for
i
,
req
:=
range
requests
{
if
req
.
err
!=
nil
{
responses
[
i
]
=
codec
.
CreateErrorResponse
(
&
req
.
id
,
req
.
err
)
}
else
{
responses
[
i
]
=
s
.
handle
(
ctx
,
codec
,
req
)
var
callback
func
()
if
responses
[
i
],
callback
=
s
.
handle
(
ctx
,
codec
,
req
);
callback
!=
nil
{
callbacks
=
append
(
callbacks
,
callback
)
}
}
}
...
...
@@ -397,11 +378,16 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
glog
.
V
(
logger
.
Error
)
.
Infof
(
"%v
\n
"
,
err
)
codec
.
Close
()
}
// when request holds one of more subscribe requests this allows these subscriptions to be actived
for
_
,
c
:=
range
callbacks
{
c
()
}
}
// readRequest requests the next (batch) request from the codec. It will return the collection
of requests, an
//
indication if the request was a batch, the invalid request identifier and an error when the request could not be
// read/parsed.
// readRequest requests the next (batch) request from the codec. It will return the collection
//
of requests, an indication if the request was a batch, the invalid request identifier and an
//
error when the request could not be
read/parsed.
func
(
s
*
Server
)
readRequest
(
codec
ServerCodec
)
([]
*
serverRequest
,
bool
,
RPCError
)
{
reqs
,
batch
,
err
:=
codec
.
ReadRequestHeaders
()
if
err
!=
nil
{
...
...
@@ -417,7 +403,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
if
r
.
isPubSub
&&
r
.
method
==
unsubscribeMethod
{
requests
[
i
]
=
&
serverRequest
{
id
:
r
.
id
,
isUnsubscribe
:
true
}
argTypes
:=
[]
reflect
.
Type
{
reflect
.
TypeOf
(
""
)}
argTypes
:=
[]
reflect
.
Type
{
reflect
.
TypeOf
(
""
)}
// expect subscription id as first arg
if
args
,
err
:=
codec
.
ParseRequestArguments
(
argTypes
,
r
.
params
);
err
==
nil
{
requests
[
i
]
.
args
=
args
}
else
{
...
...
@@ -426,12 +412,12 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
continue
}
if
svc
,
ok
=
s
.
services
[
r
.
service
];
!
ok
{
if
svc
,
ok
=
s
.
services
[
r
.
service
];
!
ok
{
// rpc method isn't available
requests
[
i
]
=
&
serverRequest
{
id
:
r
.
id
,
err
:
&
methodNotFoundError
{
r
.
service
,
r
.
method
}}
continue
}
if
r
.
isPubSub
{
// eth_subscribe
if
r
.
isPubSub
{
// eth_subscribe
, r.method contains the subscription method name
if
callb
,
ok
:=
svc
.
subscriptions
[
r
.
method
];
ok
{
requests
[
i
]
=
&
serverRequest
{
id
:
r
.
id
,
svcname
:
svc
.
name
,
callb
:
callb
}
if
r
.
params
!=
nil
&&
len
(
callb
.
argTypes
)
>
0
{
...
...
@@ -449,7 +435,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
continue
}
if
callb
,
ok
:=
svc
.
callbacks
[
r
.
method
];
ok
{
if
callb
,
ok
:=
svc
.
callbacks
[
r
.
method
];
ok
{
// lookup RPC method
requests
[
i
]
=
&
serverRequest
{
id
:
r
.
id
,
svcname
:
svc
.
name
,
callb
:
callb
}
if
r
.
params
!=
nil
&&
len
(
callb
.
argTypes
)
>
0
{
if
args
,
err
:=
codec
.
ParseRequestArguments
(
callb
.
argTypes
,
r
.
params
);
err
==
nil
{
...
...
rpc/server_test.go
View file @
ed92f116
...
...
@@ -65,8 +65,12 @@ func (s *Service) InvalidRets3() (string, string, error) {
return
""
,
""
,
nil
}
func
(
s
*
Service
)
Subscription
()
(
Subscription
,
error
)
{
return
NewSubscription
(
nil
),
nil
func
(
s
*
Service
)
Subscription
(
ctx
context
.
Context
)
(
Subscription
,
error
)
{
return
nil
,
nil
}
func
(
s
*
Service
)
SubsriptionWithArgs
(
ctx
context
.
Context
,
a
,
b
int
)
(
Subscription
,
error
)
{
return
nil
,
nil
}
func
TestServerRegisterName
(
t
*
testing
.
T
)
{
...
...
@@ -90,8 +94,8 @@ func TestServerRegisterName(t *testing.T) {
t
.
Errorf
(
"Expected 4 callbacks for service 'calc', got %d"
,
len
(
svc
.
callbacks
))
}
if
len
(
svc
.
subscriptions
)
!=
1
{
t
.
Errorf
(
"Expected
1 subscription
for service 'calc', got %d"
,
len
(
svc
.
subscriptions
))
if
len
(
svc
.
subscriptions
)
!=
2
{
t
.
Errorf
(
"Expected
2 subscriptions
for service 'calc', got %d"
,
len
(
svc
.
subscriptions
))
}
}
...
...
@@ -229,7 +233,7 @@ func TestServerMethodExecution(t *testing.T) {
input
,
_
:=
json
.
Marshal
(
&
req
)
codec
:=
&
ServerTestCodec
{
input
:
input
,
closer
:
make
(
chan
interface
{})}
go
server
.
ServeCodec
(
codec
)
go
server
.
ServeCodec
(
codec
,
OptionMethodInvocation
)
<-
codec
.
closer
...
...
@@ -259,7 +263,7 @@ func TestServerMethodWithCtx(t *testing.T) {
input
,
_
:=
json
.
Marshal
(
&
req
)
codec
:=
&
ServerTestCodec
{
input
:
input
,
closer
:
make
(
chan
interface
{})}
go
server
.
ServeCodec
(
codec
)
go
server
.
ServeCodec
(
codec
,
OptionMethodInvocation
)
<-
codec
.
closer
...
...
rpc/types.go
View file @
ed92f116
...
...
@@ -24,7 +24,6 @@ import (
"strings"
"sync"
"github.com/ethereum/go-ethereum/event"
"gopkg.in/fatih/set.v0"
)
...
...
@@ -69,7 +68,7 @@ type serverRequest struct {
type
serviceRegistry
map
[
string
]
*
service
// collection of services
type
callbacks
map
[
string
]
*
callback
// collection of RPC callbacks
type
subscriptions
map
[
string
]
*
callback
// collection of subscription callbacks
type
subscriptionRegistry
map
[
string
]
Subscription
// collection of subscription
s
type
subscriptionRegistry
map
[
string
]
*
callback
// collection of subscription callback
s
// Server represents a RPC server
type
Server
struct
{
...
...
@@ -123,51 +122,6 @@ type ServerCodec interface {
Closed
()
<-
chan
interface
{}
}
// SubscriptionMatcher returns true if the given value matches the criteria specified by the user
type
SubscriptionMatcher
func
(
interface
{})
bool
// SubscriptionOutputFormat accepts event data and has the ability to format the data before it is send to the client
type
SubscriptionOutputFormat
func
(
interface
{})
interface
{}
// defaultSubscriptionOutputFormatter returns data and is used as default output format for notifications
func
defaultSubscriptionOutputFormatter
(
data
interface
{})
interface
{}
{
return
data
}
// Subscription is used by the server to send notifications to the client
type
Subscription
struct
{
sub
event
.
Subscription
match
SubscriptionMatcher
format
SubscriptionOutputFormat
}
// NewSubscription create a new RPC subscription
func
NewSubscription
(
sub
event
.
Subscription
)
Subscription
{
return
Subscription
{
sub
,
nil
,
defaultSubscriptionOutputFormatter
}
}
// NewSubscriptionWithOutputFormat create a new RPC subscription which a custom notification output format
func
NewSubscriptionWithOutputFormat
(
sub
event
.
Subscription
,
formatter
SubscriptionOutputFormat
)
Subscription
{
return
Subscription
{
sub
,
nil
,
formatter
}
}
// NewSubscriptionFiltered will create a new subscription. For each raised event the given matcher is
// called. If it returns true the event is send as notification to the client, otherwise it is ignored.
func
NewSubscriptionFiltered
(
sub
event
.
Subscription
,
match
SubscriptionMatcher
)
Subscription
{
return
Subscription
{
sub
,
match
,
defaultSubscriptionOutputFormatter
}
}
// Chan returns the channel where new events will be published. It's up the user to call the matcher to
// determine if the events are interesting for the client.
func
(
s
*
Subscription
)
Chan
()
<-
chan
*
event
.
Event
{
return
s
.
sub
.
Chan
()
}
// Unsubscribe will end the subscription and closes the event channel
func
(
s
*
Subscription
)
Unsubscribe
()
{
s
.
sub
.
Unsubscribe
()
}
// HexNumber serializes a number to hex format using the "%#x" format
type
HexNumber
big
.
Int
...
...
rpc/utils.go
View file @
ed92f116
...
...
@@ -45,6 +45,16 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
return
isExported
(
t
.
Name
())
||
t
.
PkgPath
()
==
""
}
var
contextType
=
reflect
.
TypeOf
((
*
context
.
Context
)(
nil
))
.
Elem
()
// isContextType returns an indication if the given t is of context.Context or *context.Context type
func
isContextType
(
t
reflect
.
Type
)
bool
{
for
t
.
Kind
()
==
reflect
.
Ptr
{
t
=
t
.
Elem
()
}
return
t
==
contextType
}
var
errorType
=
reflect
.
TypeOf
((
*
error
)(
nil
))
.
Elem
()
// Implements this type the error interface
...
...
@@ -57,6 +67,7 @@ func isErrorType(t reflect.Type) bool {
var
subscriptionType
=
reflect
.
TypeOf
((
*
Subscription
)(
nil
))
.
Elem
()
// isSubscriptionType returns an indication if the given t is of Subscription or *Subscription type
func
isSubscriptionType
(
t
reflect
.
Type
)
bool
{
for
t
.
Kind
()
==
reflect
.
Ptr
{
t
=
t
.
Elem
()
...
...
@@ -64,12 +75,17 @@ func isSubscriptionType(t reflect.Type) bool {
return
t
==
subscriptionType
}
// isPubSub tests whether the given method return the pair (v2.Subscription, error)
// isPubSub tests whether the given method has as as first argument a context.Context
// and returns the pair (Subscription, error)
func
isPubSub
(
methodType
reflect
.
Type
)
bool
{
if
methodType
.
NumOut
()
!=
2
{
// numIn(0) is the receiver type
if
methodType
.
NumIn
()
<
2
||
methodType
.
NumOut
()
!=
2
{
return
false
}
return
isSubscriptionType
(
methodType
.
Out
(
0
))
&&
isErrorType
(
methodType
.
Out
(
1
))
return
isContextType
(
methodType
.
In
(
1
))
&&
isSubscriptionType
(
methodType
.
Out
(
0
))
&&
isErrorType
(
methodType
.
Out
(
1
))
}
// formatName will convert to first character to lower case
...
...
@@ -110,8 +126,6 @@ func isBlockNumber(t reflect.Type) bool {
return
t
==
blockNumberType
}
var
contextType
=
reflect
.
TypeOf
(
new
(
context
.
Context
))
.
Elem
()
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
...
...
@@ -205,7 +219,7 @@ METHODS:
return
callbacks
,
subscriptions
}
func
newSubscriptionI
d
()
(
string
,
error
)
{
func
newSubscriptionI
D
()
(
string
,
error
)
{
var
subid
[
16
]
byte
n
,
_
:=
rand
.
Read
(
subid
[
:
])
if
n
!=
16
{
...
...
rpc/websocket.go
View file @
ed92f116
...
...
@@ -93,7 +93,8 @@ func NewWSServer(cors string, handler *Server) *http.Server {
Handler
:
websocket
.
Server
{
Handshake
:
wsHandshakeValidator
(
strings
.
Split
(
cors
,
","
)),
Handler
:
func
(
conn
*
websocket
.
Conn
)
{
handler
.
ServeCodec
(
NewJSONCodec
(
&
wsReaderWriterCloser
{
conn
}))
handler
.
ServeCodec
(
NewJSONCodec
(
&
wsReaderWriterCloser
{
conn
}),
OptionMethodInvocation
|
OptionSubscriptions
)
},
},
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment