master
fangdingjun 6 years ago
parent 2f09b48a89
commit 2696323ff3

@ -1,333 +1,333 @@
package nghttp2 package nghttp2
/* /*
#include "_nghttp2.h" #include "_nghttp2.h"
*/ */
import "C" import "C"
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"errors" "errors"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"unsafe" "unsafe"
) )
var ( var (
errAgain = errors.New("again") errAgain = errors.New("again")
) )
const ( const (
NGHTTP2_NO_ERROR = 0 NGHTTP2_NO_ERROR = 0
NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE = -521 NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE = -521
NGHTTP2_ERR_CALLBACK_FAILURE = -902 NGHTTP2_ERR_CALLBACK_FAILURE = -902
NGHTTP2_ERR_DEFERRED = -508 NGHTTP2_ERR_DEFERRED = -508
) )
/* /*
var bufPool = &sync.Pool{ var bufPool = &sync.Pool{
New: func() interface{} { New: func() interface{} {
return make([]byte, 16*1024) return make([]byte, 16*1024)
}, },
} }
*/ */
// onDataSourceReadCallback callback function for libnghttp2 library // onDataSourceReadCallback callback function for libnghttp2 library
// want read data from data provider source, // want read data from data provider source,
// return NGHTTP2_ERR_DEFERRED will cause data frame defered, // return NGHTTP2_ERR_DEFERRED will cause data frame defered,
// application later call nghttp2_session_resume_data will re-quene the data frame // application later call nghttp2_session_resume_data will re-quene the data frame
// //
//export onDataSourceReadCallback //export onDataSourceReadCallback
func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int, func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.ssize_t { buf unsafe.Pointer, length C.size_t) C.ssize_t {
//log.Println("onDataSourceReadCallback begin") //log.Println("onDataSourceReadCallback begin")
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)] s, ok := conn.streams[int(streamID)]
if !ok { if !ok {
//log.Println("client dp callback, stream not exists") //log.Println("client dp callback, stream not exists")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
gobuf := make([]byte, int(length)) gobuf := make([]byte, int(length))
/* /*
_length := int(length) _length := int(length)
gobuf := bufPool.Get().([]byte) gobuf := bufPool.Get().([]byte)
if len(gobuf) < _length { if len(gobuf) < _length {
gobuf = make([]byte, _length) gobuf = make([]byte, _length)
} }
defer bufPool.Put(gobuf) defer bufPool.Put(gobuf)
*/ */
n, err := s.dp.Read(gobuf[0:]) n, err := s.dp.Read(gobuf[0:])
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
//log.Println("onDataSourceReadCallback end") //log.Println("onDataSourceReadCallback end")
return 0 return 0
} }
if err == errAgain { if err == errAgain {
//log.Println("onDataSourceReadCallback end") //log.Println("onDataSourceReadCallback end")
//s.dp.deferred = true //s.dp.deferred = true
return NGHTTP2_ERR_DEFERRED return NGHTTP2_ERR_DEFERRED
} }
//log.Println("onDataSourceReadCallback end") //log.Println("onDataSourceReadCallback end")
return NGHTTP2_ERR_CALLBACK_FAILURE return NGHTTP2_ERR_CALLBACK_FAILURE
} }
//cbuf := C.CBytes(gobuf) //cbuf := C.CBytes(gobuf)
//defer C.free(cbuf) //defer C.free(cbuf)
//C.memcpy(buf, cbuf, C.size_t(n)) //C.memcpy(buf, cbuf, C.size_t(n))
C.memcpy(buf, unsafe.Pointer(&gobuf[0]), C.size_t(n)) C.memcpy(buf, unsafe.Pointer(&gobuf[0]), C.size_t(n))
//log.Println("onDataSourceReadCallback end") //log.Println("onDataSourceReadCallback end")
return C.ssize_t(n) return C.ssize_t(n)
} }
// onDataChunkRecv callback function for libnghttp2 library data chunk received. // onDataChunkRecv callback function for libnghttp2 library data chunk received.
// //
//export onDataChunkRecv //export onDataChunkRecv
func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int, func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
buf unsafe.Pointer, length C.size_t) C.int { buf unsafe.Pointer, length C.size_t) C.int {
//log.Println("onDataChunkRecv begin") //log.Println("onDataChunkRecv begin")
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
gobuf := C.GoBytes(buf, C.int(length)) gobuf := C.GoBytes(buf, C.int(length))
s, ok := conn.streams[int(streamID)] s, ok := conn.streams[int(streamID)]
if !ok { if !ok {
//log.Println("onDataChunkRecv end") //log.Println("onDataChunkRecv end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
if s.bp == nil { if s.bp == nil {
//log.Println("empty body") //log.Println("empty body")
//log.Println("onDataChunkRecv end") //log.Println("onDataChunkRecv end")
return C.int(length) return C.int(length)
} }
//log.Println("bp write") //log.Println("bp write")
n, err := s.bp.Write(gobuf) n, err := s.bp.Write(gobuf)
if err != nil { if err != nil {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
//log.Println("onDataChunkRecv end") //log.Println("onDataChunkRecv end")
return C.int(n) return C.int(n)
} }
// onDataSendCallback callback function for libnghttp2 library want send data to network. // onDataSendCallback callback function for libnghttp2 library want send data to network.
// //
//export onDataSendCallback //export onDataSendCallback
func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t { func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("onDataSendCallback begin") //log.Println("onDataSendCallback begin")
//log.Println("data write req ", int(size)) //log.Println("data write req ", int(size))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
buf := C.GoBytes(data, C.int(size)) buf := C.GoBytes(data, C.int(size))
//log.Println(conn.conn.RemoteAddr()) //log.Println(conn.conn.RemoteAddr())
n, err := conn.conn.Write(buf) n, err := conn.conn.Write(buf)
if err != nil { if err != nil {
//log.Println("onDataSendCallback end") //log.Println("onDataSendCallback end")
return NGHTTP2_ERR_CALLBACK_FAILURE return NGHTTP2_ERR_CALLBACK_FAILURE
} }
//log.Printf("write %d bytes to network ", n) //log.Printf("write %d bytes to network ", n)
//log.Println("onDataSendCallback end") //log.Println("onDataSendCallback end")
return C.ssize_t(n) return C.ssize_t(n)
} }
// onBeginHeaderCallback callback function for begin header receive. // onBeginHeaderCallback callback function for begin header receive.
// //
//export onBeginHeaderCallback //export onBeginHeaderCallback
func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int { func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onBeginHeaderCallback begin") //log.Println("onBeginHeaderCallback begin")
//log.Printf("stream %d begin headers", int(streamID)) //log.Printf("stream %d begin headers", int(streamID))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
var TLS tls.ConnectionState var TLS tls.ConnectionState
if tlsconn, ok := conn.conn.(*tls.Conn); ok { if tlsconn, ok := conn.conn.(*tls.Conn); ok {
TLS = tlsconn.ConnectionState() TLS = tlsconn.ConnectionState()
} }
// client // client
if !conn.isServer { if !conn.isServer {
s, ok := conn.streams[int(streamID)] s, ok := conn.streams[int(streamID)]
if !ok { if !ok {
//log.Println("onBeginHeaderCallback end") //log.Println("onBeginHeaderCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
s.response = &http.Response{ s.response = &http.Response{
Proto: "HTTP/2", Proto: "HTTP/2",
ProtoMajor: 2, ProtoMajor: 2,
ProtoMinor: 0, ProtoMinor: 0,
Header: make(http.Header), Header: make(http.Header),
Body: s.bp, Body: s.bp,
TLS: &TLS, TLS: &TLS,
} }
return NGHTTP2_NO_ERROR return NGHTTP2_NO_ERROR
} }
// server // server
s := &stream{ s := &stream{
streamID: int(streamID), streamID: int(streamID),
conn: conn, conn: conn,
bp: &bodyProvider{ bp: &bodyProvider{
buf: new(bytes.Buffer), buf: new(bytes.Buffer),
lock: new(sync.Mutex), lock: new(sync.Mutex),
}, },
request: &http.Request{ request: &http.Request{
Header: make(http.Header), Header: make(http.Header),
Proto: "HTTP/2", Proto: "HTTP/2",
ProtoMajor: 2, ProtoMajor: 2,
ProtoMinor: 0, ProtoMinor: 0,
TLS: &TLS, TLS: &TLS,
}, },
} }
s.request.Body = s.bp s.request.Body = s.bp
//log.Printf("new stream %d", int(streamID)) //log.Printf("new stream %d", int(streamID))
conn.streams[int(streamID)] = s conn.streams[int(streamID)] = s
runtime.SetFinalizer(s, (*stream).free) runtime.SetFinalizer(s, (*stream).free)
//log.Println("onBeginHeaderCallback end") //log.Println("onBeginHeaderCallback end")
return NGHTTP2_NO_ERROR return NGHTTP2_NO_ERROR
} }
// onHeaderCallback callback function for each header received. // onHeaderCallback callback function for each header received.
// //
//export onHeaderCallback //export onHeaderCallback
func onHeaderCallback(ptr unsafe.Pointer, streamID C.int, func onHeaderCallback(ptr unsafe.Pointer, streamID C.int,
name unsafe.Pointer, namelen C.int, name unsafe.Pointer, namelen C.int,
value unsafe.Pointer, valuelen C.int) C.int { value unsafe.Pointer, valuelen C.int) C.int {
//log.Println("onHeaderCallback begin") //log.Println("onHeaderCallback begin")
//log.Printf("header %d", int(streamID)) //log.Printf("header %d", int(streamID))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
goname := string(C.GoBytes(name, namelen)) goname := string(C.GoBytes(name, namelen))
govalue := string(C.GoBytes(value, valuelen)) govalue := string(C.GoBytes(value, valuelen))
s, ok := conn.streams[int(streamID)] s, ok := conn.streams[int(streamID)]
if !ok { if !ok {
//log.Println("onHeaderCallback end") //log.Println("onHeaderCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
var header http.Header var header http.Header
if conn.isServer { if conn.isServer {
header = s.request.Header header = s.request.Header
} else { } else {
header = s.response.Header header = s.response.Header
} }
goname = strings.ToLower(goname) goname = strings.ToLower(goname)
switch goname { switch goname {
case ":method": case ":method":
s.request.Method = govalue s.request.Method = govalue
case ":scheme": case ":scheme":
case ":authority": case ":authority":
s.request.Host = govalue s.request.Host = govalue
case ":path": case ":path":
s.request.RequestURI = govalue s.request.RequestURI = govalue
u, err := url.Parse(govalue) u, err := url.Parse(govalue)
if err != nil { if err != nil {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
s.request.URL = u s.request.URL = u
case ":status": case ":status":
if s.response == nil { if s.response == nil {
//log.Println("empty response") //log.Println("empty response")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
statusCode, _ := strconv.Atoi(govalue) statusCode, _ := strconv.Atoi(govalue)
s.response.StatusCode = statusCode s.response.StatusCode = statusCode
s.response.Status = http.StatusText(statusCode) s.response.Status = http.StatusText(statusCode)
case "content-length": case "content-length":
header.Add(goname, govalue) header.Add(goname, govalue)
n, err := strconv.ParseInt(govalue, 10, 64) n, err := strconv.ParseInt(govalue, 10, 64)
if err == nil { if err == nil {
if conn.isServer { if conn.isServer {
s.request.ContentLength = n s.request.ContentLength = n
} else { } else {
s.response.ContentLength = n s.response.ContentLength = n
} }
} }
case "transfer-encoding": case "transfer-encoding":
header.Add(goname, govalue) header.Add(goname, govalue)
if conn.isServer { if conn.isServer {
s.request.TransferEncoding = append(s.response.TransferEncoding, govalue) s.request.TransferEncoding = append(s.response.TransferEncoding, govalue)
} else { } else {
s.response.TransferEncoding = append(s.response.TransferEncoding, govalue) s.response.TransferEncoding = append(s.response.TransferEncoding, govalue)
} }
default: default:
header.Add(goname, govalue) header.Add(goname, govalue)
} }
//log.Println("onHeaderCallback end") //log.Println("onHeaderCallback end")
return NGHTTP2_NO_ERROR return NGHTTP2_NO_ERROR
} }
// onHeadersDoneCallback callback function for the stream when all headers received. // onHeadersDoneCallback callback function for the stream when all headers received.
// //
//export onHeadersDoneCallback //export onHeadersDoneCallback
func onHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int { func onHeadersDoneCallback(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onHeadersDoneCallback begin") //log.Println("onHeadersDoneCallback begin")
//log.Printf("stream %d headers done", int(streamID)) //log.Printf("stream %d headers done", int(streamID))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
s, ok := conn.streams[int(streamID)] s, ok := conn.streams[int(streamID)]
if !ok { if !ok {
//log.Println("onHeadersDoneCallback end") //log.Println("onHeadersDoneCallback end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
s.headersEnd = true s.headersEnd = true
if conn.isServer { if conn.isServer {
if s.request.Method == "CONNECT" { if s.request.Method == "CONNECT" {
go conn.serve(s) go conn.serve(s)
} }
return NGHTTP2_NO_ERROR return NGHTTP2_NO_ERROR
} }
select { select {
case s.resch <- s.response: case s.resch <- s.response:
default: default:
} }
//log.Println("onHeadersDoneCallback end") //log.Println("onHeadersDoneCallback end")
return NGHTTP2_NO_ERROR return NGHTTP2_NO_ERROR
} }
// onStreamClose callback function for the stream when closed. // onStreamClose callback function for the stream when closed.
// //
//export onStreamClose //export onStreamClose
func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int { func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
//log.Println("onStreamClose begin") //log.Println("onStreamClose begin")
//log.Printf("stream %d closed", int(streamID)) //log.Printf("stream %d closed", int(streamID))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
stream, ok := conn.streams[int(streamID)] stream, ok := conn.streams[int(streamID)]
if ok { if ok {
go stream.Close() go stream.Close()
//log.Printf("remove stream %d", int(streamID)) //log.Printf("remove stream %d", int(streamID))
//conn.lock.Lock() //conn.lock.Lock()
delete(conn.streams, int(streamID)) delete(conn.streams, int(streamID))
//go stream.Close() //go stream.Close()
//conn.lock.Unlock() //conn.lock.Unlock()
//log.Println("onStreamClose end") //log.Println("onStreamClose end")
return NGHTTP2_NO_ERROR return NGHTTP2_NO_ERROR
} }
//log.Println("onStreamClose end") //log.Println("onStreamClose end")
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
} }
//export onConnectionCloseCallback //export onConnectionCloseCallback
func onConnectionCloseCallback(ptr unsafe.Pointer) { func onConnectionCloseCallback(ptr unsafe.Pointer) {
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
conn.err = io.EOF conn.err = io.EOF
conn.Close() conn.Close()
} }
//export onStreamEndCallback //export onStreamEndCallback
func onStreamEndCallback(ptr unsafe.Pointer, streamID C.int) { func onStreamEndCallback(ptr unsafe.Pointer, streamID C.int) {
conn := (*Conn)(unsafe.Pointer(uintptr(ptr))) conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
stream, ok := conn.streams[int(streamID)] stream, ok := conn.streams[int(streamID)]
if !ok { if !ok {
return return
} }
stream.streamEnd = true stream.streamEnd = true
stream.bp.Close() stream.bp.Close()
if stream.conn.isServer { if stream.conn.isServer {
if stream.request.Method != "CONNECT" { if stream.request.Method != "CONNECT" {
go conn.serve(stream) go conn.serve(stream)
} }
return return
} }
} }

@ -1,183 +1,183 @@
package nghttp2 package nghttp2
/* /*
#include "_nghttp2.h" #include "_nghttp2.h"
*/ */
import "C" import "C"
import ( import (
"bytes" "bytes"
"errors" "errors"
"io" "io"
"log" "log"
"sync" "sync"
"time" "time"
"unsafe" "unsafe"
) )
// dataProvider provider data for libnghttp2 library // dataProvider provider data for libnghttp2 library
// libnghttp2 callback will Read to read the data, // libnghttp2 callback will Read to read the data,
// application call Write to provider data, // application call Write to provider data,
// application call Close will cause Read return io.EOF // application call Close will cause Read return io.EOF
type dataProvider struct { type dataProvider struct {
buf *bytes.Buffer buf *bytes.Buffer
closed bool closed bool
lock *sync.Mutex lock *sync.Mutex
sessLock *sync.Mutex sessLock *sync.Mutex
session *C.nghttp2_session session *C.nghttp2_session
streamID int streamID int
deferred bool deferred bool
} }
// Read read from data provider // Read read from data provider
func (dp *dataProvider) Read(buf []byte) (n int, err error) { func (dp *dataProvider) Read(buf []byte) (n int, err error) {
if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil {
log.Println("dp read invalid state") log.Println("dp read invalid state")
return 0, errors.New("invalid state") return 0, errors.New("invalid state")
} }
dp.lock.Lock() dp.lock.Lock()
defer dp.lock.Unlock() defer dp.lock.Unlock()
n, err = dp.buf.Read(buf) n, err = dp.buf.Read(buf)
if err != nil && !dp.closed { if err != nil && !dp.closed {
//log.Println("deferred") //log.Println("deferred")
dp.deferred = true dp.deferred = true
return 0, errAgain return 0, errAgain
} }
return return
} }
// Write provider data for data provider // Write provider data for data provider
func (dp *dataProvider) Write(buf []byte) (n int, err error) { func (dp *dataProvider) Write(buf []byte) (n int, err error) {
if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil {
log.Println("dp write invalid state") log.Println("dp write invalid state")
return 0, errors.New("invalid state") return 0, errors.New("invalid state")
} }
// make sure the buffer not too large // make sure the buffer not too large
delay := 10 * time.Millisecond delay := 10 * time.Millisecond
maxBufSize := 1 * 1024 * 1024 maxBufSize := 1 * 1024 * 1024
for { for {
dp.lock.Lock() dp.lock.Lock()
_len := dp.buf.Len() _len := dp.buf.Len()
closed := dp.closed closed := dp.closed
dp.lock.Unlock() dp.lock.Unlock()
if closed { if closed {
return 0, io.EOF return 0, io.EOF
} }
if _len < maxBufSize { if _len < maxBufSize {
break break
} }
time.Sleep(delay) time.Sleep(delay)
} }
dp.lock.Lock() dp.lock.Lock()
defer dp.lock.Unlock() defer dp.lock.Unlock()
//if dp.closed { //if dp.closed {
// return 0, io.EOF // return 0, io.EOF
//} //}
n, err = dp.buf.Write(buf) n, err = dp.buf.Write(buf)
if dp.deferred { if dp.deferred {
dp.sessLock.Lock() dp.sessLock.Lock()
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
dp.sessLock.Unlock() dp.sessLock.Unlock()
//log.Println("resume") //log.Println("resume")
dp.deferred = false dp.deferred = false
} }
return return
} }
// Close end to provide data // Close end to provide data
func (dp *dataProvider) Close() error { func (dp *dataProvider) Close() error {
if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil { if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil {
log.Println("dp close, invalid state") log.Println("dp close, invalid state")
return errors.New("invalid state") return errors.New("invalid state")
} }
dp.lock.Lock() dp.lock.Lock()
defer dp.lock.Unlock() defer dp.lock.Unlock()
if dp.closed { if dp.closed {
return nil return nil
} }
dp.closed = true dp.closed = true
//log.Printf("dp close stream %d", dp.streamID) //log.Printf("dp close stream %d", dp.streamID)
if dp.deferred { if dp.deferred {
dp.sessLock.Lock() dp.sessLock.Lock()
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
dp.sessLock.Unlock() dp.sessLock.Unlock()
dp.deferred = false dp.deferred = false
} }
return nil return nil
} }
func newDataProvider(cdp unsafe.Pointer, sessionLock *sync.Mutex, t int) *dataProvider { func newDataProvider(cdp unsafe.Pointer, sessionLock *sync.Mutex, t int) *dataProvider {
dp := &dataProvider{ dp := &dataProvider{
buf: new(bytes.Buffer), buf: new(bytes.Buffer),
lock: new(sync.Mutex), lock: new(sync.Mutex),
sessLock: sessionLock, sessLock: sessionLock,
} }
C.data_provider_set_callback(C.size_t(uintptr(cdp)), C.data_provider_set_callback(C.size_t(uintptr(cdp)),
C.size_t(uintptr(unsafe.Pointer(dp))), C.int(t)) C.size_t(uintptr(unsafe.Pointer(dp))), C.int(t))
return dp return dp
} }
// bodyProvider provide data for http body // bodyProvider provide data for http body
// Read will block when data not yet avaliable // Read will block when data not yet avaliable
type bodyProvider struct { type bodyProvider struct {
buf *bytes.Buffer buf *bytes.Buffer
closed bool closed bool
lock *sync.Mutex lock *sync.Mutex
} }
// Read read data from provider // Read read data from provider
// will block when data not yet avaliable // will block when data not yet avaliable
func (bp *bodyProvider) Read(buf []byte) (int, error) { func (bp *bodyProvider) Read(buf []byte) (int, error) {
var delay = 100 * time.Millisecond var delay = 100 * time.Millisecond
for { for {
bp.lock.Lock() bp.lock.Lock()
n, err := bp.buf.Read(buf) n, err := bp.buf.Read(buf)
bp.lock.Unlock() bp.lock.Unlock()
if err != nil && !bp.closed { if err != nil && !bp.closed {
time.Sleep(delay) time.Sleep(delay)
continue continue
} }
return n, err return n, err
} }
} }
// Write provide data for dataProvider // Write provide data for dataProvider
// libnghttp2 data chunk recv callback will call this // libnghttp2 data chunk recv callback will call this
func (bp *bodyProvider) Write(buf []byte) (int, error) { func (bp *bodyProvider) Write(buf []byte) (int, error) {
bp.lock.Lock() bp.lock.Lock()
defer bp.lock.Unlock() defer bp.lock.Unlock()
return bp.buf.Write(buf) return bp.buf.Write(buf)
} }
// Close end to provide data // Close end to provide data
func (bp *bodyProvider) Close() error { func (bp *bodyProvider) Close() error {
bp.lock.Lock() bp.lock.Lock()
defer bp.lock.Unlock() defer bp.lock.Unlock()
bp.closed = true bp.closed = true
return nil return nil
} }
func newNV(name, value string) C.nghttp2_nv { func newNV(name, value string) C.nghttp2_nv {
nv := C.nghttp2_nv{} nv := C.nghttp2_nv{}
nameArr := make([]byte, len(name)+1) nameArr := make([]byte, len(name)+1)
valueArr := make([]byte, len(value)+1) valueArr := make([]byte, len(value)+1)
copy(nameArr, []byte(name)) copy(nameArr, []byte(name))
copy(valueArr, []byte(value)) copy(valueArr, []byte(value))
nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0])) nv.name = (*C.uchar)(unsafe.Pointer(&nameArr[0]))
nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0])) nv.value = (*C.uchar)(unsafe.Pointer(&valueArr[0]))
nv.namelen = C.size_t(len(name)) nv.namelen = C.size_t(len(name))
nv.valuelen = C.size_t(len(value)) nv.valuelen = C.size_t(len(value))
nv.flags = 0 nv.flags = 0
return nv return nv
} }

226
doc.go

@ -1,113 +1,113 @@
/*Package nghttp2 is libnghttp2 binding for golang. /*Package nghttp2 is libnghttp2 binding for golang.
server example server example
cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key") cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
l, err := tls.Listen("tcp", "127.0.0.1:1100", &tls.Config{ l, err := tls.Listen("tcp", "127.0.0.1:1100", &tls.Config{
Certificates: []tls.Certificate{cert}, Certificates: []tls.Certificate{cert},
NextProtos: []string{"h2"}, NextProtos: []string{"h2"},
}) })
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer l.Close() defer l.Close()
addr := l.Addr().String() addr := l.Addr().String()
http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
log.Printf("%+v", r) log.Printf("%+v", r)
hdr := w.Header() hdr := w.Header()
hdr.Set("content-type", "text/plain") hdr.Set("content-type", "text/plain")
hdr.Set("aa", "bb") hdr.Set("aa", "bb")
d, err := ioutil.ReadAll(r.Body) d, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
w.Write(d) w.Write(d)
}) })
for { for {
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
break break
} }
h2conn, err := Server(c, nil) h2conn, err := Server(c, nil)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Printf("%+v", h2conn) log.Printf("%+v", h2conn)
go h2conn.Run() go h2conn.Run()
} }
client example client example
conn, err := tls.Dial("tcp", "nghttp2.org:443", &tls.Config{ conn, err := tls.Dial("tcp", "nghttp2.org:443", &tls.Config{
NextProtos: []string{"h2"}, NextProtos: []string{"h2"},
ServerName: "nghttp2.org", ServerName: "nghttp2.org",
}) })
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer conn.Close() defer conn.Close()
if err := conn.Handshake(); err != nil{ if err := conn.Handshake(); err != nil{
log.Fatal(err) log.Fatal(err)
} }
cstate := conn.ConnectionState() cstate := conn.ConnectionState()
if cstate.NegotiatedProtocol != "h2" { if cstate.NegotiatedProtocol != "h2" {
log.Fatal("no http2 on server") log.Fatal("no http2 on server")
} }
h2conn, err := Client(conn) h2conn, err := Client(conn)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
param := url.Values{} param := url.Values{}
param.Add("e", "b") param.Add("e", "b")
param.Add("f", "d") param.Add("f", "d")
data := bytes.NewReader([]byte(param.Encode())) data := bytes.NewReader([]byte(param.Encode()))
req, _ := http.NewRequest("POST", req, _ := http.NewRequest("POST",
"https://nghttp2.org/httpbin/post?a=b&c=d", "https://nghttp2.org/httpbin/post?a=b&c=d",
data) data)
log.Printf("%+v", req) log.Printf("%+v", req)
req.Header.Set("user-agent", "go-nghttp2/1.0") req.Header.Set("user-agent", "go-nghttp2/1.0")
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
res, err := h2conn.RoundTrip(req) res, err := h2conn.RoundTrip(req)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Printf("expect %d, got %d", http.StatusOK, res.StatusCode) log.Printf("expect %d, got %d", http.StatusOK, res.StatusCode)
} }
res.Write(os.Stderr) res.Write(os.Stderr)
co-work with net/http example co-work with net/http example
l, err := net.Listen("tcp", "127.0.0.1:1222") l, err := net.Listen("tcp", "127.0.0.1:1222")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
srv := &http.Server{ srv := &http.Server{
TLSConfig: &tls.Config{ TLSConfig: &tls.Config{
NextProtos: []string{"h2", "http/1.1"}, NextProtos: []string{"h2", "http/1.1"},
}, },
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){ TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){
"h2": nghttp2.HTTP2Handler, "h2": nghttp2.HTTP2Handler,
}, },
} }
defer srv.Close() defer srv.Close()
srv.ServeTLS(l, "testdata/server.crt", "testdata/server.key") srv.ServeTLS(l, "testdata/server.crt", "testdata/server.key")
see http2_test.go for more details see http2_test.go for more details
*/ */
package nghttp2 package nghttp2

Loading…
Cancel
Save