master
dingjun 5 years ago
parent 1ac79fbc3c
commit 1a1e7d7404

@ -1,120 +1,120 @@
package jsonrpc package jsonrpc
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"sync" "sync"
log "github.com/fangdingjun/go-log" log "github.com/fangdingjun/go-log"
) )
// HTTPTransport json rpc over http // HTTPTransport json rpc over http
type HTTPTransport struct { type HTTPTransport struct {
Client *http.Client Client *http.Client
URL string URL string
nextid uint64 nextid uint64
Mu *sync.Mutex Mu *sync.Mutex
} }
var _ Transport = &HTTPTransport{} var _ Transport = &HTTPTransport{}
// NewHTTPTransport create a new http transport // NewHTTPTransport create a new http transport
func NewHTTPTransport(uri string, client *http.Client) (Transport, error) { func NewHTTPTransport(uri string, client *http.Client) (Transport, error) {
_, err := url.Parse(uri) _, err := url.Parse(uri)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c := client c := client
if client == nil { if client == nil {
c = http.DefaultClient c = http.DefaultClient
} }
return &HTTPTransport{ return &HTTPTransport{
Client: c, Client: c,
URL: uri, URL: uri,
Mu: new(sync.Mutex), Mu: new(sync.Mutex),
}, nil }, nil
} }
func (h *HTTPTransport) nextID() uint64 { func (h *HTTPTransport) nextID() uint64 {
h.Mu.Lock() h.Mu.Lock()
defer h.Mu.Unlock() defer h.Mu.Unlock()
h.nextid++ h.nextid++
return h.nextid return h.nextid
} }
// Call call a remote method // Call call a remote method
func (h *HTTPTransport) Call(method string, args interface{}, reply interface{}) error { func (h *HTTPTransport) Call(method string, args interface{}, reply interface{}) error {
if args == nil { if args == nil {
args = []string{} args = []string{}
} }
r := &request{ r := &request{
Version: "2.0", Version: "2.0",
Method: method, Method: method,
Params: args, Params: args,
ID: fmt.Sprintf("%d", h.nextID()), ID: fmt.Sprintf("%d", h.nextID()),
} }
data, err := json.Marshal(r) data, err := json.Marshal(r)
if err != nil { if err != nil {
return err return err
} }
log.Debugf("send %s", data) log.Debugf("send %s", data)
body := bytes.NewReader(data) body := bytes.NewReader(data)
req, err := http.NewRequest("POST", h.URL, body) req, err := http.NewRequest("POST", h.URL, body)
if err != nil { if err != nil {
return err return err
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
resp, err := h.Client.Do(req) resp, err := h.Client.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
data, err = ioutil.ReadAll(resp.Body) data, err = ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return err return err
} }
log.Debugf("recevied %s", data) log.Debugf("recevied %s", data)
var res response var res response
if err = json.Unmarshal(data, &res); err != nil { if err = json.Unmarshal(data, &res); err != nil {
// non 200 response without valid json response // non 200 response without valid json response
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %s", resp.Status) return fmt.Errorf("http error: %s", resp.Status)
} }
return err return err
} }
// non 200 response with valid json response // non 200 response with valid json response
if res.ID == r.ID && res.Error != nil { if res.ID == r.ID && res.Error != nil {
return res.Error return res.Error
} }
// non 200 response without valid json response // non 200 response without valid json response
if res.ID == r.ID && resp.StatusCode != http.StatusOK { if res.ID == r.ID && resp.StatusCode != http.StatusOK {
return fmt.Errorf("http error: %s", resp.Status) return fmt.Errorf("http error: %s", resp.Status)
} }
return json.Unmarshal(res.Result, &reply) return json.Unmarshal(res.Result, &reply)
} }
// Subscribe subscribe for change // Subscribe subscribe for change
func (h *HTTPTransport) Subscribe(method string, notifyMethod string, func (h *HTTPTransport) Subscribe(method string, notifyMethod string,
args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) { args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) {
return nil, nil, errors.New("not supported") return nil, nil, errors.New("not supported")
} }

