diff --git a/callbacks.go b/callbacks.go index ed11fbe..16f4807 100644 --- a/callbacks.go +++ b/callbacks.go @@ -1,333 +1,333 @@ -package nghttp2 - -/* -#include "_nghttp2.h" -*/ -import "C" -import ( - "bytes" - "crypto/tls" - "errors" - "io" - "net/http" - "net/url" - "runtime" - "strconv" - "strings" - "sync" - "unsafe" -) - -var ( - errAgain = errors.New("again") -) - -const ( - NGHTTP2_NO_ERROR = 0 - NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE = -521 - NGHTTP2_ERR_CALLBACK_FAILURE = -902 - NGHTTP2_ERR_DEFERRED = -508 -) - -/* -var bufPool = &sync.Pool{ - New: func() interface{} { - return make([]byte, 16*1024) - }, -} -*/ - -// onDataSourceReadCallback callback function for libnghttp2 library -// want read data from data provider source, -// return NGHTTP2_ERR_DEFERRED will cause data frame defered, -// application later call nghttp2_session_resume_data will re-quene the data frame -// -//export onDataSourceReadCallback -func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, - buf unsafe.Pointer, length C.size_t) C.ssize_t { - //log.Println("onDataSourceReadCallback begin") - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - //log.Println("client dp callback, stream not exists") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - gobuf := make([]byte, int(length)) - /* - _length := int(length) - gobuf := bufPool.Get().([]byte) - if len(gobuf) < _length { - gobuf = make([]byte, _length) - } - defer bufPool.Put(gobuf) - */ - - n, err := s.dp.Read(gobuf[0:]) - if err != nil { - if err == io.EOF { - //log.Println("onDataSourceReadCallback end") - return 0 - } - if err == errAgain { - //log.Println("onDataSourceReadCallback end") - //s.dp.deferred = true - return NGHTTP2_ERR_DEFERRED - } - //log.Println("onDataSourceReadCallback end") - return NGHTTP2_ERR_CALLBACK_FAILURE - } - //cbuf := C.CBytes(gobuf) - //defer C.free(cbuf) - //C.memcpy(buf, cbuf, C.size_t(n)) - C.memcpy(buf, unsafe.Pointer(&gobuf[0]), C.size_t(n)) - //log.Println("onDataSourceReadCallback end") - return C.ssize_t(n) -} - -// onDataChunkRecv callback function for libnghttp2 library data chunk received. -// -//export onDataChunkRecv -func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int, - buf unsafe.Pointer, length C.size_t) C.int { - //log.Println("onDataChunkRecv begin") - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - gobuf := C.GoBytes(buf, C.int(length)) - - s, ok := conn.streams[int(streamID)] - if !ok { - //log.Println("onDataChunkRecv end") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - if s.bp == nil { - //log.Println("empty body") - //log.Println("onDataChunkRecv end") - return C.int(length) - } - //log.Println("bp write") - n, err := s.bp.Write(gobuf) - if err != nil { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - //log.Println("onDataChunkRecv end") - return C.int(n) -} - -// onDataSendCallback callback function for libnghttp2 library want send data to network. -// -//export onDataSendCallback -func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { - //log.Println("onDataSendCallback begin") - //log.Println("data write req ", int(size)) - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - buf := C.GoBytes(data, C.int(size)) - //log.Println(conn.conn.RemoteAddr()) - n, err := conn.conn.Write(buf) - if err != nil { - //log.Println("onDataSendCallback end") - return NGHTTP2_ERR_CALLBACK_FAILURE - } - //log.Printf("write %d bytes to network ", n) - //log.Println("onDataSendCallback end") - return C.ssize_t(n) -} - -// onBeginHeaderCallback callback function for begin header receive. -// -//export onBeginHeaderCallback -func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("onBeginHeaderCallback begin") - //log.Printf("stream %d begin headers", int(streamID)) - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - - var TLS tls.ConnectionState - if tlsconn, ok := conn.conn.(*tls.Conn); ok { - TLS = tlsconn.ConnectionState() - } - // client - if !conn.isServer { - s, ok := conn.streams[int(streamID)] - if !ok { - //log.Println("onBeginHeaderCallback end") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - s.response = &http.Response{ - Proto: "HTTP/2", - ProtoMajor: 2, - ProtoMinor: 0, - Header: make(http.Header), - Body: s.bp, - TLS: &TLS, - } - return NGHTTP2_NO_ERROR - } - - // server - s := &stream{ - streamID: int(streamID), - conn: conn, - bp: &bodyProvider{ - buf: new(bytes.Buffer), - lock: new(sync.Mutex), - }, - request: &http.Request{ - Header: make(http.Header), - Proto: "HTTP/2", - ProtoMajor: 2, - ProtoMinor: 0, - TLS: &TLS, - }, - } - s.request.Body = s.bp - //log.Printf("new stream %d", int(streamID)) - conn.streams[int(streamID)] = s - - runtime.SetFinalizer(s, (*stream).free) - - //log.Println("onBeginHeaderCallback end") - return NGHTTP2_NO_ERROR -} - -// onHeaderCallback callback function for each header received. -// -//export onHeaderCallback -func onHeaderCallback(ptr unsafe.Pointer, streamID C.int, - name unsafe.Pointer, namelen C.int, - value unsafe.Pointer, valuelen C.int) C.int { - //log.Println("onHeaderCallback begin") - //log.Printf("header %d", int(streamID)) - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - goname := string(C.GoBytes(name, namelen)) - govalue := string(C.GoBytes(value, valuelen)) - - s, ok := conn.streams[int(streamID)] - if !ok { - //log.Println("onHeaderCallback end") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - var header http.Header - if conn.isServer { - header = s.request.Header - } else { - header = s.response.Header - } - goname = strings.ToLower(goname) - switch goname { - case ":method": - s.request.Method = govalue - case ":scheme": - case ":authority": - s.request.Host = govalue - case ":path": - s.request.RequestURI = govalue - u, err := url.Parse(govalue) - if err != nil { - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - s.request.URL = u - case ":status": - if s.response == nil { - //log.Println("empty response") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - statusCode, _ := strconv.Atoi(govalue) - s.response.StatusCode = statusCode - s.response.Status = http.StatusText(statusCode) - case "content-length": - header.Add(goname, govalue) - n, err := strconv.ParseInt(govalue, 10, 64) - if err == nil { - if conn.isServer { - s.request.ContentLength = n - } else { - s.response.ContentLength = n - } - } - case "transfer-encoding": - header.Add(goname, govalue) - if conn.isServer { - s.request.TransferEncoding = append(s.response.TransferEncoding, govalue) - } else { - s.response.TransferEncoding = append(s.response.TransferEncoding, govalue) - } - default: - header.Add(goname, govalue) - } - //log.Println("onHeaderCallback end") - return NGHTTP2_NO_ERROR -} - -// onHeadersDoneCallback callback function for the stream when all headers received. -// -//export onHeadersDoneCallback -func onHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("onHeadersDoneCallback begin") - //log.Printf("stream %d headers done", int(streamID)) - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - s, ok := conn.streams[int(streamID)] - if !ok { - //log.Println("onHeadersDoneCallback end") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE - } - s.headersEnd = true - if conn.isServer { - if s.request.Method == "CONNECT" { - go conn.serve(s) - } - return NGHTTP2_NO_ERROR - } - select { - case s.resch <- s.response: - default: - } - //log.Println("onHeadersDoneCallback end") - return NGHTTP2_NO_ERROR -} - -// onStreamClose callback function for the stream when closed. -// -//export onStreamClose -func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { - //log.Println("onStreamClose begin") - //log.Printf("stream %d closed", int(streamID)) - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - - stream, ok := conn.streams[int(streamID)] - if ok { - go stream.Close() - //log.Printf("remove stream %d", int(streamID)) - //conn.lock.Lock() - delete(conn.streams, int(streamID)) - //go stream.Close() - //conn.lock.Unlock() - //log.Println("onStreamClose end") - return NGHTTP2_NO_ERROR - } - //log.Println("onStreamClose end") - return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE -} - -//export onConnectionCloseCallback -func onConnectionCloseCallback(ptr unsafe.Pointer) { - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - conn.err = io.EOF - conn.Close() -} - -//export onStreamEndCallback -func onStreamEndCallback(ptr unsafe.Pointer, streamID C.int) { - conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) - stream, ok := conn.streams[int(streamID)] - if !ok { - return - } - stream.streamEnd = true - - stream.bp.Close() - - if stream.conn.isServer { - if stream.request.Method != "CONNECT" { - go conn.serve(stream) - } - return - } -} +package nghttp2 + +/* +#include "_nghttp2.h" +*/ +import "C" +import ( + "bytes" + "crypto/tls" + "errors" + "io" + "net/http" + "net/url" + "runtime" + "strconv" + "strings" + "sync" + "unsafe" +) + +var ( + errAgain = errors.New("again") +) + +const ( + NGHTTP2_NO_ERROR = 0 + NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE = -521 + NGHTTP2_ERR_CALLBACK_FAILURE = -902 + NGHTTP2_ERR_DEFERRED = -508 +) + +/* +var bufPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 16*1024) + }, +} +*/ + +// onDataSourceReadCallback callback function for libnghttp2 library +// want read data from data provider source, +// return NGHTTP2_ERR_DEFERRED will cause data frame defered, +// application later call nghttp2_session_resume_data will re-quene the data frame +// +//export onDataSourceReadCallback +func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, + buf unsafe.Pointer, length C.size_t) C.ssize_t { + //log.Println("onDataSourceReadCallback begin") + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + s, ok := conn.streams[int(streamID)] + if !ok { + //log.Println("client dp callback, stream not exists") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + gobuf := make([]byte, int(length)) + /* + _length := int(length) + gobuf := bufPool.Get().([]byte) + if len(gobuf) < _length { + gobuf = make([]byte, _length) + } + defer bufPool.Put(gobuf) + */ + + n, err := s.dp.Read(gobuf[0:]) + if err != nil { + if err == io.EOF { + //log.Println("onDataSourceReadCallback end") + return 0 + } + if err == errAgain { + //log.Println("onDataSourceReadCallback end") + //s.dp.deferred = true + return NGHTTP2_ERR_DEFERRED + } + //log.Println("onDataSourceReadCallback end") + return NGHTTP2_ERR_CALLBACK_FAILURE + } + //cbuf := C.CBytes(gobuf) + //defer C.free(cbuf) + //C.memcpy(buf, cbuf, C.size_t(n)) + C.memcpy(buf, unsafe.Pointer(&gobuf[0]), C.size_t(n)) + //log.Println("onDataSourceReadCallback end") + return C.ssize_t(n) +} + +// onDataChunkRecv callback function for libnghttp2 library data chunk received. +// +//export onDataChunkRecv +func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int, + buf unsafe.Pointer, length C.size_t) C.int { + //log.Println("onDataChunkRecv begin") + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + gobuf := C.GoBytes(buf, C.int(length)) + + s, ok := conn.streams[int(streamID)] + if !ok { + //log.Println("onDataChunkRecv end") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + if s.bp == nil { + //log.Println("empty body") + //log.Println("onDataChunkRecv end") + return C.int(length) + } + //log.Println("bp write") + n, err := s.bp.Write(gobuf) + if err != nil { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + //log.Println("onDataChunkRecv end") + return C.int(n) +} + +// onDataSendCallback callback function for libnghttp2 library want send data to network. +// +//export onDataSendCallback +func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { + //log.Println("onDataSendCallback begin") + //log.Println("data write req ", int(size)) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + buf := C.GoBytes(data, C.int(size)) + //log.Println(conn.conn.RemoteAddr()) + n, err := conn.conn.Write(buf) + if err != nil { + //log.Println("onDataSendCallback end") + return NGHTTP2_ERR_CALLBACK_FAILURE + } + //log.Printf("write %d bytes to network ", n) + //log.Println("onDataSendCallback end") + return C.ssize_t(n) +} + +// onBeginHeaderCallback callback function for begin header receive. +// +//export onBeginHeaderCallback +func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("onBeginHeaderCallback begin") + //log.Printf("stream %d begin headers", int(streamID)) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + + var TLS tls.ConnectionState + if tlsconn, ok := conn.conn.(*tls.Conn); ok { + TLS = tlsconn.ConnectionState() + } + // client + if !conn.isServer { + s, ok := conn.streams[int(streamID)] + if !ok { + //log.Println("onBeginHeaderCallback end") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + s.response = &http.Response{ + Proto: "HTTP/2", + ProtoMajor: 2, + ProtoMinor: 0, + Header: make(http.Header), + Body: s.bp, + TLS: &TLS, + } + return NGHTTP2_NO_ERROR + } + + // server + s := &stream{ + streamID: int(streamID), + conn: conn, + bp: &bodyProvider{ + buf: new(bytes.Buffer), + lock: new(sync.Mutex), + }, + request: &http.Request{ + Header: make(http.Header), + Proto: "HTTP/2", + ProtoMajor: 2, + ProtoMinor: 0, + TLS: &TLS, + }, + } + s.request.Body = s.bp + //log.Printf("new stream %d", int(streamID)) + conn.streams[int(streamID)] = s + + runtime.SetFinalizer(s, (*stream).free) + + //log.Println("onBeginHeaderCallback end") + return NGHTTP2_NO_ERROR +} + +// onHeaderCallback callback function for each header received. +// +//export onHeaderCallback +func onHeaderCallback(ptr unsafe.Pointer, streamID C.int, + name unsafe.Pointer, namelen C.int, + value unsafe.Pointer, valuelen C.int) C.int { + //log.Println("onHeaderCallback begin") + //log.Printf("header %d", int(streamID)) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + goname := string(C.GoBytes(name, namelen)) + govalue := string(C.GoBytes(value, valuelen)) + + s, ok := conn.streams[int(streamID)] + if !ok { + //log.Println("onHeaderCallback end") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + var header http.Header + if conn.isServer { + header = s.request.Header + } else { + header = s.response.Header + } + goname = strings.ToLower(goname) + switch goname { + case ":method": + s.request.Method = govalue + case ":scheme": + case ":authority": + s.request.Host = govalue + case ":path": + s.request.RequestURI = govalue + u, err := url.Parse(govalue) + if err != nil { + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + s.request.URL = u + case ":status": + if s.response == nil { + //log.Println("empty response") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + statusCode, _ := strconv.Atoi(govalue) + s.response.StatusCode = statusCode + s.response.Status = http.StatusText(statusCode) + case "content-length": + header.Add(goname, govalue) + n, err := strconv.ParseInt(govalue, 10, 64) + if err == nil { + if conn.isServer { + s.request.ContentLength = n + } else { + s.response.ContentLength = n + } + } + case "transfer-encoding": + header.Add(goname, govalue) + if conn.isServer { + s.request.TransferEncoding = append(s.response.TransferEncoding, govalue) + } else { + s.response.TransferEncoding = append(s.response.TransferEncoding, govalue) + } + default: + header.Add(goname, govalue) + } + //log.Println("onHeaderCallback end") + return NGHTTP2_NO_ERROR +} + +// onHeadersDoneCallback callback function for the stream when all headers received. +// +//export onHeadersDoneCallback +func onHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("onHeadersDoneCallback begin") + //log.Printf("stream %d headers done", int(streamID)) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + s, ok := conn.streams[int(streamID)] + if !ok { + //log.Println("onHeadersDoneCallback end") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE + } + s.headersEnd = true + if conn.isServer { + if s.request.Method == "CONNECT" { + go conn.serve(s) + } + return NGHTTP2_NO_ERROR + } + select { + case s.resch <- s.response: + default: + } + //log.Println("onHeadersDoneCallback end") + return NGHTTP2_NO_ERROR +} + +// onStreamClose callback function for the stream when closed. +// +//export onStreamClose +func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { + //log.Println("onStreamClose begin") + //log.Printf("stream %d closed", int(streamID)) + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + + stream, ok := conn.streams[int(streamID)] + if ok { + go stream.Close() + //log.Printf("remove stream %d", int(streamID)) + //conn.lock.Lock() + delete(conn.streams, int(streamID)) + //go stream.Close() + //conn.lock.Unlock() + //log.Println("onStreamClose end") + return NGHTTP2_NO_ERROR + } + //log.Println("onStreamClose end") + return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE +} + +//export onConnectionCloseCallback +func onConnectionCloseCallback(ptr unsafe.Pointer) { + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + conn.err = io.EOF + conn.Close() +} + +//export onStreamEndCallback +func onStreamEndCallback(ptr unsafe.Pointer, streamID C.int) { + conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) + stream, ok := conn.streams[int(streamID)] + if !ok { + return + } + stream.streamEnd = true + + stream.bp.Close() + + if stream.conn.isServer { + if stream.request.Method != "CONNECT" { + go conn.serve(stream) + } + return + } +} diff --git a/data_provider.go b/data_provider.go index ca7a986..b139e99 100644 --- a/data_provider.go +++ b/data_provider.go @@ -1,183 +1,183 @@ -package nghttp2 - -/* -#include "_nghttp2.h" -*/ -import "C" -import ( - "bytes" - "errors" - "io" - "log" - "sync" - "time" - "unsafe" -) - -// dataProvider provider data for libnghttp2 library -// libnghttp2 callback will Read to read the data, -// application call Write to provider data, -// application call Close will cause Read return io.EOF -type dataProvider struct { - buf *bytes.Buffer - closed bool - lock *sync.Mutex - sessLock *sync.Mutex - session *C.nghttp2_session - streamID int - deferred bool -} - -// Read read from data provider -func (dp *dataProvider) Read(buf []byte) (n int, err error) { - if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { - log.Println("dp read invalid state") - return 0, errors.New("invalid state") - } - dp.lock.Lock() - defer dp.lock.Unlock() - - n, err = dp.buf.Read(buf) - if err != nil && !dp.closed { - //log.Println("deferred") - dp.deferred = true - return 0, errAgain - } - return -} - -// Write provider data for data provider -func (dp *dataProvider) Write(buf []byte) (n int, err error) { - if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { - log.Println("dp write invalid state") - return 0, errors.New("invalid state") - } - - // make sure the buffer not too large - delay := 10 * time.Millisecond - maxBufSize := 1 * 1024 * 1024 - for { - dp.lock.Lock() - _len := dp.buf.Len() - closed := dp.closed - dp.lock.Unlock() - if closed { - return 0, io.EOF - } - if _len < maxBufSize { - break - } - time.Sleep(delay) - } - - dp.lock.Lock() - defer dp.lock.Unlock() - - //if dp.closed { - // return 0, io.EOF - //} - - n, err = dp.buf.Write(buf) - if dp.deferred { - dp.sessLock.Lock() - C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) - dp.sessLock.Unlock() - - //log.Println("resume") - dp.deferred = false - } - return -} - -// Close end to provide data -func (dp *dataProvider) Close() error { - if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { - log.Println("dp close, invalid state") - return errors.New("invalid state") - } - dp.lock.Lock() - defer dp.lock.Unlock() - - if dp.closed { - return nil - } - dp.closed = true - //log.Printf("dp close stream %d", dp.streamID) - if dp.deferred { - dp.sessLock.Lock() - C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) - dp.sessLock.Unlock() - - dp.deferred = false - } - return nil -} - -func newDataProvider(cdp unsafe.Pointer, sessionLock *sync.Mutex, t int) *dataProvider { - dp := &dataProvider{ - buf: new(bytes.Buffer), - lock: new(sync.Mutex), - sessLock: sessionLock, - } - C.data_provider_set_callback(C.size_t(uintptr(cdp)), - C.size_t(uintptr(unsafe.Pointer(dp))), C.int(t)) - return dp -} - -// bodyProvider provide data for http body -// Read will block when data not yet avaliable -type bodyProvider struct { - buf *bytes.Buffer - closed bool - lock *sync.Mutex -} - -// Read read data from provider -// will block when data not yet avaliable -func (bp *bodyProvider) Read(buf []byte) (int, error) { - var delay = 100 * time.Millisecond - - for { - bp.lock.Lock() - n, err := bp.buf.Read(buf) - bp.lock.Unlock() - if err != nil && !bp.closed { - time.Sleep(delay) - continue - } - return n, err - } -} - -// Write provide data for dataProvider -// libnghttp2 data chunk recv callback will call this -func (bp *bodyProvider) Write(buf []byte) (int, error) { - bp.lock.Lock() - defer bp.lock.Unlock() - - return bp.buf.Write(buf) -} - -// Close end to provide data -func (bp *bodyProvider) Close() error { - bp.lock.Lock() - defer bp.lock.Unlock() - - bp.closed = true - return nil -} - -func newNV(name, value string) C.nghttp2_nv { - nv := C.nghttp2_nv{} - nameArr := make([]byte, len(name)+1) - valueArr := make([]byte, len(value)+1) - copy(nameArr, []byte(name)) - copy(valueArr, []byte(value)) - - nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0])) - nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0])) - nv.namelen = C.size_t(len(name)) - nv.valuelen = C.size_t(len(value)) - nv.flags = 0 - return nv -} +package nghttp2 + +/* +#include "_nghttp2.h" +*/ +import "C" +import ( + "bytes" + "errors" + "io" + "log" + "sync" + "time" + "unsafe" +) + +// dataProvider provider data for libnghttp2 library +// libnghttp2 callback will Read to read the data, +// application call Write to provider data, +// application call Close will cause Read return io.EOF +type dataProvider struct { + buf *bytes.Buffer + closed bool + lock *sync.Mutex + sessLock *sync.Mutex + session *C.nghttp2_session + streamID int + deferred bool +} + +// Read read from data provider +func (dp *dataProvider) Read(buf []byte) (n int, err error) { + if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { + log.Println("dp read invalid state") + return 0, errors.New("invalid state") + } + dp.lock.Lock() + defer dp.lock.Unlock() + + n, err = dp.buf.Read(buf) + if err != nil && !dp.closed { + //log.Println("deferred") + dp.deferred = true + return 0, errAgain + } + return +} + +// Write provider data for data provider +func (dp *dataProvider) Write(buf []byte) (n int, err error) { + if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { + log.Println("dp write invalid state") + return 0, errors.New("invalid state") + } + + // make sure the buffer not too large + delay := 10 * time.Millisecond + maxBufSize := 1 * 1024 * 1024 + for { + dp.lock.Lock() + _len := dp.buf.Len() + closed := dp.closed + dp.lock.Unlock() + if closed { + return 0, io.EOF + } + if _len < maxBufSize { + break + } + time.Sleep(delay) + } + + dp.lock.Lock() + defer dp.lock.Unlock() + + //if dp.closed { + // return 0, io.EOF + //} + + n, err = dp.buf.Write(buf) + if dp.deferred { + dp.sessLock.Lock() + C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) + dp.sessLock.Unlock() + + //log.Println("resume") + dp.deferred = false + } + return +} + +// Close end to provide data +func (dp *dataProvider) Close() error { + if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { + log.Println("dp close, invalid state") + return errors.New("invalid state") + } + dp.lock.Lock() + defer dp.lock.Unlock() + + if dp.closed { + return nil + } + dp.closed = true + //log.Printf("dp close stream %d", dp.streamID) + if dp.deferred { + dp.sessLock.Lock() + C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) + dp.sessLock.Unlock() + + dp.deferred = false + } + return nil +} + +func newDataProvider(cdp unsafe.Pointer, sessionLock *sync.Mutex, t int) *dataProvider { + dp := &dataProvider{ + buf: new(bytes.Buffer), + lock: new(sync.Mutex), + sessLock: sessionLock, + } + C.data_provider_set_callback(C.size_t(uintptr(cdp)), + C.size_t(uintptr(unsafe.Pointer(dp))), C.int(t)) + return dp +} + +// bodyProvider provide data for http body +// Read will block when data not yet avaliable +type bodyProvider struct { + buf *bytes.Buffer + closed bool + lock *sync.Mutex +} + +// Read read data from provider +// will block when data not yet avaliable +func (bp *bodyProvider) Read(buf []byte) (int, error) { + var delay = 100 * time.Millisecond + + for { + bp.lock.Lock() + n, err := bp.buf.Read(buf) + bp.lock.Unlock() + if err != nil && !bp.closed { + time.Sleep(delay) + continue + } + return n, err + } +} + +// Write provide data for dataProvider +// libnghttp2 data chunk recv callback will call this +func (bp *bodyProvider) Write(buf []byte) (int, error) { + bp.lock.Lock() + defer bp.lock.Unlock() + + return bp.buf.Write(buf) +} + +// Close end to provide data +func (bp *bodyProvider) Close() error { + bp.lock.Lock() + defer bp.lock.Unlock() + + bp.closed = true + return nil +} + +func newNV(name, value string) C.nghttp2_nv { + nv := C.nghttp2_nv{} + nameArr := make([]byte, len(name)+1) + valueArr := make([]byte, len(value)+1) + copy(nameArr, []byte(name)) + copy(valueArr, []byte(value)) + + nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0])) + nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0])) + nv.namelen = C.size_t(len(name)) + nv.valuelen = C.size_t(len(value)) + nv.flags = 0 + return nv +} diff --git a/doc.go b/doc.go index f666a4f..d349a79 100644 --- a/doc.go +++ b/doc.go @@ -1,113 +1,113 @@ -/*Package nghttp2 is libnghttp2 binding for golang. - -server example - - cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key") - if err != nil { - log.Fatal(err) - } - - l, err := tls.Listen("tcp", "127.0.0.1:1100", &tls.Config{ - Certificates: []tls.Certificate{cert}, - NextProtos: []string{"h2"}, - }) - if err != nil { - log.Fatal(err) - } - defer l.Close() - addr := l.Addr().String() - - http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) { - log.Printf("%+v", r) - hdr := w.Header() - hdr.Set("content-type", "text/plain") - hdr.Set("aa", "bb") - d, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println(err) - return - } - w.Write(d) - }) - - for { - c, err := l.Accept() - if err != nil { - break - } - h2conn, err := Server(c, nil) - if err != nil { - log.Fatal(err) - } - log.Printf("%+v", h2conn) - go h2conn.Run() - } - -client example - - conn, err := tls.Dial("tcp", "nghttp2.org:443", &tls.Config{ - NextProtos: []string{"h2"}, - ServerName: "nghttp2.org", - }) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - if err := conn.Handshake(); err != nil{ - log.Fatal(err) - } - cstate := conn.ConnectionState() - if cstate.NegotiatedProtocol != "h2" { - log.Fatal("no http2 on server") - } - - h2conn, err := Client(conn) - if err != nil { - log.Fatal(err) - } - - param := url.Values{} - param.Add("e", "b") - param.Add("f", "d") - data := bytes.NewReader([]byte(param.Encode())) - req, _ := http.NewRequest("POST", - "https://nghttp2.org/httpbin/post?a=b&c=d", - data) - - log.Printf("%+v", req) - - req.Header.Set("user-agent", "go-nghttp2/1.0") - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - - res, err := h2conn.RoundTrip(req) - if err != nil { - log.Fatal(err) - } - - if res.StatusCode != http.StatusOK { - log.Printf("expect %d, got %d", http.StatusOK, res.StatusCode) - } - res.Write(os.Stderr) - - -co-work with net/http example - - l, err := net.Listen("tcp", "127.0.0.1:1222") - if err != nil { - log.Fatal(err) - } - srv := &http.Server{ - TLSConfig: &tls.Config{ - NextProtos: []string{"h2", "http/1.1"}, - }, - TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){ - "h2": nghttp2.HTTP2Handler, - }, - } - defer srv.Close() - - srv.ServeTLS(l, "testdata/server.crt", "testdata/server.key") - -see http2_test.go for more details -*/ -package nghttp2 +/*Package nghttp2 is libnghttp2 binding for golang. + +server example + + cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key") + if err != nil { + log.Fatal(err) + } + + l, err := tls.Listen("tcp", "127.0.0.1:1100", &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2"}, + }) + if err != nil { + log.Fatal(err) + } + defer l.Close() + addr := l.Addr().String() + + http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) { + log.Printf("%+v", r) + hdr := w.Header() + hdr.Set("content-type", "text/plain") + hdr.Set("aa", "bb") + d, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println(err) + return + } + w.Write(d) + }) + + for { + c, err := l.Accept() + if err != nil { + break + } + h2conn, err := Server(c, nil) + if err != nil { + log.Fatal(err) + } + log.Printf("%+v", h2conn) + go h2conn.Run() + } + +client example + + conn, err := tls.Dial("tcp", "nghttp2.org:443", &tls.Config{ + NextProtos: []string{"h2"}, + ServerName: "nghttp2.org", + }) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + if err := conn.Handshake(); err != nil{ + log.Fatal(err) + } + cstate := conn.ConnectionState() + if cstate.NegotiatedProtocol != "h2" { + log.Fatal("no http2 on server") + } + + h2conn, err := Client(conn) + if err != nil { + log.Fatal(err) + } + + param := url.Values{} + param.Add("e", "b") + param.Add("f", "d") + data := bytes.NewReader([]byte(param.Encode())) + req, _ := http.NewRequest("POST", + "https://nghttp2.org/httpbin/post?a=b&c=d", + data) + + log.Printf("%+v", req) + + req.Header.Set("user-agent", "go-nghttp2/1.0") + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + res, err := h2conn.RoundTrip(req) + if err != nil { + log.Fatal(err) + } + + if res.StatusCode != http.StatusOK { + log.Printf("expect %d, got %d", http.StatusOK, res.StatusCode) + } + res.Write(os.Stderr) + + +co-work with net/http example + + l, err := net.Listen("tcp", "127.0.0.1:1222") + if err != nil { + log.Fatal(err) + } + srv := &http.Server{ + TLSConfig: &tls.Config{ + NextProtos: []string{"h2", "http/1.1"}, + }, + TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){ + "h2": nghttp2.HTTP2Handler, + }, + } + defer srv.Close() + + srv.ServeTLS(l, "testdata/server.crt", "testdata/server.key") + +see http2_test.go for more details +*/ +package nghttp2