add websocket support

master
dingjun 6 years ago
parent 074c9020d7
commit e9fc2a99ad

@ -0,0 +1,120 @@
package jsonrpc
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
log "github.com/fangdingjun/go-log"
)
// HTTPTransport json rpc over http
type HTTPTransport struct {
Client *http.Client
URL string
nextid uint64
Mu *sync.Mutex
}
var _ Transport = &HTTPTransport{}
// NewHTTPTransport create a new http transport
func NewHTTPTransport(uri string, client *http.Client) (Transport, error) {
_, err := url.Parse(uri)
if err != nil {
return nil, err
}
c := client
if client == nil {
c = http.DefaultClient
}
return &HTTPTransport{
Client: c,
URL: uri,
Mu: new(sync.Mutex),
}, nil
}
func (h *HTTPTransport) nextID() uint64 {
h.Mu.Lock()
defer h.Mu.Unlock()
h.nextid++
return h.nextid
}
// Call call a remote method
func (h *HTTPTransport) Call(method string, args interface{}, reply interface{}) error {
if args == nil {
args = []string{}
}
r := &request{
Version: "2.0",
Method: method,
Params: args,
ID: fmt.Sprintf("%d", h.nextID()),
}
data, err := json.Marshal(r)
if err != nil {
return err
}
log.Debugf("send %s", data)
body := bytes.NewBuffer(data)
req, err := http.NewRequest("POST", h.URL, body)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := h.Client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
data, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
log.Debugf("recevied %s", data)
var res response
if err = json.Unmarshal(data, &res); err != nil {
// non 200 response without valid json response
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %s", resp.Status)
}
return err
}
// non 200 response with valid json response
if res.ID == r.ID && res.Error != nil {
return res.Error
}
// non 200 response without valid json response
if res.ID == r.ID && resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %s", resp.Status)
}
return json.Unmarshal(res.Result, &reply)
}
// Subscribe subscribe for change
func (h *HTTPTransport) Subscribe(method string, notifyMethod string,
args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) {
return nil, nil, errors.New("not supported")
}

