From 8d4069b1c2c212ff216c16f7b782d32a201b88f1 Mon Sep 17 00:00:00 2001 From: fangdingjun Date: Wed, 4 Jul 2018 11:02:25 +0800 Subject: [PATCH] restructure --- _nghttp2.h | 14 +- callbacks.go | 271 ++++++++++++++++++++++ client.go | 509 ------------------------------------------ conn.go | 417 ++++++++++++++++++++++++++++++++++ data_provider.go | 120 ++++++++++ http2_test.go | 6 + client.c => nghttp2.c | 139 +++++++++++- server.c | 132 ----------- server.go | 438 ------------------------------------ stream.go | 172 ++++++++++++++ 10 files changed, 1128 insertions(+), 1090 deletions(-) create mode 100644 callbacks.go delete mode 100644 client.go create mode 100644 conn.go create mode 100644 data_provider.go rename client.c => nghttp2.c (65%) delete mode 100644 server.c delete mode 100644 server.go create mode 100644 stream.go diff --git a/_nghttp2.h b/_nghttp2.h index 188e3bb..41eb69a 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -7,18 +7,18 @@ #define ARRLEN(x) (sizeof(x) / sizeof(x[0])) -extern ssize_t ClientDataRecv(void *, void *data, size_t); -extern ssize_t ClientDataSend(void *, void *data, size_t); -extern ssize_t DataSourceRead(void *, void *, size_t); -extern int OnClientDataRecv(void *, int, void *, size_t); +extern ssize_t OnClientDataRecvCallback(void *, void *data, size_t); +extern ssize_t OnClientDataSendCallback(void *, void *data, size_t); +extern ssize_t OnDataSourceReadCallback(void *, void *, size_t); +extern int OnClientDataChunkRecv(void *, int, void *, size_t); extern int OnClientBeginHeaderCallback(void *, int); extern int OnClientHeaderCallback(void *, int, void *, int, void *, int); extern int OnClientHeadersDoneCallback(void *, int); extern int OnClientStreamClose(void *, int); -extern ssize_t ServerDataRecv(void *, void *data, size_t); -extern ssize_t ServerDataSend(void *, void *data, size_t); -extern int OnServerDataRecv(void *, int, void *, size_t); +extern ssize_t OnServerDataRecvCallback(void *, void *data, size_t); +extern ssize_t OnServerDataSendCallback(void *, void *data, size_t); +extern int OnServerDataChunkRecv(void *, int, void *, size_t); extern int OnServerBeginHeaderCallback(void *, int); extern int OnServerHeaderCallback(void *, int, void *, int, void *, int); extern int OnServerStreamEndCallback(void *, int); diff --git a/callbacks.go b/callbacks.go new file mode 100644 index 0000000..b9e1454 --- /dev/null +++ b/callbacks.go @@ -0,0 +1,271 @@ +package nghttp2 + +/* +#include "_nghttp2.h" +*/ +import "C" +import ( + "bytes" + "io" + "log" + "net/http" + "net/url" + "strings" + "sync" + "unsafe" +) + +// OnServerDataRecvCallback callback function for libnghttp2 library +// want receive data from network, +//export OnServerDataRecvCallback +func OnServerDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, + length C.size_t) C.ssize_t { + conn := (*ServerConn)(ptr) + buf := make([]byte, int(length)) + n, err := conn.conn.Read(buf) + if err != nil { + return -1 + } + cbuf := C.CBytes(buf[:n]) + defer C.free(cbuf) + C.memcpy(data, cbuf, C.size_t(n)) + return C.ssize_t(n) +} + +// OnServerDataSendCallback callback function for libnghttp2 library +// want send data to network +//export OnServerDataSendCallback +func OnServerDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, + length C.size_t) C.ssize_t { + //log.Println("server data send") + conn := (*ServerConn)(ptr) + buf := C.GoBytes(data, C.int(length)) + n, err := conn.conn.Write(buf) + if err != nil { + return -1 + } + //log.Println("send ", n, " bytes to network ", buf) + return C.ssize_t(n) +} + +// OnServerDataChunkRecv callback function for libnghttp2 library's data chunk recv +//export OnServerDataChunkRecv +func OnServerDataChunkRecv(ptr unsafe.Pointer, streamID C.int, + data unsafe.Pointer, length C.size_t) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + bp := s.req.Body.(*bodyProvider) + buf := C.GoBytes(data, C.int(length)) + bp.Write(buf) + return C.int(length) +} + +// OnServerBeginHeaderCallback callback function for begin begin header recv +//export OnServerBeginHeaderCallback +func OnServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*ServerConn)(ptr) + s := &ServerStream{ + streamID: int(streamID), + conn: conn, + req: &http.Request{ + URL: &url.URL{}, + Header: http.Header{}, + Proto: "HTTP/2.0", + ProtoMajor: 2, + ProtoMinor: 0, + }, + //buf: new(bytes.Buffer), + } + conn.streams[int(streamID)] = s + return 0 +} + +// OnServerHeaderCallback callback function for each header recv +//export OnServerHeaderCallback +func OnServerHeaderCallback(ptr unsafe.Pointer, streamID C.int, + name unsafe.Pointer, namelen C.int, + value unsafe.Pointer, valuelen C.int) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + hdrname := C.GoStringN((*C.char)(name), namelen) + hdrvalue := C.GoStringN((*C.char)(value), valuelen) + hdrname = strings.ToLower(hdrname) + switch hdrname { + case ":method": + s.req.Method = hdrvalue + case ":scheme": + s.req.URL.Scheme = hdrvalue + case ":path": + s.req.RequestURI = hdrvalue + u, _ := url.ParseRequestURI(s.req.RequestURI) + scheme := s.req.URL.Scheme + *(s.req.URL) = *u + if scheme != "" { + s.req.URL.Scheme = scheme + } + case ":authority": + s.req.Host = hdrvalue + default: + s.req.Header.Add(hdrname, hdrvalue) + + } + return 0 +} + +// OnServerStreamEndCallback callback function for the stream when END_STREAM flag set +//export OnServerStreamEndCallback +func OnServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int { + + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + s.streamEnd = true + bp := s.req.Body.(*bodyProvider) + if s.req.Method != "CONNECT" { + bp.closed = true + log.Println("stream end flag set, begin to serve") + go conn.serve(s) + } + return 0 +} + +// OnServerHeadersDoneCallback callback function for the stream when all headers received +//export OnServerHeadersDoneCallback +func OnServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + s.headersDone = true + bp := &bodyProvider{ + buf: new(bytes.Buffer), + lock: new(sync.Mutex), + } + s.req.Body = bp + if s.req.Method == "CONNECT" { + go conn.serve(s) + } + return 0 +} + +// OnServerStreamClose callback function for the stream when closed +//export OnServerStreamClose +func OnServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + conn.lock.Lock() + delete(conn.streams, int(streamID)) + conn.lock.Unlock() + s.Close() + return 0 +} + +// OnDataSourceReadCallback callback function for libnghttp2 library +// want read data from data provider source, +// return NGHTTP2_ERR_DEFERED will cause data frame defered, +// application later call nghttp2_session_resume_data will re-quene the data frame +// +//export OnDataSourceReadCallback +func OnDataSourceReadCallback(ptr unsafe.Pointer, + buf unsafe.Pointer, length C.size_t) C.ssize_t { + //log.Println("data source read") + dp := (*dataProvider)(ptr) + gobuf := make([]byte, int(length)) + n, err := dp.Read(gobuf) + if err != nil { + if err == io.EOF { + return 0 + } + if err == errAgain { + // NGHTTP2_ERR_DEFERED + return -508 + } + return -1 + } + cbuf := C.CBytes(gobuf) + defer C.free(cbuf) + C.memcpy(buf, cbuf, C.size_t(n)) + return C.ssize_t(n) +} + +// OnClientDataChunkRecv callback function for libnghttp2 library data chunk received, +//export OnClientDataChunkRecv +func OnClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int, + buf unsafe.Pointer, length C.size_t) C.int { + //log.Println("on data recv") + conn := (*ClientConn)(ptr) + gobuf := C.GoBytes(buf, C.int(length)) + conn.onDataRecv(gobuf, int(streamID)) + return 0 +} + +// OnClientDataRecvCallback callback function for libnghttp2 library want read data from network, +//export OnClientDataRecvCallback +func OnClientDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + //log.Println("data read req", int(size)) + conn := (*ClientConn)(ptr) + buf := make([]byte, int(size)) + //log.Println(conn.conn.RemoteAddr()) + n, err := conn.conn.Read(buf) + if err != nil { + //log.Println(err) + return -1 + } + cbuf := C.CBytes(buf) + //log.Println("read from network ", n, buf[:n]) + C.memcpy(data, cbuf, C.size_t(n)) + return C.ssize_t(n) +} + +// OnClientDataSendCallback callback function for libnghttp2 library want send data to network, +//export OnClientDataSendCallback +func OnClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + //log.Println("data write req ", int(size)) + conn := (*ClientConn)(ptr) + buf := C.GoBytes(data, C.int(size)) + //log.Println(conn.conn.RemoteAddr()) + n, err := conn.conn.Write(buf) + if err != nil { + //log.Println(err) + return -1 + } + //log.Println("write data to network ", n) + return C.ssize_t(n) +} + +// OnClientBeginHeaderCallback callback function for begin header receive, +//export OnClientBeginHeaderCallback +func OnClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("begin header") + conn := (*ClientConn)(ptr) + conn.onBeginHeader(int(streamID)) + return 0 +} + +// OnClientHeaderCallback callback function for each header received, +//export OnClientHeaderCallback +func OnClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, + name unsafe.Pointer, namelen C.int, + value unsafe.Pointer, valuelen C.int) C.int { + //log.Println("header") + conn := (*ClientConn)(ptr) + goname := C.GoBytes(name, namelen) + govalue := C.GoBytes(value, valuelen) + conn.onHeader(int(streamID), string(goname), string(govalue)) + return 0 +} + +// OnClientHeadersDoneCallback callback function for the stream when all headers received, +//export OnClientHeadersDoneCallback +func OnClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("frame recv") + conn := (*ClientConn)(ptr) + conn.onHeadersDone(int(streamID)) + return 0 +} + +// OnClientStreamClose callback function for the stream when closed, +//export OnClientStreamClose +func OnClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("stream close") + conn := (*ClientConn)(ptr) + conn.onStreamClose(int(streamID)) + return 0 +} diff --git a/client.go b/client.go deleted file mode 100644 index 1610133..0000000 --- a/client.go +++ /dev/null @@ -1,509 +0,0 @@ -package nghttp2 - -/* -#cgo pkg-config: libnghttp2 -#include "_nghttp2.h" -*/ -import "C" -import ( - "bytes" - "errors" - "fmt" - "io" - "log" - "net" - "net/http" - "strconv" - "strings" - "sync" - "time" - "unsafe" -) - -var ( - errAgain = errors.New("again") -) - -// ClientConn http2 connection -type ClientConn struct { - session *C.nghttp2_session - conn net.Conn - streams map[int]*ClientStream - lock *sync.Mutex - errch chan struct{} - exitch chan struct{} - err error - isServer bool -} - -// ClientStream http2 stream -type ClientStream struct { - streamID int - cdp *C.nghttp2_data_provider - dp *dataProvider - // application read data from stream - r *io.PipeReader - // recv stream data from session - w *io.PipeWriter - res *http.Response - resch chan *http.Response - errch chan error - closed bool -} - -type dataProvider struct { - buf *bytes.Buffer - closed bool - lock *sync.Mutex - session *C.nghttp2_session - streamID int -} - -// NewClientConn create http2 client -func NewClientConn(c net.Conn) (*ClientConn, error) { - conn := &ClientConn{ - conn: c, streams: make(map[int]*ClientStream), - lock: new(sync.Mutex), - errch: make(chan struct{}), - exitch: make(chan struct{}), - } - conn.session = C.init_client_session( - C.size_t(int(uintptr(unsafe.Pointer(conn))))) - if conn.session == nil { - return nil, fmt.Errorf("init session failed") - } - ret := C.send_client_connection_header(conn.session) - if int(ret) < 0 { - log.Printf("submit settings error: %s", - C.GoString(C.nghttp2_strerror(ret))) - } - go conn.run() - return conn, nil -} - -func (c *ClientConn) onDataRecv(buf []byte, streamID int) { - stream := c.streams[streamID] - stream.onDataRecv(buf) -} - -func (c *ClientConn) onBeginHeader(streamID int) { - stream := c.streams[streamID] - stream.onBeginHeader() -} - -func (c *ClientConn) onHeader(streamID int, name, value string) { - stream := c.streams[streamID] - stream.onHeader(name, value) - -} - -func (c *ClientConn) onFrameRecv(streamID int) { - stream := c.streams[streamID] - stream.onFrameRecv() -} - -func (c *ClientConn) onStreamClose(streamID int) { - stream, ok := c.streams[streamID] - if ok { - stream.Close() - c.lock.Lock() - delete(c.streams, streamID) - c.lock.Unlock() - } - -} - -// Close close the http2 connection -func (c *ClientConn) Close() error { - for _, s := range c.streams { - s.Close() - } - C.nghttp2_session_terminate_session(c.session, 0) - C.nghttp2_session_del(c.session) - close(c.exitch) - c.conn.Close() - return nil -} - -func (c *ClientConn) run() { - var wantRead int - var wantWrite int - var delay = 50 - var ret C.int - - defer close(c.errch) - - datach := make(chan []byte) - errch := make(chan error) - - go func() { - buf := make([]byte, 16*1024) - readloop: - for { - select { - case <-c.exitch: - break readloop - default: - } - - n, err := c.conn.Read(buf) - if err != nil { - errch <- err - break - } - datach <- buf[:n] - } - }() - -loop: - for { - select { - case <-c.errch: - break loop - case err := <-errch: - c.err = err - break loop - case <-c.exitch: - break loop - default: - } - - wantWrite = int(C.nghttp2_session_want_write(c.session)) - if wantWrite != 0 { - ret = C.nghttp2_session_send(c.session) - if int(ret) < 0 { - c.err = fmt.Errorf("sesion send error: %s", - C.GoString(C.nghttp2_strerror(ret))) - log.Println(c.err) - break - } - } - - wantRead = int(C.nghttp2_session_want_read(c.session)) - select { - case d := <-datach: - d1 := C.CBytes(d) - ret1 := C.nghttp2_session_mem_recv(c.session, - (*C.uchar)(d1), C.size_t(int(len(d)))) - C.free(d1) - if int(ret1) < 0 { - c.err = fmt.Errorf("sesion recv error: %s", - C.GoString(C.nghttp2_strerror(ret))) - log.Println(c.err) - break loop - } - default: - } - - // make delay when no data read/write - if wantRead == 0 && wantWrite == 0 { - select { - case <-time.After(time.Duration(delay) * time.Millisecond): - } - } - } -} - -// CreateRequest submit a request and return a http.Response, client only -func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { - if c.err != nil { - return nil, c.err - } - - if c.isServer { - return nil, fmt.Errorf("only client can create new request") - } - - nvIndex := 0 - nvMax := 25 - nva := C.new_nv_array(C.size_t(nvMax)) - setNvArray(nva, nvIndex, ":method", req.Method, 0) - nvIndex++ - setNvArray(nva, nvIndex, ":scheme", "https", 0) - nvIndex++ - setNvArray(nva, nvIndex, ":authority", req.Host, 0) - nvIndex++ - - p := req.URL.Path - q := req.URL.Query().Encode() - if q != "" { - p = p + "?" + q - } - setNvArray(nva, nvIndex, ":path", p, 0) - nvIndex++ - for k, v := range req.Header { - if strings.ToLower(k) == "host" { - continue - } - //log.Printf("header %s: %s", k, v) - setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) - nvIndex++ - } - var dp *dataProvider - var cdp *C.nghttp2_data_provider - if req.Body != nil { - dp, cdp = newDataProvider() - go func() { - io.Copy(dp, req.Body) - dp.Close() - }() - } - streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp) - if dp != nil { - dp.streamID = int(streamID) - dp.session = c.session - } - C.delete_nv_array(nva) - if int(streamID) < 0 { - return nil, fmt.Errorf("submit request error: %s", - C.GoString(C.nghttp2_strerror(streamID))) - } - //log.Println("stream id ", int(streamID)) - r, w := io.Pipe() - s := &ClientStream{ - streamID: int(streamID), - dp: dp, - cdp: cdp, - r: r, - w: w, - resch: make(chan *http.Response), - errch: make(chan error), - } - c.lock.Lock() - c.streams[int(streamID)] = s - c.lock.Unlock() - - select { - case err := <-s.errch: - return nil, err - case res := <-s.resch: - return res, nil - case <-c.errch: - return nil, fmt.Errorf("connection error") - } - //return nil, fmt.Errorf("unknown error") -} - -func setNvArray(a *C.struct_nv_array, index int, - name, value string, flags int) { - cname := C.CString(name) - cvalue := C.CString(value) - cnamelen := C.size_t(len(name)) - cvaluelen := C.size_t(len(value)) - cflags := C.int(flags) - //defer C.free(unsafe.Pointer(cname)) - //defer C.free(unsafe.Pointer(cvalue)) - C.nv_array_set(a, C.int(index), cname, - cvalue, cnamelen, cvaluelen, cflags) -} - -// Read read from data provider -// this emulate a unblocking reading -// if data is not avaliable return errAgain -func (dp *dataProvider) Read(buf []byte) (n int, err error) { - dp.lock.Lock() - defer dp.lock.Unlock() - n, err = dp.buf.Read(buf) - - if err != nil && !dp.closed { - return 0, errAgain - } - return -} - -// Write provider data for data provider -func (dp *dataProvider) Write(buf []byte) (n int, err error) { - dp.lock.Lock() - defer dp.lock.Unlock() - C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) - return dp.buf.Write(buf) -} - -// Close -func (dp *dataProvider) Close() error { - dp.lock.Lock() - defer dp.lock.Unlock() - dp.closed = true - C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) - return nil -} -func newDataProvider() ( - *dataProvider, *C.nghttp2_data_provider) { - dp := &dataProvider{ - buf: new(bytes.Buffer), - lock: new(sync.Mutex), - } - cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp)))) - return dp, cdp -} - -func (s *ClientStream) Read(buf []byte) (n int, err error) { - return s.r.Read(buf) -} - -func (s *ClientStream) Write(buf []byte) (n int, err error) { - return s.dp.Write(buf) -} - -func (s *ClientStream) onDataRecv(buf []byte) { - s.w.Write(buf) -} - -func (s *ClientStream) onBeginHeader() { - s.res = &http.Response{ - Header: make(http.Header), - } -} - -func (s *ClientStream) onHeader(name, value string) { - if name == ":status" { - statusCode, _ := strconv.Atoi(value) - s.res.StatusCode = statusCode - s.res.Status = http.StatusText(statusCode) - s.res.Proto = "HTTP/2.0" - s.res.ProtoMajor = 2 - s.res.ProtoMinor = 0 - return - } - s.res.Header.Add(name, value) -} - -func (s *ClientStream) onFrameRecv() { - s.res.Body = s - s.resch <- s.res - //log.Println("stream frame recv") -} - -// Close close the stream -func (s *ClientStream) Close() error { - if s.closed { - return nil - } - err := io.EOF - //log.Println("close stream") - select { - case s.errch <- err: - default: - } - //log.Println("close stream resch") - close(s.resch) - //log.Println("close stream errch") - close(s.errch) - //log.Println("close pipe w") - s.w.CloseWithError(err) - //log.Println("close stream done") - if s.dp != nil { - s.dp.Close() - } - s.closed = true - return nil -} - -// DataSourceRead callback function for data read from data provider source -//export DataSourceRead -func DataSourceRead(ptr unsafe.Pointer, - buf unsafe.Pointer, length C.size_t) C.ssize_t { - //log.Println("data source read") - dp := (*dataProvider)(ptr) - gobuf := make([]byte, int(length)) - n, err := dp.Read(gobuf) - if err != nil { - if err == io.EOF { - return 0 - } - if err == errAgain { - // NGHTTP2_ERR_DEFERED - return -508 - } - return -1 - } - cbuf := C.CBytes(gobuf) - defer C.free(cbuf) - C.memcpy(buf, cbuf, C.size_t(n)) - return C.ssize_t(n) -} - -// OnClientDataRecv callback function for data frame received -//export OnClientDataRecv -func OnClientDataRecv(ptr unsafe.Pointer, streamID C.int, - buf unsafe.Pointer, length C.size_t) C.int { - //log.Println("on data recv") - conn := (*ClientConn)(ptr) - gobuf := C.GoBytes(buf, C.int(length)) - conn.onDataRecv(gobuf, int(streamID)) - return 0 -} - -// ClientDataRecv callback function for session wants read data from peer -//export ClientDataRecv -func ClientDataRecv(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { - //log.Println("data read req", int(size)) - conn := (*ClientConn)(ptr) - buf := make([]byte, int(size)) - //log.Println(conn.conn.RemoteAddr()) - n, err := conn.conn.Read(buf) - if err != nil { - //log.Println(err) - return -1 - } - cbuf := C.CBytes(buf) - //log.Println("read from network ", n, buf[:n]) - C.memcpy(data, cbuf, C.size_t(n)) - return C.ssize_t(n) -} - -// ClientDataSend callback function for session wants send data to peer -//export ClientDataSend -func ClientDataSend(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { - //log.Println("data write req ", int(size)) - conn := (*ClientConn)(ptr) - buf := C.GoBytes(data, C.int(size)) - //log.Println(conn.conn.RemoteAddr()) - n, err := conn.conn.Write(buf) - if err != nil { - //log.Println(err) - return -1 - } - //log.Println("write data to network ", n) - return C.ssize_t(n) -} - -// OnClientBeginHeaderCallback callback function for response -//export OnClientBeginHeaderCallback -func OnClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("begin header") - conn := (*ClientConn)(ptr) - conn.onBeginHeader(int(streamID)) - return 0 -} - -// OnClientHeaderCallback callback function for header -//export OnClientHeaderCallback -func OnClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, - name unsafe.Pointer, namelen C.int, - value unsafe.Pointer, valuelen C.int) C.int { - //log.Println("header") - conn := (*ClientConn)(ptr) - goname := C.GoBytes(name, namelen) - govalue := C.GoBytes(value, valuelen) - conn.onHeader(int(streamID), string(goname), string(govalue)) - return 0 -} - -// OnClientHeadersDoneCallback callback function for begion to recv data -//export OnClientHeadersDoneCallback -func OnClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("frame recv") - conn := (*ClientConn)(ptr) - conn.onFrameRecv(int(streamID)) - return 0 -} - -// OnClientStreamClose callback function for stream close -//export OnClientStreamClose -func OnClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("stream close") - conn := (*ClientConn)(ptr) - conn.onStreamClose(int(streamID)) - return 0 -} diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..a253dbe --- /dev/null +++ b/conn.go @@ -0,0 +1,417 @@ +package nghttp2 + +/* +#cgo pkg-config: libnghttp2 +#include "_nghttp2.h" +*/ +import "C" +import ( + "bytes" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + "unsafe" +) + +var ( + errAgain = errors.New("again") +) + +// ClientConn http2 client connection +type ClientConn struct { + session *C.nghttp2_session + conn net.Conn + streams map[int]*ClientStream + lock *sync.Mutex + errch chan struct{} + exitch chan struct{} + err error +} + +// NewClientConn create http2 client +func NewClientConn(c net.Conn) (*ClientConn, error) { + conn := &ClientConn{ + conn: c, streams: make(map[int]*ClientStream), + lock: new(sync.Mutex), + errch: make(chan struct{}), + exitch: make(chan struct{}), + } + conn.session = C.init_client_session( + C.size_t(int(uintptr(unsafe.Pointer(conn))))) + if conn.session == nil { + return nil, fmt.Errorf("init session failed") + } + ret := C.send_client_connection_header(conn.session) + if int(ret) < 0 { + log.Printf("submit settings error: %s", + C.GoString(C.nghttp2_strerror(ret))) + } + go conn.run() + return conn, nil +} + +func (c *ClientConn) onDataRecv(buf []byte, streamID int) { + s := c.streams[streamID] + if s.res.Body == nil { + log.Println("empty body") + return + } + + if bp, ok := s.res.Body.(*bodyProvider); ok { + bp.Write(buf) + } +} + +func (c *ClientConn) onBeginHeader(streamID int) { + s := c.streams[streamID] + + s.res = &http.Response{ + Header: make(http.Header), + Body: &bodyProvider{ + buf: new(bytes.Buffer), + lock: new(sync.Mutex), + }, + } +} + +func (c *ClientConn) onHeader(streamID int, name, value string) { + s := c.streams[streamID] + if name == ":status" { + statusCode, _ := strconv.Atoi(value) + s.res.StatusCode = statusCode + s.res.Status = http.StatusText(statusCode) + s.res.Proto = "HTTP/2.0" + s.res.ProtoMajor = 2 + s.res.ProtoMinor = 0 + return + } + s.res.Header.Add(name, value) + +} + +func (c *ClientConn) onHeadersDone(streamID int) { + s := c.streams[streamID] + s.resch <- s.res +} + +func (c *ClientConn) onStreamClose(streamID int) { + stream, ok := c.streams[streamID] + if ok { + stream.Close() + c.lock.Lock() + delete(c.streams, streamID) + c.lock.Unlock() + } + +} + +// Close close the http2 connection +func (c *ClientConn) Close() error { + for _, s := range c.streams { + s.Close() + } + C.nghttp2_session_terminate_session(c.session, 0) + C.nghttp2_session_del(c.session) + close(c.exitch) + c.conn.Close() + return nil +} + +func (c *ClientConn) run() { + var wantRead int + var wantWrite int + var delay = 50 + var ret C.int + + defer close(c.errch) + + datach := make(chan []byte) + errch := make(chan error) + + go func() { + buf := make([]byte, 16*1024) + readloop: + for { + select { + case <-c.exitch: + break readloop + default: + } + + n, err := c.conn.Read(buf) + if err != nil { + errch <- err + break + } + datach <- buf[:n] + } + }() + +loop: + for { + select { + case <-c.errch: + break loop + case err := <-errch: + c.err = err + break loop + case <-c.exitch: + break loop + default: + } + + wantWrite = int(C.nghttp2_session_want_write(c.session)) + if wantWrite != 0 { + ret = C.nghttp2_session_send(c.session) + if int(ret) < 0 { + c.err = fmt.Errorf("sesion send error: %s", + C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break + } + } + + wantRead = int(C.nghttp2_session_want_read(c.session)) + select { + case d := <-datach: + d1 := C.CBytes(d) + ret1 := C.nghttp2_session_mem_recv(c.session, + (*C.uchar)(d1), C.size_t(int(len(d)))) + C.free(d1) + if int(ret1) < 0 { + c.err = fmt.Errorf("sesion recv error: %s", + C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break loop + } + default: + } + + // make delay when no data read/write + if wantRead == 0 && wantWrite == 0 { + select { + case <-time.After(time.Duration(delay) * time.Millisecond): + } + } + } +} + +// CreateRequest submit a request and return a http.Response, +func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { + if c.err != nil { + return nil, c.err + } + + nvIndex := 0 + nvMax := 25 + nva := C.new_nv_array(C.size_t(nvMax)) + setNvArray(nva, nvIndex, ":method", req.Method, 0) + nvIndex++ + setNvArray(nva, nvIndex, ":scheme", "https", 0) + nvIndex++ + setNvArray(nva, nvIndex, ":authority", req.Host, 0) + nvIndex++ + + p := req.URL.Path + q := req.URL.Query().Encode() + if q != "" { + p = p + "?" + q + } + setNvArray(nva, nvIndex, ":path", p, 0) + nvIndex++ + for k, v := range req.Header { + if strings.ToLower(k) == "host" { + continue + } + //log.Printf("header %s: %s", k, v) + setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) + nvIndex++ + } + var dp *dataProvider + var cdp *C.nghttp2_data_provider + if req.Body != nil { + dp, cdp = newDataProvider() + go func() { + io.Copy(dp, req.Body) + dp.Close() + }() + } + streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp) + if dp != nil { + dp.streamID = int(streamID) + dp.session = c.session + } + C.delete_nv_array(nva) + if int(streamID) < 0 { + return nil, fmt.Errorf("submit request error: %s", + C.GoString(C.nghttp2_strerror(streamID))) + } + //log.Println("stream id ", int(streamID)) + s := &ClientStream{ + streamID: int(streamID), + dp: dp, + cdp: cdp, + resch: make(chan *http.Response), + errch: make(chan error), + } + c.lock.Lock() + c.streams[int(streamID)] = s + c.lock.Unlock() + + select { + case err := <-s.errch: + return nil, err + case res := <-s.resch: + return res, nil + case <-c.errch: + return nil, fmt.Errorf("connection error") + } + //return nil, fmt.Errorf("unknown error") +} + +// ServerConn server connection +type ServerConn struct { + // Handler handler to handle request + Handler http.Handler + + session *C.nghttp2_session + streams map[int]*ServerStream + lock *sync.Mutex + conn net.Conn + errch chan struct{} + exitch chan struct{} + err error +} + +// NewServerConn create new server connection +func NewServerConn(c net.Conn, handler http.Handler) (*ServerConn, error) { + conn := &ServerConn{ + conn: c, + Handler: handler, + streams: make(map[int]*ServerStream), + lock: new(sync.Mutex), + errch: make(chan struct{}), + exitch: make(chan struct{}), + } + conn.session = C.init_server_session(C.size_t(uintptr(unsafe.Pointer(conn)))) + if conn.session == nil { + return nil, fmt.Errorf("init session failed") + } + //log.Println("send server connection header") + ret := C.send_server_connection_header(conn.session) + if int(ret) < 0 { + log.Println(C.GoString(C.nghttp2_strerror(ret))) + return nil, fmt.Errorf("send connection header failed") + } + //go conn.run() + return conn, nil +} + +func (c *ServerConn) serve(s *ServerStream) { + var handler = c.Handler + if c.Handler == nil { + handler = http.DefaultServeMux + } + handler.ServeHTTP(s, s.req) + s.Close() +} + +// Close close the server connection +func (c *ServerConn) Close() error { + for _, s := range c.streams { + s.Close() + } + C.nghttp2_session_terminate_session(c.session, 0) + C.nghttp2_session_del(c.session) + close(c.exitch) + c.conn.Close() + return nil +} + +// Run run the server loop +func (c *ServerConn) Run() { + var wantRead int + var wantWrite int + var delay = 50 + var ret C.int + + defer c.Close() + defer close(c.errch) + + datach := make(chan []byte) + errch := make(chan error) + + go func() { + buf := make([]byte, 16*1024) + readloop: + for { + select { + case <-c.exitch: + break readloop + default: + } + + n, err := c.conn.Read(buf) + if err != nil { + errch <- err + break + } + datach <- buf[:n] + } + }() + +loop: + for { + select { + case <-c.errch: + break loop + case err := <-errch: + c.err = err + break loop + case <-c.exitch: + break loop + default: + } + + wantWrite = int(C.nghttp2_session_want_write(c.session)) + if wantWrite != 0 { + ret = C.nghttp2_session_send(c.session) + if int(ret) < 0 { + c.err = fmt.Errorf("sesion send error: %s", + C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break + } + } + + wantRead = int(C.nghttp2_session_want_read(c.session)) + select { + case d := <-datach: + d1 := C.CBytes(d) + ret1 := C.nghttp2_session_mem_recv(c.session, + (*C.uchar)(d1), C.size_t(int(len(d)))) + C.free(d1) + if int(ret1) < 0 { + c.err = fmt.Errorf("sesion recv error: %s", + C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break loop + } + default: + } + + // make delay when no data read/write + if wantRead == 0 && wantWrite == 0 { + select { + case <-time.After(time.Duration(delay) * time.Millisecond): + } + } + } +} diff --git a/data_provider.go b/data_provider.go new file mode 100644 index 0000000..cb7e2e5 --- /dev/null +++ b/data_provider.go @@ -0,0 +1,120 @@ +package nghttp2 + +/* +#include "_nghttp2.h" +*/ +import "C" +import ( + "bytes" + "sync" + "time" + "unsafe" +) + +// dataProvider provider data for libnghttp2 library +// libnghttp2 callback will Read to read the data, +// application call Write to provider data, +// application call Close will cause Read return io.EOF +type dataProvider struct { + buf *bytes.Buffer + closed bool + lock *sync.Mutex + session *C.nghttp2_session + streamID int +} + +// Read read from data provider +func (dp *dataProvider) Read(buf []byte) (n int, err error) { + dp.lock.Lock() + defer dp.lock.Unlock() + n, err = dp.buf.Read(buf) + + if err != nil && !dp.closed { + return 0, errAgain + } + return +} + +// Write provider data for data provider +func (dp *dataProvider) Write(buf []byte) (n int, err error) { + dp.lock.Lock() + defer dp.lock.Unlock() + C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) + return dp.buf.Write(buf) +} + +// Close end to provide data +func (dp *dataProvider) Close() error { + dp.lock.Lock() + defer dp.lock.Unlock() + dp.closed = true + C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) + return nil +} + +func newDataProvider() ( + *dataProvider, *C.nghttp2_data_provider) { + dp := &dataProvider{ + buf: new(bytes.Buffer), + lock: new(sync.Mutex), + } + cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp)))) + return dp, cdp +} + +// bodyProvider provide data for http body +// Read will block when data not yet avaliable +type bodyProvider struct { + buf *bytes.Buffer + closed bool + lock *sync.Mutex +} + +// Read read data from provider +// will block when data not yet avaliable +func (bp *bodyProvider) Read(buf []byte) (int, error) { + for { + bp.lock.Lock() + n, err := bp.buf.Read(buf) + bp.lock.Unlock() + if err != nil && !bp.closed { + time.Sleep(100 * time.Millisecond) + continue + } + return n, err + } +} + +// Write provide data for dataProvider +// libnghttp2 data chunk recv callback will call this +func (bp *bodyProvider) Write(buf []byte) (int, error) { + bp.lock.Lock() + defer bp.lock.Unlock() + return bp.buf.Write(buf) +} + +// Close end to provide data +func (bp *bodyProvider) Close() error { + /* + if c, ok := bp.w.(io.Closer); ok{ + return c.Close() + } + */ + bp.closed = true + return nil +} + +// setNvArray set the array for nghttp2_nv array +func setNvArray(a *C.struct_nv_array, index int, + name, value string, flags int) { + cname := C.CString(name) + cvalue := C.CString(value) + cnamelen := C.size_t(len(name)) + cvaluelen := C.size_t(len(value)) + cflags := C.int(flags) + + // note: cname and cvalue will freed in C.delete_nv_array + + C.nv_array_set(a, C.int(index), cname, + cvalue, cnamelen, cvaluelen, cflags) +} diff --git a/http2_test.go b/http2_test.go index e8e96a3..40265f0 100644 --- a/http2_test.go +++ b/http2_test.go @@ -45,6 +45,9 @@ func TestHttp2Client(t *testing.T) { if err != nil { t.Fatal(err) } + if res.StatusCode != http.StatusOK { + t.Errorf("expect %d, got %d", http.StatusOK, res.StatusCode) + } res.Write(os.Stderr) req, _ = http.NewRequest("GET", @@ -53,6 +56,9 @@ func TestHttp2Client(t *testing.T) { if err != nil { t.Fatal(err) } + if res.StatusCode != http.StatusOK { + t.Errorf("expect %d, got %d", http.StatusOK, res.StatusCode) + } res.Write(os.Stderr) log.Println("end") diff --git a/client.c b/nghttp2.c similarity index 65% rename from client.c rename to nghttp2.c index e38eeb1..c04a5d2 100644 --- a/client.c +++ b/nghttp2.c @@ -1,17 +1,148 @@ #include "_nghttp2.h" +static ssize_t server_send_callback(nghttp2_session *session, + const uint8_t *data, size_t length, + int flags, void *user_data) +{ + return OnServerDataSendCallback(user_data, (void *)data, length); +} + +static int on_server_frame_recv_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) + { + OnServerHeadersDoneCallback(user_data, frame->hd.stream_id); + } + case NGHTTP2_DATA: + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) + { + OnServerStreamEndCallback(user_data, frame->hd.stream_id); + } + break; + } + return 0; +} + +static int on_server_stream_close_callback(nghttp2_session *session, + int32_t stream_id, + uint32_t error_code, + void *user_data) + +{ + OnServerStreamClose(user_data, stream_id); + return 0; +} + +static int on_server_header_callback(nghttp2_session *session, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, + size_t valuelen, uint8_t flags, + void *user_data) +{ + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) + { + OnServerHeaderCallback(user_data, frame->hd.stream_id, + (void *)name, namelen, (void *)value, valuelen); + } + break; + } + return 0; +} + +static int on_server_data_chunk_recv_callback(nghttp2_session *session, + uint8_t flags, + int32_t stream_id, + const uint8_t *data, + size_t len, void *user_data) +{ + return OnServerDataChunkRecv(user_data, stream_id, (void *)data, len); +} + +static int on_server_begin_headers_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) + { + OnServerBeginHeaderCallback(user_data, frame->hd.stream_id); + } + break; + } + return 0; +} + +nghttp2_session *init_server_session(size_t data) +{ + nghttp2_session_callbacks *callbacks; + nghttp2_session *session; + + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, server_send_callback); + + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + on_server_frame_recv_callback); + + nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, on_server_stream_close_callback); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + callbacks, on_server_data_chunk_recv_callback); + nghttp2_session_callbacks_set_on_header_callback(callbacks, + on_server_header_callback); + + nghttp2_session_callbacks_set_on_begin_headers_callback( + callbacks, on_server_begin_headers_callback); + + nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); + + nghttp2_session_callbacks_del(callbacks); + return session; +} + +int send_server_connection_header(nghttp2_session *session) +{ + nghttp2_settings_entry iv[1] = { + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; + int rv; + + rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, + ARRLEN(iv)); + return rv; + /* + if (rv != 0) { + // warnx("Fatal error: %s", nghttp2_strerror(rv)); + return rv; + } + return 0; + */ +} + // send_callback send data to network static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) { - return ClientDataSend(user_data, (void *)data, length); + return OnClientDataSendCallback(user_data, (void *)data, length); } // recv_callback read data from network static ssize_t client_recv_callback(nghttp2_session *session, uint8_t *buf, size_t length, int flags, void *user_data) { - return ClientDataRecv(user_data, (void *)buf, length); + return OnClientDataRecvCallback(user_data, (void *)buf, length); } static int on_client_header_callback(nghttp2_session *session, @@ -128,7 +259,7 @@ static int on_client_data_chunk_recv_callback(nghttp2_session *session, uint8_t int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { - return OnClientDataRecv(user_data, stream_id, (void *)data, len); + return OnClientDataChunkRecv(user_data, stream_id, (void *)data, len); } static int on_client_stream_close_callback(nghttp2_session *session, int32_t stream_id, @@ -142,7 +273,7 @@ static ssize_t data_source_read_callback(nghttp2_session *session, int32_t strea uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data) { - int ret = DataSourceRead(source->ptr, buf, length); + int ret = OnDataSourceReadCallback(source->ptr, buf, length); if (ret == 0) { *data_flags = NGHTTP2_DATA_FLAG_EOF; diff --git a/server.c b/server.c deleted file mode 100644 index 7213d3c..0000000 --- a/server.c +++ /dev/null @@ -1,132 +0,0 @@ -#include "_nghttp2.h" - -static ssize_t server_send_callback(nghttp2_session *session, - const uint8_t *data, size_t length, - int flags, void *user_data) -{ - return ServerDataSend(user_data, (void *)data, length); -} - -static int on_server_frame_recv_callback(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) -{ - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) - { - OnServerHeadersDoneCallback(user_data, frame->hd.stream_id); - } - case NGHTTP2_DATA: - if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) - { - OnServerStreamEndCallback(user_data, frame->hd.stream_id); - } - break; - } - return 0; -} - -static int on_server_stream_close_callback(nghttp2_session *session, - int32_t stream_id, - uint32_t error_code, - void *user_data) - -{ - OnServerStreamClose(user_data, stream_id); - return 0; -} - -static int on_server_header_callback(nghttp2_session *session, - const nghttp2_frame *frame, - const uint8_t *name, size_t namelen, - const uint8_t *value, - size_t valuelen, uint8_t flags, - void *user_data) -{ - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) - { - OnServerHeaderCallback(user_data, frame->hd.stream_id, - (void *)name, namelen, (void *)value, valuelen); - } - break; - } - return 0; -} - -static int on_server_data_chunk_recv_callback(nghttp2_session *session, - uint8_t flags, - int32_t stream_id, - const uint8_t *data, - size_t len, void *user_data) -{ - return OnServerDataRecv(user_data, stream_id, (void *)data, len); -} - -static int on_server_begin_headers_callback(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) -{ - - switch (frame->hd.type) - { - case NGHTTP2_HEADERS: - if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) - { - OnServerBeginHeaderCallback(user_data, frame->hd.stream_id); - } - break; - } - return 0; -} - -nghttp2_session *init_server_session(size_t data) -{ - nghttp2_session_callbacks *callbacks; - nghttp2_session *session; - - nghttp2_session_callbacks_new(&callbacks); - - nghttp2_session_callbacks_set_send_callback(callbacks, server_send_callback); - - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - on_server_frame_recv_callback); - - nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, on_server_stream_close_callback); - - nghttp2_session_callbacks_set_on_data_chunk_recv_callback( - callbacks, on_server_data_chunk_recv_callback); - nghttp2_session_callbacks_set_on_header_callback(callbacks, - on_server_header_callback); - - nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, on_server_begin_headers_callback); - - nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); - - nghttp2_session_callbacks_del(callbacks); - return session; -} - -int send_server_connection_header(nghttp2_session *session) -{ - nghttp2_settings_entry iv[1] = { - {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; - int rv; - - rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, - ARRLEN(iv)); - return rv; - /* - if (rv != 0) { - // warnx("Fatal error: %s", nghttp2_strerror(rv)); - return rv; - } - return 0; - */ -} \ No newline at end of file diff --git a/server.go b/server.go deleted file mode 100644 index 0914ca1..0000000 --- a/server.go +++ /dev/null @@ -1,438 +0,0 @@ -package nghttp2 - -/* -#cgo pkg-config: libnghttp2 -#include "_nghttp2.h" -*/ -import "C" -import ( - "bytes" - "fmt" - "log" - "net" - "net/http" - "net/url" - "strings" - "sync" - "time" - "unsafe" -) - -// ServerConn server connection -type ServerConn struct { - // Handler handler to handle request - Handler http.Handler - - session *C.nghttp2_session - streams map[int]*ServerStream - lock *sync.Mutex - conn net.Conn - errch chan struct{} - exitch chan struct{} - err error -} - -// ServerStream server stream -type ServerStream struct { - streamID int - // headers receive done - headersDone bool - // is stream_end flag received - streamEnd bool - // request - req *http.Request - // response header - header http.Header - // response statusCode - statusCode int - // response has send - responseSend bool - - // server connection - conn *ServerConn - - // data provider - dp *dataProvider - cdp *C.nghttp2_data_provider - - closed bool - //buf *bytes.Buffer -} - -type bodyProvider struct { - buf *bytes.Buffer - closed bool - lock *sync.Mutex -} - -func (bp *bodyProvider) Read(buf []byte) (int, error) { - for { - bp.lock.Lock() - n, err := bp.buf.Read(buf) - bp.lock.Unlock() - if err != nil && !bp.closed { - time.Sleep(100 * time.Millisecond) - continue - } - return n, err - } -} - -func (bp *bodyProvider) Write(buf []byte) (int, error) { - bp.lock.Lock() - defer bp.lock.Unlock() - return bp.buf.Write(buf) -} - -func (bp *bodyProvider) Close() error { - /* - if c, ok := bp.w.(io.Closer); ok{ - return c.Close() - } - */ - bp.closed = true - return nil -} - -// NewServerConn create new server connection -func NewServerConn(c net.Conn, handler http.Handler) (*ServerConn, error) { - conn := &ServerConn{ - conn: c, - Handler: handler, - streams: make(map[int]*ServerStream), - lock: new(sync.Mutex), - errch: make(chan struct{}), - exitch: make(chan struct{}), - } - conn.session = C.init_server_session(C.size_t(uintptr(unsafe.Pointer(conn)))) - if conn.session == nil { - return nil, fmt.Errorf("init session failed") - } - //log.Println("send server connection header") - ret := C.send_server_connection_header(conn.session) - if int(ret) < 0 { - log.Println(C.GoString(C.nghttp2_strerror(ret))) - return nil, fmt.Errorf("send connection header failed") - } - //go conn.run() - return conn, nil -} - -func (c *ServerConn) serve(s *ServerStream) { - var handler = c.Handler - if c.Handler == nil { - handler = http.DefaultServeMux - } - handler.ServeHTTP(s, s.req) - s.Close() -} - -// Close close the server connection -func (c *ServerConn) Close() error { - for _, s := range c.streams { - s.Close() - } - C.nghttp2_session_terminate_session(c.session, 0) - C.nghttp2_session_del(c.session) - close(c.exitch) - c.conn.Close() - return nil -} - -// Run run the server loop -func (c *ServerConn) Run() { - var wantRead int - var wantWrite int - var delay = 50 - var ret C.int - - defer c.Close() - defer close(c.errch) - - datach := make(chan []byte) - errch := make(chan error) - - go func() { - buf := make([]byte, 16*1024) - readloop: - for { - select { - case <-c.exitch: - break readloop - default: - } - - n, err := c.conn.Read(buf) - if err != nil { - errch <- err - break - } - datach <- buf[:n] - } - }() - -loop: - for { - select { - case <-c.errch: - break loop - case err := <-errch: - c.err = err - break loop - case <-c.exitch: - break loop - default: - } - - wantWrite = int(C.nghttp2_session_want_write(c.session)) - if wantWrite != 0 { - ret = C.nghttp2_session_send(c.session) - if int(ret) < 0 { - c.err = fmt.Errorf("sesion send error: %s", - C.GoString(C.nghttp2_strerror(ret))) - log.Println(c.err) - break - } - } - - wantRead = int(C.nghttp2_session_want_read(c.session)) - select { - case d := <-datach: - d1 := C.CBytes(d) - ret1 := C.nghttp2_session_mem_recv(c.session, - (*C.uchar)(d1), C.size_t(int(len(d)))) - C.free(d1) - if int(ret1) < 0 { - c.err = fmt.Errorf("sesion recv error: %s", - C.GoString(C.nghttp2_strerror(ret))) - log.Println(c.err) - break loop - } - default: - } - - // make delay when no data read/write - if wantRead == 0 && wantWrite == 0 { - select { - case <-time.After(time.Duration(delay) * time.Millisecond): - } - } - } -} - -// Write implements http.ResponseWriter -func (s *ServerStream) Write(buf []byte) (int, error) { - if !s.responseSend { - s.WriteHeader(http.StatusOK) - } - /* - //log.Printf("stream %d, send %d bytes", s.streamID, len(buf)) - if s.buf.Len() > 2048 { - s.dp.Write(s.buf.Bytes()) - s.buf.Reset() - } - - if len(buf) < 2048 { - s.buf.Write(buf) - return len(buf), nil - } - */ - return s.dp.Write(buf) -} - -// WriteHeader implements http.ResponseWriter -func (s *ServerStream) WriteHeader(code int) { - s.statusCode = code - nvIndex := 0 - nvMax := 25 - nva := C.new_nv_array(C.size_t(nvMax)) - setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0) - nvIndex++ - - for k, v := range s.header { - if strings.ToLower(k) == "host" { - continue - } - //log.Printf("header %s: %s", k, v) - setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) - nvIndex++ - } - var dp *dataProvider - var cdp *C.nghttp2_data_provider - dp, cdp = newDataProvider() - dp.streamID = s.streamID - dp.session = s.conn.session - s.dp = dp - s.cdp = cdp - ret := C.nghttp2_submit_response( - s.conn.session, C.int(s.streamID), nva.nv, C.size_t(nvIndex), cdp) - C.delete_nv_array(nva) - if int(ret) < 0 { - panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret)))) - } - s.responseSend = true - log.Printf("stream %d send response", s.streamID) -} - -// Header implements http.ResponseWriter -func (s *ServerStream) Header() http.Header { - if s.header == nil { - s.header = http.Header{} - } - return s.header -} - -// Close close the stream -func (s *ServerStream) Close() error { - if s.closed { - return nil - } - //C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) - s.req.Body.Close() - if s.dp != nil { - s.dp.Close() - } - s.closed = true - log.Printf("stream %d closed", s.streamID) - return nil -} - -// ServerDataRecv callback function for receive data from network -//export ServerDataRecv -func ServerDataRecv(ptr unsafe.Pointer, data unsafe.Pointer, - length C.size_t) C.ssize_t { - conn := (*ServerConn)(ptr) - buf := make([]byte, int(length)) - n, err := conn.conn.Read(buf) - if err != nil { - return -1 - } - cbuf := C.CBytes(buf[:n]) - defer C.free(cbuf) - C.memcpy(data, cbuf, C.size_t(n)) - return C.ssize_t(n) -} - -// ServerDataSend callback function for send data to network -//export ServerDataSend -func ServerDataSend(ptr unsafe.Pointer, data unsafe.Pointer, - length C.size_t) C.ssize_t { - //log.Println("server data send") - conn := (*ServerConn)(ptr) - buf := C.GoBytes(data, C.int(length)) - n, err := conn.conn.Write(buf) - if err != nil { - return -1 - } - //log.Println("send ", n, " bytes to network ", buf) - return C.ssize_t(n) -} - -// OnServerDataRecv callback function for data recv -//export OnServerDataRecv -func OnServerDataRecv(ptr unsafe.Pointer, streamID C.int, - data unsafe.Pointer, length C.size_t) C.int { - conn := (*ServerConn)(ptr) - s := conn.streams[int(streamID)] - bp := s.req.Body.(*bodyProvider) - buf := C.GoBytes(data, C.int(length)) - bp.Write(buf) - return C.int(length) -} - -// OnServerBeginHeaderCallback callback function for begin header -//export OnServerBeginHeaderCallback -func OnServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - conn := (*ServerConn)(ptr) - s := &ServerStream{ - streamID: int(streamID), - conn: conn, - req: &http.Request{ - URL: &url.URL{}, - Header: http.Header{}, - Proto: "HTTP/2.0", - ProtoMajor: 2, - ProtoMinor: 0, - }, - //buf: new(bytes.Buffer), - } - conn.streams[int(streamID)] = s - return 0 -} - -// OnServerHeaderCallback callback function for header -//export OnServerHeaderCallback -func OnServerHeaderCallback(ptr unsafe.Pointer, streamID C.int, - name unsafe.Pointer, namelen C.int, - value unsafe.Pointer, valuelen C.int) C.int { - conn := (*ServerConn)(ptr) - s := conn.streams[int(streamID)] - hdrname := C.GoStringN((*C.char)(name), namelen) - hdrvalue := C.GoStringN((*C.char)(value), valuelen) - hdrname = strings.ToLower(hdrname) - switch hdrname { - case ":method": - s.req.Method = hdrvalue - case ":scheme": - s.req.URL.Scheme = hdrvalue - case ":path": - s.req.RequestURI = hdrvalue - u, _ := url.ParseRequestURI(s.req.RequestURI) - scheme := s.req.URL.Scheme - *(s.req.URL) = *u - if scheme != "" { - s.req.URL.Scheme = scheme - } - case ":authority": - s.req.Host = hdrvalue - default: - s.req.Header.Add(hdrname, hdrvalue) - - } - return 0 -} - -// OnServerStreamEndCallback callback function for frame received -//export OnServerStreamEndCallback -func OnServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int { - - conn := (*ServerConn)(ptr) - s := conn.streams[int(streamID)] - s.streamEnd = true - bp := s.req.Body.(*bodyProvider) - if s.req.Method != "CONNECT" { - bp.closed = true - log.Println("stream end flag set, begin to serve") - go conn.serve(s) - } - return 0 -} - -// OnServerHeadersDoneCallback callback function for all headers received -//export OnServerHeadersDoneCallback -func OnServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { - conn := (*ServerConn)(ptr) - s := conn.streams[int(streamID)] - s.headersDone = true - bp := &bodyProvider{ - buf: new(bytes.Buffer), - lock: new(sync.Mutex), - } - s.req.Body = bp - if s.req.Method == "CONNECT" { - go conn.serve(s) - } - return 0 -} - -// OnServerStreamClose callback function for stream close -//export OnServerStreamClose -func OnServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - conn := (*ServerConn)(ptr) - s := conn.streams[int(streamID)] - conn.lock.Lock() - delete(conn.streams, int(streamID)) - conn.lock.Unlock() - s.Close() - return 0 -} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..397ee29 --- /dev/null +++ b/stream.go @@ -0,0 +1,172 @@ +package nghttp2 + +/* +#include "_nghttp2.h" +*/ +import "C" +import ( + "fmt" + "io" + "net/http" + "strings" +) + +// ClientStream http2 client stream +type ClientStream struct { + streamID int + cdp *C.nghttp2_data_provider + dp *dataProvider + // application read data from stream + //r *io.PipeReader + // recv stream data from session + //w *io.PipeWriter + res *http.Response + resch chan *http.Response + errch chan error + closed bool +} + +// Read read stream data +func (s *ClientStream) Read(buf []byte) (n int, err error) { + return s.res.Body.Read(buf) +} + +// Write write data to stream +func (s *ClientStream) Write(buf []byte) (n int, err error) { + if s.dp != nil { + return s.dp.Write(buf) + } + return 0, fmt.Errorf("empty data provider") +} + +// Close close the stream +func (s *ClientStream) Close() error { + if s.closed { + return nil + } + err := io.EOF + //log.Println("close stream") + select { + case s.errch <- err: + default: + } + //log.Println("close stream resch") + close(s.resch) + //log.Println("close stream errch") + close(s.errch) + //log.Println("close pipe w") + s.res.Body.Close() + //log.Println("close stream done") + if s.dp != nil { + s.dp.Close() + } + s.closed = true + return nil +} + +// ServerStream server stream +type ServerStream struct { + streamID int + // headers receive done + headersDone bool + // is stream_end flag received + streamEnd bool + // request + req *http.Request + // response header + header http.Header + // response statusCode + statusCode int + // response has send + responseSend bool + + // server connection + conn *ServerConn + + // data provider + dp *dataProvider + cdp *C.nghttp2_data_provider + + closed bool + //buf *bytes.Buffer +} + +// Write write data to stream, +// implements http.ResponseWriter +func (s *ServerStream) Write(buf []byte) (int, error) { + if !s.responseSend { + s.WriteHeader(http.StatusOK) + } + /* + //log.Printf("stream %d, send %d bytes", s.streamID, len(buf)) + if s.buf.Len() > 2048 { + s.dp.Write(s.buf.Bytes()) + s.buf.Reset() + } + + if len(buf) < 2048 { + s.buf.Write(buf) + return len(buf), nil + } + */ + return s.dp.Write(buf) +} + +// WriteHeader set response code and send reponse, +// implements http.ResponseWriter +func (s *ServerStream) WriteHeader(code int) { + s.statusCode = code + nvIndex := 0 + nvMax := 25 + nva := C.new_nv_array(C.size_t(nvMax)) + setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0) + nvIndex++ + + for k, v := range s.header { + if strings.ToLower(k) == "host" { + continue + } + //log.Printf("header %s: %s", k, v) + setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) + nvIndex++ + } + var dp *dataProvider + var cdp *C.nghttp2_data_provider + dp, cdp = newDataProvider() + dp.streamID = s.streamID + dp.session = s.conn.session + s.dp = dp + s.cdp = cdp + ret := C.nghttp2_submit_response( + s.conn.session, C.int(s.streamID), nva.nv, C.size_t(nvIndex), cdp) + C.delete_nv_array(nva) + if int(ret) < 0 { + panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret)))) + } + s.responseSend = true + //log.Printf("stream %d send response", s.streamID) +} + +// Header return the http.Header, +// implements http.ResponseWriter +func (s *ServerStream) Header() http.Header { + if s.header == nil { + s.header = http.Header{} + } + return s.header +} + +// Close close the stream +func (s *ServerStream) Close() error { + if s.closed { + return nil + } + //C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) + s.req.Body.Close() + if s.dp != nil { + s.dp.Close() + } + s.closed = true + //log.Printf("stream %d closed", s.streamID) + return nil +}