@ -1,10 +1,10 @@
package jsonrpc package jsonrpc
import "encoding/json" import "encoding/json"
// Transport json rpc transport // Transport json rpc transport
type Transport interface { type Transport interface {
Call(method string, args interface{}, reply interface{}) error Call(method string, args interface{}, reply interface{}) error
Subscribe(method string, notifyMethod string, Subscribe(method string, notifyMethod string,
args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error)
} }

@ -1,230 +1,230 @@
package jsonrpc package jsonrpc
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
log "github.com/fangdingjun/go-log" log "github.com/fangdingjun/go-log"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
// WebsocketTransport json rpc over websocket // WebsocketTransport json rpc over websocket
type WebsocketTransport struct { type WebsocketTransport struct {
Conn *websocket.Conn Conn *websocket.Conn
URL string URL string
Mu *sync.Mutex Mu *sync.Mutex
inflight map[string]*inflightReq inflight map[string]*inflightReq
nextid uint64 nextid uint64
err error err error
ctx context.Context ctx context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
} }
type inflightReq struct { type inflightReq struct {
id string id string
ch chan json.RawMessage ch chan json.RawMessage
errch chan *Error errch chan *Error
} }
// ErrConnClosed error for connection closed // ErrConnClosed error for connection closed
var ErrConnClosed = errors.New("connection closed") var ErrConnClosed = errors.New("connection closed")
var _ Transport = &WebsocketTransport{} var _ Transport = &WebsocketTransport{}
// NewWebsocketTransport create a new websocket transport // NewWebsocketTransport create a new websocket transport
func NewWebsocketTransport(ctx context.Context, uri string) (Transport, error) { func NewWebsocketTransport(ctx context.Context, uri string) (Transport, error) {
var dialer = &websocket.Dialer{} var dialer = &websocket.Dialer{}
conn, res, err := dialer.DialContext(ctx, uri, nil) conn, res, err := dialer.DialContext(ctx, uri, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer res.Body.Close() defer res.Body.Close()
if res.StatusCode != http.StatusSwitchingProtocols { if res.StatusCode != http.StatusSwitchingProtocols {
return nil, fmt.Errorf("http error %d", res.StatusCode) return nil, fmt.Errorf("http error %d", res.StatusCode)
} }
w := &WebsocketTransport{ w := &WebsocketTransport{
Conn: conn, Conn: conn,
URL: uri, URL: uri,
inflight: make(map[string]*inflightReq), inflight: make(map[string]*inflightReq),
Mu: new(sync.Mutex), Mu: new(sync.Mutex),
} }
w.ctx, w.cancelFunc = context.WithCancel(ctx) w.ctx, w.cancelFunc = context.WithCancel(ctx)
go w.readloop() go w.readloop()
return w, nil return w, nil
} }
// Context return the context transport used // Context return the context transport used
func (h *WebsocketTransport) Context() context.Context { func (h *WebsocketTransport) Context() context.Context {
return h.ctx return h.ctx
} }
func (h *WebsocketTransport) readloop() { func (h *WebsocketTransport) readloop() {
defer func() { defer func() {
//log.Debugf("close websocket connection") //log.Debugf("close websocket connection")
h.Conn.Close() h.Conn.Close()
//log.Debugf("cancel context") //log.Debugf("cancel context")
h.cancelFunc() h.cancelFunc()
}() }()
for { for {
select { select {
case <-h.ctx.Done(): case <-h.ctx.Done():
return return
default: default:
} }
var res response var res response
_, data, err := h.Conn.ReadMessage() _, data, err := h.Conn.ReadMessage()
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
h.err = err h.err = err
return return
} }
log.Debugf("received: %s", data) log.Debugf("received: %s", data)
if err := json.Unmarshal(data, &res); err != nil { if err := json.Unmarshal(data, &res); err != nil {
h.err = err h.err = err
return return
} }
if res.ID != "" { if res.ID != "" {
h.Mu.Lock() h.Mu.Lock()
req, ok := h.inflight[res.ID] req, ok := h.inflight[res.ID]
h.Mu.Unlock() h.Mu.Unlock()
if !ok { if !ok {
log.Warnf("handler for id %s not exists", res.ID) log.Warnf("handler for id %s not exists", res.ID)
continue continue
} }
if res.Error != nil { if res.Error != nil {
req.errch <- res.Error req.errch <- res.Error
h.Mu.Lock() h.Mu.Lock()
delete(h.inflight, res.ID) delete(h.inflight, res.ID)
h.Mu.Unlock() h.Mu.Unlock()
continue continue
} }
req.ch <- res.Result req.ch <- res.Result
h.Mu.Lock() h.Mu.Lock()
delete(h.inflight, res.ID) delete(h.inflight, res.ID)
h.Mu.Unlock() h.Mu.Unlock()
continue continue
} }
if res.Method != "" { if res.Method != "" {
h.Mu.Lock() h.Mu.Lock()
req, ok := h.inflight[res.Method] req, ok := h.inflight[res.Method]
h.Mu.Unlock() h.Mu.Unlock()
if !ok { if !ok {
log.Warnf("handler for method %s not exists", res.Method) log.Warnf("handler for method %s not exists", res.Method)
continue continue
} }
if res.Error != nil { if res.Error != nil {
req.errch <- res.Error req.errch <- res.Error
continue continue
} }
req.ch <- res.Params req.ch <- res.Params
} }
} }
} }
func (h *WebsocketTransport) nextID() uint64 { func (h *WebsocketTransport) nextID() uint64 {
h.Mu.Lock() h.Mu.Lock()
defer h.Mu.Unlock() defer h.Mu.Unlock()
h.nextid++ h.nextid++
return h.nextid return h.nextid
} }
// Call call a remote method // Call call a remote method
func (h *WebsocketTransport) Call(method string, args interface{}, reply interface{}) error { func (h *WebsocketTransport) Call(method string, args interface{}, reply interface{}) error {
select { select {
case <-h.ctx.Done(): case <-h.ctx.Done():
return ErrConnClosed return ErrConnClosed
default: default:
} }
if h.err != nil { if h.err != nil {
return h.err return h.err
} }
id := fmt.Sprintf("%d", h.nextID()) id := fmt.Sprintf("%d", h.nextID())
req := request{ req := request{
ID: id, ID: id,
Version: "2.0", Version: "2.0",
Method: method, Method: method,
Params: args, Params: args,
} }
d, err := json.Marshal(req) d, err := json.Marshal(req)
if err != nil { if err != nil {
return err return err
} }
ch := make(chan json.RawMessage, 1) ch := make(chan json.RawMessage, 1)
errch := make(chan *Error, 1) errch := make(chan *Error, 1)
h.Mu.Lock() h.Mu.Lock()
h.inflight[id] = &inflightReq{ h.inflight[id] = &inflightReq{
id: id, id: id,
ch: ch, ch: ch,
errch: errch, errch: errch,
} }
h.Mu.Unlock() h.Mu.Unlock()
log.Debugf("write %s", d) log.Debugf("write %s", d)
if err := h.Conn.WriteMessage(websocket.TextMessage, d); err != nil { if err := h.Conn.WriteMessage(websocket.TextMessage, d); err != nil {
h.Mu.Lock() h.Mu.Lock()
delete(h.inflight, id) delete(h.inflight, id)
h.Mu.Unlock() h.Mu.Unlock()
return err return err
} }
select { select {
case data := <-ch: case data := <-ch:
return json.Unmarshal(data, reply) return json.Unmarshal(data, reply)
case err := <-errch: case err := <-errch:
return err return err
} }
//return nil //return nil
} }
// Subscribe subscribe for change // Subscribe subscribe for change
func (h *WebsocketTransport) Subscribe(method string, notifyMethod string, func (h *WebsocketTransport) Subscribe(method string, notifyMethod string,
args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) { args interface{}, reply interface{}) (chan json.RawMessage, chan *Error, error) {
ch := make(chan json.RawMessage) ch := make(chan json.RawMessage)
errch := make(chan *Error) errch := make(chan *Error)
h.Mu.Lock() h.Mu.Lock()
h.inflight[notifyMethod] = &inflightReq{ h.inflight[notifyMethod] = &inflightReq{
ch: ch, ch: ch,
errch: errch, errch: errch,
id: notifyMethod, id: notifyMethod,
} }
h.Mu.Unlock() h.Mu.Unlock()
err := h.Call(method, args, reply) err := h.Call(method, args, reply)
if err != nil { if err != nil {
h.Mu.Lock() h.Mu.Lock()
delete(h.inflight, notifyMethod) delete(h.inflight, notifyMethod)
h.Mu.Unlock() h.Mu.Unlock()
return nil, nil, err return nil, nil, err
} }
return ch, errch, nil return ch, errch, nil
} }

Loading…
Cancel
Save