Initial commit

merge_conn
fangdingjun 6 years ago
commit 7a2599e0ca

1
.gitignore vendored

@ -0,0 +1 @@
*~

@ -0,0 +1,37 @@
#ifndef _NGHTTP2_H
#define _NGHTTP2_H
#include <nghttp2/nghttp2.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
extern ssize_t DataRead(void *, void *data, size_t);
extern ssize_t DataWrite(void *, void *data, size_t);
extern ssize_t DataSourceRead(void *, void *, size_t);
extern int OnDataRecv(void *, int, void *, size_t);
extern int OnBeginHeaderCallback(void *, int);
extern int OnHeaderCallback(void *, int, void *, int, void *, int);
extern int OnFrameRecvCallback(void *, int);
struct nv_array
{
nghttp2_nv *nv;
size_t len;
};
void delete_nv_array(struct nv_array *a);
nghttp2_data_provider *new_data_provider(void *data);
int nv_array_set(struct nv_array *a, int index,
char *name, char *value,
size_t namelen, size_t valuelen, int flag);
struct nv_array *new_nv_array(size_t n);
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen);
int send_client_connection_header(nghttp2_session *session);
void init_nghttp2_session(nghttp2_session *session, void *data);
#endif

@ -0,0 +1,237 @@
package nghttp2
/*
#cgo pkg-config: libnghttp2
#include "_nghttp2.h"
*/
import "C"
import (
"fmt"
"io"
"net"
"net/http"
"strings"
"unsafe"
)
// Conn http2 connection
type Conn struct {
session *C.nghttp2_session
conn net.Conn
streams map[int]*Stream
}
// Stream http2 stream
type Stream struct {
streamID int
cdp *C.nghttp2_data_provider
dp *dataProvider
// application read data from stream
r io.Reader
// recv stream data from session
w io.Writer
}
type dataProvider struct {
// drain the data
r io.Reader
// provider the data
w io.Writer
}
// NewConn create http2 connection
func NewConn(c net.Conn) *Conn {
conn := &Conn{conn: c, streams: make(map[int]*Stream)}
C.init_nghttp2_session(conn.session, unsafe.Pointer(conn))
C.send_client_connection_header(conn.session)
return conn
}
func (c *Conn) onDataRecv(buf []byte, streamID int) {
stream := c.streams[streamID]
stream.onDataRecv(buf)
}
func (c *Conn) onBeginHeader(streamID int) {
stream := c.streams[streamID]
stream.onBeginHeader()
}
func (c *Conn) onHeader(streamID int, name, value string) {
stream := c.streams[streamID]
stream.onHeader(name, value)
}
func (c *Conn) onFrameRecv(streamID int) {
stream := c.streams[streamID]
stream.onFrameRecv()
}
// NewRequest create a new http2 stream
func (c *Conn) NewRequest(req *http.Request) *http.Response {
nvIndex := 0
nvMax := 25
nva := C.new_nv_array(C.size_t(nvMax))
setNvArray(nva, nvIndex, ":method", req.Method, 0)
nvIndex += 1
setNvArray(nva, nvIndex, ":scheme", "https", 0)
nvIndex += 1
setNvArray(nva, nvIndex, ":authority", req.Host, 0)
nvIndex += 1
p := req.URL.Path
q := req.URL.Query().Encode()
if q != "" {
p = p + "?" + q
}
setNvArray(nva, nvIndex, ":path", p, 0)
nvIndex += 1
for k, v := range req.Header {
if strings.ToLower(k) == "host" {
continue
}
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
nvIndex += 1
}
var dp *dataProvider
var cdp *C.nghttp2_data_provider
if req.Body != nil {
dp, cdp = newDataProvider(req.Body, nil)
}
streamID := C.submit_request(c.session, nva.nv, C.size_t(nvIndex+1))
C.delete_nv_array(nva)
if int(streamID) < 0 {
return nil
}
r, w := io.Pipe()
s := &Stream{streamID: int(streamID), dp: dp, cdp: cdp, r: r, w: w}
c.streams[int(streamID)] = s
return nil
}
func setNvArray(a *C.struct_nv_array, index int, name, value string, flags int) {
cname := C.CString(name)
cvalue := C.CString(value)
cnamelen := C.size_t(len(name))
cvaluelen := C.size_t(len(value))
cflags := C.int(flags)
defer C.free(unsafe.Pointer(cname))
defer C.free(unsafe.Pointer(cvalue))
C.nv_array_set(a, C.int(index), cname,
cvalue, cnamelen, cvaluelen, cflags)
}
func (dp *dataProvider) Read(buf []byte) (n int, err error) {
return dp.r.Read(buf)
}
func (dp *dataProvider) Write(buf []byte) (n int, err error) {
if dp.w == nil {
return 0, fmt.Errorf("write not supported")
}
return dp.w.Write(buf)
}
func newDataProvider(r io.Reader, w io.Writer) (*dataProvider, *C.nghttp2_data_provider) {
dp := &dataProvider{r, w}
cdp := C.new_data_provider(unsafe.Pointer(dp))
return dp, cdp
}
func (s *Stream) Read(buf []byte) (n int, err error) {
return s.r.Read(buf)
}
func (s *Stream) Write(buf []byte) (n int, err error) {
return s.dp.Write(buf)
}
func (s *Stream) onDataRecv(buf []byte) {
s.w.Write(buf)
}
func (s *Stream) onBeginHeader() {
}
func (s *Stream) onHeader(name, value string) {
}
func (s *Stream) onFrameRecv() {
}
//export DataSourceRead
func DataSourceRead(ptr unsafe.Pointer, buf unsafe.Pointer, length C.size_t) C.ssize_t {
dp := (*dataProvider)(ptr)
gobuf := make([]byte, int(length))
n, err := dp.Read(gobuf)
if err != nil {
if err == io.EOF {
return 0
}
return -1
}
cbuf := C.CBytes(gobuf)
defer C.free(cbuf)
C.memcpy(buf, cbuf, C.size_t(n))
return C.ssize_t(n)
}
//export OnDataRecv
func OnDataRecv(ptr unsafe.Pointer, streamID C.int, buf unsafe.Pointer, length C.size_t) C.int {
conn := (*Conn)(ptr)
gobuf := C.GoBytes(buf, C.int(length))
conn.onDataRecv(gobuf, int(streamID))
return 0
}
//export DataRead
func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
conn := (*Conn)(ptr)
buf := make([]byte, int(size))
n, err := conn.conn.Read(buf)
if err != nil {
return -1
}
return C.ssize_t(n)
}
//export DataWrite
func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
conn := (*Conn)(ptr)
buf := C.GoBytes(data, C.int(size))
n, err := conn.conn.Write(buf)
if err != nil {
return -1
}
return C.ssize_t(n)
}
//export OnBeginHeaderCallback
func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
conn := (*Conn)(ptr)
conn.onBeginHeader(int(streamID))
return 0
}
//export OnHeaderCallback
func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int {
conn := (*Conn)(ptr)
goname := C.GoBytes(name, namelen)
govalue := C.GoBytes(value, valuelen)
conn.onHeader(int(streamID), string(goname), string(govalue))
return 0
}
//export OnFrameRecvCallback
func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int {
conn := (*Conn)(ptr)
conn.onFrameRecv(int(streamID))
return 0
}

