add callback for goaway frame

merge_conn
fangdingjun 6 years ago
parent 68c8ece2c6
commit 4125b26c35

@ -15,6 +15,7 @@ extern int onClientBeginHeaderCallback(void *, int);
extern int onClientHeaderCallback(void *, int, void *, int, void *, int);
extern int onClientHeadersDoneCallback(void *, int);
extern int onClientStreamClose(void *, int);
extern void onClientConnectionCloseCallback(void *user_data);
extern ssize_t onServerDataRecvCallback(void *, void *data, size_t);
extern ssize_t onServerDataSendCallback(void *, void *data, size_t);

@ -23,6 +23,7 @@ const (
NGHTTP2_ERR_DEFERRED = -508
)
/*
// onServerDataRecvCallback callback function for libnghttp2 library
// want receive data from network.
//
@ -40,6 +41,7 @@ func onServerDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer,
C.memcpy(data, cbuf, C.size_t(n))
return C.ssize_t(n)
}
*/
// onServerDataSendCallback callback function for libnghttp2 library
// want send data to network.
@ -98,7 +100,10 @@ func onServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
},
//buf: new(bytes.Buffer),
}
//conn.lock.Lock()
conn.streams[int(streamID)] = s
//conn.lock.Unlock()
return NGHTTP2_NO_ERROR
}
@ -190,9 +195,9 @@ func onServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
if !ok {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
conn.lock.Lock()
//conn.lock.Lock()
delete(conn.streams, int(streamID))
conn.lock.Unlock()
//conn.lock.Unlock()
s.Close()
return NGHTTP2_NO_ERROR
}
@ -205,22 +210,27 @@ func onServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
//export onDataSourceReadCallback
func onDataSourceReadCallback(ptr unsafe.Pointer,
buf unsafe.Pointer, length C.size_t) C.ssize_t {
//log.Println("data source read")
//log.Println("onDataSourceReadCallback begin")
dp := (*dataProvider)(ptr)
gobuf := make([]byte, int(length))
n, err := dp.Read(gobuf)
if err != nil {
if err == io.EOF {
//log.Println("onDataSourceReadCallback end")
return 0
}
if err == errAgain {
//log.Println("onDataSourceReadCallback end")
dp.deferred = true
return NGHTTP2_ERR_DEFERRED
}
//log.Println("onDataSourceReadCallback end")
return NGHTTP2_ERR_CALLBACK_FAILURE
}
cbuf := C.CBytes(gobuf)
defer C.free(cbuf)
C.memcpy(buf, cbuf, C.size_t(n))
//log.Println("onDataSourceReadCallback end")
return C.ssize_t(n)
}
@ -229,16 +239,18 @@ func onDataSourceReadCallback(ptr unsafe.Pointer,
//export onClientDataChunkRecv
func onClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.int {
//log.Println("on data recv")
//log.Println("onClientDataChunkRecv begin")
conn := (*ClientConn)(ptr)
gobuf := C.GoBytes(buf, C.int(length))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientDataChunkRecv end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
if s.res.Body == nil {
//log.Println("empty body")
//log.Println("onClientDataChunkRecv end")
return C.int(length)
}
@ -247,11 +259,14 @@ func onClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
if err != nil {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
//log.Println("onClientDataChunkRecv end")
return C.int(n)
}
//log.Println("onClientDataChunkRecv end")
return C.int(length)
}
/*
// onClientDataRecvCallback callback function for libnghttp2 library want read data from network.
//
//export onClientDataRecvCallback
@ -270,20 +285,23 @@ func onClientDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.si
C.memcpy(data, cbuf, C.size_t(n))
return C.ssize_t(n)
}
*/
// onClientDataSendCallback callback function for libnghttp2 library want send data to network.
//
//export onClientDataSendCallback
func onClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("onClientDataSendCallback begin")
//log.Println("data write req ", int(size))
conn := (*ClientConn)(ptr)
buf := C.GoBytes(data, C.int(size))
//log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Write(buf)
if err != nil {
//log.Println("onClientDataSendCallback end")
return NGHTTP2_ERR_CALLBACK_FAILURE
}
//log.Println("write data to network ", n)
//log.Printf("write %d bytes to network ", n)
//log.Println("onClientDataSendCallback end")
return C.ssize_t(n)
}
@ -291,11 +309,13 @@ func onClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.si
//
//export onClientBeginHeaderCallback
func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("begin header")
//log.Println("onClientBeginHeaderCallback begin")
//log.Printf("stream %d begin headers", int(streamID))
conn := (*ClientConn)(ptr)
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientBeginHeaderCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
var TLS tls.ConnectionState
@ -310,6 +330,7 @@ func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
},
TLS: &TLS,
}
//log.Println("onClientBeginHeaderCallback end")
return NGHTTP2_NO_ERROR
}
@ -319,6 +340,7 @@ func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int {
//log.Println("onClientHeaderCallback begin")
//log.Println("header")
conn := (*ClientConn)(ptr)
goname := string(C.GoBytes(name, namelen))
@ -326,6 +348,7 @@ func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientHeaderCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
goname = strings.ToLower(goname)
@ -349,6 +372,7 @@ func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
default:
s.res.Header.Add(goname, govalue)
}
//log.Println("onClientHeaderCallback end")
return NGHTTP2_NO_ERROR
}
@ -356,13 +380,19 @@ func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
//
//export onClientHeadersDoneCallback
func onClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("frame recv")
//log.Println("onClientHeadersDoneCallback begin")
//log.Printf("stream %d headers done", int(streamID))
conn := (*ClientConn)(ptr)
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientHeadersDoneCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
s.resch <- s.res
select {
case s.resch <- s.res:
default:
}
//log.Println("onClientHeadersDoneCallback end")
return NGHTTP2_NO_ERROR
}
@ -370,16 +400,30 @@ func onClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//
//export onClientStreamClose
func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("stream close")
//log.Println("onClientStreamClose begin")
//log.Printf("stream %d closed", int(streamID))
conn := (*ClientConn)(ptr)
stream, ok := conn.streams[int(streamID)]
if ok {
stream.Close()
conn.lock.Lock()
//conn.lock.Lock()
delete(conn.streams, int(streamID))
conn.lock.Unlock()
//go stream.Close()
//conn.lock.Unlock()
//log.Println("onClientStreamClose end")
return NGHTTP2_NO_ERROR
}
//log.Println("onClientStreamClose end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
//export onClientConnectionCloseCallback
func onClientConnectionCloseCallback(ptr unsafe.Pointer) {
conn := (*ClientConn)(ptr)
conn.err = io.EOF
select {
case conn.exitch <- struct{}{}:
default:
}
}

