// Clash.Meta/tunnel/tunnel.go
//
// 728 lines
// 18 KiB
// Go

package tunnel
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/common/utils"
"github.com/metacubex/mihomo/component/loopback"
"github.com/metacubex/mihomo/component/nat"
P "github.com/metacubex/mihomo/component/process"
"github.com/metacubex/mihomo/component/resolver"
"github.com/metacubex/mihomo/component/slowdown"
"github.com/metacubex/mihomo/component/sniffer"
C "github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/constant/features"
"github.com/metacubex/mihomo/constant/provider"
icontext "github.com/metacubex/mihomo/context"
"github.com/metacubex/mihomo/log"
"github.com/metacubex/mihomo/tunnel/statistic"
)
// Buffer sizes for the channels used by the tunnel.
const (
queueCapacity = 64 // buffer size of tcpQueue, udpQueue and every per-worker queue in udpQueues
senderCapacity = 128 // chan capacity of PacketSender
)
// Package-level tunnel state. The Update* functions replace the configuration
// values below while holding configMux.
var (
// status is the current tunnel status; it starts as Suspend.
status = newAtomicStatus(Suspend)
// udpInit makes sure initUDP runs only once (see HandleUDPPacket).
udpInit sync.Once
// udpQueues holds one channel per UDP worker goroutine started by initUDP.
udpQueues []chan C.PacketAdapter
natTable = nat.New()
rules []C.Rule
listeners = make(map[string]C.InboundListener)
// subRules is keyed by Metadata.SpecialRules (see getRules).
subRules map[string][]C.Rule
proxies = make(map[string]C.Proxy)
providers map[string]provider.ProxyProvider
ruleProviders map[string]provider.RuleProvider
// configMux is write-locked by the Update* functions and SetSniffing, and
// read-locked by match.
configMux sync.RWMutex
// for compatibility, lazy init
// (tcpQueue/udpQueue are created on the first call to TCPIn/UDPIn)
tcpQueue chan C.ConnContext
tcpInOnce sync.Once
udpQueue chan C.PacketAdapter
udpInOnce sync.Once
// Outbound Rule
mode = Rule
// default timeout for UDP session
udpTimeout = 60 * time.Second
findProcessMode P.FindProcessMode
fakeIPRange netip.Prefix
// snifferDispatcher and sniffingEnable are both set by UpdateSniffer.
snifferDispatcher *sniffer.Dispatcher
sniffingEnable = false
ruleUpdateCallback = utils.NewCallback[provider.RuleProvider]()
)
// tunnel is an empty type whose methods operate on the package-level state.
type tunnel struct{}
// Tunnel is the package's tunnel value.
var Tunnel = tunnel{}
// Compile-time checks that tunnel satisfies C.Tunnel and provider.Tunnel.
var _ C.Tunnel = Tunnel
var _ provider.Tunnel = Tunnel
// HandleTCPConn wraps conn and metadata in a connection context and processes
// it synchronously through handleTCPConn.
func (t tunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) {
	handleTCPConn(icontext.NewConnContext(conn, metadata))
}
// initUDP allocates the per-worker UDP queues and starts one processUDP
// goroutine for each of them. The worker count is GOMAXPROCS, but never fewer
// than four.
func initUDP() {
	workers := runtime.GOMAXPROCS(0)
	if workers < 4 {
		workers = 4
	}

	udpQueues = make([]chan C.PacketAdapter, workers)
	for idx := range udpQueues {
		ch := make(chan C.PacketAdapter, queueCapacity)
		udpQueues[idx] = ch
		go processUDP(ch)
	}
}
// HandleUDPPacket hands a UDP packet to one of the worker queues. The workers
// are started on first use. The queue is selected from a hash of the packet
// adapter's key; when that queue is full the packet is dropped rather than
// blocking the caller.
func (t tunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) {
	udpInit.Do(initUDP)

	adapter := C.NewPacketAdapter(packet, metadata)
	idx := uint(utils.MapHash(adapter.Key())) % uint(len(udpQueues))

	select {
	case udpQueues[idx] <- adapter:
	default:
		// Selected queue is full: discard instead of waiting.
		packet.Drop()
	}
}
// NatTable returns the package-wide NAT table.
func (t tunnel) NatTable() C.NatTable {
	return natTable
}
// Providers returns the current proxy-provider map. The map is returned as
// is, not copied.
func (t tunnel) Providers() map[string]provider.ProxyProvider {
	return providers
}
// RuleProviders returns the current rule-provider map. The map is returned as
// is, not copied.
func (t tunnel) RuleProviders() map[string]provider.RuleProvider {
	return ruleProviders
}
// RuleUpdateCallback returns the package-wide callback registry for
// rule-provider updates.
func (t tunnel) RuleUpdateCallback() *utils.Callback[provider.RuleProvider] {
	return ruleUpdateCallback
}
// OnSuspend switches the tunnel status to Suspend. In that state isHandle
// rejects every connection type.
func OnSuspend() {
	status.Store(Suspend)
}
// OnInnerLoading switches the tunnel status to Inner. In that state isHandle
// accepts only connections of type C.INNER.
func OnInnerLoading() {
	status.Store(Inner)
}
// OnRunning switches the tunnel status to Running. In that state isHandle
// accepts every connection type.
func OnRunning() {
	status.Store(Running)
}
// Status reports the current tunnel status.
func Status() TunnelStatus {
	return status.Load()
}
// SetFakeIPRange records p as the fake-IP prefix returned by FakeIPRange.
func SetFakeIPRange(p netip.Prefix) {
	fakeIPRange = p
}
// FakeIPRange returns the prefix last stored with SetFakeIPRange, or the zero
// netip.Prefix if none was set.
func FakeIPRange() netip.Prefix {
	return fakeIPRange
}
func SetSniffing(b bool) {
if snifferDispatcher.Enable() {
configMux.Lock()
sniffingEnable = b
configMux.Unlock()
}
}
// IsSniffing reports the current value of the sniffing flag.
func IsSniffing() bool {
	return sniffingEnable
}
// TCPIn returns the legacy fan-in queue for TCP connections. The queue and
// its dispatching goroutine are created on the first call; every connection
// received from the queue is handled in its own goroutine.
//
// Deprecated: use Tunnel instead.
func TCPIn() chan<- C.ConnContext {
	tcpInOnce.Do(func() {
		queue := make(chan C.ConnContext, queueCapacity)
		tcpQueue = queue
		go func() {
			for next := range queue {
				go handleTCPConn(next)
			}
		}()
	})
	return tcpQueue
}
// UDPIn returns the legacy fan-in queue for UDP packets. The queue and its
// forwarding goroutine are created on the first call; each packet received is
// passed on to Tunnel.HandleUDPPacket together with its own metadata.
//
// Deprecated: use Tunnel instead.
func UDPIn() chan<- C.PacketAdapter {
	udpInOnce.Do(func() {
		queue := make(chan C.PacketAdapter, queueCapacity)
		udpQueue = queue
		go func() {
			for pkt := range queue {
				Tunnel.HandleUDPPacket(pkt, pkt.Metadata())
			}
		}()
	})
	return udpQueue
}
// NatTable returns the package-wide NAT table.
func NatTable() C.NatTable {
	return natTable
}
// Rules returns the current top-level rule list. The slice is returned as is,
// not copied.
func Rules() []C.Rule {
	return rules
}
// Listeners returns the current inbound-listener map. The map is returned as
// is, not copied.
func Listeners() map[string]C.InboundListener {
	return listeners
}
// UpdateRules replaces the rule list, the sub-rule table and the rule
// providers in one step while holding the configuration write lock.
func UpdateRules(newRules []C.Rule, newSubRule map[string][]C.Rule, rp map[string]provider.RuleProvider) {
	configMux.Lock()
	defer configMux.Unlock()

	rules, subRules, ruleProviders = newRules, newSubRule, rp
}
// Proxies returns the current proxy map. The map is returned as is, not
// copied.
func Proxies() map[string]C.Proxy {
	return proxies
}
// ProxiesWithProviders builds a new map holding every entry of the proxy map
// plus every proxy exposed by the providers, keyed by proxy name. Provider
// proxies are added second, so they replace configured proxies of the same
// name.
func ProxiesWithProviders() map[string]C.Proxy {
	merged := make(map[string]C.Proxy)
	for name, px := range proxies {
		merged[name] = px
	}
	for _, pv := range providers {
		for _, px := range pv.Proxies() {
			merged[px.Name()] = px
		}
	}
	return merged
}
// Providers returns the current proxy-provider map. The map is returned as
// is, not copied.
func Providers() map[string]provider.ProxyProvider {
	return providers
}
// RuleProviders returns the current rule-provider map. The map is returned as
// is, not copied.
func RuleProviders() map[string]provider.RuleProvider {
	return ruleProviders
}
// UpdateProxies replaces the proxy map and the proxy-provider map in one step
// while holding the configuration write lock.
func UpdateProxies(newProxies map[string]C.Proxy, newProviders map[string]provider.ProxyProvider) {
	configMux.Lock()
	defer configMux.Unlock()

	proxies, providers = newProxies, newProviders
}
// UpdateListeners replaces the inbound-listener map while holding the
// configuration write lock.
func UpdateListeners(newListeners map[string]C.InboundListener) {
	configMux.Lock()
	listeners = newListeners
	configMux.Unlock()
}
// UpdateSniffer installs dispatcher as the active sniffer and sets the
// sniffing flag to whatever the dispatcher's Enable method reports. Both
// writes happen under the configuration write lock.
func UpdateSniffer(dispatcher *sniffer.Dispatcher) {
	configMux.Lock()
	defer configMux.Unlock()

	snifferDispatcher = dispatcher
	sniffingEnable = dispatcher.Enable()
}
// Mode returns the current outbound mode of the tunnel.
func Mode() TunnelMode {
	return mode
}
// SetMode changes the outbound mode of the tunnel to m. resolveMetadata
// consults this value to choose between DIRECT, GLOBAL and rule matching.
func SetMode(m TunnelMode) {
	mode = m
}
// FindProcessMode returns the process-lookup mode currently in effect.
func FindProcessMode() P.FindProcessMode {
	return findProcessMode
}
// SetFindProcessMode sets the process-lookup mode consulted by match. It
// supersedes the older SetAlwaysFindProcess API.
//
// When the mode reports Always, match attempts a process lookup even for rules
// that do not ask for one; per the original note this may noticeably increase
// memory use.
func SetFindProcessMode(mode P.FindProcessMode) {
	findProcessMode = mode
}
// isHandle reports whether a connection of type t may be processed in the
// current tunnel status: always while Running, only C.INNER connections while
// Inner, and never otherwise.
func isHandle(t C.Type) bool {
	switch status.Load() {
	case Running:
		return true
	case Inner:
		return t == C.INNER
	default:
		return false
	}
}
// needLookupIP reports whether the destination should be looked up by IP:
// resolver mapping must be enabled, and the metadata must carry a valid
// destination IP but no host name.
func needLookupIP(metadata *C.Metadata) bool {
	if !resolver.MappingEnabled() {
		return false
	}
	return metadata.Host == "" && metadata.DstIP.IsValid()
}
// preHandleMetadata normalises metadata in place before proxy selection.
//
//   - A Host that parses as an IP literal is moved into DstIP and Host is
//     cleared.
//   - If an IP lookup is needed (see needLookupIP) and the resolver knows a
//     host for DstIP, Host and DNSMode are filled in. With fake-IP enabled,
//     DstIP is cleared; otherwise the hosts table may supply a new DstIP or a
//     replacement domain.
//   - If no IP lookup is needed, a domain mapping from the hosts table may
//     replace Host.
//
// It returns an error only when DstIP is a fake IP for which the resolver has
// no host record.
func preHandleMetadata(metadata *C.Metadata) error {
	// An IP literal in Host is really the destination address.
	if addr, parseErr := netip.ParseAddr(metadata.Host); parseErr == nil {
		metadata.DstIP = addr
		metadata.Host = ""
	}

	if !needLookupIP(metadata) {
		// Try the domain mapping from the hosts table.
		if node, found := resolver.DefaultHosts.Search(metadata.Host, true); found {
			metadata.Host = node.Domain
		}
		return nil
	}

	// Enhanced-mode preprocessing: recover the host name from the IP.
	mapped, known := resolver.FindHostByIP(metadata.DstIP)
	if !known {
		if resolver.IsFakeIP(metadata.DstIP) {
			return fmt.Errorf("fake DNS record %s missing", metadata.DstIP)
		}
		return nil
	}

	metadata.Host = mapped
	metadata.DNSMode = C.DNSMapping
	if resolver.FakeIPEnabled() {
		metadata.DstIP = netip.Addr{}
		metadata.DNSMode = C.DNSFakeIP
		return nil
	}

	// redir-host should look the host up in the hosts table.
	node, found := resolver.DefaultHosts.Search(mapped, false)
	switch {
	case found:
		metadata.DstIP, _ = node.RandIP()
	case node != nil && node.IsDomain:
		metadata.Host = node.Domain
	}
	return nil
}
// resolveMetadata selects the outbound proxy for metadata.
//
// A non-empty SpecialProxy is looked up by name and must exist, otherwise an
// error is returned. Without one, the tunnel mode decides: Direct and Global
// pick the "DIRECT" and "GLOBAL" proxies, and any other mode goes through rule
// matching. rule is non-nil only when rule matching produced it.
func resolveMetadata(metadata *C.Metadata) (proxy C.Proxy, rule C.Rule, err error) {
	if name := metadata.SpecialProxy; name != "" {
		selected, ok := proxies[name]
		if !ok {
			return selected, nil, fmt.Errorf("proxy %s not found", name)
		}
		return selected, nil, nil
	}

	switch mode {
	case Direct:
		return proxies["DIRECT"], nil, nil
	case Global:
		return proxies["GLOBAL"], nil, nil
	default:
		// Rule mode.
		return match(metadata)
	}
}
// processUDP handles packets from queue one at a time until the queue is
// closed.
func processUDP(queue chan C.PacketAdapter) {
	for {
		pkt, ok := <-queue
		if !ok {
			return
		}
		handleUDPConn(pkt)
	}
}
// handleUDPConn processes one inbound UDP packet. It validates and
// pre-processes the packet's metadata, optionally sniffs it, then hands the
// packet to the sender stored in the NAT table under the packet's key. When
// that sender was just created, a goroutine is started to resolve the
// destination, select a proxy and open the outbound packet connection.
// Packets that fail validation or pre-processing are dropped.
func handleUDPConn(packet C.PacketAdapter) {
// Drop when the current tunnel status does not accept this connection type.
if !isHandle(packet.Metadata().Type) {
packet.Drop()
return
}
metadata := packet.Metadata()
if !metadata.Valid() {
packet.Drop()
log.Warnln("[Metadata] not valid: %#v", metadata)
return
}
// Remember the requested destination as fAddr when the resolver reports it
// as an existing fake IP; preHandleMetadata below may clear metadata.DstIP.
var fAddr netip.Addr
if resolver.IsExistFakeIP(metadata.DstIP) {
fAddr = metadata.DstIP
}
if err := preHandleMetadata(metadata); err != nil {
packet.Drop()
log.Debugln("[Metadata PreHandle] error: %s", err)
return
}
if sniffingEnable && snifferDispatcher.Enable() {
snifferDispatcher.UDPSniff(packet)
}
key := packet.Key()
sender, loaded := natTable.GetOrCreate(key, newPacketSender)
// loaded == false means no sender existed for this key yet, so the outbound
// side has to be set up.
if !loaded {
// dial resolves the destination, selects the proxy and opens the outbound
// packet connection; on success it also starts the remote-to-local relay.
dial := func() (C.PacketConn, C.WriteBackProxy, error) {
if err := sender.ResolveUDP(metadata); err != nil {
log.Warnln("[UDP] Resolve Ip error: %s", err)
return nil, nil, err
}
proxy, rule, err := resolveMetadata(metadata)
if err != nil {
log.Warnln("[UDP] Parse metadata failed: %s", err.Error())
return nil, nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout)
defer cancel()
// Each failed attempt is logged through logMetadataErr.
rawPc, err := retry(ctx, func(ctx context.Context) (C.PacketConn, error) {
return proxy.ListenPacketContext(ctx, metadata.Pure())
}, func(err error) {
logMetadataErr(metadata, rule, proxy, err)
})
if err != nil {
return nil, nil, err
}
logMetadata(metadata, rule, rawPc)
pc := statistic.NewUDPTracker(rawPc, statistic.DefaultManager, metadata, rule, 0, 0, true)
// A chain ending in REJECT-DROP is closed immediately and reported as an error.
if rawPc.Chains().Last() == "REJECT-DROP" {
_ = pc.Close()
return nil, nil, errors.New("rejected drop packet")
}
oAddrPort := metadata.AddrPort()
writeBackProxy := nat.NewWriteBackProxy(packet)
go handleUDPToLocal(writeBackProxy, pc, sender, key, oAddrPort, fAddr)
return pc, writeBackProxy, nil
}
// Dial in the background so this worker is not blocked; on failure the
// sender is closed and removed from the NAT table.
go func() {
pc, proxy, err := dial()
if err != nil {
sender.Close()
natTable.Delete(key)
return
}
sender.Process(pc, proxy)
}()
}
sender.Send(packet) // nonblocking
}
// handleTCPConn processes one inbound TCP connection. It validates and
// pre-processes the metadata, optionally sniffs a domain, selects a proxy,
// dials the remote side with retries and finally relays traffic between both
// ends through handleSocket. The inbound connection is closed when the
// function returns.
func handleTCPConn(connCtx C.ConnContext) {
// Close right away when the current tunnel status does not accept this
// connection type.
if !isHandle(connCtx.Metadata().Type) {
_ = connCtx.Conn().Close()
return
}
defer func(conn net.Conn) {
_ = conn.Close()
}(connCtx.Conn())
metadata := connCtx.Metadata()
if !metadata.Valid() {
log.Warnln("[Metadata] not valid: %#v", metadata)
return
}
// A pre-handle failure is not fatal yet: sniffing below may still recover a
// domain name.
preHandleFailed := false
if err := preHandleMetadata(metadata); err != nil {
log.Debugln("[Metadata PreHandle] error: %s", err)
preHandleFailed = true
}
conn := connCtx.Conn()
conn.ResetPeeked() // reset before sniffer
if sniffingEnable && snifferDispatcher.Enable() {
// Try to sniff a domain when `preHandleMetadata` failed, this is usually
// caused by a "Fake DNS record missing" error when enhanced-mode is fake-ip.
if snifferDispatcher.TCPSniff(conn, metadata) {
// we now have a domain name
preHandleFailed = false
}
}
// If both trials have failed, we can do nothing but give up
if preHandleFailed {
log.Debugln("[Metadata PreHandle] failed to sniff a domain for connection %s --> %s, give up",
metadata.SourceDetail(), metadata.RemoteAddress())
return
}
// If nothing has been peeked yet, peek one byte in the background with a
// 200ms read deadline. peekMutex stays locked until that goroutine finishes,
// so later code that locks it waits for the peek to end.
peekMutex := sync.Mutex{}
if !conn.Peeked() {
peekMutex.Lock()
go func() {
defer peekMutex.Unlock()
_ = conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
_, _ = conn.Peek(1)
_ = conn.SetReadDeadline(time.Time{})
}()
}
proxy, rule, err := resolveMetadata(metadata)
if err != nil {
log.Warnln("[Metadata] parse failed: %s", err.Error())
return
}
// When the hosts table has an entry for the host whose IP lies outside the
// fake-IP range, dial that IP instead.
// NOTE(review): dialMetadata aliases metadata (same pointer), so the two
// field assignments below also modify metadata before Pure() is called —
// confirm this is intended.
dialMetadata := metadata
if len(metadata.Host) > 0 {
if node, ok := resolver.DefaultHosts.Search(metadata.Host, false); ok {
if dstIp, _ := node.RandIP(); !FakeIPRange().Contains(dstIp) {
dialMetadata.DstIP = dstIp
dialMetadata.DNSMode = C.DNSHosts
dialMetadata = dialMetadata.Pure()
}
}
}
var peekBytes []byte
var peekLen int
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
defer cancel()
// Dial with retries; each failed attempt is logged through logMetadataErr.
remoteConn, err := retry(ctx, func(ctx context.Context) (remoteConn C.Conn, err error) {
remoteConn, err = proxy.DialContext(ctx, dialMetadata)
if err != nil {
return
}
if N.NeedHandshake(remoteConn) {
// On exit: a chain containing "REJECT" clears the error; otherwise a failed
// first write discards the remote connection.
defer func() {
for _, chain := range remoteConn.Chains() {
if chain == "REJECT" {
err = nil
return
}
}
if err != nil {
remoteConn = nil
}
}()
// Wait for the background peek, then send whatever is already buffered from
// the client as the first write to the remote side.
peekMutex.Lock()
defer peekMutex.Unlock()
peekBytes, _ = conn.Peek(conn.Buffered())
_, err = remoteConn.Write(peekBytes)
if err != nil {
return
}
// The bytes were forwarded above, so remove them from the inbound buffer.
if peekLen = len(peekBytes); peekLen > 0 {
_, _ = conn.Discard(peekLen)
}
}
return
}, func(err error) {
logMetadataErr(metadata, rule, proxy, err)
})
if err != nil {
return
}
logMetadata(metadata, rule, remoteConn)
// peekLen is passed to the tracker so the bytes already forwarded are
// included in its starting count.
remoteConn = statistic.NewTCPTracker(remoteConn, statistic.DefaultManager, metadata, rule, 0, int64(peekLen), true)
defer func(remoteConn C.Conn) {
_ = remoteConn.Close()
}(remoteConn)
_ = conn.SetReadDeadline(time.Now()) // stop unfinished peek
// Wait until the background peek goroutine has released the mutex.
peekMutex.Lock()
defer peekMutex.Unlock()
_ = conn.SetReadDeadline(time.Time{}) // reset
handleSocket(conn, remoteConn)
}
// logMetadataErr writes a warning about a failed dial through proxy. When a
// rule selected the proxy, the rule's type and payload are included in the
// message.
func logMetadataErr(metadata *C.Metadata, rule C.Rule, proxy C.ProxyAdapter, err error) {
	network := strings.ToUpper(metadata.NetWork.String())
	if rule != nil {
		log.Warnln("[%s] dial %s (match %s/%s) %s --> %s error: %s", network, proxy.Name(), rule.RuleType().String(), rule.Payload(), metadata.SourceDetail(), metadata.RemoteAddress(), err.Error())
		return
	}
	log.Warnln("[%s] dial %s %s --> %s error: %s", network, proxy.Name(), metadata.SourceDetail(), metadata.RemoteAddress(), err.Error())
}
// logMetadata writes an info line describing how a connection was routed. The
// wording depends, in this order, on whether a special proxy was requested,
// whether a rule matched, and otherwise on the tunnel mode.
func logMetadata(metadata *C.Metadata, rule C.Rule, remoteConn C.Connection) {
	network := strings.ToUpper(metadata.NetWork.String())
	src, dst := metadata.SourceDetail(), metadata.RemoteAddress()

	if metadata.SpecialProxy != "" {
		log.Infoln("[%s] %s --> %s using %s", network, src, dst, metadata.SpecialProxy)
		return
	}

	if rule != nil {
		// Show "TYPE(payload)" when the rule has a payload, else just "TYPE".
		matched := rule.RuleType().String()
		if payload := rule.Payload(); payload != "" {
			matched = fmt.Sprintf("%s(%s)", matched, payload)
		}
		log.Infoln("[%s] %s --> %s match %s using %s", network, src, dst, matched, remoteConn.Chains().String())
		return
	}

	switch mode {
	case Global:
		log.Infoln("[%s] %s --> %s using GLOBAL", network, src, dst)
	case Direct:
		log.Infoln("[%s] %s --> %s using DIRECT", network, src, dst)
	default:
		log.Infoln("[%s] %s --> %s doesn't match any rule using %s", network, src, dst, remoteConn.Chains().Last())
	}
}
// shouldResolveIP reports whether the host must be resolved before rule can be
// evaluated: the rule asks for an IP, a host name is present, and no valid
// destination IP is known yet.
func shouldResolveIP(rule C.Rule, metadata *C.Metadata) bool {
	if !rule.ShouldResolveIP() {
		return false
	}
	if metadata.Host == "" {
		return false
	}
	return !metadata.DstIP.IsValid()
}
// match walks the rule list chosen by getRules and returns the proxy of the
// first rule that matches metadata, together with that rule. When no rule
// yields a usable proxy it returns proxies["DIRECT"] and a nil rule. The error
// result is always nil in this implementation.
//
// metadata is modified along the way: DstIP may be filled in from the hosts
// table or by DNS resolution, and process information may be attached. The
// configuration read lock is held for the whole call.
func match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
configMux.RLock()
defer configMux.RUnlock()
var (
// resolved is set once an IP lookup was attempted, so it is tried at most once.
resolved bool
// Process lookup is attempted at most once, and never for INNER connections.
attemptProcessLookup = metadata.Type != C.INNER
)
// An entry in the hosts table takes precedence over DNS resolution.
if node, ok := resolver.DefaultHosts.Search(metadata.Host, false); ok {
metadata.DstIP, _ = node.RandIP()
resolved = true
}
for _, rule := range getRules(metadata) {
// Resolve the host lazily, only when a rule that needs the IP is reached.
if !resolved && shouldResolveIP(rule, metadata) {
func() {
ctx, cancel := context.WithTimeout(context.Background(), resolver.DefaultDNSTimeout)
defer cancel()
ip, err := resolver.ResolveIP(ctx, metadata.Host)
if err != nil {
log.Debugln("[DNS] resolve %s error: %s", metadata.Host, err.Error())
} else {
log.Debugln("[DNS] %s --> %s", metadata.Host, ip.String())
metadata.DstIP = ip
}
// Marked as resolved even on failure, so the lookup is not repeated.
resolved = true
}()
}
if attemptProcessLookup && !findProcessMode.Off() && (findProcessMode.Always() || rule.ShouldFindProcess()) {
attemptProcessLookup = false
if !features.CMFA {
// normal check for process
uid, path, err := P.FindProcessName(metadata.NetWork.String(), metadata.SrcIP, int(metadata.SrcPort))
if err != nil {
log.Debugln("[Process] find process error for %s: %v", metadata.String(), err)
} else {
metadata.Process = filepath.Base(path)
metadata.ProcessPath = path
metadata.Uid = uid
if pkg, err := P.FindPackageName(metadata); err == nil { // for android (not CMFA) package names
metadata.Process = pkg
}
}
} else {
// check package names
pkg, err := P.FindPackageName(metadata)
if err != nil {
log.Debugln("[Process] find process error for %s: %v", metadata.String(), err)
} else {
metadata.Process = pkg
}
}
}
if matched, ada := rule.Match(metadata); matched {
// A rule naming an unknown proxy is skipped.
adapter, ok := proxies[ada]
if !ok {
continue
}
// parse multi-layer nesting
// (follow Unwrap until a Pass adapter is found or the chain ends)
passed := false
for adapter := adapter; adapter != nil; adapter = adapter.Unwrap(metadata, false) {
if adapter.Type() == C.Pass {
passed = true
break
}
}
if passed {
log.Debugln("%s match Pass rule", adapter.Name())
continue
}
// Skip proxies that cannot carry UDP when the connection is UDP.
if metadata.NetWork == C.UDP && !adapter.SupportUDP() {
log.Debugln("%s UDP is not supported", adapter.Name())
continue
}
return adapter, rule, nil
}
}
return proxies["DIRECT"], nil, nil
}
// getRules returns the sub-rule list registered under metadata.SpecialRules,
// or the default rule list when no such sub-rule list exists.
func getRules(metadata *C.Metadata) []C.Rule {
	sr, ok := subRules[metadata.SpecialRules]
	if !ok {
		log.Debugln("[Rule] use default rules")
		return rules
	}
	log.Debugln("[Rule] use %s rules", metadata.SpecialRules)
	return sr
}
// shouldStopRetry reports whether err is one of the errors for which retry
// stops immediately: the resolver's IP-not-found, IP-version and IPv6-disabled
// errors, and the loopback rejection error.
func shouldStopRetry(err error) bool {
	return errors.Is(err, resolver.ErrIPNotFound) ||
		errors.Is(err, resolver.ErrIPVersion) ||
		errors.Is(err, resolver.ErrIPv6Disabled) ||
		errors.Is(err, loopback.ErrReject)
}
// retry calls ft until it succeeds, up to ten attempts, pausing between
// attempts with the backoff provided by slowdown.
//
// After every failed attempt fe, if non-nil, is invoked with the error.
// Retrying stops early when shouldStopRetry reports the error as permanent or
// when the backoff wait returns an error. No wait is performed after the
// final attempt, so the last error is returned without extra delay.
//
// On failure the value and error of the last ft call are returned unchanged.
func retry[T any](ctx context.Context, ft func(context.Context) (T, error), fe func(err error)) (t T, err error) {
	const maxAttempts = 10

	s := slowdown.New()
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		t, err = ft(ctx)
		if err == nil {
			return t, nil
		}
		if fe != nil {
			fe(err)
		}
		// Give up on permanent errors, and skip the pointless backoff once the
		// attempt budget is used up.
		if shouldStopRetry(err) || attempt == maxAttempts {
			return t, err
		}
		if s.Wait(ctx) != nil {
			return t, err
		}
	}
	return t, err
}