From 9f05ca356aed3ee154147fbd17393851163bd0ae Mon Sep 17 00:00:00 2001 From: fangdingjun Date: Sat, 30 Jun 2018 21:47:11 +0800 Subject: [PATCH] add connection run loop --- _nghttp2.h | 3 +- http2.go | 193 ++++++++++++++++++++++++++++++++++++++++++++++++----- nghttp2.c | 29 ++++++-- 3 files changed, 200 insertions(+), 25 deletions(-) diff --git a/_nghttp2.h b/_nghttp2.h index d016412..09e8b90 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -12,6 +12,7 @@ extern int OnDataRecv(void *, int, void *, size_t); extern int OnBeginHeaderCallback(void *, int); extern int OnHeaderCallback(void *, int, void *, int, void *, int); extern int OnFrameRecvCallback(void *, int); +extern int OnStreamClose(void *, int); struct nv_array { @@ -32,6 +33,6 @@ int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen int send_client_connection_header(nghttp2_session *session); -void init_nghttp2_session(nghttp2_session *session, void *data); +nghttp2_session * init_nghttp2_session(size_t data); #endif \ No newline at end of file diff --git a/http2.go b/http2.go index 05f8af7..c1a1476 100644 --- a/http2.go +++ b/http2.go @@ -8,9 +8,13 @@ import "C" import ( "fmt" "io" + "log" "net" "net/http" + "strconv" "strings" + "sync" + "time" "unsafe" ) @@ -19,6 +23,9 @@ type Conn struct { session *C.nghttp2_session conn net.Conn streams map[int]*Stream + lock *sync.Mutex + errch chan struct{} + err error } // Stream http2 stream @@ -27,9 +34,12 @@ type Stream struct { cdp *C.nghttp2_data_provider dp *dataProvider // application read data from stream - r io.Reader + r *io.PipeReader // recv stream data from session - w io.Writer + w *io.PipeWriter + res *http.Response + resch chan *http.Response + errch chan error } type dataProvider struct { @@ -40,11 +50,21 @@ type dataProvider struct { } // NewConn create http2 connection -func NewConn(c net.Conn) *Conn { - conn := &Conn{conn: c, streams: make(map[int]*Stream)} - C.init_nghttp2_session(conn.session, unsafe.Pointer(conn)) - C.send_client_connection_header(conn.session) - return conn +func NewConn(c net.Conn) (*Conn, error) { + conn := &Conn{ + conn: c, streams: make(map[int]*Stream), lock: new(sync.Mutex), + errch: make(chan struct{}), + } + conn.session = C.init_nghttp2_session(C.size_t(int(uintptr(unsafe.Pointer(conn))))) + if conn.session == nil { + return nil, fmt.Errorf("init session failed") + } + ret := C.send_client_connection_header(conn.session) + if int(ret) < 0 { + log.Printf("submit settings error: %s", C.GoString(C.nghttp2_strerror(ret))) + } + go conn.run() + return conn, nil } func (c *Conn) onDataRecv(buf []byte, streamID int) { @@ -68,17 +88,85 @@ func (c *Conn) onFrameRecv(streamID int) { stream.onFrameRecv() } +func (c *Conn) onStreamClose(streamID int) { + stream, ok := c.streams[streamID] + if ok { + stream.Close() + c.lock.Lock() + delete(c.streams, streamID) + c.lock.Unlock() + } + +} + +// Close close the http2 connection +func (c *Conn) Close() error { + for _, s := range c.streams { + s.Close() + } + C.nghttp2_session_del(c.session) + close(c.errch) + c.conn.Close() + return nil +} + +func (c *Conn) run() { + var wantRead int + var wantWrite int + var delay = 50 + var ret C.int + +loop: + for { + select { + case <-c.errch: + break loop + default: + } + + wantRead = int(C.nghttp2_session_want_read(c.session)) + wantWrite = int(C.nghttp2_session_want_write(c.session)) + if wantWrite != 0 { + ret = C.nghttp2_session_send(c.session) + if int(ret) < 0 { + c.err = fmt.Errorf("sesion send error: %s", C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break + } + } + if wantRead != 0 { + ret = C.nghttp2_session_recv(c.session) + if int(ret) < 0 { + c.err = fmt.Errorf("sesion recv error: %s", C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break + } + } + + // make delay when no data read/write + if wantRead == 0 && wantWrite == 0 { + select { + case <-time.After(time.Duration(delay) * time.Millisecond): + } + } + } +} + // NewRequest create a new http2 stream -func (c *Conn) NewRequest(req *http.Request) *http.Response { +func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) { + if c.err != nil { + return nil, c.err + } + nvIndex := 0 nvMax := 25 nva := C.new_nv_array(C.size_t(nvMax)) setNvArray(nva, nvIndex, ":method", req.Method, 0) - nvIndex += 1 + nvIndex++ setNvArray(nva, nvIndex, ":scheme", "https", 0) - nvIndex += 1 + nvIndex++ setNvArray(nva, nvIndex, ":authority", req.Host, 0) - nvIndex += 1 + nvIndex++ p := req.URL.Path q := req.URL.Query().Encode() @@ -86,13 +174,13 @@ func (c *Conn) NewRequest(req *http.Request) *http.Response { p = p + "?" + q } setNvArray(nva, nvIndex, ":path", p, 0) - nvIndex += 1 + nvIndex++ for k, v := range req.Header { if strings.ToLower(k) == "host" { continue } setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) - nvIndex += 1 + nvIndex++ } var dp *dataProvider var cdp *C.nghttp2_data_provider @@ -102,13 +190,30 @@ func (c *Conn) NewRequest(req *http.Request) *http.Response { streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex+1)) C.delete_nv_array(nva) if int(streamID) < 0 { - return nil + return nil, fmt.Errorf("submit request error: %s", C.GoString(C.nghttp2_strerror(streamID))) } + log.Println("stream id ", int(streamID)) r, w := io.Pipe() - s := &Stream{streamID: int(streamID), dp: dp, cdp: cdp, r: r, w: w} + s := &Stream{ + streamID: int(streamID), + dp: dp, + cdp: cdp, + r: r, + w: w, + resch: make(chan *http.Response), + errch: make(chan error), + } + c.lock.Lock() c.streams[int(streamID)] = s + c.lock.Unlock() - return nil + select { + case err := <-s.errch: + return nil, err + case res := <-s.resch: + return res, nil + } + //return nil, fmt.Errorf("unknown error") } func setNvArray(a *C.struct_nv_array, index int, name, value string, flags int) { @@ -153,19 +258,44 @@ func (s *Stream) onDataRecv(buf []byte) { } func (s *Stream) onBeginHeader() { - + s.res = &http.Response{ + Header: make(http.Header), + } } func (s *Stream) onHeader(name, value string) { - + if name == ":status" { + statusCode, _ := strconv.Atoi(value) + s.res.StatusCode = statusCode + s.res.Status = http.StatusText(statusCode) + s.res.Proto = "HTTP/2.0" + s.res.ProtoMajor = 2 + s.res.ProtoMinor = 0 + return + } + s.res.Header.Add(name, value) } func (s *Stream) onFrameRecv() { + s.res.Body = s + s.resch <- s.res +} +// Close close the stream +func (s *Stream) Close() error { + select { + case s.errch <- fmt.Errorf("stream closed"): + } + close(s.resch) + close(s.errch) + s.w.Close() + return nil } +// DataSourceRead callback function for data read from data provider source //export DataSourceRead func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t { + log.Println("data source read") dp := (*dataProvider)(ptr) gobuf := make([]byte, int(length)) n, err := dp.Read(gobuf) @@ -181,47 +311,63 @@ func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.s return C.ssize_t(n) } +// OnDataRecv callback function for data frame received //export OnDataRecv func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int { + log.Println("on data recv") conn := (*Conn)(ptr) gobuf := C.GoBytes(buf, C.int(length)) conn.onDataRecv(gobuf, int(streamID)) return 0 } +// DataRead callback function for session wants read data from peer //export DataRead func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + log.Println("data read") conn := (*Conn)(ptr) buf := make([]byte, int(size)) + log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Read(buf) if err != nil { + log.Println(err) return -1 } + log.Println("read from network ", n) return C.ssize_t(n) } +// DataWrite callback function for session wants send data to peer //export DataWrite func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + log.Println("data write") conn := (*Conn)(ptr) buf := C.GoBytes(data, C.int(size)) + log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Write(buf) if err != nil { + log.Println(err) return -1 } + log.Println("write data to network ", n) return C.ssize_t(n) } +// OnBeginHeaderCallback callback function for response //export OnBeginHeaderCallback func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + log.Println("begin header") conn := (*Conn)(ptr) conn.onBeginHeader(int(streamID)) return 0 } +// OnHeaderCallback callback function for header //export OnHeaderCallback func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, name unsafe.Pointer, namelen C.int, value unsafe.Pointer, valuelen C.int) C.int { + log.Println("header") conn := (*Conn)(ptr) goname := C.GoBytes(name, namelen) govalue := C.GoBytes(value, valuelen) @@ -229,9 +375,20 @@ func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, return 0 } +// OnFrameRecvCallback callback function for begion to recv data //export OnFrameRecvCallback func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { + log.Println("frame recv") conn := (*Conn)(ptr) conn.onFrameRecv(int(streamID)) return 0 } + +// OnStreamClose callback function for stream close +//export OnStreamClose +func OnStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { + log.Println("stream close") + conn := (*Conn)(ptr) + conn.onStreamClose(int(streamID)) + return 0 +} diff --git a/nghttp2.c b/nghttp2.c index de1a1cc..53bbf83 100644 --- a/nghttp2.c +++ b/nghttp2.c @@ -21,6 +21,7 @@ static int on_header_callback(nghttp2_session *session, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) { + printf("on_header_callback\n"); switch (frame->hd.type) { case NGHTTP2_HEADERS: @@ -40,6 +41,7 @@ static int on_begin_headers_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { + printf("on_begin_headers_callback\n"); int stream_id = frame->hd.stream_id; switch (frame->hd.type) { @@ -48,8 +50,8 @@ static int on_begin_headers_callback(nghttp2_session *session, { fprintf(stderr, "Response headers for stream ID=%d:\n", frame->hd.stream_id); + OnBeginHeaderCallback(user_data, stream_id); } - OnBeginHeaderCallback(user_data, stream_id); break; } return 0; @@ -58,15 +60,15 @@ static int on_begin_headers_callback(nghttp2_session *session, static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { - + printf("on_frame_recv_callback\n"); switch (frame->hd.type) { case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { fprintf(stderr, "All headers received\n"); + OnFrameRecvCallback(user_data, frame->hd.stream_id); } - OnFrameRecvCallback(user_data, frame->hd.stream_id); break; } return 0; @@ -82,6 +84,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { + OnStreamClose(user_data, stream_id); return 0; } @@ -96,9 +99,18 @@ ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id, } return ret; } +int on_error_callback(nghttp2_session *session, int lib_error_code, + const char *msg, size_t len, void *user_data) +{ + //printf("errmsg %*s\n", msg, len); + printf("code: %d, error: %s\n", lib_error_code, nghttp2_strerror(lib_error_code)); + return 0; +} -void init_nghttp2_session(nghttp2_session *session, void *data) +nghttp2_session *init_nghttp2_session(size_t data) { + int ret; + nghttp2_session *session; nghttp2_session_callbacks *callbacks; nghttp2_session_callbacks_new(&callbacks); @@ -106,6 +118,7 @@ void init_nghttp2_session(nghttp2_session *session, void *data) nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback); + nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback); @@ -121,9 +134,13 @@ void init_nghttp2_session(nghttp2_session *session, void *data) nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, on_begin_headers_callback); - nghttp2_session_client_new(&session, callbacks, data); - + ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data))); + if (session == NULL) + { + printf("c init session failed: %s\n", nghttp2_strerror(ret)); + } nghttp2_session_callbacks_del(callbacks); + return session; } int send_client_connection_header(nghttp2_session *session)