json.go 4.76 KB
Newer Older
1 2 3 4
package codec

import (
	"encoding/json"
5
	"fmt"
6
	"net"
7
	"time"
8 9 10 11 12

	"github.com/ethereum/go-ethereum/rpc/shared"
)

const (
13
	READ_TIMEOUT      = 60 // in seconds
14
	MAX_REQUEST_SIZE  = 1024 * 1024
15
	MAX_RESPONSE_SIZE = 1024 * 1024
16 17
)

18 19 20 21 22 23 24
var (
	// No new requests in buffer
	EmptyRequestQueueError = fmt.Errorf("No incoming requests")
	// Next request in buffer isn't yet complete
	IncompleteRequestError = fmt.Errorf("Request incomplete")
)

25 26
// Json serialization support
type JsonCodec struct {
27 28 29 30
	c                net.Conn
	reqBuffer        []byte
	bytesInReqBuffer int
	reqLastPos       int
31 32 33 34 35
}

// Create new JSON coder instance
func NewJsonCoder(conn net.Conn) ApiCoder {
	return &JsonCodec{
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
		c:                conn,
		reqBuffer:        make([]byte, MAX_REQUEST_SIZE),
		bytesInReqBuffer: 0,
		reqLastPos:       0,
	}
}

// Indication if the next request in the buffer is a batch request
func (self *JsonCodec) isNextBatchReq() (bool, error) {
	for i := 0; i < self.bytesInReqBuffer; i++ {
		switch self.reqBuffer[i] {
		case 0x20, 0x09, 0x0a, 0x0d: // allow leading whitespace (JSON whitespace RFC4627)
			continue
		case 0x7b: // single req
			return false, nil
		case 0x5b: // batch req
			return true, nil
		default:
			return false, &json.InvalidUnmarshalError{}
		}
	}

	return false, EmptyRequestQueueError
}

// remove parsed request from buffer
func (self *JsonCodec) resetReqbuffer(pos int) {
	copy(self.reqBuffer, self.reqBuffer[pos:self.bytesInReqBuffer])
	self.reqLastPos = 0
	self.bytesInReqBuffer -= pos
}

// parse request in buffer
func (self *JsonCodec) nextRequest() (requests []*shared.Request, isBatch bool, err error) {
	if isBatch, err := self.isNextBatchReq(); err == nil {
		if isBatch {
			requests = make([]*shared.Request, 0)
			for ; self.reqLastPos <= self.bytesInReqBuffer; self.reqLastPos++ {
				if err = json.Unmarshal(self.reqBuffer[:self.reqLastPos], &requests); err == nil {
					self.resetReqbuffer(self.reqLastPos)
					return requests, true, nil
				}
			}
			return nil, true, IncompleteRequestError
		} else {
			request := shared.Request{}
			for ; self.reqLastPos <= self.bytesInReqBuffer; self.reqLastPos++ {
				if err = json.Unmarshal(self.reqBuffer[:self.reqLastPos], &request); err == nil {
					requests := make([]*shared.Request, 1)
					requests[0] = &request
					self.resetReqbuffer(self.reqLastPos)
					return requests, false, nil
				}
			}
			return nil, true, IncompleteRequestError
		}
	} else {
		return nil, false, err
94 95 96 97
	}
}

// Serialize obj to JSON and write it to conn
98
func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) {
99 100 101 102 103 104 105 106 107 108
	if self.bytesInReqBuffer != 0 {
		req, batch, err := self.nextRequest()
		if err == nil {
			return req, batch, err
		}

		if err != IncompleteRequestError {
			return nil, false, err
		}
	}
109

110
	// no/incomplete request in buffer -> read more data first
111 112
	deadline := time.Now().Add(READ_TIMEOUT * time.Second)
	if err := self.c.SetDeadline(deadline); err != nil {
113 114 115
		return nil, false, err
	}

116
	var retErr error
117
	for {
118 119 120 121
		n, err := self.c.Read(self.reqBuffer[self.bytesInReqBuffer:])
		if err != nil {
			retErr = err
			break
122
		}
123

124
		self.bytesInReqBuffer += n
125

126 127 128
		requests, isBatch, err := self.nextRequest()
		if err == nil {
			return requests, isBatch, nil
129 130
		}

131 132
		if err == IncompleteRequestError || err == EmptyRequestQueueError {
			continue // need more data
133
		}
134

135 136
		retErr = err
		break
137 138
	}

139 140
	self.c.Close()
	return nil, false, retErr
141 142 143
}

func (self *JsonCodec) ReadResponse() (interface{}, error) {
144
	bytesInBuffer := 0
145 146
	buf := make([]byte, MAX_RESPONSE_SIZE)

147 148 149 150
	deadline := time.Now().Add(READ_TIMEOUT * time.Second)
	if err := self.c.SetDeadline(deadline); err != nil {
		return nil, err
	}
151 152 153 154 155 156 157

	for {
		n, err := self.c.Read(buf[bytesInBuffer:])
		if err != nil {
			return nil, err
		}
		bytesInBuffer += n
158

159 160 161 162 163 164 165 166 167
		var success shared.SuccessResponse
		if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil {
			return success, nil
		}

		var failure shared.ErrorResponse
		if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil {
			return failure, nil
		}
168 169
	}

170 171
	self.c.Close()
	return nil, fmt.Errorf("Unable to read response")
172 173
}

174
// Decode data
175 176 177 178
func (self *JsonCodec) Decode(data []byte, msg interface{}) error {
	return json.Unmarshal(data, msg)
}

179
// Encode message
180 181 182 183 184 185
func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) {
	return json.Marshal(msg)
}

// Parse JSON data from conn to obj
func (self *JsonCodec) WriteResponse(res interface{}) error {
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
	data, err := json.Marshal(res)
	if err != nil {
		self.c.Close()
		return err
	}

	bytesWritten := 0

	for bytesWritten < len(data) {
		n, err := self.c.Write(data[bytesWritten:])
		if err != nil {
			self.c.Close()
			return err
		}
		bytesWritten += n
	}

	return nil
204 205 206 207 208 209
}

// Close decoder and encoder
func (self *JsonCodec) Close() {
	self.c.Close()
}