@ -0,0 +1,209 @@
#include "_nghttp2.h"
#define ARRLEN(x) (sizeof(x) / sizeof(x[0]))
// send_callback send data to network
static ssize_t send_callback(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
return DataWrite(user_data, (void *)data, length);
}
// recv_callback read data from network
static ssize_t recv_callback(nghttp2_session *session, uint8_t *buf,
size_t length, int flags, void *user_data)
{
return DataRead(user_data, (void *)buf, length);
}
static int on_header_callback(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
/* Print response headers for the initiated request. */
//print_header(stderr, name, namelen, value, valuelen);
OnHeaderCallback(user_data, frame->hd.stream_id,
(void *)name, namelen, (void *)value, valuelen);
break;
}
}
return 0;
}
static int on_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
int stream_id = frame->hd.stream_id;
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
fprintf(stderr, "Response headers for stream ID=%d:\n",
frame->hd.stream_id);
}
OnBeginHeaderCallback(user_data, stream_id);
break;
}
return 0;
}
static int on_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
switch (frame->hd.type)
{
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{
fprintf(stderr, "All headers received\n");
}
OnFrameRecvCallback(user_data, frame->hd.stream_id);
break;
}
return 0;
}
static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data)
{
return OnDataRecv(user_data, stream_id, (void *)data, len);
}
static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
return 0;
}
ssize_t data_source_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length, uint32_t *data_flags,
nghttp2_data_source *source, void *user_data)
{
int ret = DataSourceRead(source, buf, length);
if (ret == 0)
{
*data_flags = NGHTTP2_DATA_FLAG_EOF;
}
return ret;
}
void init_nghttp2_session(nghttp2_session *session, void *data)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_frame_recv_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks, on_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, on_stream_close_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
on_header_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_begin_headers_callback);
nghttp2_session_client_new(&session, callbacks, data);
nghttp2_session_callbacks_del(callbacks);
}
int send_client_connection_header(nghttp2_session *session)
{
nghttp2_settings_entry iv[1] = {
{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}};
int rv;
/* client 24 bytes magic string will be sent by nghttp2 library */
rv = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, iv,
ARRLEN(iv));
/*
if (rv != 0)
{
errx(1, "Could not submit SETTINGS: %s", nghttp2_strerror(rv));
}
*/
return rv;
}
int32_t submit_request(nghttp2_session *session, nghttp2_nv *hdrs, size_t hdrlen)
{
int32_t stream_id;
/*
nghttp2_nv hdrs[] = {
MAKE_NV2(":method", "GET"),
MAKE_NV(":scheme", &uri[u->field_data[UF_SCHEMA].off],
u->field_data[UF_SCHEMA].len),
MAKE_NV(":authority", stream_data->authority, stream_data->authoritylen),
MAKE_NV(":path", stream_data->path, stream_data->pathlen)};
fprintf(stderr, "Request headers:\n");
print_headers(stderr, hdrs, ARRLEN(hdrs));
*/
stream_id = nghttp2_submit_request(session, NULL, hdrs,
hdrlen, NULL, NULL);
/*
if (stream_id < 0)
{
errx(1, "Could not submit HTTP request: %s", nghttp2_strerror(stream_id));
}
*/
return stream_id;
}
struct nv_array *new_nv_array(size_t n)
{
struct nv_array *a = malloc(sizeof(struct nv_array));
nghttp2_nv *nv = (nghttp2_nv *)malloc(n * sizeof(nghttp2_nv));
a->nv = nv;
a->len = n;
return a;
}
int nv_array_set(struct nv_array *a, int index,
char *name, char *value,
size_t namelen, size_t valuelen, int flag)
{
if (index > (a->len - 1))
{
return -1;
}
nghttp2_nv nv = (a->nv)[index];
nv.name = name;
nv.value = value;
nv.namelen = namelen;
nv.valuelen = valuelen;
nv.flags = flag;
return 0;
}
void delete_nv_array(struct nv_array *a)
{
free(a->nv);
free(a);
}
nghttp2_data_provider *new_data_provider(void *data)
{
nghttp2_data_provider *dp = malloc(sizeof(nghttp2_data_provider));
dp->source.ptr = data;
dp->read_callback = data_source_read_callback;
}
Loading…
Cancel
Save