Commit 771655e3 authored by bas-vk's avatar bas-vk Committed by GitHub

Merge pull request #2808 from fjl/rpc-client-3

rpc: add new client, use it everywhere
parents 60cd5bf9 91b76904
......@@ -17,11 +17,7 @@
package backends
import (
"encoding/json"
"fmt"
"math/big"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
......@@ -37,119 +33,34 @@ var _ bind.ContractBackend = (*rpcBackend)(nil)
// rpcBackend implements bind.ContractBackend, and acts as the data provider to
// Ethereum contracts bound to Go structs. It uses an RPC connection to delegate
// all its functionality.
//
// Note: The current implementation is a blocking one. This should be replaced
// by a proper async version when a real RPC client is created.
type rpcBackend struct {
client rpc.Client // RPC client connection to interact with an API server
autoid uint32 // ID number to use for the next API request
lock sync.Mutex // Singleton access until we get to request multiplexing
client *rpc.Client // RPC client connection to interact with an API server
}
// NewRPCBackend creates a new binding backend to an RPC provider that can be
// used to interact with remote contracts.
func NewRPCBackend(client rpc.Client) bind.ContractBackend {
return &rpcBackend{
client: client,
}
}
// request is a JSON RPC request package assembled internally from the client
// method calls.
type request struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Method string `json:"method"` // Remote procedure name to invoke on the server
Params []interface{} `json:"params"` // List of parameters to pass through (keep types simple)
}
// response is a JSON RPC response package sent back from the API server.
type response struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Error *failure `json:"error"` // Any error returned by the remote side
Result json.RawMessage `json:"result"` // Whatever the remote side sends us in reply
}
// failure is a JSON RPC response error field sent back from the API server.
type failure struct {
Code int `json:"code"` // JSON RPC error code associated with the failure
Message string `json:"message"` // Specific error message of the failure
}
// request forwards an API request to the RPC server, and parses the response.
//
// This is currently painfully non-concurrent, but it will have to do until we
// find the time for niceties like this :P
func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) {
b.lock.Lock()
defer b.lock.Unlock()
if ctx == nil {
ctx = context.Background()
}
// Ugly hack to serialize an empty list properly
if params == nil {
params = []interface{}{}
}
// Assemble the request object
reqID := int(atomic.AddUint32(&b.autoid, 1))
req := &request{
JSONRPC: "2.0",
ID: reqID,
Method: method,
Params: params,
}
if err := b.client.Send(req); err != nil {
return nil, err
}
res := new(response)
errc := make(chan error, 1)
go func() {
errc <- b.client.Recv(res)
}()
select {
case err := <-errc:
if err != nil {
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
}
if res.Error != nil {
if res.Error.Message == bind.ErrNoCode.Error() {
return nil, bind.ErrNoCode
}
return nil, fmt.Errorf("remote error: %s", res.Error.Message)
}
return res.Result, nil
func NewRPCBackend(client *rpc.Client) bind.ContractBackend {
return &rpcBackend{client: client}
}
// HasCode implements ContractVerifier.HasCode by retrieving any code associated
// with the contract from the remote node, and checking its size.
func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) {
// Execute the RPC code retrieval
block := "latest"
if pending {
block = "pending"
}
res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block})
if err != nil {
return false, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
err := b.client.CallContext(ctx, &hex, "eth_getCode", contract, block)
if err != nil {
return false, err
}
// Convert the response back to a Go byte slice and return
return len(common.FromHex(hex)) > 0, nil
}
// ContractCall implements ContractCaller.ContractCall, delegating the execution of
// a contract call to the remote node, returning the reply to for local processing.
func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) {
// Pack up the request into an RPC argument
args := struct {
To common.Address `json:"to"`
Data string `json:"data"`
......@@ -157,63 +68,43 @@ func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address,
To: contract,
Data: common.ToHex(data),
}
// Execute the RPC call and retrieve the response
block := "latest"
if pending {
block = "pending"
}
res, err := b.request(ctx, "eth_call", []interface{}{args, block})
if err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
err := b.client.CallContext(ctx, &hex, "eth_call", args, block)
if err != nil {
return nil, err
}
// Convert the response back to a Go byte slice and return
return common.FromHex(hex), nil
}
// PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating
// the current account nonce retrieval to the remote node.
func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) {
res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"})
var hex rpc.HexNumber
err := b.client.CallContext(ctx, &hex, "eth_getTransactionCount", account.Hex(), "pending")
if err != nil {
return 0, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return 0, err
}
nonce, ok := new(big.Int).SetString(hex, 0)
if !ok {
return 0, fmt.Errorf("invalid nonce hex: %s", hex)
}
return nonce.Uint64(), nil
return hex.Uint64(), nil
}
// SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the
// gas price oracle request to the remote node.
func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
res, err := b.request(ctx, "eth_gasPrice", nil)
if err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
var hex rpc.HexNumber
if err := b.client.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
return nil, err
}
price, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid price hex: %s", hex)
}
return price, nil
return (*big.Int)(&hex), nil
}
// EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating
// the gas estimation to the remote node.
func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) {
// Pack up the request into an RPC argument
args := struct {
From common.Address `json:"from"`
To *common.Address `json:"to"`
......@@ -226,19 +117,12 @@ func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address
Value: rpc.NewHexNumber(value),
}
// Execute the RPC call and retrieve the response
res, err := b.request(ctx, "eth_estimateGas", []interface{}{args})
var hex rpc.HexNumber
err := b.client.CallContext(ctx, &hex, "eth_estimateGas", args)
if err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
estimate, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid estimate hex: %s", hex)
}
return estimate, nil
return (*big.Int)(&hex), nil
}
// SendTransaction implements ContractTransactor.SendTransaction, delegating the
......@@ -248,13 +132,5 @@ func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction)
if err != nil {
return err
}
res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)})
if err != nil {
return err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return err
}
return nil
return b.client.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
}
......@@ -19,9 +19,12 @@ package main
import (
"os"
"os/signal"
"strings"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/console"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1"
)
......@@ -99,7 +102,7 @@ func localConsole(ctx *cli.Context) error {
// console to it.
func remoteConsole(ctx *cli.Context) error {
// Attach to a remotely running geth instance and start the JavaScript console
client, err := utils.NewRemoteRPCClient(ctx)
client, err := dialRPC(ctx.Args().First())
if err != nil {
utils.Fatalf("Unable to attach to remote geth: %v", err)
}
......@@ -127,6 +130,20 @@ func remoteConsole(ctx *cli.Context) error {
return nil
}
// dialRPC returns a RPC client which connects to the given endpoint.
// The check for empty endpoint implements the defaulting logic
// for "geth attach" and "geth monitor" with no argument.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint()
} else if strings.HasPrefix(endpoint, "rpc:") || strings.HasPrefix(endpoint, "ipc:") {
// Backwards compatibility with geth < 1.5 which required
// these prefixes.
endpoint = endpoint[4:]
}
return rpc.Dial(endpoint)
}
// ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript
// console to it, and each of the files specified as arguments and tears the
// everything down.
......
......@@ -21,11 +21,10 @@ import (
"math"
"reflect"
"runtime"
"sort"
"strings"
"time"
"sort"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
......@@ -36,7 +35,7 @@ import (
var (
monitorCommandAttachFlag = cli.StringFlag{
Name: "attach",
Value: "ipc:" + node.DefaultIPCEndpoint(),
Value: node.DefaultIPCEndpoint(),
Usage: "API endpoint to attach to",
}
monitorCommandRowsFlag = cli.IntFlag{
......@@ -69,12 +68,12 @@ to display multiple metrics simultaneously.
// monitor starts a terminal UI based monitoring tool for the requested metrics.
func monitor(ctx *cli.Context) error {
var (
client rpc.Client
client *rpc.Client
err error
)
// Attach to an Ethereum node over IPC or RPC
endpoint := ctx.String(monitorCommandAttachFlag.Name)
if client, err = utils.NewRemoteRPCClientFromString(endpoint); err != nil {
if client, err = dialRPC(endpoint); err != nil {
utils.Fatalf("Unable to attach to geth node: %v", err)
}
defer client.Close()
......@@ -159,30 +158,10 @@ func monitor(ctx *cli.Context) error {
// retrieveMetrics contacts the attached geth node and retrieves the entire set
// of collected system metrics.
func retrieveMetrics(client rpc.Client) (map[string]interface{}, error) {
req := map[string]interface{}{
"id": new(int64),
"method": "debug_metrics",
"jsonrpc": "2.0",
"params": []interface{}{true},
}
if err := client.Send(req); err != nil {
return nil, err
}
var res rpc.JSONSuccessResponse
if err := client.Recv(&res); err != nil {
return nil, err
}
if res.Result != nil {
if mets, ok := res.Result.(map[string]interface{}); ok {
return mets, nil
}
}
return nil, fmt.Errorf("unable to retrieve metrics")
func retrieveMetrics(client *rpc.Client) (map[string]interface{}, error) {
var metrics map[string]interface{}
err := client.Call(&metrics, "debug_metrics", true)
return metrics, err
}
// resolveMetrics takes a list of input metric patterns, and resolves each to one
......@@ -270,7 +249,7 @@ func fetchMetric(metrics map[string]interface{}, metric string) float64 {
// refreshCharts retrieves a next batch of metrics, and inserts all the new
// values into the active datasets and charts
func refreshCharts(client rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
func refreshCharts(client *rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
values, err := retrieveMetrics(client)
for i, metric := range metrics {
if len(data) < 512 {
......
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"fmt"
"strings"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1"
)
// NewRemoteRPCClient returns a RPC client which connects to a running geth instance.
// Depending on the given context this can either be a IPC or a HTTP client.
func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) {
if ctx.Args().Present() {
endpoint := ctx.Args().First()
return NewRemoteRPCClientFromString(endpoint)
}
// use IPC by default
return rpc.NewIPCClient(node.DefaultIPCEndpoint())
}
// NewRemoteRPCClientFromString returns a RPC client which connects to the given
// endpoint. It must start with either `ipc:` or `rpc:` (HTTP).
func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) {
if strings.HasPrefix(endpoint, "ipc:") {
return rpc.NewIPCClient(endpoint[4:])
}
if strings.HasPrefix(endpoint, "rpc:") {
return rpc.NewHTTPClient(endpoint[4:])
}
if strings.HasPrefix(endpoint, "http://") {
return rpc.NewHTTPClient(endpoint)
}
if strings.HasPrefix(endpoint, "ws:") {
return rpc.NewWSClient(endpoint)
}
return nil, fmt.Errorf("invalid endpoint")
}
......@@ -31,13 +31,13 @@ import (
// bridge is a collection of JavaScript utility methods to bride the .js runtime
// environment and the Go RPC connection backing the remote method calls.
type bridge struct {
client rpc.Client // RPC client to execute Ethereum requests through
client *rpc.Client // RPC client to execute Ethereum requests through
prompter UserPrompter // Input prompter to allow interactive user feedback
printer io.Writer // Output writer to serialize any display strings to
}
// newBridge creates a new JavaScript wrapper around an RPC client.
func newBridge(client rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
func newBridge(client *rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
return &bridge{
client: client,
prompter: prompter,
......@@ -188,88 +188,86 @@ func (b *bridge) SleepBlocks(call otto.FunctionCall) (response otto.Value) {
return otto.FalseValue()
}
// Send will serialize the first argument, send it to the node and returns the response.
type jsonrpcCall struct {
Id int64
Method string
Params []interface{}
}
// Send implements the web3 provider "send" method.
func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) {
// Ensure that we've got a batch request (array) or a single request (object)
arg := call.Argument(0).Object()
if arg == nil || (arg.Class() != "Array" && arg.Class() != "Object") {
throwJSException("request must be an object or array")
}
// Convert the otto VM arguments to Go values
data, err := call.Otto.Call("JSON.stringify", nil, arg)
// Remarshal the request into a Go value.
JSON, _ := call.Otto.Object("JSON")
reqVal, err := JSON.Call("stringify", call.Argument(0))
if err != nil {
throwJSException(err.Error())
}
reqjson, err := data.ToString()
if err != nil {
throwJSException(err.Error())
}
var (
reqs []rpc.JSONRequest
batch = true
rawReq = []byte(reqVal.String())
reqs []jsonrpcCall
batch bool
)
if err = json.Unmarshal([]byte(reqjson), &reqs); err != nil {
// single request?
reqs = make([]rpc.JSONRequest, 1)
if err = json.Unmarshal([]byte(reqjson), &reqs[0]); err != nil {
throwJSException("invalid request")
}
if rawReq[0] == '[' {
batch = true
json.Unmarshal(rawReq, &reqs)
} else {
batch = false
reqs = make([]jsonrpcCall, 1)
json.Unmarshal(rawReq, &reqs[0])
}
// Iteratively execute the requests
call.Otto.Set("response_len", len(reqs))
call.Otto.Run("var ret_response = new Array(response_len);")
for i, req := range reqs {
// Execute the RPC request and parse the reply
if err = b.client.Send(&req); err != nil {
return newErrorResponse(call, -32603, err.Error(), req.Id)
}
result := make(map[string]interface{})
if err = b.client.Recv(&result); err != nil {
return newErrorResponse(call, -32603, err.Error(), req.Id)
}
// Feed the reply back into the JavaScript runtime environment
id, _ := result["id"]
jsonver, _ := result["jsonrpc"]
// Execute the requests.
resps, _ := call.Otto.Object("new Array()")
for _, req := range reqs {
resp, _ := call.Otto.Object(`({"jsonrpc":"2.0"})`)
resp.Set("id", req.Id)
var result json.RawMessage
err = b.client.Call(&result, req.Method, req.Params...)
switch err := err.(type) {
case nil:
if result == nil {
// Special case null because it is decoded as an empty
// raw message for some reason.
resp.Set("result", otto.NullValue())
} else {
resultVal, err := JSON.Call("parse", string(result))
if err != nil {
resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
} else {
resp.Set("result", resultVal)
}
}
case rpc.Error:
resp.Set("error", map[string]interface{}{
"code": err.ErrorCode(),
"message": err.Error(),
})
default:
resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
}
resps.Call("push", resp)
}
call.Otto.Set("ret_id", id)
call.Otto.Set("ret_jsonrpc", jsonver)
call.Otto.Set("response_idx", i)
// Return the responses either to the callback (if supplied)
// or directly as the return value.
if batch {
response = resps.Value()
} else {
response, _ = resps.Get("0")
}
if fn := call.Argument(1).Object(); fn != nil && fn.Class() == "function" {
fn.Call("apply", response)
return otto.UndefinedValue()
}
return response
}
if res, ok := result["result"]; ok {
payload, _ := json.Marshal(res)
call.Otto.Set("ret_result", string(payload))
response, err = call.Otto.Run(`
ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) };
`)
continue
}
if res, ok := result["error"]; ok {
payload, _ := json.Marshal(res)
call.Otto.Set("ret_result", string(payload))
response, err = call.Otto.Run(`
ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, error: JSON.parse(ret_result) };
`)
continue
}
return newErrorResponse(call, -32603, fmt.Sprintf("Invalid response"), new(int64))
}
// Convert single requests back from batch ones
if !batch {
call.Otto.Run("ret_response = ret_response[0];")
}
// Execute any registered callbacks
if call.Argument(1).IsObject() {
call.Otto.Set("callback", call.Argument(1))
call.Otto.Run(`
if (Object.prototype.toString.call(callback) == '[object Function]') {
callback(null, ret_response);
}
`)
}
return
func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) otto.Value {
// Bundle the error into a JSON RPC call response
m := map[string]interface{}{"version": "2.0", "id": id, "error": map[string]interface{}{"code": code, msg: msg}}
res, _ := json.Marshal(m)
val, _ := call.Otto.Run("(" + string(res) + ")")
return val
}
// throwJSException panics on an otto.Value. The Otto VM will recover from the
......@@ -281,37 +279,3 @@ func throwJSException(msg interface{}) otto.Value {
}
panic(val)
}
// newErrorResponse creates a JSON RPC error response for a specific request id,
// containing the specified error code and error message. Beside returning the
// error to the caller, it also sets the ret_error and ret_response JavaScript
// variables.
func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) {
// Bundle the error into a JSON RPC call response
res := rpc.JSONErrResponse{
Version: rpc.JSONRPCVersion,
Id: id,
Error: rpc.JSONError{
Code: code,
Message: msg,
},
}
// Serialize the error response into JavaScript variables
errObj, err := json.Marshal(res.Error)
if err != nil {
glog.V(logger.Error).Infof("Failed to serialize JSON RPC error: %v", err)
}
resObj, err := json.Marshal(res)
if err != nil {
glog.V(logger.Error).Infof("Failed to serialize JSON RPC error response: %v", err)
}
if _, err = call.Otto.Run("ret_error = " + string(errObj)); err != nil {
glog.V(logger.Error).Infof("Failed to set `ret_error` to the occurred error: %v", err)
}
resVal, err := call.Otto.Run("ret_response = " + string(resObj))
if err != nil {
glog.V(logger.Error).Infof("Failed to set `ret_response` to the JSON RPC response: %v", err)
}
return resVal
}
......@@ -52,7 +52,7 @@ const DefaultPrompt = "> "
type Config struct {
DataDir string // Data directory to store the console history at
DocRoot string // Filesystem path from where to load JavaScript files from
Client rpc.Client // RPC client to execute Ethereum requests through
Client *rpc.Client // RPC client to execute Ethereum requests through
Prompt string // Input prompt prefix string (defaults to DefaultPrompt)
Prompter UserPrompter // Input prompter to allow interactive user feedback (defaults to TerminalPrompter)
Printer io.Writer // Output writer to serialize any display strings to (defaults to os.Stdout)
......@@ -63,7 +63,7 @@ type Config struct {
// JavaScript console attached to a running node via an external or in-process RPC
// client.
type Console struct {
client rpc.Client // RPC client to execute Ethereum requests through
client *rpc.Client // RPC client to execute Ethereum requests through
jsre *jsre.JSRE // JavaScript runtime environment running the interpreter
prompt string // Input prompt prefix string
prompter UserPrompter // Input prompter to allow interactive user feedback
......
......@@ -116,7 +116,7 @@ func (ctx ppctx) printValue(v otto.Value, level int, inArray bool) {
func (ctx ppctx) printObject(obj *otto.Object, level int, inArray bool) {
switch obj.Class() {
case "Array":
case "Array", "GoArray":
lv, _ := obj.Get("length")
len, _ := lv.ToInteger()
if len == 0 {
......
......@@ -505,16 +505,14 @@ func (n *Node) Restart() error {
}
// Attach creates an RPC client attached to an in-process API handler.
func (n *Node) Attach() (rpc.Client, error) {
func (n *Node) Attach() (*rpc.Client, error) {
n.lock.RLock()
defer n.lock.RUnlock()
// Short circuit if the node's not running
if n.server == nil {
return nil, ErrNodeStopped
}
// Otherwise attach to the API and return
return rpc.NewInProcRPCClient(n.inprocHandler), nil
return rpc.DialInProc(n.inprocHandler), nil
}
// Server retrieves the currently running P2P network layer. This method is meant
......
......@@ -507,21 +507,27 @@ func TestAPIGather(t *testing.T) {
}
// Register a batch of services with some configured APIs
calls := make(chan string, 1)
makeAPI := func(result string) *OneMethodApi {
return &OneMethodApi{fun: func() { calls <- result }}
}
services := map[string]struct {
APIs []rpc.API
Maker InstrumentingWrapper
}{
"Zero APIs": {[]rpc.API{}, InstrumentedServiceMakerA},
"Single API": {[]rpc.API{
{"single", "1", &OneMethodApi{fun: func() { calls <- "single.v1" }}, true},
"Zero APIs": {
[]rpc.API{}, InstrumentedServiceMakerA},
"Single API": {
[]rpc.API{
{Namespace: "single", Version: "1", Service: makeAPI("single.v1"), Public: true},
}, InstrumentedServiceMakerB},
"Many APIs": {[]rpc.API{
{"multi", "1", &OneMethodApi{fun: func() { calls <- "multi.v1" }}, true},
{"multi.v2", "2", &OneMethodApi{fun: func() { calls <- "multi.v2" }}, true},
{"multi.v2.nested", "2", &OneMethodApi{fun: func() { calls <- "multi.v2.nested" }}, true},
"Many APIs": {
[]rpc.API{
{Namespace: "multi", Version: "1", Service: makeAPI("multi.v1"), Public: true},
{Namespace: "multi.v2", Version: "2", Service: makeAPI("multi.v2"), Public: true},
{Namespace: "multi.v2.nested", Version: "2", Service: makeAPI("multi.v2.nested"), Public: true},
}, InstrumentedServiceMakerC},
}
for id, config := range services {
config := config
constructor := func(*ServiceContext) (Service, error) {
......@@ -554,12 +560,8 @@ func TestAPIGather(t *testing.T) {
{"multi.v2.nested_theOneMethod", "multi.v2.nested"},
}
for i, test := range tests {
if err := client.Send(rpc.JSONRequest{Id: []byte("1"), Version: "2.0", Method: test.Method}); err != nil {
t.Fatalf("test %d: failed to send API request: %v", i, err)
}
reply := new(rpc.JSONSuccessResponse)
if err := client.Recv(reply); err != nil {
t.Fatalf("test %d: failed to read API reply: %v", i, err)
if err := client.Call(nil, test.Method); err != nil {
t.Errorf("test %d: API request failed: %v", i, err)
}
select {
case result := <-calls:
......
This diff is collapsed.
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build !go1.5
package rpc
import (
"net"
"net/http"
"time"
"golang.org/x/net/context"
)
// In older versions of Go (below 1.5), dials cannot be canceled
// via a channel or context. The context deadline can still applied.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return contextDialer(ctx).Dial(network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
// Set Timeout on the client if the context has a deadline.
// Note that there is no default timeout (unlike in contextDialer) because
// the timeout applies to the entire request, including reads from body.
if deadline, ok := ctx.Deadline(); ok {
c2 := *c
c2.Timeout = deadline.Sub(time.Now())
c = &c2
}
req2 := *req
return c, &req2
}
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build go1.5,!go1.6
package rpc
import (
"net"
"net/http"
"time"
"golang.org/x/net/context"
)
// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can
// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return contextDialer(ctx).Dial(network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
// Set Timeout on the client if the context has a deadline.
// Note that there is no default timeout (unlike in contextDialer) because
// the timeout applies to the entire request, including reads from body.
if deadline, ok := ctx.Deadline(); ok {
c2 := *c
c2.Timeout = deadline.Sub(time.Now())
c = &c2
}
req2 := *req
req2.Cancel = ctx.Done()
return c, &req2
}
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build go1.6,!go1.7
package rpc
import (
"net"
"net/http"
"time"
"golang.org/x/net/context"
)
// In Go 1.6, net.Dialer gained the ability to cancel via a channel.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return contextDialer(ctx).Dial(network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
// We set Timeout on the client for Go <= 1.5. There
// is no need to do that here because the dial will be canceled
// by package http.
req2 := *req
req2.Cancel = ctx.Done()
return c, &req2
}
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build go1.7
package rpc
import (
"context"
"net"
"net/http"
"time"
)
// In Go 1.7, context moved into the standard library and support
// for cancelation via context was added to net.Dialer and http.Request.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
d := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
return d.DialContext(ctx, network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
return c, req.WithContext(ctx)
}
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc_test
import (
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/rpc"
)
// In this example, our client whishes to track the latest 'block number'
// known to the server. The server supports two methods:
//
// eth_getBlockByNumber("latest", {})
// returns the latest block object.
//
// eth_subscribe("newBlocks")
// creates a subscription which fires block objects when new blocks arrive.
type Block struct {
Number *big.Int
}
func ExampleClientSubscription() {
// Connect the client.
client, _ := rpc.Dial("ws://127.0.0.1:8485")
subch := make(chan Block)
go subscribeBlocks(client, subch)
// Print events from the subscription as they arrive.
for block := range subch {
fmt.Println("latest block:", block.Number)
}
}
// subscribeBlocks runs in its own goroutine and maintains
// a subscription for new blocks.
func subscribeBlocks(client *rpc.Client, subch chan Block) {
for i := 0; ; i++ {
if i > 0 {
time.Sleep(2 * time.Second)
}
// Subscribe to new blocks.
sub, err := client.EthSubscribe(subch, "newBlocks")
if err == rpc.ErrClientQuit {
return // Stop reconnecting if the client was closed.
} else if err != nil {
fmt.Println("subscribe error:", err)
continue
}
// The connection is established now.
// Update the channel with the current block.
var lastBlock Block
if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
fmt.Println("can't get latest block:", err)
continue
}
subch <- lastBlock
// The subscription will deliver events to the channel. Wait for the
// subscription to end for any reason, then loop around to re-establish
// the connection.
fmt.Println("connection lost: ", <-sub.Err())
}
}
This diff is collapsed.
......@@ -24,74 +24,43 @@ type methodNotFoundError struct {
method string
}
func (e *methodNotFoundError) Code() int {
return -32601
}
func (e *methodNotFoundError) ErrorCode() int { return -32601 }
func (e *methodNotFoundError) Error() string {
return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method)
}
// received message isn't a valid request
type invalidRequestError struct {
message string
}
type invalidRequestError struct{ message string }
func (e *invalidRequestError) Code() int {
return -32600
}
func (e *invalidRequestError) ErrorCode() int { return -32600 }
func (e *invalidRequestError) Error() string {
return e.message
}
func (e *invalidRequestError) Error() string { return e.message }
// received message is invalid
type invalidMessageError struct {
message string
}
type invalidMessageError struct{ message string }
func (e *invalidMessageError) Code() int {
return -32700
}
func (e *invalidMessageError) ErrorCode() int { return -32700 }
func (e *invalidMessageError) Error() string {
return e.message
}
func (e *invalidMessageError) Error() string { return e.message }
// unable to decode supplied params, or an invalid number of parameters
type invalidParamsError struct {
message string
}
type invalidParamsError struct{ message string }
func (e *invalidParamsError) Code() int {
return -32602
}
func (e *invalidParamsError) ErrorCode() int { return -32602 }
func (e *invalidParamsError) Error() string {
return e.message
}
func (e *invalidParamsError) Error() string { return e.message }
// logic error, callback returned an error
type callbackError struct {
message string
}
type callbackError struct{ message string }
func (e *callbackError) Code() int {
return -32000
}
func (e *callbackError) ErrorCode() int { return -32000 }
func (e *callbackError) Error() string {
return e.message
}
func (e *callbackError) Error() string { return e.message }
// issued when a request is received after the server is issued to stop.
type shutdownError struct {
}
type shutdownError struct{}
func (e *shutdownError) Code() int {
return -32000
}
func (e *shutdownError) ErrorCode() int { return -32000 }
func (e *shutdownError) Error() string {
return "server is shutting down"
}
func (e *shutdownError) Error() string { return "server is shutting down" }
......@@ -22,71 +22,108 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/rs/cors"
"golang.org/x/net/context"
)
const (
maxHTTPRequestContentLength = 1024 * 128
)
// httpClient connects to a geth RPC server over HTTP.
type httpClient struct {
endpoint *url.URL // HTTP-RPC server endpoint
httpClient http.Client // reuse connection
lastRes []byte // HTTP requests are synchronous, store last response
var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
type httpConn struct {
client *http.Client
req *http.Request
closeOnce sync.Once
closed chan struct{}
}
// httpConn is treated specially by Client.
func (hc *httpConn) LocalAddr() net.Addr { return nullAddr }
func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr }
func (hc *httpConn) SetReadDeadline(time.Time) error { return nil }
func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil }
func (hc *httpConn) SetDeadline(time.Time) error { return nil }
func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") }
func (hc *httpConn) Read(b []byte) (int, error) {
<-hc.closed
return 0, io.EOF
}
func (hc *httpConn) Close() error {
hc.closeOnce.Do(func() { close(hc.closed) })
return nil
}
// NewHTTPClient create a new RPC clients that connection to a geth RPC server
// over HTTP.
func NewHTTPClient(endpoint string) (Client, error) {
url, err := url.Parse(endpoint)
// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP.
func DialHTTP(endpoint string) (*Client, error) {
req, err := http.NewRequest("POST", endpoint, nil)
if err != nil {
return nil, err
}
return &httpClient{endpoint: url}, nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
// Send will serialize the given msg to JSON and sends it to the RPC server.
// Since HTTP is synchronous the response is stored until Recv is called.
func (client *httpClient) Send(msg interface{}) error {
var body []byte
var err error
initctx := context.Background()
return newClient(initctx, func(context.Context) (net.Conn, error) {
return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil
})
}
client.lastRes = nil
if body, err = json.Marshal(msg); err != nil {
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msg)
if err != nil {
return err
}
defer respBody.Close()
var respmsg jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
return err
}
op.resp <- &respmsg
return nil
}
resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msgs)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
client.lastRes, err = ioutil.ReadAll(resp.Body)
defer respBody.Close()
var respmsgs []jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
return fmt.Errorf("request failed: %s", resp.Status)
}
// Recv will try to deserialize the last received response into the given msg.
func (client *httpClient) Recv(msg interface{}) error {
return json.Unmarshal(client.lastRes, &msg)
for _, respmsg := range respmsgs {
op.resp <- &respmsg
}
return nil
}
// Close is not necessary for httpClient
func (client *httpClient) Close() {
}
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
body, err := json.Marshal(msg)
if err != nil {
return nil, err
}
client, req := requestWithContext(hc.client, hc.req, ctx)
req.Body = ioutil.NopCloser(bytes.NewReader(body))
req.ContentLength = int64(len(body))
// SupportedModules will return the collection of offered RPC modules.
func (client *httpClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
return resp.Body, nil
}
// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
......@@ -100,17 +137,21 @@ func (t *httpReadWriteNopCloser) Close() error {
return nil
}
// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
// send the request to the given API provider and sends the response back to the caller.
func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// NewHTTPServer creates a new HTTP RPC server around an API provider.
//
// Deprecated: Server implements http.Handler
func NewHTTPServer(corsString string, srv *Server) *http.Server {
return &http.Server{Handler: newCorsHandler(srv, corsString)}
}
// ServeHTTP serves JSON-RPC requests over HTTP.
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.ContentLength > maxHTTPRequestContentLength {
http.Error(w,
fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
http.StatusRequestEntityTooLarge)
return
}
w.Header().Set("content-type", "application/json")
// create a codec that reads direct from the request body until
......@@ -119,24 +160,16 @@ func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
defer codec.Close()
srv.ServeSingleRequest(codec, OptionMethodInvocation)
}
}
// NewHTTPServer creates a new HTTP RPC server around an API provider.
func NewHTTPServer(corsString string, srv *Server) *http.Server {
func newCorsHandler(srv *Server, corsString string) http.Handler {
var allowedOrigins []string
for _, domain := range strings.Split(corsString, ",") {
allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
}
c := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowedMethods: []string{"POST", "GET"},
})
handler := c.Handler(newJSONHTTPHandler(srv))
return &http.Server{
Handler: handler,
}
return c.Handler(srv)
}
......@@ -17,45 +17,18 @@
package rpc
import (
"encoding/json"
"io"
"net"
)
// inProcClient is an in-process buffer stream attached to an RPC server.
type inProcClient struct {
server *Server
cl io.Closer
enc *json.Encoder
dec *json.Decoder
}
// Close tears down the request channel of the in-proc client.
func (c *inProcClient) Close() {
c.cl.Close()
}
"golang.org/x/net/context"
)
// NewInProcRPCClient creates an in-process buffer stream attachment to a given
// RPC server.
func NewInProcRPCClient(handler *Server) Client {
// NewInProcClient attaches an in-process connection to the given RPC server.
func DialInProc(handler *Server) *Client {
initctx := context.Background()
c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
p1, p2 := net.Pipe()
go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
}
// Send marshals a message into a json format and injects in into the client
// request channel.
func (c *inProcClient) Send(msg interface{}) error {
return c.enc.Encode(msg)
}
// Recv reads a message from the response channel and tries to parse it into the
// given msg interface.
func (c *inProcClient) Recv(msg interface{}) error {
return c.dec.Decode(msg)
}
// Returns the collection of modules the RPC server offers.
func (c *inProcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(c)
return p2, nil
})
return c
}
......@@ -17,68 +17,39 @@
package rpc
import (
"encoding/json"
"net"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
)
// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe
// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
// Windows this is a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint)
}
// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using
// JSON serialization.
type ipcClient struct {
endpoint string
conn net.Conn
out *json.Encoder
in *json.Decoder
}
// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded.
// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a
// named pipe.
func NewIPCClient(endpoint string) (Client, error) {
conn, err := newIPCConnection(endpoint)
if err != nil {
return nil, err
}
return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil
}
// Send will serialize the given message and send it to the server.
// When sending the message fails it will try to reconnect once and send the message again.
func (client *ipcClient) Send(msg interface{}) error {
if err := client.out.Encode(msg); err == nil {
return nil
}
// retry once
client.conn.Close()
conn, err := newIPCConnection(client.endpoint)
// ServeListener accepts connections on l, serving JSON-RPC on them.
func (srv *Server) ServeListener(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
return err
}
client.conn = conn
client.in = json.NewDecoder(conn)
client.out = json.NewEncoder(conn)
return client.out.Encode(msg)
}
// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded.
func (client *ipcClient) Recv(msg interface{}) error {
return client.in.Decode(&msg)
}
// Close will close the underlying IPC connection
func (client *ipcClient) Close() {
client.conn.Close()
glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr())
go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
}
}
// SupportedModules will return the collection of offered RPC modules.
func (client *ipcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
// the endpoint is the full path to a unix socket, and Windows the endpoint is an
// identifier for a named pipe.
//
// The context is used for the initial connection establishment. It does not
// affect subsequent interactions with the client.
func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
return newIPCConnection(ctx, endpoint)
})
}
......@@ -22,6 +22,8 @@ import (
"net"
"os"
"path/filepath"
"golang.org/x/net/context"
)
// ipcListen will create a Unix socket on the given endpoint.
......@@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) {
}
// newIPCConnection will connect to a Unix socket on the given endpoint.
func newIPCConnection(endpoint string) (net.Conn, error) {
return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"})
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
return dialContext(ctx, "unix", endpoint)
}
......@@ -22,16 +22,27 @@ import (
"net"
"time"
"golang.org/x/net/context"
"gopkg.in/natefinch/npipe.v2"
)
// This is used if the dialing context has no deadline. It is much smaller than the
// defaultDialTimeout because named pipes are local and there is no need to wait so long.
const defaultPipeDialTimeout = 2 * time.Second
// ipcListen will create a named pipe on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) {
return npipe.Listen(endpoint)
}
// newIPCConnection will connect to a named pipe with the given endpoint as name.
func newIPCConnection(endpoint string) (net.Conn, error) {
timeout := 5 * time.Second
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
timeout := defaultPipeDialTimeout
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout < 0 {
timeout = 0
}
}
return npipe.DialTimeout(endpoint, timeout)
}
......@@ -30,49 +30,43 @@ import (
)
const (
JSONRPCVersion = "2.0"
jsonrpcVersion = "2.0"
serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe"
unsubscribeMethod = "eth_unsubscribe"
notificationMethod = "eth_subscription"
)
// JSON-RPC request
type JSONRequest struct {
type jsonRequest struct {
Method string `json:"method"`
Version string `json:"jsonrpc"`
Id json.RawMessage `json:"id,omitempty"`
Payload json.RawMessage `json:"params,omitempty"`
}
// JSON-RPC response
type JSONSuccessResponse struct {
type jsonSuccessResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Result interface{} `json:"result"`
}
// JSON-RPC error object
type JSONError struct {
type jsonError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// JSON-RPC error response
type JSONErrResponse struct {
type jsonErrResponse struct {
Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"`
Error JSONError `json:"error"`
Error jsonError `json:"error"`
}
// JSON-RPC notification payload
type jsonSubscription struct {
Subscription string `json:"subscription"`
Result interface{} `json:"result,omitempty"`
}
// JSON-RPC notification
type jsonNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
......@@ -91,6 +85,17 @@ type jsonCodec struct {
rw io.ReadWriteCloser // connection
}
func (err *jsonError) Error() string {
if err.Message == "" {
return fmt.Sprintf("json-rpc error %d", err.Code)
}
return err.Message
}
func (err *jsonError) ErrorCode() int {
return err.Code
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc)
......@@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool {
// ReadRequestHeaders will read new requests without parsing the arguments. It will
// return a collection of requests, an indication if these requests are in batch
// form or an error when the incoming message could not be read/parsed.
func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
c.decMu.Lock()
defer c.decMu.Unlock()
......@@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error {
// parseRequest will parse a single request from the given RawMessage. It will return
// the parsed request, an indication if the request was a batch or an error when
// the request could not be parsed.
func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
var in JSONRequest
func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
var in jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
......@@ -182,12 +187,12 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
method: unsubscribeMethod, params: in.Payload}}, false, nil
}
// regular RPC call
elems := strings.Split(in.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, false, &methodNotFoundError{in.Method, ""}
}
// regular RPC call
if len(in.Payload) == 0 {
return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: &in.Id}}, false, nil
}
......@@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
// if the request was a batch or an error when the request could not be read.
func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
var in []JSONRequest
func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
var in []jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
......@@ -236,15 +241,15 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
continue
}
elems := strings.Split(r.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, true, &methodNotFoundError{r.Method, ""}
}
if len(r.Payload) == 0 {
requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: nil}
requests[i] = rpcRequest{id: id, params: nil}
} else {
requests[i] = rpcRequest{id: id, params: r.Payload}
}
if elem := strings.Split(r.Method, serviceMethodSeparator); len(elem) == 2 {
requests[i].service, requests[i].method = elem[0], elem[1]
} else {
requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: r.Payload}
requests[i].err = &methodNotFoundError{r.Method, ""}
}
}
......@@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
// ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed
// values or an error when the parsing failed.
func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) {
func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) {
if args, ok := params.(json.RawMessage); !ok {
return nil, &invalidParamsError{"Invalid params supplied"}
} else {
......@@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf
// parsePositionalArguments tries to parse the given args to an array of values with the given types.
// It returns the parsed values or an error when the args could not be parsed. Missing optional arguments
// are returned as reflect.Zero values.
func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) {
func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) {
params := make([]interface{}, 0, len(callbackArgs))
for _, t := range callbackArgs {
params = append(params, reflect.New(t).Interface())
......@@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type)
// CreateResponse will create a JSON-RPC success response with the given id and reply as result.
func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} {
if isHexNum(reflect.TypeOf(reply)) {
return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
}
return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply}
return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply}
}
// CreateErrorResponse will create a JSON-RPC error response with the given id and error.
func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} {
return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}}
func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} {
return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}}
}
// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
// info is optional and contains additional information about the error. When an empty string is passed it is ignored.
func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} {
return &JSONErrResponse{Version: JSONRPCVersion, Id: id,
Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}}
func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} {
return &jsonErrResponse{Version: jsonrpcVersion, Id: id,
Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}}
}
// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) {
return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
}
return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod,
return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: event}}
}
......
......@@ -28,7 +28,7 @@ import (
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported = errors.New("notifications not supported")
ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound = errors.New("notification not found")
......
......@@ -19,20 +19,31 @@ package rpc
import (
"encoding/json"
"net"
"sync"
"testing"
"time"
"golang.org/x/net/context"
)
type NotificationTestService struct{}
type NotificationTestService struct {
mu sync.Mutex
unsubscribed bool
var (
unsubCallbackCalled = false
)
gotHangSubscriptionReq chan struct{}
unblockHangSubscription chan struct{}
}
func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.unsubscribed
}
func (s *NotificationTestService) Unsubscribe(subid string) {
unsubCallbackCalled = true
s.mu.Lock()
s.unsubscribed = true
s.mu.Unlock()
}
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
......@@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
return subscription, nil
}
// HangSubscription blocks on s.unblockHangSubscription before
// sending anything.
func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
notifier, supported := NotifierFromContext(ctx)
if !supported {
return nil, ErrNotificationsUnsupported
}
s.gotHangSubscriptionReq <- struct{}{}
<-s.unblockHangSubscription
subscription, err := notifier.NewSubscription(s.Unsubscribe)
if err != nil {
return nil, err
}
go func() {
subscription.Notify(val)
}()
return subscription, nil
}
func TestNotifications(t *testing.T) {
server := NewServer()
service := &NotificationTestService{}
......@@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) {
}
var subid string
response := JSONSuccessResponse{Result: subid}
response := jsonSuccessResponse{Result: subid}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
......@@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) {
clientConn.Close() // causes notification unsubscribe callback to be called
time.Sleep(1 * time.Second)
if !unsubCallbackCalled {
if !service.wasUnsubCallbackCalled() {
t.Error("unsubscribe callback not called after closing connection")
}
}
......@@ -21,7 +21,6 @@ import (
"reflect"
"runtime"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
......@@ -30,8 +29,6 @@ import (
)
const (
stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
notificationBufferSize = 10000 // max buffered notifications before codec is closed
MetadataApi = "rpc"
......@@ -183,7 +180,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
glog.V(logger.Debug).Infof("read error %v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
return nil
}
......@@ -240,14 +237,12 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied")
time.AfterFunc(stopPendingRequestTimeout, func() {
s.codecsMu.Lock()
defer s.codecsMu.Unlock()
s.codecs.Each(func(c interface{}) bool {
c.(ServerCodec).Close()
return true
})
})
}
}
......@@ -386,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
// readRequest requests the next (batch) request from the codec. It will return the collection
// of requests, an indication if the request was a batch, the invalid request identifier and an
// error when the request could not be read/parsed.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
reqs, batch, err := codec.ReadRequestHeaders()
if err != nil {
return nil, batch, err
......@@ -399,6 +394,11 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
var ok bool
var svc *service
if r.err != nil {
requests[i] = &serverRequest{id: r.id, err: r.err}
continue
}
if r.isPubSub && r.method == unsubscribeMethod {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
......
......@@ -21,6 +21,7 @@ import (
"net"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
)
......@@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args
return Result{str, i, args}
}
func (s *Service) Sleep(ctx context.Context, duration time.Duration) {
select {
case <-time.After(duration):
case <-ctx.Done():
}
}
func (s *Service) Rets() (string, error) {
return "", nil
}
......@@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) {
t.Fatalf("Expected service calc to be registered")
}
if len(svc.callbacks) != 4 {
t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks))
if len(svc.callbacks) != 5 {
t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks))
}
if len(svc.subscriptions) != 1 {
......@@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) {
t.Fatal(err)
}
response := JSONSuccessResponse{Result: &Result{}}
response := jsonSuccessResponse{Result: &Result{}}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
......
......@@ -62,7 +62,7 @@ type serverRequest struct {
callb *callback
args []reflect.Value
isUnsubscribe bool
err RPCError
err Error
}
type serviceRegistry map[string]*service // collection of services
......@@ -88,14 +88,13 @@ type rpcRequest struct {
id interface{}
isPubSub bool
params interface{}
err Error // invalid batch element
}
// RPCError implements RPC error, is add support for error codec over regular go errors
type RPCError interface {
// RPC error code
Code() int
// Error message
Error() string
// Error wraps RPC errors, which contain an error code in addition to the message.
type Error interface {
Error() string // returns the message
ErrorCode() int // returns the code
}
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
......@@ -103,15 +102,15 @@ type RPCError interface {
// multiple go-routines concurrently.
type ServerCodec interface {
// Read next request
ReadRequestHeaders() ([]rpcRequest, bool, RPCError)
ReadRequestHeaders() ([]rpcRequest, bool, Error)
// Parse request argument to the given types
ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError)
ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error)
// Assemble success response, expects response id and payload
CreateResponse(interface{}, interface{}) interface{}
// Assemble error response, expects response id and error
CreateErrorResponse(interface{}, RPCError) interface{}
CreateErrorResponse(interface{}, Error) interface{}
// Assemble error response with extra information about the error through info
CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{}
CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
// Create notification response
CreateNotification(string, interface{}) interface{}
// Write msg to client.
......@@ -273,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
func (bn *BlockNumber) Int64() int64 {
return (int64)(*bn)
}
// Client defines the interface for go client that wants to connect to a geth RPC endpoint
type Client interface {
// SupportedModules returns the collection of API's the server offers
SupportedModules() (map[string]string, error)
Send(req interface{}) error
Recv(msg interface{}) error
Close()
}
......@@ -20,7 +20,6 @@ import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"math/big"
"reflect"
"unicode"
......@@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) {
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
// SupportedModules returns the collection of API's that the RPC server offers
// on which the given client connects.
func SupportedModules(client Client) (map[string]string, error) {
req := JSONRequest{
Id: []byte("1"),
Version: "2.0",
Method: MetadataApi + "_modules",
}
if err := client.Send(req); err != nil {
return nil, err
}
var response JSONSuccessResponse
if err := client.Recv(&response); err != nil {
return nil, err
}
if response.Result != nil {
mods := make(map[string]string)
if modules, ok := response.Result.(map[string]interface{}); ok {
for m, v := range modules {
mods[m] = fmt.Sprintf("%s", v)
}
return mods, nil
}
}
return nil, fmt.Errorf("unable to retrieve modules")
}
......@@ -17,36 +17,39 @@
package rpc
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
"gopkg.in/fatih/set.v0"
)
// wsReaderWriterCloser reads and write payloads from and to a websocket connection.
type wsReaderWriterCloser struct {
c *websocket.Conn
}
// Read will read incoming payload data into p.
func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) {
return rw.c.Read(p)
}
// Write writes p to the websocket.
func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) {
return rw.c.Write(p)
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
//
// allowedOrigins should be a comma-separated list of allowed origin URLs.
// To allow connections with any origin, pass "*".
func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler {
return websocket.Server{
Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
Handler: func(conn *websocket.Conn) {
srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
},
}
}
// Close closes the websocket connection.
func (rw *wsReaderWriterCloser) Close() error {
return rw.c.Close()
// NewWSServer creates a new websocket RPC server around an API provider.
//
// Deprecated: use Server.WebsocketHandler
func NewWSServer(allowedOrigins string, srv *Server) *http.Server {
return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
}
// wsHandshakeValidator returns a handler that verifies the origin during the
......@@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
return f
}
// NewWSServer creates a new websocket RPC server around an API provider.
func NewWSServer(allowedOrigins string, handler *Server) *http.Server {
return &http.Server{
Handler: websocket.Server{
Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
Handler: func(conn *websocket.Conn) {
handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}),
OptionMethodInvocation|OptionSubscriptions)
},
},
// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
// that is listening on the given endpoint.
//
// The context is used for the initial connection establishment. It does not
// affect subsequent interactions with the client.
func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
if origin == "" {
var err error
if origin, err = os.Hostname(); err != nil {
return nil, err
}
}
// wsClient represents a RPC client that communicates over websockets with a
// RPC server.
type wsClient struct {
endpoint string
connMu sync.Mutex
conn *websocket.Conn
}
// NewWSClientj creates a new RPC client that communicates with a RPC server
// that is listening on the given endpoint using JSON encoding.
func NewWSClient(endpoint string) (Client, error) {
return &wsClient{endpoint: endpoint}, nil
}
// connection will return a websocket connection to the RPC server. It will
// (re)connect when necessary.
func (client *wsClient) connection() (*websocket.Conn, error) {
if client.conn != nil {
return client.conn, nil
if strings.HasPrefix(endpoint, "wss") {
origin = "https://" + strings.ToLower(origin)
} else {
origin = "http://" + strings.ToLower(origin)
}
origin, err := os.Hostname()
}
config, err := websocket.NewConfig(endpoint, origin)
if err != nil {
return nil, err
}
origin = "http://" + origin
client.conn, err = websocket.Dial(client.endpoint, "", origin)
return client.conn, err
}
// SupportedModules is the collection of modules the RPC server offers.
func (client *wsClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
return wsDialContext(ctx, config)
})
}
// Send writes the JSON serialized msg to the websocket. It will create a new
// websocket connection to the server if the client is currently not connected.
func (client *wsClient) Send(msg interface{}) (err error) {
client.connMu.Lock()
defer client.connMu.Unlock()
var conn *websocket.Conn
if conn, err = client.connection(); err == nil {
if err = websocket.JSON.Send(conn, msg); err != nil {
client.conn.Close()
client.conn = nil
}
func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
var conn net.Conn
var err error
switch config.Location.Scheme {
case "ws":
conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
case "wss":
dialer := contextDialer(ctx)
conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
default:
err = websocket.ErrBadScheme
}
return err
}
// Recv reads a JSON message from the websocket and unmarshals it into msg.
func (client *wsClient) Recv(msg interface{}) (err error) {
client.connMu.Lock()
defer client.connMu.Unlock()
var conn *websocket.Conn
if conn, err = client.connection(); err == nil {
if err = websocket.JSON.Receive(conn, msg); err != nil {
client.conn.Close()
client.conn = nil
if err != nil {
return nil, err
}
ws, err := websocket.NewClient(config, conn)
if err != nil {
conn.Close()
return nil, err
}
return
return ws, err
}
// Close closes the underlaying websocket connection.
func (client *wsClient) Close() {
client.connMu.Lock()
defer client.connMu.Unlock()
var wsPortMap = map[string]string{"ws": "80", "wss": "443"}
if client.conn != nil {
client.conn.Close()
client.conn = nil
func wsDialAddress(location *url.URL) string {
if _, ok := wsPortMap[location.Scheme]; ok {
if _, _, err := net.SplitHostPort(location.Host); err != nil {
return net.JoinHostPort(location.Host, wsPortMap[location.Scheme])
}
}
return location.Host
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment