change function name

merge_conn
fangdingjun 6 years ago
parent d41993b816
commit 65cc942aa0

@ -5,14 +5,14 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
extern ssize_t DataRead(void *, void *data, size_t); extern ssize_t ClientDataRecv(void *, void *data, size_t);
extern ssize_t DataWrite(void *, void *data, size_t); extern ssize_t ClientDataSend(void *, void *data, size_t);
extern ssize_t DataSourceRead(void *, void *, size_t); extern ssize_t DataSourceRead(void *, void *, size_t);
extern int OnDataRecv(void *, int, void *, size_t); extern int OnClientDataRecv(void *, int, void *, size_t);
extern int OnBeginHeaderCallback(void *, int); extern int OnClientBeginHeaderCallback(void *, int);
extern int OnHeaderCallback(void *, int, void *, int, void *, int); extern int OnClientHeaderCallback(void *, int, void *, int, void *, int);
extern int OnFrameRecvCallback(void *, int); extern int OnClientFrameRecvCallback(void *, int);
extern int OnStreamClose(void *, int); extern int OnClientStreamClose(void *, int);
struct nv_array struct nv_array
{ {

@ -3,20 +3,20 @@
#define ARRLEN(x) (sizeof(x) / sizeof(x[0])) #define ARRLEN(x) (sizeof(x) / sizeof(x[0]))
// send_callback send data to network // send_callback send data to network
static ssize_t send_callback(nghttp2_session *session, const uint8_t *data, static ssize_t client_send_callback(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data) size_t length, int flags, void *user_data)
{ {
return DataWrite(user_data, (void *)data, length); return ClientDataSend(user_data, (void *)data, length);
} }
// recv_callback read data from network // recv_callback read data from network
static ssize_t recv_callback(nghttp2_session *session, uint8_t *buf, static ssize_t client_recv_callback(nghttp2_session *session, uint8_t *buf,
size_t length, int flags, void *user_data) size_t length, int flags, void *user_data)
{ {
return DataRead(user_data, (void *)buf, length); return ClientDataRecv(user_data, (void *)buf, length);
} }
static int on_header_callback(nghttp2_session *session, static int on_client_header_callback(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value, size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data) size_t valuelen, uint8_t flags, void *user_data)
@ -29,7 +29,7 @@ static int on_header_callback(nghttp2_session *session,
{ {
/* Print response headers for the initiated request. */ /* Print response headers for the initiated request. */
//print_header(stderr, name, namelen, value, valuelen); //print_header(stderr, name, namelen, value, valuelen);
OnHeaderCallback(user_data, frame->hd.stream_id, OnClientHeaderCallback(user_data, frame->hd.stream_id,
(void *)name, namelen, (void *)value, valuelen); (void *)name, namelen, (void *)value, valuelen);
break; break;
} }
@ -37,7 +37,7 @@ static int on_header_callback(nghttp2_session *session,
return 0; return 0;
} }
static int on_begin_headers_callback(nghttp2_session *session, static int on_client_begin_headers_callback(nghttp2_session *session,
const nghttp2_frame *frame, const nghttp2_frame *frame,
void *user_data) void *user_data)
{ {
@ -50,7 +50,7 @@ static int on_begin_headers_callback(nghttp2_session *session,
{ {
//fprintf(stderr, "Response headers for stream ID=%d:\n", //fprintf(stderr, "Response headers for stream ID=%d:\n",
// frame->hd.stream_id); // frame->hd.stream_id);
OnBeginHeaderCallback(user_data, stream_id); OnClientBeginHeaderCallback(user_data, stream_id);
} }
break; break;
} }
@ -67,7 +67,7 @@ int on_invalid_frame_recv_callback(nghttp2_session *session,
return 0; return 0;
} }
static int on_frame_send_callback(nghttp2_session *session, static int on_client_frame_send_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data) const nghttp2_frame *frame, void *user_data)
{ {
size_t i; size_t i;
@ -103,7 +103,7 @@ static int on_frame_send_callback(nghttp2_session *session,
return 0; return 0;
} }
static int on_frame_recv_callback(nghttp2_session *session, static int on_client_frame_recv_callback(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data) const nghttp2_frame *frame, void *user_data)
{ {
//printf("on_frame_recv_callback %d\n", frame->hd.type); //printf("on_frame_recv_callback %d\n", frame->hd.type);
@ -113,7 +113,7 @@ static int on_frame_recv_callback(nghttp2_session *session,
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)
{ {
//fprintf(stderr, "All headers received\n"); //fprintf(stderr, "All headers received\n");
OnFrameRecvCallback(user_data, frame->hd.stream_id); OnClientFrameRecvCallback(user_data, frame->hd.stream_id);
} }
break; break;
case NGHTTP2_RST_STREAM: case NGHTTP2_RST_STREAM:
@ -126,17 +126,17 @@ static int on_frame_recv_callback(nghttp2_session *session,
return 0; return 0;
} }
static int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags, static int on_client_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data, int32_t stream_id, const uint8_t *data,
size_t len, void *user_data) size_t len, void *user_data)
{ {
return OnDataRecv(user_data, stream_id, (void *)data, len); return OnClientDataRecv(user_data, stream_id, (void *)data, len);
} }
static int on_stream_close_callback(nghttp2_session *session, int32_t stream_id, static int on_client_stream_close_callback(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data) uint32_t error_code, void *user_data)
{ {
OnStreamClose(user_data, stream_id); OnClientStreamClose(user_data, stream_id);
return 0; return 0;
} }
@ -159,28 +159,28 @@ int on_error_callback(nghttp2_session *session, int lib_error_code,
return 0; return 0;
} }
void init_callbacks(nghttp2_session_callbacks *callbacks) void init_client_callbacks(nghttp2_session_callbacks *callbacks)
{ {
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); nghttp2_session_callbacks_set_send_callback(callbacks, client_send_callback);
nghttp2_session_callbacks_set_recv_callback(callbacks, recv_callback); nghttp2_session_callbacks_set_recv_callback(callbacks, client_recv_callback);
//nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); //nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback); nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
on_frame_recv_callback); on_client_frame_recv_callback);
nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_frame_send_callback); nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, on_client_frame_send_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback( nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks, on_data_chunk_recv_callback); callbacks, on_client_data_chunk_recv_callback);
nghttp2_session_callbacks_set_on_stream_close_callback( nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, on_stream_close_callback); callbacks, on_client_stream_close_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks, nghttp2_session_callbacks_set_on_header_callback(callbacks,
on_header_callback); on_client_header_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback( nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, on_begin_headers_callback); callbacks, on_client_begin_headers_callback);
} }
nghttp2_session *init_client_session(size_t data) nghttp2_session *init_client_session(size_t data)
@ -189,7 +189,7 @@ nghttp2_session *init_client_session(size_t data)
nghttp2_session *session; nghttp2_session *session;
nghttp2_session_callbacks *callbacks; nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks); nghttp2_session_callbacks_new(&callbacks);
init_callbacks(callbacks); init_client_callbacks(callbacks);
ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data))); ret = nghttp2_session_client_new(&session, callbacks, (void *)((int *)(data)));
if (session == NULL) if (session == NULL)
{ {
@ -198,21 +198,6 @@ nghttp2_session *init_client_session(size_t data)
return session; return session;
} }
nghttp2_session *init_server_session(size_t data)
{
int ret;
nghttp2_session *session;
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
init_callbacks(callbacks);
ret = nghttp2_session_server_new(&session, callbacks, (void *)((int *)(data)));
if (session == NULL)
{
printf("c init session failed: %s\n", nghttp2_strerror(ret));
}
return session;
}
int send_client_connection_header(nghttp2_session *session) int send_client_connection_header(nghttp2_session *session)
{ {
nghttp2_settings_entry iv[1] = { nghttp2_settings_entry iv[1] = {

@ -18,19 +18,20 @@ import (
"unsafe" "unsafe"
) )
// Conn http2 connection // ClientConn http2 connection
type Conn struct { type ClientConn struct {
session *C.nghttp2_session session *C.nghttp2_session
conn net.Conn conn net.Conn
streams map[int]*Stream streams map[int]*ClientStream
lock *sync.Mutex lock *sync.Mutex
errch chan struct{} errch chan struct{}
exitch chan struct{}
err error err error
isServer bool isServer bool
} }
// Stream http2 stream // ClientStream http2 stream
type Stream struct { type ClientStream struct {
streamID int streamID int
cdp *C.nghttp2_data_provider cdp *C.nghttp2_data_provider
dp *dataProvider dp *dataProvider
@ -44,9 +45,6 @@ type Stream struct {
closed bool closed bool
} }
type Request struct {
}
type dataProvider struct { type dataProvider struct {
// drain the data // drain the data
r io.Reader r io.Reader
@ -55,11 +53,12 @@ type dataProvider struct {
} }
// NewClientConn create http2 client // NewClientConn create http2 client
func NewClientConn(c net.Conn) (*Conn, error) { func NewClientConn(c net.Conn) (*ClientConn, error) {
conn := &Conn{ conn := &ClientConn{
conn: c, streams: make(map[int]*Stream), conn: c, streams: make(map[int]*ClientStream),
lock: new(sync.Mutex), lock: new(sync.Mutex),
errch: make(chan struct{}), errch: make(chan struct{}),
exitch: make(chan struct{}),
} }
conn.session = C.init_client_session( conn.session = C.init_client_session(
C.size_t(int(uintptr(unsafe.Pointer(conn))))) C.size_t(int(uintptr(unsafe.Pointer(conn)))))
@ -75,46 +74,28 @@ func NewClientConn(c net.Conn) (*Conn, error) {
return conn, nil return conn, nil
} }
// NewServerConn create http2 server func (c *ClientConn) onDataRecv(buf []byte, streamID int) {
func NewServerConn(c net.Conn) (*Conn, error) {
conn := &Conn{
conn: c, streams: make(map[int]*Stream),
lock: new(sync.Mutex),
errch: make(chan struct{}),
isServer: true,
}
conn.session = C.init_server_session(
C.size_t(int(uintptr(unsafe.Pointer(conn)))))
if conn.session == nil {
return nil, fmt.Errorf("init session failed")
}
go conn.run()
return conn, nil
}
func (c *Conn) onDataRecv(buf []byte, streamID int) {
stream := c.streams[streamID] stream := c.streams[streamID]
stream.onDataRecv(buf) stream.onDataRecv(buf)
} }
func (c *Conn) onBeginHeader(streamID int) { func (c *ClientConn) onBeginHeader(streamID int) {
stream := c.streams[streamID] stream := c.streams[streamID]
stream.onBeginHeader() stream.onBeginHeader()
} }
func (c *Conn) onHeader(streamID int, name, value string) { func (c *ClientConn) onHeader(streamID int, name, value string) {
stream := c.streams[streamID] stream := c.streams[streamID]
stream.onHeader(name, value) stream.onHeader(name, value)
} }
func (c *Conn) onFrameRecv(streamID int) { func (c *ClientConn) onFrameRecv(streamID int) {
stream := c.streams[streamID] stream := c.streams[streamID]
stream.onFrameRecv() stream.onFrameRecv()
} }
func (c *Conn) onStreamClose(streamID int) { func (c *ClientConn) onStreamClose(streamID int) {
stream, ok := c.streams[streamID] stream, ok := c.streams[streamID]
if ok { if ok {
stream.Close() stream.Close()
@ -126,29 +107,38 @@ func (c *Conn) onStreamClose(streamID int) {
} }
// Close close the http2 connection // Close close the http2 connection
func (c *Conn) Close() error { func (c *ClientConn) Close() error {
for _, s := range c.streams { for _, s := range c.streams {
s.Close() s.Close()
} }
C.nghttp2_session_terminate_session(c.session, 0) C.nghttp2_session_terminate_session(c.session, 0)
C.nghttp2_session_del(c.session) C.nghttp2_session_del(c.session)
close(c.errch) close(c.exitch)
c.conn.Close() c.conn.Close()
return nil return nil
} }
func (c *Conn) run() { func (c *ClientConn) run() {
var wantRead int var wantRead int
var wantWrite int var wantWrite int
var delay = 50 var delay = 50
var ret C.int var ret C.int
defer close(c.errch)
datach := make(chan []byte) datach := make(chan []byte)
errch := make(chan error) errch := make(chan error)
go func() { go func() {
buf := make([]byte, 16*1024) buf := make([]byte, 16*1024)
readloop:
for { for {
select {
case <-c.exitch:
break readloop
default:
}
n, err := c.conn.Read(buf) n, err := c.conn.Read(buf)
if err != nil { if err != nil {
errch <- err errch <- err
@ -166,6 +156,8 @@ loop:
case err := <-errch: case err := <-errch:
c.err = err c.err = err
break loop break loop
case <-c.exitch:
break loop
default: default:
} }
@ -205,15 +197,8 @@ loop:
} }
} }
// AcceptRequest get a request from session
// this block until a request is avaliable,
// server only
func (c *Conn) AcceptRequest() (req *Request, err error) {
return nil, nil
}
// CreateRequest submit a request and return a http.Response, client only // CreateRequest submit a request and return a http.Response, client only
func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) { func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
if c.err != nil { if c.err != nil {
return nil, c.err return nil, c.err
} }
@ -260,7 +245,7 @@ func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) {
} }
//log.Println("stream id ", int(streamID)) //log.Println("stream id ", int(streamID))
r, w := io.Pipe() r, w := io.Pipe()
s := &Stream{ s := &ClientStream{
streamID: int(streamID), streamID: int(streamID),
dp: dp, dp: dp,
cdp: cdp, cdp: cdp,
@ -278,6 +263,8 @@ func (c *Conn) CreateRequest(req *http.Request) (*http.Response, error) {
return nil, err return nil, err
case res := <-s.resch: case res := <-s.resch:
return res, nil return res, nil
case <-c.errch:
return nil, fmt.Errorf("connection error")
} }
//return nil, fmt.Errorf("unknown error") //return nil, fmt.Errorf("unknown error")
} }
@ -313,25 +300,25 @@ func newDataProvider(r io.Reader, w io.Writer) (
return dp, cdp return dp, cdp
} }
func (s *Stream) Read(buf []byte) (n int, err error) { func (s *ClientStream) Read(buf []byte) (n int, err error) {
return s.r.Read(buf) return s.r.Read(buf)
} }
func (s *Stream) Write(buf []byte) (n int, err error) { func (s *ClientStream) Write(buf []byte) (n int, err error) {
return s.dp.Write(buf) return s.dp.Write(buf)
} }
func (s *Stream) onDataRecv(buf []byte) { func (s *ClientStream) onDataRecv(buf []byte) {
s.w.Write(buf) s.w.Write(buf)
} }
func (s *Stream) onBeginHeader() { func (s *ClientStream) onBeginHeader() {
s.res = &http.Response{ s.res = &http.Response{
Header: make(http.Header), Header: make(http.Header),
} }
} }
func (s *Stream) onHeader(name, value string) { func (s *ClientStream) onHeader(name, value string) {
if name == ":status" { if name == ":status" {
statusCode, _ := strconv.Atoi(value) statusCode, _ := strconv.Atoi(value)
s.res.StatusCode = statusCode s.res.StatusCode = statusCode
@ -344,14 +331,14 @@ func (s *Stream) onHeader(name, value string) {
s.res.Header.Add(name, value) s.res.Header.Add(name, value)
} }
func (s *Stream) onFrameRecv() { func (s *ClientStream) onFrameRecv() {
s.res.Body = s s.res.Body = s
s.resch <- s.res s.resch <- s.res
//log.Println("stream frame recv") //log.Println("stream frame recv")
} }
// Close close the stream // Close close the stream
func (s *Stream) Close() error { func (s *ClientStream) Close() error {
if s.closed { if s.closed {
return nil return nil
} }
@ -392,22 +379,22 @@ func DataSourceRead(ptr unsafe.Pointer,
return C.ssize_t(n) return C.ssize_t(n)
} }
// OnDataRecv callback function for data frame received // OnClientDataRecv callback function for data frame received
//export OnDataRecv //export OnClientDataRecv
func OnDataRecv(ptr unsafe.Pointer, streamID C.int, func OnClientDataRecv(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.int { buf unsafe.Pointer, length C.size_t) C.int {
//log.Println("on data recv") //log.Println("on data recv")
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
gobuf := C.GoBytes(buf, C.int(length)) gobuf := C.GoBytes(buf, C.int(length))
conn.onDataRecv(gobuf, int(streamID)) conn.onDataRecv(gobuf, int(streamID))
return 0 return 0
} }
// DataRead callback function for session wants read data from peer // ClientDataRecv callback function for session wants read data from peer
//export DataRead //export ClientDataRecv
func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { func ClientDataRecv(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("data read req", int(size)) //log.Println("data read req", int(size))
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
buf := make([]byte, int(size)) buf := make([]byte, int(size))
//log.Println(conn.conn.RemoteAddr()) //log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Read(buf) n, err := conn.conn.Read(buf)
@ -421,11 +408,11 @@ func DataRead(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t
return C.ssize_t(n) return C.ssize_t(n)
} }
// DataWrite callback function for session wants send data to peer // ClientDataSend callback function for session wants send data to peer
//export DataWrite //export ClientDataSend
func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { func ClientDataSend(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("data write req ", int(size)) //log.Println("data write req ", int(size))
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
buf := C.GoBytes(data, C.int(size)) buf := C.GoBytes(data, C.int(size))
//log.Println(conn.conn.RemoteAddr()) //log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Write(buf) n, err := conn.conn.Write(buf)
@ -437,42 +424,42 @@ func DataWrite(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t
return C.ssize_t(n) return C.ssize_t(n)
} }
// OnBeginHeaderCallback callback function for response // OnClientBeginHeaderCallback callback function for response
//export OnBeginHeaderCallback //export OnClientBeginHeaderCallback
func OnBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { func OnClientBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("begin header") //log.Println("begin header")
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
conn.onBeginHeader(int(streamID)) conn.onBeginHeader(int(streamID))
return 0 return 0
} }
// OnHeaderCallback callback function for header // OnClientHeaderCallback callback function for header
//export OnHeaderCallback //export OnClientHeaderCallback
func OnHeaderCallback(ptr unsafe.Pointer, streamID C.int, func OnClientHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int, name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int { value unsafe.Pointer, valuelen C.int) C.int {
//log.Println("header") //log.Println("header")
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
goname := C.GoBytes(name, namelen) goname := C.GoBytes(name, namelen)
govalue := C.GoBytes(value, valuelen) govalue := C.GoBytes(value, valuelen)
conn.onHeader(int(streamID), string(goname), string(govalue)) conn.onHeader(int(streamID), string(goname), string(govalue))
return 0 return 0
} }
// OnFrameRecvCallback callback function for begion to recv data // OnClientFrameRecvCallback callback function for begion to recv data
//export OnFrameRecvCallback //export OnClientFrameRecvCallback
func OnFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int { func OnClientFrameRecvCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("frame recv") //log.Println("frame recv")
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
conn.onFrameRecv(int(streamID)) conn.onFrameRecv(int(streamID))
return 0 return 0
} }
// OnStreamClose callback function for stream close // OnClientStreamClose callback function for stream close
//export OnStreamClose //export OnClientStreamClose
func OnStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { func OnClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("stream close") //log.Println("stream close")
conn := (*Conn)(ptr) conn := (*ClientConn)(ptr)
conn.onStreamClose(int(streamID)) conn.onStreamClose(int(streamID))
return 0 return 0
} }
Loading…
Cancel
Save