From c87800ed83e1d4483449cedbd6e949e7f2257da4 Mon Sep 17 00:00:00 2001 From: fangdingjun Date: Sun, 15 Jul 2018 16:11:46 +0800 Subject: [PATCH] merge client/server connection and stream --- _nghttp2.h | 29 +- callbacks.go | 379 +++++++----------------- conn.go | 743 ++++++++++------------------------------------- data_provider.go | 15 + nghttp2.c | 416 ++++++-------------------- stream.go | 255 +++------------- 6 files changed, 414 insertions(+), 1423 deletions(-) diff --git a/_nghttp2.h b/_nghttp2.h index d585e11..ed66bb7 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -7,16 +7,17 @@ #define ARRLEN(x) (sizeof(x) / sizeof(x[0])) -extern ssize_t onClientDataRecvCallback(void *, void *data, size_t); -extern ssize_t onClientDataSendCallback(void *, void *data, size_t); +extern ssize_t onDataRecvCallback(void *, void *data, size_t); +extern ssize_t onDataSendCallback(void *, void *data, size_t); extern ssize_t onServerDataSourceReadCallback(void *, int, void *, size_t); -extern ssize_t onClientDataSourceReadCallback(void *, int, void *, size_t); -extern int onClientDataChunkRecv(void *, int, void *, size_t); -extern int onClientBeginHeaderCallback(void *, int); -extern int onClientHeaderCallback(void *, int, void *, int, void *, int); -extern int onClientHeadersDoneCallback(void *, int); -extern int onClientStreamClose(void *, int); -extern void onClientConnectionCloseCallback(void *user_data); +extern ssize_t onDataSourceReadCallback(void *, int, void *, size_t); +extern int onDataChunkRecv(void *, int, void *, size_t); +extern int onBeginHeaderCallback(void *, int); +extern int onHeaderCallback(void *, int, void *, int, void *, int); +extern int onHeadersDoneCallback(void *, int); +extern int onStreamClose(void *, int); +extern void onConnectionCloseCallback(void *user_data); +extern void onStreamEndCallback(void *, int); int _nghttp2_submit_response(nghttp2_session *sess, int streamid, size_t nv, size_t nvlen, nghttp2_data_provider *dp); @@ -25,15 +26,7 @@ int _nghttp2_submit_request(nghttp2_session *session, const nghttp2_priority_spe size_t nva, size_t nvlen, const nghttp2_data_provider *data_prd, void *stream_user_data); -extern ssize_t onServerDataRecvCallback(void *, void *data, size_t); -extern ssize_t onServerDataSendCallback(void *, void *data, size_t); -extern int onServerDataChunkRecv(void *, int, void *, size_t); -extern int onServerBeginHeaderCallback(void *, int); -extern int onServerHeaderCallback(void *, int, void *, int, void *, int); -extern int onServerStreamEndCallback(void *, int); -extern int onServerHeadersDoneCallback(void *, int); -extern int onServerStreamClose(void *, int); -int send_server_connection_header(nghttp2_session *session); +int send_connection_header(nghttp2_session *session); struct nv_array { diff --git a/callbacks.go b/callbacks.go index a1d7f6a..8823636 100644 --- a/callbacks.go +++ b/callbacks.go @@ -5,17 +5,19 @@ package nghttp2 */ import "C" import ( - "bytes" "crypto/tls" + "errors" "io" "net/http" - "net/url" "strconv" "strings" - "sync" "unsafe" ) +var ( + errAgain = errors.New("again") +) + const ( NGHTTP2_NO_ERROR = 0 NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE = -521 @@ -23,176 +25,16 @@ const ( NGHTTP2_ERR_DEFERRED = -508 ) -// onServerDataSendCallback callback function for libnghttp2 library -// want send data to network. -// -//export onServerDataSendCallback -func onServerDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, - length C.size_t) C.ssize_t { - //log.Println("server data send") - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - buf := C.GoBytes(data, C.int(length)) - n, err := conn.conn.Write(buf) - if err != nil { - return NGHTTP2_ERR_CALLBACK_FAILURE - } - //log.Println("send ", n, " bytes to network ", buf) - return C.ssize_t(n) -} - -// onServerDataChunkRecv callback function for libnghttp2 library's data chunk recv. -// -//export onServerDataChunkRecv -func onServerDataChunkRecv(ptr unsafe.Pointer, streamID C.int, - data unsafe.Pointer, length C.size_t) C.int { - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - bp := s.req.Body.(*bodyProvider) - buf := C.GoBytes(data, C.int(length)) - bp.Write(buf) - return C.int(length) -} - -// onServerBeginHeaderCallback callback function for begin begin header recv. -// -//export onServerBeginHeaderCallback -func onServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - var TLS tls.ConnectionState - if tlsconn, ok := conn.conn.(*tls.Conn); ok { - TLS = tlsconn.ConnectionState() - } - - s := &ServerStream{ - streamID: int(streamID), - conn: conn, - req: &http.Request{ - //URL: &url.URL{}, - Header: http.Header{}, - Proto: "HTTP/2.0", - ProtoMajor: 2, - ProtoMinor: 0, - RemoteAddr: conn.conn.RemoteAddr().String(), - TLS: &TLS, - }, - //buf: new(bytes.Buffer), - } - //conn.lock.Lock() - conn.streams[int(streamID)] = s - //conn.lock.Unlock() - - return NGHTTP2_NO_ERROR -} - -// onServerHeaderCallback callback function for each header recv. -// -//export onServerHeaderCallback -func onServerHeaderCallback(ptr unsafe.Pointer, streamID C.int, - name unsafe.Pointer, namelen C.int, - value unsafe.Pointer, valuelen C.int) C.int { - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - hdrname := C.GoStringN((*C.char)(name), namelen) - hdrvalue := C.GoStringN((*C.char)(value), valuelen) - hdrname = strings.ToLower(hdrname) - switch hdrname { - case ":method": - s.req.Method = hdrvalue - case ":scheme": - // s.req.URL.Scheme = hdrvalue - case ":path": - s.req.RequestURI = hdrvalue - u, _ := url.ParseRequestURI(s.req.RequestURI) - s.req.URL = u - case ":authority": - s.req.Host = hdrvalue - case "content-length": - s.req.Header.Add(hdrname, hdrvalue) - n, err := strconv.ParseInt(hdrvalue, 10, 64) - if err == nil { - s.req.ContentLength = n - } - default: - s.req.Header.Add(hdrname, hdrvalue) - } - return NGHTTP2_NO_ERROR -} - -// onServerStreamEndCallback callback function for the stream when END_STREAM flag set -// -//export onServerStreamEndCallback -func onServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int { - - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - - s.streamEnd = true - bp := s.req.Body.(*bodyProvider) - if s.req.Method != "CONNECT" { - bp.closed = true - //log.Println("stream end flag set, begin to serve") - go conn.serve(s) - } - return NGHTTP2_NO_ERROR -} - -// onServerHeadersDoneCallback callback function for the stream when all headers received. -// -//export onServerHeadersDoneCallback -func onServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - s.headersDone = true - bp := &bodyProvider{ - buf: new(bytes.Buffer), - lock: new(sync.Mutex), - } - s.req.Body = bp - if s.req.Method == "CONNECT" { - go conn.serve(s) - } - return NGHTTP2_NO_ERROR -} - -// onServerStreamClose callback function for the stream when closed. -// -//export onServerStreamClose -func onServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - //log.Printf("stream %d closed", int(streamID)) - if !ok { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - //conn.lock.Lock() - delete(conn.streams, int(streamID)) - //conn.lock.Unlock() - go s.Close() - return NGHTTP2_NO_ERROR -} - -// onClientDataSourceReadCallback callback function for libnghttp2 library +// onDataSourceReadCallback callback function for libnghttp2 library // want read data from data provider source, // return NGHTTP2_ERR_DEFERRED will cause data frame defered, // application later call nghttp2_session_resume_data will re-quene the data frame // -//export onClientDataSourceReadCallback -func onClientDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, +//export onDataSourceReadCallback +func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.ssize_t { //log.Println("onDataSourceReadCallback begin") - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) s, ok := conn.streams[int(streamID)] if !ok { //log.Println("client dp callback, stream not exists") @@ -221,193 +63,169 @@ func onClientDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, return C.ssize_t(n) } -// onServerDataSourceReadCallback callback function for libnghttp2 library -// want read data from data provider source, -// return NGHTTP2_ERR_DEFERRED will cause data frame defered, -// application later call nghttp2_session_resume_data will re-quene the data frame +// onDataChunkRecv callback function for libnghttp2 library data chunk received. // -//export onServerDataSourceReadCallback -func onServerDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, - buf unsafe.Pointer, length C.size_t) C.ssize_t { - //log.Println("onDataSourceReadCallback begin") - conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - //log.Println("server dp callback, stream not exists") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - gobuf := make([]byte, int(length)) - n, err := s.dp.Read(gobuf) - if err != nil { - if err == io.EOF { - //log.Println("onDataSourceReadCallback end") - return 0 - } - if err == errAgain { - //log.Println("onDataSourceReadCallback end") - s.dp.deferred = true - return NGHTTP2_ERR_DEFERRED - } - //log.Println("onDataSourceReadCallback end") - return NGHTTP2_ERR_CALLBACK_FAILURE - } - //cbuf := C.CBytes(gobuf) - //defer C.free(cbuf) - //C.memcpy(buf, cbuf, C.size_t(n)) - C.memcpy(buf, unsafe.Pointer(&gobuf[0]), C.size_t(n)) - //log.Println("onDataSourceReadCallback end") - return C.ssize_t(n) -} - -// onClientDataChunkRecv callback function for libnghttp2 library data chunk received. -// -//export onClientDataChunkRecv -func onClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int, +//export onDataChunkRecv +func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int { - //log.Println("onClientDataChunkRecv begin") - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + //log.Println("onDataChunkRecv begin") + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) gobuf := C.GoBytes(buf, C.int(length)) s, ok := conn.streams[int(streamID)] if !ok { - //log.Println("onClientDataChunkRecv end") + //log.Println("onDataChunkRecv end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } - if s.res.Body == nil { + if s.bp == nil { //log.Println("empty body") - //log.Println("onClientDataChunkRecv end") + //log.Println("onDataChunkRecv end") return C.int(length) } - if bp, ok := s.res.Body.(*bodyProvider); ok { - n, err := bp.Write(gobuf) - if err != nil { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - //log.Println("onClientDataChunkRecv end") - return C.int(n) + n, err := s.bp.Write(gobuf) + if err != nil { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } - //log.Println("onClientDataChunkRecv end") - return C.int(length) + //log.Println("onDataChunkRecv end") + return C.int(n) } -// onClientDataSendCallback callback function for libnghttp2 library want send data to network. +// onDataSendCallback callback function for libnghttp2 library want send data to network. // -//export onClientDataSendCallback -func onClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { - //log.Println("onClientDataSendCallback begin") +//export onDataSendCallback +func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + //log.Println("onDataSendCallback begin") //log.Println("data write req ", int(size)) - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) buf := C.GoBytes(data, C.int(size)) //log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Write(buf) if err != nil { - //log.Println("onClientDataSendCallback end") + //log.Println("onDataSendCallback end") return NGHTTP2_ERR_CALLBACK_FAILURE } //log.Printf("write %d bytes to network ", n) - //log.Println("onClientDataSendCallback end") + //log.Println("onDataSendCallback end") return C.ssize_t(n) } -// onClientBeginHeaderCallback callback function for begin header receive. +// onBeginHeaderCallback callback function for begin header receive. // -//export onClientBeginHeaderCallback -func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("onClientBeginHeaderCallback begin") +//export onBeginHeaderCallback +func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("onBeginHeaderCallback begin") //log.Printf("stream %d begin headers", int(streamID)) - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) s, ok := conn.streams[int(streamID)] if !ok { - //log.Println("onClientBeginHeaderCallback end") + //log.Println("onBeginHeaderCallback end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } var TLS tls.ConnectionState if tlsconn, ok := conn.conn.(*tls.Conn); ok { TLS = tlsconn.ConnectionState() } - s.res = &http.Response{ - Header: make(http.Header), - Body: &bodyProvider{ - buf: new(bytes.Buffer), - lock: new(sync.Mutex), - }, - TLS: &TLS, + if conn.isServer { + s.request = &http.Request{ + Header: make(http.Header), + Proto: "HTTP/2", + ProtoMajor: 2, + ProtoMinor: 0, + TLS: &TLS, + Body: s.bp, + } + return NGHTTP2_NO_ERROR + } + s.response = &http.Response{ + Proto: "HTTP/2", + ProtoMajor: 2, + ProtoMinor: 0, + Header: make(http.Header), + Body: s.bp, + TLS: &TLS, } - //log.Println("onClientBeginHeaderCallback end") + //log.Println("onBeginHeaderCallback end") return NGHTTP2_NO_ERROR } -// onClientHeaderCallback callback function for each header received. +// onHeaderCallback callback function for each header received. // -//export onClientHeaderCallback -func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, +//export onHeaderCallback +func onHeaderCallback(ptr unsafe.Pointer, streamID C.int, name unsafe.Pointer, namelen C.int, value unsafe.Pointer, valuelen C.int) C.int { - //log.Println("onClientHeaderCallback begin") + //log.Println("onHeaderCallback begin") //log.Println("header") - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) goname := string(C.GoBytes(name, namelen)) govalue := string(C.GoBytes(value, valuelen)) s, ok := conn.streams[int(streamID)] if !ok { - //log.Println("onClientHeaderCallback end") + //log.Println("onHeaderCallback end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } goname = strings.ToLower(goname) switch goname { + case ":method": + s.request.Method = govalue + case ":scheme": + case ":authority": + s.request.Host = govalue + case ":path": + s.request.RequestURI = govalue case ":status": statusCode, _ := strconv.Atoi(govalue) - s.res.StatusCode = statusCode - s.res.Status = http.StatusText(statusCode) - s.res.Proto = "HTTP/2.0" - s.res.ProtoMajor = 2 - s.res.ProtoMinor = 0 + s.response.StatusCode = statusCode + s.response.Status = http.StatusText(statusCode) case "content-length": - s.res.Header.Add(goname, govalue) + s.response.Header.Add(goname, govalue) n, err := strconv.ParseInt(govalue, 10, 64) if err == nil { - s.res.ContentLength = n + s.response.ContentLength = n } case "transfer-encoding": - s.res.Header.Add(goname, govalue) - s.res.TransferEncoding = append(s.res.TransferEncoding, govalue) + s.response.Header.Add(goname, govalue) + s.response.TransferEncoding = append(s.response.TransferEncoding, govalue) default: - s.res.Header.Add(goname, govalue) + s.response.Header.Add(goname, govalue) } - //log.Println("onClientHeaderCallback end") + //log.Println("onHeaderCallback end") return NGHTTP2_NO_ERROR } -// onClientHeadersDoneCallback callback function for the stream when all headers received. +// onHeadersDoneCallback callback function for the stream when all headers received. // -//export onClientHeadersDoneCallback -func onClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("onClientHeadersDoneCallback begin") +//export onHeadersDoneCallback +func onHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("onHeadersDoneCallback begin") //log.Printf("stream %d headers done", int(streamID)) - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) s, ok := conn.streams[int(streamID)] if !ok { - //log.Println("onClientHeadersDoneCallback end") + //log.Println("onHeadersDoneCallback end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } + if conn.isServer { + return NGHTTP2_NO_ERROR + } select { - case s.resch <- s.res: + case s.resch <- s.response: default: } - //log.Println("onClientHeadersDoneCallback end") + //log.Println("onHeadersDoneCallback end") return NGHTTP2_NO_ERROR } -// onClientStreamClose callback function for the stream when closed. +// onStreamClose callback function for the stream when closed. // -//export onClientStreamClose -func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("onClientStreamClose begin") +//export onStreamClose +func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("onStreamClose begin") //log.Printf("stream %d closed", int(streamID)) - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) stream, ok := conn.streams[int(streamID)] if ok { @@ -416,16 +234,16 @@ func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { delete(conn.streams, int(streamID)) //go stream.Close() //conn.lock.Unlock() - //log.Println("onClientStreamClose end") + //log.Println("onStreamClose end") return NGHTTP2_NO_ERROR } - //log.Println("onClientStreamClose end") + //log.Println("onStreamClose end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } -//export onClientConnectionCloseCallback -func onClientConnectionCloseCallback(ptr unsafe.Pointer) { - conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr))) +//export onConnectionCloseCallback +func onConnectionCloseCallback(ptr unsafe.Pointer) { + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn.err = io.EOF // signal all goroutings exit @@ -436,3 +254,8 @@ func onClientConnectionCloseCallback(ptr unsafe.Pointer) { } } } + +//export onStreamEndCallback +func onStreamEndCallback(ptr unsafe.Pointer, streamID C.int) { + +} diff --git a/conn.go b/conn.go index a1b031b..2a2f4b3 100644 --- a/conn.go +++ b/conn.go @@ -1,589 +1,154 @@ -package nghttp2 - -/* -#cgo pkg-config: libnghttp2 -#include "_nghttp2.h" -*/ -import "C" -import ( - "crypto/tls" - "errors" - "fmt" - "io" - "net" - "net/http" - "net/url" - "strings" - "sync" - "time" - "unsafe" -) - -var ( - errAgain = errors.New("again") -) - -// ClientConn http2 client connection -type ClientConn struct { - session *C.nghttp2_session - conn net.Conn - streams map[int]*ClientStream - lock *sync.Mutex - errch chan struct{} - exitch chan struct{} - err error - closed bool - streamCount int -} - -// Client create http2 client -func Client(c net.Conn) (*ClientConn, error) { - conn := &ClientConn{ - conn: c, streams: make(map[int]*ClientStream), - lock: new(sync.Mutex), - errch: make(chan struct{}), - exitch: make(chan struct{}), - } - conn.session = C.init_nghttp2_client_session( - C.size_t(int(uintptr(unsafe.Pointer(conn))))) - if conn.session == nil { - return nil, fmt.Errorf("init session failed") - } - ret := C.nghttp2_submit_settings(conn.session, 0, nil, 0) - if int(ret) < 0 { - conn.Close() - return nil, fmt.Errorf("submit settings error: %s", - C.GoString(C.nghttp2_strerror(ret))) - } - go conn.run() - return conn, nil -} - -// Error return current error on connection -func (c *ClientConn) Error() error { - c.lock.Lock() - defer c.lock.Unlock() - return c.err -} - -// Close close the http2 connection -func (c *ClientConn) Close() error { - if c.closed { - return nil - } - c.closed = true - - for _, s := range c.streams { - s.Close() - } - - c.lock.Lock() - defer c.lock.Unlock() - //log.Println("close client connection") - C.nghttp2_session_terminate_session(c.session, 0) - C.nghttp2_session_del(c.session) - close(c.exitch) - c.conn.Close() - return nil -} - -func (c *ClientConn) run() { - var wantWrite int - var delay = 50 * time.Millisecond - var keepalive = 5 * time.Second - var ret C.int - var lastDataRecv time.Time - - //defer c.Close() - - defer close(c.errch) - - errch := make(chan struct{}, 5) - - // data read loop - go func() { - buf := make([]byte, 16*1024) - readloop: - for { - select { - case <-c.exitch: - break readloop - case <-errch: - break readloop - default: - } - - n, err := c.conn.Read(buf) - if err != nil { - c.lock.Lock() - c.err = err - c.lock.Unlock() - close(errch) - break - } - //log.Printf("read %d bytes from network", n) - lastDataRecv = time.Now() - //d1 := C.CBytes(buf[:n]) - - c.lock.Lock() - //ret1 := C.nghttp2_session_mem_recv(c.session, - // (*C.uchar)(d1), C.size_t(n)) - ret1 := C.nghttp2_session_mem_recv(c.session, - (*C.uchar)(unsafe.Pointer(&buf[0])), C.size_t(n)) - c.lock.Unlock() - - //C.free(d1) - if int(ret1) < 0 { - c.lock.Lock() - c.err = fmt.Errorf("sesion recv error: %s", - C.GoString(C.nghttp2_strerror(ret))) - //log.Println(c.err) - c.lock.Unlock() - close(errch) - break - } - } - }() - - // keep alive loop - go func() { - for { - select { - case <-c.exitch: - return - case <-errch: - return - case <-time.After(keepalive): - } - now := time.Now() - last := lastDataRecv - d := now.Sub(last) - if d > keepalive { - c.lock.Lock() - C.nghttp2_submit_ping(c.session, 0, nil) - c.lock.Unlock() - //log.Println("submit ping") - } - } - }() - -loop: - for { - select { - case <-c.errch: - break loop - case <-errch: - break loop - case <-c.exitch: - break loop - default: - } - - c.lock.Lock() - ret = C.nghttp2_session_send(c.session) - c.lock.Unlock() - - if int(ret) < 0 { - c.lock.Lock() - c.err = fmt.Errorf("sesion send error: %s", - C.GoString(C.nghttp2_strerror(ret))) - c.lock.Unlock() - //log.Println(c.err) - close(errch) - break - } - - c.lock.Lock() - wantWrite = int(C.nghttp2_session_want_write(c.session)) - c.lock.Unlock() - - // make delay when no data read/write - if wantWrite == 0 { - time.Sleep(delay) - } - } -} - -// Connect submit a CONNECT request, return a ClientStream and http status code from server -// -// equals to "CONNECT host:port" in http/1.1 -func (c *ClientConn) Connect(addr string) (cs *ClientStream, statusCode int, err error) { - if c.err != nil { - return nil, http.StatusServiceUnavailable, c.err - } - var nv = []C.nghttp2_nv{} - nv = append(nv, newNV(":method", "CONNECT")) - nv = append(nv, newNV(":authority", addr)) - - var dp *dataProvider - - s := &ClientStream{ - conn: c, - cdp: C.nghttp2_data_provider{}, - resch: make(chan *http.Response), - errch: make(chan error), - lock: new(sync.Mutex), - } - - dp = newDataProvider(unsafe.Pointer(&s.cdp), c.lock, 1) - s.dp = dp - - c.lock.Lock() - streamID := C._nghttp2_submit_request(c.session, nil, - C.size_t(uintptr(unsafe.Pointer(&nv[0]))), C.size_t(len(nv)), &s.cdp, nil) - c.lock.Unlock() - - if int(streamID) < 0 { - return nil, http.StatusServiceUnavailable, fmt.Errorf( - "submit request error: %s", C.GoString(C.nghttp2_strerror(streamID))) - } - dp.streamID = int(streamID) - dp.session = c.session - s.streamID = int(streamID) - //log.Println("stream id ", int(streamID)) - - c.lock.Lock() - c.streams[int(streamID)] = s - c.streamCount++ - c.lock.Unlock() - - //log.Printf("new stream id %d", int(streamID)) - select { - case err := <-s.errch: - //log.Println("wait response, got ", err) - return nil, http.StatusServiceUnavailable, err - case res := <-s.resch: - if res != nil { - res.Request = &http.Request{ - Method: "CONNECT", - RequestURI: addr, - URL: &url.URL{}, - Host: addr, - } - return s, res.StatusCode, nil - } - //log.Println("wait response, empty response") - return nil, http.StatusServiceUnavailable, io.EOF - case <-c.errch: - return nil, http.StatusServiceUnavailable, fmt.Errorf("connection error") - } -} - -func newNV(name, value string) C.nghttp2_nv { - nv := C.nghttp2_nv{} - nameArr := make([]byte, len(name)+1) - valueArr := make([]byte, len(value)+1) - copy(nameArr, []byte(name)) - copy(valueArr, []byte(value)) - - nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0])) - nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0])) - nv.namelen = C.size_t(len(name)) - nv.valuelen = C.size_t(len(value)) - nv.flags = 0 - return nv -} - -// CreateRequest submit a request and return a http.Response, -func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { - if c.err != nil { - return nil, c.err - } - - nv := []C.nghttp2_nv{} - nv = append(nv, newNV(":method", req.Method)) - nv = append(nv, newNV(":scheme", "https")) - nv = append(nv, newNV(":authority", req.Host)) - - /* - :path must starts with "/" - req.RequestURI maybe starts with http:// - */ - p := req.URL.Path - q := req.URL.Query().Encode() - if q != "" { - p = p + "?" + q - } - - nv = append(nv, newNV(":path", p)) - - //log.Printf("%s http://%s%s", req.Method, req.Host, p) - for k, v := range req.Header { - //log.Printf("header %s: %s\n", k, v[0]) - _k := strings.ToLower(k) - if _k == "host" || _k == "connection" || _k == "proxy-connection" { - continue - } - //log.Printf("header %s: %s", k, v) - nv = append(nv, newNV(k, v[0])) - } - - var dp *dataProvider - - s := &ClientStream{ - //streamID: int(streamID), - conn: c, - resch: make(chan *http.Response), - errch: make(chan error), - lock: new(sync.Mutex), - } - - if req.Method == "PUT" || req.Method == "POST" || req.Method == "CONNECT" { - s.cdp = C.nghttp2_data_provider{} - dp = newDataProvider(unsafe.Pointer(&s.cdp), c.lock, 1) - s.dp = dp - go func() { - io.Copy(dp, req.Body) - dp.Close() - }() - } - - c.lock.Lock() - streamID := C._nghttp2_submit_request(c.session, nil, - C.size_t(uintptr(unsafe.Pointer(&nv[0]))), C.size_t(len(nv)), &s.cdp, nil) - c.lock.Unlock() - - //C.delete_nv_array(nva) - - if int(streamID) < 0 { - return nil, fmt.Errorf("submit request error: %s", - C.GoString(C.nghttp2_strerror(streamID))) - } - s.streamID = int(streamID) - //log.Printf("new stream, id %d", int(streamID)) - - if dp != nil { - dp.streamID = int(streamID) - dp.session = c.session - } - - c.lock.Lock() - c.streams[int(streamID)] = s - c.streamCount++ - c.lock.Unlock() - - // waiting for response from server - select { - case err := <-s.errch: - return nil, err - case res := <-s.resch: - if res != nil { - res.Request = req - return res, nil - } - return nil, io.EOF - case <-c.errch: - return nil, fmt.Errorf("connection error") - } - //return nil, fmt.Errorf("unknown error") -} - -// CanTakeNewRequest check if the ClientConn can submit a new request -func (c *ClientConn) CanTakeNewRequest() bool { - c.lock.Lock() - c.lock.Unlock() - - if c.closed { - return false - } - - if c.err != nil { - return false - } - - if c.streamCount > ((1 << 31) / 2) { - return false - } - return true -} - -// ServerConn server connection -type ServerConn struct { - // Handler handler to handle request - Handler http.Handler - - closed bool - session *C.nghttp2_session - streams map[int]*ServerStream - lock *sync.Mutex - conn net.Conn - errch chan struct{} - exitch chan struct{} - err error -} - -// HTTP2Handler is the http2 server handler that can co-work with standard net/http. -// -// usage example: -// l, err := net.Listen("tcp", ":1222") -// srv := &http.Server{ -// TLSConfig: &tls.Config{ -// NextProtos:[]string{"h2", "http/1.1"}, -// } -// TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){ -// "h2": nghttp2.Http2Handler -// } -// } -// srv.ServeTLS(l, "server.crt", "server.key") -func HTTP2Handler(srv *http.Server, conn *tls.Conn, handler http.Handler) { - h2conn, err := Server(conn, handler) - if err != nil { - panic(err.Error()) - } - h2conn.Run() -} - -// Server create new server connection -func Server(c net.Conn, handler http.Handler) (*ServerConn, error) { - conn := &ServerConn{ - conn: c, - Handler: handler, - streams: make(map[int]*ServerStream), - lock: new(sync.Mutex), - errch: make(chan struct{}), - exitch: make(chan struct{}), - } - - conn.session = C.init_nghttp2_server_session( - C.size_t(uintptr(unsafe.Pointer(conn)))) - if conn.session == nil { - return nil, fmt.Errorf("init session failed") - } - - //log.Println("send server connection header") - ret := C.send_server_connection_header(conn.session) - if int(ret) < 0 { - return nil, fmt.Errorf(C.GoString(C.nghttp2_strerror(ret))) - //return nil, fmt.Errorf("send connection header failed") - } - //go conn.run() - return conn, nil -} - -func (c *ServerConn) serve(s *ServerStream) { - var handler = c.Handler - if c.Handler == nil { - handler = http.DefaultServeMux - } - - if s.req.URL == nil { - s.req.URL = &url.URL{} - } - - // call http.Handler to serve request - handler.ServeHTTP(s, s.req) - s.Close() -} - -// Close close the server connection -func (c *ServerConn) Close() error { - if c.closed { - return nil - } - c.closed = true - - for _, s := range c.streams { - s.Close() - } - - c.lock.Lock() - defer c.lock.Unlock() - - C.nghttp2_session_terminate_session(c.session, 0) - C.nghttp2_session_del(c.session) - close(c.exitch) - c.conn.Close() - - return nil -} - -// Run run the server loop -func (c *ServerConn) Run() { - var wantWrite int - var delay = 100 * time.Millisecond - var ret C.int - - defer c.Close() - defer close(c.errch) - - errch := make(chan struct{}, 5) - - go func() { - buf := make([]byte, 16*1024) - readloop: - for { - select { - case <-c.exitch: - break readloop - case <-errch: - break readloop - default: - } - - n, err := c.conn.Read(buf) - if err != nil { - c.lock.Lock() - c.err = err - c.lock.Unlock() - close(errch) - break - } - - //d1 := C.CBytes(buf[:n]) - - c.lock.Lock() - //ret1 := C.nghttp2_session_mem_recv(c.session, - // (*C.uchar)(d1), C.size_t(n)) - ret1 := C.nghttp2_session_mem_recv(c.session, - (*C.uchar)(unsafe.Pointer(&buf[0])), C.size_t(n)) - c.lock.Unlock() - - //C.free(d1) - if int(ret1) < 0 { - c.lock.Lock() - c.err = fmt.Errorf("sesion recv error: %s", - C.GoString(C.nghttp2_strerror(ret))) - c.lock.Unlock() - //log.Println(c.err) - close(errch) - break - } - } - }() - -loop: - for { - select { - case <-c.errch: - break loop - case <-errch: - break loop - case <-c.exitch: - break loop - default: - } - - c.lock.Lock() - ret = C.nghttp2_session_send(c.session) - c.lock.Unlock() - - if int(ret) < 0 { - c.lock.Lock() - c.err = fmt.Errorf("sesion send error: %s", - C.GoString(C.nghttp2_strerror(ret))) - c.lock.Unlock() - //log.Println(c.err) - close(errch) - break - } - - c.lock.Lock() - wantWrite = int(C.nghttp2_session_want_write(c.session)) - c.lock.Unlock() - - // make delay when no data read/write - if wantWrite == 0 { - time.Sleep(delay) - } - } -} +package nghttp2 + +/* +#cgo pkg-config: libnghttp2 +#include "_nghttp2.h" +*/ +import "C" +import ( + "errors" + "fmt" + "net" + "net/http" + "sync" + "time" + "unsafe" +) + +// Conn http2 connection +type Conn struct { + conn net.Conn + session *C.nghttp2_session + streams map[int]*stream + streamCount int + closed bool + isServer bool + handler http.Handler + lock *sync.Mutex + err error + errch chan error + exitch chan struct{} +} + +// RoundTrip submit http request and return the response +func (c *Conn) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, errors.New("not implement") +} + +// Connect submit connect request +func (c *Conn) Connect(addr string) (net.Conn, error) { + return nil, errors.New("not implement") +} + +// Run run the event loop +func (c *Conn) Run() { + defer c.Close() + + go c.readloop() + go c.writeloop() + + for { + select { + case err := <-c.errch: + c.err = err + return + case <-c.exitch: + return + } + } +} + +// Close close the connection +func (c *Conn) Close() error { + if c.closed { + return nil + } + c.closed = true + for _, s := range c.streams { + s.Close() + } + close(c.exitch) + c.conn.Close() + return nil +} + +func (c *Conn) errorNotify(err error) { + select { + case c.errch <- err: + default: + } +} + +func (c *Conn) readloop() { + type data struct { + buf []byte + err error + } + + var ret C.ssize_t + var err error + var d data + + datach := make(chan data) + + go func() { + d1 := data{} + var n int + var err1 error + for { + buf := make([]byte, 16*1024) + n, err1 = c.conn.Read(buf) + d1.buf = buf[:n] + d1.err = err1 + datach <- d1 + } + }() + + for { + select { + case <-c.exitch: + return + case d = <-datach: + if d.err != nil { + c.errorNotify(d.err) + return + } + c.lock.Lock() + ret = C.nghttp2_session_mem_recv(c.session, + (*C.uchar)(unsafe.Pointer(&d.buf[0])), C.size_t(len(d.buf))) + c.lock.Unlock() + if int(ret) < 0 { + err = fmt.Errorf("http2 recv error: %s", C.GoString(C.nghttp2_strerror(C.int(ret)))) + c.errorNotify(err) + return + } + } + } +} + +func (c *Conn) writeloop() { + var ret C.int + var err error + var delay = 50 * time.Millisecond + for { + select { + case <-c.exitch: + return + default: + } + c.lock.Lock() + ret = C.nghttp2_session_send(c.session) + c.lock.Unlock() + if int(ret) < 0 { + err = fmt.Errorf("http2 send error: %s", C.GoString(C.nghttp2_strerror(C.int(ret)))) + c.errorNotify(err) + return + } + c.lock.Lock() + wantWrite := C.nghttp2_session_want_write(c.session) + c.lock.Unlock() + if int(wantWrite) == 0 { + time.Sleep(delay) + } + } +} diff --git a/data_provider.go b/data_provider.go index 30b748c..9d1efb2 100644 --- a/data_provider.go +++ b/data_provider.go @@ -144,3 +144,18 @@ func (bp *bodyProvider) Close() error { bp.closed = true return nil } + +func newNV(name, value string) C.nghttp2_nv { + nv := C.nghttp2_nv{} + nameArr := make([]byte, len(name)+1) + valueArr := make([]byte, len(value)+1) + copy(nameArr, []byte(name)) + copy(valueArr, []byte(value)) + + nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0])) + nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0])) + nv.namelen = C.size_t(len(name)) + nv.valuelen = C.size_t(len(value)) + nv.flags = 0 + return nv +} diff --git a/nghttp2.c b/nghttp2.c index dd0d91f..3b8f301 100644 --- a/nghttp2.c +++ b/nghttp2.c @@ -7,76 +7,103 @@ int on_invalid_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, int lib_error_code, void *user_data); -static ssize_t server_send_callback(nghttp2_session *session, - const uint8_t *data, size_t length, - int flags, void *user_data) +int init_nghttp2_callbacks(nghttp2_session_callbacks *callbacks); + +int on_error_callback(nghttp2_session *session, const char *msg, + size_t len, void *user_data) +{ + printf("on error callback: %s\n", msg); + return 0; +} +int on_invalid_frame_recv_callback(nghttp2_session *session, + const nghttp2_frame *frame, + int lib_error_code, void *user_data) { - return onServerDataSendCallback(user_data, (void *)data, length); + printf("invalid frame recv: %s\n", nghttp2_strerror(lib_error_code)); + return 0; +} +static ssize_t on_data_source_read_callback(nghttp2_session *session, int32_t stream_id, + uint8_t *buf, size_t length, uint32_t *data_flags, + nghttp2_data_source *source, void *user_data) +{ + int ret = onDataSourceReadCallback(user_data, stream_id, buf, length); + if (ret == 0) + { + *data_flags = NGHTTP2_DATA_FLAG_EOF; + } + return ret; } -static int on_server_frame_recv_callback(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) +static ssize_t on_send_callback(nghttp2_session *session, + const uint8_t *data, size_t length, + int flags, void *user_data) +{ + return onDataSendCallback(user_data, (void *)data, length); +} + +static int on_frame_recv_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) { switch (frame->hd.type) { case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) { - onServerHeadersDoneCallback(user_data, frame->hd.stream_id); + onHeadersDoneCallback(user_data, frame->hd.stream_id); } case NGHTTP2_DATA: if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - onServerStreamEndCallback(user_data, frame->hd.stream_id); + onStreamEndCallback(user_data, frame->hd.stream_id); } break; } return 0; } -static int on_server_stream_close_callback(nghttp2_session *session, - int32_t stream_id, - uint32_t error_code, - void *user_data) +static int on_stream_close_callback(nghttp2_session *session, + int32_t stream_id, + uint32_t error_code, + void *user_data) { - onServerStreamClose(user_data, stream_id); + onStreamClose(user_data, stream_id); return 0; } -static int on_server_header_callback(nghttp2_session *session, - const nghttp2_frame *frame, - const uint8_t *name, size_t namelen, - const uint8_t *value, - size_t valuelen, uint8_t flags, - void *user_data) +static int on_header_callback(nghttp2_session *session, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, + size_t valuelen, uint8_t flags, + void *user_data) { switch (frame->hd.type) { case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) { - onServerHeaderCallback(user_data, frame->hd.stream_id, - (void *)name, namelen, (void *)value, valuelen); + onHeaderCallback(user_data, frame->hd.stream_id, + (void *)name, namelen, (void *)value, valuelen); } break; } return 0; } -static int on_server_data_chunk_recv_callback(nghttp2_session *session, - uint8_t flags, - int32_t stream_id, - const uint8_t *data, - size_t len, void *user_data) +static int on_data_chunk_recv_callback(nghttp2_session *session, + uint8_t flags, + int32_t stream_id, + const uint8_t *data, + size_t len, void *user_data) { - return onServerDataChunkRecv(user_data, stream_id, (void *)data, len); + return onDataChunkRecv(user_data, stream_id, (void *)data, len); } -static int on_server_begin_headers_callback(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) +static int on_begin_headers_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) { switch (frame->hd.type) @@ -84,7 +111,7 @@ static int on_server_begin_headers_callback(nghttp2_session *session, case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) { - onServerBeginHeaderCallback(user_data, frame->hd.stream_id); + onBeginHeaderCallback(user_data, frame->hd.stream_id); } break; } @@ -96,335 +123,62 @@ nghttp2_session *init_nghttp2_server_session(size_t data) nghttp2_session_callbacks *callbacks; nghttp2_session *session; - nghttp2_session_callbacks_new(&callbacks); - - nghttp2_session_callbacks_set_send_callback(callbacks, server_send_callback); - - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - on_server_frame_recv_callback); - - nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, on_server_stream_close_callback); - - nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, - on_invalid_frame_recv_callback); - - nghttp2_session_callbacks_set_on_data_chunk_recv_callback( - callbacks, on_server_data_chunk_recv_callback); - nghttp2_session_callbacks_set_on_header_callback(callbacks, - on_server_header_callback); - - nghttp2_session_callbacks_set_error_callback(callbacks, on_error_callback); - nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, on_server_begin_headers_callback); - + init_nghttp2_callbacks(callbacks); nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); nghttp2_session_callbacks_del(callbacks); return session; } -int send_server_connection_header(nghttp2_session *session) -{ - nghttp2_settings_entry iv[1] = { - {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; - int rv; - - rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, - ARRLEN(iv)); - return rv; -} - -// send_callback send data to network -static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *data, - size_t length, int flags, void *user_data) -{ - return onClientDataSendCallback(user_data, (void *)data, length); -} - -static int on_client_header_callback(nghttp2_session *session, - const nghttp2_frame *frame, const uint8_t *name, - size_t namelen, const uint8_t *value, - size_t valuelen, uint8_t flags, void *user_data) -{ - //printf("on_header_callback\n"); - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) - { - /* Print response headers for the initiated request. */ - //print_header(stderr, name, namelen, value, valuelen); - onClientHeaderCallback(user_data, frame->hd.stream_id, - (void *)name, namelen, (void *)value, valuelen); - break; - } - } - return 0; -} - -static int on_client_begin_headers_callback(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) -{ - //printf("on_begin_headers_callback\n"); - int stream_id = frame->hd.stream_id; - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) - { - //fprintf(stderr, "Response headers for stream ID=%d:\n", - // frame->hd.stream_id); - onClientBeginHeaderCallback(user_data, stream_id); - } - break; - } - return 0; -} - -int on_invalid_frame_recv_callback(nghttp2_session *session, - const nghttp2_frame *frame, - int lib_error_code, void *user_data) -{ - printf("on_invalid_frame_recv, frame %d, code %d, msg %s\n", - frame->hd.type, - lib_error_code, - nghttp2_strerror(lib_error_code)); - return 0; -} -#if 0 -static int on_client_frame_send_callback(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) -{ - size_t i; - (void)user_data; - //printf("on_frame_send_callback\n"); - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - /* - if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) - { - */ - if (1) - { - const nghttp2_nv *nva = frame->headers.nva; - printf("[INFO] C ----------------------------> S (HEADERS)\n"); - for (i = 0; i < frame->headers.nvlen; ++i) - { - fwrite(nva[i].name, 1, nva[i].namelen, stdout); - printf(": "); - fwrite(nva[i].value, 1, nva[i].valuelen, stdout); - printf("\n"); - } - } - break; - case NGHTTP2_RST_STREAM: - printf("[INFO] C ----------------------------> S (RST_STREAM)\n"); - break; - case NGHTTP2_GOAWAY: - printf("[INFO] C ----------------------------> S (GOAWAY)\n"); - break; - } - return 0; -} -#endif -static int on_client_frame_recv_callback(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) -{ - //printf("on_frame_recv_callback %d\n", frame->hd.type); - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) - { - //fprintf(stderr, "All headers received\n"); - onClientHeadersDoneCallback(user_data, frame->hd.stream_id); - } - break; - case NGHTTP2_RST_STREAM: - //printf("server send rst_stream %d\n", frame->rst_stream.error_code); - break; - case NGHTTP2_GOAWAY: - //printf("server send go away\n"); - onClientConnectionCloseCallback(user_data); - break; - case NGHTTP2_PING: - //printf("ping frame received\n"); - break; - } - return 0; -} - -static int on_client_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, - int32_t stream_id, const uint8_t *data, - size_t len, void *user_data) +nghttp2_session *init_nghttp2_client_session(size_t data) { - return onClientDataChunkRecv(user_data, stream_id, (void *)data, len); -} + nghttp2_session_callbacks *callbacks; + nghttp2_session *session; + nghttp2_session_callbacks_new(&callbacks); + init_nghttp2_callbacks(callbacks); + nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data))); -static int on_client_stream_close_callback(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *user_data) -{ - onClientStreamClose(user_data, stream_id); - return 0; + nghttp2_session_callbacks_del(callbacks); + return session; } -static ssize_t on_client_data_source_read_callback(nghttp2_session *session, int32_t stream_id, - uint8_t *buf, size_t length, uint32_t *data_flags, - nghttp2_data_source *source, void *user_data) +int init_nghttp2_callbacks(nghttp2_session_callbacks *callbacks) { - int ret = onClientDataSourceReadCallback(user_data, stream_id, buf, length); - if (ret == 0) - { - *data_flags = NGHTTP2_DATA_FLAG_EOF; - } - return ret; -} -static ssize_t on_server_data_source_read_callback(nghttp2_session *session, int32_t stream_id, - uint8_t *buf, size_t length, uint32_t *data_flags, - nghttp2_data_source *source, void *user_data) -{ - int ret = onServerDataSourceReadCallback(user_data, stream_id, buf, length); - if (ret == 0) - { - *data_flags = NGHTTP2_DATA_FLAG_EOF; - } - return ret; -} + nghttp2_session_callbacks_set_send_callback(callbacks, on_send_callback); -int on_error_callback(nghttp2_session *session, - const char *msg, size_t len, void *user_data) -{ - //printf("errmsg %*s\n", msg, len); - printf("error: %s\n", msg); - return 0; -} + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + on_frame_recv_callback); -void init_client_callbacks(nghttp2_session_callbacks *callbacks) -{ - nghttp2_session_callbacks_set_send_callback(callbacks, client_send_callback); - //nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback); + nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, on_stream_close_callback); - //nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback); - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - on_client_frame_recv_callback); - //nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_client_frame_send_callback); nghttp2_session_callbacks_set_on_data_chunk_recv_callback( - callbacks, on_client_data_chunk_recv_callback); - - nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, on_client_stream_close_callback); - + callbacks, on_data_chunk_recv_callback); nghttp2_session_callbacks_set_on_header_callback(callbacks, - on_client_header_callback); + on_header_callback); + nghttp2_session_callbacks_set_error_callback(callbacks, on_error_callback); nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, on_client_begin_headers_callback); + callbacks, on_begin_headers_callback); } -nghttp2_session *init_nghttp2_client_session(size_t data) +int send_connection_header(nghttp2_session *session) { - int ret; - nghttp2_session *session; - nghttp2_session_callbacks *callbacks; - nghttp2_session_callbacks_new(&callbacks); - init_client_callbacks(callbacks); - ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data))); - if (session == NULL) - { - printf("c init session failed: %s\n", nghttp2_strerror(ret)); - } - return session; -} -#if 0 -int send_client_connection_header(nghttp2_session *session) -{ - //nghttp2_settings_entry iv[1] = { - // {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; + nghttp2_settings_entry iv[1] = { + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; int rv; - /* client 24 bytes magic string will be sent by nghttp2 library */ - /* rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv)); - */ - - rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, NULL, 0); - /* - if (rv != 0) - { - errx(1, "Could not submit SETTINGS: %s", nghttp2_strerror(rv)); - } - */ return rv; } -#endif -#if 0 -int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen, - nghttp2_data_provider *dp) -{ - int32_t stream_id; - /* - nghttp2_nv hdrs[] = { - MAKE_NV2(":method", "GET"), - MAKE_NV(":scheme", &uri[u->field_data[UF_SCHEMA].off], - u->field_data[UF_SCHEMA].len), - MAKE_NV(":authority", stream_data->authority, stream_data->authoritylen), - MAKE_NV(":path", stream_data->path, stream_data->pathlen)}; - fprintf(stderr, "Request headers:\n"); - print_headers(stderr, hdrs, ARRLEN(hdrs)); - */ - /* - int i; - for (i = 0; i < hdrlen; i++) - { - printf("header %s: %s\n", hdrs[i].name, hdrs[i].value); - } - */ - stream_id = nghttp2_submit_request(session, NULL, hdrs, - hdrlen, dp, NULL); - /* - if (stream_id < 0) - { - errx(1, "Could not submit HTTP request: %s", nghttp2_strerror(stream_id)); - } - */ - - return stream_id; -} -#endif - -int data_provider_set_callback(size_t cdp, size_t data, int type) -{ - //nghttp2_data_provider *dp = malloc(sizeof(nghttp2_data_provider)); - nghttp2_data_provider *dp = (nghttp2_data_provider *)cdp; - dp->source.ptr = (void *)((int *)data); - if (type == 0) - { - dp->read_callback = on_server_data_source_read_callback; - } - else - { - dp->read_callback = on_client_data_source_read_callback; - } - return 0; -} -int _nghttp2_submit_response(nghttp2_session *sess, int streamid, - size_t nv, size_t nvlen, nghttp2_data_provider *dp) -{ - return nghttp2_submit_response(sess, streamid, (nghttp2_nv *)nv, nvlen, dp); -} - -int _nghttp2_submit_request(nghttp2_session *session, const nghttp2_priority_spec *pri_spec, - size_t nva, size_t nvlen, - const nghttp2_data_provider *data_prd, void *stream_user_data) -{ - return nghttp2_submit_request(session, pri_spec, (nghttp2_nv *)nva, nvlen, data_prd, stream_user_data); +int data_provider_set_callback(size_t dp, size_t data, int t){ + nghttp2_data_provider *cdp = (nghttp2_data_provider*)dp; + cdp->source.ptr = (void *)data; + cdp->read_callback=on_data_source_read_callback; } \ No newline at end of file diff --git a/stream.go b/stream.go index f6d4e97..e43b234 100644 --- a/stream.go +++ b/stream.go @@ -1,207 +1,48 @@ -package nghttp2 - -/* -#include "_nghttp2.h" -*/ -import "C" -import ( - "fmt" - "io" - "net/http" - "strings" - "sync" - "unsafe" -) - -// ClientStream http2 client stream -type ClientStream struct { - streamID int - conn *ClientConn - cdp C.nghttp2_data_provider - dp *dataProvider - res *http.Response - resch chan *http.Response - errch chan error - closed bool - lock *sync.Mutex -} - -// Read read stream data -func (s *ClientStream) Read(buf []byte) (n int, err error) { - if s.closed || s.res == nil || s.res.Body == nil { - return 0, io.EOF - } - return s.res.Body.Read(buf) -} - -// Write write data to stream -func (s *ClientStream) Write(buf []byte) (n int, err error) { - if s.closed { - return 0, io.EOF - } - if s.dp != nil { - return s.dp.Write(buf) - } - return 0, fmt.Errorf("empty data provider") -} - -// Close close the stream -func (s *ClientStream) Close() error { - //s.lock.Lock() - //defer s.lock.Unlock() - if s.closed { - return nil - } - s.closed = true - err := io.EOF - //log.Printf("close stream %d", int(s.streamID)) - select { - case s.errch <- err: - default: - } - if s.res != nil && s.res.Body != nil { - s.res.Body.Close() - } - //log.Println("close stream done") - if s.dp != nil { - s.dp.Close() - } - - if s.res != nil && s.res.Request != nil && - s.res.Request.Method == "CONNECT" { - //log.Printf("send rst stream for %d", s.streamID) - s.conn.lock.Lock() - C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) - s.conn.lock.Unlock() - } - return nil -} - -// ServerStream server stream -type ServerStream struct { - streamID int - // headers receive done - headersDone bool - // is stream_end flag received - streamEnd bool - // request - req *http.Request - // response header - header http.Header - // response statusCode - statusCode int - // response has send - responseSend bool - - // server connection - conn *ServerConn - - // data provider - dp *dataProvider - cdp C.nghttp2_data_provider - - closed bool - //buf *bytes.Buffer -} - -// Write write data to stream, -// implements http.ResponseWriter -func (s *ServerStream) Write(buf []byte) (int, error) { - if s.closed { - return 0, io.EOF - } - - if !s.responseSend { - s.WriteHeader(http.StatusOK) - } - return s.dp.Write(buf) -} - -// WriteHeader set response code and send reponse, -// implements http.ResponseWriter -func (s *ServerStream) WriteHeader(code int) { - if s.closed { - return - } - - if s.responseSend { - return - } - - s.responseSend = true - s.statusCode = code - - var nv = []C.nghttp2_nv{} - - nv = append(nv, newNV(":status", fmt.Sprintf("%d", code))) - - for k, v := range s.header { - //log.Println(k, v[0]) - _k := strings.ToLower(k) - if _k == "host" || _k == "connection" || _k == "proxy-connection" || - _k == "transfer-encoding" { - continue - } - nv = append(nv, newNV(k, v[0])) - } - - var dp *dataProvider - - dp = newDataProvider(unsafe.Pointer(&s.cdp), s.conn.lock, 0) - dp.streamID = s.streamID - dp.session = s.conn.session - - s.dp = dp - - s.conn.lock.Lock() - ret := C._nghttp2_submit_response( - s.conn.session, C.int(s.streamID), - C.size_t(uintptr(unsafe.Pointer(&nv[0]))), - C.size_t(len(nv)), &s.cdp) - s.conn.lock.Unlock() - - if int(ret) < 0 { - panic(fmt.Sprintf("sumit response error %s", - C.GoString(C.nghttp2_strerror(ret)))) - } - //log.Printf("stream %d send response", s.streamID) -} - -// Header return the http.Header, -// implements http.ResponseWriter -func (s *ServerStream) Header() http.Header { - if s.header == nil { - s.header = http.Header{} - } - return s.header -} - -// Close close the stream -func (s *ServerStream) Close() error { - if s.closed { - return nil - } - s.closed = true - - if s.req.Body != nil { - s.req.Body.Close() - } - - if s.dp != nil { - s.dp.Close() - //s.dp = nil - } - - if s.req.Method == "CONNECT" { - s.conn.lock.Lock() - s.conn.lock.Unlock() - - if _, ok := s.conn.streams[s.streamID]; ok { - //log.Printf("send rst stream %d", s.streamID) - C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) - delete(s.conn.streams, s.streamID) - } - } - //log.Printf("stream %d closed", s.streamID) - return nil -} +package nghttp2 + +import ( + "errors" + "net" + "net/http" + "time" +) + +type stream struct { + streamID int + conn *Conn + dp *dataProvider + bp *bodyProvider + request *http.Request + response *http.Response + resch chan *http.Response +} + +var _ net.Conn = &stream{} + +func (s *stream) Read(buf []byte) (int, error) { + return 0, errors.New("not implement") +} +func (s *stream) Write(buf []byte) (int, error) { + if s.conn.isServer { + return 0, errors.New("not implement") + } + return 0, errors.New("not implement") +} +func (s *stream) Close() error { + return nil +} +func (s *stream) LocalAddr() net.Addr { + return nil +} +func (s *stream) RemoteAddr() net.Addr { + return nil +} +func (s *stream) SetDeadline(t time.Time) error { + return errors.New("not implement") +} +func (s *stream) SetReadDeadline(t time.Time) error { + return errors.New("not implement") +} +func (s *stream) SetWriteDeadline(t time.Time) error { + return errors.New("not implement") +}