merge client/server connection and stream

merge_conn
fangdingjun 6 years ago
parent 439e39a688
commit c87800ed83

@ -7,16 +7,17 @@
#define ARRLEN(x) (sizeof(x) / sizeof(x[0]))
extern ssize_t onClientDataRecvCallback(void *, void *data, size_t);
extern ssize_t onClientDataSendCallback(void *, void *data, size_t);
extern ssize_t onDataRecvCallback(void *, void *data, size_t);
extern ssize_t onDataSendCallback(void *, void *data, size_t);
extern ssize_t onServerDataSourceReadCallback(void *, int, void *, size_t);
extern ssize_t onClientDataSourceReadCallback(void *, int, void *, size_t);
extern int onClientDataChunkRecv(void *, int, void *, size_t);
extern int onClientBeginHeaderCallback(void *, int);
extern int onClientHeaderCallback(void *, int, void *, int, void *, int);
extern int onClientHeadersDoneCallback(void *, int);
extern int onClientStreamClose(void *, int);
extern void onClientConnectionCloseCallback(void *user_data);
extern ssize_t onDataSourceReadCallback(void *, int, void *, size_t);
extern int onDataChunkRecv(void *, int, void *, size_t);
extern int onBeginHeaderCallback(void *, int);
extern int onHeaderCallback(void *, int, void *, int, void *, int);
extern int onHeadersDoneCallback(void *, int);
extern int onStreamClose(void *, int);
extern void onConnectionCloseCallback(void *user_data);
extern void onStreamEndCallback(void *, int);
int _nghttp2_submit_response(nghttp2_session *sess, int streamid,
size_t nv, size_t nvlen, nghttp2_data_provider *dp);
@ -25,15 +26,7 @@ int _nghttp2_submit_request(nghttp2_session *session, const nghttp2_priority_spe
size_t nva, size_t nvlen,
const nghttp2_data_provider *data_prd, void *stream_user_data);
extern ssize_t onServerDataRecvCallback(void *, void *data, size_t);
extern ssize_t onServerDataSendCallback(void *, void *data, size_t);
extern int onServerDataChunkRecv(void *, int, void *, size_t);
extern int onServerBeginHeaderCallback(void *, int);
extern int onServerHeaderCallback(void *, int, void *, int, void *, int);
extern int onServerStreamEndCallback(void *, int);
extern int onServerHeadersDoneCallback(void *, int);
extern int onServerStreamClose(void *, int);
int send_server_connection_header(nghttp2_session *session);
int send_connection_header(nghttp2_session *session);
struct nv_array
{

@ -5,17 +5,19 @@ package nghttp2
*/
import "C"
import (
"bytes"
"crypto/tls"
"errors"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"unsafe"
)
var (
errAgain = errors.New("again")
)
const (
NGHTTP2_NO_ERROR = 0
NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE = -521
@ -23,176 +25,16 @@ const (
NGHTTP2_ERR_DEFERRED = -508
)
// onServerDataSendCallback callback function for libnghttp2 library
// want send data to network.
//
//export onServerDataSendCallback
func onServerDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer,
length C.size_t) C.ssize_t {
//log.Println("server data send")
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
buf := C.GoBytes(data, C.int(length))
n, err := conn.conn.Write(buf)
if err != nil {
return NGHTTP2_ERR_CALLBACK_FAILURE
}
//log.Println("send ", n, " bytes to network ", buf)
return C.ssize_t(n)
}
// onServerDataChunkRecv callback function for libnghttp2 library's data chunk recv.
//
//export onServerDataChunkRecv
func onServerDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
data unsafe.Pointer, length C.size_t) C.int {
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
bp := s.req.Body.(*bodyProvider)
buf := C.GoBytes(data, C.int(length))
bp.Write(buf)
return C.int(length)
}
// onServerBeginHeaderCallback callback function for begin begin header recv.
//
//export onServerBeginHeaderCallback
func onServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
var TLS tls.ConnectionState
if tlsconn, ok := conn.conn.(*tls.Conn); ok {
TLS = tlsconn.ConnectionState()
}
s := &ServerStream{
streamID: int(streamID),
conn: conn,
req: &http.Request{
//URL: &url.URL{},
Header: http.Header{},
Proto: "HTTP/2.0",
ProtoMajor: 2,
ProtoMinor: 0,
RemoteAddr: conn.conn.RemoteAddr().String(),
TLS: &TLS,
},
//buf: new(bytes.Buffer),
}
//conn.lock.Lock()
conn.streams[int(streamID)] = s
//conn.lock.Unlock()
return NGHTTP2_NO_ERROR
}
// onServerHeaderCallback callback function for each header recv.
//
//export onServerHeaderCallback
func onServerHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int {
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
hdrname := C.GoStringN((*C.char)(name), namelen)
hdrvalue := C.GoStringN((*C.char)(value), valuelen)
hdrname = strings.ToLower(hdrname)
switch hdrname {
case ":method":
s.req.Method = hdrvalue
case ":scheme":
// s.req.URL.Scheme = hdrvalue
case ":path":
s.req.RequestURI = hdrvalue
u, _ := url.ParseRequestURI(s.req.RequestURI)
s.req.URL = u
case ":authority":
s.req.Host = hdrvalue
case "content-length":
s.req.Header.Add(hdrname, hdrvalue)
n, err := strconv.ParseInt(hdrvalue, 10, 64)
if err == nil {
s.req.ContentLength = n
}
default:
s.req.Header.Add(hdrname, hdrvalue)
}
return NGHTTP2_NO_ERROR
}
// onServerStreamEndCallback callback function for the stream when END_STREAM flag set
//
//export onServerStreamEndCallback
func onServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int {
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
s.streamEnd = true
bp := s.req.Body.(*bodyProvider)
if s.req.Method != "CONNECT" {
bp.closed = true
//log.Println("stream end flag set, begin to serve")
go conn.serve(s)
}
return NGHTTP2_NO_ERROR
}
// onServerHeadersDoneCallback callback function for the stream when all headers received.
//
//export onServerHeadersDoneCallback
func onServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
s.headersDone = true
bp := &bodyProvider{
buf: new(bytes.Buffer),
lock: new(sync.Mutex),
}
s.req.Body = bp
if s.req.Method == "CONNECT" {
go conn.serve(s)
}
return NGHTTP2_NO_ERROR
}
// onServerStreamClose callback function for the stream when closed.
//
//export onServerStreamClose
func onServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
//log.Printf("stream %d closed", int(streamID))
if !ok {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
//conn.lock.Lock()
delete(conn.streams, int(streamID))
//conn.lock.Unlock()
go s.Close()
return NGHTTP2_NO_ERROR
}
// onClientDataSourceReadCallback callback function for libnghttp2 library
// onDataSourceReadCallback callback function for libnghttp2 library
// want read data from data provider source,
// return NGHTTP2_ERR_DEFERRED will cause data frame defered,
// application later call nghttp2_session_resume_data will re-quene the data frame
//
//export onClientDataSourceReadCallback
func onClientDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int,
//export onDataSourceReadCallback
func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.ssize_t {
//log.Println("onDataSourceReadCallback begin")
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("client dp callback, stream not exists")
@ -221,193 +63,169 @@ func onClientDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int,
return C.ssize_t(n)
}
// onServerDataSourceReadCallback callback function for libnghttp2 library
// want read data from data provider source,
// return NGHTTP2_ERR_DEFERRED will cause data frame defered,
// application later call nghttp2_session_resume_data will re-quene the data frame
// onDataChunkRecv callback function for libnghttp2 library data chunk received.
//
//export onServerDataSourceReadCallback
func onServerDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.ssize_t {
//log.Println("onDataSourceReadCallback begin")
conn := (*ServerConn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("server dp callback, stream not exists")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
gobuf := make([]byte, int(length))
n, err := s.dp.Read(gobuf)
if err != nil {
if err == io.EOF {
//log.Println("onDataSourceReadCallback end")
return 0
}
if err == errAgain {
//log.Println("onDataSourceReadCallback end")
s.dp.deferred = true
return NGHTTP2_ERR_DEFERRED
}
//log.Println("onDataSourceReadCallback end")
return NGHTTP2_ERR_CALLBACK_FAILURE
}
//cbuf := C.CBytes(gobuf)
//defer C.free(cbuf)
//C.memcpy(buf, cbuf, C.size_t(n))
C.memcpy(buf, unsafe.Pointer(&gobuf[0]), C.size_t(n))
//log.Println("onDataSourceReadCallback end")
return C.ssize_t(n)
}
// onClientDataChunkRecv callback function for libnghttp2 library data chunk received.
//
//export onClientDataChunkRecv
func onClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
//export onDataChunkRecv
func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.int {
//log.Println("onClientDataChunkRecv begin")
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
//log.Println("onDataChunkRecv begin")
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
gobuf := C.GoBytes(buf, C.int(length))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientDataChunkRecv end")
//log.Println("onDataChunkRecv end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
if s.res.Body == nil {
if s.bp == nil {
//log.Println("empty body")
//log.Println("onClientDataChunkRecv end")
//log.Println("onDataChunkRecv end")
return C.int(length)
}
if bp, ok := s.res.Body.(*bodyProvider); ok {
n, err := bp.Write(gobuf)
if err != nil {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
//log.Println("onClientDataChunkRecv end")
return C.int(n)
n, err := s.bp.Write(gobuf)
if err != nil {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
//log.Println("onClientDataChunkRecv end")
return C.int(length)
//log.Println("onDataChunkRecv end")
return C.int(n)
}
// onClientDataSendCallback callback function for libnghttp2 library want send data to network.
// onDataSendCallback callback function for libnghttp2 library want send data to network.
//
//export onClientDataSendCallback
func onClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("onClientDataSendCallback begin")
//export onDataSendCallback
func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("onDataSendCallback begin")
//log.Println("data write req ", int(size))
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
buf := C.GoBytes(data, C.int(size))
//log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Write(buf)
if err != nil {
//log.Println("onClientDataSendCallback end")
//log.Println("onDataSendCallback end")
return NGHTTP2_ERR_CALLBACK_FAILURE
}
//log.Printf("write %d bytes to network ", n)
//log.Println("onClientDataSendCallback end")
//log.Println("onDataSendCallback end")
return C.ssize_t(n)
}
// onClientBeginHeaderCallback callback function for begin header receive.
// onBeginHeaderCallback callback function for begin header receive.
//
//export onClientBeginHeaderCallback
func onClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onClientBeginHeaderCallback begin")
//export onBeginHeaderCallback
func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onBeginHeaderCallback begin")
//log.Printf("stream %d begin headers", int(streamID))
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientBeginHeaderCallback end")
//log.Println("onBeginHeaderCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
var TLS tls.ConnectionState
if tlsconn, ok := conn.conn.(*tls.Conn); ok {
TLS = tlsconn.ConnectionState()
}
s.res = &http.Response{
Header: make(http.Header),
Body: &bodyProvider{
buf: new(bytes.Buffer),
lock: new(sync.Mutex),
},
TLS: &TLS,
if conn.isServer {
s.request = &http.Request{
Header: make(http.Header),
Proto: "HTTP/2",
ProtoMajor: 2,
ProtoMinor: 0,
TLS: &TLS,
Body: s.bp,
}
return NGHTTP2_NO_ERROR
}
s.response = &http.Response{
Proto: "HTTP/2",
ProtoMajor: 2,
ProtoMinor: 0,
Header: make(http.Header),
Body: s.bp,
TLS: &TLS,
}
//log.Println("onClientBeginHeaderCallback end")
//log.Println("onBeginHeaderCallback end")
return NGHTTP2_NO_ERROR
}
// onClientHeaderCallback callback function for each header received.
// onHeaderCallback callback function for each header received.
//
//export onClientHeaderCallback
func onClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
//export onHeaderCallback
func onHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int {
//log.Println("onClientHeaderCallback begin")
//log.Println("onHeaderCallback begin")
//log.Println("header")
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
goname := string(C.GoBytes(name, namelen))
govalue := string(C.GoBytes(value, valuelen))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientHeaderCallback end")
//log.Println("onHeaderCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
goname = strings.ToLower(goname)
switch goname {
case ":method":
s.request.Method = govalue
case ":scheme":
case ":authority":
s.request.Host = govalue
case ":path":
s.request.RequestURI = govalue
case ":status":
statusCode, _ := strconv.Atoi(govalue)
s.res.StatusCode = statusCode
s.res.Status = http.StatusText(statusCode)
s.res.Proto = "HTTP/2.0"
s.res.ProtoMajor = 2
s.res.ProtoMinor = 0
s.response.StatusCode = statusCode
s.response.Status = http.StatusText(statusCode)
case "content-length":
s.res.Header.Add(goname, govalue)
s.response.Header.Add(goname, govalue)
n, err := strconv.ParseInt(govalue, 10, 64)
if err == nil {
s.res.ContentLength = n
s.response.ContentLength = n
}
case "transfer-encoding":
s.res.Header.Add(goname, govalue)
s.res.TransferEncoding = append(s.res.TransferEncoding, govalue)
s.response.Header.Add(goname, govalue)
s.response.TransferEncoding = append(s.response.TransferEncoding, govalue)
default:
s.res.Header.Add(goname, govalue)
s.response.Header.Add(goname, govalue)
}
//log.Println("onClientHeaderCallback end")
//log.Println("onHeaderCallback end")
return NGHTTP2_NO_ERROR
}
// onClientHeadersDoneCallback callback function for the stream when all headers received.
// onHeadersDoneCallback callback function for the stream when all headers received.
//
//export onClientHeadersDoneCallback
func onClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onClientHeadersDoneCallback begin")
//export onHeadersDoneCallback
func onHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onHeadersDoneCallback begin")
//log.Printf("stream %d headers done", int(streamID))
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)]
if !ok {
//log.Println("onClientHeadersDoneCallback end")
//log.Println("onHeadersDoneCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
if conn.isServer {
return NGHTTP2_NO_ERROR
}
select {
case s.resch <- s.res:
case s.resch <- s.response:
default:
}
//log.Println("onClientHeadersDoneCallback end")
//log.Println("onHeadersDoneCallback end")
return NGHTTP2_NO_ERROR
}
// onClientStreamClose callback function for the stream when closed.
// onStreamClose callback function for the stream when closed.
//
//export onClientStreamClose
func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onClientStreamClose begin")
//export onStreamClose
func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onStreamClose begin")
//log.Printf("stream %d closed", int(streamID))
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
stream, ok := conn.streams[int(streamID)]
if ok {
@ -416,16 +234,16 @@ func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
delete(conn.streams, int(streamID))
//go stream.Close()
//conn.lock.Unlock()
//log.Println("onClientStreamClose end")
//log.Println("onStreamClose end")
return NGHTTP2_NO_ERROR
}
//log.Println("onClientStreamClose end")
//log.Println("onStreamClose end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
//export onClientConnectionCloseCallback
func onClientConnectionCloseCallback(ptr unsafe.Pointer) {
conn := (*ClientConn)(unsafe.Pointer(uintptr(ptr)))
//export onConnectionCloseCallback
func onConnectionCloseCallback(ptr unsafe.Pointer) {
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
conn.err = io.EOF
// signal all goroutings exit
@ -436,3 +254,8 @@ func onClientConnectionCloseCallback(ptr unsafe.Pointer) {
}
}
}
//export onStreamEndCallback
func onStreamEndCallback(ptr unsafe.Pointer, streamID C.int) {
}

@ -1,589 +1,154 @@
package nghttp2
/*
#cgo pkg-config: libnghttp2
#include "_nghttp2.h"
*/
import "C"
import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"unsafe"
)
var (
errAgain = errors.New("again")
)
// ClientConn http2 client connection
type ClientConn struct {
session *C.nghttp2_session
conn net.Conn
streams map[int]*ClientStream
lock *sync.Mutex
errch chan struct{}
exitch chan struct{}
err error
closed bool
streamCount int
}
// Client create http2 client
func Client(c net.Conn) (*ClientConn, error) {
conn := &ClientConn{
conn: c, streams: make(map[int]*ClientStream),
lock: new(sync.Mutex),
errch: make(chan struct{}),
exitch: make(chan struct{}),
}
conn.session = C.init_nghttp2_client_session(
C.size_t(int(uintptr(unsafe.Pointer(conn)))))
if conn.session == nil {
return nil, fmt.Errorf("init session failed")
}
ret := C.nghttp2_submit_settings(conn.session, 0, nil, 0)
if int(ret) < 0 {
conn.Close()
return nil, fmt.Errorf("submit settings error: %s",
C.GoString(C.nghttp2_strerror(ret)))
}
go conn.run()
return conn, nil
}
// Error return current error on connection
func (c *ClientConn) Error() error {
c.lock.Lock()
defer c.lock.Unlock()
return c.err
}
// Close close the http2 connection
func (c *ClientConn) Close() error {
if c.closed {
return nil
}
c.closed = true
for _, s := range c.streams {
s.Close()
}
c.lock.Lock()
defer c.lock.Unlock()
//log.Println("close client connection")
C.nghttp2_session_terminate_session(c.session, 0)
C.nghttp2_session_del(c.session)
close(c.exitch)
c.conn.Close()
return nil
}
func (c *ClientConn) run() {
var wantWrite int
var delay = 50 * time.Millisecond
var keepalive = 5 * time.Second
var ret C.int
var lastDataRecv time.Time
//defer c.Close()
defer close(c.errch)
errch := make(chan struct{}, 5)
// data read loop
go func() {
buf := make([]byte, 16*1024)
readloop:
for {
select {
case <-c.exitch:
break readloop
case <-errch:
break readloop
default:
}
n, err := c.conn.Read(buf)
if err != nil {
c.lock.Lock()
c.err = err
c.lock.Unlock()
close(errch)
break
}
//log.Printf("read %d bytes from network", n)
lastDataRecv = time.Now()
//d1 := C.CBytes(buf[:n])
c.lock.Lock()
//ret1 := C.nghttp2_session_mem_recv(c.session,
// (*C.uchar)(d1), C.size_t(n))
ret1 := C.nghttp2_session_mem_recv(c.session,
(*C.uchar)(unsafe.Pointer(&buf[0])), C.size_t(n))
c.lock.Unlock()
//C.free(d1)
if int(ret1) < 0 {
c.lock.Lock()
c.err = fmt.Errorf("sesion recv error: %s",
C.GoString(C.nghttp2_strerror(ret)))
//log.Println(c.err)
c.lock.Unlock()
close(errch)
break
}
}
}()
// keep alive loop
go func() {
for {
select {
case <-c.exitch:
return
case <-errch:
return
case <-time.After(keepalive):
}
now := time.Now()
last := lastDataRecv
d := now.Sub(last)
if d > keepalive {
c.lock.Lock()
C.nghttp2_submit_ping(c.session, 0, nil)
c.lock.Unlock()
//log.Println("submit ping")
}
}
}()
loop:
for {
select {
case <-c.errch:
break loop
case <-errch:
break loop
case <-c.exitch:
break loop
default:
}
c.lock.Lock()
ret = C.nghttp2_session_send(c.session)
c.lock.Unlock()
if int(ret) < 0 {
c.lock.Lock()
c.err = fmt.Errorf("sesion send error: %s",
C.GoString(C.nghttp2_strerror(ret)))
c.lock.Unlock()
//log.Println(c.err)
close(errch)
break
}
c.lock.Lock()
wantWrite = int(C.nghttp2_session_want_write(c.session))
c.lock.Unlock()
// make delay when no data read/write
if wantWrite == 0 {
time.Sleep(delay)
}
}
}
// Connect submit a CONNECT request, return a ClientStream and http status code from server
//
// equals to "CONNECT host:port" in http/1.1
func (c *ClientConn) Connect(addr string) (cs *ClientStream, statusCode int, err error) {
if c.err != nil {
return nil, http.StatusServiceUnavailable, c.err
}
var nv = []C.nghttp2_nv{}
nv = append(nv, newNV(":method", "CONNECT"))
nv = append(nv, newNV(":authority", addr))
var dp *dataProvider
s := &ClientStream{
conn: c,
cdp: C.nghttp2_data_provider{},
resch: make(chan *http.Response),
errch: make(chan error),
lock: new(sync.Mutex),
}
dp = newDataProvider(unsafe.Pointer(&s.cdp), c.lock, 1)
s.dp = dp
c.lock.Lock()
streamID := C._nghttp2_submit_request(c.session, nil,
C.size_t(uintptr(unsafe.Pointer(&nv[0]))), C.size_t(len(nv)), &s.cdp, nil)
c.lock.Unlock()
if int(streamID) < 0 {
return nil, http.StatusServiceUnavailable, fmt.Errorf(
"submit request error: %s", C.GoString(C.nghttp2_strerror(streamID)))
}
dp.streamID = int(streamID)
dp.session = c.session
s.streamID = int(streamID)
//log.Println("stream id ", int(streamID))
c.lock.Lock()
c.streams[int(streamID)] = s
c.streamCount++
c.lock.Unlock()
//log.Printf("new stream id %d", int(streamID))
select {
case err := <-s.errch:
//log.Println("wait response, got ", err)
return nil, http.StatusServiceUnavailable, err
case res := <-s.resch:
if res != nil {
res.Request = &http.Request{
Method: "CONNECT",
RequestURI: addr,
URL: &url.URL{},
Host: addr,
}
return s, res.StatusCode, nil
}
//log.Println("wait response, empty response")
return nil, http.StatusServiceUnavailable, io.EOF
case <-c.errch:
return nil, http.StatusServiceUnavailable, fmt.Errorf("connection error")
}
}
func newNV(name, value string) C.nghttp2_nv {
nv := C.nghttp2_nv{}
nameArr := make([]byte, len(name)+1)
valueArr := make([]byte, len(value)+1)
copy(nameArr, []byte(name))
copy(valueArr, []byte(value))
nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0]))
nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0]))
nv.namelen = C.size_t(len(name))
nv.valuelen = C.size_t(len(value))
nv.flags = 0
return nv
}
// CreateRequest submit a request and return a http.Response,
func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
if c.err != nil {
return nil, c.err
}
nv := []C.nghttp2_nv{}
nv = append(nv, newNV(":method", req.Method))
nv = append(nv, newNV(":scheme", "https"))
nv = append(nv, newNV(":authority", req.Host))
/*
:path must starts with "/"
req.RequestURI maybe starts with http://
*/
p := req.URL.Path
q := req.URL.Query().Encode()
if q != "" {
p = p + "?" + q
}
nv = append(nv, newNV(":path", p))
//log.Printf("%s http://%s%s", req.Method, req.Host, p)
for k, v := range req.Header {
//log.Printf("header %s: %s\n", k, v[0])
_k := strings.ToLower(k)
if _k == "host" || _k == "connection" || _k == "proxy-connection" {
continue
}
//log.Printf("header %s: %s", k, v)
nv = append(nv, newNV(k, v[0]))
}
var dp *dataProvider
s := &ClientStream{
//streamID: int(streamID),
conn: c,
resch: make(chan *http.Response),
errch: make(chan error),
lock: new(sync.Mutex),
}
if req.Method == "PUT" || req.Method == "POST" || req.Method == "CONNECT" {
s.cdp = C.nghttp2_data_provider{}
dp = newDataProvider(unsafe.Pointer(&s.cdp), c.lock, 1)
s.dp = dp
go func() {
io.Copy(dp, req.Body)
dp.Close()
}()
}
c.lock.Lock()
streamID := C._nghttp2_submit_request(c.session, nil,
C.size_t(uintptr(unsafe.Pointer(&nv[0]))), C.size_t(len(nv)), &s.cdp, nil)
c.lock.Unlock()
//C.delete_nv_array(nva)
if int(streamID) < 0 {
return nil, fmt.Errorf("submit request error: %s",
C.GoString(C.nghttp2_strerror(streamID)))
}
s.streamID = int(streamID)
//log.Printf("new stream, id %d", int(streamID))
if dp != nil {
dp.streamID = int(streamID)
dp.session = c.session
}
c.lock.Lock()
c.streams[int(streamID)] = s
c.streamCount++
c.lock.Unlock()
// waiting for response from server
select {
case err := <-s.errch:
return nil, err
case res := <-s.resch:
if res != nil {
res.Request = req
return res, nil
}
return nil, io.EOF
case <-c.errch:
return nil, fmt.Errorf("connection error")
}
//return nil, fmt.Errorf("unknown error")
}
// CanTakeNewRequest check if the ClientConn can submit a new request
func (c *ClientConn) CanTakeNewRequest() bool {
c.lock.Lock()
c.lock.Unlock()
if c.closed {
return false
}
if c.err != nil {
return false
}
if c.streamCount > ((1 << 31) / 2) {
return false
}
return true
}
// ServerConn server connection
type ServerConn struct {
// Handler handler to handle request
Handler http.Handler
closed bool
session *C.nghttp2_session
streams map[int]*ServerStream
lock *sync.Mutex
conn net.Conn
errch chan struct{}
exitch chan struct{}
err error
}
// HTTP2Handler is the http2 server handler that can co-work with standard net/http.
//
// usage example:
// l, err := net.Listen("tcp", ":1222")
// srv := &http.Server{
// TLSConfig: &tls.Config{
// NextProtos:[]string{"h2", "http/1.1"},
// }
// TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){
// "h2": nghttp2.Http2Handler
// }
// }
// srv.ServeTLS(l, "server.crt", "server.key")
func HTTP2Handler(srv *http.Server, conn *tls.Conn, handler http.Handler) {
h2conn, err := Server(conn, handler)
if err != nil {
panic(err.Error())
}
h2conn.Run()
}
// Server create new server connection
func Server(c net.Conn, handler http.Handler) (*ServerConn, error) {
conn := &ServerConn{
conn: c,
Handler: handler,
streams: make(map[int]*ServerStream),
lock: new(sync.Mutex),
errch: make(chan struct{}),
exitch: make(chan struct{}),
}
conn.session = C.init_nghttp2_server_session(
C.size_t(uintptr(unsafe.Pointer(conn))))
if conn.session == nil {
return nil, fmt.Errorf("init session failed")
}
//log.Println("send server connection header")
ret := C.send_server_connection_header(conn.session)
if int(ret) < 0 {
return nil, fmt.Errorf(C.GoString(C.nghttp2_strerror(ret)))
//return nil, fmt.Errorf("send connection header failed")
}
//go conn.run()
return conn, nil
}
func (c *ServerConn) serve(s *ServerStream) {
var handler = c.Handler
if c.Handler == nil {
handler = http.DefaultServeMux
}
if s.req.URL == nil {
s.req.URL = &url.URL{}
}
// call http.Handler to serve request
handler.ServeHTTP(s, s.req)
s.Close()
}
// Close close the server connection
func (c *ServerConn) Close() error {
if c.closed {
return nil
}
c.closed = true
for _, s := range c.streams {
s.Close()
}
c.lock.Lock()
defer c.lock.Unlock()
C.nghttp2_session_terminate_session(c.session, 0)
C.nghttp2_session_del(c.session)
close(c.exitch)
c.conn.Close()
return nil
}
// Run run the server loop
func (c *ServerConn) Run() {
var wantWrite int
var delay = 100 * time.Millisecond
var ret C.int
defer c.Close()
defer close(c.errch)
errch := make(chan struct{}, 5)
go func() {
buf := make([]byte, 16*1024)
readloop:
for {
select {
case <-c.exitch:
break readloop
case <-errch:
break readloop
default:
}
n, err := c.conn.Read(buf)
if err != nil {
c.lock.Lock()
c.err = err
c.lock.Unlock()
close(errch)
break
}
//d1 := C.CBytes(buf[:n])
c.lock.Lock()
//ret1 := C.nghttp2_session_mem_recv(c.session,
// (*C.uchar)(d1), C.size_t(n))
ret1 := C.nghttp2_session_mem_recv(c.session,
(*C.uchar)(unsafe.Pointer(&buf[0])), C.size_t(n))
c.lock.Unlock()
//C.free(d1)
if int(ret1) < 0 {
c.lock.Lock()
c.err = fmt.Errorf("sesion recv error: %s",
C.GoString(C.nghttp2_strerror(ret)))
c.lock.Unlock()
//log.Println(c.err)
close(errch)
break
}
}
}()
loop:
for {
select {
case <-c.errch:
break loop
case <-errch:
break loop
case <-c.exitch:
break loop
default:
}
c.lock.Lock()
ret = C.nghttp2_session_send(c.session)
c.lock.Unlock()
if int(ret) < 0 {
c.lock.Lock()
c.err = fmt.Errorf("sesion send error: %s",
C.GoString(C.nghttp2_strerror(ret)))
c.lock.Unlock()
//log.Println(c.err)
close(errch)
break
}
c.lock.Lock()
wantWrite = int(C.nghttp2_session_want_write(c.session))
c.lock.Unlock()
// make delay when no data read/write
if wantWrite == 0 {
time.Sleep(delay)
}
}
}
package nghttp2
/*
#cgo pkg-config: libnghttp2
#include "_nghttp2.h"
*/
import "C"
import (
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
"unsafe"
)
// Conn http2 connection
type Conn struct {
conn net.Conn
session *C.nghttp2_session
streams map[int]*stream
streamCount int
closed bool
isServer bool
handler http.Handler
lock *sync.Mutex
err error
errch chan error
exitch chan struct{}
}
// RoundTrip submit http request and return the response
func (c *Conn) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, errors.New("not implement")
}
// Connect submit connect request
func (c *Conn) Connect(addr string) (net.Conn, error) {
return nil, errors.New("not implement")
}
// Run run the event loop
func (c *Conn) Run() {
defer c.Close()
go c.readloop()
go c.writeloop()
for {
select {
case err := <-c.errch:
c.err = err
return
case <-c.exitch:
return
}
}
}
// Close close the connection
func (c *Conn) Close() error {
if c.closed {
return nil
}
c.closed = true
for _, s := range c.streams {
s.Close()
}
close(c.exitch)
c.conn.Close()
return nil
}
func (c *Conn) errorNotify(err error) {
select {
case c.errch <- err:
default:
}
}
func (c *Conn) readloop() {
type data struct {
buf []byte
err error
}
var ret C.ssize_t
var err error
var d data
datach := make(chan data)
go func() {
d1 := data{}
var n int
var err1 error
for {
buf := make([]byte, 16*1024)
n, err1 = c.conn.Read(buf)
d1.buf = buf[:n]
d1.err = err1
datach <- d1
}
}()
for {
select {
case <-c.exitch:
return
case d = <-datach:
if d.err != nil {
c.errorNotify(d.err)
return
}
c.lock.Lock()
ret = C.nghttp2_session_mem_recv(c.session,
(*C.uchar)(unsafe.Pointer(&d.buf[0])), C.size_t(len(d.buf)))
c.lock.Unlock()
if int(ret) < 0 {
err = fmt.Errorf("http2 recv error: %s", C.GoString(C.nghttp2_strerror(C.int(ret))))
c.errorNotify(err)
return
}
}
}
}
func (c *Conn) writeloop() {
var ret C.int
var err error
var delay = 50 * time.Millisecond
for {
select {
case <-c.exitch:
return
default:
}
c.lock.Lock()
ret = C.nghttp2_session_send(c.session)
c.lock.Unlock()
if int(ret) < 0 {
err = fmt.Errorf("http2 send error: %s", C.GoString(C.nghttp2_strerror(C.int(ret))))
c.errorNotify(err)
return
}
c.lock.Lock()
wantWrite := C.nghttp2_session_want_write(c.session)
c.lock.Unlock()
if int(wantWrite) == 0 {
time.Sleep(delay)
}
}
}