@ -1,50 +1,48 @@
package jsonrpc package jsonrpc
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"log"
"net/http"
"net/url" "net/url"
) )
// Client json rpc client // Client json rpc client
type Client struct { type Client struct {
// URL is remote url, ex http://username:password@192.168.1.3:1001/jsonrpc // URL is remote url,
// example
// http://username:password@192.168.1.3:1001/jsonrpc
// ws://192.168.0.1:9121/
URL string URL string
// http client, default is http.DefaultClient Transport Transport
HTTPClient *http.Client
id uint64
// Debug set to true, log the send/recevied http data
Debug bool
} }
type request struct { type request struct {
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Method string `json:"method"` Method string `json:"method"`
Params interface{} `json:"params"` Params interface{} `json:"params"`
ID uint64 `json:"id"` ID string `json:"id"`
} }
type response struct { type response struct {
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Result json.RawMessage `json:"result"` Result json.RawMessage `json:"result"`
Error *Error `json:"error"` Error *Error `json:"error"`
ID uint64 `json:"id"` ID string `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
} }
// Error rpc error // Error rpc error
type Error struct { type Error struct {
Code int `json:"code"` Code int `json:"code"`
Message string `json:"message"` Message string `json:"message"`
Data interface{} `json:"data"` Data json.RawMessage `json:"data"`
} }
func (e *Error) Error() string { func (e *Error) Error() string {
return fmt.Sprintf("%d: %s", e.Code, e.Message) return fmt.Sprintf("code: %d, message: %s, data: %s",
e.Code, e.Message, e.Data)
} }
// NewClient create a new jsonrpc client // NewClient create a new jsonrpc client
@ -53,94 +51,35 @@ func NewClient(uri string) (*Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
t := ""
if u.Scheme != "http" && u.Scheme != "https" { switch u.Scheme {
return nil, errors.New("only http/https supported") case "http", "https":
t = "http"
case "ws", "wss":
t = "ws"
default:
return nil, fmt.Errorf("not supported %s", u.Scheme)
} }
if u.Host == "" { if t == "http" {
return nil, errors.New("invalid uri") tr, _ := NewHTTPTransport(uri, nil)
return &Client{Transport: tr, URL: uri}, nil
} }
if t == "ws" {
return &Client{URL: uri, HTTPClient: http.DefaultClient}, nil tr, _ := NewWebsocketTransport(uri)
return &Client{Transport: tr, URL: uri}, nil
} }
return nil, errors.New("not supported")
func (c *Client) nextID() uint64 {
c.id++
return c.id
} }
// Call invoke a method with args and return reply // Call invoke a method with args and return reply
func (c *Client) Call(method string, args interface{}, reply interface{}) error { func (c *Client) Call(method string, args interface{}, reply interface{}) error {
client := c.HTTPClient return c.Transport.Call(method, args, reply)
if client == nil {
client = http.DefaultClient
}
if args == nil {
args = []string{}
}
r := &request{
Version: "2.0",
Method: method,
Params: args,
ID: c.nextID(),
}
data, err := json.Marshal(r)
if err != nil {
return err
}
if c.Debug {
log.Println("send", string(data))
}
body := bytes.NewBuffer(data)
req, err := http.NewRequest("POST", c.URL, body)
if err != nil {
return err
} }
req.Header.Set("Content-Type", "application/json") // Subscribe subscribe for change
func (c *Client) Subscribe(method, notifyMethod string,
resp, err := client.Do(req) args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) {
if err != nil {
return err
}
defer resp.Body.Close()
data, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if c.Debug {
log.Println("recevied", string(data))
}
var res response
if err = json.Unmarshal(data, &res); err != nil {
// non 200 response without valid json response
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %s", resp.Status)
}
return err
}
// non 200 response with valid json response
if res.ID == r.ID && res.Error != nil {
return res.Error
}
// non 200 response without valid json response
if res.ID == r.ID && resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %s", resp.Status)
}
return json.Unmarshal(res.Result, &reply) return c.Transport.Subscribe(method, notifyMethod, args, reply)
} }

@ -1,15 +1,20 @@
package jsonrpc package jsonrpc
import ( import (
"log"
"testing" "testing"
log "github.com/fangdingjun/go-log"
) )
func TestCall(t *testing.T) { func TestCall(t *testing.T) {
url := "http://192.168.56.101:8542/" log.Default.Level = log.DEBUG
url := "http://192.168.56.101:8545/"
c, _ := NewClient(url) c, _ := NewClient(url)
c.Debug = true
var ret interface{} var ret string
err := c.Call("eth_getBalance", []string{"0x00CB25f6fD16a52e24eDd2c8fd62071dc29A035c", "latest"}, &ret) err := c.Call("eth_getBalance", []string{"0x00CB25f6fD16a52e24eDd2c8fd62071dc29A035c", "latest"}, &ret)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -17,28 +22,29 @@ func TestCall(t *testing.T) {
} }
log.Printf("result: %+v", ret) log.Printf("result: %+v", ret)
url = "http://admin2:123@192.168.56.101:19011/" c1, err := NewClient("ws://192.168.56.101:8546")
c, _ = NewClient(url)
c.Debug = true
err = c.Call("getbalance", []string{}, &ret)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
log.Printf("result: %+v", ret) var gas string
err = c1.Call("eth_gasPrice", []string{}, &gas)
if err = c.Call("fuck", []string{}, &ret); err == nil { if err != nil {
t.Errorf("expected error, got nil") t.Error(err)
return return
} }
log.Println("got", err)
if err = c.Call("listreceivedbyaddress", []interface{}{0, false}, &ret); err != nil { log.Println("gas", gas)
t.Error(err)
return var r string
ch, errch, err := c1.Subscribe("eth_subscribe", "eth_subscription", []interface{}{"newHeads"}, &r)
log.Println("subid", r)
select {
case d := <-ch:
log.Printf("%s", d)
case e := <-errch:
log.Println(e)
} }
log.Printf("result: %+v", ret)
} }

@ -0,0 +1,10 @@
package jsonrpc
import "encoding/json"
// Transport json rpc transport
type Transport interface {
Call(method string, args interface{}, reply interface{}) error
Subscribe(method string, notifyMethod string,
args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error)
}

@ -0,0 +1,189 @@
package jsonrpc
import (
"encoding/json"
"fmt"
"net/http"
"sync"
log "github.com/fangdingjun/go-log"
"github.com/gorilla/websocket"
)
// WebsocketTransport json rpc over websocket
type WebsocketTransport struct {
Conn *websocket.Conn
URL string
Mu *sync.Mutex
inflight map[string]*inflightReq
nextid uint64
err error
}
type inflightReq struct {
id string
ch chan json.RawMessage
errch chan *Error
}
var _ Transport = &WebsocketTransport{}
// NewWebsocketTransport create a new websocket transport
func NewWebsocketTransport(uri string) (Transport, error) {
dialer := &websocket.Dialer{}
conn, res, err := dialer.Dial(uri, nil)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusSwitchingProtocols {
return nil, fmt.Errorf("http error %d", res.StatusCode)
}
w := &WebsocketTransport{
Conn: conn,
URL: uri,
inflight: make(map[string]*inflightReq),
Mu: new(sync.Mutex),
}
go w.readloop()
return w, nil
}
func (h *WebsocketTransport) readloop() {
for {
var res response
_, data, err := h.Conn.ReadMessage()
if err != nil {
h.err = err
return
}
log.Debugf("received: %s", data)
if err := json.Unmarshal(data, &res); err != nil {
h.err = err
return
}
if res.ID != "" {
h.Mu.Lock()
req, ok := h.inflight[res.ID]
h.Mu.Unlock()
if !ok {
continue
}
if res.Error != nil {
req.errch <- res.Error
h.Mu.Lock()
delete(h.inflight, res.ID)
h.Mu.Unlock()
continue
}
req.ch <- res.Result
h.Mu.Lock()
delete(h.inflight, res.ID)
h.Mu.Unlock()
continue
}
if res.Method != "" {
h.Mu.Lock()
req, ok := h.inflight[res.Method]
h.Mu.Unlock()
if !ok {
continue
}
if res.Error != nil {
req.errch <- res.Error
continue
}
req.ch <- res.Params
}
}
}
func (h *WebsocketTransport) nextID() uint64 {
h.Mu.Lock()
defer h.Mu.Unlock()
h.nextid++
return h.nextid
}
// Call call a remote method
func (h *WebsocketTransport) Call(method string, args interface{}, reply interface{}) error {
if h.err != nil {
return h.err
}
id := fmt.Sprintf("%d", h.nextID())
req := request{
ID: id,
Version: "2.0",
Method: method,
Params: args,
}
d, err := json.Marshal(req)
if err != nil {
return err
}
log.Debugf("write %s", d)
if err := h.Conn.WriteMessage(websocket.TextMessage, d); err != nil {
return err
}
ch := make(chan json.RawMessage, 1)
errch := make(chan *Error, 1)
h.Mu.Lock()
h.inflight[id] = &inflightReq{
id: id,
ch: ch,
errch: errch,
}
h.Mu.Unlock()
select {
case data := <-ch:
return json.Unmarshal(data, reply)
case err := <-errch:
return err
}
//return nil
}
// Subscribe subscribe for change
func (h *WebsocketTransport) Subscribe(method string, notifyMethod string,
args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) {
err := h.Call(method, args, reply)
if err != nil {
return nil, nil, err
}
ch := make(chan json.RawMessage)
errch := make(chan *Error)
h.Mu.Lock()
h.inflight[notifyMethod] = &inflightReq{
ch: ch,
errch: errch,
id: notifyMethod,
}
h.Mu.Unlock()
return ch, errch, nil
}
Loading…
Cancel
Save