add connection run loop

merge_conn
fangdingjun 6 years ago
parent 7a2599e0ca
commit 9f05ca356a

@ -12,6 +12,7 @@ extern int OnDataRecv(void *, int, void *, size_t);
extern int OnBeginHeaderCallback(void *, int);
extern int OnHeaderCallback(void *, int, void *, int, void *, int);
extern int OnFrameRecvCallback(void *, int);
extern int OnStreamClose(void *, int);
struct nv_array
{
@ -32,6 +33,6 @@ int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen
int send_client_connection_header(nghttp2_session *session);
void init_nghttp2_session(nghttp2_session *session, void *data);
nghttp2_session * init_nghttp2_session(size_t data);
#endif

@ -8,9 +8,13 @@ import "C"
import (
"fmt"
"io"
"log"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"unsafe"
)
@ -19,6 +23,9 @@ type Conn struct {
session *C.nghttp2_session
conn net.Conn
streams map[int]*Stream
lock *sync.Mutex
errch chan struct{}
err error
}
// Stream http2 stream
@ -27,9 +34,12 @@ type Stream struct {
cdp *C.nghttp2_data_provider
dp *dataProvider
// application read data from stream
r io.Reader
r *io.PipeReader
// recv stream data from session
w io.Writer
w *io.PipeWriter
res *http.Response
resch chan *http.Response
errch chan error
}
type dataProvider struct {
@ -40,11 +50,21 @@ type dataProvider struct {
}
// NewConn create http2 connection
func NewConn(c net.Conn) *Conn {
conn := &Conn{conn: c, streams: make(map[int]*Stream)}
C.init_nghttp2_session(conn.session, unsafe.Pointer(conn))
C.send_client_connection_header(conn.session)
return conn
func NewConn(c net.Conn) (*Conn, error) {
conn := &Conn{
conn: c, streams: make(map[int]*Stream), lock: new(sync.Mutex),
errch: make(chan struct{}),
}
conn.session = C.init_nghttp2_session(C.size_t(int(uintptr(unsafe.Pointer(conn)))))
if conn.session == nil {
return nil, fmt.Errorf("init session failed")
}
ret := C.send_client_connection_header(conn.session)
if int(ret) < 0 {
log.Printf("submit settings error: %s", C.GoString(C.nghttp2_strerror(ret)))
}
go conn.run()
return conn, nil
}
func (c *Conn) onDataRecv(buf []byte, streamID int) {
@ -68,17 +88,85 @@ func (c *Conn) onFrameRecv(streamID int) {
stream.onFrameRecv()
}
func (c *Conn) onStreamClose(streamID int) {
stream, ok := c.streams[streamID]
if ok {
stream.Close()
c.lock.Lock()
delete(c.streams, streamID)
c.lock.Unlock()
}
}
// Close close the http2 connection
func (c *Conn) Close() error {
for _, s := range c.streams {
s.Close()
}
C.nghttp2_session_del(c.session)
close(c.errch)
c.conn.Close()
return nil
}
func (c *Conn) run() {
var wantRead int
var wantWrite int
var delay = 50
var ret C.int
loop:
for {
select {
case <-c.errch:
break loop
default:
}
wantRead = int(C.nghttp2_session_want_read(c.session))
wantWrite = int(C.nghttp2_session_want_write(c.session))
if wantWrite != 0 {
ret = C.nghttp2_session_send(c.session)
if int(ret) < 0 {
c.err = fmt.Errorf("sesion send error: %s", C.GoString(C.nghttp2_strerror(ret)))
log.Println(c.err)
break
}
}
if wantRead != 0 {
ret = C.nghttp2_session_recv(c.session)
if int(ret) < 0 {
c.err = fmt.Errorf("sesion recv error: %s", C.GoString(C.nghttp2_strerror(ret)))
log.Println(c.err)
break
}
}
// make delay when no data read/write
if wantRead == 0 && wantWrite == 0 {
select {
case <-time.After(time.Duration(delay) * time.Millisecond):
}
}
}
}
// NewRequest create a new http2 stream
func (c *Conn) NewRequest(req *http.Request) *http.Response {
func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) {
if c.err != nil {
return nil, c.err
}
nvIndex := 0
nvMax := 25
nva := C.new_nv_array(C.size_t(nvMax))
setNvArray(nva, nvIndex, ":method", req.Method, 0)
nvIndex += 1
nvIndex++
setNvArray(nva, nvIndex, ":scheme", "https", 0)
nvIndex += 1
nvIndex++
setNvArray(nva, nvIndex, ":authority", req.Host, 0)
nvIndex += 1
nvIndex++
p := req.URL.Path
q := req.URL.Query().Encode()
@ -86,13 +174,13 @@ func (c *Conn) NewRequest(req *http.Request) *http.Response {
p = p + "?" + q
}
setNvArray(nva, nvIndex, ":path", p, 0)
nvIndex += 1
nvIndex++
for k, v := range req.Header {
if strings.ToLower(k) == "host" {
continue
}
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
nvIndex += 1
nvIndex++
}
var dp *dataProvider
var cdp *C.nghttp2_data_provider
@ -102,13 +190,30 @@ func (c *Conn) NewRequest(req *http.Request) *http.Response {
streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex+1))
C.delete_nv_array(nva)
if int(streamID) < 0 {
return nil
return nil, fmt.Errorf("submit request error: %s", C.GoString(C.nghttp2_strerror(streamID)))
}
log.Println("stream id ", int(streamID))
r, w := io.Pipe()
s := &Stream{streamID: int(streamID), dp: dp, cdp: cdp, r: r, w: w}
s := &Stream{
streamID: int(streamID),
dp: dp,
cdp: cdp,
r: r,
w: w,
resch: make(chan *http.Response),
errch: make(chan error),
}
c.lock.Lock()
c.streams[int(streamID)] = s
c.lock.Unlock()
return nil
select {
case err := <-s.errch:
return nil, err
case res := <-s.resch:
return res, nil
}
//return nil, fmt.Errorf("unknown error")
}
func setNvArray(a *C.struct_nv_array, index int, name, value string, flags int) {
@ -153,19 +258,44 @@ func (s *Stream) onDataRecv(buf []byte) {
}
func (s *Stream) onBeginHeader() {
s.res = &http.Response{
Header: make(http.Header),
}
}
func (s *Stream) onHeader(name, value string) {
if name == ":status" {
statusCode, _ := strconv.Atoi(value)
s.res.StatusCode = statusCode
s.res.Status = http.StatusText(statusCode)
s.res.Proto = "HTTP/2.0"
s.res.ProtoMajor = 2
s.res.ProtoMinor = 0
return
}
s.res.Header.Add(name, value)
}
func (s *Stream) onFrameRecv() {
s.res.Body = s
s.resch <- s.res
}
// Close close the stream
func (s *Stream) Close() error {
select {
case s.errch <- fmt.Errorf("stream closed"):
}
close(s.resch)
close(s.errch)
s.w.Close()
return nil
}
// DataSourceRead callback function for data read from data provider source
//export DataSourceRead
func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t {
log.Println("data source read")
dp := (*dataProvider)(ptr)
gobuf := make([]byte, int(length))
n, err := dp.Read(gobuf)
@ -181,47 +311,63 @@ func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.s
return C.ssize_t(n)
}
// OnDataRecv callback function for data frame received
//export OnDataRecv
func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int {
log.Println("on data recv")
conn := (*Conn)(ptr)
gobuf := C.GoBytes(buf, C.int(length))
conn.onDataRecv(gobuf, int(streamID))
return 0
}
// DataRead callback function for session wants read data from peer
//export DataRead
func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
log.Println("data read")
conn := (*Conn)(ptr)
buf := make([]byte, int(size))
log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Read(buf)
if err != nil {
log.Println(err)
return -1
}
log.Println("read from network ", n)
return C.ssize_t(n)
}
// DataWrite callback function for session wants send data to peer
//export DataWrite
func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
log.Println("data write")
conn := (*Conn)(ptr)
buf := C.GoBytes(data, C.int(size))
log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Write(buf)
if err != nil {
log.Println(err)
return -1
}
log.Println("write data to network ", n)
return C.ssize_t(n)
}
// OnBeginHeaderCallback callback function for response
//export OnBeginHeaderCallback
func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
log.Println("begin header")
conn := (*Conn)(ptr)
conn.onBeginHeader(int(streamID))
return 0
}
// OnHeaderCallback callback function for header
//export OnHeaderCallback
func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int {
log.Println("header")
conn := (*Conn)(ptr)
goname := C.GoBytes(name, namelen)
govalue := C.GoBytes(value, valuelen)
@ -229,9 +375,20 @@ func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int,
return 0
}
// OnFrameRecvCallback callback function for begion to recv data
//export OnFrameRecvCallback
func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int {
log.Println("frame recv")
conn := (*Conn)(ptr)
conn.onFrameRecv(int(streamID))
return 0
}
// OnStreamClose callback function for stream close
//export OnStreamClose
func OnStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
log.Println("stream close")
conn := (*Conn)(ptr)
conn.onStreamClose(int(streamID))
return 0
}