@ -144,3 +144,18 @@ func (bp *bodyProvider) Close() error {
bp.closed = true
return nil
}
func newNV(name, value string) C.nghttp2_nv {
nv := C.nghttp2_nv{}
nameArr := make([]byte, len(name)+1)
valueArr := make([]byte, len(value)+1)
copy(nameArr, []byte(name))
copy(valueArr, []byte(value))
nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0]))
nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0]))
nv.namelen = C.size_t(len(name))
nv.valuelen = C.size_t(len(value))
nv.flags = 0
return nv
}

@ -7,76 +7,103 @@ int on_invalid_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame,
int lib_error_code, void *user_data);
static ssize_t server_send_callback(nghttp2_session *session,
const uint8_t *data, size_t length,
int flags, void *user_data)
int init_nghttp2_callbacks(nghttp2_session_callbacks *callbacks);
int on_error_callback(nghttp2_session *session, const char *msg,
size_t len, void *user_data)
{
printf("on error callback: %s\n", msg);
return 0;
}
int on_invalid_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame,
int lib_error_code, void *user_data)
{
return onServerDataSendCallback(user_data, (void *)data, length);
printf("invalid frame recv: %s\n", nghttp2_strerror(lib_error_code));
return 0;
}
static ssize_t on_data_source_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length, uint32_t *data_flags,
nghttp2_data_source *source, void *user_data)
{
int ret = onDataSourceReadCallback(user_data, stream_id, buf, length);
if (ret == 0)
{
*data_flags = NGHTTP2_DATA_FLAG_EOF;
}
return ret;
}
static int on_server_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
static ssize_t on_send_callback(nghttp2_session *session,
const uint8_t *data, size_t length,
int flags, void *user_data)
{
return onDataSendCallback(user_data, (void *)data, length);
}
static int on_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST)
{
onServerHeadersDoneCallback(user_data, frame->hd.stream_id);
onHeadersDoneCallback(user_data, frame->hd.stream_id);
}
case NGHTTP2_DATA:
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
{
onServerStreamEndCallback(user_data, frame->hd.stream_id);
onStreamEndCallback(user_data, frame->hd.stream_id);
}
break;
}
return 0;
}
static int on_server_stream_close_callback(nghttp2_session *session,
int32_t stream_id,
uint32_t error_code,
void *user_data)
static int on_stream_close_callback(nghttp2_session *session,
int32_t stream_id,
uint32_t error_code,
void *user_data)
{
onServerStreamClose(user_data, stream_id);
onStreamClose(user_data, stream_id);
return 0;
}
static int on_server_header_callback(nghttp2_session *session,
const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value,
size_t valuelen, uint8_t flags,
void *user_data)
static int on_header_callback(nghttp2_session *session,
const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value,
size_t valuelen, uint8_t flags,
void *user_data)
{
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST)
{
onServerHeaderCallback(user_data, frame->hd.stream_id,
(void *)name, namelen, (void *)value, valuelen);
onHeaderCallback(user_data, frame->hd.stream_id,
(void *)name, namelen, (void *)value, valuelen);
}
break;
}
return 0;
}
static int on_server_data_chunk_recv_callback(nghttp2_session *session,
uint8_t flags,
int32_t stream_id,
const uint8_t *data,
size_t len, void *user_data)
static int on_data_chunk_recv_callback(nghttp2_session *session,
uint8_t flags,
int32_t stream_id,
const uint8_t *data,
size_t len, void *user_data)
{
return onServerDataChunkRecv(user_data, stream_id, (void *)data, len);
return onDataChunkRecv(user_data, stream_id, (void *)data, len);
}
static int on_server_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
switch (frame->hd.type)
@ -84,7 +111,7 @@ static int on_server_begin_headers_callback(nghttp2_session *session,
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST)
{
onServerBeginHeaderCallback(user_data, frame->hd.stream_id);
onBeginHeaderCallback(user_data, frame->hd.stream_id);
}
break;
}
@ -96,335 +123,62 @@ nghttp2_session *init_nghttp2_server_session(size_t data)
nghttp2_session_callbacks *callbacks;
nghttp2_session *session;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, server_send_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_server_frame_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, on_server_stream_close_callback);
nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks,
on_invalid_frame_recv_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks, on_server_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
on_server_header_callback);
nghttp2_session_callbacks_set_error_callback(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_server_begin_headers_callback);
init_nghttp2_callbacks(callbacks);
nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data)));
nghttp2_session_callbacks_del(callbacks);
return session;
}
int send_server_connection_header(nghttp2_session *session)
{
nghttp2_settings_entry iv[1] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}};
int rv;
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv,
ARRLEN(iv));
return rv;
}
// send_callback send data to network
static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
return onClientDataSendCallback(user_data, (void *)data, length);
}
static int on_client_header_callback(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
//printf("on_header_callback\n");
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
/* Print response headers for the initiated request. */
//print_header(stderr, name, namelen, value, valuelen);
onClientHeaderCallback(user_data, frame->hd.stream_id,
(void *)name, namelen, (void *)value, valuelen);
break;
}
}
return 0;
}
static int on_client_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
//printf("on_begin_headers_callback\n");
int stream_id = frame->hd.stream_id;
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
//fprintf(stderr, "Response headers for stream ID=%d:\n",
// frame->hd.stream_id);
onClientBeginHeaderCallback(user_data, stream_id);
}
break;
}
return 0;
}
int on_invalid_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame,
int lib_error_code, void *user_data)
{
printf("on_invalid_frame_recv, frame %d, code %d, msg %s\n",
frame->hd.type,
lib_error_code,
nghttp2_strerror(lib_error_code));
return 0;
}
#if 0
static int on_client_frame_send_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
size_t i;
(void)user_data;
//printf("on_frame_send_callback\n");
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
/*
if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id))
{
*/
if (1)
{
const nghttp2_nv *nva = frame->headers.nva;
printf("[INFO] C ----------------------------> S (HEADERS)\n");
for (i = 0; i < frame->headers.nvlen; ++i)
{
fwrite(nva[i].name, 1, nva[i].namelen, stdout);
printf(": ");
fwrite(nva[i].value, 1, nva[i].valuelen, stdout);
printf("\n");
}
}
break;
case NGHTTP2_RST_STREAM:
printf("[INFO] C ----------------------------> S (RST_STREAM)\n");
break;
case NGHTTP2_GOAWAY:
printf("[INFO] C ----------------------------> S (GOAWAY)\n");
break;
}
return 0;
}
#endif
static int on_client_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
//printf("on_frame_recv_callback %d\n", frame->hd.type);
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
//fprintf(stderr, "All headers received\n");
onClientHeadersDoneCallback(user_data, frame->hd.stream_id);
}
break;
case NGHTTP2_RST_STREAM:
//printf("server send rst_stream %d\n", frame->rst_stream.error_code);
break;
case NGHTTP2_GOAWAY:
//printf("server send go away\n");
onClientConnectionCloseCallback(user_data);
break;
case NGHTTP2_PING:
//printf("ping frame received\n");
break;
}
return 0;
}
static int on_client_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data)
nghttp2_session *init_nghttp2_client_session(size_t data)
{
return onClientDataChunkRecv(user_data, stream_id, (void *)data, len);
}
nghttp2_session_callbacks *callbacks;
nghttp2_session *session;
nghttp2_session_callbacks_new(&callbacks);
init_nghttp2_callbacks(callbacks);
nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data)));
static int on_client_stream_close_callback(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
onClientStreamClose(user_data, stream_id);
return 0;
nghttp2_session_callbacks_del(callbacks);
return session;
}
static ssize_t on_client_data_source_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length, uint32_t *data_flags,
nghttp2_data_source *source, void *user_data)
int init_nghttp2_callbacks(nghttp2_session_callbacks *callbacks)
{
int ret = onClientDataSourceReadCallback(user_data, stream_id, buf, length);
if (ret == 0)
{
*data_flags = NGHTTP2_DATA_FLAG_EOF;
}
return ret;
}
static ssize_t on_server_data_source_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length, uint32_t *data_flags,
nghttp2_data_source *source, void *user_data)
{
int ret = onServerDataSourceReadCallback(user_data, stream_id, buf, length);
if (ret == 0)
{
*data_flags = NGHTTP2_DATA_FLAG_EOF;
}
return ret;
}
nghttp2_session_callbacks_set_send_callback(callbacks, on_send_callback);
int on_error_callback(nghttp2_session *session,
const char *msg, size_t len, void *user_data)
{
//printf("errmsg %*s\n", msg, len);
printf("error: %s\n", msg);
return 0;
}
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_frame_recv_callback);
void init_client_callbacks(nghttp2_session_callbacks *callbacks)
{
nghttp2_session_callbacks_set_send_callback(callbacks, client_send_callback);
//nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, on_stream_close_callback);
//nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks,
on_invalid_frame_recv_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_client_frame_recv_callback);
//nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_client_frame_send_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks, on_client_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, on_client_stream_close_callback);
callbacks, on_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
on_client_header_callback);
on_header_callback);
nghttp2_session_callbacks_set_error_callback(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_client_begin_headers_callback);
callbacks, on_begin_headers_callback);
}
nghttp2_session *init_nghttp2_client_session(size_t data)
int send_connection_header(nghttp2_session *session)
{
int ret;
nghttp2_session *session;
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
init_client_callbacks(callbacks);
ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data)));
if (session == NULL)
{
printf("c init session failed: %s\n", nghttp2_strerror(ret));
}
return session;
}
#if 0
int send_client_connection_header(nghttp2_session *session)
{
//nghttp2_settings_entry iv[1] = {
// {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}};
nghttp2_settings_entry iv[1] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}};
int rv;
/* client 24 bytes magic string will be sent by nghttp2 library */
/*
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv,
ARRLEN(iv));
*/
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, NULL, 0);
/*
if (rv != 0)
{
errx(1, "Could not submit SETTINGS: %s", nghttp2_strerror(rv));
}
*/
return rv;
}
#endif
#if 0
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen,
nghttp2_data_provider *dp)
{
int32_t stream_id;
/*
nghttp2_nv hdrs[] = {
MAKE_NV2(":method", "GET"),
MAKE_NV(":scheme", &uri[u->field_data[UF_SCHEMA].off],
u->field_data[UF_SCHEMA].len),
MAKE_NV(":authority", stream_data->authority, stream_data->authoritylen),
MAKE_NV(":path", stream_data->path, stream_data->pathlen)};
fprintf(stderr, "Request headers:\n");
print_headers(stderr, hdrs, ARRLEN(hdrs));
*/
/*
int i;
for (i = 0; i < hdrlen; i++)
{
printf("header %s: %s\n", hdrs[i].name, hdrs[i].value);
}
*/
stream_id = nghttp2_submit_request(session, NULL, hdrs,
hdrlen, dp, NULL);
/*
if (stream_id < 0)
{
errx(1, "Could not submit HTTP request: %s", nghttp2_strerror(stream_id));
}
*/
return stream_id;
}
#endif
int data_provider_set_callback(size_t cdp, size_t data, int type)
{
//nghttp2_data_provider *dp = malloc(sizeof(nghttp2_data_provider));
nghttp2_data_provider *dp = (nghttp2_data_provider *)cdp;
dp->source.ptr = (void *)((int *)data);
if (type == 0)
{
dp->read_callback = on_server_data_source_read_callback;
}
else
{
dp->read_callback = on_client_data_source_read_callback;
}
return 0;
}
int _nghttp2_submit_response(nghttp2_session *sess, int streamid,
size_t nv, size_t nvlen, nghttp2_data_provider *dp)
{
return nghttp2_submit_response(sess, streamid, (nghttp2_nv *)nv, nvlen, dp);
}
int _nghttp2_submit_request(nghttp2_session *session, const nghttp2_priority_spec *pri_spec,
size_t nva, size_t nvlen,
const nghttp2_data_provider *data_prd, void *stream_user_data)
{
return nghttp2_submit_request(session, pri_spec, (nghttp2_nv *)nva, nvlen, data_prd, stream_user_data);
int data_provider_set_callback(size_t dp, size_t data, int t){
nghttp2_data_provider *cdp = (nghttp2_data_provider*)dp;
cdp->source.ptr = (void *)data;
cdp->read_callback=on_data_source_read_callback;
}

@ -1,207 +1,48 @@
package nghttp2
/*
#include "_nghttp2.h"
*/
import "C"
import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"unsafe"
)
// ClientStream http2 client stream
type ClientStream struct {
streamID int
conn *ClientConn
cdp C.nghttp2_data_provider
dp *dataProvider
res *http.Response
resch chan *http.Response
errch chan error
closed bool
lock *sync.Mutex
}
// Read read stream data
func (s *ClientStream) Read(buf []byte) (n int, err error) {
if s.closed || s.res == nil || s.res.Body == nil {
return 0, io.EOF
}
return s.res.Body.Read(buf)
}
// Write write data to stream
func (s *ClientStream) Write(buf []byte) (n int, err error) {
if s.closed {
return 0, io.EOF
}
if s.dp != nil {
return s.dp.Write(buf)
}
return 0, fmt.Errorf("empty data provider")
}
// Close close the stream
func (s *ClientStream) Close() error {
//s.lock.Lock()
//defer s.lock.Unlock()
if s.closed {
return nil
}
s.closed = true
err := io.EOF
//log.Printf("close stream %d", int(s.streamID))
select {
case s.errch <- err:
default:
}
if s.res != nil && s.res.Body != nil {
s.res.Body.Close()
}
//log.Println("close stream done")
if s.dp != nil {
s.dp.Close()
}
if s.res != nil && s.res.Request != nil &&
s.res.Request.Method == "CONNECT" {
//log.Printf("send rst stream for %d", s.streamID)
s.conn.lock.Lock()
C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0)
s.conn.lock.Unlock()
}
return nil
}
// ServerStream server stream
type ServerStream struct {
streamID int
// headers receive done
headersDone bool
// is stream_end flag received
streamEnd bool
// request
req *http.Request
// response header
header http.Header
// response statusCode
statusCode int
// response has send
responseSend bool
// server connection
conn *ServerConn
// data provider
dp *dataProvider
cdp C.nghttp2_data_provider
closed bool
//buf *bytes.Buffer
}
// Write write data to stream,
// implements http.ResponseWriter
func (s *ServerStream) Write(buf []byte) (int, error) {
if s.closed {
return 0, io.EOF
}
if !s.responseSend {
s.WriteHeader(http.StatusOK)
}
return s.dp.Write(buf)
}
// WriteHeader set response code and send reponse,
// implements http.ResponseWriter
func (s *ServerStream) WriteHeader(code int) {
if s.closed {
return
}
if s.responseSend {
return
}
s.responseSend = true
s.statusCode = code
var nv = []C.nghttp2_nv{}
nv = append(nv, newNV(":status", fmt.Sprintf("%d", code)))
for k, v := range s.header {
//log.Println(k, v[0])
_k := strings.ToLower(k)
if _k == "host" || _k == "connection" || _k == "proxy-connection" ||
_k == "transfer-encoding" {
continue
}
nv = append(nv, newNV(k, v[0]))
}
var dp *dataProvider
dp = newDataProvider(unsafe.Pointer(&s.cdp), s.conn.lock, 0)
dp.streamID = s.streamID
dp.session = s.conn.session
s.dp = dp
s.conn.lock.Lock()
ret := C._nghttp2_submit_response(
s.conn.session, C.int(s.streamID),
C.size_t(uintptr(unsafe.Pointer(&nv[0]))),
C.size_t(len(nv)), &s.cdp)
s.conn.lock.Unlock()
if int(ret) < 0 {
panic(fmt.Sprintf("sumit response error %s",
C.GoString(C.nghttp2_strerror(ret))))
}
//log.Printf("stream %d send response", s.streamID)
}
// Header return the http.Header,
// implements http.ResponseWriter
func (s *ServerStream) Header() http.Header {
if s.header == nil {
s.header = http.Header{}
}
return s.header
}
// Close close the stream
func (s *ServerStream) Close() error {
if s.closed {
return nil
}
s.closed = true
if s.req.Body != nil {
s.req.Body.Close()
}
if s.dp != nil {
s.dp.Close()
//s.dp = nil
}
if s.req.Method == "CONNECT" {
s.conn.lock.Lock()
s.conn.lock.Unlock()
if _, ok := s.conn.streams[s.streamID]; ok {
//log.Printf("send rst stream %d", s.streamID)
C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0)
delete(s.conn.streams, s.streamID)
}
}
//log.Printf("stream %d closed", s.streamID)
return nil
}
package nghttp2
import (
"errors"
"net"
"net/http"
"time"
)
type stream struct {
streamID int
conn *Conn
dp *dataProvider
bp *bodyProvider
request *http.Request
response *http.Response
resch chan *http.Response
}
var _ net.Conn = &stream{}
func (s *stream) Read(buf []byte) (int, error) {
return 0, errors.New("not implement")
}
func (s *stream) Write(buf []byte) (int, error) {
if s.conn.isServer {
return 0, errors.New("not implement")
}
return 0, errors.New("not implement")
}
func (s *stream) Close() error {
return nil
}
func (s *stream) LocalAddr() net.Addr {
return nil
}
func (s *stream) RemoteAddr() net.Addr {
return nil
}
func (s *stream) SetDeadline(t time.Time) error {
return errors.New("not implement")
}
func (s *stream) SetReadDeadline(t time.Time) error {
return errors.New("not implement")
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return errors.New("not implement")
}

Loading…
Cancel
Save