restructure
parent
3396a7519d
commit
8d4069b1c2
@ -0,0 +1,271 @@
|
|||||||
|
package nghttp2
|
||||||
|
|
||||||
|
/*
|
||||||
|
#include "_nghttp2.h"
|
||||||
|
*/
|
||||||
|
import "C"
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OnServerDataRecvCallback callback function for libnghttp2 library
|
||||||
|
// want receive data from network,
|
||||||
|
//export OnServerDataRecvCallback
|
||||||
|
func OnServerDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer,
|
||||||
|
length C.size_t) C.ssize_t {
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
buf := make([]byte, int(length))
|
||||||
|
n, err := conn.conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
cbuf := C.CBytes(buf[:n])
|
||||||
|
defer C.free(cbuf)
|
||||||
|
C.memcpy(data, cbuf, C.size_t(n))
|
||||||
|
return C.ssize_t(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerDataSendCallback callback function for libnghttp2 library
|
||||||
|
// want send data to network
|
||||||
|
//export OnServerDataSendCallback
|
||||||
|
func OnServerDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer,
|
||||||
|
length C.size_t) C.ssize_t {
|
||||||
|
//log.Println("server data send")
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
buf := C.GoBytes(data, C.int(length))
|
||||||
|
n, err := conn.conn.Write(buf)
|
||||||
|
if err != nil {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
//log.Println("send ", n, " bytes to network ", buf)
|
||||||
|
return C.ssize_t(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerDataChunkRecv callback function for libnghttp2 library's data chunk recv
|
||||||
|
//export OnServerDataChunkRecv
|
||||||
|
func OnServerDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
|
||||||
|
data unsafe.Pointer, length C.size_t) C.int {
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
s := conn.streams[int(streamID)]
|
||||||
|
bp := s.req.Body.(*bodyProvider)
|
||||||
|
buf := C.GoBytes(data, C.int(length))
|
||||||
|
bp.Write(buf)
|
||||||
|
return C.int(length)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerBeginHeaderCallback callback function for begin begin header recv
|
||||||
|
//export OnServerBeginHeaderCallback
|
||||||
|
func OnServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
s := &ServerStream{
|
||||||
|
streamID: int(streamID),
|
||||||
|
conn: conn,
|
||||||
|
req: &http.Request{
|
||||||
|
URL: &url.URL{},
|
||||||
|
Header: http.Header{},
|
||||||
|
Proto: "HTTP/2.0",
|
||||||
|
ProtoMajor: 2,
|
||||||
|
ProtoMinor: 0,
|
||||||
|
},
|
||||||
|
//buf: new(bytes.Buffer),
|
||||||
|
}
|
||||||
|
conn.streams[int(streamID)] = s
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerHeaderCallback callback function for each header recv
|
||||||
|
//export OnServerHeaderCallback
|
||||||
|
func OnServerHeaderCallback(ptr unsafe.Pointer, streamID C.int,
|
||||||
|
name unsafe.Pointer, namelen C.int,
|
||||||
|
value unsafe.Pointer, valuelen C.int) C.int {
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
s := conn.streams[int(streamID)]
|
||||||
|
hdrname := C.GoStringN((*C.char)(name), namelen)
|
||||||
|
hdrvalue := C.GoStringN((*C.char)(value), valuelen)
|
||||||
|
hdrname = strings.ToLower(hdrname)
|
||||||
|
switch hdrname {
|
||||||
|
case ":method":
|
||||||
|
s.req.Method = hdrvalue
|
||||||
|
case ":scheme":
|
||||||
|
s.req.URL.Scheme = hdrvalue
|
||||||
|
case ":path":
|
||||||
|
s.req.RequestURI = hdrvalue
|
||||||
|
u, _ := url.ParseRequestURI(s.req.RequestURI)
|
||||||
|
scheme := s.req.URL.Scheme
|
||||||
|
*(s.req.URL) = *u
|
||||||
|
if scheme != "" {
|
||||||
|
s.req.URL.Scheme = scheme
|
||||||
|
}
|
||||||
|
case ":authority":
|
||||||
|
s.req.Host = hdrvalue
|
||||||
|
default:
|
||||||
|
s.req.Header.Add(hdrname, hdrvalue)
|
||||||
|
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerStreamEndCallback callback function for the stream when END_STREAM flag set
|
||||||
|
//export OnServerStreamEndCallback
|
||||||
|
func OnServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
s := conn.streams[int(streamID)]
|
||||||
|
s.streamEnd = true
|
||||||
|
bp := s.req.Body.(*bodyProvider)
|
||||||
|
if s.req.Method != "CONNECT" {
|
||||||
|
bp.closed = true
|
||||||
|
log.Println("stream end flag set, begin to serve")
|
||||||
|
go conn.serve(s)
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerHeadersDoneCallback callback function for the stream when all headers received
|
||||||
|
//export OnServerHeadersDoneCallback
|
||||||
|
func OnServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
s := conn.streams[int(streamID)]
|
||||||
|
s.headersDone = true
|
||||||
|
bp := &bodyProvider{
|
||||||
|
buf: new(bytes.Buffer),
|
||||||
|
lock: new(sync.Mutex),
|
||||||
|
}
|
||||||
|
s.req.Body = bp
|
||||||
|
if s.req.Method == "CONNECT" {
|
||||||
|
go conn.serve(s)
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServerStreamClose callback function for the stream when closed
|
||||||
|
//export OnServerStreamClose
|
||||||
|
func OnServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
conn := (*ServerConn)(ptr)
|
||||||
|
s := conn.streams[int(streamID)]
|
||||||
|
conn.lock.Lock()
|
||||||
|
delete(conn.streams, int(streamID))
|
||||||
|
conn.lock.Unlock()
|
||||||
|
s.Close()
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnDataSourceReadCallback callback function for libnghttp2 library
|
||||||
|
// want read data from data provider source,
|
||||||
|
// return NGHTTP2_ERR_DEFERED will cause data frame defered,
|
||||||
|
// application later call nghttp2_session_resume_data will re-quene the data frame
|
||||||
|
//
|
||||||
|
//export OnDataSourceReadCallback
|
||||||
|
func OnDataSourceReadCallback(ptr unsafe.Pointer,
|
||||||
|
buf unsafe.Pointer, length C.size_t) C.ssize_t {
|
||||||
|
//log.Println("data source read")
|
||||||
|
dp := (*dataProvider)(ptr)
|
||||||
|
gobuf := make([]byte, int(length))
|
||||||
|
n, err := dp.Read(gobuf)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if err == errAgain {
|
||||||
|
// NGHTTP2_ERR_DEFERED
|
||||||
|
return -508
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
cbuf := C.CBytes(gobuf)
|
||||||
|
defer C.free(cbuf)
|
||||||
|
C.memcpy(buf, cbuf, C.size_t(n))
|
||||||
|
return C.ssize_t(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientDataChunkRecv callback function for libnghttp2 library data chunk received,
|
||||||
|
//export OnClientDataChunkRecv
|
||||||
|
func OnClientDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
|
||||||
|
buf unsafe.Pointer, length C.size_t) C.int {
|
||||||
|
//log.Println("on data recv")
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
gobuf := C.GoBytes(buf, C.int(length))
|
||||||
|
conn.onDataRecv(gobuf, int(streamID))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientDataRecvCallback callback function for libnghttp2 library want read data from network,
|
||||||
|
//export OnClientDataRecvCallback
|
||||||
|
func OnClientDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
|
||||||
|
//log.Println("data read req", int(size))
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
buf := make([]byte, int(size))
|
||||||
|
//log.Println(conn.conn.RemoteAddr())
|
||||||
|
n, err := conn.conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
//log.Println(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
cbuf := C.CBytes(buf)
|
||||||
|
//log.Println("read from network ", n, buf[:n])
|
||||||
|
C.memcpy(data, cbuf, C.size_t(n))
|
||||||
|
return C.ssize_t(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientDataSendCallback callback function for libnghttp2 library want send data to network,
|
||||||
|
//export OnClientDataSendCallback
|
||||||
|
func OnClientDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
|
||||||
|
//log.Println("data write req ", int(size))
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
buf := C.GoBytes(data, C.int(size))
|
||||||
|
//log.Println(conn.conn.RemoteAddr())
|
||||||
|
n, err := conn.conn.Write(buf)
|
||||||
|
if err != nil {
|
||||||
|
//log.Println(err)
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
//log.Println("write data to network ", n)
|
||||||
|
return C.ssize_t(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientBeginHeaderCallback callback function for begin header receive,
|
||||||
|
//export OnClientBeginHeaderCallback
|
||||||
|
func OnClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
//log.Println("begin header")
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
conn.onBeginHeader(int(streamID))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientHeaderCallback callback function for each header received,
|
||||||
|
//export OnClientHeaderCallback
|
||||||
|
func OnClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
|
||||||
|
name unsafe.Pointer, namelen C.int,
|
||||||
|
value unsafe.Pointer, valuelen C.int) C.int {
|
||||||
|
//log.Println("header")
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
goname := C.GoBytes(name, namelen)
|
||||||
|
govalue := C.GoBytes(value, valuelen)
|
||||||
|
conn.onHeader(int(streamID), string(goname), string(govalue))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientHeadersDoneCallback callback function for the stream when all headers received,
|
||||||
|
//export OnClientHeadersDoneCallback
|
||||||
|
func OnClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
//log.Println("frame recv")
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
conn.onHeadersDone(int(streamID))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnClientStreamClose callback function for the stream when closed,
|
||||||
|
//export OnClientStreamClose
|
||||||
|
func OnClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
|
||||||
|
//log.Println("stream close")
|
||||||
|
conn := (*ClientConn)(ptr)
|
||||||
|
conn.onStreamClose(int(streamID))
|
||||||
|
return 0
|
||||||
|
}
|
@ -1,509 +0,0 @@
|
|||||||
package nghttp2
|
|
||||||
|
|
||||||
/*
|
|
||||||
#cgo pkg-config: libnghttp2
|
|
||||||
#include "_nghttp2.h"
|
|
||||||
*/
|
|
||||||
import "C"
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
"unsafe"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
errAgain = errors.New("again")
|
|
||||||
)
|
|
||||||
|
|
||||||
// ClientConn http2 connection
|
|
||||||
type ClientConn struct {
|
|
||||||
session *C.nghttp2_session
|
|
||||||
conn net.Conn
|
|
||||||
streams map[int]*ClientStream
|
|
||||||
lock *sync.Mutex
|
|
||||||
errch chan struct{}
|
|
||||||
exitch chan struct{}
|
|
||||||
err error
|
|
||||||
isServer bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientStream http2 stream
|
|
||||||
type ClientStream struct {
|
|
||||||
streamID int
|
|
||||||
cdp *C.nghttp2_data_provider
|
|
||||||
dp *dataProvider
|
|
||||||
// application read data from stream
|
|
||||||
r *io.PipeReader
|
|
||||||
// recv stream data from session
|
|
||||||
w *io.PipeWriter
|
|
||||||
res *http.Response
|
|
||||||
resch chan *http.Response
|
|
||||||
errch chan error
|
|
||||||
closed bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type dataProvider struct {
|
|
||||||
buf *bytes.Buffer
|
|
||||||
closed bool
|
|
||||||
lock *sync.Mutex
|
|
||||||
session *C.nghttp2_session
|
|
||||||
streamID int
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewClientConn create http2 client
|
|
||||||
func NewClientConn(c net.Conn) (*ClientConn, error) {
|
|
||||||
conn := &ClientConn{
|
|
||||||
conn: c, streams: make(map[int]*ClientStream),
|
|
||||||
lock: new(sync.Mutex),
|
|
||||||
errch: make(chan struct{}),
|
|
||||||
exitch: make(chan struct{}),
|
|
||||||
}
|
|
||||||
conn.session = C.init_client_session(
|
|
||||||
C.size_t(int(uintptr(unsafe.Pointer(conn)))))
|
|
||||||
if conn.session == nil {
|
|
||||||
return nil, fmt.Errorf("init session failed")
|
|
||||||
}
|
|
||||||
ret := C.send_client_connection_header(conn.session)
|
|
||||||
if int(ret) < 0 {
|
|
||||||
log.Printf("submit settings error: %s",
|
|
||||||
C.GoString(C.nghttp2_strerror(ret)))
|
|
||||||
}
|
|
||||||
go conn.run()
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConn) onDataRecv(buf []byte, streamID int) {
|
|
||||||
stream := c.streams[streamID]
|
|
||||||
stream.onDataRecv(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConn) onBeginHeader(streamID int) {
|
|
||||||
stream := c.streams[streamID]
|
|
||||||
stream.onBeginHeader()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConn) onHeader(streamID int, name, value string) {
|
|
||||||
stream := c.streams[streamID]
|
|
||||||
stream.onHeader(name, value)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConn) onFrameRecv(streamID int) {
|
|
||||||
stream := c.streams[streamID]
|
|
||||||
stream.onFrameRecv()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConn) onStreamClose(streamID int) {
|
|
||||||
stream, ok := c.streams[streamID]
|
|
||||||
if ok {
|
|
||||||
stream.Close()
|
|
||||||
c.lock.Lock()
|
|
||||||
delete(c.streams, streamID)
|
|
||||||
c.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close close the http2 connection
|
|
||||||
func (c *ClientConn) Close() error {
|
|
||||||
for _, s := range c.streams {
|
|
||||||
s.Close()
|
|
||||||
}
|
|
||||||
C.nghttp2_session_terminate_session(c.session, 0)
|
|
||||||
C.nghttp2_session_del(c.session)
|
|
||||||
close(c.exitch)
|
|
||||||
c.conn.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientConn) run() {
|
|
||||||
var wantRead int
|
|
||||||
var wantWrite int
|
|
||||||
var delay = 50
|
|
||||||
var ret C.int
|
|
||||||
|
|
||||||
defer close(c.errch)
|
|
||||||
|
|
||||||
datach := make(chan []byte)
|
|
||||||
errch := make(chan error)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
buf := make([]byte, 16*1024)
|
|
||||||
readloop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.exitch:
|
|
||||||
break readloop
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := c.conn.Read(buf)
|
|
||||||
if err != nil {
|
|
||||||
errch <- err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
datach <- buf[:n]
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.errch:
|
|
||||||
break loop
|
|
||||||
case err := <-errch:
|
|
||||||
c.err = err
|
|
||||||
break loop
|
|
||||||
case <-c.exitch:
|
|
||||||
break loop
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
wantWrite = int(C.nghttp2_session_want_write(c.session))
|
|
||||||
if wantWrite != 0 {
|
|
||||||
ret = C.nghttp2_session_send(c.session)
|
|
||||||
if int(ret) < 0 {
|
|
||||||
c.err = fmt.Errorf("sesion send error: %s",
|
|
||||||
C.GoString(C.nghttp2_strerror(ret)))
|
|
||||||
log.Println(c.err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wantRead = int(C.nghttp2_session_want_read(c.session))
|
|
||||||
select {
|
|
||||||
case d := <-datach:
|
|
||||||
d1 := C.CBytes(d)
|
|
||||||
ret1 := C.nghttp2_session_mem_recv(c.session,
|
|
||||||
(*C.uchar)(d1), C.size_t(int(len(d))))
|
|
||||||
C.free(d1)
|
|
||||||
if int(ret1) < 0 {
|
|
||||||
c.err = fmt.Errorf("sesion recv error: %s",
|
|
||||||
C.GoString(C.nghttp2_strerror(ret)))
|
|
||||||
log.Println(c.err)
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// make delay when no data read/write
|
|
||||||
if wantRead == 0 && wantWrite == 0 {
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Duration(delay) * time.Millisecond):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateRequest submit a request and return a http.Response, client only
|
|
||||||
func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
|
|
||||||
if c.err != nil {
|
|
||||||
return nil, c.err
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.isServer {
|
|
||||||
return nil, fmt.Errorf("only client can create new request")
|
|
||||||
}
|
|
||||||
|
|
||||||
nvIndex := 0
|
|
||||||
nvMax := 25
|
|
||||||
nva := C.new_nv_array(C.size_t(nvMax))
|
|
||||||
setNvArray(nva, nvIndex, ":method", req.Method, 0)
|
|
||||||
nvIndex++
|
|
||||||
setNvArray(nva, nvIndex, ":scheme", "https", 0)
|
|
||||||
nvIndex++
|
|
||||||
setNvArray(nva, nvIndex, ":authority", req.Host, 0)
|
|
||||||
nvIndex++
|
|
||||||
|
|
||||||
p := req.URL.Path
|
|
||||||
q := req.URL.Query().Encode()
|
|
||||||
if q != "" {
|
|
||||||
p = p + "?" + q
|
|
||||||
}
|
|
||||||
setNvArray(nva, nvIndex, ":path", p, 0)
|
|
||||||
nvIndex++
|
|
||||||
for k, v := range req.Header {
|
|
||||||
if strings.ToLower(k) == "host" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
//log.Printf("header %s: %s", k, v)
|
|
||||||
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
|
|
||||||
nvIndex++
|
|
||||||
}
|
|
||||||
var dp *dataProvider
|
|
||||||
var cdp *C.nghttp2_data_provider
|
|
||||||
if req.Body != nil {
|
|
||||||
dp, cdp = newDataProvider()
|
|
||||||
go func() {
|
|
||||||
io.Copy(dp, req.Body)
|
|
||||||
dp.Close()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp)
|
|
||||||
if dp != nil {
|
|
||||||
dp.streamID = int(streamID)
|
|
||||||
dp.session = c.session
|
|
||||||
}
|
|
||||||
C.delete_nv_array(nva)
|
|
||||||
if int(streamID) < 0 {
|
|
||||||
return nil, fmt.Errorf("submit request error: %s",
|
|
||||||
C.GoString(C.nghttp2_strerror(streamID)))
|
|
||||||
}
|
|
||||||
//log.Println("stream id ", int(streamID))
|
|
||||||
r, w := io.Pipe()
|
|
||||||
s := &ClientStream{
|
|
||||||
streamID: int(streamID),
|
|
||||||
dp: dp,
|
|
||||||
cdp: cdp,
|
|
||||||
r: r,
|
|
||||||
w: w,
|
|
||||||
resch: make(chan *http.Response),
|
|
||||||
errch: make(chan error),
|
|
||||||
}
|
|
||||||
c.lock.Lock()
|
|
||||||
c.streams[int(streamID)] = s
|
|
||||||
c.lock.Unlock()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-s.errch:
|
|
||||||
return nil, err
|
|
||||||
case res := <-s.resch:
|
|
||||||
return res, nil
|
|
||||||
case <-c.errch:
|
|
||||||
return nil, fmt.Errorf("connection error")
|
|
||||||
}
|
|
||||||
//return nil, fmt.Errorf("unknown error")
|
|
||||||
}
|
|
||||||
|
|
||||||
func setNvArray(a *C.struct_nv_array, index int,
|
|
||||||
name, value string, flags int) {
|
|
||||||
cname := C.CString(name)
|
|
||||||
cvalue := C.CString(value)
|
|
||||||
cnamelen := C.size_t(len(name))
|
|
||||||
cvaluelen := C.size_t(len(value))
|
|
||||||
cflags := C.int(flags)
|
|
||||||
//defer C.free(unsafe.Pointer(cname))
|
|
||||||
//defer C.free(unsafe.Pointer(cvalue))
|
|
||||||
C.nv_array_set(a, C.int(index), cname,
|
|
||||||
cvalue, cnamelen, cvaluelen, cflags)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read read from data provider
|
|
||||||
// this emulate a unblocking reading
|
|
||||||
// if data is not avaliable return errAgain
|
|
||||||
func (dp *dataProvider) Read(buf []byte) (n int, err error) {
|
|
||||||
dp.lock.Lock()
|
|
||||||
defer dp.lock.Unlock()
|
|
||||||
n, err = dp.buf.Read(buf)
|
|
||||||
|
|
||||||
if err != nil && !dp.closed {
|
|
||||||
return 0, errAgain
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write provider data for data provider
|
|
||||||
func (dp *dataProvider) Write(buf []byte) (n int, err error) {
|
|
||||||
dp.lock.Lock()
|
|
||||||
defer dp.lock.Unlock()
|
|
||||||
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
|
|
||||||
return dp.buf.Write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close
|
|
||||||
func (dp *dataProvider) Close() error {
|
|
||||||
dp.lock.Lock()
|
|
||||||
defer dp.lock.Unlock()
|
|
||||||
dp.closed = true
|
|
||||||
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func newDataProvider() (
|
|
||||||
*dataProvider, *C.nghttp2_data_provider) {
|
|
||||||
dp := &dataProvider{
|
|
||||||
buf: new(bytes.Buffer),
|
|
||||||
lock: new(sync.Mutex),
|
|
||||||
}
|
|
||||||
cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp))))
|
|
||||||
return dp, cdp
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ClientStream) Read(buf []byte) (n int, err error) {
|
|
||||||
return s.r.Read(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ClientStream) Write(buf []byte) (n int, err error) {
|
|
||||||
return s.dp.Write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ClientStream) onDataRecv(buf []byte) {
|
|
||||||
s.w.Write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ClientStream) onBeginHeader() {
|
|
||||||
s.res = &http.Response{
|
|
||||||
Header: make(http.Header),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ClientStream) onHeader(name, value string) {
|
|
||||||
if name == ":status" {
|
|
||||||
statusCode, _ := strconv.Atoi(value)
|
|
||||||
s.res.StatusCode = statusCode
|
|
||||||
s.res.Status = http.StatusText(statusCode)
|
|
||||||
s.res.Proto = "HTTP/2.0"
|
|
||||||
s.res.ProtoMajor = 2
|
|
||||||
s.res.ProtoMinor = 0
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.res.Header.Add(name, value)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ClientStream) onFrameRecv() {
|
|
||||||
s.res.Body = s
|
|
||||||
s.resch <- s.res
|
|
||||||
//log.Println("stream frame recv")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close close the stream
|
|
||||||
func (s *ClientStream) Close() error {
|
|
||||||
if s.closed {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err := io.EOF
|
|
||||||
//log.Println("close stream")
|
|
||||||
select {
|
|
||||||
case s.errch <- err:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
//log.Println("close stream resch")
|
|
||||||
close(s.resch)
|
|
||||||
//log.Println("close stream errch")
|
|
||||||
close(s.errch)
|
|
||||||
//log.Println("close pipe w")
|
|
||||||
s.w.CloseWithError(err)
|
|
||||||
//log.Println("close stream done")
|
|
||||||
if s.dp != nil {
|
|
||||||
s.dp.Close()
|
|
||||||
}
|
|
||||||
s.closed = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DataSourceRead callback function for data read from data provider source
|
|
||||||
//export DataSourceRead
|
|
||||||
func DataSourceRead(ptr unsafe.Pointer,
|
|
||||||
buf unsafe.Pointer, length C.size_t) C.ssize_t {
|
|
||||||
//log.Println("data source read")
|
|
||||||
dp := (*dataProvider)(ptr)
|
|
||||||
gobuf := make([]byte, int(length))
|
|
||||||
n, err := dp.Read(gobuf)
|
|
||||||
if err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
if err == errAgain {
|
|
||||||
// NGHTTP2_ERR_DEFERED
|
|
||||||
return -508
|
|
||||||
}
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
cbuf := C.CBytes(gobuf)
|
|
||||||
defer C.free(cbuf)
|
|
||||||
C.memcpy(buf, cbuf, C.size_t(n))
|
|
||||||
return C.ssize_t(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnClientDataRecv callback function for data frame received
|
|
||||||
//export OnClientDataRecv
|
|
||||||
func OnClientDataRecv(ptr unsafe.Pointer, streamID C.int,
|
|
||||||
buf unsafe.Pointer, length C.size_t) C.int {
|
|
||||||
//log.Println("on data recv")
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
gobuf := C.GoBytes(buf, C.int(length))
|
|
||||||
conn.onDataRecv(gobuf, int(streamID))
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientDataRecv callback function for session wants read data from peer
|
|
||||||
//export ClientDataRecv
|
|
||||||
func ClientDataRecv(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
|
|
||||||
//log.Println("data read req", int(size))
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
buf := make([]byte, int(size))
|
|
||||||
//log.Println(conn.conn.RemoteAddr())
|
|
||||||
n, err := conn.conn.Read(buf)
|
|
||||||
if err != nil {
|
|
||||||
//log.Println(err)
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
cbuf := C.CBytes(buf)
|
|
||||||
//log.Println("read from network ", n, buf[:n])
|
|
||||||
C.memcpy(data, cbuf, C.size_t(n))
|
|
||||||
return C.ssize_t(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientDataSend callback function for session wants send data to peer
|
|
||||||
//export ClientDataSend
|
|
||||||
func ClientDataSend(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
|
|
||||||
//log.Println("data write req ", int(size))
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
buf := C.GoBytes(data, C.int(size))
|
|
||||||
//log.Println(conn.conn.RemoteAddr())
|
|
||||||
n, err := conn.conn.Write(buf)
|
|
||||||
if err != nil {
|
|
||||||
//log.Println(err)
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
//log.Println("write data to network ", n)
|
|
||||||
return C.ssize_t(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnClientBeginHeaderCallback callback function for response
|
|
||||||
//export OnClientBeginHeaderCallback
|
|
||||||
func OnClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
//log.Println("begin header")
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
conn.onBeginHeader(int(streamID))
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnClientHeaderCallback callback function for header
|
|
||||||
//export OnClientHeaderCallback
|
|
||||||
func OnClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
|
|
||||||
name unsafe.Pointer, namelen C.int,
|
|
||||||
value unsafe.Pointer, valuelen C.int) C.int {
|
|
||||||
//log.Println("header")
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
goname := C.GoBytes(name, namelen)
|
|
||||||
govalue := C.GoBytes(value, valuelen)
|
|
||||||
conn.onHeader(int(streamID), string(goname), string(govalue))
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnClientHeadersDoneCallback callback function for begion to recv data
|
|
||||||
//export OnClientHeadersDoneCallback
|
|
||||||
func OnClientHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
//log.Println("frame recv")
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
conn.onFrameRecv(int(streamID))
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnClientStreamClose callback function for stream close
|
|
||||||
//export OnClientStreamClose
|
|
||||||
func OnClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
//log.Println("stream close")
|
|
||||||
conn := (*ClientConn)(ptr)
|
|
||||||
conn.onStreamClose(int(streamID))
|
|
||||||
return 0
|
|
||||||
}
|
|
@ -0,0 +1,417 @@
|
|||||||
|
package nghttp2
|
||||||
|
|
||||||
|
/*
|
||||||
|
#cgo pkg-config: libnghttp2
|
||||||
|
#include "_nghttp2.h"
|
||||||
|
*/
|
||||||
|
import "C"
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errAgain = errors.New("again")
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClientConn http2 client connection
|
||||||
|
type ClientConn struct {
|
||||||
|
session *C.nghttp2_session
|
||||||
|
conn net.Conn
|
||||||
|
streams map[int]*ClientStream
|
||||||
|
lock *sync.Mutex
|
||||||
|
errch chan struct{}
|
||||||
|
exitch chan struct{}
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientConn create http2 client
|
||||||
|
func NewClientConn(c net.Conn) (*ClientConn, error) {
|
||||||
|
conn := &ClientConn{
|
||||||
|
conn: c, streams: make(map[int]*ClientStream),
|
||||||
|
lock: new(sync.Mutex),
|
||||||
|
errch: make(chan struct{}),
|
||||||
|
exitch: make(chan struct{}),
|
||||||
|
}
|
||||||
|
conn.session = C.init_client_session(
|
||||||
|
C.size_t(int(uintptr(unsafe.Pointer(conn)))))
|
||||||
|
if conn.session == nil {
|
||||||
|
return nil, fmt.Errorf("init session failed")
|
||||||
|
}
|
||||||
|
ret := C.send_client_connection_header(conn.session)
|
||||||
|
if int(ret) < 0 {
|
||||||
|
log.Printf("submit settings error: %s",
|
||||||
|
C.GoString(C.nghttp2_strerror(ret)))
|
||||||
|
}
|
||||||
|
go conn.run()
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientConn) onDataRecv(buf []byte, streamID int) {
|
||||||
|
s := c.streams[streamID]
|
||||||
|
if s.res.Body == nil {
|
||||||
|
log.Println("empty body")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if bp, ok := s.res.Body.(*bodyProvider); ok {
|
||||||
|
bp.Write(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientConn) onBeginHeader(streamID int) {
|
||||||
|
s := c.streams[streamID]
|
||||||
|
|
||||||
|
s.res = &http.Response{
|
||||||
|
Header: make(http.Header),
|
||||||
|
Body: &bodyProvider{
|
||||||
|
buf: new(bytes.Buffer),
|
||||||
|
lock: new(sync.Mutex),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientConn) onHeader(streamID int, name, value string) {
|
||||||
|
s := c.streams[streamID]
|
||||||
|
if name == ":status" {
|
||||||
|
statusCode, _ := strconv.Atoi(value)
|
||||||
|
s.res.StatusCode = statusCode
|
||||||
|
s.res.Status = http.StatusText(statusCode)
|
||||||
|
s.res.Proto = "HTTP/2.0"
|
||||||
|
s.res.ProtoMajor = 2
|
||||||
|
s.res.ProtoMinor = 0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.res.Header.Add(name, value)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientConn) onHeadersDone(streamID int) {
|
||||||
|
s := c.streams[streamID]
|
||||||
|
s.resch <- s.res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientConn) onStreamClose(streamID int) {
|
||||||
|
stream, ok := c.streams[streamID]
|
||||||
|
if ok {
|
||||||
|
stream.Close()
|
||||||
|
c.lock.Lock()
|
||||||
|
delete(c.streams, streamID)
|
||||||
|
c.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close close the http2 connection
|
||||||
|
func (c *ClientConn) Close() error {
|
||||||
|
for _, s := range c.streams {
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
C.nghttp2_session_terminate_session(c.session, 0)
|
||||||
|
C.nghttp2_session_del(c.session)
|
||||||
|
close(c.exitch)
|
||||||
|
c.conn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientConn) run() {
|
||||||
|
var wantRead int
|
||||||
|
var wantWrite int
|
||||||
|
var delay = 50
|
||||||
|
var ret C.int
|
||||||
|
|
||||||
|
defer close(c.errch)
|
||||||
|
|
||||||
|
datach := make(chan []byte)
|
||||||
|
errch := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
buf := make([]byte, 16*1024)
|
||||||
|
readloop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.exitch:
|
||||||
|
break readloop
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := c.conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
errch <- err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
datach <- buf[:n]
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.errch:
|
||||||
|
break loop
|
||||||
|
case err := <-errch:
|
||||||
|
c.err = err
|
||||||
|
break loop
|
||||||
|
case <-c.exitch:
|
||||||
|
break loop
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
wantWrite = int(C.nghttp2_session_want_write(c.session))
|
||||||
|
if wantWrite != 0 {
|
||||||
|
ret = C.nghttp2_session_send(c.session)
|
||||||
|
if int(ret) < 0 {
|
||||||
|
c.err = fmt.Errorf("sesion send error: %s",
|
||||||
|
C.GoString(C.nghttp2_strerror(ret)))
|
||||||
|
log.Println(c.err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wantRead = int(C.nghttp2_session_want_read(c.session))
|
||||||
|
select {
|
||||||
|
case d := <-datach:
|
||||||
|
d1 := C.CBytes(d)
|
||||||
|
ret1 := C.nghttp2_session_mem_recv(c.session,
|
||||||
|
(*C.uchar)(d1), C.size_t(int(len(d))))
|
||||||
|
C.free(d1)
|
||||||
|
if int(ret1) < 0 {
|
||||||
|
c.err = fmt.Errorf("sesion recv error: %s",
|
||||||
|
C.GoString(C.nghttp2_strerror(ret)))
|
||||||
|
log.Println(c.err)
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// make delay when no data read/write
|
||||||
|
if wantRead == 0 && wantWrite == 0 {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Duration(delay) * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateRequest submit a request and return a http.Response,
|
||||||
|
func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
|
||||||
|
if c.err != nil {
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
|
|
||||||
|
nvIndex := 0
|
||||||
|
nvMax := 25
|
||||||
|
nva := C.new_nv_array(C.size_t(nvMax))
|
||||||
|
setNvArray(nva, nvIndex, ":method", req.Method, 0)
|
||||||
|
nvIndex++
|
||||||
|
setNvArray(nva, nvIndex, ":scheme", "https", 0)
|
||||||
|
nvIndex++
|
||||||
|
setNvArray(nva, nvIndex, ":authority", req.Host, 0)
|
||||||
|
nvIndex++
|
||||||
|
|
||||||
|
p := req.URL.Path
|
||||||
|
q := req.URL.Query().Encode()
|
||||||
|
if q != "" {
|
||||||
|
p = p + "?" + q
|
||||||
|
}
|
||||||
|
setNvArray(nva, nvIndex, ":path", p, 0)
|
||||||
|
nvIndex++
|
||||||
|
for k, v := range req.Header {
|
||||||
|
if strings.ToLower(k) == "host" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//log.Printf("header %s: %s", k, v)
|
||||||
|
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
|
||||||
|
nvIndex++
|
||||||
|
}
|
||||||
|
var dp *dataProvider
|
||||||
|
var cdp *C.nghttp2_data_provider
|
||||||
|
if req.Body != nil {
|
||||||
|
dp, cdp = newDataProvider()
|
||||||
|
go func() {
|
||||||
|
io.Copy(dp, req.Body)
|
||||||
|
dp.Close()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp)
|
||||||
|
if dp != nil {
|
||||||
|
dp.streamID = int(streamID)
|
||||||
|
dp.session = c.session
|
||||||
|
}
|
||||||
|
C.delete_nv_array(nva)
|
||||||
|
if int(streamID) < 0 {
|
||||||
|
return nil, fmt.Errorf("submit request error: %s",
|
||||||
|
C.GoString(C.nghttp2_strerror(streamID)))
|
||||||
|
}
|
||||||
|
//log.Println("stream id ", int(streamID))
|
||||||
|
s := &ClientStream{
|
||||||
|
streamID: int(streamID),
|
||||||
|
dp: dp,
|
||||||
|
cdp: cdp,
|
||||||
|
resch: make(chan *http.Response),
|
||||||
|
errch: make(chan error),
|
||||||
|
}
|
||||||
|
c.lock.Lock()
|
||||||
|
c.streams[int(streamID)] = s
|
||||||
|
c.lock.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-s.errch:
|
||||||
|
return nil, err
|
||||||
|
case res := <-s.resch:
|
||||||
|
return res, nil
|
||||||
|
case <-c.errch:
|
||||||
|
return nil, fmt.Errorf("connection error")
|
||||||
|
}
|
||||||
|
//return nil, fmt.Errorf("unknown error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerConn server connection
|
||||||
|
type ServerConn struct {
|
||||||
|
// Handler handler to handle request
|
||||||
|
Handler http.Handler
|
||||||
|
|
||||||
|
session *C.nghttp2_session
|
||||||
|
streams map[int]*ServerStream
|
||||||
|
lock *sync.Mutex
|
||||||
|
conn net.Conn
|
||||||
|
errch chan struct{}
|
||||||
|
exitch chan struct{}
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServerConn create new server connection
|
||||||
|
func NewServerConn(c net.Conn, handler http.Handler) (*ServerConn, error) {
|
||||||
|
conn := &ServerConn{
|
||||||
|
conn: c,
|
||||||
|
Handler: handler,
|
||||||
|
streams: make(map[int]*ServerStream),
|
||||||
|
lock: new(sync.Mutex),
|
||||||
|
errch: make(chan struct{}),
|
||||||
|
exitch: make(chan struct{}),
|
||||||
|
}
|
||||||
|
conn.session = C.init_server_session(C.size_t(uintptr(unsafe.Pointer(conn))))
|
||||||
|
if conn.session == nil {
|
||||||
|
return nil, fmt.Errorf("init session failed")
|
||||||
|
}
|
||||||
|
//log.Println("send server connection header")
|
||||||
|
ret := C.send_server_connection_header(conn.session)
|
||||||
|
if int(ret) < 0 {
|
||||||
|
log.Println(C.GoString(C.nghttp2_strerror(ret)))
|
||||||
|
return nil, fmt.Errorf("send connection header failed")
|
||||||
|
}
|
||||||
|
//go conn.run()
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ServerConn) serve(s *ServerStream) {
|
||||||
|
var handler = c.Handler
|
||||||
|
if c.Handler == nil {
|
||||||
|
handler = http.DefaultServeMux
|
||||||
|
}
|
||||||
|
handler.ServeHTTP(s, s.req)
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close close the server connection
|
||||||
|
func (c *ServerConn) Close() error {
|
||||||
|
for _, s := range c.streams {
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
C.nghttp2_session_terminate_session(c.session, 0)
|
||||||
|
C.nghttp2_session_del(c.session)
|
||||||
|
close(c.exitch)
|
||||||
|
c.conn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run run the server loop
|
||||||
|
func (c *ServerConn) Run() {
|
||||||
|
var wantRead int
|
||||||
|
var wantWrite int
|
||||||
|
var delay = 50
|
||||||
|
var ret C.int
|
||||||
|
|
||||||
|
defer c.Close()
|
||||||
|
defer close(c.errch)
|
||||||
|
|
||||||
|
datach := make(chan []byte)
|
||||||
|
errch := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
buf := make([]byte, 16*1024)
|
||||||
|
readloop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.exitch:
|
||||||
|
break readloop
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := c.conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
errch <- err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
datach <- buf[:n]
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.errch:
|
||||||
|
break loop
|
||||||
|
case err := <-errch:
|
||||||
|
c.err = err
|
||||||
|
break loop
|
||||||
|
case <-c.exitch:
|
||||||
|
break loop
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
wantWrite = int(C.nghttp2_session_want_write(c.session))
|
||||||
|
if wantWrite != 0 {
|
||||||
|
ret = C.nghttp2_session_send(c.session)
|
||||||
|
if int(ret) < 0 {
|
||||||
|
c.err = fmt.Errorf("sesion send error: %s",
|
||||||
|
C.GoString(C.nghttp2_strerror(ret)))
|
||||||
|
log.Println(c.err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wantRead = int(C.nghttp2_session_want_read(c.session))
|
||||||
|
select {
|
||||||
|
case d := <-datach:
|
||||||
|
d1 := C.CBytes(d)
|
||||||
|
ret1 := C.nghttp2_session_mem_recv(c.session,
|
||||||
|
(*C.uchar)(d1), C.size_t(int(len(d))))
|
||||||
|
C.free(d1)
|
||||||
|
if int(ret1) < 0 {
|
||||||
|
c.err = fmt.Errorf("sesion recv error: %s",
|
||||||
|
C.GoString(C.nghttp2_strerror(ret)))
|
||||||
|
log.Println(c.err)
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// make delay when no data read/write
|
||||||
|
if wantRead == 0 && wantWrite == 0 {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Duration(delay) * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,120 @@
|
|||||||
|
package nghttp2
|
||||||
|
|
||||||
|
/*
|
||||||
|
#include "_nghttp2.h"
|
||||||
|
*/
|
||||||
|
import "C"
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// dataProvider provider data for libnghttp2 library
|
||||||
|
// libnghttp2 callback will Read to read the data,
|
||||||
|
// application call Write to provider data,
|
||||||
|
// application call Close will cause Read return io.EOF
|
||||||
|
type dataProvider struct {
|
||||||
|
buf *bytes.Buffer
|
||||||
|
closed bool
|
||||||
|
lock *sync.Mutex
|
||||||
|
session *C.nghttp2_session
|
||||||
|
streamID int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read read from data provider
|
||||||
|
func (dp *dataProvider) Read(buf []byte) (n int, err error) {
|
||||||
|
dp.lock.Lock()
|
||||||
|
defer dp.lock.Unlock()
|
||||||
|
n, err = dp.buf.Read(buf)
|
||||||
|
|
||||||
|
if err != nil && !dp.closed {
|
||||||
|
return 0, errAgain
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write provider data for data provider
|
||||||
|
func (dp *dataProvider) Write(buf []byte) (n int, err error) {
|
||||||
|
dp.lock.Lock()
|
||||||
|
defer dp.lock.Unlock()
|
||||||
|
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
|
||||||
|
return dp.buf.Write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close end to provide data
|
||||||
|
func (dp *dataProvider) Close() error {
|
||||||
|
dp.lock.Lock()
|
||||||
|
defer dp.lock.Unlock()
|
||||||
|
dp.closed = true
|
||||||
|
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDataProvider() (
|
||||||
|
*dataProvider, *C.nghttp2_data_provider) {
|
||||||
|
dp := &dataProvider{
|
||||||
|
buf: new(bytes.Buffer),
|
||||||
|
lock: new(sync.Mutex),
|
||||||
|
}
|
||||||
|
cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp))))
|
||||||
|
return dp, cdp
|
||||||
|
}
|
||||||
|
|
||||||
|
// bodyProvider provide data for http body
|
||||||
|
// Read will block when data not yet avaliable
|
||||||
|
type bodyProvider struct {
|
||||||
|
buf *bytes.Buffer
|
||||||
|
closed bool
|
||||||
|
lock *sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read read data from provider
|
||||||
|
// will block when data not yet avaliable
|
||||||
|
func (bp *bodyProvider) Read(buf []byte) (int, error) {
|
||||||
|
for {
|
||||||
|
bp.lock.Lock()
|
||||||
|
n, err := bp.buf.Read(buf)
|
||||||
|
bp.lock.Unlock()
|
||||||
|
if err != nil && !bp.closed {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write provide data for dataProvider
|
||||||
|
// libnghttp2 data chunk recv callback will call this
|
||||||
|
func (bp *bodyProvider) Write(buf []byte) (int, error) {
|
||||||
|
bp.lock.Lock()
|
||||||
|
defer bp.lock.Unlock()
|
||||||
|
return bp.buf.Write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close end to provide data
|
||||||
|
func (bp *bodyProvider) Close() error {
|
||||||
|
/*
|
||||||
|
if c, ok := bp.w.(io.Closer); ok{
|
||||||
|
return c.Close()
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
bp.closed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// setNvArray set the array for nghttp2_nv array
|
||||||
|
func setNvArray(a *C.struct_nv_array, index int,
|
||||||
|
name, value string, flags int) {
|
||||||
|
cname := C.CString(name)
|
||||||
|
cvalue := C.CString(value)
|
||||||
|
cnamelen := C.size_t(len(name))
|
||||||
|
cvaluelen := C.size_t(len(value))
|
||||||
|
cflags := C.int(flags)
|
||||||
|
|
||||||
|
// note: cname and cvalue will freed in C.delete_nv_array
|
||||||
|
|
||||||
|
C.nv_array_set(a, C.int(index), cname,
|
||||||
|
cvalue, cnamelen, cvaluelen, cflags)
|
||||||
|
}
|
@ -1,132 +0,0 @@
|
|||||||
#include "_nghttp2.h"
|
|
||||||
|
|
||||||
static ssize_t server_send_callback(nghttp2_session *session,
|
|
||||||
const uint8_t *data, size_t length,
|
|
||||||
int flags, void *user_data)
|
|
||||||
{
|
|
||||||
return ServerDataSend(user_data, (void *)data, length);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int on_server_frame_recv_callback(nghttp2_session *session,
|
|
||||||
const nghttp2_frame *frame,
|
|
||||||
void *user_data)
|
|
||||||
{
|
|
||||||
switch (frame->hd.type)
|
|
||||||
{
|
|
||||||
case NGHTTP2_HEADERS:
|
|
||||||
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST)
|
|
||||||
{
|
|
||||||
OnServerHeadersDoneCallback(user_data, frame->hd.stream_id);
|
|
||||||
}
|
|
||||||
case NGHTTP2_DATA:
|
|
||||||
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
|
|
||||||
{
|
|
||||||
OnServerStreamEndCallback(user_data, frame->hd.stream_id);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int on_server_stream_close_callback(nghttp2_session *session,
|
|
||||||
int32_t stream_id,
|
|
||||||
uint32_t error_code,
|
|
||||||
void *user_data)
|
|
||||||
|
|
||||||
{
|
|
||||||
OnServerStreamClose(user_data, stream_id);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int on_server_header_callback(nghttp2_session *session,
|
|
||||||
const nghttp2_frame *frame,
|
|
||||||
const uint8_t *name, size_t namelen,
|
|
||||||
const uint8_t *value,
|
|
||||||
size_t valuelen, uint8_t flags,
|
|
||||||
void *user_data)
|
|
||||||
{
|
|
||||||
switch (frame->hd.type)
|
|
||||||
{
|
|
||||||
case NGHTTP2_HEADERS:
|
|
||||||
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST)
|
|
||||||
{
|
|
||||||
OnServerHeaderCallback(user_data, frame->hd.stream_id,
|
|
||||||
(void *)name, namelen, (void *)value, valuelen);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int on_server_data_chunk_recv_callback(nghttp2_session *session,
|
|
||||||
uint8_t flags,
|
|
||||||
int32_t stream_id,
|
|
||||||
const uint8_t *data,
|
|
||||||
size_t len, void *user_data)
|
|
||||||
{
|
|
||||||
return OnServerDataRecv(user_data, stream_id, (void *)data, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int on_server_begin_headers_callback(nghttp2_session *session,
|
|
||||||
const nghttp2_frame *frame,
|
|
||||||
void *user_data)
|
|
||||||
{
|
|
||||||
|
|
||||||
switch (frame->hd.type)
|
|
||||||
{
|
|
||||||
case NGHTTP2_HEADERS:
|
|
||||||
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST)
|
|
||||||
{
|
|
||||||
OnServerBeginHeaderCallback(user_data, frame->hd.stream_id);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
nghttp2_session *init_server_session(size_t data)
|
|
||||||
{
|
|
||||||
nghttp2_session_callbacks *callbacks;
|
|
||||||
nghttp2_session *session;
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_new(&callbacks);
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_set_send_callback(callbacks, server_send_callback);
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
|
|
||||||
on_server_frame_recv_callback);
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_set_on_stream_close_callback(
|
|
||||||
callbacks, on_server_stream_close_callback);
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
|
|
||||||
callbacks, on_server_data_chunk_recv_callback);
|
|
||||||
nghttp2_session_callbacks_set_on_header_callback(callbacks,
|
|
||||||
on_server_header_callback);
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_set_on_begin_headers_callback(
|
|
||||||
callbacks, on_server_begin_headers_callback);
|
|
||||||
|
|
||||||
nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data)));
|
|
||||||
|
|
||||||
nghttp2_session_callbacks_del(callbacks);
|
|
||||||
return session;
|
|
||||||
}
|
|
||||||
|
|
||||||
int send_server_connection_header(nghttp2_session *session)
|
|
||||||
{
|
|
||||||
nghttp2_settings_entry iv[1] = {
|
|
||||||
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}};
|
|
||||||
int rv;
|
|
||||||
|
|
||||||
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv,
|
|
||||||
ARRLEN(iv));
|
|
||||||
return rv;
|
|
||||||
/*
|
|
||||||
if (rv != 0) {
|
|
||||||
// warnx("Fatal error: %s", nghttp2_strerror(rv));
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
*/
|
|
||||||
}
|
|
@ -1,438 +0,0 @@
|
|||||||
package nghttp2
|
|
||||||
|
|
||||||
/*
|
|
||||||
#cgo pkg-config: libnghttp2
|
|
||||||
#include "_nghttp2.h"
|
|
||||||
*/
|
|
||||||
import "C"
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
"unsafe"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ServerConn server connection
|
|
||||||
type ServerConn struct {
|
|
||||||
// Handler handler to handle request
|
|
||||||
Handler http.Handler
|
|
||||||
|
|
||||||
session *C.nghttp2_session
|
|
||||||
streams map[int]*ServerStream
|
|
||||||
lock *sync.Mutex
|
|
||||||
conn net.Conn
|
|
||||||
errch chan struct{}
|
|
||||||
exitch chan struct{}
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerStream server stream
|
|
||||||
type ServerStream struct {
|
|
||||||
streamID int
|
|
||||||
// headers receive done
|
|
||||||
headersDone bool
|
|
||||||
// is stream_end flag received
|
|
||||||
streamEnd bool
|
|
||||||
// request
|
|
||||||
req *http.Request
|
|
||||||
// response header
|
|
||||||
header http.Header
|
|
||||||
// response statusCode
|
|
||||||
statusCode int
|
|
||||||
// response has send
|
|
||||||
responseSend bool
|
|
||||||
|
|
||||||
// server connection
|
|
||||||
conn *ServerConn
|
|
||||||
|
|
||||||
// data provider
|
|
||||||
dp *dataProvider
|
|
||||||
cdp *C.nghttp2_data_provider
|
|
||||||
|
|
||||||
closed bool
|
|
||||||
//buf *bytes.Buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
type bodyProvider struct {
|
|
||||||
buf *bytes.Buffer
|
|
||||||
closed bool
|
|
||||||
lock *sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bp *bodyProvider) Read(buf []byte) (int, error) {
|
|
||||||
for {
|
|
||||||
bp.lock.Lock()
|
|
||||||
n, err := bp.buf.Read(buf)
|
|
||||||
bp.lock.Unlock()
|
|
||||||
if err != nil && !bp.closed {
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return n, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bp *bodyProvider) Write(buf []byte) (int, error) {
|
|
||||||
bp.lock.Lock()
|
|
||||||
defer bp.lock.Unlock()
|
|
||||||
return bp.buf.Write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bp *bodyProvider) Close() error {
|
|
||||||
/*
|
|
||||||
if c, ok := bp.w.(io.Closer); ok{
|
|
||||||
return c.Close()
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
bp.closed = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewServerConn create new server connection
|
|
||||||
func NewServerConn(c net.Conn, handler http.Handler) (*ServerConn, error) {
|
|
||||||
conn := &ServerConn{
|
|
||||||
conn: c,
|
|
||||||
Handler: handler,
|
|
||||||
streams: make(map[int]*ServerStream),
|
|
||||||
lock: new(sync.Mutex),
|
|
||||||
errch: make(chan struct{}),
|
|
||||||
exitch: make(chan struct{}),
|
|
||||||
}
|
|
||||||
conn.session = C.init_server_session(C.size_t(uintptr(unsafe.Pointer(conn))))
|
|
||||||
if conn.session == nil {
|
|
||||||
return nil, fmt.Errorf("init session failed")
|
|
||||||
}
|
|
||||||
//log.Println("send server connection header")
|
|
||||||
ret := C.send_server_connection_header(conn.session)
|
|
||||||
if int(ret) < 0 {
|
|
||||||
log.Println(C.GoString(C.nghttp2_strerror(ret)))
|
|
||||||
return nil, fmt.Errorf("send connection header failed")
|
|
||||||
}
|
|
||||||
//go conn.run()
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ServerConn) serve(s *ServerStream) {
|
|
||||||
var handler = c.Handler
|
|
||||||
if c.Handler == nil {
|
|
||||||
handler = http.DefaultServeMux
|
|
||||||
}
|
|
||||||
handler.ServeHTTP(s, s.req)
|
|
||||||
s.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close close the server connection
|
|
||||||
func (c *ServerConn) Close() error {
|
|
||||||
for _, s := range c.streams {
|
|
||||||
s.Close()
|
|
||||||
}
|
|
||||||
C.nghttp2_session_terminate_session(c.session, 0)
|
|
||||||
C.nghttp2_session_del(c.session)
|
|
||||||
close(c.exitch)
|
|
||||||
c.conn.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run run the server loop
|
|
||||||
func (c *ServerConn) Run() {
|
|
||||||
var wantRead int
|
|
||||||
var wantWrite int
|
|
||||||
var delay = 50
|
|
||||||
var ret C.int
|
|
||||||
|
|
||||||
defer c.Close()
|
|
||||||
defer close(c.errch)
|
|
||||||
|
|
||||||
datach := make(chan []byte)
|
|
||||||
errch := make(chan error)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
buf := make([]byte, 16*1024)
|
|
||||||
readloop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.exitch:
|
|
||||||
break readloop
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := c.conn.Read(buf)
|
|
||||||
if err != nil {
|
|
||||||
errch <- err
|
|
||||||
break
|
|
||||||
}
|
|
||||||
datach <- buf[:n]
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.errch:
|
|
||||||
break loop
|
|
||||||
case err := <-errch:
|
|
||||||
c.err = err
|
|
||||||
break loop
|
|
||||||
case <-c.exitch:
|
|
||||||
break loop
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
wantWrite = int(C.nghttp2_session_want_write(c.session))
|
|
||||||
if wantWrite != 0 {
|
|
||||||
ret = C.nghttp2_session_send(c.session)
|
|
||||||
if int(ret) < 0 {
|
|
||||||
c.err = fmt.Errorf("sesion send error: %s",
|
|
||||||
C.GoString(C.nghttp2_strerror(ret)))
|
|
||||||
log.Println(c.err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wantRead = int(C.nghttp2_session_want_read(c.session))
|
|
||||||
select {
|
|
||||||
case d := <-datach:
|
|
||||||
d1 := C.CBytes(d)
|
|
||||||
ret1 := C.nghttp2_session_mem_recv(c.session,
|
|
||||||
(*C.uchar)(d1), C.size_t(int(len(d))))
|
|
||||||
C.free(d1)
|
|
||||||
if int(ret1) < 0 {
|
|
||||||
c.err = fmt.Errorf("sesion recv error: %s",
|
|
||||||
C.GoString(C.nghttp2_strerror(ret)))
|
|
||||||
log.Println(c.err)
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// make delay when no data read/write
|
|
||||||
if wantRead == 0 && wantWrite == 0 {
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Duration(delay) * time.Millisecond):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write implements http.ResponseWriter
|
|
||||||
func (s *ServerStream) Write(buf []byte) (int, error) {
|
|
||||||
if !s.responseSend {
|
|
||||||
s.WriteHeader(http.StatusOK)
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
//log.Printf("stream %d, send %d bytes", s.streamID, len(buf))
|
|
||||||
if s.buf.Len() > 2048 {
|
|
||||||
s.dp.Write(s.buf.Bytes())
|
|
||||||
s.buf.Reset()
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(buf) < 2048 {
|
|
||||||
s.buf.Write(buf)
|
|
||||||
return len(buf), nil
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
return s.dp.Write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteHeader implements http.ResponseWriter
|
|
||||||
func (s *ServerStream) WriteHeader(code int) {
|
|
||||||
s.statusCode = code
|
|
||||||
nvIndex := 0
|
|
||||||
nvMax := 25
|
|
||||||
nva := C.new_nv_array(C.size_t(nvMax))
|
|
||||||
setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0)
|
|
||||||
nvIndex++
|
|
||||||
|
|
||||||
for k, v := range s.header {
|
|
||||||
if strings.ToLower(k) == "host" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
//log.Printf("header %s: %s", k, v)
|
|
||||||
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
|
|
||||||
nvIndex++
|
|
||||||
}
|
|
||||||
var dp *dataProvider
|
|
||||||
var cdp *C.nghttp2_data_provider
|
|
||||||
dp, cdp = newDataProvider()
|
|
||||||
dp.streamID = s.streamID
|
|
||||||
dp.session = s.conn.session
|
|
||||||
s.dp = dp
|
|
||||||
s.cdp = cdp
|
|
||||||
ret := C.nghttp2_submit_response(
|
|
||||||
s.conn.session, C.int(s.streamID), nva.nv, C.size_t(nvIndex), cdp)
|
|
||||||
C.delete_nv_array(nva)
|
|
||||||
if int(ret) < 0 {
|
|
||||||
panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret))))
|
|
||||||
}
|
|
||||||
s.responseSend = true
|
|
||||||
log.Printf("stream %d send response", s.streamID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Header implements http.ResponseWriter
|
|
||||||
func (s *ServerStream) Header() http.Header {
|
|
||||||
if s.header == nil {
|
|
||||||
s.header = http.Header{}
|
|
||||||
}
|
|
||||||
return s.header
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close close the stream
|
|
||||||
func (s *ServerStream) Close() error {
|
|
||||||
if s.closed {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
//C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0)
|
|
||||||
s.req.Body.Close()
|
|
||||||
if s.dp != nil {
|
|
||||||
s.dp.Close()
|
|
||||||
}
|
|
||||||
s.closed = true
|
|
||||||
log.Printf("stream %d closed", s.streamID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerDataRecv callback function for receive data from network
|
|
||||||
//export ServerDataRecv
|
|
||||||
func ServerDataRecv(ptr unsafe.Pointer, data unsafe.Pointer,
|
|
||||||
length C.size_t) C.ssize_t {
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
buf := make([]byte, int(length))
|
|
||||||
n, err := conn.conn.Read(buf)
|
|
||||||
if err != nil {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
cbuf := C.CBytes(buf[:n])
|
|
||||||
defer C.free(cbuf)
|
|
||||||
C.memcpy(data, cbuf, C.size_t(n))
|
|
||||||
return C.ssize_t(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServerDataSend callback function for send data to network
|
|
||||||
//export ServerDataSend
|
|
||||||
func ServerDataSend(ptr unsafe.Pointer, data unsafe.Pointer,
|
|
||||||
length C.size_t) C.ssize_t {
|
|
||||||
//log.Println("server data send")
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
buf := C.GoBytes(data, C.int(length))
|
|
||||||
n, err := conn.conn.Write(buf)
|
|
||||||
if err != nil {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
//log.Println("send ", n, " bytes to network ", buf)
|
|
||||||
return C.ssize_t(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnServerDataRecv callback function for data recv
|
|
||||||
//export OnServerDataRecv
|
|
||||||
func OnServerDataRecv(ptr unsafe.Pointer, streamID C.int,
|
|
||||||
data unsafe.Pointer, length C.size_t) C.int {
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
s := conn.streams[int(streamID)]
|
|
||||||
bp := s.req.Body.(*bodyProvider)
|
|
||||||
buf := C.GoBytes(data, C.int(length))
|
|
||||||
bp.Write(buf)
|
|
||||||
return C.int(length)
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnServerBeginHeaderCallback callback function for begin header
|
|
||||||
//export OnServerBeginHeaderCallback
|
|
||||||
func OnServerBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
s := &ServerStream{
|
|
||||||
streamID: int(streamID),
|
|
||||||
conn: conn,
|
|
||||||
req: &http.Request{
|
|
||||||
URL: &url.URL{},
|
|
||||||
Header: http.Header{},
|
|
||||||
Proto: "HTTP/2.0",
|
|
||||||
ProtoMajor: 2,
|
|
||||||
ProtoMinor: 0,
|
|
||||||
},
|
|
||||||
//buf: new(bytes.Buffer),
|
|
||||||
}
|
|
||||||
conn.streams[int(streamID)] = s
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnServerHeaderCallback callback function for header
|
|
||||||
//export OnServerHeaderCallback
|
|
||||||
func OnServerHeaderCallback(ptr unsafe.Pointer, streamID C.int,
|
|
||||||
name unsafe.Pointer, namelen C.int,
|
|
||||||
value unsafe.Pointer, valuelen C.int) C.int {
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
s := conn.streams[int(streamID)]
|
|
||||||
hdrname := C.GoStringN((*C.char)(name), namelen)
|
|
||||||
hdrvalue := C.GoStringN((*C.char)(value), valuelen)
|
|
||||||
hdrname = strings.ToLower(hdrname)
|
|
||||||
switch hdrname {
|
|
||||||
case ":method":
|
|
||||||
s.req.Method = hdrvalue
|
|
||||||
case ":scheme":
|
|
||||||
s.req.URL.Scheme = hdrvalue
|
|
||||||
case ":path":
|
|
||||||
s.req.RequestURI = hdrvalue
|
|
||||||
u, _ := url.ParseRequestURI(s.req.RequestURI)
|
|
||||||
scheme := s.req.URL.Scheme
|
|
||||||
*(s.req.URL) = *u
|
|
||||||
if scheme != "" {
|
|
||||||
s.req.URL.Scheme = scheme
|
|
||||||
}
|
|
||||||
case ":authority":
|
|
||||||
s.req.Host = hdrvalue
|
|
||||||
default:
|
|
||||||
s.req.Header.Add(hdrname, hdrvalue)
|
|
||||||
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnServerStreamEndCallback callback function for frame received
|
|
||||||
//export OnServerStreamEndCallback
|
|
||||||
func OnServerStreamEndCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
s := conn.streams[int(streamID)]
|
|
||||||
s.streamEnd = true
|
|
||||||
bp := s.req.Body.(*bodyProvider)
|
|
||||||
if s.req.Method != "CONNECT" {
|
|
||||||
bp.closed = true
|
|
||||||
log.Println("stream end flag set, begin to serve")
|
|
||||||
go conn.serve(s)
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnServerHeadersDoneCallback callback function for all headers received
|
|
||||||
//export OnServerHeadersDoneCallback
|
|
||||||
func OnServerHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
s := conn.streams[int(streamID)]
|
|
||||||
s.headersDone = true
|
|
||||||
bp := &bodyProvider{
|
|
||||||
buf: new(bytes.Buffer),
|
|
||||||
lock: new(sync.Mutex),
|
|
||||||
}
|
|
||||||
s.req.Body = bp
|
|
||||||
if s.req.Method == "CONNECT" {
|
|
||||||
go conn.serve(s)
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnServerStreamClose callback function for stream close
|
|
||||||
//export OnServerStreamClose
|
|
||||||
func OnServerStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
|
|
||||||
conn := (*ServerConn)(ptr)
|
|
||||||
s := conn.streams[int(streamID)]
|
|
||||||
conn.lock.Lock()
|
|
||||||
delete(conn.streams, int(streamID))
|
|
||||||
conn.lock.Unlock()
|
|
||||||
s.Close()
|
|
||||||
return 0
|
|
||||||
}
|
|
@ -0,0 +1,172 @@
|
|||||||
|
package nghttp2
|
||||||
|
|
||||||
|
/*
|
||||||
|
#include "_nghttp2.h"
|
||||||
|
*/
|
||||||
|
import "C"
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClientStream http2 client stream
|
||||||
|
type ClientStream struct {
|
||||||
|
streamID int
|
||||||
|
cdp *C.nghttp2_data_provider
|
||||||
|
dp *dataProvider
|
||||||
|
// application read data from stream
|
||||||
|
//r *io.PipeReader
|
||||||
|
// recv stream data from session
|
||||||
|
//w *io.PipeWriter
|
||||||
|
res *http.Response
|
||||||
|
resch chan *http.Response
|
||||||
|
errch chan error
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read read stream data
|
||||||
|
func (s *ClientStream) Read(buf []byte) (n int, err error) {
|
||||||
|
return s.res.Body.Read(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write write data to stream
|
||||||
|
func (s *ClientStream) Write(buf []byte) (n int, err error) {
|
||||||
|
if s.dp != nil {
|
||||||
|
return s.dp.Write(buf)
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("empty data provider")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close close the stream
|
||||||
|
func (s *ClientStream) Close() error {
|
||||||
|
if s.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := io.EOF
|
||||||
|
//log.Println("close stream")
|
||||||
|
select {
|
||||||
|
case s.errch <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
//log.Println("close stream resch")
|
||||||
|
close(s.resch)
|
||||||
|
//log.Println("close stream errch")
|
||||||
|
close(s.errch)
|
||||||
|
//log.Println("close pipe w")
|
||||||
|
s.res.Body.Close()
|
||||||
|
//log.Println("close stream done")
|
||||||
|
if s.dp != nil {
|
||||||
|
s.dp.Close()
|
||||||
|
}
|
||||||
|
s.closed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerStream server stream
|
||||||
|
type ServerStream struct {
|
||||||
|
streamID int
|
||||||
|
// headers receive done
|
||||||
|
headersDone bool
|
||||||
|
// is stream_end flag received
|
||||||
|
streamEnd bool
|
||||||
|
// request
|
||||||
|
req *http.Request
|
||||||
|
// response header
|
||||||
|
header http.Header
|
||||||
|
// response statusCode
|
||||||
|
statusCode int
|
||||||
|
// response has send
|
||||||
|
responseSend bool
|
||||||
|
|
||||||
|
// server connection
|
||||||
|
conn *ServerConn
|
||||||
|
|
||||||
|
// data provider
|
||||||
|
dp *dataProvider
|
||||||
|
cdp *C.nghttp2_data_provider
|
||||||
|
|
||||||
|
closed bool
|
||||||
|
//buf *bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write write data to stream,
|
||||||
|
// implements http.ResponseWriter
|
||||||
|
func (s *ServerStream) Write(buf []byte) (int, error) {
|
||||||
|
if !s.responseSend {
|
||||||
|
s.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
//log.Printf("stream %d, send %d bytes", s.streamID, len(buf))
|
||||||
|
if s.buf.Len() > 2048 {
|
||||||
|
s.dp.Write(s.buf.Bytes())
|
||||||
|
s.buf.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(buf) < 2048 {
|
||||||
|
s.buf.Write(buf)
|
||||||
|
return len(buf), nil
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
return s.dp.Write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteHeader set response code and send reponse,
|
||||||
|
// implements http.ResponseWriter
|
||||||
|
func (s *ServerStream) WriteHeader(code int) {
|
||||||
|
s.statusCode = code
|
||||||
|
nvIndex := 0
|
||||||
|
nvMax := 25
|
||||||
|
nva := C.new_nv_array(C.size_t(nvMax))
|
||||||
|
setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0)
|
||||||
|
nvIndex++
|
||||||
|
|
||||||
|
for k, v := range s.header {
|
||||||
|
if strings.ToLower(k) == "host" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//log.Printf("header %s: %s", k, v)
|
||||||
|
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
|
||||||
|
nvIndex++
|
||||||
|
}
|
||||||
|
var dp *dataProvider
|
||||||
|
var cdp *C.nghttp2_data_provider
|
||||||
|
dp, cdp = newDataProvider()
|
||||||
|
dp.streamID = s.streamID
|
||||||
|
dp.session = s.conn.session
|
||||||
|
s.dp = dp
|
||||||
|
s.cdp = cdp
|
||||||
|
ret := C.nghttp2_submit_response(
|
||||||
|
s.conn.session, C.int(s.streamID), nva.nv, C.size_t(nvIndex), cdp)
|
||||||
|
C.delete_nv_array(nva)
|
||||||
|
if int(ret) < 0 {
|
||||||
|
panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret))))
|
||||||
|
}
|
||||||
|
s.responseSend = true
|
||||||
|
//log.Printf("stream %d send response", s.streamID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Header return the http.Header,
|
||||||
|
// implements http.ResponseWriter
|
||||||
|
func (s *ServerStream) Header() http.Header {
|
||||||
|
if s.header == nil {
|
||||||
|
s.header = http.Header{}
|
||||||
|
}
|
||||||
|
return s.header
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close close the stream
|
||||||
|
func (s *ServerStream) Close() error {
|
||||||
|
if s.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
//C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0)
|
||||||
|
s.req.Body.Close()
|
||||||
|
if s.dp != nil {
|
||||||
|
s.dp.Close()
|
||||||
|
}
|
||||||
|
s.closed = true
|
||||||
|
//log.Printf("stream %d closed", s.streamID)
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue