commit 7a2599e0ca64b51048756105b56493dd70c6f50a Author: fangdingjun Date: Sat Jun 30 19:01:46 2018 +0800 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b25c15b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*~ diff --git a/_nghttp2.h b/_nghttp2.h new file mode 100644 index 0000000..d016412 --- /dev/null +++ b/_nghttp2.h @@ -0,0 +1,37 @@ +#ifndef _NGHTTP2_H +#define _NGHTTP2_H +#include +#include +#include +#include + +extern ssize_t DataRead(void *, void *data, size_t); +extern ssize_t DataWrite(void *, void *data, size_t); +extern ssize_t DataSourceRead(void *, void *, size_t); +extern int OnDataRecv(void *, int, void *, size_t); +extern int OnBeginHeaderCallback(void *, int); +extern int OnHeaderCallback(void *, int, void *, int, void *, int); +extern int OnFrameRecvCallback(void *, int); + +struct nv_array +{ + nghttp2_nv *nv; + size_t len; +}; + +void delete_nv_array(struct nv_array *a); +nghttp2_data_provider *new_data_provider(void *data); + +int nv_array_set(struct nv_array *a, int index, + char *name, char *value, + size_t namelen, size_t valuelen, int flag); + +struct nv_array *new_nv_array(size_t n); + +int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen); + +int send_client_connection_header(nghttp2_session *session); + +void init_nghttp2_session(nghttp2_session *session, void *data); + +#endif \ No newline at end of file diff --git a/http2.go b/http2.go new file mode 100644 index 0000000..05f8af7 --- /dev/null +++ b/http2.go @@ -0,0 +1,237 @@ +package nghttp2 + +/* +#cgo pkg-config: libnghttp2 +#include "_nghttp2.h" +*/ +import "C" +import ( + "fmt" + "io" + "net" + "net/http" + "strings" + "unsafe" +) + +// Conn http2 connection +type Conn struct { + session *C.nghttp2_session + conn net.Conn + streams map[int]*Stream +} + +// Stream http2 stream +type Stream struct { + streamID int + cdp *C.nghttp2_data_provider + dp *dataProvider + // application read data from stream + r io.Reader + // recv stream data from session + w io.Writer +} + +type dataProvider struct { + // drain the data + r io.Reader + // provider the data + w io.Writer +} + +// NewConn create http2 connection +func NewConn(c net.Conn) *Conn { + conn := &Conn{conn: c, streams: make(map[int]*Stream)} + C.init_nghttp2_session(conn.session, unsafe.Pointer(conn)) + C.send_client_connection_header(conn.session) + return conn +} + +func (c *Conn) onDataRecv(buf []byte, streamID int) { + stream := c.streams[streamID] + stream.onDataRecv(buf) +} + +func (c *Conn) onBeginHeader(streamID int) { + stream := c.streams[streamID] + stream.onBeginHeader() +} + +func (c *Conn) onHeader(streamID int, name, value string) { + stream := c.streams[streamID] + stream.onHeader(name, value) + +} + +func (c *Conn) onFrameRecv(streamID int) { + stream := c.streams[streamID] + stream.onFrameRecv() +} + +// NewRequest create a new http2 stream +func (c *Conn) NewRequest(req *http.Request) *http.Response { + nvIndex := 0 + nvMax := 25 + nva := C.new_nv_array(C.size_t(nvMax)) + setNvArray(nva, nvIndex, ":method", req.Method, 0) + nvIndex += 1 + setNvArray(nva, nvIndex, ":scheme", "https", 0) + nvIndex += 1 + setNvArray(nva, nvIndex, ":authority", req.Host, 0) + nvIndex += 1 + + p := req.URL.Path + q := req.URL.Query().Encode() + if q != "" { + p = p + "?" + q + } + setNvArray(nva, nvIndex, ":path", p, 0) + nvIndex += 1 + for k, v := range req.Header { + if strings.ToLower(k) == "host" { + continue + } + setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) + nvIndex += 1 + } + var dp *dataProvider + var cdp *C.nghttp2_data_provider + if req.Body != nil { + dp, cdp = newDataProvider(req.Body, nil) + } + streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex+1)) + C.delete_nv_array(nva) + if int(streamID) < 0 { + return nil + } + r, w := io.Pipe() + s := &Stream{streamID: int(streamID), dp: dp, cdp: cdp, r: r, w: w} + c.streams[int(streamID)] = s + + return nil +} + +func setNvArray(a *C.struct_nv_array, index int, name, value string, flags int) { + cname := C.CString(name) + cvalue := C.CString(value) + cnamelen := C.size_t(len(name)) + cvaluelen := C.size_t(len(value)) + cflags := C.int(flags) + defer C.free(unsafe.Pointer(cname)) + defer C.free(unsafe.Pointer(cvalue)) + C.nv_array_set(a, C.int(index), cname, + cvalue, cnamelen, cvaluelen, cflags) +} + +func (dp *dataProvider) Read(buf []byte) (n int, err error) { + return dp.r.Read(buf) +} + +func (dp *dataProvider) Write(buf []byte) (n int, err error) { + if dp.w == nil { + return 0, fmt.Errorf("write not supported") + } + return dp.w.Write(buf) +} + +func newDataProvider(r io.Reader, w io.Writer) (*dataProvider, *C.nghttp2_data_provider) { + dp := &dataProvider{r, w} + cdp := C.new_data_provider(unsafe.Pointer(dp)) + return dp, cdp +} + +func (s *Stream) Read(buf []byte) (n int, err error) { + return s.r.Read(buf) +} + +func (s *Stream) Write(buf []byte) (n int, err error) { + return s.dp.Write(buf) +} + +func (s *Stream) onDataRecv(buf []byte) { + s.w.Write(buf) +} + +func (s *Stream) onBeginHeader() { + +} + +func (s *Stream) onHeader(name, value string) { + +} + +func (s *Stream) onFrameRecv() { + +} + +//export DataSourceRead +func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t { + dp := (*dataProvider)(ptr) + gobuf := make([]byte, int(length)) + n, err := dp.Read(gobuf) + if err != nil { + if err == io.EOF { + return 0 + } + return -1 + } + cbuf := C.CBytes(gobuf) + defer C.free(cbuf) + C.memcpy(buf, cbuf, C.size_t(n)) + return C.ssize_t(n) +} + +//export OnDataRecv +func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int { + conn := (*Conn)(ptr) + gobuf := C.GoBytes(buf, C.int(length)) + conn.onDataRecv(gobuf, int(streamID)) + return 0 +} + +//export DataRead +func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + conn := (*Conn)(ptr) + buf := make([]byte, int(size)) + n, err := conn.conn.Read(buf) + if err != nil { + return -1 + } + return C.ssize_t(n) +} + +//export DataWrite +func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + conn := (*Conn)(ptr) + buf := C.GoBytes(data, C.int(size)) + n, err := conn.conn.Write(buf) + if err != nil { + return -1 + } + return C.ssize_t(n) +} + +//export OnBeginHeaderCallback +func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*Conn)(ptr) + conn.onBeginHeader(int(streamID)) + return 0 +} + +//export OnHeaderCallback +func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, + name unsafe.Pointer, namelen C.int, + value unsafe.Pointer, valuelen C.int) C.int { + conn := (*Conn)(ptr) + goname := C.GoBytes(name, namelen) + govalue := C.GoBytes(value, valuelen) + conn.onHeader(int(streamID), string(goname), string(govalue)) + return 0 +} + +//export OnFrameRecvCallback +func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { + conn := (*Conn)(ptr) + conn.onFrameRecv(int(streamID)) + return 0 +} diff --git a/nghttp2.c b/nghttp2.c new file mode 100644 index 0000000..de1a1cc --- /dev/null +++ b/nghttp2.c @@ -0,0 +1,209 @@ +#include "_nghttp2.h" + +#define ARRLEN(x) (sizeof(x) / sizeof(x[0])) + +// send_callback send data to network +static ssize_t send_callback(nghttp2_session *session, const uint8_t *data, + size_t length, int flags, void *user_data) +{ + return DataWrite(user_data, (void *)data, length); +} + +// recv_callback read data from network +static ssize_t recv_callback(nghttp2_session *session, uint8_t *buf, + size_t length, int flags, void *user_data) +{ + return DataRead(user_data, (void *)buf, length); +} + +static int on_header_callback(nghttp2_session *session, + const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data) +{ + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) + { + /* Print response headers for the initiated request. */ + //print_header(stderr, name, namelen, value, valuelen); + OnHeaderCallback(user_data, frame->hd.stream_id, + (void *)name, namelen, (void *)value, valuelen); + break; + } + } + return 0; +} + +static int on_begin_headers_callback(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + int stream_id = frame->hd.stream_id; + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) + { + fprintf(stderr, "Response headers for stream ID=%d:\n", + frame->hd.stream_id); + } + OnBeginHeaderCallback(user_data, stream_id); + break; + } + return 0; +} + +static int on_frame_recv_callback(nghttp2_session *session, + const nghttp2_frame *frame, void *user_data) +{ + + switch (frame->hd.type) + { + case NGHTTP2_HEADERS: + if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) + { + fprintf(stderr, "All headers received\n"); + } + OnFrameRecvCallback(user_data, frame->hd.stream_id); + break; + } + return 0; +} + +static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, + int32_t stream_id, const uint8_t *data, + size_t len, void *user_data) +{ + return OnDataRecv(user_data, stream_id, (void *)data, len); +} + +static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, + uint32_t error_code, void *user_data) +{ + return 0; +} + +ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id, + uint8_t *buf, size_t length, uint32_t *data_flags, + nghttp2_data_source *source, void *user_data) +{ + int ret = DataSourceRead(source, buf, length); + if (ret == 0) + { + *data_flags = NGHTTP2_DATA_FLAG_EOF; + } + return ret; +} + +void init_nghttp2_session(nghttp2_session *session, void *data) +{ + nghttp2_session_callbacks *callbacks; + + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); + nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback); + + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + on_frame_recv_callback); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback( + callbacks, on_data_chunk_recv_callback); + + nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, on_stream_close_callback); + + nghttp2_session_callbacks_set_on_header_callback(callbacks, + on_header_callback); + + nghttp2_session_callbacks_set_on_begin_headers_callback( + callbacks, on_begin_headers_callback); + + nghttp2_session_client_new(&session, callbacks, data); + + nghttp2_session_callbacks_del(callbacks); +} + +int send_client_connection_header(nghttp2_session *session) +{ + nghttp2_settings_entry iv[1] = { + {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}}; + int rv; + + /* client 24 bytes magic string will be sent by nghttp2 library */ + rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv, + ARRLEN(iv)); + /* + if (rv != 0) + { + errx(1, "Could not submit SETTINGS: %s", nghttp2_strerror(rv)); + } + */ + return rv; +} + +int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen) +{ + int32_t stream_id; + /* + nghttp2_nv hdrs[] = { + MAKE_NV2(":method", "GET"), + MAKE_NV(":scheme", &uri[u->field_data[UF_SCHEMA].off], + u->field_data[UF_SCHEMA].len), + MAKE_NV(":authority", stream_data->authority, stream_data->authoritylen), + MAKE_NV(":path", stream_data->path, stream_data->pathlen)}; + fprintf(stderr, "Request headers:\n"); + print_headers(stderr, hdrs, ARRLEN(hdrs)); + */ + stream_id = nghttp2_submit_request(session, NULL, hdrs, + hdrlen, NULL, NULL); + /* + if (stream_id < 0) + { + errx(1, "Could not submit HTTP request: %s", nghttp2_strerror(stream_id)); + } + */ + + return stream_id; +} + +struct nv_array *new_nv_array(size_t n) +{ + struct nv_array *a = malloc(sizeof(struct nv_array)); + nghttp2_nv *nv = (nghttp2_nv *)malloc(n * sizeof(nghttp2_nv)); + a->nv = nv; + a->len = n; + return a; +} + +int nv_array_set(struct nv_array *a, int index, + char *name, char *value, + size_t namelen, size_t valuelen, int flag) +{ + if (index > (a->len - 1)) + { + return -1; + } + nghttp2_nv nv = (a->nv)[index]; + nv.name = name; + nv.value = value; + nv.namelen = namelen; + nv.valuelen = valuelen; + nv.flags = flag; + return 0; +} + +void delete_nv_array(struct nv_array *a) +{ + free(a->nv); + free(a); +} + +nghttp2_data_provider *new_data_provider(void *data) +{ + nghttp2_data_provider *dp = malloc(sizeof(nghttp2_data_provider)); + dp->source.ptr = data; + dp->read_callback = data_source_read_callback; +} \ No newline at end of file