code format

merge_conn
fangdingjun 6 years ago
parent ade4dcb8a7
commit 8c53841e25

@ -286,6 +286,7 @@ func onClientDataRecvCallback(ptr unsafe.Pointer, data unsafe.Pointer, size C.si
return C.ssize_t(n) return C.ssize_t(n)
} }
*/ */
// onClientDataSendCallback callback function for libnghttp2 library want send data to network. // onClientDataSendCallback callback function for libnghttp2 library want send data to network.
// //
//export onClientDataSendCallback //export onClientDataSendCallback
@ -422,8 +423,12 @@ func onClientStreamClose(ptr unsafe.Pointer, streamID C.int) C.int {
func onClientConnectionCloseCallback(ptr unsafe.Pointer) { func onClientConnectionCloseCallback(ptr unsafe.Pointer) {
conn := (*ClientConn)(ptr) conn := (*ClientConn)(ptr)
conn.err = io.EOF conn.err = io.EOF
// signal all goroutings exit
for i := 0; i < 4; i++ {
select { select {
case conn.exitch <- struct{}{}: case conn.exitch <- struct{}{}:
default: default:
} }
}
} }

@ -284,15 +284,21 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
setNvArray(nva, nvIndex, ":authority", req.Host, 0) setNvArray(nva, nvIndex, ":authority", req.Host, 0)
nvIndex++ nvIndex++
/*
:path must starts with "/"
req.RequestURI maybe starts with http://
*/
p := req.URL.Path p := req.URL.Path
q := req.URL.Query().Encode() q := req.URL.Query().Encode()
if q != "" { if q != "" {
p = p + "?" + q p = p + "?" + q
} }
if req.Method != "CONNECT" { if req.Method != "CONNECT" {
setNvArray(nva, nvIndex, ":path", p, 0) setNvArray(nva, nvIndex, ":path", p, 0)
nvIndex++ nvIndex++
} }
//log.Printf("%s http://%s%s", req.Method, req.Host, p) //log.Printf("%s http://%s%s", req.Method, req.Host, p)
for k, v := range req.Header { for k, v := range req.Header {
//log.Printf("header %s: %s\n", k, v[0]) //log.Printf("header %s: %s\n", k, v[0])
@ -304,8 +310,10 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
nvIndex++ nvIndex++
} }
var dp *dataProvider var dp *dataProvider
var cdp *C.nghttp2_data_provider var cdp *C.nghttp2_data_provider
if req.Method == "PUT" || req.Method == "POST" || req.Method == "CONNECT" { if req.Method == "PUT" || req.Method == "POST" || req.Method == "CONNECT" {
dp, cdp = newDataProvider(c.lock) dp, cdp = newDataProvider(c.lock)
go func() { go func() {
@ -320,6 +328,7 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
c.lock.Unlock() c.lock.Unlock()
C.delete_nv_array(nva) C.delete_nv_array(nva)
if int(streamID) < 0 { if int(streamID) < 0 {
return nil, fmt.Errorf("submit request error: %s", return nil, fmt.Errorf("submit request error: %s",
C.GoString(C.nghttp2_strerror(streamID))) C.GoString(C.nghttp2_strerror(streamID)))
@ -331,6 +340,7 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
dp.streamID = int(streamID) dp.streamID = int(streamID)
dp.session = c.session dp.session = c.session
} }
s := &ClientStream{ s := &ClientStream{
streamID: int(streamID), streamID: int(streamID),
conn: c, conn: c,
@ -340,11 +350,13 @@ func (c *ClientConn) CreateRequest(req *http.Request) (*http.Response, error) {
errch: make(chan error), errch: make(chan error),
lock: new(sync.Mutex), lock: new(sync.Mutex),
} }
c.lock.Lock() c.lock.Lock()
c.streams[int(streamID)] = s c.streams[int(streamID)] = s
c.streamCount++ c.streamCount++
c.lock.Unlock() c.lock.Unlock()
// waiting for response from server
select { select {
case err := <-s.errch: case err := <-s.errch:
return nil, err return nil, err
@ -422,10 +434,13 @@ func Server(c net.Conn, handler http.Handler) (*ServerConn, error) {
errch: make(chan struct{}), errch: make(chan struct{}),
exitch: make(chan struct{}), exitch: make(chan struct{}),
} }
conn.session = C.init_nghttp2_server_session(C.size_t(uintptr(unsafe.Pointer(conn))))
conn.session = C.init_nghttp2_server_session(
C.size_t(uintptr(unsafe.Pointer(conn))))
if conn.session == nil { if conn.session == nil {
return nil, fmt.Errorf("init session failed") return nil, fmt.Errorf("init session failed")
} }
//log.Println("send server connection header") //log.Println("send server connection header")
ret := C.send_server_connection_header(conn.session) ret := C.send_server_connection_header(conn.session)
if int(ret) < 0 { if int(ret) < 0 {
@ -441,9 +456,12 @@ func (c *ServerConn) serve(s *ServerStream) {
if c.Handler == nil { if c.Handler == nil {
handler = http.DefaultServeMux handler = http.DefaultServeMux
} }
if s.req.URL == nil { if s.req.URL == nil {
s.req.URL = &url.URL{} s.req.URL = &url.URL{}
} }
// call http.Handler to serve request
handler.ServeHTTP(s, s.req) handler.ServeHTTP(s, s.req)
s.Close() s.Close()
} }
@ -454,6 +472,7 @@ func (c *ServerConn) Close() error {
return nil return nil
} }
c.closed = true c.closed = true
for _, s := range c.streams { for _, s := range c.streams {
s.Close() s.Close()
} }
@ -465,6 +484,7 @@ func (c *ServerConn) Close() error {
C.nghttp2_session_del(c.session) C.nghttp2_session_del(c.session)
close(c.exitch) close(c.exitch)
c.conn.Close() c.conn.Close()
return nil return nil
} }

@ -42,13 +42,16 @@ func (dp *dataProvider) Read(buf []byte) (n int, err error) {
func (dp *dataProvider) Write(buf []byte) (n int, err error) { func (dp *dataProvider) Write(buf []byte) (n int, err error) {
dp.lock.Lock() dp.lock.Lock()
defer dp.lock.Unlock() defer dp.lock.Unlock()
if dp.closed { if dp.closed {
return 0, io.EOF return 0, io.EOF
} }
if dp.deferred { if dp.deferred {
dp.sessLock.Lock() dp.sessLock.Lock()
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
dp.sessLock.Unlock() dp.sessLock.Unlock()
dp.deferred = false dp.deferred = false
} }
return dp.buf.Write(buf) return dp.buf.Write(buf)
@ -58,6 +61,7 @@ func (dp *dataProvider) Write(buf []byte) (n int, err error) {
func (dp *dataProvider) Close() error { func (dp *dataProvider) Close() error {
dp.lock.Lock() dp.lock.Lock()
defer dp.lock.Unlock() defer dp.lock.Unlock()
if dp.closed { if dp.closed {
return nil return nil
} }
@ -67,6 +71,7 @@ func (dp *dataProvider) Close() error {
dp.sessLock.Lock() dp.sessLock.Lock()
C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID)) C.nghttp2_session_resume_data(dp.session, C.int(dp.streamID))
dp.sessLock.Unlock() dp.sessLock.Unlock()
dp.deferred = false dp.deferred = false
} }
return nil return nil
@ -95,6 +100,7 @@ type bodyProvider struct {
// will block when data not yet avaliable // will block when data not yet avaliable
func (bp *bodyProvider) Read(buf []byte) (int, error) { func (bp *bodyProvider) Read(buf []byte) (int, error) {
var delay = 100 * time.Millisecond var delay = 100 * time.Millisecond
for { for {
bp.lock.Lock() bp.lock.Lock()
n, err := bp.buf.Read(buf) n, err := bp.buf.Read(buf)
@ -112,6 +118,7 @@ func (bp *bodyProvider) Read(buf []byte) (int, error) {
func (bp *bodyProvider) Write(buf []byte) (int, error) { func (bp *bodyProvider) Write(buf []byte) (int, error) {
bp.lock.Lock() bp.lock.Lock()
defer bp.lock.Unlock() defer bp.lock.Unlock()
return bp.buf.Write(buf) return bp.buf.Write(buf)
} }
@ -119,6 +126,7 @@ func (bp *bodyProvider) Write(buf []byte) (int, error) {
func (bp *bodyProvider) Close() error { func (bp *bodyProvider) Close() error {
bp.lock.Lock() bp.lock.Lock()
defer bp.lock.Unlock() defer bp.lock.Unlock()
bp.closed = true bp.closed = true
return nil return nil
} }

@ -138,14 +138,19 @@ func (s *ServerStream) WriteHeader(code int) {
if s.closed { if s.closed {
return return
} }
if s.responseSend { if s.responseSend {
return return
} }
s.responseSend = true s.responseSend = true
s.statusCode = code s.statusCode = code
nvIndex := 0 nvIndex := 0
nvMax := 25 nvMax := 25
nva := C.new_nv_array(C.size_t(nvMax)) nva := C.new_nv_array(C.size_t(nvMax))
setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0) setNvArray(nva, nvIndex, ":status", fmt.Sprintf("%d", code), 0)
nvIndex++ nvIndex++
@ -157,11 +162,14 @@ func (s *ServerStream) WriteHeader(code int) {
setNvArray(nva, nvIndex, strings.Title(k), v[0], 0) setNvArray(nva, nvIndex, strings.Title(k), v[0], 0)
nvIndex++ nvIndex++
} }
var dp *dataProvider var dp *dataProvider
var cdp *C.nghttp2_data_provider var cdp *C.nghttp2_data_provider
dp, cdp = newDataProvider(s.conn.lock) dp, cdp = newDataProvider(s.conn.lock)
dp.streamID = s.streamID dp.streamID = s.streamID
dp.session = s.conn.session dp.session = s.conn.session
s.dp = dp s.dp = dp
s.cdp = cdp s.cdp = cdp
@ -172,7 +180,8 @@ func (s *ServerStream) WriteHeader(code int) {
C.delete_nv_array(nva) C.delete_nv_array(nva)
if int(ret) < 0 { if int(ret) < 0 {
panic(fmt.Sprintf("sumit response error %s", C.GoString(C.nghttp2_strerror(ret)))) panic(fmt.Sprintf("sumit response error %s",
C.GoString(C.nghttp2_strerror(ret))))
} }
//log.Printf("stream %d send response", s.streamID) //log.Printf("stream %d send response", s.streamID)
} }
@ -192,14 +201,17 @@ func (s *ServerStream) Close() error {
return nil return nil
} }
s.closed = true s.closed = true
//C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0) //C.nghttp2_submit_rst_stream(s.conn.session, 0, C.int(s.streamID), 0)
if s.req.Body != nil { if s.req.Body != nil {
s.req.Body.Close() s.req.Body.Close()
} }
if s.dp != nil { if s.dp != nil {
s.dp.Close() s.dp.Close()
s.dp = nil s.dp = nil
} }
if s.cdp != nil { if s.cdp != nil {
C.free(unsafe.Pointer(s.cdp)) C.free(unsafe.Pointer(s.cdp))
s.cdp = nil s.cdp = nil
@ -207,6 +219,7 @@ func (s *ServerStream) Close() error {
s.conn.lock.Lock() s.conn.lock.Lock()
s.conn.lock.Unlock() s.conn.lock.Unlock()
if _, ok := s.conn.streams[s.streamID]; ok { if _, ok := s.conn.streams[s.streamID]; ok {
delete(s.conn.streams, s.streamID) delete(s.conn.streams, s.streamID)
} }

Loading…
Cancel
Save