@ -1,5 +1,7 @@
#include "_nghttp2.h"
int on_error_callback(nghttp2_session *session, const char *msg, size_t len, void *user_data);
static ssize_t server_send_callback(nghttp2_session *session,
const uint8_t *data, size_t length,
int flags, void *user_data)
@ -104,6 +106,7 @@ nghttp2_session *init_nghttp2_server_session(size_t data)
nghttp2_session_callbacks_set_on_header_callback(callbacks,
on_server_header_callback);
nghttp2_session_callbacks_set_error_callback(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_server_begin_headers_callback);
@ -116,7 +119,7 @@ nghttp2_session *init_nghttp2_server_session(size_t data)
int send_server_connection_header(nghttp2_session *session)
{
nghttp2_settings_entry iv[1] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}};
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 1000}};
int rv;
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv,
@ -137,13 +140,14 @@ static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *dat
{
return onClientDataSendCallback(user_data, (void *)data, length);
}
/*
// recv_callback read data from network
static ssize_t client_recv_callback(nghttp2_session *session, uint8_t *buf,
size_t length, int flags, void *user_data)
{
return onClientDataRecvCallback(user_data, (void *)buf, length);
}
*/
static int on_client_header_callback(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
@ -246,10 +250,14 @@ static int on_client_frame_recv_callback(nghttp2_session *session,
}
break;
case NGHTTP2_RST_STREAM:
printf("server send rst_stream %d\n", frame->rst_stream.error_code);
//printf("server send rst_stream %d\n", frame->rst_stream.error_code);
break;
case NGHTTP2_GOAWAY:
printf("server send go away\n");
//printf("server send go away\n");
onClientConnectionCloseCallback(user_data);
break;
case NGHTTP2_PING:
//printf("ping frame received\n");
break;
}
return 0;
@ -280,18 +288,18 @@ static ssize_t data_source_read_callback(nghttp2_session *session, int32_t strea
}
return ret;
}
int on_error_callback(nghttp2_session *session, int lib_error_code,
int on_error_callback(nghttp2_session *session,
const char *msg, size_t len, void *user_data)
{
//printf("errmsg %*s\n", msg, len);
printf("code: %d, error: %s\n", lib_error_code, nghttp2_strerror(lib_error_code));
printf("error: %s\n", msg);
return 0;
}
void init_client_callbacks(nghttp2_session_callbacks *callbacks)
{
nghttp2_session_callbacks_set_send_callback(callbacks, client_send_callback);
nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback);
//nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback);
//nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback);

Loading…
Cancel
Save