From 8c53841e25033fc4545c4770be2e5cc27179d039 Mon Sep 17 00:00:00 2001 From: fangdingjun Date: Thu, 12 Jul 2018 15:31:56 +0800 Subject: [PATCH] code format --- callbacks.go | 11 ++++++++--- conn.go | 22 +++++++++++++++++++++- data_provider.go | 8 ++++++++ stream.go | 15 ++++++++++++++- 4 files changed, 51 insertions(+), 5 deletions(-) diff --git a/callbacks.go b/callbacks.go index 38b3df1..04d657f 100644 --- a/callbacks.go +++ b/callbacks.go @@ -286,6 +286,7 @@ func onClientDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.si return C.ssize_t(n) } */ + // onClientDataSendCallback callback function for libnghttp2 library want send data to network. // //export onClientDataSendCallback @@ -422,8 +423,12 @@ func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { func onClientConnectionCloseCallback(ptr unsafe.Pointer) { conn := (*ClientConn)(ptr) conn.err = io.EOF - select { - case conn.exitch <- struct{}{}: - default: + + // signal all goroutings exit + for i := 0; i < 4; i++ { + select { + case conn.exitch <- struct{}{}: + default: + } } } diff --git a/conn.go b/conn.go index 6188761..61756f1 100644 --- a/conn.go +++ b/conn.go @@ -284,15 +284,21 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { setNvArray(nva, nvIndex, ":authority", req.Host, 0) nvIndex++ + /* + :path must starts with "/" + req.RequestURI maybe starts with http:// + */ p := req.URL.Path q := req.URL.Query().Encode() if q != "" { p = p + "?" + q } + if req.Method != "CONNECT" { setNvArray(nva, nvIndex, ":path", p, 0) nvIndex++ } + //log.Printf("%s http://%s%s", req.Method, req.Host, p) for k, v := range req.Header { //log.Printf("header %s: %s\n", k, v[0]) @@ -304,8 +310,10 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) nvIndex++ } + var dp *dataProvider var cdp *C.nghttp2_data_provider + if req.Method == "PUT" || req.Method == "POST" || req.Method == "CONNECT" { dp, cdp = newDataProvider(c.lock) go func() { @@ -320,6 +328,7 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { c.lock.Unlock() C.delete_nv_array(nva) + if int(streamID) < 0 { return nil, fmt.Errorf("submit request error: %s", C.GoString(C.nghttp2_strerror(streamID))) @@ -331,6 +340,7 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { dp.streamID = int(streamID) dp.session = c.session } + s := &ClientStream{ streamID: int(streamID), conn: c, @@ -340,11 +350,13 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { errch: make(chan error), lock: new(sync.Mutex), } + c.lock.Lock() c.streams[int(streamID)] = s c.streamCount++ c.lock.Unlock() + // waiting for response from server select { case err := <-s.errch: return nil, err @@ -422,10 +434,13 @@ func Server(c net.Conn, handler http.Handler) (*ServerConn, error) { errch: make(chan struct{}), exitch: make(chan struct{}), } - conn.session = C.init_nghttp2_server_session(C.size_t(uintptr(unsafe.Pointer(conn)))) + + conn.session = C.init_nghttp2_server_session( + C.size_t(uintptr(unsafe.Pointer(conn)))) if conn.session == nil { return nil, fmt.Errorf("init session failed") } + //log.Println("send server connection header") ret := C.send_server_connection_header(conn.session) if int(ret) < 0 { @@ -441,9 +456,12 @@ func (c *ServerConn) serve(s *ServerStream) { if c.Handler == nil { handler = http.DefaultServeMux } + if s.req.URL == nil { s.req.URL = &url.URL{} } + + // call http.Handler to serve request handler.ServeHTTP(s, s.req) s.Close() } @@ -454,6 +472,7 @@ func (c *ServerConn) Close() error { return nil } c.closed = true + for _, s := range c.streams { s.Close() } @@ -465,6 +484,7 @@ func (c *ServerConn) Close() error { C.nghttp2_session_del(c.session) close(c.exitch) c.conn.Close() + return nil } diff --git a/data_provider.go b/data_provider.go index 3e053c7..190a630 100644 --- a/data_provider.go +++ b/data_provider.go @@ -42,13 +42,16 @@ func (dp *dataProvider) Read(buf []byte) (n int, err error) { func (dp *dataProvider) Write(buf []byte) (n int, err error) { dp.lock.Lock() defer dp.lock.Unlock() + if dp.closed { return 0, io.EOF } + if dp.deferred { dp.sessLock.Lock() C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) dp.sessLock.Unlock() + dp.deferred = false } return dp.buf.Write(buf) @@ -58,6 +61,7 @@ func (dp *dataProvider) Write(buf []byte) (n int, err error) { func (dp *dataProvider) Close() error { dp.lock.Lock() defer dp.lock.Unlock() + if dp.closed { return nil } @@ -67,6 +71,7 @@ func (dp *dataProvider) Close() error { dp.sessLock.Lock() C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) dp.sessLock.Unlock() + dp.deferred = false } return nil @@ -95,6 +100,7 @@ type bodyProvider struct { // will block when data not yet avaliable func (bp *bodyProvider) Read(buf []byte) (int, error) { var delay = 100 * time.Millisecond + for { bp.lock.Lock() n, err := bp.buf.Read(buf) @@ -112,6 +118,7 @@ func (bp *bodyProvider) Read(buf []byte) (int, error) { func (bp *bodyProvider) Write(buf []byte) (int, error) { bp.lock.Lock() defer bp.lock.Unlock() + return bp.buf.Write(buf) } @@ -119,6 +126,7 @@ func (bp *bodyProvider) Write(buf []byte) (int, error) { func (bp *bodyProvider) Close() error { bp.lock.Lock() defer bp.lock.Unlock() + bp.closed = true return nil } diff --git a/stream.go b/stream.go index dda3603..67cbdea 100644 --- a/stream.go +++ b/stream.go @@ -138,14 +138,19 @@ func (s *ServerStream) WriteHeader(code int) { if s.closed { return } + if s.responseSend { return } + s.responseSend = true s.statusCode = code + nvIndex := 0 nvMax := 25 + nva := C.new_nv_array(C.size_t(nvMax)) + setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0) nvIndex++ @@ -157,11 +162,14 @@ func (s *ServerStream) WriteHeader(code int) { setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) nvIndex++ } + var dp *dataProvider var cdp *C.nghttp2_data_provider + dp, cdp = newDataProvider(s.conn.lock) dp.streamID = s.streamID dp.session = s.conn.session + s.dp = dp s.cdp = cdp @@ -172,7 +180,8 @@ func (s *ServerStream) WriteHeader(code int) { C.delete_nv_array(nva) if int(ret) < 0 { - panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret)))) + panic(fmt.Sprintf("sumit response error %s", + C.GoString(C.nghttp2_strerror(ret)))) } //log.Printf("stream %d send response", s.streamID) } @@ -192,14 +201,17 @@ func (s *ServerStream) Close() error { return nil } s.closed = true + //C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) if s.req.Body != nil { s.req.Body.Close() } + if s.dp != nil { s.dp.Close() s.dp = nil } + if s.cdp != nil { C.free(unsafe.Pointer(s.cdp)) s.cdp = nil @@ -207,6 +219,7 @@ func (s *ServerStream) Close() error { s.conn.lock.Lock() s.conn.lock.Unlock() + if _, ok := s.conn.streams[s.streamID]; ok { delete(s.conn.streams, s.streamID) }