@ -21,6 +21,7 @@ static int on_header_callback(nghttp2_session *session,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
printf("on_header_callback\n");
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
@ -40,6 +41,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
printf("on_begin_headers_callback\n");
int stream_id = frame->hd.stream_id;
switch (frame->hd.type)
{
@ -48,8 +50,8 @@ static int on_begin_headers_callback(nghttp2_session *session,
{
fprintf(stderr, "Response headers for stream ID=%d:\n",
frame->hd.stream_id);
OnBeginHeaderCallback(user_data, stream_id);
}
OnBeginHeaderCallback(user_data, stream_id);
break;
}
return 0;
@ -58,15 +60,15 @@ static int on_begin_headers_callback(nghttp2_session *session,
static int on_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
printf("on_frame_recv_callback\n");
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
fprintf(stderr, "All headers received\n");
OnFrameRecvCallback(user_data, frame->hd.stream_id);
}
OnFrameRecvCallback(user_data, frame->hd.stream_id);
break;
}
return 0;
@ -82,6 +84,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
OnStreamClose(user_data, stream_id);
return 0;
}
@ -96,9 +99,18 @@ ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id,
}
return ret;
}
int on_error_callback(nghttp2_session *session, int lib_error_code,
const char *msg, size_t len, void *user_data)
{
//printf("errmsg %*s\n", msg, len);
printf("code: %d, error: %s\n", lib_error_code, nghttp2_strerror(lib_error_code));
return 0;
}
void init_nghttp2_session(nghttp2_session *session, void *data)
nghttp2_session *init_nghttp2_session(size_t data)
{
int ret;
nghttp2_session *session;
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
@ -106,6 +118,7 @@ void init_nghttp2_session(nghttp2_session *session, void *data)
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback);
nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_frame_recv_callback);
@ -121,9 +134,13 @@ void init_nghttp2_session(nghttp2_session *session, void *data)
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_begin_headers_callback);
nghttp2_session_client_new(&session, callbacks, data);
ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data)));
if (session == NULL)
{
printf("c init session failed: %s\n", nghttp2_strerror(ret));
}
nghttp2_session_callbacks_del(callbacks);
return session;
}
int send_client_connection_header(nghttp2_session *session)

Loading…
Cancel
Save