fix subscribe process issue

master
dingjun 5 years ago
parent bbb112b301
commit 40d34d478f

@ -22,6 +22,7 @@ type WebsocketTransport struct {
err error err error
ctx context.Context ctx context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
subscribes map[string][]*inflightReq
} }
type inflightReq struct { type inflightReq struct {
@ -52,6 +53,7 @@ func NewWebsocketTransport(ctx context.Context, uri string) (Transport, error) {
Conn: conn, Conn: conn,
URL: uri, URL: uri,
inflight: make(map[string]*inflightReq), inflight: make(map[string]*inflightReq),
subscribes: make(map[string][]*inflightReq),
Mu: new(sync.Mutex), Mu: new(sync.Mutex),
} }
w.ctx, w.cancelFunc = context.WithCancel(ctx) w.ctx, w.cancelFunc = context.WithCancel(ctx)
@ -95,6 +97,8 @@ func (h *WebsocketTransport) readloop() {
} }
if res.ID != "" { if res.ID != "" {
// response
h.Mu.Lock() h.Mu.Lock()
req, ok := h.inflight[res.ID] req, ok := h.inflight[res.ID]
h.Mu.Unlock() h.Mu.Unlock()
@ -120,9 +124,12 @@ func (h *WebsocketTransport) readloop() {
continue continue
} }
if res.Method != "" { if res.Method != "" {
// notify
h.Mu.Lock() h.Mu.Lock()
req, ok := h.inflight[res.Method] reqs, ok := h.subscribes[res.Method]
h.Mu.Unlock() h.Mu.Unlock()
if !ok { if !ok {
@ -130,13 +137,18 @@ func (h *WebsocketTransport) readloop() {
continue continue
} }
for _, req := range reqs {
if res.Error != nil { if res.Error != nil {
req.errch <- res.Error req.errch <- res.Error
continue } else {
}
req.ch <- res.Params req.ch <- res.Params
} }
} }
continue
}
log.Warnf("unhandled message %s", data)
}
} }
func (h *WebsocketTransport) nextID() uint64 { func (h *WebsocketTransport) nextID() uint64 {
@ -210,18 +222,42 @@ func (h *WebsocketTransport) Subscribe(method string, notifyMethod string,
ch := make(chan json.RawMessage) ch := make(chan json.RawMessage)
errch := make(chan *Error) errch := make(chan *Error)
h.Mu.Lock() req := &inflightReq{
h.inflight[notifyMethod] = &inflightReq{
ch: ch, ch: ch,
errch: errch, errch: errch,
id: notifyMethod, id: notifyMethod,
} }
h.Mu.Lock()
sub, ok := h.subscribes[notifyMethod]
if ok {
h.subscribes[notifyMethod] = append(sub, req)
} else {
h.subscribes[notifyMethod] = []*inflightReq{req}
}
h.Mu.Unlock() h.Mu.Unlock()
err := h.Call(method, args, reply) err := h.Call(method, args, reply)
if err != nil { if err != nil {
h.Mu.Lock() h.Mu.Lock()
delete(h.inflight, notifyMethod) sub := h.subscribes[notifyMethod]
n := len(sub)
for i := 0; i < n; i++ {
if sub[i] == req {
if i == 0 {
// first
h.subscribes[notifyMethod] = sub[1:]
break
}
if i == (n - 1) {
// last
h.subscribes[notifyMethod] = sub[:n-1]
break
}
h.subscribes[notifyMethod] = append(sub[:i], sub[i+1:]...)
break
}
}
h.Mu.Unlock() h.Mu.Unlock()
return nil, nil, err return nil, nil, err
} }

Loading…
Cancel
Save