chore: add inbound test for vmess/vless

This commit is contained in:
wwqgtxx 2025-04-16 20:44:48 +08:00
parent a75e570cca
commit d0d0c392d7
3 changed files with 668 additions and 0 deletions

View File

@ -0,0 +1,241 @@
package inbound_test
import (
"context"
"crypto/rand"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"sync"
"testing"
"time"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/ca"
"github.com/metacubex/mihomo/component/generater"
C "github.com/metacubex/mihomo/constant"
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"github.com/stretchr/testify/assert"
)
var tlsCertificate, tlsPrivateKey, tlsFingerprint, _ = N.NewRandomTLSKeyPair()
var tlsConfigCert, _ = tls.X509KeyPair([]byte(tlsCertificate), []byte(tlsPrivateKey))
var tlsConfig = &tls.Config{Certificates: []tls.Certificate{tlsConfigCert}, NextProtos: []string{"h2", "http/1.1"}}
var tlsClientConfig, _ = ca.GetTLSConfig(nil, tlsFingerprint, "", "")
var realityPrivateKey, realityPublickey string
var realityDest = "itunes.apple.com"
var realityShortid = "10f897e26c4b9478"
func init() {
privateKey, err := generater.GeneratePrivateKey()
if err != nil {
panic(err)
}
publicKey := privateKey.PublicKey()
realityPrivateKey = base64.RawURLEncoding.EncodeToString(privateKey[:])
realityPublickey = base64.RawURLEncoding.EncodeToString(publicKey[:])
}
type TestTunnel struct {
HandleTCPConnFn func(conn net.Conn, metadata *C.Metadata)
HandleUDPPacketFn func(packet C.UDPPacket, metadata *C.Metadata)
NatTableFn func() C.NatTable
CloseFn func() error
DoTestFn func(t *testing.T, proxy C.ProxyAdapter)
}
func (tt *TestTunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) {
tt.HandleTCPConnFn(conn, metadata)
}
func (tt *TestTunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) {
tt.HandleUDPPacketFn(packet, metadata)
}
func (tt *TestTunnel) NatTable() C.NatTable {
return tt.NatTableFn()
}
func (tt *TestTunnel) Close() error {
return tt.CloseFn()
}
func (tt *TestTunnel) DoTest(t *testing.T, proxy C.ProxyAdapter) {
tt.DoTestFn(t, proxy)
}
type TestTunnelListener struct {
ch chan net.Conn
ctx context.Context
cancel context.CancelFunc
addr net.Addr
}
func (t *TestTunnelListener) Accept() (net.Conn, error) {
select {
case conn, ok := <-t.ch:
if !ok {
return nil, net.ErrClosed
}
return conn, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
func (t *TestTunnelListener) Close() error {
t.cancel()
return nil
}
func (t *TestTunnelListener) Addr() net.Addr {
return t.addr
}
type WaitCloseConn struct {
net.Conn
ch chan struct{}
once sync.Once
}
func (c *WaitCloseConn) Close() error {
err := c.Conn.Close()
c.once.Do(func() {
close(c.ch)
})
return err
}
var _ C.Tunnel = (*TestTunnel)(nil)
var _ net.Listener = (*TestTunnelListener)(nil)
type HttpTestConfig struct {
RemoteAddr netip.Addr
HttpPath string
HttpData []byte
}
func NewHttpTestTunnel() *TestTunnel {
httpData := make([]byte, 10240)
rand.Read(httpData)
config := &HttpTestConfig{
HttpPath: "/inbound_test",
HttpData: httpData,
RemoteAddr: netip.MustParseAddr("1.2.3.4"),
}
ctx, cancel := context.WithCancel(context.Background())
ln := &TestTunnelListener{ch: make(chan net.Conn), ctx: ctx, cancel: cancel, addr: net.TCPAddrFromAddrPort(netip.AddrPortFrom(config.RemoteAddr, 0))}
r := chi.NewRouter()
r.Get(config.HttpPath, func(w http.ResponseWriter, r *http.Request) {
render.Data(w, r, config.HttpData)
})
go http.Serve(ln, r)
testFn := func(t *testing.T, proxy C.ProxyAdapter, proto string) {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s://%s%s", proto, config.RemoteAddr, config.HttpPath), nil)
assert.NoError(t, err)
req = req.WithContext(ctx)
var dstPort uint16 = 80
if proto == "https" {
dstPort = 443
}
metadata := &C.Metadata{
NetWork: C.TCP,
DstIP: config.RemoteAddr,
DstPort: dstPort,
}
instance, err := proxy.DialContext(ctx, metadata)
assert.NoError(t, err)
defer instance.Close()
transport := &http.Transport{
DialContext: func(context.Context, string, string) (net.Conn, error) {
return instance, nil
},
// from http.DefaultTransport
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// for our self-signed cert
TLSClientConfig: tlsClientConfig,
// open http2
ForceAttemptHTTP2: true,
}
client := http.Client{
Timeout: 30 * time.Second,
Transport: transport,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
defer client.CloseIdleConnections()
resp, err := client.Do(req)
assert.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
data, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, config.HttpData, data)
}
tunnel := &TestTunnel{
HandleTCPConnFn: func(conn net.Conn, metadata *C.Metadata) {
defer conn.Close()
if metadata.DstIP != config.RemoteAddr && metadata.Host != realityDest {
return // not match, just return
}
c := &WaitCloseConn{
Conn: conn,
ch: make(chan struct{}),
}
if metadata.DstPort == 443 {
tlsConn := tls.Server(c, tlsConfig)
if metadata.Host == realityDest { // ignore the tls handshake error for realityDest
ctx, cancel := context.WithTimeout(ctx, C.DefaultTLSTimeout)
defer cancel()
if err := tlsConn.HandshakeContext(ctx); err != nil {
return
}
}
ln.ch <- tlsConn
} else {
ln.ch <- c
}
<-c.ch
},
CloseFn: ln.Close,
DoTestFn: func(t *testing.T, proxy C.ProxyAdapter) {
wg := sync.WaitGroup{}
num := 50
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
testFn(t, proxy, "https")
defer wg.Done()
}()
}
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
testFn(t, proxy, "http")
defer wg.Done()
}()
}
wg.Wait()
},
}
return tunnel
}

