chore: better bufio.Reader warp

This commit is contained in:
wwqgtxx 2023-11-02 10:31:58 +08:00
parent 96220aa8ea
commit b0638cfc49
2 changed files with 22 additions and 11 deletions

View File

@ -22,6 +22,16 @@ func NewBufferedConn(c net.Conn) *BufferedConn {
return &BufferedConn{bufio.NewReader(c), NewExtendedConn(c), false} return &BufferedConn{bufio.NewReader(c), NewExtendedConn(c), false}
} }
func WarpConnWithBioReader(c net.Conn, br *bufio.Reader) net.Conn {
if br != nil && br.Buffered() > 0 {
if bc, ok := c.(*BufferedConn); ok && bc.r == br {
return bc
}
return &BufferedConn{br, NewExtendedConn(c), true}
}
return c
}
// Reader returns the internal bufio.Reader. // Reader returns the internal bufio.Reader.
func (c *BufferedConn) Reader() *bufio.Reader { func (c *BufferedConn) Reader() *bufio.Reader {
return c.r return c.r

View File

@ -1,7 +1,6 @@
package vmess package vmess
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"crypto/tls" "crypto/tls"
@ -393,7 +392,11 @@ func streamWebsocketConn(ctx context.Context, conn net.Conn, c *WebsocketConfig,
return nil, fmt.Errorf("dial %s error: %w", uri.Host, err) return nil, fmt.Errorf("dial %s error: %w", uri.Host, err)
} }
conn = newWebsocketConn(conn, reader, ws.StateClientSide) // some bytes which could be written by the peer right after response and be caught by us during buffered read,
// so we need warp Conn with bio.Reader
conn = N.WarpConnWithBioReader(conn, reader)
conn = newWebsocketConn(conn, ws.StateClientSide)
// websocketConn can't correct handle ReadDeadline // websocketConn can't correct handle ReadDeadline
// so call N.NewDeadlineConn to add a safe wrapper // so call N.NewDeadlineConn to add a safe wrapper
return N.NewDeadlineConn(conn), nil return N.NewDeadlineConn(conn), nil
@ -419,19 +422,13 @@ func StreamWebsocketConn(ctx context.Context, conn net.Conn, c *WebsocketConfig)
return streamWebsocketConn(ctx, conn, c, nil) return streamWebsocketConn(ctx, conn, c, nil)
} }
func newWebsocketConn(conn net.Conn, br *bufio.Reader, state ws.State) *websocketConn { func newWebsocketConn(conn net.Conn, state ws.State) *websocketConn {
controlHandler := wsutil.ControlFrameHandler(conn, state) controlHandler := wsutil.ControlFrameHandler(conn, state)
var reader io.Reader
if br != nil && br.Buffered() > 0 {
reader = br
} else {
reader = conn
}
return &websocketConn{ return &websocketConn{
Conn: conn, Conn: conn,
state: state, state: state,
reader: &wsutil.Reader{ reader: &wsutil.Reader{
Source: reader, Source: conn,
State: state, State: state,
SkipHeaderCheck: true, SkipHeaderCheck: true,
CheckUTF8: false, CheckUTF8: false,
@ -463,7 +460,11 @@ func StreamUpgradedWebsocketConn(w http.ResponseWriter, r *http.Request) (net.Co
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn := newWebsocketConn(wsConn, rw.Reader, ws.StateServerSide)
// gobwas/ws will flush rw.Writer, so we only need warp rw.Reader
wsConn = N.WarpConnWithBioReader(wsConn, rw.Reader)
conn := newWebsocketConn(wsConn, ws.StateServerSide)
if edBuf := decodeXray0rtt(r.Header); len(edBuf) > 0 { if edBuf := decodeXray0rtt(r.Header); len(edBuf) > 0 {
return N.NewDeadlineConn(&websocketWithReaderConn{conn, io.MultiReader(bytes.NewReader(edBuf), conn)}), nil return N.NewDeadlineConn(&websocketWithReaderConn{conn, io.MultiReader(bytes.NewReader(edBuf), conn)}), nil
} }