fix error on submit http2 request

merge_conn
fangdingjun 7 years ago
parent abe4e4956f
commit b141943c15

@ -21,7 +21,7 @@ struct nv_array
};
void delete_nv_array(struct nv_array *a);
nghttp2_data_provider *new_data_provider(void *data);
nghttp2_data_provider *new_data_provider(size_t data);
int nv_array_set(struct nv_array *a, int index,
char *name, char *value,
@ -29,10 +29,12 @@ int nv_array_set(struct nv_array *a, int index,
struct nv_array *new_nv_array(size_t n);
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen);
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen,
nghttp2_data_provider *dp);
int send_client_connection_header(nghttp2_session *session);
nghttp2_session * init_nghttp2_session(size_t data);
nghttp2_session *init_client_session(size_t data);
nghttp2_session *init_server_session(size_t data);
#endif

@ -20,12 +20,13 @@ import (
// Conn http2 connection
type Conn struct {
session *C.nghttp2_session
conn net.Conn
streams map[int]*Stream
lock *sync.Mutex
errch chan struct{}
err error
session *C.nghttp2_session
conn net.Conn
streams map[int]*Stream
lock *sync.Mutex
errch chan struct{}
err error
isServer bool
}
// Stream http2 stream
@ -36,10 +37,14 @@ type Stream struct {
// application read data from stream
r *io.PipeReader
// recv stream data from session
w *io.PipeWriter
res *http.Response
resch chan *http.Response
errch chan error
w *io.PipeWriter
res *http.Response
resch chan *http.Response
errch chan error
closed bool
}
type Request struct {
}
type dataProvider struct {
@ -49,24 +54,45 @@ type dataProvider struct {
w io.Writer
}
// NewConn create http2 connection
func NewConn(c net.Conn) (*Conn, error) {
// NewClientConn create http2 client
func NewClientConn(c net.Conn) (*Conn, error) {
conn := &Conn{
conn: c, streams: make(map[int]*Stream), lock: new(sync.Mutex),
conn: c, streams: make(map[int]*Stream),
lock: new(sync.Mutex),
errch: make(chan struct{}),
}
conn.session = C.init_nghttp2_session(C.size_t(int(uintptr(unsafe.Pointer(conn)))))
conn.session = C.init_client_session(
C.size_t(int(uintptr(unsafe.Pointer(conn)))))
if conn.session == nil {
return nil, fmt.Errorf("init session failed")
}
ret := C.send_client_connection_header(conn.session)
if int(ret) < 0 {
log.Printf("submit settings error: %s", C.GoString(C.nghttp2_strerror(ret)))
log.Printf("submit settings error: %s",
C.GoString(C.nghttp2_strerror(ret)))
}
go conn.run()
return conn, nil
}
// NewServerConn create http2 server
func NewServerConn(c net.Conn) (*Conn, error) {
conn := &Conn{
conn: c, streams: make(map[int]*Stream),
lock: new(sync.Mutex),
errch: make(chan struct{}),
isServer: true,
}
conn.session = C.init_server_session(
C.size_t(int(uintptr(unsafe.Pointer(conn)))))
if conn.session == nil {
return nil, fmt.Errorf("init session failed")
}
go conn.run()
return conn, nil
}
func (c *Conn) onDataRecv(buf []byte, streamID int) {
stream := c.streams[streamID]
stream.onDataRecv(buf)
@ -104,6 +130,7 @@ func (c *Conn) Close() error {
for _, s := range c.streams {
s.Close()
}
C.nghttp2_session_terminate_session(c.session, 0)
C.nghttp2_session_del(c.session)
close(c.errch)
c.conn.Close()
@ -116,31 +143,56 @@ func (c *Conn) run() {
var delay = 50
var ret C.int
datach := make(chan []byte)
errch := make(chan error)
go func() {
buf := make([]byte, 4096)
for {
n, err := c.conn.Read(buf)
if err != nil {
errch <- err
break
}
datach <- buf[:n]
}
}()
loop:
for {
select {
case <-c.errch:
break loop
case err := <-errch:
c.err = err
break loop
default:
}
wantRead = int(C.nghttp2_session_want_read(c.session))
wantWrite = int(C.nghttp2_session_want_write(c.session))
if wantWrite != 0 {
ret = C.nghttp2_session_send(c.session)
if int(ret) < 0 {
c.err = fmt.Errorf("sesion send error: %s", C.GoString(C.nghttp2_strerror(ret)))
c.err = fmt.Errorf("sesion send error: %s",
C.GoString(C.nghttp2_strerror(ret)))
log.Println(c.err)
break
}
}
if wantRead != 0 {
ret = C.nghttp2_session_recv(c.session)
if int(ret) < 0 {
c.err = fmt.Errorf("sesion recv error: %s", C.GoString(C.nghttp2_strerror(ret)))
wantRead = int(C.nghttp2_session_want_read(c.session))
select {
case d := <-datach:
d1 := C.CBytes(d)
ret1 := C.nghttp2_session_mem_recv(c.session,
(*C.uchar)(d1), C.size_t(int(len(d))))
if int(ret1) < 0 {
c.err = fmt.Errorf("sesion recv error: %s",
C.GoString(C.nghttp2_strerror(ret)))
log.Println(c.err)
break
break loop
}
default:
}
// make delay when no data read/write
@ -152,12 +204,23 @@ loop:
}
}
// NewRequest create a new http2 stream
func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) {
// AcceptRequest get a request from session
// this block until a request is avaliable,
// server only
func (c *Conn) AcceptRequest() (req *Request, err error) {
return nil, nil
}
// CreateRequest submit a request and return a http.Response, client only
func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) {
if c.err != nil {
return nil, c.err
}
if c.isServer {
return nil, fmt.Errorf("only client can create new request")
}
nvIndex := 0
nvMax := 25
nva := C.new_nv_array(C.size_t(nvMax))
@ -179,6 +242,7 @@ func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) {
if strings.ToLower(k) == "host" {
continue
}
//log.Printf("header %s: %s", k, v)
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
nvIndex++
}
@ -187,12 +251,13 @@ func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) {
if req.Body != nil {
dp, cdp = newDataProvider(req.Body, nil)
}
streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex+1))
streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex), cdp)
C.delete_nv_array(nva)
if int(streamID) < 0 {
return nil, fmt.Errorf("submit request error: %s", C.GoString(C.nghttp2_strerror(streamID)))
return nil, fmt.Errorf("submit request error: %s",
C.GoString(C.nghttp2_strerror(streamID)))
}
log.Println("stream id ", int(streamID))
//log.Println("stream id ", int(streamID))
r, w := io.Pipe()
s := &Stream{
streamID: int(streamID),
@ -216,14 +281,15 @@ func (c *Conn) NewRequest(req *http.Request) (*http.Response, error) {
//return nil, fmt.Errorf("unknown error")
}
func setNvArray(a *C.struct_nv_array, index int, name, value string, flags int) {
func setNvArray(a *C.struct_nv_array, index int,
name, value string, flags int) {
cname := C.CString(name)
cvalue := C.CString(value)
cnamelen := C.size_t(len(name))
cvaluelen := C.size_t(len(value))
cflags := C.int(flags)
defer C.free(unsafe.Pointer(cname))
defer C.free(unsafe.Pointer(cvalue))
//defer C.free(unsafe.Pointer(cname))
//defer C.free(unsafe.Pointer(cvalue))
C.nv_array_set(a, C.int(index), cname,
cvalue, cnamelen, cvaluelen, cflags)
}
@ -239,9 +305,10 @@ func (dp *dataProvider) Write(buf []byte) (n int, err error) {
return dp.w.Write(buf)
}
func newDataProvider(r io.Reader, w io.Writer) (*dataProvider, *C.nghttp2_data_provider) {
func newDataProvider(r io.Reader, w io.Writer) (
*dataProvider, *C.nghttp2_data_provider) {
dp := &dataProvider{r, w}
cdp := C.new_data_provider(unsafe.Pointer(dp))
cdp := C.new_data_provider(C.size_t(uintptr(unsafe.Pointer(dp))))
return dp, cdp
}
@ -279,23 +346,36 @@ func (s *Stream) onHeader(name, value string) {
func (s *Stream) onFrameRecv() {
s.res.Body = s
s.resch <- s.res
//log.Println("stream frame recv")
}
// Close close the stream
func (s *Stream) Close() error {
if s.closed {
return nil
}
err := fmt.Errorf("stream closed")
//log.Println("close stream")
select {
case s.errch <- fmt.Errorf("stream closed"):
case s.errch <- err:
default:
}
//log.Println("close stream resch")
close(s.resch)
//log.Println("close stream errch")
close(s.errch)
s.w.Close()
//log.Println("close pipe w")
s.w.CloseWithError(err)
//log.Println("close stream done")
s.closed = true
return nil
}
// DataSourceRead callback function for data read from data provider source
//export DataSourceRead
func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t {
log.Println("data source read")
func DataSourceRead(ptr unsafe.Pointer,
buf unsafe.Pointer, length C.size_t) C.ssize_t {
//log.Println("data source read")
dp := (*dataProvider)(ptr)
gobuf := make([]byte, int(length))
n, err := dp.Read(gobuf)
@ -313,8 +393,9 @@ func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.s
// OnDataRecv callback function for data frame received
//export OnDataRecv
func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int {
log.Println("on data recv")
func OnDataRecv(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.int {
//log.Println("on data recv")
conn := (*Conn)(ptr)
gobuf := C.GoBytes(buf, C.int(length))
conn.onDataRecv(gobuf, int(streamID))
@ -324,39 +405,41 @@ func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C
// DataRead callback function for session wants read data from peer
//export DataRead
func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
log.Println("data read")
//log.Println("data read req", int(size))
conn := (*Conn)(ptr)
buf := make([]byte, int(size))
log.Println(conn.conn.RemoteAddr())
//log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Read(buf)
if err != nil {
log.Println(err)
//log.Println(err)
return -1
}
log.Println("read from network ", n)
cbuf := C.CBytes(buf)
//log.Println("read from network ", n, buf[:n])
C.memcpy(data, cbuf, C.size_t(n))
return C.ssize_t(n)
}
// DataWrite callback function for session wants send data to peer
//export DataWrite
func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
log.Println("data write")
//log.Println("data write req ", int(size))
conn := (*Conn)(ptr)
buf := C.GoBytes(data, C.int(size))
log.Println(conn.conn.RemoteAddr())
//log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Write(buf)
if err != nil {
log.Println(err)
//log.Println(err)
return -1
}
log.Println("write data to network ", n)
//log.Println("write data to network ", n)
return C.ssize_t(n)
}
// OnBeginHeaderCallback callback function for response
//export OnBeginHeaderCallback
func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
log.Println("begin header")
//log.Println("begin header")
conn := (*Conn)(ptr)
conn.onBeginHeader(int(streamID))
return 0
@ -367,7 +450,7 @@ func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int {
log.Println("header")
//log.Println("header")
conn := (*Conn)(ptr)
goname := C.GoBytes(name, namelen)
govalue := C.GoBytes(value, valuelen)
@ -378,7 +461,7 @@ func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int,
// OnFrameRecvCallback callback function for begion to recv data
//export OnFrameRecvCallback
func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int {
log.Println("frame recv")
//log.Println("frame recv")
conn := (*Conn)(ptr)
conn.onFrameRecv(int(streamID))
return 0
@ -387,7 +470,7 @@ func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int {
// OnStreamClose callback function for stream close
//export OnStreamClose
func OnStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
log.Println("stream close")
//log.Println("stream close")
conn := (*Conn)(ptr)
conn.onStreamClose(int(streamID))
return 0

@ -1,8 +1,11 @@
package nghttp2
import (
"bytes"
"crypto/tls"
"log"
"net/http"
"net/url"
"os"
"testing"
)
@ -21,14 +24,23 @@ func TestHttp2Client(t *testing.T) {
if cstate.NegotiatedProtocol != "h2" {
t.Fatal("no http2 on server")
}
h2conn, err := NewConn(conn)
h2conn, err := NewClientConn(conn)
if err != nil {
t.Fatal(err)
}
req, _ := http.NewRequest("GET", "http://www.simicloud.com/media/httpbin/get", nil)
res, err := h2conn.NewRequest(req)
param := url.Values{}
param.Add("e", "b")
param.Add("f", "d")
data := bytes.NewReader([]byte(param.Encode()))
req, _ := http.NewRequest("POST", "https://www.simicloud.com/media/httpbin/post?a=b&c=d", data)
log.Printf("%+v", req)
req.Header.Set("accept", "*/*")
req.Header.Set("user-agent", "go-nghttp2/1.0")
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
res, err := h2conn.CreateRequest(req)
if err != nil {
t.Fatal(err)
}
res.Write(os.Stderr)
log.Println("end")
}

@ -21,7 +21,7 @@ static int on_header_callback(nghttp2_session *session,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
printf("on_header_callback\n");
//printf("on_header_callback\n");
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
@ -41,15 +41,15 @@ static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
printf("on_begin_headers_callback\n");
//printf("on_begin_headers_callback\n");
int stream_id = frame->hd.stream_id;
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
fprintf(stderr, "Response headers for stream ID=%d:\n",
frame->hd.stream_id);
//fprintf(stderr, "Response headers for stream ID=%d:\n",
// frame->hd.stream_id);
OnBeginHeaderCallback(user_data, stream_id);
}
break;
@ -57,19 +57,71 @@ static int on_begin_headers_callback(nghttp2_session *session,
return 0;
}
int on_invalid_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, int lib_error_code, void *user_data)
{
printf("on_invalid_frame_recv, frame %d, code %d, msg %s\n",
frame->hd.type,
lib_error_code,
nghttp2_strerror(lib_error_code));
return 0;
}
static int on_frame_send_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
size_t i;
(void)user_data;
//printf("on_frame_send_callback\n");
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
/*
if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id))
{
*/
if (1)
{
const nghttp2_nv *nva = frame->headers.nva;
printf("[INFO] C ----------------------------> S (HEADERS)\n");
for (i = 0; i < frame->headers.nvlen; ++i)
{
fwrite(nva[i].name, 1, nva[i].namelen, stdout);
printf(": ");
fwrite(nva[i].value, 1, nva[i].valuelen, stdout);
printf("\n");
}
}
break;
case NGHTTP2_RST_STREAM:
printf("[INFO] C ----------------------------> S (RST_STREAM)\n");
break;
case NGHTTP2_GOAWAY:
printf("[INFO] C ----------------------------> S (GOAWAY)\n");
break;
}
return 0;
}
static int on_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
printf("on_frame_recv_callback\n");
//printf("on_frame_recv_callback %d\n", frame->hd.type);
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
fprintf(stderr, "All headers received\n");
//fprintf(stderr, "All headers received\n");
OnFrameRecvCallback(user_data, frame->hd.stream_id);
}
break;
case NGHTTP2_RST_STREAM:
printf("server send rst_stream %d\n", frame->rst_stream.error_code);
break;
case NGHTTP2_GOAWAY:
printf("server send go away\n");
break;
}
return 0;
}
@ -92,7 +144,7 @@ ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length, uint32_t *data_flags,
nghttp2_data_source *source, void *user_data)
{
int ret = DataSourceRead(source, buf, length);
int ret = DataSourceRead(source->ptr, buf, length);
if (ret == 0)
{
*data_flags = NGHTTP2_DATA_FLAG_EOF;
@ -107,21 +159,17 @@ int on_error_callback(nghttp2_session *session, int lib_error_code,
return 0;
}
nghttp2_session *init_nghttp2_session(size_t data)
void init_callbacks(nghttp2_session_callbacks *callbacks)
{
int ret;
nghttp2_session *session;
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback);
nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_frame_recv_callback);
nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_frame_send_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks, on_data_chunk_recv_callback);
@ -133,13 +181,35 @@ nghttp2_session *init_nghttp2_session(size_t data)
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_begin_headers_callback);
}
ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data)));
nghttp2_session *init_client_session(size_t data)
{
int ret;
nghttp2_session *session;
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
init_callbacks(callbacks);
ret = nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data)));
if (session == NULL)
{
printf("c init session failed: %s\n", nghttp2_strerror(ret));
}
return session;
}
nghttp2_session *init_server_session(size_t data)
{
int ret;
nghttp2_session *session;
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
init_callbacks(callbacks);
ret = nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data)));
if (session == NULL)
{
printf("c init session failed: %s\n", nghttp2_strerror(ret));
}
nghttp2_session_callbacks_del(callbacks);
return session;
}
@ -150,8 +220,12 @@ int send_client_connection_header(nghttp2_session *session)
int rv;
/* client 24 bytes magic string will be sent by nghttp2 library */
/*
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv,
ARRLEN(iv));
*/
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, NULL, 0);
/*
if (rv != 0)
{
@ -161,7 +235,8 @@ int send_client_connection_header(nghttp2_session *session)
return rv;
}
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen)
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen,
nghttp2_data_provider *dp)
{
int32_t stream_id;
/*
@ -174,8 +249,13 @@ int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen
fprintf(stderr, "Request headers:\n");
print_headers(stderr, hdrs, ARRLEN(hdrs));
*/
int i;
for (i = 0; i < hdrlen; i++)
{
printf("header %s: %s\n", hdrs[i].name, hdrs[i].value);
}
stream_id = nghttp2_submit_request(session, NULL, hdrs,
hdrlen, NULL, NULL);
hdrlen, dp, NULL);
/*
if (stream_id < 0)
{
@ -190,6 +270,7 @@ struct nv_array *new_nv_array(size_t n)
{
struct nv_array *a = malloc(sizeof(struct nv_array));
nghttp2_nv *nv = (nghttp2_nv *)malloc(n * sizeof(nghttp2_nv));
memset(nv, 0, n * sizeof(nghttp2_nv));
a->nv = nv;
a->len = n;
return a;
@ -203,24 +284,39 @@ int nv_array_set(struct nv_array *a, int index,
{
return -1;
}
nghttp2_nv nv = (a->nv)[index];
nv.name = name;
nv.value = value;
nv.namelen = namelen;
nv.valuelen = valuelen;
nv.flags = flag;
nghttp2_nv *nv = &((a->nv)[index]);
nv->name = name;
nv->value = value;
nv->namelen = namelen;
nv->valuelen = valuelen;
nv->flags = flag;
return 0;
}
void delete_nv_array(struct nv_array *a)
{
int i;
nghttp2_nv *nv;
for (i = 0; i < a->len; i++)
{
nv = &((a->nv)[i]);
if (nv->name != NULL)
{
free(nv->name);
}
if (nv->value != NULL)
{
free(nv->value);
}
}
free(a->nv);
free(a);
}
nghttp2_data_provider *new_data_provider(void *data)
nghttp2_data_provider *new_data_provider(size_t data)
{
nghttp2_data_provider *dp = malloc(sizeof(nghttp2_data_provider));
dp->source.ptr = data;
dp->source.ptr = (void *)((int *)data);
dp->read_callback = data_source_read_callback;
return dp;
}
Loading…
Cancel
Save