diff --git a/adapters/outbound/http.go b/adapters/outbound/http.go index 840716b85..3c445cd46 100644 --- a/adapters/outbound/http.go +++ b/adapters/outbound/http.go @@ -51,13 +51,15 @@ func (h *Http) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, error) { return c, nil } -func (h *Http) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { +func (h *Http) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { c, err := dialer.DialContext(ctx, "tcp", h.addr) if err != nil { return nil, fmt.Errorf("%s connect error: %w", h.addr, err) } tcpKeepAlive(c) + defer safeConnClose(c, err) + c, err = h.StreamConn(c, metadata) if err != nil { return nil, err diff --git a/adapters/outbound/shadowsocks.go b/adapters/outbound/shadowsocks.go index 2028df749..bc2e1fe65 100644 --- a/adapters/outbound/shadowsocks.go +++ b/adapters/outbound/shadowsocks.go @@ -73,13 +73,15 @@ func (ss *ShadowSocks) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, e return c, err } -func (ss *ShadowSocks) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { +func (ss *ShadowSocks) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { c, err := dialer.DialContext(ctx, "tcp", ss.addr) if err != nil { return nil, fmt.Errorf("%s connect error: %w", ss.addr, err) } tcpKeepAlive(c) + defer safeConnClose(c, err) + c, err = ss.StreamConn(c, metadata) return NewConn(c, ss), err } @@ -92,6 +94,7 @@ func (ss *ShadowSocks) DialUDP(metadata *C.Metadata) (C.PacketConn, error) { addr, err := resolveUDPAddr("udp", ss.addr) if err != nil { + pc.Close() return nil, err } diff --git a/adapters/outbound/shadowsocksr.go b/adapters/outbound/shadowsocksr.go index 155ec8376..6ab355cd9 100644 --- a/adapters/outbound/shadowsocksr.go +++ b/adapters/outbound/shadowsocksr.go @@ -57,13 +57,15 @@ func (ssr *ShadowSocksR) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, return c, err } -func (ssr *ShadowSocksR) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { +func (ssr *ShadowSocksR) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { c, err := dialer.DialContext(ctx, "tcp", ssr.addr) if err != nil { return nil, fmt.Errorf("%s connect error: %w", ssr.addr, err) } tcpKeepAlive(c) + defer safeConnClose(c, err) + c, err = ssr.StreamConn(c, metadata) return NewConn(c, ssr), err } @@ -76,6 +78,7 @@ func (ssr *ShadowSocksR) DialUDP(metadata *C.Metadata) (C.PacketConn, error) { addr, err := resolveUDPAddr("udp", ssr.addr) if err != nil { + pc.Close() return nil, err } diff --git a/adapters/outbound/snell.go b/adapters/outbound/snell.go index 8cd2cdb14..820c789f7 100644 --- a/adapters/outbound/snell.go +++ b/adapters/outbound/snell.go @@ -55,7 +55,7 @@ func (s *Snell) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, error) { return c, err } -func (s *Snell) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { +func (s *Snell) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { if s.version == snell.Version2 { c, err := s.pool.Get() if err != nil { @@ -63,7 +63,10 @@ func (s *Snell) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, } port, _ := strconv.Atoi(metadata.DstPort) - err = snell.WriteHeader(c, metadata.String(), uint(port), s.version) + if err = snell.WriteHeader(c, metadata.String(), uint(port), s.version); err != nil { + c.Close() + return nil, err + } return NewConn(c, s), err } @@ -73,6 +76,8 @@ func (s *Snell) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, } tcpKeepAlive(c) + defer safeConnClose(c, err) + c, err = s.StreamConn(c, metadata) return NewConn(c, s), err } diff --git a/adapters/outbound/socks5.go b/adapters/outbound/socks5.go index eb65d5131..84acdd6d2 100644 --- a/adapters/outbound/socks5.go +++ b/adapters/outbound/socks5.go @@ -58,13 +58,15 @@ func (ss *Socks5) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, error) return c, nil } -func (ss *Socks5) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { +func (ss *Socks5) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { c, err := dialer.DialContext(ctx, "tcp", ss.addr) if err != nil { return nil, fmt.Errorf("%s connect error: %w", ss.addr, err) } tcpKeepAlive(c) + defer safeConnClose(c, err) + c, err = ss.StreamConn(c, metadata) if err != nil { return nil, err @@ -88,11 +90,7 @@ func (ss *Socks5) DialUDP(metadata *C.Metadata) (_ C.PacketConn, err error) { c = cc } - defer func() { - if err != nil { - c.Close() - } - }() + defer safeConnClose(c, err) tcpKeepAlive(c) var user *socks5.User diff --git a/adapters/outbound/trojan.go b/adapters/outbound/trojan.go index 0c996f9bf..99f67c306 100644 --- a/adapters/outbound/trojan.go +++ b/adapters/outbound/trojan.go @@ -55,8 +55,8 @@ func (t *Trojan) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, error) return c, err } -func (t *Trojan) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { - // gun transport, TODO: Optimize mux dial code +func (t *Trojan) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { + // gun transport if t.transport != nil { c, err := gun.StreamGunWithTransport(t.transport, t.gunConfig) if err != nil { @@ -76,6 +76,9 @@ func (t *Trojan) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, return nil, fmt.Errorf("%s connect error: %w", t.addr, err) } tcpKeepAlive(c) + + defer safeConnClose(c, err) + c, err = t.StreamConn(c, metadata) if err != nil { return nil, err @@ -84,30 +87,27 @@ func (t *Trojan) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, return NewConn(c, t), err } -func (t *Trojan) DialUDP(metadata *C.Metadata) (C.PacketConn, error) { - // gun transport, TODO: Optimize mux dial code +func (t *Trojan) DialUDP(metadata *C.Metadata) (_ C.PacketConn, err error) { + var c net.Conn + + // grpc transport if t.transport != nil { - c, err := gun.StreamGunWithTransport(t.transport, t.gunConfig) + c, err = gun.StreamGunWithTransport(t.transport, t.gunConfig) if err != nil { - return nil, err + return nil, fmt.Errorf("%s connect error: %w", t.addr, err) } - - if err = t.instance.WriteHeader(c, trojan.CommandUDP, serializesSocksAddr(metadata)); err != nil { - c.Close() - return nil, err + } else { + ctx, cancel := context.WithTimeout(context.Background(), tcpTimeout) + defer cancel() + c, err = dialer.DialContext(ctx, "tcp", t.addr) + if err != nil { + return nil, fmt.Errorf("%s connect error: %w", t.addr, err) } - - pc := t.instance.PacketConn(c) - return newPacketConn(pc, t), err + tcpKeepAlive(c) } - ctx, cancel := context.WithTimeout(context.Background(), tcpTimeout) - defer cancel() - c, err := dialer.DialContext(ctx, "tcp", t.addr) - if err != nil { - return nil, fmt.Errorf("%s connect error: %w", t.addr, err) - } - tcpKeepAlive(c) + defer safeConnClose(c, err) + c, err = t.instance.StreamConn(c) if err != nil { return nil, fmt.Errorf("%s connect error: %w", t.addr, err) diff --git a/adapters/outbound/util.go b/adapters/outbound/util.go index 4a92c24c2..932f741cc 100644 --- a/adapters/outbound/util.go +++ b/adapters/outbound/util.go @@ -98,3 +98,9 @@ func resolveUDPAddr(network, address string) (*net.UDPAddr, error) { } return net.ResolveUDPAddr(network, net.JoinHostPort(ip.String(), port)) } + +func safeConnClose(c net.Conn, err error) { + if err != nil { + c.Close() + } +} diff --git a/adapters/outbound/vmess.go b/adapters/outbound/vmess.go index 279a3d4bb..302764cce 100644 --- a/adapters/outbound/vmess.go +++ b/adapters/outbound/vmess.go @@ -170,13 +170,14 @@ func (v *Vmess) StreamConn(c net.Conn, metadata *C.Metadata) (net.Conn, error) { return v.client.StreamConn(c, parseVmessAddr(metadata)) } -func (v *Vmess) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { - // gun transport, TODO: Optimize mux dial code +func (v *Vmess) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { + // gun transport if v.transport != nil { c, err := gun.StreamGunWithTransport(v.transport, v.gunConfig) if err != nil { return nil, err } + defer safeConnClose(c, err) c, err = v.client.StreamConn(c, parseVmessAddr(metadata)) if err != nil { @@ -191,12 +192,13 @@ func (v *Vmess) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, return nil, fmt.Errorf("%s connect error: %s", v.addr, err.Error()) } tcpKeepAlive(c) + defer safeConnClose(c, err) c, err = v.StreamConn(c, metadata) return NewConn(c, v), err } -func (v *Vmess) DialUDP(metadata *C.Metadata) (C.PacketConn, error) { +func (v *Vmess) DialUDP(metadata *C.Metadata) (_ C.PacketConn, err error) { // vmess use stream-oriented udp with a special address, so we needs a net.UDPAddr if !metadata.Resolved() { ip, err := resolver.ResolveIP(metadata.Host) @@ -206,32 +208,33 @@ func (v *Vmess) DialUDP(metadata *C.Metadata) (C.PacketConn, error) { metadata.DstIP = ip } - // gun transport, TODO: Optimize mux dial code + var c net.Conn + // gun transport if v.transport != nil { - c, err := gun.StreamGunWithTransport(v.transport, v.gunConfig) + c, err = gun.StreamGunWithTransport(v.transport, v.gunConfig) if err != nil { return nil, err } + defer safeConnClose(c, err) c, err = v.client.StreamConn(c, parseVmessAddr(metadata)) + } else { + ctx, cancel := context.WithTimeout(context.Background(), tcpTimeout) + defer cancel() + c, err = dialer.DialContext(ctx, "tcp", v.addr) if err != nil { - return nil, err + return nil, fmt.Errorf("%s connect error: %s", v.addr, err.Error()) } + tcpKeepAlive(c) + defer safeConnClose(c, err) - return newPacketConn(&vmessPacketConn{Conn: c, rAddr: metadata.UDPAddr()}, v), nil + c, err = v.StreamConn(c, metadata) } - ctx, cancel := context.WithTimeout(context.Background(), tcpTimeout) - defer cancel() - c, err := dialer.DialContext(ctx, "tcp", v.addr) - if err != nil { - return nil, fmt.Errorf("%s connect error: %s", v.addr, err.Error()) - } - tcpKeepAlive(c) - c, err = v.StreamConn(c, metadata) if err != nil { return nil, fmt.Errorf("new vmess client error: %v", err) } + return newPacketConn(&vmessPacketConn{Conn: c, rAddr: metadata.UDPAddr()}, v), nil } diff --git a/constant/adapters.go b/constant/adapters.go index 7456c304a..333243d8a 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -69,8 +69,21 @@ type PacketConn interface { type ProxyAdapter interface { Name() string Type() AdapterType + + // StreamConn wraps a protocol around net.Conn with Metadata. + // + // Examples: + // conn, _ := net.Dial("tcp", "host:port") + // conn, _ = adapter.StreamConn(conn, metadata) + // + // It returns a C.Conn with protocol which start with + // a new session (if any) StreamConn(c net.Conn, metadata *Metadata) (net.Conn, error) + + // DialContext return a C.Conn with protocol which + // contains multiplexing-related reuse logic (if any) DialContext(ctx context.Context, metadata *Metadata) (Conn, error) + DialUDP(metadata *Metadata) (PacketConn, error) SupportUDP() bool MarshalJSON() ([]byte, error)