diff --git a/_nghttp2.h b/_nghttp2.h index 09e8b90..90efe5a 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -21,7 +21,7 @@ struct nv_array }; void delete_nv_array(struct nv_array *a); -nghttp2_data_provider *new_data_provider(void *data); +nghttp2_data_provider *new_data_provider(size_t data); int nv_array_set(struct nv_array *a, int index, char *name, char *value, @@ -29,10 +29,12 @@ int nv_array_set(struct nv_array *a, int index, struct nv_array *new_nv_array(size_t n); -int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen); +int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen, + nghttp2_data_provider *dp); int send_client_connection_header(nghttp2_session *session); -nghttp2_session * init_nghttp2_session(size_t data); +nghttp2_session *init_client_session(size_t data); +nghttp2_session *init_server_session(size_t data); #endif \ No newline at end of file diff --git a/http2.go b/http2.go index c1a1476..944152a 100644 --- a/http2.go +++ b/http2.go @@ -20,12 +20,13 @@ import ( // Conn http2 connection type Conn struct { - session *C.nghttp2_session - conn net.Conn - streams map[int]*Stream - lock *sync.Mutex - errch chan struct{} - err error + session *C.nghttp2_session + conn net.Conn + streams map[int]*Stream + lock *sync.Mutex + errch chan struct{} + err error + isServer bool } // Stream http2 stream @@ -36,10 +37,14 @@ type Stream struct { // application read data from stream r *io.PipeReader // recv stream data from session - w *io.PipeWriter - res *http.Response - resch chan *http.Response - errch chan error + w *io.PipeWriter + res *http.Response + resch chan *http.Response + errch chan error + closed bool +} + +type Request struct { } type dataProvider struct { @@ -49,24 +54,45 @@ type dataProvider struct { w io.Writer } -// NewConn create http2 connection -func NewConn(c net.Conn) (*Conn, error) { +// NewClientConn create http2 client +func NewClientConn(c net.Conn) (*Conn, error) { conn := &Conn{ - conn: c, streams: make(map[int]*Stream), lock: new(sync.Mutex), + conn: c, streams: make(map[int]*Stream), + lock: new(sync.Mutex), errch: make(chan struct{}), } - conn.session = C.init_nghttp2_session(C.size_t(int(uintptr(unsafe.Pointer(conn))))) + conn.session = C.init_client_session( + C.size_t(int(uintptr(unsafe.Pointer(conn))))) if conn.session == nil { return nil, fmt.Errorf("init session failed") } ret := C.send_client_connection_header(conn.session) if int(ret) < 0 { - log.Printf("submit settings error: %s", C.GoString(C.nghttp2_strerror(ret))) + log.Printf("submit settings error: %s", + C.GoString(C.nghttp2_strerror(ret))) } go conn.run() return conn, nil } +// NewServerConn create http2 server +func NewServerConn(c net.Conn) (*Conn, error) { + conn := &Conn{ + conn: c, streams: make(map[int]*Stream), + lock: new(sync.Mutex), + errch: make(chan struct{}), + isServer: true, + } + conn.session = C.init_server_session( + C.size_t(int(uintptr(unsafe.Pointer(conn))))) + if conn.session == nil { + return nil, fmt.Errorf("init session failed") + } + go conn.run() + return conn, nil + +} + func (c *Conn) onDataRecv(buf []byte, streamID int) { stream := c.streams[streamID] stream.onDataRecv(buf) @@ -104,6 +130,7 @@ func (c *Conn) Close() error { for _, s := range c.streams { s.Close() } + C.nghttp2_session_terminate_session(c.session, 0) C.nghttp2_session_del(c.session) close(c.errch) c.conn.Close() @@ -116,31 +143,56 @@ func (c *Conn) run() { var delay = 50 var ret C.int + datach := make(chan []byte) + errch := make(chan error) + + go func() { + buf := make([]byte, 4096) + for { + n, err := c.conn.Read(buf) + if err != nil { + errch <- err + break + } + datach <- buf[:n] + } + }() + loop: for { select { case <-c.errch: break loop + case err := <-errch: + c.err = err + break loop default: } - wantRead = int(C.nghttp2_session_want_read(c.session)) wantWrite = int(C.nghttp2_session_want_write(c.session)) if wantWrite != 0 { ret = C.nghttp2_session_send(c.session) if int(ret) < 0 { - c.err = fmt.Errorf("sesion send error: %s", C.GoString(C.nghttp2_strerror(ret))) + c.err = fmt.Errorf("sesion send error: %s", + C.GoString(C.nghttp2_strerror(ret))) log.Println(c.err) break } } - if wantRead != 0 { - ret = C.nghttp2_session_recv(c.session) - if int(ret) < 0 { - c.err = fmt.Errorf("sesion recv error: %s", C.GoString(C.nghttp2_strerror(ret))) + + wantRead = int(C.nghttp2_session_want_read(c.session)) + select { + case d := <-datach: + d1 := C.CBytes(d) + ret1 := C.nghttp2_session_mem_recv(c.session, + (*C.uchar)(d1), C.size_t(int(len(d)))) + if int(ret1) < 0 { + c.err = fmt.Errorf("sesion recv error: %s", + C.GoString(C.nghttp2_strerror(ret))) log.Println(c.err) - break + break loop } + default: } // make delay when no data read/write @@ -152,12 +204,23 @@ loop: } } -// NewRequest create a new http2 stream -func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) { +// AcceptRequest get a request from session +// this block until a request is avaliable, +// server only +func (c *Conn) AcceptRequest() (req *Request, err error) { + return nil, nil +} + +// CreateRequest submit a request and return a http.Response, client only +func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) { if c.err != nil { return nil, c.err } + if c.isServer { + return nil, fmt.Errorf("only client can create new request") + } + nvIndex := 0 nvMax := 25 nva := C.new_nv_array(C.size_t(nvMax)) @@ -179,6 +242,7 @@ func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) { if strings.ToLower(k) == "host" { continue } + //log.Printf("header %s: %s", k, v) setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) nvIndex++ } @@ -187,12 +251,13 @@ func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) { if req.Body != nil { dp, cdp = newDataProvider(req.Body, nil) } - streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex+1)) + streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp) C.delete_nv_array(nva) if int(streamID) < 0 { - return nil, fmt.Errorf("submit request error: %s", C.GoString(C.nghttp2_strerror(streamID))) + return nil, fmt.Errorf("submit request error: %s", + C.GoString(C.nghttp2_strerror(streamID))) } - log.Println("stream id ", int(streamID)) + //log.Println("stream id ", int(streamID)) r, w := io.Pipe() s := &Stream{ streamID: int(streamID), @@ -216,14 +281,15 @@ func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) { //return nil, fmt.Errorf("unknown error") } -func setNvArray(a *C.struct_nv_array, index int, name, value string, flags int) { +func setNvArray(a *C.struct_nv_array, index int, + name, value string, flags int) { cname := C.CString(name) cvalue := C.CString(value) cnamelen := C.size_t(len(name)) cvaluelen := C.size_t(len(value)) cflags := C.int(flags) - defer C.free(unsafe.Pointer(cname)) - defer C.free(unsafe.Pointer(cvalue)) + //defer C.free(unsafe.Pointer(cname)) + //defer C.free(unsafe.Pointer(cvalue)) C.nv_array_set(a, C.int(index), cname, cvalue, cnamelen, cvaluelen, cflags) } @@ -239,9 +305,10 @@ func (dp *dataProvider) Write(buf []byte) (n int, err error) { return dp.w.Write(buf) } -func newDataProvider(r io.Reader, w io.Writer) (*dataProvider, *C.nghttp2_data_provider) { +func newDataProvider(r io.Reader, w io.Writer) ( + *dataProvider, *C.nghttp2_data_provider) { dp := &dataProvider{r, w} - cdp := C.new_data_provider(unsafe.Pointer(dp)) + cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp)))) return dp, cdp } @@ -279,23 +346,36 @@ func (s *Stream) onHeader(name, value string) { func (s *Stream) onFrameRecv() { s.res.Body = s s.resch <- s.res + //log.Println("stream frame recv") } // Close close the stream func (s *Stream) Close() error { + if s.closed { + return nil + } + err := fmt.Errorf("stream closed") + //log.Println("close stream") select { - case s.errch <- fmt.Errorf("stream closed"): + case s.errch <- err: + default: } + //log.Println("close stream resch") close(s.resch) + //log.Println("close stream errch") close(s.errch) - s.w.Close() + //log.Println("close pipe w") + s.w.CloseWithError(err) + //log.Println("close stream done") + s.closed = true return nil } // DataSourceRead callback function for data read from data provider source //export DataSourceRead -func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t { - log.Println("data source read") +func DataSourceRead(ptr unsafe.Pointer, + buf unsafe.Pointer, length C.size_t) C.ssize_t { + //log.Println("data source read") dp := (*dataProvider)(ptr) gobuf := make([]byte, int(length)) n, err := dp.Read(gobuf) @@ -313,8 +393,9 @@ func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.s // OnDataRecv callback function for data frame received //export OnDataRecv -func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int { - log.Println("on data recv") +func OnDataRecv(ptr unsafe.Pointer, streamID C.int, + buf unsafe.Pointer, length C.size_t) C.int { + //log.Println("on data recv") conn := (*Conn)(ptr) gobuf := C.GoBytes(buf, C.int(length)) conn.onDataRecv(gobuf, int(streamID)) @@ -324,39 +405,41 @@ func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C // DataRead callback function for session wants read data from peer //export DataRead func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { - log.Println("data read") + //log.Println("data read req", int(size)) conn := (*Conn)(ptr) buf := make([]byte, int(size)) - log.Println(conn.conn.RemoteAddr()) + //log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Read(buf) if err != nil { - log.Println(err) + //log.Println(err) return -1 } - log.Println("read from network ", n) + cbuf := C.CBytes(buf) + //log.Println("read from network ", n, buf[:n]) + C.memcpy(data, cbuf, C.size_t(n)) return C.ssize_t(n) } // DataWrite callback function for session wants send data to peer //export DataWrite func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { - log.Println("data write") + //log.Println("data write req ", int(size)) conn := (*Conn)(ptr) buf := C.GoBytes(data, C.int(size)) - log.Println(conn.conn.RemoteAddr()) + //log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Write(buf) if err != nil { - log.Println(err) + //log.Println(err) return -1 } - log.Println("write data to network ", n) + //log.Println("write data to network ", n) return C.ssize_t(n) } // OnBeginHeaderCallback callback function for response //export OnBeginHeaderCallback func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - log.Println("begin header") + //log.Println("begin header") conn := (*Conn)(ptr) conn.onBeginHeader(int(streamID)) return 0 @@ -367,7 +450,7 @@ func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, name unsafe.Pointer, namelen C.int, value unsafe.Pointer, valuelen C.int) C.int { - log.Println("header") + //log.Println("header") conn := (*Conn)(ptr) goname := C.GoBytes(name, namelen) govalue := C.GoBytes(value, valuelen) @@ -378,7 +461,7 @@ func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, // OnFrameRecvCallback callback function for begion to recv data //export OnFrameRecvCallback func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { - log.Println("frame recv") + //log.Println("frame recv") conn := (*Conn)(ptr) conn.onFrameRecv(int(streamID)) return 0 @@ -387,7 +470,7 @@ func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { // OnStreamClose callback function for stream close //export OnStreamClose func OnStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - log.Println("stream close") + //log.Println("stream close") conn := (*Conn)(ptr) conn.onStreamClose(int(streamID)) return 0 diff --git a/http2_test.go b/http2_test.go index 34a8425..f31bd3d 100644 --- a/http2_test.go +++ b/http2_test.go @@ -1,8 +1,11 @@ package nghttp2 import ( + "bytes" "crypto/tls" + "log" "net/http" + "net/url" "os" "testing" ) @@ -21,14 +24,23 @@ func TestHttp2Client(t *testing.T) { if cstate.NegotiatedProtocol != "h2" { t.Fatal("no http2 on server") } - h2conn, err := NewConn(conn) + h2conn, err := NewClientConn(conn) if err != nil { t.Fatal(err) } - req, _ := http.NewRequest("GET", "http://www.simicloud.com/media/httpbin/get", nil) - res, err := h2conn.NewRequest(req) + param := url.Values{} + param.Add("e", "b") + param.Add("f", "d") + data := bytes.NewReader([]byte(param.Encode())) + req, _ := http.NewRequest("POST", "https://www.simicloud.com/media/httpbin/post?a=b&c=d", data) + log.Printf("%+v", req) + req.Header.Set("accept", "*/*") + req.Header.Set("user-agent", "go-nghttp2/1.0") + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + res, err := h2conn.CreateRequest(req) if err != nil { t.Fatal(err) } res.Write(os.Stderr) + log.Println("end") } diff --git a/nghttp2.c b/nghttp2.c index 53bbf83..657b3da 100644 --- a/nghttp2.c +++ b/nghttp2.c @@ -21,7 +21,7 @@ static int on_header_callback(nghttp2_session *session, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) { - printf("on_header_callback\n"); + //printf("on_header_callback\n"); switch (frame->hd.type) { case NGHTTP2_HEADERS: @@ -41,15 +41,15 @@ static int on_begin_headers_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { - printf("on_begin_headers_callback\n"); + //printf("on_begin_headers_callback\n"); int stream_id = frame->hd.stream_id; switch (frame->hd.type) { case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { - fprintf(stderr, "Response headers for stream ID=%d:\n", - frame->hd.stream_id); + //fprintf(stderr, "Response headers for stream ID=%d:\n", + // frame->hd.stream_id); OnBeginHeaderCallback(user_data, stream_id); } break; @@ -57,19 +57,71 @@ static int on_begin_headers_callback(nghttp2_session *session, return 0; } +int on_invalid_frame_recv_callback(nghttp2_session *session, + const nghttp2_frame *frame, int lib_error_code, void *user_data) +{ + printf("on_invalid_frame_recv, frame %d, code %d, msg %s\n", + frame->hd.type, + lib_error_code, + nghttp2_strerror(lib_error_code)); + return 0; +} + +static int on_frame_send_callback(nghttp2_session *session, + const nghttp2_frame *frame, void *user_data) +{ + size_t i; + (void)user_data; + //printf("on_frame_send_callback\n"); + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + /* + if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) + { + */ + if (1) + { + const nghttp2_nv *nva = frame->headers.nva; + printf("[INFO] C ----------------------------> S (HEADERS)\n"); + for (i = 0; i < frame->headers.nvlen; ++i) + { + fwrite(nva[i].name, 1, nva[i].namelen, stdout); + printf(": "); + fwrite(nva[i].value, 1, nva[i].valuelen, stdout); + printf("\n"); + } + } + break; + case NGHTTP2_RST_STREAM: + printf("[INFO] C ----------------------------> S (RST_STREAM)\n"); + break; + case NGHTTP2_GOAWAY: + printf("[INFO] C ----------------------------> S (GOAWAY)\n"); + break; + } + return 0; +} + static int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { - printf("on_frame_recv_callback\n"); + //printf("on_frame_recv_callback %d\n", frame->hd.type); switch (frame->hd.type) { case NGHTTP2_HEADERS: if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { - fprintf(stderr, "All headers received\n"); + //fprintf(stderr, "All headers received\n"); OnFrameRecvCallback(user_data, frame->hd.stream_id); } break; + case NGHTTP2_RST_STREAM: + printf("server send rst_stream %d\n", frame->rst_stream.error_code); + break; + case NGHTTP2_GOAWAY: + printf("server send go away\n"); + break; } return 0; } @@ -92,7 +144,7 @@ ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data) { - int ret = DataSourceRead(source, buf, length); + int ret = DataSourceRead(source->ptr, buf, length); if (ret == 0) { *data_flags = NGHTTP2_DATA_FLAG_EOF; @@ -107,21 +159,17 @@ int on_error_callback(nghttp2_session *session, int lib_error_code, return 0; } -nghttp2_session *init_nghttp2_session(size_t data) +void init_callbacks(nghttp2_session_callbacks *callbacks) { - int ret; - nghttp2_session *session; - nghttp2_session_callbacks *callbacks; - - nghttp2_session_callbacks_new(&callbacks); - nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback); nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); + nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback); nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback); + nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_frame_send_callback); nghttp2_session_callbacks_set_on_data_chunk_recv_callback( callbacks, on_data_chunk_recv_callback); @@ -133,13 +181,35 @@ nghttp2_session *init_nghttp2_session(size_t data) nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, on_begin_headers_callback); +} - ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data))); +nghttp2_session *init_client_session(size_t data) +{ + int ret; + nghttp2_session *session; + nghttp2_session_callbacks *callbacks; + nghttp2_session_callbacks_new(&callbacks); + init_callbacks(callbacks); + ret = nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); + if (session == NULL) + { + printf("c init session failed: %s\n", nghttp2_strerror(ret)); + } + return session; +} + +nghttp2_session *init_server_session(size_t data) +{ + int ret; + nghttp2_session *session; + nghttp2_session_callbacks *callbacks; + nghttp2_session_callbacks_new(&callbacks); + init_callbacks(callbacks); + ret = nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); if (session == NULL) { printf("c init session failed: %s\n", nghttp2_strerror(ret)); } - nghttp2_session_callbacks_del(callbacks); return session; } @@ -150,8 +220,12 @@ int send_client_connection_header(nghttp2_session *session) int rv; /* client 24 bytes magic string will be sent by nghttp2 library */ + /* rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv)); + */ + + rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, NULL, 0); /* if (rv != 0) { @@ -161,7 +235,8 @@ int send_client_connection_header(nghttp2_session *session) return rv; } -int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen) +int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen, + nghttp2_data_provider *dp) { int32_t stream_id; /* @@ -174,8 +249,13 @@ int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen fprintf(stderr, "Request headers:\n"); print_headers(stderr, hdrs, ARRLEN(hdrs)); */ + int i; + for (i = 0; i < hdrlen; i++) + { + printf("header %s: %s\n", hdrs[i].name, hdrs[i].value); + } stream_id = nghttp2_submit_request(session, NULL, hdrs, - hdrlen, NULL, NULL); + hdrlen, dp, NULL); /* if (stream_id < 0) { @@ -190,6 +270,7 @@ struct nv_array *new_nv_array(size_t n) { struct nv_array *a = malloc(sizeof(struct nv_array)); nghttp2_nv *nv = (nghttp2_nv *)malloc(n * sizeof(nghttp2_nv)); + memset(nv, 0, n * sizeof(nghttp2_nv)); a->nv = nv; a->len = n; return a; @@ -203,24 +284,39 @@ int nv_array_set(struct nv_array *a, int index, { return -1; } - nghttp2_nv nv = (a->nv)[index]; - nv.name = name; - nv.value = value; - nv.namelen = namelen; - nv.valuelen = valuelen; - nv.flags = flag; + nghttp2_nv *nv = &((a->nv)[index]); + nv->name = name; + nv->value = value; + nv->namelen = namelen; + nv->valuelen = valuelen; + nv->flags = flag; return 0; } void delete_nv_array(struct nv_array *a) { + int i; + nghttp2_nv *nv; + for (i = 0; i < a->len; i++) + { + nv = &((a->nv)[i]); + if (nv->name != NULL) + { + free(nv->name); + } + if (nv->value != NULL) + { + free(nv->value); + } + } free(a->nv); free(a); } -nghttp2_data_provider *new_data_provider(void *data) +nghttp2_data_provider *new_data_provider(size_t data) { nghttp2_data_provider *dp = malloc(sizeof(nghttp2_data_provider)); - dp->source.ptr = data; + dp->source.ptr = (void *)((int *)data); dp->read_callback = data_source_read_callback; + return dp; } \ No newline at end of file