From 4125b26c3586b87de39ad611173654e8606c5e2c Mon Sep 17 00:00:00 2001 From: fangdingjun Date: Thu, 12 Jul 2018 11:10:39 +0800 Subject: [PATCH] add callback for goaway frame --- _nghttp2.h | 1 + callbacks.go | 68 ++++++++++++++++++++++++++++++++++++++++++---------- nghttp2.c | 22 +++++++++++------ 3 files changed, 72 insertions(+), 19 deletions(-) diff --git a/_nghttp2.h b/_nghttp2.h index c80f360..ab8c8be 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -15,6 +15,7 @@ extern int onClientBeginHeaderCallback(void *, int); extern int onClientHeaderCallback(void *, int, void *, int, void *, int); extern int onClientHeadersDoneCallback(void *, int); extern int onClientStreamClose(void *, int); +extern void onClientConnectionCloseCallback(void *user_data); extern ssize_t onServerDataRecvCallback(void *, void *data, size_t); extern ssize_t onServerDataSendCallback(void *, void *data, size_t); diff --git a/callbacks.go b/callbacks.go index 22e5509..9d90dcb 100644 --- a/callbacks.go +++ b/callbacks.go @@ -23,6 +23,7 @@ const ( NGHTTP2_ERR_DEFERRED = -508 ) +/* // onServerDataRecvCallback callback function for libnghttp2 library // want receive data from network. // @@ -40,6 +41,7 @@ func onServerDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, C.memcpy(data, cbuf, C.size_t(n)) return C.ssize_t(n) } +*/ // onServerDataSendCallback callback function for libnghttp2 library // want send data to network. @@ -98,7 +100,10 @@ func onServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { }, //buf: new(bytes.Buffer), } + //conn.lock.Lock() conn.streams[int(streamID)] = s + //conn.lock.Unlock() + return NGHTTP2_NO_ERROR } @@ -190,9 +195,9 @@ func onServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { if !ok { return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } - conn.lock.Lock() + //conn.lock.Lock() delete(conn.streams, int(streamID)) - conn.lock.Unlock() + //conn.lock.Unlock() s.Close() return NGHTTP2_NO_ERROR } @@ -205,22 +210,27 @@ func onServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { //export onDataSourceReadCallback func onDataSourceReadCallback(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t { - //log.Println("data source read") + //log.Println("onDataSourceReadCallback begin") dp := (*dataProvider)(ptr) gobuf := make([]byte, int(length)) n, err := dp.Read(gobuf) if err != nil { if err == io.EOF { + //log.Println("onDataSourceReadCallback end") return 0 } if err == errAgain { + //log.Println("onDataSourceReadCallback end") + dp.deferred = true return NGHTTP2_ERR_DEFERRED } + //log.Println("onDataSourceReadCallback end") return NGHTTP2_ERR_CALLBACK_FAILURE } cbuf := C.CBytes(gobuf) defer C.free(cbuf) C.memcpy(buf, cbuf, C.size_t(n)) + //log.Println("onDataSourceReadCallback end") return C.ssize_t(n) } @@ -229,16 +239,18 @@ func onDataSourceReadCallback(ptr unsafe.Pointer, //export onClientDataChunkRecv func onClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int { - //log.Println("on data recv") + //log.Println("onClientDataChunkRecv begin") conn := (*ClientConn)(ptr) gobuf := C.GoBytes(buf, C.int(length)) s, ok := conn.streams[int(streamID)] if !ok { + //log.Println("onClientDataChunkRecv end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } if s.res.Body == nil { //log.Println("empty body") + //log.Println("onClientDataChunkRecv end") return C.int(length) } @@ -247,11 +259,14 @@ func onClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int, if err != nil { return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } + //log.Println("onClientDataChunkRecv end") return C.int(n) } + //log.Println("onClientDataChunkRecv end") return C.int(length) } +/* // onClientDataRecvCallback callback function for libnghttp2 library want read data from network. // //export onClientDataRecvCallback @@ -270,20 +285,23 @@ func onClientDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.si C.memcpy(data, cbuf, C.size_t(n)) return C.ssize_t(n) } - +*/ // onClientDataSendCallback callback function for libnghttp2 library want send data to network. // //export onClientDataSendCallback func onClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + //log.Println("onClientDataSendCallback begin") //log.Println("data write req ", int(size)) conn := (*ClientConn)(ptr) buf := C.GoBytes(data, C.int(size)) //log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Write(buf) if err != nil { + //log.Println("onClientDataSendCallback end") return NGHTTP2_ERR_CALLBACK_FAILURE } - //log.Println("write data to network ", n) + //log.Printf("write %d bytes to network ", n) + //log.Println("onClientDataSendCallback end") return C.ssize_t(n) } @@ -291,11 +309,13 @@ func onClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.si // //export onClientBeginHeaderCallback func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("begin header") + //log.Println("onClientBeginHeaderCallback begin") + //log.Printf("stream %d begin headers", int(streamID)) conn := (*ClientConn)(ptr) s, ok := conn.streams[int(streamID)] if !ok { + //log.Println("onClientBeginHeaderCallback end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } var TLS tls.ConnectionState @@ -310,6 +330,7 @@ func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { }, TLS: &TLS, } + //log.Println("onClientBeginHeaderCallback end") return NGHTTP2_NO_ERROR } @@ -319,6 +340,7 @@ func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, name unsafe.Pointer, namelen C.int, value unsafe.Pointer, valuelen C.int) C.int { + //log.Println("onClientHeaderCallback begin") //log.Println("header") conn := (*ClientConn)(ptr) goname := string(C.GoBytes(name, namelen)) @@ -326,6 +348,7 @@ func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, s, ok := conn.streams[int(streamID)] if !ok { + //log.Println("onClientHeaderCallback end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } goname = strings.ToLower(goname) @@ -349,6 +372,7 @@ func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, default: s.res.Header.Add(goname, govalue) } + //log.Println("onClientHeaderCallback end") return NGHTTP2_NO_ERROR } @@ -356,13 +380,19 @@ func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, // //export onClientHeadersDoneCallback func onClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("frame recv") + //log.Println("onClientHeadersDoneCallback begin") + //log.Printf("stream %d headers done", int(streamID)) conn := (*ClientConn)(ptr) s, ok := conn.streams[int(streamID)] if !ok { + //log.Println("onClientHeadersDoneCallback end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } - s.resch <- s.res + select { + case s.resch <- s.res: + default: + } + //log.Println("onClientHeadersDoneCallback end") return NGHTTP2_NO_ERROR } @@ -370,16 +400,30 @@ func onClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { // //export onClientStreamClose func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("stream close") + //log.Println("onClientStreamClose begin") + //log.Printf("stream %d closed", int(streamID)) conn := (*ClientConn)(ptr) stream, ok := conn.streams[int(streamID)] if ok { stream.Close() - conn.lock.Lock() + //conn.lock.Lock() delete(conn.streams, int(streamID)) - conn.lock.Unlock() + //go stream.Close() + //conn.lock.Unlock() + //log.Println("onClientStreamClose end") return NGHTTP2_NO_ERROR } + //log.Println("onClientStreamClose end") return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE } + +//export onClientConnectionCloseCallback +func onClientConnectionCloseCallback(ptr unsafe.Pointer) { + conn := (*ClientConn)(ptr) + conn.err = io.EOF + select { + case conn.exitch <- struct{}{}: + default: + } +} diff --git a/nghttp2.c b/nghttp2.c index ad8079a..ffa5a4d 100644 --- a/nghttp2.c +++ b/nghttp2.c @@ -1,5 +1,7 @@ #include "_nghttp2.h" +int on_error_callback(nghttp2_session *session, const char *msg, size_t len, void *user_data); + static ssize_t server_send_callback(nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) @@ -104,6 +106,7 @@ nghttp2_session *init_nghttp2_server_session(size_t data) nghttp2_session_callbacks_set_on_header_callback(callbacks, on_server_header_callback); + nghttp2_session_callbacks_set_error_callback(callbacks, on_error_callback); nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, on_server_begin_headers_callback); @@ -116,7 +119,7 @@ nghttp2_session *init_nghttp2_server_session(size_t data) int send_server_connection_header(nghttp2_session *session) { nghttp2_settings_entry iv[1] = { - {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 1000}}; int rv; rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, @@ -137,13 +140,14 @@ static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *dat { return onClientDataSendCallback(user_data, (void *)data, length); } - +/* // recv_callback read data from network static ssize_t client_recv_callback(nghttp2_session *session, uint8_t *buf, size_t length, int flags, void *user_data) { return onClientDataRecvCallback(user_data, (void *)buf, length); } +*/ static int on_client_header_callback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, @@ -246,10 +250,14 @@ static int on_client_frame_recv_callback(nghttp2_session *session, } break; case NGHTTP2_RST_STREAM: - printf("server send rst_stream %d\n", frame->rst_stream.error_code); + //printf("server send rst_stream %d\n", frame->rst_stream.error_code); break; case NGHTTP2_GOAWAY: - printf("server send go away\n"); + //printf("server send go away\n"); + onClientConnectionCloseCallback(user_data); + break; + case NGHTTP2_PING: + //printf("ping frame received\n"); break; } return 0; @@ -280,18 +288,18 @@ static ssize_t data_source_read_callback(nghttp2_session *session, int32_t strea } return ret; } -int on_error_callback(nghttp2_session *session, int lib_error_code, +int on_error_callback(nghttp2_session *session, const char *msg, size_t len, void *user_data) { //printf("errmsg %*s\n", msg, len); - printf("code: %d, error: %s\n", lib_error_code, nghttp2_strerror(lib_error_code)); + printf("error: %s\n", msg); return 0; } void init_client_callbacks(nghttp2_session_callbacks *callbacks) { nghttp2_session_callbacks_set_send_callback(callbacks, client_send_callback); - nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback); + //nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback); //nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback);