diff --git a/_nghttp2.h b/_nghttp2.h index 90efe5a..b84095b 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -5,14 +5,14 @@ #include #include -extern ssize_t DataRead(void *, void *data, size_t); -extern ssize_t DataWrite(void *, void *data, size_t); +extern ssize_t ClientDataRecv(void *, void *data, size_t); +extern ssize_t ClientDataSend(void *, void *data, size_t); extern ssize_t DataSourceRead(void *, void *, size_t); -extern int OnDataRecv(void *, int, void *, size_t); -extern int OnBeginHeaderCallback(void *, int); -extern int OnHeaderCallback(void *, int, void *, int, void *, int); -extern int OnFrameRecvCallback(void *, int); -extern int OnStreamClose(void *, int); +extern int OnClientDataRecv(void *, int, void *, size_t); +extern int OnClientBeginHeaderCallback(void *, int); +extern int OnClientHeaderCallback(void *, int, void *, int, void *, int); +extern int OnClientFrameRecvCallback(void *, int); +extern int OnClientStreamClose(void *, int); struct nv_array { diff --git a/nghttp2.c b/client.c similarity index 78% rename from nghttp2.c rename to client.c index 06d86fd..38bd807 100644 --- a/nghttp2.c +++ b/client.c @@ -3,20 +3,20 @@ #define ARRLEN(x) (sizeof(x) / sizeof(x[0])) // send_callback send data to network -static ssize_t send_callback(nghttp2_session *session, const uint8_t *data, +static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) { - return DataWrite(user_data, (void *)data, length); + return ClientDataSend(user_data, (void *)data, length); } // recv_callback read data from network -static ssize_t recv_callback(nghttp2_session *session, uint8_t *buf, +static ssize_t client_recv_callback(nghttp2_session *session, uint8_t *buf, size_t length, int flags, void *user_data) { - return DataRead(user_data, (void *)buf, length); + return ClientDataRecv(user_data, (void *)buf, length); } -static int on_header_callback(nghttp2_session *session, +static int on_client_header_callback(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) @@ -29,7 +29,7 @@ static int on_header_callback(nghttp2_session *session, { /* Print response headers for the initiated request. */ //print_header(stderr, name, namelen, value, valuelen); - OnHeaderCallback(user_data, frame->hd.stream_id, + OnClientHeaderCallback(user_data, frame->hd.stream_id, (void *)name, namelen, (void *)value, valuelen); break; } @@ -37,7 +37,7 @@ static int on_header_callback(nghttp2_session *session, return 0; } -static int on_begin_headers_callback(nghttp2_session *session, +static int on_client_begin_headers_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { @@ -50,7 +50,7 @@ static int on_begin_headers_callback(nghttp2_session *session, { //fprintf(stderr, "Response headers for stream ID=%d:\n", // frame->hd.stream_id); - OnBeginHeaderCallback(user_data, stream_id); + OnClientBeginHeaderCallback(user_data, stream_id); } break; } @@ -67,7 +67,7 @@ int on_invalid_frame_recv_callback(nghttp2_session *session, return 0; } -static int on_frame_send_callback(nghttp2_session *session, +static int on_client_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { size_t i; @@ -103,7 +103,7 @@ static int on_frame_send_callback(nghttp2_session *session, return 0; } -static int on_frame_recv_callback(nghttp2_session *session, +static int on_client_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { //printf("on_frame_recv_callback %d\n", frame->hd.type); @@ -113,7 +113,7 @@ static int on_frame_recv_callback(nghttp2_session *session, if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) { //fprintf(stderr, "All headers received\n"); - OnFrameRecvCallback(user_data, frame->hd.stream_id); + OnClientFrameRecvCallback(user_data, frame->hd.stream_id); } break; case NGHTTP2_RST_STREAM: @@ -126,17 +126,17 @@ static int on_frame_recv_callback(nghttp2_session *session, return 0; } -static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, +static int on_client_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { - return OnDataRecv(user_data, stream_id, (void *)data, len); + return OnClientDataRecv(user_data, stream_id, (void *)data, len); } -static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, +static int on_client_stream_close_callback(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { - OnStreamClose(user_data, stream_id); + OnClientStreamClose(user_data, stream_id); return 0; } @@ -159,28 +159,28 @@ int on_error_callback(nghttp2_session *session, int lib_error_code, return 0; } -void init_callbacks(nghttp2_session_callbacks *callbacks) +void init_client_callbacks(nghttp2_session_callbacks *callbacks) { - nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); - nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback); + nghttp2_session_callbacks_set_send_callback(callbacks, client_send_callback); + nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback); //nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback); nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - on_frame_recv_callback); + on_client_frame_recv_callback); - nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_frame_send_callback); + nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_client_frame_send_callback); nghttp2_session_callbacks_set_on_data_chunk_recv_callback( - callbacks, on_data_chunk_recv_callback); + callbacks, on_client_data_chunk_recv_callback); nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, on_stream_close_callback); + callbacks, on_client_stream_close_callback); nghttp2_session_callbacks_set_on_header_callback(callbacks, - on_header_callback); + on_client_header_callback); nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, on_begin_headers_callback); + callbacks, on_client_begin_headers_callback); } nghttp2_session *init_client_session(size_t data) @@ -189,7 +189,7 @@ nghttp2_session *init_client_session(size_t data) nghttp2_session *session; nghttp2_session_callbacks *callbacks; nghttp2_session_callbacks_new(&callbacks); - init_callbacks(callbacks); + init_client_callbacks(callbacks); ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data))); if (session == NULL) { @@ -198,21 +198,6 @@ nghttp2_session *init_client_session(size_t data) return session; } -nghttp2_session *init_server_session(size_t data) -{ - int ret; - nghttp2_session *session; - nghttp2_session_callbacks *callbacks; - nghttp2_session_callbacks_new(&callbacks); - init_callbacks(callbacks); - ret = nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); - if (session == NULL) - { - printf("c init session failed: %s\n", nghttp2_strerror(ret)); - } - return session; -} - int send_client_connection_header(nghttp2_session *session) { nghttp2_settings_entry iv[1] = { diff --git a/http2.go b/client.go similarity index 70% rename from http2.go rename to client.go index b54a1d4..06def36 100644 --- a/http2.go +++ b/client.go @@ -18,19 +18,20 @@ import ( "unsafe" ) -// Conn http2 connection -type Conn struct { +// ClientConn http2 connection +type ClientConn struct { session *C.nghttp2_session conn net.Conn - streams map[int]*Stream + streams map[int]*ClientStream lock *sync.Mutex errch chan struct{} + exitch chan struct{} err error isServer bool } -// Stream http2 stream -type Stream struct { +// ClientStream http2 stream +type ClientStream struct { streamID int cdp *C.nghttp2_data_provider dp *dataProvider @@ -44,9 +45,6 @@ type Stream struct { closed bool } -type Request struct { -} - type dataProvider struct { // drain the data r io.Reader @@ -55,11 +53,12 @@ type dataProvider struct { } // NewClientConn create http2 client -func NewClientConn(c net.Conn) (*Conn, error) { - conn := &Conn{ - conn: c, streams: make(map[int]*Stream), - lock: new(sync.Mutex), - errch: make(chan struct{}), +func NewClientConn(c net.Conn) (*ClientConn, error) { + conn := &ClientConn{ + conn: c, streams: make(map[int]*ClientStream), + lock: new(sync.Mutex), + errch: make(chan struct{}), + exitch: make(chan struct{}), } conn.session = C.init_client_session( C.size_t(int(uintptr(unsafe.Pointer(conn))))) @@ -75,46 +74,28 @@ func NewClientConn(c net.Conn) (*Conn, error) { return conn, nil } -// NewServerConn create http2 server -func NewServerConn(c net.Conn) (*Conn, error) { - conn := &Conn{ - conn: c, streams: make(map[int]*Stream), - lock: new(sync.Mutex), - errch: make(chan struct{}), - isServer: true, - } - conn.session = C.init_server_session( - C.size_t(int(uintptr(unsafe.Pointer(conn))))) - if conn.session == nil { - return nil, fmt.Errorf("init session failed") - } - go conn.run() - return conn, nil - -} - -func (c *Conn) onDataRecv(buf []byte, streamID int) { +func (c *ClientConn) onDataRecv(buf []byte, streamID int) { stream := c.streams[streamID] stream.onDataRecv(buf) } -func (c *Conn) onBeginHeader(streamID int) { +func (c *ClientConn) onBeginHeader(streamID int) { stream := c.streams[streamID] stream.onBeginHeader() } -func (c *Conn) onHeader(streamID int, name, value string) { +func (c *ClientConn) onHeader(streamID int, name, value string) { stream := c.streams[streamID] stream.onHeader(name, value) } -func (c *Conn) onFrameRecv(streamID int) { +func (c *ClientConn) onFrameRecv(streamID int) { stream := c.streams[streamID] stream.onFrameRecv() } -func (c *Conn) onStreamClose(streamID int) { +func (c *ClientConn) onStreamClose(streamID int) { stream, ok := c.streams[streamID] if ok { stream.Close() @@ -126,29 +107,38 @@ func (c *Conn) onStreamClose(streamID int) { } // Close close the http2 connection -func (c *Conn) Close() error { +func (c *ClientConn) Close() error { for _, s := range c.streams { s.Close() } C.nghttp2_session_terminate_session(c.session, 0) C.nghttp2_session_del(c.session) - close(c.errch) + close(c.exitch) c.conn.Close() return nil } -func (c *Conn) run() { +func (c *ClientConn) run() { var wantRead int var wantWrite int var delay = 50 var ret C.int + defer close(c.errch) + datach := make(chan []byte) errch := make(chan error) go func() { buf := make([]byte, 16*1024) + readloop: for { + select { + case <-c.exitch: + break readloop + default: + } + n, err := c.conn.Read(buf) if err != nil { errch <- err @@ -166,6 +156,8 @@ loop: case err := <-errch: c.err = err break loop + case <-c.exitch: + break loop default: } @@ -205,15 +197,8 @@ loop: } } -// AcceptRequest get a request from session -// this block until a request is avaliable, -// server only -func (c *Conn) AcceptRequest() (req *Request, err error) { - return nil, nil -} - // CreateRequest submit a request and return a http.Response, client only -func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) { +func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { if c.err != nil { return nil, c.err } @@ -260,7 +245,7 @@ func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) { } //log.Println("stream id ", int(streamID)) r, w := io.Pipe() - s := &Stream{ + s := &ClientStream{ streamID: int(streamID), dp: dp, cdp: cdp, @@ -278,6 +263,8 @@ func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) { return nil, err case res := <-s.resch: return res, nil + case <-c.errch: + return nil, fmt.Errorf("connection error") } //return nil, fmt.Errorf("unknown error") } @@ -313,25 +300,25 @@ func newDataProvider(r io.Reader, w io.Writer) ( return dp, cdp } -func (s *Stream) Read(buf []byte) (n int, err error) { +func (s *ClientStream) Read(buf []byte) (n int, err error) { return s.r.Read(buf) } -func (s *Stream) Write(buf []byte) (n int, err error) { +func (s *ClientStream) Write(buf []byte) (n int, err error) { return s.dp.Write(buf) } -func (s *Stream) onDataRecv(buf []byte) { +func (s *ClientStream) onDataRecv(buf []byte) { s.w.Write(buf) } -func (s *Stream) onBeginHeader() { +func (s *ClientStream) onBeginHeader() { s.res = &http.Response{ Header: make(http.Header), } } -func (s *Stream) onHeader(name, value string) { +func (s *ClientStream) onHeader(name, value string) { if name == ":status" { statusCode, _ := strconv.Atoi(value) s.res.StatusCode = statusCode @@ -344,14 +331,14 @@ func (s *Stream) onHeader(name, value string) { s.res.Header.Add(name, value) } -func (s *Stream) onFrameRecv() { +func (s *ClientStream) onFrameRecv() { s.res.Body = s s.resch <- s.res //log.Println("stream frame recv") } // Close close the stream -func (s *Stream) Close() error { +func (s *ClientStream) Close() error { if s.closed { return nil } @@ -392,22 +379,22 @@ func DataSourceRead(ptr unsafe.Pointer, return C.ssize_t(n) } -// OnDataRecv callback function for data frame received -//export OnDataRecv -func OnDataRecv(ptr unsafe.Pointer, streamID C.int, +// OnClientDataRecv callback function for data frame received +//export OnClientDataRecv +func OnClientDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int { //log.Println("on data recv") - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) gobuf := C.GoBytes(buf, C.int(length)) conn.onDataRecv(gobuf, int(streamID)) return 0 } -// DataRead callback function for session wants read data from peer -//export DataRead -func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { +// ClientDataRecv callback function for session wants read data from peer +//export ClientDataRecv +func ClientDataRecv(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { //log.Println("data read req", int(size)) - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) buf := make([]byte, int(size)) //log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Read(buf) @@ -421,11 +408,11 @@ func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t return C.ssize_t(n) } -// DataWrite callback function for session wants send data to peer -//export DataWrite -func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { +// ClientDataSend callback function for session wants send data to peer +//export ClientDataSend +func ClientDataSend(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { //log.Println("data write req ", int(size)) - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) buf := C.GoBytes(data, C.int(size)) //log.Println(conn.conn.RemoteAddr()) n, err := conn.conn.Write(buf) @@ -437,42 +424,42 @@ func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t return C.ssize_t(n) } -// OnBeginHeaderCallback callback function for response -//export OnBeginHeaderCallback -func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { +// OnClientBeginHeaderCallback callback function for response +//export OnClientBeginHeaderCallback +func OnClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { //log.Println("begin header") - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) conn.onBeginHeader(int(streamID)) return 0 } -// OnHeaderCallback callback function for header -//export OnHeaderCallback -func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, +// OnClientHeaderCallback callback function for header +//export OnClientHeaderCallback +func OnClientHeaderCallback(ptr unsafe.Pointer, streamID C.int, name unsafe.Pointer, namelen C.int, value unsafe.Pointer, valuelen C.int) C.int { //log.Println("header") - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) goname := C.GoBytes(name, namelen) govalue := C.GoBytes(value, valuelen) conn.onHeader(int(streamID), string(goname), string(govalue)) return 0 } -// OnFrameRecvCallback callback function for begion to recv data -//export OnFrameRecvCallback -func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { +// OnClientFrameRecvCallback callback function for begion to recv data +//export OnClientFrameRecvCallback +func OnClientFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { //log.Println("frame recv") - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) conn.onFrameRecv(int(streamID)) return 0 } -// OnStreamClose callback function for stream close -//export OnStreamClose -func OnStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { +// OnClientStreamClose callback function for stream close +//export OnClientStreamClose +func OnClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { //log.Println("stream close") - conn := (*Conn)(ptr) + conn := (*ClientConn)(ptr) conn.onStreamClose(int(streamID)) return 0 }