Compare commits

..

No commits in common. 'master' and 'merge_conn' have entirely different histories.

@ -10,7 +10,7 @@ server usage example:
cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
l, err := tls.Listen("tcp", "127.0.0.1:1100", &tls.Config{
@ -18,7 +18,7 @@ server usage example:
NextProtos: []string{"h2"},
})
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer l.Close()
addr := l.Addr().String()
@ -30,7 +30,7 @@ server usage example:
hdr.Set("aa", "bb")
d, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
ln(err)
return
}
w.Write(d)
@ -43,7 +43,7 @@ server usage example:
}
h2conn, err := NewServerConn(c, nil)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
f("%+v", h2conn)
go h2conn.Run()
@ -57,18 +57,18 @@ client usage example:
ServerName: "nghttp2.org",
})
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer conn.Close()
cstate := conn.ConnectionState()
if cstate.NegotiatedProtocol != "h2" {
log.Fatal("no http2 on server")
t.Fatal("no http2 on server")
}
h2conn, err := NewClientConn(conn)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
param := url.Values{}
@ -79,18 +79,18 @@ client usage example:
"https://nghttp2.org/httpbin/post?a=b&c=d",
data)
log.Printf("%+v", req)
f("%+v", req)
req.Header.Set("user-agent", "go-nghttp2/1.0")
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
res, err := h2conn.CreateRequest(req)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
if res.StatusCode != http.StatusOK {
log.Printf("expect %d, got %d", http.StatusOK, res.StatusCode)
t.Errorf("expect %d, got %d", http.StatusOK, res.StatusCode)
}
res.Write(os.Stderr)
@ -99,7 +99,7 @@ co-work with net/http server example:
l, err := net.Listen("tcp", "127.0.0.1:1222")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
srv := &http.Server{
TLSConfig: &tls.Config{

@ -6,13 +6,11 @@ package nghttp2
import "C"
import (
"bytes"
"context"
"crypto/tls"
"errors"
"io"
"net/http"
"net/url"
"runtime"
"strconv"
"strings"
"sync"
@ -30,14 +28,6 @@ const (
NGHTTP2_ERR_DEFERRED = -508
)
/*
var bufPool = &sync.Pool{
New: func() interface{} {
return make([]byte, 16*1024)
},
}
*/
// onDataSourceReadCallback callback function for libnghttp2 library
// want read data from data provider source,
// return NGHTTP2_ERR_DEFERRED will cause data frame defered,
@ -54,16 +44,7 @@ func onDataSourceReadCallback(ptr unsafe.Pointer, streamID C.int,
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
}
gobuf := make([]byte, int(length))
/*
_length := int(length)
gobuf := bufPool.Get().([]byte)
if len(gobuf) < _length {
gobuf = make([]byte, _length)
}
defer bufPool.Put(gobuf)
*/
n, err := s.dp.Read(gobuf[0:])
n, err := s.dp.Read(gobuf)
if err != nil {
if err == io.EOF {
//log.Println("onDataSourceReadCallback end")
@ -116,8 +97,7 @@ func onDataChunkRecv(ptr unsafe.Pointer, streamID C.int,
// onDataSendCallback callback function for libnghttp2 library want send data to network.
//
//export onDataSendCallback
func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer,
size C.size_t) C.ssize_t {
func onDataSendCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.size_t) C.ssize_t {
//log.Println("onDataSendCallback begin")
//log.Println("data write req ", int(size))
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
@ -179,12 +159,9 @@ func onBeginHeaderCallback(ptr unsafe.Pointer, streamID C.int) C.int {
TLS: &TLS,
},
}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.request.Body = s.bp
//log.Printf("new stream %d", int(streamID))
conn.streams[int(streamID)] = s
runtime.SetFinalizer(s, (*stream).free)
conn.streams[int(streamID)] = s
//log.Println("onBeginHeaderCallback end")
return NGHTTP2_NO_ERROR
@ -248,11 +225,9 @@ func onHeaderCallback(ptr unsafe.Pointer, streamID C.int,
case "transfer-encoding":
header.Add(goname, govalue)
if conn.isServer {
s.request.TransferEncoding = append(
s.response.TransferEncoding, govalue)
s.request.TransferEncoding = append(s.response.TransferEncoding, govalue)
} else {
s.response.TransferEncoding = append(
s.response.TransferEncoding, govalue)
s.response.TransferEncoding = append(s.response.TransferEncoding, govalue)
}
default:
header.Add(goname, govalue)
@ -299,7 +274,6 @@ func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
stream, ok := conn.streams[int(streamID)]
if ok {
go stream.Close()
//log.Printf("remove stream %d", int(streamID))
//conn.lock.Lock()
delete(conn.streams, int(streamID))
//go stream.Close()
@ -315,7 +289,14 @@ func onStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
func onConnectionCloseCallback(ptr unsafe.Pointer) {
conn := (*Conn)(unsafe.Pointer(uintptr(ptr)))
conn.err = io.EOF
conn.Close()
// signal all goroutings exit
for i := 0; i < 4; i++ {
select {
case conn.exitch <- struct{}{}:
default:
}
}
}
//export onStreamEndCallback

@ -1,85 +0,0 @@
package nghttp2
import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"strings"
"sync"
)
// Transport the nghttp2 RoundTripper implement
type Transport struct {
TLSConfig *tls.Config
DialTLS func(network, addr string, cfg *tls.Config) (*tls.Conn, error)
cacheConn map[string]*Conn
mu sync.Mutex
}
// RoundTrip send req and get res
func (tr *Transport) RoundTrip(req *http.Request) (res *http.Response, err error) {
h2conn, err := tr.getConn(req)
if err != nil {
return nil, err
}
return h2conn.RoundTrip(req)
}
func (tr *Transport) getConn(req *http.Request) (*Conn, error) {
tr.mu.Lock()
defer tr.mu.Unlock()
if tr.cacheConn == nil {
tr.cacheConn = map[string]*Conn{}
}
k := req.URL.Host
if c, ok := tr.cacheConn[k]; ok {
if c.CanTakeNewRequest() {
return c, nil
}
delete(tr.cacheConn, k)
c.Close()
}
c, err := tr.createConn(k)
if err == nil {
tr.cacheConn[k] = c
}
return c, err
}
func (tr *Transport) createConn(host string) (*Conn, error) {
dial := tls.Dial
if tr.DialTLS != nil {
dial = tr.DialTLS
}
cfg := tr.TLSConfig
if cfg == nil {
h, _, err := net.SplitHostPort(host)
if err != nil {
h = host
}
cfg = &tls.Config{
ServerName: h,
NextProtos: []string{"h2"},
}
}
if !strings.Contains(host, ":") {
host = fmt.Sprintf("%s:443", host)
}
conn, err := dial("tcp", host, cfg)
if err != nil {
return nil, err
}
if err = conn.Handshake(); err != nil {
return nil, err
}
state := conn.ConnectionState()
if state.NegotiatedProtocol != "h2" {
conn.Close()
return nil, errors.New("http2 is not supported")
}
return Client(conn)
}

@ -7,7 +7,6 @@ package nghttp2
import "C"
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
@ -15,7 +14,6 @@ import (
"net"
"net/http"
"net/url"
"runtime"
"strings"
"sync"
"time"
@ -28,104 +26,56 @@ type Conn struct {
session *C.nghttp2_session
streams map[int]*stream
streamCount int
closed bool
isServer bool
running bool
handler http.Handler
lock *sync.Mutex
err error
errch chan error
ctx context.Context
cancel context.CancelFunc
exitch chan struct{}
}
// Dial connect to addr and create a http2 client Conn
//
// the Conn.Run have already called, should not call it again
func Dial(network, addr string, cfg *tls.Config) (*Conn, error) {
nextProto := []string{"h2"}
if cfg == nil {
_addr := addr
h, _, err := net.SplitHostPort(addr)
if err == nil {
_addr = h
}
cfg = &tls.Config{ServerName: _addr}
}
cfg.NextProtos = nextProto
conn, err := tls.Dial(network, addr, cfg)
if err != nil {
return nil, err
}
if err := conn.Handshake(); err != nil {
return nil, err
}
state := conn.ConnectionState()
if state.NegotiatedProtocol != "h2" {
return nil, errors.New("server not support http2")
}
return Client(conn)
}
// Server create server side http2 connection on c
//
// c must be TLS connection and negotiated for h2
//
// the Conn.Run not called, you must run it
// Server create server side http2 connection
func Server(c net.Conn, handler http.Handler) (*Conn, error) {
conn := &Conn{
conn: c,
handler: handler,
errch: make(chan error),
exitch: make(chan struct{}),
lock: new(sync.Mutex),
isServer: true,
streams: make(map[int]*stream),
}
conn.ctx, conn.cancel = context.WithCancel(context.Background())
//log.Printf("new conn %x", uintptr(unsafe.Pointer(conn)))
runtime.SetFinalizer(conn, (*Conn).free)
conn.session = C.init_nghttp2_server_session(
C.size_t(uintptr(unsafe.Pointer(conn))))
conn.session = C.init_nghttp2_server_session(C.size_t(uintptr(unsafe.Pointer(conn))))
if conn.session == nil {
return nil, errors.New("init server session failed")
}
ret := C.send_connection_header(conn.session)
if int(ret) < 0 {
conn.Close()
return nil, fmt.Errorf("send settings error: %s",
C.GoString(C.nghttp2_strerror(ret)))
return nil, fmt.Errorf("send settings error: %s", C.GoString(C.nghttp2_strerror(ret)))
}
return conn, nil
}
// Client create client side http2 connection on c
//
// c must be TLS connection and negotiated for h2
//
// the Conn.Run have alread called, you should not call it again
// Client create client side http2 connection
func Client(c net.Conn) (*Conn, error) {
conn := &Conn{
conn: c,
errch: make(chan error),
exitch: make(chan struct{}),
lock: new(sync.Mutex),
streams: make(map[int]*stream),
}
conn.ctx, conn.cancel = context.WithCancel(context.Background())
//log.Printf("new conn %x", uintptr(unsafe.Pointer(conn)))
runtime.SetFinalizer(conn, (*Conn).free)
conn.session = C.init_nghttp2_client_session(
C.size_t(uintptr(unsafe.Pointer(conn))))
conn.session = C.init_nghttp2_client_session(C.size_t(uintptr(unsafe.Pointer(conn))))
if conn.session == nil {
return nil, errors.New("init server session failed")
}
ret := C.send_connection_header(conn.session)
if int(ret) < 0 {
conn.Close()
return nil, fmt.Errorf("send settings error: %s",
C.GoString(C.nghttp2_strerror(ret)))
return nil, fmt.Errorf("send settings error: %s", C.GoString(C.nghttp2_strerror(ret)))
}
go conn.Run()
return conn, nil
@ -152,13 +102,6 @@ func HTTP2Handler(srv *http.Server, conn *tls.Conn, handler http.Handler) {
h2conn.Run()
}
func (c *Conn) free() {
//log.Printf("free conn %x", uintptr(unsafe.Pointer(c)))
if !c.isClosed() {
c.Close()
}
}
// Error return conn error
func (c *Conn) Error() error {
c.lock.Lock()
@ -193,8 +136,7 @@ func (c *Conn) RoundTrip(req *http.Request) (*http.Response, error) {
nv = append(nv, newNV(":path", p))
for k, v := range req.Header {
_k := strings.ToLower(k)
if _k == "connection" || _k == "proxy-connection" ||
_k == "transfer-encoding" {
if _k == "connection" || _k == "proxy-connection" || _k == "transfer-encoding" {
continue
}
nv = append(nv, newNV(k, v[0]))
@ -223,9 +165,6 @@ func (c *Conn) RoundTrip(req *http.Request) (*http.Response, error) {
dp.Close()
}()
}
s.request = req
select {
case res := <-s.resch:
/*
@ -233,25 +172,23 @@ func (c *Conn) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, fmt.Errorf("http error code %d", res.StatusCode)
}
*/
s.request = req
res.Request = s.request
return res, nil
case <-c.ctx.Done():
case <-c.exitch:
return nil, errors.New("connection closed")
}
}
func (c *Conn) submitRequest(nv []C.nghttp2_nv,
cdp *C.nghttp2_data_provider) (*stream, error) {
func (c *Conn) submitRequest(nv []C.nghttp2_nv, cdp *C.nghttp2_data_provider) (*stream, error) {
c.lock.Lock()
ret := C._nghttp2_submit_request(c.session, nil,
C.size_t(uintptr(unsafe.Pointer(&nv[0]))),
C.size_t(len(nv)), cdp, nil)
C.size_t(uintptr(unsafe.Pointer(&nv[0]))), C.size_t(len(nv)), cdp, nil)
c.lock.Unlock()
if int(ret) < 0 {
return nil, fmt.Errorf("submit request error: %s",
C.GoString(C.nghttp2_strerror(ret)))
return nil, fmt.Errorf("submit request error: %s", C.GoString(C.nghttp2_strerror(ret)))
}
streamID := int(ret)
s := &stream{
@ -263,22 +200,14 @@ func (c *Conn) submitRequest(nv []C.nghttp2_nv,
},
resch: make(chan *http.Response),
}
s.ctx, s.cancel = context.WithCancel(context.Background())
if cdp != nil {
s.cdp = *cdp
}
runtime.SetFinalizer(s, (*stream).free)
return s, nil
}
// Connect submit connect request
//
// like "CONNECT host:port" on http/1.1
//
// statusCode is the http status code the server returned
//
// c bounds to the remote host of addr
func (c *Conn) Connect(addr string) (conn net.Conn, statusCode int, err error) {
func (c *Conn) Connect(addr string) (net.Conn, int, error) {
nv := []C.nghttp2_nv{}
nv = append(nv, newNV(":method", "CONNECT"))
@ -303,8 +232,7 @@ func (c *Conn) Connect(addr string) (conn net.Conn, statusCode int, err error) {
select {
case res := <-s.resch:
if res.StatusCode != http.StatusOK {
return nil, res.StatusCode, fmt.Errorf(
"http error code %d", res.StatusCode)
return nil, res.StatusCode, fmt.Errorf("http error code %d", res.StatusCode)
}
s.request = &http.Request{
Method: "CONNECT",
@ -314,9 +242,8 @@ func (c *Conn) Connect(addr string) (conn net.Conn, statusCode int, err error) {
}
res.Request = s.request
return s, res.StatusCode, nil
case <-c.ctx.Done():
return nil, http.StatusServiceUnavailable,
errors.New("connection closed")
case <-c.exitch:
return nil, http.StatusServiceUnavailable, errors.New("connection closed")
}
}
@ -337,7 +264,7 @@ func (c *Conn) Run() {
case err := <-c.errch:
c.err = err
return
case <-c.ctx.Done():
case <-c.exitch:
return
}
}
@ -348,40 +275,26 @@ func (c *Conn) serve(s *stream) {
if handler == nil {
handler = http.DefaultServeMux
}
s.request.RemoteAddr = c.conn.RemoteAddr().String()
if s.request.URL == nil {
s.request.URL = &url.URL{}
}
handler.ServeHTTP(s, s.request)
s.Close()
}
// Close close the connection
func (c *Conn) Close() error {
c.lock.Lock()
if c.isClosed() {
c.lock.Unlock()
if c.closed {
return nil
}
c.cancel()
c.lock.Unlock()
// stream.Close may require the conn.Lock
// so must not hold the lock here
c.closed = true
for _, s := range c.streams {
s.Close()
}
c.lock.Lock()
for n := range c.streams {
delete(c.streams, n)
}
C.nghttp2_session_terminate_session(c.session, 0)
C.nghttp2_session_del(c.session)
c.lock.Unlock()
close(c.exitch)
c.conn.Close()
return nil
}
@ -394,51 +307,67 @@ func (c *Conn) errorNotify(err error) {
}
func (c *Conn) readloop() {
type data struct {
buf []byte
err error
}
var ret C.ssize_t
var err error
var d data
datach := make(chan data)
go func() {
d1 := data{}
var n int
var err1 error
for {
buf := make([]byte, 16*1024)
n, err1 = c.conn.Read(buf)
d1.buf = buf[:n]
d1.err = err1
datach <- d1
}
}()
for {
if c.isClosed() {
select {
case <-c.exitch:
return
}
n, err := c.conn.Read(buf)
if err != nil {
c.errorNotify(err)
case d = <-datach:
if d.err != nil {
c.errorNotify(d.err)
return
}
c.lock.Lock()
// check again
if c.isClosed() {
c.lock.Unlock()
return
}
ret := C.nghttp2_session_mem_recv(c.session,
(*C.uchar)(unsafe.Pointer(&buf[0])), C.size_t(n))
ret = C.nghttp2_session_mem_recv(c.session,
(*C.uchar)(unsafe.Pointer(&d.buf[0])), C.size_t(len(d.buf)))
c.lock.Unlock()
if int(ret) < 0 {
err = fmt.Errorf("http2 recv error: %s",
C.GoString(C.nghttp2_strerror(C.int(ret))))
err = fmt.Errorf("http2 recv error: %s", C.GoString(C.nghttp2_strerror(C.int(ret))))
c.errorNotify(err)
return
}
}
}
}
func (c *Conn) writeloop() {
var ret C.int
var err error
var delay = 50 * time.Millisecond
for {
c.lock.Lock()
if c.isClosed() {
c.lock.Unlock()
select {
case <-c.exitch:
return
default:
}
c.lock.Lock()
ret = C.nghttp2_session_send(c.session)
c.lock.Unlock()
if int(ret) < 0 {
err = fmt.Errorf("http2 send error: %s",
C.GoString(C.nghttp2_strerror(C.int(ret))))
err = fmt.Errorf("http2 send error: %s", C.GoString(C.nghttp2_strerror(C.int(ret))))
c.errorNotify(err)
return
}
@ -451,12 +380,3 @@ func (c *Conn) writeloop() {
}
}
}
func (c *Conn) isClosed() bool {
select {
case <-c.ctx.Done():
return true
default:
}
return false
}

@ -7,7 +7,6 @@ import "C"
import (
"bytes"
"errors"
"io"
"log"
"sync"
"time"
@ -30,8 +29,7 @@ type dataProvider struct {
// Read read from data provider
func (dp *dataProvider) Read(buf []byte) (n int, err error) {
if dp.buf == nil || dp.lock == nil ||
dp.sessLock == nil || dp.session == nil {
if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil {
log.Println("dp read invalid state")
return 0, errors.New("invalid state")
}
@ -49,29 +47,10 @@ func (dp *dataProvider) Read(buf []byte) (n int, err error) {
// Write provider data for data provider
func (dp *dataProvider) Write(buf []byte) (n int, err error) {
if dp.buf == nil || dp.lock == nil ||
dp.sessLock == nil || dp.session == nil {
if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil {
log.Println("dp write invalid state")
return 0, errors.New("invalid state")
}
// make sure the buffer not too large
delay := 10 * time.Millisecond
maxBufSize := 4 * 1024
for {
dp.lock.Lock()
_len := dp.buf.Len()
closed := dp.closed
dp.lock.Unlock()
if closed {
return 0, io.EOF
}
if _len < maxBufSize {
break
}
time.Sleep(delay)
}
dp.lock.Lock()
defer dp.lock.Unlock()
@ -82,8 +61,7 @@ func (dp *dataProvider) Write(buf []byte) (n int, err error) {
n, err = dp.buf.Write(buf)
if dp.deferred {
dp.sessLock.Lock()
C.nghttp2_session_resume_data(
dp.session, C.int(dp.streamID))
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
dp.sessLock.Unlock()
//log.Println("resume")
@ -94,8 +72,7 @@ func (dp *dataProvider) Write(buf []byte) (n int, err error) {
// Close end to provide data
func (dp *dataProvider) Close() error {
if dp.buf == nil || dp.lock == nil ||
dp.sessLock == nil || dp.session == nil {
if dp.buf == nil || dp.lock == nil || dp.sessLock == nil || dp.session == nil {
log.Println("dp close, invalid state")
return errors.New("invalid state")
}
@ -109,8 +86,7 @@ func (dp *dataProvider) Close() error {
//log.Printf("dp close stream %d", dp.streamID)
if dp.deferred {
dp.sessLock.Lock()
C.nghttp2_session_resume_data(
dp.session, C.int(dp.streamID))
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
dp.sessLock.Unlock()
dp.deferred = false
@ -118,8 +94,7 @@ func (dp *dataProvider) Close() error {
return nil
}
func newDataProvider(cdp unsafe.Pointer,
sessionLock *sync.Mutex, t int) *dataProvider {
func newDataProvider(cdp unsafe.Pointer, sessionLock *sync.Mutex, t int) *dataProvider {
dp := &dataProvider{
buf: new(bytes.Buffer),
lock: new(sync.Mutex),

@ -4,7 +4,7 @@ server example
cert, err := tls.LoadX509KeyPair("testdata/server.crt", "testdata/server.key")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
l, err := tls.Listen("tcp", "127.0.0.1:1100", &tls.Config{
@ -12,7 +12,7 @@ server example
NextProtos: []string{"h2"},
})
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer l.Close()
addr := l.Addr().String()
@ -37,7 +37,7 @@ server example
}
h2conn, err := Server(c, nil)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
log.Printf("%+v", h2conn)
go h2conn.Run()
@ -50,20 +50,20 @@ client example
ServerName: "nghttp2.org",
})
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer conn.Close()
if err := conn.Handshake(); err != nil{
log.Fatal(err)
t.Fatal(err)
}
cstate := conn.ConnectionState()
if cstate.NegotiatedProtocol != "h2" {
log.Fatal("no http2 on server")
t.Fatal("no http2 on server")
}
h2conn, err := Client(conn)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
param := url.Values{}
@ -81,11 +81,11 @@ client example
res, err := h2conn.RoundTrip(req)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
if res.StatusCode != http.StatusOK {
log.Printf("expect %d, got %d", http.StatusOK, res.StatusCode)
t.Errorf("expect %d, got %d", http.StatusOK, res.StatusCode)
}
res.Write(os.Stderr)
@ -94,7 +94,7 @@ co-work with net/http example
l, err := net.Listen("tcp", "127.0.0.1:1222")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
srv := &http.Server{
TLSConfig: &tls.Config{

@ -5,7 +5,6 @@ package nghttp2
*/
import "C"
import (
"context"
"errors"
"fmt"
"io"
@ -26,31 +25,14 @@ type stream struct {
resch chan *http.Response
headersEnd bool
streamEnd bool
closed bool
cdp C.nghttp2_data_provider
ctx context.Context
cancel context.CancelFunc
}
var _ net.Conn = &stream{}
func (s *stream) isClosed() bool {
select {
case <-s.ctx.Done():
return true
default:
}
return false
}
func (s *stream) free() {
//log.Printf("stream free %d", s.streamID)
if !s.isClosed() {
s.Close()
}
}
func (s *stream) Read(buf []byte) (int, error) {
if s.isClosed() {
if s.closed {
return 0, io.EOF
}
if s.bp != nil {
@ -60,7 +42,7 @@ func (s *stream) Read(buf []byte) (int, error) {
}
func (s *stream) WriteHeader(code int) {
if s.isClosed() {
if s.closed {
return
}
if s.response == nil {
@ -82,33 +64,24 @@ func (s *stream) WriteHeader(code int) {
nv = append(nv, newNV(":status", fmt.Sprintf("%d", code)))
for k, v := range s.response.Header {
_k := strings.ToLower(k)
if _k == "host" || _k == "connection" ||
_k == "transfer-encoding" {
if _k == "host" || _k == "connection" || _k == "transfer-encoding" {
continue
}
nv = append(nv, newNV(k, v[0]))
}
s.cdp = C.nghttp2_data_provider{}
s.dp = newDataProvider(unsafe.Pointer(&s.cdp),
s.conn.lock, 0)
s.dp = newDataProvider(unsafe.Pointer(&s.cdp), s.conn.lock, 0)
s.dp.session = s.conn.session
s.dp.streamID = s.streamID
s.conn.lock.Lock()
if s.conn.isClosed() {
s.conn.lock.Unlock()
return
}
ret := C._nghttp2_submit_response(s.conn.session,
C.int(s.streamID),
C.size_t(uintptr(unsafe.Pointer(&nv[0]))),
C.size_t(len(nv)), &s.cdp)
ret := C._nghttp2_submit_response(s.conn.session, C.int(s.streamID),
C.size_t(uintptr(unsafe.Pointer(&nv[0]))), C.size_t(len(nv)), &s.cdp)
s.conn.lock.Unlock()
if int(ret) < 0 {
panic(fmt.Sprintf("submit response error: %s",
C.GoString(C.nghttp2_strerror(ret))))
panic(fmt.Sprintf("submit response error: %s", C.GoString(C.nghttp2_strerror(ret))))
}
}
@ -125,11 +98,10 @@ func (s *stream) Header() http.Header {
}
func (s *stream) Write(buf []byte) (int, error) {
if s.isClosed() {
if s.closed {
return 0, io.EOF
}
if s.conn.isServer && (s.response == nil ||
s.response.StatusCode == 0) {
if s.conn.isServer && (s.response == nil || s.response.StatusCode == 0) {
s.WriteHeader(http.StatusOK)
}
@ -140,12 +112,10 @@ func (s *stream) Write(buf []byte) (int, error) {
}
func (s *stream) Close() error {
if s.isClosed() {
if s.closed {
return nil
}
s.cancel()
s.closed = true
if s.dp != nil {
s.dp.Close()
}
@ -160,8 +130,7 @@ func (s *stream) Close() error {
if s.request != nil && s.request.Method == "CONNECT" {
//log.Println("rst stream")
s.conn.lock.Lock()
C.nghttp2_submit_rst_stream(s.conn.session, 0,
C.int(s.streamID), 8)
C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 8)
s.conn.lock.Unlock()
}
return nil

Loading…
Cancel
Save