View File

@ -0,0 +1,179 @@
package inbound_test
import (
"net"
"net/netip"
"testing"
"github.com/metacubex/mihomo/adapter/outbound"
"github.com/metacubex/mihomo/common/utils"
"github.com/metacubex/mihomo/listener/inbound"
"github.com/stretchr/testify/assert"
)
func testInboundVless(t *testing.T, inboundOptions inbound.VlessOption, outboundOptions outbound.VlessOption) {
userUUID := utils.NewUUIDV4().String()
inboundOptions.BaseOption = inbound.BaseOption{
NameStr: "vless_inbound",
Listen: "127.0.0.1",
Port: "0",
}
inboundOptions.Users = []inbound.VlessUser{
{Username: "test", UUID: userUUID, Flow: "xtls-rprx-vision"},
}
in, err := inbound.NewVless(&inboundOptions)
assert.NoError(t, err)
tunnel := NewHttpTestTunnel()
defer tunnel.Close()
err = in.Listen(tunnel)
assert.NoError(t, err)
defer in.Close()
addrPort, err := netip.ParseAddrPort(in.Address())
assert.NoError(t, err)
outboundOptions.Name = "vless_outbound"
outboundOptions.Server = addrPort.Addr().String()
outboundOptions.Port = int(addrPort.Port())
outboundOptions.UUID = userUUID
out, err := outbound.NewVless(outboundOptions)
assert.NoError(t, err)
defer out.Close()
tunnel.DoTest(t, out)
}
func TestInboundVless_Tls(t *testing.T) {
inboundOptions := inbound.VlessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
}
outboundOptions := outbound.VlessOption{
TLS: true,
Fingerprint: tlsFingerprint,
}
testInboundVless(t, inboundOptions, outboundOptions)
outboundOptions.Flow = "xtls-rprx-vision"
testInboundVless(t, inboundOptions, outboundOptions)
}
func TestInboundVless_Wss1(t *testing.T) {
inboundOptions := inbound.VlessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
WsPath: "/ws",
}
outboundOptions := outbound.VlessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
},
ClientFingerprint: "chrome",
}
testInboundVless(t, inboundOptions, outboundOptions)
outboundOptions.Flow = "xtls-rprx-vision"
testInboundVless(t, inboundOptions, outboundOptions)
}
func TestInboundVless_Wss2(t *testing.T) {
inboundOptions := inbound.VlessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
WsPath: "/ws",
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VlessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
},
}
testInboundVless(t, inboundOptions, outboundOptions)
outboundOptions.Flow = "xtls-rprx-vision"
testInboundVless(t, inboundOptions, outboundOptions)
}
func TestInboundVless_Grpc1(t *testing.T) {
inboundOptions := inbound.VlessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VlessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "grpc",
GrpcOpts: outbound.GrpcOptions{GrpcServiceName: "GunService"},
}
testInboundVless(t, inboundOptions, outboundOptions)
}
func TestInboundVless_Grpc2(t *testing.T) {
inboundOptions := inbound.VlessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
WsPath: "/ws",
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VlessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "grpc",
GrpcOpts: outbound.GrpcOptions{GrpcServiceName: "GunService"},
}
testInboundVless(t, inboundOptions, outboundOptions)
}
func TestInboundVless_Reality(t *testing.T) {
inboundOptions := inbound.VlessOption{
RealityConfig: inbound.RealityConfig{
Dest: net.JoinHostPort(realityDest, "443"),
PrivateKey: realityPrivateKey,
ShortID: []string{realityShortid},
ServerNames: []string{realityDest},
},
}
outboundOptions := outbound.VlessOption{
TLS: true,
ServerName: realityDest,
RealityOpts: outbound.RealityOptions{
PublicKey: realityPublickey,
ShortID: realityShortid,
},
ClientFingerprint: "chrome",
}
testInboundVless(t, inboundOptions, outboundOptions)
outboundOptions.Flow = "xtls-rprx-vision"
testInboundVless(t, inboundOptions, outboundOptions)
}
func TestInboundVless_Reality_Grpc(t *testing.T) {
inboundOptions := inbound.VlessOption{
RealityConfig: inbound.RealityConfig{
Dest: net.JoinHostPort(realityDest, "443"),
PrivateKey: realityPrivateKey,
ShortID: []string{realityShortid},
ServerNames: []string{realityDest},
},
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VlessOption{
TLS: true,
ServerName: realityDest,
RealityOpts: outbound.RealityOptions{
PublicKey: realityPublickey,
ShortID: realityShortid,
},
ClientFingerprint: "chrome",
Network: "grpc",
GrpcOpts: outbound.GrpcOptions{GrpcServiceName: "GunService"},
}
testInboundVless(t, inboundOptions, outboundOptions)
}

View File

@ -0,0 +1,248 @@
package inbound_test
import (
"net"
"net/netip"
"testing"
"github.com/metacubex/mihomo/adapter/outbound"
"github.com/metacubex/mihomo/common/utils"
"github.com/metacubex/mihomo/listener/inbound"
"github.com/stretchr/testify/assert"
)
func testInboundVMess(t *testing.T, inboundOptions inbound.VmessOption, outboundOptions outbound.VmessOption) {
userUUID := utils.NewUUIDV4().String()
inboundOptions.BaseOption = inbound.BaseOption{
NameStr: "vmess_inbound",
Listen: "127.0.0.1",
Port: "0",
}
inboundOptions.Users = []inbound.VmessUser{
{Username: "test", UUID: userUUID, AlterID: 0},
}
in, err := inbound.NewVmess(&inboundOptions)
assert.NoError(t, err)
tunnel := NewHttpTestTunnel()
defer tunnel.Close()
err = in.Listen(tunnel)
assert.NoError(t, err)
defer in.Close()
addrPort, err := netip.ParseAddrPort(in.Address())
assert.NoError(t, err)
outboundOptions.Name = "vmess_outbound"
outboundOptions.Server = addrPort.Addr().String()
outboundOptions.Port = int(addrPort.Port())
outboundOptions.UUID = userUUID
outboundOptions.AlterID = 0
outboundOptions.Cipher = "auto"
out, err := outbound.NewVmess(outboundOptions)
assert.NoError(t, err)
defer out.Close()
tunnel.DoTest(t, out)
}
func TestInboundVMess_Basic(t *testing.T) {
inboundOptions := inbound.VmessOption{}
outboundOptions := outbound.VmessOption{}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Tls(t *testing.T) {
inboundOptions := inbound.VmessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
}
outboundOptions := outbound.VmessOption{
TLS: true,
Fingerprint: tlsFingerprint,
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Ws(t *testing.T) {
inboundOptions := inbound.VmessOption{
WsPath: "/ws",
}
outboundOptions := outbound.VmessOption{
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Ws_ed1(t *testing.T) {
inboundOptions := inbound.VmessOption{
WsPath: "/ws",
}
outboundOptions := outbound.VmessOption{
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws?ed=2048",
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Ws_ed2(t *testing.T) {
inboundOptions := inbound.VmessOption{
WsPath: "/ws",
}
outboundOptions := outbound.VmessOption{
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
MaxEarlyData: 2048,
EarlyDataHeaderName: "Sec-WebSocket-Protocol",
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Ws_Upgrade1(t *testing.T) {
inboundOptions := inbound.VmessOption{
WsPath: "/ws",
}
outboundOptions := outbound.VmessOption{
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
V2rayHttpUpgrade: true,
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Ws_Upgrade2(t *testing.T) {
inboundOptions := inbound.VmessOption{
WsPath: "/ws",
}
outboundOptions := outbound.VmessOption{
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
V2rayHttpUpgrade: true,
V2rayHttpUpgradeFastOpen: true,
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Wss1(t *testing.T) {
inboundOptions := inbound.VmessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
WsPath: "/ws",
}
outboundOptions := outbound.VmessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Wss2(t *testing.T) {
inboundOptions := inbound.VmessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
WsPath: "/ws",
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VmessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "ws",
WSOpts: outbound.WSOptions{
Path: "/ws",
},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Grpc1(t *testing.T) {
inboundOptions := inbound.VmessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VmessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "grpc",
GrpcOpts: outbound.GrpcOptions{GrpcServiceName: "GunService"},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Grpc2(t *testing.T) {
inboundOptions := inbound.VmessOption{
Certificate: tlsCertificate,
PrivateKey: tlsPrivateKey,
WsPath: "/ws",
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VmessOption{
TLS: true,
Fingerprint: tlsFingerprint,
Network: "grpc",
GrpcOpts: outbound.GrpcOptions{GrpcServiceName: "GunService"},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Reality(t *testing.T) {
inboundOptions := inbound.VmessOption{
RealityConfig: inbound.RealityConfig{
Dest: net.JoinHostPort(realityDest, "443"),
PrivateKey: realityPrivateKey,
ShortID: []string{realityShortid},
ServerNames: []string{realityDest},
},
}
outboundOptions := outbound.VmessOption{
TLS: true,
ServerName: realityDest,
RealityOpts: outbound.RealityOptions{
PublicKey: realityPublickey,
ShortID: realityShortid,
},
ClientFingerprint: "chrome",
}
testInboundVMess(t, inboundOptions, outboundOptions)
}
func TestInboundVMess_Reality_Grpc(t *testing.T) {
inboundOptions := inbound.VmessOption{
RealityConfig: inbound.RealityConfig{
Dest: net.JoinHostPort(realityDest, "443"),
PrivateKey: realityPrivateKey,
ShortID: []string{realityShortid},
ServerNames: []string{realityDest},
},
GrpcServiceName: "GunService",
}
outboundOptions := outbound.VmessOption{
TLS: true,
ServerName: realityDest,
RealityOpts: outbound.RealityOptions{
PublicKey: realityPublickey,
ShortID: realityShortid,
},
ClientFingerprint: "chrome",
Network: "grpc",
GrpcOpts: outbound.GrpcOptions{GrpcServiceName: "GunService"},
}
testInboundVMess(t, inboundOptions, outboundOptions)
}