From 3396a7519d44727c6dfc95c6d839fe89be99de2d Mon Sep 17 00:00:00 2001 From: fangdingjun Date: Wed, 4 Jul 2018 09:48:36 +0800 Subject: [PATCH] add server implement --- _nghttp2.h | 14 +- client.c | 35 ++-- client.go | 81 ++++---- http2_test.go | 85 +++++++++ server.c | 132 +++++++++++++ server.go | 438 ++++++++++++++++++++++++++++++++++++++++++++ testdata/server.crt | 21 +++ testdata/server.key | 27 +++ 8 files changed, 766 insertions(+), 67 deletions(-) create mode 100644 server.c create mode 100644 server.go create mode 100644 testdata/server.crt create mode 100644 testdata/server.key diff --git a/_nghttp2.h b/_nghttp2.h index b84095b..188e3bb 100644 --- a/_nghttp2.h +++ b/_nghttp2.h @@ -5,15 +5,27 @@ #include #include +#define ARRLEN(x) (sizeof(x) / sizeof(x[0])) + extern ssize_t ClientDataRecv(void *, void *data, size_t); extern ssize_t ClientDataSend(void *, void *data, size_t); extern ssize_t DataSourceRead(void *, void *, size_t); extern int OnClientDataRecv(void *, int, void *, size_t); extern int OnClientBeginHeaderCallback(void *, int); extern int OnClientHeaderCallback(void *, int, void *, int, void *, int); -extern int OnClientFrameRecvCallback(void *, int); +extern int OnClientHeadersDoneCallback(void *, int); extern int OnClientStreamClose(void *, int); +extern ssize_t ServerDataRecv(void *, void *data, size_t); +extern ssize_t ServerDataSend(void *, void *data, size_t); +extern int OnServerDataRecv(void *, int, void *, size_t); +extern int OnServerBeginHeaderCallback(void *, int); +extern int OnServerHeaderCallback(void *, int, void *, int, void *, int); +extern int OnServerStreamEndCallback(void *, int); +extern int OnServerHeadersDoneCallback(void *, int); +extern int OnServerStreamClose(void *, int); +int send_server_connection_header(nghttp2_session *session); + struct nv_array { nghttp2_nv *nv; diff --git a/client.c b/client.c index 7296c05..e38eeb1 100644 --- a/client.c +++ b/client.c @@ -1,24 +1,23 @@ #include "_nghttp2.h" - // send_callback send data to network static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *data, - size_t length, int flags, void *user_data) + size_t length, int flags, void *user_data) { return ClientDataSend(user_data, (void *)data, length); } // recv_callback read data from network static ssize_t client_recv_callback(nghttp2_session *session, uint8_t *buf, - size_t length, int flags, void *user_data) + size_t length, int flags, void *user_data) { return ClientDataRecv(user_data, (void *)buf, length); } static int on_client_header_callback(nghttp2_session *session, - const nghttp2_frame *frame, const uint8_t *name, - size_t namelen, const uint8_t *value, - size_t valuelen, uint8_t flags, void *user_data) + const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data) { //printf("on_header_callback\n"); switch (frame->hd.type) @@ -29,7 +28,7 @@ static int on_client_header_callback(nghttp2_session *session, /* Print response headers for the initiated request. */ //print_header(stderr, name, namelen, value, valuelen); OnClientHeaderCallback(user_data, frame->hd.stream_id, - (void *)name, namelen, (void *)value, valuelen); + (void *)name, namelen, (void *)value, valuelen); break; } } @@ -37,8 +36,8 @@ static int on_client_header_callback(nghttp2_session *session, } static int on_client_begin_headers_callback(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) + const nghttp2_frame *frame, + void *user_data) { //printf("on_begin_headers_callback\n"); int stream_id = frame->hd.stream_id; @@ -67,7 +66,7 @@ int on_invalid_frame_recv_callback(nghttp2_session *session, } static int on_client_frame_send_callback(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) + const nghttp2_frame *frame, void *user_data) { size_t i; (void)user_data; @@ -103,7 +102,7 @@ static int on_client_frame_send_callback(nghttp2_session *session, } static int on_client_frame_recv_callback(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) + const nghttp2_frame *frame, void *user_data) { //printf("on_frame_recv_callback %d\n", frame->hd.type); switch (frame->hd.type) @@ -126,22 +125,22 @@ static int on_client_frame_recv_callback(nghttp2_session *session, } static int on_client_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, - int32_t stream_id, const uint8_t *data, - size_t len, void *user_data) + int32_t stream_id, const uint8_t *data, + size_t len, void *user_data) { return OnClientDataRecv(user_data, stream_id, (void *)data, len); } static int on_client_stream_close_callback(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *user_data) + uint32_t error_code, void *user_data) { OnClientStreamClose(user_data, stream_id); return 0; } static ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id, - uint8_t *buf, size_t length, uint32_t *data_flags, - nghttp2_data_source *source, void *user_data) + uint8_t *buf, size_t length, uint32_t *data_flags, + nghttp2_data_source *source, void *user_data) { int ret = DataSourceRead(source->ptr, buf, length); if (ret == 0) @@ -168,7 +167,7 @@ void init_client_callbacks(nghttp2_session_callbacks *callbacks) nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_client_frame_recv_callback); - nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_client_frame_send_callback); + //nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_client_frame_send_callback); nghttp2_session_callbacks_set_on_data_chunk_recv_callback( callbacks, on_client_data_chunk_recv_callback); @@ -233,11 +232,13 @@ int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen fprintf(stderr, "Request headers:\n"); print_headers(stderr, hdrs, ARRLEN(hdrs)); */ + /* int i; for (i = 0; i < hdrlen; i++) { printf("header %s: %s\n", hdrs[i].name, hdrs[i].value); } + */ stream_id = nghttp2_submit_request(session, NULL, hdrs, hdrlen, dp, NULL); /* diff --git a/client.go b/client.go index b51aa13..1610133 100644 --- a/client.go +++ b/client.go @@ -52,16 +52,11 @@ type ClientStream struct { } type dataProvider struct { - // drain the data - r io.Reader - // provider the data - w io.Writer - datach chan []byte - errch chan error buf *bytes.Buffer - run bool - streamID int + closed bool + lock *sync.Mutex session *C.nghttp2_session + streamID int } // NewClientConn create http2 client @@ -247,7 +242,11 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) { var dp *dataProvider var cdp *C.nghttp2_data_provider if req.Body != nil { - dp, cdp = newDataProvider(req.Body, nil) + dp, cdp = newDataProvider() + go func() { + io.Copy(dp, req.Body) + dp.Close() + }() } streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp) if dp != nil { @@ -298,40 +297,15 @@ func setNvArray(a *C.struct_nv_array, index int, cvalue, cnamelen, cvaluelen, cflags) } -func (dp *dataProvider) start() { - buf := make([]byte, 4096) - for { - n, err := dp.r.Read(buf) - if err != nil { - dp.errch <- err - break - } - dp.datach <- buf[:n] - C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) - } -} - // Read read from data provider // this emulate a unblocking reading // if data is not avaliable return errAgain func (dp *dataProvider) Read(buf []byte) (n int, err error) { - if !dp.run { - go dp.start() - dp.run = true - time.Sleep(100 * time.Millisecond) - } - - select { - case err := <-dp.errch: - //log.Println("d err ", err) - return 0, err - case b := <-dp.datach: - dp.buf.Write(b) - default: - } + dp.lock.Lock() + defer dp.lock.Unlock() n, err = dp.buf.Read(buf) - if err != nil { - //log.Println(err) + + if err != nil && !dp.closed { return 0, errAgain } return @@ -339,19 +313,25 @@ func (dp *dataProvider) Read(buf []byte) (n int, err error) { // Write provider data for data provider func (dp *dataProvider) Write(buf []byte) (n int, err error) { - if dp.w == nil { - return 0, fmt.Errorf("write not supported") - } - return dp.w.Write(buf) + dp.lock.Lock() + defer dp.lock.Unlock() + C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) + return dp.buf.Write(buf) +} + +// Close +func (dp *dataProvider) Close() error { + dp.lock.Lock() + defer dp.lock.Unlock() + dp.closed = true + C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) + return nil } - -func newDataProvider(r io.Reader, w io.Writer) ( +func newDataProvider() ( *dataProvider, *C.nghttp2_data_provider) { dp := &dataProvider{ - r: r, w: w, - errch: make(chan error), - datach: make(chan []byte), - buf: new(bytes.Buffer), + buf: new(bytes.Buffer), + lock: new(sync.Mutex), } cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp)))) return dp, cdp @@ -399,7 +379,7 @@ func (s *ClientStream) Close() error { if s.closed { return nil } - err := fmt.Errorf("stream closed") + err := io.EOF //log.Println("close stream") select { case s.errch <- err: @@ -412,6 +392,9 @@ func (s *ClientStream) Close() error { //log.Println("close pipe w") s.w.CloseWithError(err) //log.Println("close stream done") + if s.dp != nil { + s.dp.Close() + } s.closed = true return nil } diff --git a/http2_test.go b/http2_test.go index 6872d9c..e8e96a3 100644 --- a/http2_test.go +++ b/http2_test.go @@ -3,6 +3,8 @@ package nghttp2 import ( "bytes" "crypto/tls" + "fmt" + "io/ioutil" "log" "net/http" "net/url" @@ -55,3 +57,86 @@ func TestHttp2Client(t *testing.T) { log.Println("end") } + +func TestHttp2Server(t *testing.T) { + cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key") + if err != nil { + t.Fatal(err) + } + + l, err := tls.Listen("tcp", "127.0.0.1:0", &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2"}, + }) + if err != nil { + t.Fatal(err) + } + defer l.Close() + addr := l.Addr().String() + go func() { + http.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) { + log.Printf("%+v", r) + hdr := w.Header() + hdr.Set("content-type", "text/plain") + hdr.Set("aa", "bb") + d, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println(err) + return + } + w.Write(d) + }) + for { + c, err := l.Accept() + if err != nil { + break + } + h2conn, err := NewServerConn(c, nil) + if err != nil { + t.Fatal(err) + } + log.Printf("%+v", h2conn) + go h2conn.Run() + } + }() + conn, err := tls.Dial("tcp", addr, &tls.Config{ + NextProtos: []string{"h2"}, + ServerName: "localhost", + InsecureSkipVerify: true, + }) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + cstate := conn.ConnectionState() + if cstate.NegotiatedProtocol != "h2" { + t.Fatal("no http2 on server") + } + h2conn, err := NewClientConn(conn) + if err != nil { + t.Fatal(err) + } + d := bytes.NewBuffer([]byte("hello")) + req, _ := http.NewRequest("POST", + fmt.Sprintf("https://%s/get?a=b&c=d", addr), d) + req.Header.Add("User-Agent", "nghttp2/1.32") + req.Header.Add("Content-Type", "text/palin") + res, err := h2conn.CreateRequest(req) + if err != nil { + t.Fatal(err) + } + if res.StatusCode != http.StatusOK { + t.Errorf("expect http code %d, got %d", http.StatusOK, res.StatusCode) + } + defer res.Body.Close() + log.Printf("%+v", res) + data, err := ioutil.ReadAll(res.Body) + log.Println(string(data)) + if err != nil { + t.Error(err) + } + if string(data) != "hello" { + t.Errorf("expect %s, got %s", "hello", string(data)) + } +} diff --git a/server.c b/server.c new file mode 100644 index 0000000..7213d3c --- /dev/null +++ b/server.c @@ -0,0 +1,132 @@ +#include "_nghttp2.h" + +static ssize_t server_send_callback(nghttp2_session *session, + const uint8_t *data, size_t length, + int flags, void *user_data) +{ + return ServerDataSend(user_data, (void *)data, length); +} + +static int on_server_frame_recv_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) + { + OnServerHeadersDoneCallback(user_data, frame->hd.stream_id); + } + case NGHTTP2_DATA: + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) + { + OnServerStreamEndCallback(user_data, frame->hd.stream_id); + } + break; + } + return 0; +} + +static int on_server_stream_close_callback(nghttp2_session *session, + int32_t stream_id, + uint32_t error_code, + void *user_data) + +{ + OnServerStreamClose(user_data, stream_id); + return 0; +} + +static int on_server_header_callback(nghttp2_session *session, + const nghttp2_frame *frame, + const uint8_t *name, size_t namelen, + const uint8_t *value, + size_t valuelen, uint8_t flags, + void *user_data) +{ + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) + { + OnServerHeaderCallback(user_data, frame->hd.stream_id, + (void *)name, namelen, (void *)value, valuelen); + } + break; + } + return 0; +} + +static int on_server_data_chunk_recv_callback(nghttp2_session *session, + uint8_t flags, + int32_t stream_id, + const uint8_t *data, + size_t len, void *user_data) +{ + return OnServerDataRecv(user_data, stream_id, (void *)data, len); +} + +static int on_server_begin_headers_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) + { + OnServerBeginHeaderCallback(user_data, frame->hd.stream_id); + } + break; + } + return 0; +} + +nghttp2_session *init_server_session(size_t data) +{ + nghttp2_session_callbacks *callbacks; + nghttp2_session *session; + + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, server_send_callback); + + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + on_server_frame_recv_callback); + + nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, on_server_stream_close_callback); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + callbacks, on_server_data_chunk_recv_callback); + nghttp2_session_callbacks_set_on_header_callback(callbacks, + on_server_header_callback); + + nghttp2_session_callbacks_set_on_begin_headers_callback( + callbacks, on_server_begin_headers_callback); + + nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data))); + + nghttp2_session_callbacks_del(callbacks); + return session; +} + +int send_server_connection_header(nghttp2_session *session) +{ + nghttp2_settings_entry iv[1] = { + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; + int rv; + + rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, + ARRLEN(iv)); + return rv; + /* + if (rv != 0) { + // warnx("Fatal error: %s", nghttp2_strerror(rv)); + return rv; + } + return 0; + */ +} \ No newline at end of file diff --git a/server.go b/server.go new file mode 100644 index 0000000..0914ca1 --- /dev/null +++ b/server.go @@ -0,0 +1,438 @@ +package nghttp2 + +/* +#cgo pkg-config: libnghttp2 +#include "_nghttp2.h" +*/ +import "C" +import ( + "bytes" + "fmt" + "log" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" + "unsafe" +) + +// ServerConn server connection +type ServerConn struct { + // Handler handler to handle request + Handler http.Handler + + session *C.nghttp2_session + streams map[int]*ServerStream + lock *sync.Mutex + conn net.Conn + errch chan struct{} + exitch chan struct{} + err error +} + +// ServerStream server stream +type ServerStream struct { + streamID int + // headers receive done + headersDone bool + // is stream_end flag received + streamEnd bool + // request + req *http.Request + // response header + header http.Header + // response statusCode + statusCode int + // response has send + responseSend bool + + // server connection + conn *ServerConn + + // data provider + dp *dataProvider + cdp *C.nghttp2_data_provider + + closed bool + //buf *bytes.Buffer +} + +type bodyProvider struct { + buf *bytes.Buffer + closed bool + lock *sync.Mutex +} + +func (bp *bodyProvider) Read(buf []byte) (int, error) { + for { + bp.lock.Lock() + n, err := bp.buf.Read(buf) + bp.lock.Unlock() + if err != nil && !bp.closed { + time.Sleep(100 * time.Millisecond) + continue + } + return n, err + } +} + +func (bp *bodyProvider) Write(buf []byte) (int, error) { + bp.lock.Lock() + defer bp.lock.Unlock() + return bp.buf.Write(buf) +} + +func (bp *bodyProvider) Close() error { + /* + if c, ok := bp.w.(io.Closer); ok{ + return c.Close() + } + */ + bp.closed = true + return nil +} + +// NewServerConn create new server connection +func NewServerConn(c net.Conn, handler http.Handler) (*ServerConn, error) { + conn := &ServerConn{ + conn: c, + Handler: handler, + streams: make(map[int]*ServerStream), + lock: new(sync.Mutex), + errch: make(chan struct{}), + exitch: make(chan struct{}), + } + conn.session = C.init_server_session(C.size_t(uintptr(unsafe.Pointer(conn)))) + if conn.session == nil { + return nil, fmt.Errorf("init session failed") + } + //log.Println("send server connection header") + ret := C.send_server_connection_header(conn.session) + if int(ret) < 0 { + log.Println(C.GoString(C.nghttp2_strerror(ret))) + return nil, fmt.Errorf("send connection header failed") + } + //go conn.run() + return conn, nil +} + +func (c *ServerConn) serve(s *ServerStream) { + var handler = c.Handler + if c.Handler == nil { + handler = http.DefaultServeMux + } + handler.ServeHTTP(s, s.req) + s.Close() +} + +// Close close the server connection +func (c *ServerConn) Close() error { + for _, s := range c.streams { + s.Close() + } + C.nghttp2_session_terminate_session(c.session, 0) + C.nghttp2_session_del(c.session) + close(c.exitch) + c.conn.Close() + return nil +} + +// Run run the server loop +func (c *ServerConn) Run() { + var wantRead int + var wantWrite int + var delay = 50 + var ret C.int + + defer c.Close() + defer close(c.errch) + + datach := make(chan []byte) + errch := make(chan error) + + go func() { + buf := make([]byte, 16*1024) + readloop: + for { + select { + case <-c.exitch: + break readloop + default: + } + + n, err := c.conn.Read(buf) + if err != nil { + errch <- err + break + } + datach <- buf[:n] + } + }() + +loop: + for { + select { + case <-c.errch: + break loop + case err := <-errch: + c.err = err + break loop + case <-c.exitch: + break loop + default: + } + + wantWrite = int(C.nghttp2_session_want_write(c.session)) + if wantWrite != 0 { + ret = C.nghttp2_session_send(c.session) + if int(ret) < 0 { + c.err = fmt.Errorf("sesion send error: %s", + C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break + } + } + + wantRead = int(C.nghttp2_session_want_read(c.session)) + select { + case d := <-datach: + d1 := C.CBytes(d) + ret1 := C.nghttp2_session_mem_recv(c.session, + (*C.uchar)(d1), C.size_t(int(len(d)))) + C.free(d1) + if int(ret1) < 0 { + c.err = fmt.Errorf("sesion recv error: %s", + C.GoString(C.nghttp2_strerror(ret))) + log.Println(c.err) + break loop + } + default: + } + + // make delay when no data read/write + if wantRead == 0 && wantWrite == 0 { + select { + case <-time.After(time.Duration(delay) * time.Millisecond): + } + } + } +} + +// Write implements http.ResponseWriter +func (s *ServerStream) Write(buf []byte) (int, error) { + if !s.responseSend { + s.WriteHeader(http.StatusOK) + } + /* + //log.Printf("stream %d, send %d bytes", s.streamID, len(buf)) + if s.buf.Len() > 2048 { + s.dp.Write(s.buf.Bytes()) + s.buf.Reset() + } + + if len(buf) < 2048 { + s.buf.Write(buf) + return len(buf), nil + } + */ + return s.dp.Write(buf) +} + +// WriteHeader implements http.ResponseWriter +func (s *ServerStream) WriteHeader(code int) { + s.statusCode = code + nvIndex := 0 + nvMax := 25 + nva := C.new_nv_array(C.size_t(nvMax)) + setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0) + nvIndex++ + + for k, v := range s.header { + if strings.ToLower(k) == "host" { + continue + } + //log.Printf("header %s: %s", k, v) + setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) + nvIndex++ + } + var dp *dataProvider + var cdp *C.nghttp2_data_provider + dp, cdp = newDataProvider() + dp.streamID = s.streamID + dp.session = s.conn.session + s.dp = dp + s.cdp = cdp + ret := C.nghttp2_submit_response( + s.conn.session, C.int(s.streamID), nva.nv, C.size_t(nvIndex), cdp) + C.delete_nv_array(nva) + if int(ret) < 0 { + panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret)))) + } + s.responseSend = true + log.Printf("stream %d send response", s.streamID) +} + +// Header implements http.ResponseWriter +func (s *ServerStream) Header() http.Header { + if s.header == nil { + s.header = http.Header{} + } + return s.header +} + +// Close close the stream +func (s *ServerStream) Close() error { + if s.closed { + return nil + } + //C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) + s.req.Body.Close() + if s.dp != nil { + s.dp.Close() + } + s.closed = true + log.Printf("stream %d closed", s.streamID) + return nil +} + +// ServerDataRecv callback function for receive data from network +//export ServerDataRecv +func ServerDataRecv(ptr unsafe.Pointer, data unsafe.Pointer, + length C.size_t) C.ssize_t { + conn := (*ServerConn)(ptr) + buf := make([]byte, int(length)) + n, err := conn.conn.Read(buf) + if err != nil { + return -1 + } + cbuf := C.CBytes(buf[:n]) + defer C.free(cbuf) + C.memcpy(data, cbuf, C.size_t(n)) + return C.ssize_t(n) +} + +// ServerDataSend callback function for send data to network +//export ServerDataSend +func ServerDataSend(ptr unsafe.Pointer, data unsafe.Pointer, + length C.size_t) C.ssize_t { + //log.Println("server data send") + conn := (*ServerConn)(ptr) + buf := C.GoBytes(data, C.int(length)) + n, err := conn.conn.Write(buf) + if err != nil { + return -1 + } + //log.Println("send ", n, " bytes to network ", buf) + return C.ssize_t(n) +} + +// OnServerDataRecv callback function for data recv +//export OnServerDataRecv +func OnServerDataRecv(ptr unsafe.Pointer, streamID C.int, + data unsafe.Pointer, length C.size_t) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + bp := s.req.Body.(*bodyProvider) + buf := C.GoBytes(data, C.int(length)) + bp.Write(buf) + return C.int(length) +} + +// OnServerBeginHeaderCallback callback function for begin header +//export OnServerBeginHeaderCallback +func OnServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*ServerConn)(ptr) + s := &ServerStream{ + streamID: int(streamID), + conn: conn, + req: &http.Request{ + URL: &url.URL{}, + Header: http.Header{}, + Proto: "HTTP/2.0", + ProtoMajor: 2, + ProtoMinor: 0, + }, + //buf: new(bytes.Buffer), + } + conn.streams[int(streamID)] = s + return 0 +} + +// OnServerHeaderCallback callback function for header +//export OnServerHeaderCallback +func OnServerHeaderCallback(ptr unsafe.Pointer, streamID C.int, + name unsafe.Pointer, namelen C.int, + value unsafe.Pointer, valuelen C.int) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + hdrname := C.GoStringN((*C.char)(name), namelen) + hdrvalue := C.GoStringN((*C.char)(value), valuelen) + hdrname = strings.ToLower(hdrname) + switch hdrname { + case ":method": + s.req.Method = hdrvalue + case ":scheme": + s.req.URL.Scheme = hdrvalue + case ":path": + s.req.RequestURI = hdrvalue + u, _ := url.ParseRequestURI(s.req.RequestURI) + scheme := s.req.URL.Scheme + *(s.req.URL) = *u + if scheme != "" { + s.req.URL.Scheme = scheme + } + case ":authority": + s.req.Host = hdrvalue + default: + s.req.Header.Add(hdrname, hdrvalue) + + } + return 0 +} + +// OnServerStreamEndCallback callback function for frame received +//export OnServerStreamEndCallback +func OnServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int { + + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + s.streamEnd = true + bp := s.req.Body.(*bodyProvider) + if s.req.Method != "CONNECT" { + bp.closed = true + log.Println("stream end flag set, begin to serve") + go conn.serve(s) + } + return 0 +} + +// OnServerHeadersDoneCallback callback function for all headers received +//export OnServerHeadersDoneCallback +func OnServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + s.headersDone = true + bp := &bodyProvider{ + buf: new(bytes.Buffer), + lock: new(sync.Mutex), + } + s.req.Body = bp + if s.req.Method == "CONNECT" { + go conn.serve(s) + } + return 0 +} + +// OnServerStreamClose callback function for stream close +//export OnServerStreamClose +func OnServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*ServerConn)(ptr) + s := conn.streams[int(streamID)] + conn.lock.Lock() + delete(conn.streams, int(streamID)) + conn.lock.Unlock() + s.Close() + return 0 +} diff --git a/testdata/server.crt b/testdata/server.crt new file mode 100644 index 0000000..ba731b0 --- /dev/null +++ b/testdata/server.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDYDCCAkigAwIBAgIJAJ92ThBK0H0ZMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTgwNjI3MDMwMjUwWhcNMjgwNjI0MDMwMjUwWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEAufMjhIyoWzqqRKbglkfCWqSbU1N6SPaSjBmVQCfFGzUZx3DHWvemJ8Ip +2Nb9Gx9fI/GiRHs1K6noWZjJzsFZNdsoAADg+vXhS4MEwsItcRnKFgbmBtjKPbQy +jHnndAy8Wwe73NXTy7oBSbd5CZogvblLjfndSUIhCXVv7+PFHjfG78LEL7Dp1i1A +96SV2YuVYHr6FIS6C5FA0FGtGpXUC265jUN+sI88ONXMc/7zQ+4VWggB+Kq3B7uR +4DjTtxAC0X58AdWJm3yH6nTJLcsfOVQs1u7Mg97aUpRo9osw7ZMcEEchOw85L0Rl +K4vlZDnB2xD+8S8RRQi+Y/4GKPAihwIDAQABo1MwUTAdBgNVHQ4EFgQUGY2IxTxn +3tKlRsUnko97fF4X8WYwHwYDVR0jBBgwFoAUGY2IxTxn3tKlRsUnko97fF4X8WYw +DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAHpvWRFV+Cen2VQdX +OO1YYufqcGPmTxo0XmksOUQsd+vwR0HSjCK1oC0PlloviFvCXn9p4ZvwVK1NdK19 +RUkiXPAIw5QmXeNQgCJRv7jhObIuqfKVGfZuH8PdPMcPzU+PxgaZ/gnaW62AAFJA +frZXwMhr/Ar+CvH4NSZfOxBF4LOWM0eVYfxUOFq+qvYptdSxXyK1kuBxFXOVNEIw +CWp75uzvZ1gNaUrdfCNoYMV2qeyuL8gNfRhExrDfjn4ouOZq0Lh8FHCnGdoZo15D +ZHkLUqdKnOXtHKdnLcxStF6orKF7I3f3fp5kyRM19uZzLV07zdawVJSDbaO2cJz1 +Qff5gg== +-----END CERTIFICATE----- diff --git a/testdata/server.key b/testdata/server.key new file mode 100644 index 0000000..28d543c --- /dev/null +++ b/testdata/server.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAufMjhIyoWzqqRKbglkfCWqSbU1N6SPaSjBmVQCfFGzUZx3DH +WvemJ8Ip2Nb9Gx9fI/GiRHs1K6noWZjJzsFZNdsoAADg+vXhS4MEwsItcRnKFgbm +BtjKPbQyjHnndAy8Wwe73NXTy7oBSbd5CZogvblLjfndSUIhCXVv7+PFHjfG78LE +L7Dp1i1A96SV2YuVYHr6FIS6C5FA0FGtGpXUC265jUN+sI88ONXMc/7zQ+4VWggB ++Kq3B7uR4DjTtxAC0X58AdWJm3yH6nTJLcsfOVQs1u7Mg97aUpRo9osw7ZMcEEch +Ow85L0RlK4vlZDnB2xD+8S8RRQi+Y/4GKPAihwIDAQABAoIBAFUuooin/r+8Ah/s +6lktikUHvvfO9+fQvJVdate24D50dti8Ozba18zCz3S8F7qDBoxqttD0OAlGjl/s +9BW40Osw+AP4YxfT5182J8ooNbToAYFRq7JbQVo+4CEl/vdUljyFMHQbqChdjxV7 +7QCPEIyZA5mIauySVJwGpj6YcsZCMemwEVjD3tFEeXF6qu0hD9R4SVDiJEosxta7 +evzfENVUgfUMceJQzRUQ/3HfbgBJybPfNQ8fH2ORF2eOgjcmlmh+EwXGNcuCCpXd +EOndM30YYBRJBCAuvCdPrg9/2H2KL9jsIuLG1PyO9/IiLGMh5nTP/2TYjKOxNcYh +1g8S4+ECgYEA8lW1Q9msH1wWM+eZwHDg1rZ3BLMCguRHh2CN/q1KAOgdRsEBsP3A +u45MjwwBWzy56H6w69RC4AXQcMdGjZT20095RpYD8Rv1hmaB/1n9mL6i8V2p+X5D +pk1EgDDTWy8VbT3jn/Qt8hxINGvKYbZaRQgAtAXPx71mNGgxYLm3+18CgYEAxG93 ++on9SmhTXZNke+BRrgX0FTpkGtdCwfNnGMGyX7qlNR3kUls17JYE7ejLED7NWxOT +uSBCib8Lhh7qhCl+N9rvxpZkeESoZazhoBIigv9osOLOxZtrct1sg7MHDTWgIzT+ ++OvLzveHPuliVKZVJdM69dz2QUplC9buf0JtUdkCgYAHaA0xNK7pCnR3Q6XUVt7Y +UR1UHHCANZ/mCFJurTcszetPJUj68tZ4JQI8AP7tne6Ep5KaspMUq7jSKZUDcMEW +dkBbouwd61/Wqr1gY4y3pWPvgpBWWsCQjZ4BWPystcSu4Qxa8CiTVL/0MjMuR1d8 +8qCq396Y2TYNdf3EWgjAewKBgHUNJyU3zKLl/6cnCR130bQtAAEWRkhoNPN1ot1x +rmS0x3UbVs5sY3mS+2T47ufDRIMc603JF10VZjyJd51BTGDkKTTgsQWpg97yYZAM +vlvo7e1ZeXTu49wSbXMc3vrUFZRlI/oYJ94wSXsHfvyKEPr1H5EaFfNZ7VRcwsk6 +QAIhAoGBAOX5gnUHiQe9n8Vg5AZS6JoNf11+ZHemDNczQAZ4o7L0nHvorqeKBZvG +U7jo1XDTyrdbs8BPibVdArSpD23auLBL8WZ2wORvFLWXl5po/GSGqoOVS9KoBGMs +6E2UkiVR00zYSXTXwsiin3DrLuZcjG7InV45I+ws5DerYFGoNer/ +-----END RSA PRIVATE KEY-----