From 40d34d478ff4caf1fcf4c834607c0fb5109ac70a Mon Sep 17 00:00:00 2001 From: dingjun Date: Wed, 21 Aug 2019 17:08:39 +0800 Subject: [PATCH] fix subscribe process issue --- websocket_transport.go | 60 +++++++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/websocket_transport.go b/websocket_transport.go index 147abb2..cd7398b 100644 --- a/websocket_transport.go +++ b/websocket_transport.go @@ -22,6 +22,7 @@ type WebsocketTransport struct { err error ctx context.Context cancelFunc context.CancelFunc + subscribes map[string][]*inflightReq } type inflightReq struct { @@ -49,10 +50,11 @@ func NewWebsocketTransport(ctx context.Context, uri string) (Transport, error) { } w := &WebsocketTransport{ - Conn: conn, - URL: uri, - inflight: make(map[string]*inflightReq), - Mu: new(sync.Mutex), + Conn: conn, + URL: uri, + inflight: make(map[string]*inflightReq), + subscribes: make(map[string][]*inflightReq), + Mu: new(sync.Mutex), } w.ctx, w.cancelFunc = context.WithCancel(ctx) go w.readloop() @@ -95,6 +97,8 @@ func (h *WebsocketTransport) readloop() { } if res.ID != "" { + // response + h.Mu.Lock() req, ok := h.inflight[res.ID] h.Mu.Unlock() @@ -120,9 +124,12 @@ func (h *WebsocketTransport) readloop() { continue } + if res.Method != "" { + // notify + h.Mu.Lock() - req, ok := h.inflight[res.Method] + reqs, ok := h.subscribes[res.Method] h.Mu.Unlock() if !ok { @@ -130,12 +137,17 @@ func (h *WebsocketTransport) readloop() { continue } - if res.Error != nil { - req.errch <- res.Error - continue + for _, req := range reqs { + if res.Error != nil { + req.errch <- res.Error + } else { + req.ch <- res.Params + } } - req.ch <- res.Params + continue } + + log.Warnf("unhandled message %s", data) } } @@ -210,18 +222,42 @@ func (h *WebsocketTransport) Subscribe(method string, notifyMethod string, ch := make(chan json.RawMessage) errch := make(chan *Error) - h.Mu.Lock() - h.inflight[notifyMethod] = &inflightReq{ + req := &inflightReq{ ch: ch, errch: errch, id: notifyMethod, } + + h.Mu.Lock() + sub, ok := h.subscribes[notifyMethod] + if ok { + h.subscribes[notifyMethod] = append(sub, req) + } else { + h.subscribes[notifyMethod] = []*inflightReq{req} + } h.Mu.Unlock() err := h.Call(method, args, reply) if err != nil { h.Mu.Lock() - delete(h.inflight, notifyMethod) + sub := h.subscribes[notifyMethod] + n := len(sub) + for i := 0; i < n; i++ { + if sub[i] == req { + if i == 0 { + // first + h.subscribes[notifyMethod] = sub[1:] + break + } + if i == (n - 1) { + // last + h.subscribes[notifyMethod] = sub[:n-1] + break + } + h.subscribes[notifyMethod] = append(sub[:i], sub[i+1:]...) + break + } + } h.Mu.Unlock() return nil, nil, err }