From e9fc2a99add2e98f90baaacbe68007db1ba8e5ce Mon Sep 17 00:00:00 2001 From: dingjun Date: Tue, 22 Jan 2019 15:27:49 +0800 Subject: [PATCH] add websocket support --- http_transport.go | 120 ++++++++++++++++++++++++++ jsonrpc.go | 135 ++++++++--------------------- jsonrpc_test.go | 44 +++++----- transport.go | 10 +++ websocket_transport.go | 189 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 381 insertions(+), 117 deletions(-) create mode 100644 http_transport.go create mode 100644 transport.go create mode 100644 websocket_transport.go diff --git a/http_transport.go b/http_transport.go new file mode 100644 index 0000000..1706320 --- /dev/null +++ b/http_transport.go @@ -0,0 +1,120 @@ +package jsonrpc + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "sync" + + log "github.com/fangdingjun/go-log" +) + +// HTTPTransport json rpc over http +type HTTPTransport struct { + Client *http.Client + URL string + nextid uint64 + Mu *sync.Mutex +} + +var _ Transport = &HTTPTransport{} + +// NewHTTPTransport create a new http transport +func NewHTTPTransport(uri string, client *http.Client) (Transport, error) { + _, err := url.Parse(uri) + if err != nil { + return nil, err + } + c := client + if client == nil { + c = http.DefaultClient + } + return &HTTPTransport{ + Client: c, + URL: uri, + Mu: new(sync.Mutex), + }, nil +} + +func (h *HTTPTransport) nextID() uint64 { + h.Mu.Lock() + defer h.Mu.Unlock() + h.nextid++ + return h.nextid +} + +// Call call a remote method +func (h *HTTPTransport) Call(method string, args interface{}, reply interface{}) error { + if args == nil { + args = []string{} + } + + r := &request{ + Version: "2.0", + Method: method, + Params: args, + ID: fmt.Sprintf("%d", h.nextID()), + } + + data, err := json.Marshal(r) + if err != nil { + return err + } + + log.Debugf("send %s", data) + + body := bytes.NewBuffer(data) + + req, err := http.NewRequest("POST", h.URL, body) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := h.Client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + data, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + log.Debugf("recevied %s", data) + + var res response + + if err = json.Unmarshal(data, &res); err != nil { + // non 200 response without valid json response + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http error: %s", resp.Status) + } + return err + } + + // non 200 response with valid json response + if res.ID == r.ID && res.Error != nil { + return res.Error + } + + // non 200 response without valid json response + if res.ID == r.ID && resp.StatusCode != http.StatusOK { + return fmt.Errorf("http error: %s", resp.Status) + } + + return json.Unmarshal(res.Result, &reply) +} + +// Subscribe subscribe for change +func (h *HTTPTransport) Subscribe(method string, notifyMethod string, + args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) { + return nil, nil, errors.New("not supported") +} diff --git a/jsonrpc.go b/jsonrpc.go index 4486539..6a25e7e 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -1,50 +1,48 @@ package jsonrpc import ( - "bytes" "encoding/json" "errors" "fmt" - "io/ioutil" - "log" - "net/http" "net/url" ) // Client json rpc client type Client struct { - // URL is remote url, ex http://username:password@192.168.1.3:1001/jsonrpc - URL string - // http client, default is http.DefaultClient - HTTPClient *http.Client - id uint64 - // Debug set to true, log the send/recevied http data - Debug bool + // URL is remote url, + // example + // http://username:password@192.168.1.3:1001/jsonrpc + // ws://192.168.0.1:9121/ + URL string + Transport Transport } type request struct { Version string `json:"jsonrpc"` Method string `json:"method"` Params interface{} `json:"params"` - ID uint64 `json:"id"` + ID string `json:"id"` } type response struct { Version string `json:"jsonrpc"` Result json.RawMessage `json:"result"` Error *Error `json:"error"` - ID uint64 `json:"id"` + ID string `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` } // Error rpc error type Error struct { - Code int `json:"code"` - Message string `json:"message"` - Data interface{} `json:"data"` + Code int `json:"code"` + Message string `json:"message"` + Data json.RawMessage `json:"data"` } func (e *Error) Error() string { - return fmt.Sprintf("%d: %s", e.Code, e.Message) + return fmt.Sprintf("code: %d, message: %s, data: %s", + e.Code, e.Message, e.Data) } // NewClient create a new jsonrpc client @@ -53,94 +51,35 @@ func NewClient(uri string) (*Client, error) { if err != nil { return nil, err } - - if u.Scheme != "http" && u.Scheme != "https" { - return nil, errors.New("only http/https supported") + t := "" + switch u.Scheme { + case "http", "https": + t = "http" + case "ws", "wss": + t = "ws" + default: + return nil, fmt.Errorf("not supported %s", u.Scheme) } - if u.Host == "" { - return nil, errors.New("invalid uri") + if t == "http" { + tr, _ := NewHTTPTransport(uri, nil) + return &Client{Transport: tr, URL: uri}, nil } - - return &Client{URL: uri, HTTPClient: http.DefaultClient}, nil -} - -func (c *Client) nextID() uint64 { - c.id++ - return c.id + if t == "ws" { + tr, _ := NewWebsocketTransport(uri) + return &Client{Transport: tr, URL: uri}, nil + } + return nil, errors.New("not supported") } // Call invoke a method with args and return reply func (c *Client) Call(method string, args interface{}, reply interface{}) error { - client := c.HTTPClient - if client == nil { - client = http.DefaultClient - } - - if args == nil { - args = []string{} - } - - r := &request{ - Version: "2.0", - Method: method, - Params: args, - ID: c.nextID(), - } - - data, err := json.Marshal(r) - if err != nil { - return err - } - - if c.Debug { - log.Println("send", string(data)) - } - - body := bytes.NewBuffer(data) - - req, err := http.NewRequest("POST", c.URL, body) - if err != nil { - return err - } - - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - return err - } - - defer resp.Body.Close() - - data, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - if c.Debug { - log.Println("recevied", string(data)) - } - - var res response - - if err = json.Unmarshal(data, &res); err != nil { - // non 200 response without valid json response - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("http error: %s", resp.Status) - } - return err - } - - // non 200 response with valid json response - if res.ID == r.ID && res.Error != nil { - return res.Error - } + return c.Transport.Call(method, args, reply) +} - // non 200 response without valid json response - if res.ID == r.ID && resp.StatusCode != http.StatusOK { - return fmt.Errorf("http error: %s", resp.Status) - } +// Subscribe subscribe for change +func (c *Client) Subscribe(method, notifyMethod string, + args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) { - return json.Unmarshal(res.Result, &reply) + return c.Transport.Subscribe(method, notifyMethod, args, reply) } diff --git a/jsonrpc_test.go b/jsonrpc_test.go index a976f8a..24327e0 100644 --- a/jsonrpc_test.go +++ b/jsonrpc_test.go @@ -1,15 +1,20 @@ package jsonrpc import ( - "log" "testing" + + log "github.com/fangdingjun/go-log" ) func TestCall(t *testing.T) { - url := "http://192.168.56.101:8542/" + log.Default.Level = log.DEBUG + + url := "http://192.168.56.101:8545/" + c, _ := NewClient(url) - c.Debug = true - var ret interface{} + + var ret string + err := c.Call("eth_getBalance", []string{"0x00CB25f6fD16a52e24eDd2c8fd62071dc29A035c", "latest"}, &ret) if err != nil { t.Error(err) @@ -17,28 +22,29 @@ func TestCall(t *testing.T) { } log.Printf("result: %+v", ret) - url = "http://admin2:123@192.168.56.101:19011/" - - c, _ = NewClient(url) - c.Debug = true - - err = c.Call("getbalance", []string{}, &ret) + c1, err := NewClient("ws://192.168.56.101:8546") if err != nil { t.Error(err) return } - log.Printf("result: %+v", ret) - - if err = c.Call("fuck", []string{}, &ret); err == nil { - t.Errorf("expected error, got nil") + var gas string + err = c1.Call("eth_gasPrice", []string{}, &gas) + if err != nil { + t.Error(err) return } - log.Println("got", err) - if err = c.Call("listreceivedbyaddress", []interface{}{0, false}, &ret); err != nil { - t.Error(err) - return + log.Println("gas", gas) + + var r string + ch, errch, err := c1.Subscribe("eth_subscribe", "eth_subscription", []interface{}{"newHeads"}, &r) + + log.Println("subid", r) + select { + case d := <-ch: + log.Printf("%s", d) + case e := <-errch: + log.Println(e) } - log.Printf("result: %+v", ret) } diff --git a/transport.go b/transport.go new file mode 100644 index 0000000..2f282a6 --- /dev/null +++ b/transport.go @@ -0,0 +1,10 @@ +package jsonrpc + +import "encoding/json" + +// Transport json rpc transport +type Transport interface { + Call(method string, args interface{}, reply interface{}) error + Subscribe(method string, notifyMethod string, + args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) +} diff --git a/websocket_transport.go b/websocket_transport.go new file mode 100644 index 0000000..27a749f --- /dev/null +++ b/websocket_transport.go @@ -0,0 +1,189 @@ +package jsonrpc + +import ( + "encoding/json" + "fmt" + "net/http" + "sync" + + log "github.com/fangdingjun/go-log" + "github.com/gorilla/websocket" +) + +// WebsocketTransport json rpc over websocket +type WebsocketTransport struct { + Conn *websocket.Conn + URL string + Mu *sync.Mutex + inflight map[string]*inflightReq + nextid uint64 + err error +} + +type inflightReq struct { + id string + ch chan json.RawMessage + errch chan *Error +} + +var _ Transport = &WebsocketTransport{} + +// NewWebsocketTransport create a new websocket transport +func NewWebsocketTransport(uri string) (Transport, error) { + dialer := &websocket.Dialer{} + + conn, res, err := dialer.Dial(uri, nil) + if err != nil { + return nil, err + } + + if res.StatusCode != http.StatusSwitchingProtocols { + return nil, fmt.Errorf("http error %d", res.StatusCode) + } + + w := &WebsocketTransport{ + Conn: conn, + URL: uri, + inflight: make(map[string]*inflightReq), + Mu: new(sync.Mutex), + } + go w.readloop() + return w, nil +} + +func (h *WebsocketTransport) readloop() { + for { + var res response + + _, data, err := h.Conn.ReadMessage() + if err != nil { + h.err = err + return + } + + log.Debugf("received: %s", data) + + if err := json.Unmarshal(data, &res); err != nil { + h.err = err + return + } + + if res.ID != "" { + h.Mu.Lock() + req, ok := h.inflight[res.ID] + h.Mu.Unlock() + + if !ok { + continue + } + if res.Error != nil { + req.errch <- res.Error + + h.Mu.Lock() + delete(h.inflight, res.ID) + h.Mu.Unlock() + + continue + } + req.ch <- res.Result + + h.Mu.Lock() + delete(h.inflight, res.ID) + h.Mu.Unlock() + + continue + } + if res.Method != "" { + h.Mu.Lock() + req, ok := h.inflight[res.Method] + h.Mu.Unlock() + + if !ok { + continue + } + + if res.Error != nil { + req.errch <- res.Error + continue + } + req.ch <- res.Params + } + } +} + +func (h *WebsocketTransport) nextID() uint64 { + h.Mu.Lock() + defer h.Mu.Unlock() + + h.nextid++ + + return h.nextid +} + +// Call call a remote method +func (h *WebsocketTransport) Call(method string, args interface{}, reply interface{}) error { + if h.err != nil { + return h.err + } + + id := fmt.Sprintf("%d", h.nextID()) + req := request{ + ID: id, + Version: "2.0", + Method: method, + Params: args, + } + + d, err := json.Marshal(req) + if err != nil { + return err + } + + log.Debugf("write %s", d) + + if err := h.Conn.WriteMessage(websocket.TextMessage, d); err != nil { + return err + } + + ch := make(chan json.RawMessage, 1) + errch := make(chan *Error, 1) + + h.Mu.Lock() + h.inflight[id] = &inflightReq{ + id: id, + ch: ch, + errch: errch, + } + h.Mu.Unlock() + + select { + case data := <-ch: + return json.Unmarshal(data, reply) + case err := <-errch: + return err + } + //return nil +} + +// Subscribe subscribe for change +func (h *WebsocketTransport) Subscribe(method string, notifyMethod string, + args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) { + + err := h.Call(method, args, reply) + if err != nil { + return nil, nil, err + } + + ch := make(chan json.RawMessage) + errch := make(chan *Error) + + h.Mu.Lock() + h.inflight[notifyMethod] = &inflightReq{ + ch: ch, + errch: errch, + id: notifyMethod, + } + h.Mu.Unlock() + + return ch, errch, nil +}