diff --git a/adapter/adapter.go b/adapter/adapter.go index 60b6ad947..a3af6471b 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -26,21 +26,20 @@ const ( defaultHistoriesNum = 10 ) -type extraProxyState struct { - history *queue.Queue[C.DelayHistory] +type internalProxyState struct { alive atomic.Bool + history *queue.Queue[C.DelayHistory] } type Proxy struct { C.ProxyAdapter history *queue.Queue[C.DelayHistory] alive atomic.Bool - url string - extra *xsync.MapOf[string, *extraProxyState] + extra *xsync.MapOf[string, *internalProxyState] } -// AliveForTestUrl implements C.Proxy -func (p *Proxy) AliveForTestUrl(url string) bool { +// Alive implements C.Proxy +func (p *Proxy) Alive(url string) bool { if state, ok := p.extra.Load(url); ok { return state.alive.Load() } @@ -48,10 +47,6 @@ func (p *Proxy) AliveForTestUrl(url string) bool { return p.alive.Load() } -func (p *Proxy) OriginalHealthCheckUrl(url string) { - p.url = url -} - // Dial implements C.Proxy func (p *Proxy) Dial(metadata *C.Metadata) (C.Conn, error) { ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout) @@ -81,7 +76,7 @@ func (p *Proxy) ListenPacketContext(ctx context.Context, metadata *C.Metadata, o // DelayHistory implements C.Proxy func (p *Proxy) DelayHistory() []C.DelayHistory { queueM := p.history.Copy() - histories := []C.DelayHistory{} + var histories []C.DelayHistory for _, item := range queueM { histories = append(histories, item) } @@ -97,71 +92,52 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory { queueM = state.history.Copy() } - if queueM == nil { - queueM = p.history.Copy() - } - - histories := []C.DelayHistory{} + var histories []C.DelayHistory for _, item := range queueM { histories = append(histories, item) } return histories } -func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory { - extraHistory := map[string][]C.DelayHistory{} - - p.extra.Range(func(k string, v *extraProxyState) bool { +// ExtraDelayHistories return all delay histories for each test URL +// implements C.Proxy +func (p *Proxy) ExtraDelayHistories() map[string]C.ProxyState { + histories := map[string]C.ProxyState{} + p.extra.Range(func(k string, v *internalProxyState) bool { testUrl := k state := v - histories := []C.DelayHistory{} queueM := state.history.Copy() + var history []C.DelayHistory for _, item := range queueM { - histories = append(histories, item) + history = append(history, item) } - extraHistory[testUrl] = histories - + histories[testUrl] = C.ProxyState{ + Alive: state.alive.Load(), + History: history, + } return true }) - return extraHistory + return histories } -// LastDelay return last history record. if proxy is not alive, return the max value of uint16. +// LastDelayForTestUrl return last history record of the specified URL. if proxy is not alive, return the max value of uint16. // implements C.Proxy -func (p *Proxy) LastDelay() (delay uint16) { - var max uint16 = 0xffff - if !p.alive.Load() { - return max - } - - history := p.history.Last() - if history.Delay == 0 { - return max - } - return history.Delay -} - -// LastDelayForTestUrl implements C.Proxy func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) { var max uint16 = 0xffff - alive := p.alive.Load() - history := p.history.Last() + alive := false + var history C.DelayHistory if state, ok := p.extra.Load(url); ok { alive = state.alive.Load() history = state.history.Last() } - if !alive { - return max - } - - if history.Delay == 0 { + if !alive || history.Delay == 0 { return max } return history.Delay @@ -177,8 +153,8 @@ func (p *Proxy) MarshalJSON() ([]byte, error) { mapping := map[string]any{} _ = json.Unmarshal(inner, &mapping) mapping["history"] = p.DelayHistory() - mapping["extra"] = p.ExtraDelayHistory() - mapping["alive"] = p.AliveForTestUrl(p.url) + mapping["extra"] = p.ExtraDelayHistories() + mapping["alive"] = p.alive.Load() mapping["name"] = p.Name() mapping["udp"] = p.SupportUDP() mapping["xudp"] = p.SupportXUDP() @@ -191,43 +167,32 @@ func (p *Proxy) MarshalJSON() ([]byte, error) { func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (t uint16, err error) { defer func() { alive := err == nil - - if len(p.url) == 0 || url == p.url { - p.alive.Store(alive) - record := C.DelayHistory{Time: time.Now()} - if alive { - record.Delay = t - } - p.history.Put(record) - if p.history.Len() > defaultHistoriesNum { - p.history.Pop() - } - - // test URL configured by the proxy provider - if len(p.url) == 0 { - p.url = url - } - } else { - record := C.DelayHistory{Time: time.Now()} - if alive { - record.Delay = t - } - - state, ok := p.extra.Load(url) - if !ok { - state = &extraProxyState{ - history: queue.New[C.DelayHistory](defaultHistoriesNum), - alive: atomic.NewBool(true), - } - p.extra.Store(url, state) - } - - state.alive.Store(alive) - state.history.Put(record) - if state.history.Len() > defaultHistoriesNum { - state.history.Pop() - } + record := C.DelayHistory{Time: time.Now()} + if alive { + record.Delay = t } + + p.alive.Store(alive) + p.history.Put(record) + if p.history.Len() > defaultHistoriesNum { + p.history.Pop() + } + + state, ok := p.extra.Load(url) + if !ok { + state = &internalProxyState{ + history: queue.New[C.DelayHistory](defaultHistoriesNum), + alive: atomic.NewBool(true), + } + p.extra.Store(url, state) + } + + state.alive.Store(alive) + state.history.Put(record) + if state.history.Len() > defaultHistoriesNum { + state.history.Pop() + } + }() unifiedDelay := UnifiedDelay.Load() @@ -304,8 +269,7 @@ func NewProxy(adapter C.ProxyAdapter) *Proxy { ProxyAdapter: adapter, history: queue.New[C.DelayHistory](defaultHistoriesNum), alive: atomic.NewBool(true), - url: "", - extra: xsync.NewMapOf[string, *extraProxyState]()} + extra: xsync.NewMapOf[string, *internalProxyState]()} } func urlToMetadata(rawURL string) (addr C.Metadata, err error) { diff --git a/adapter/outboundgroup/fallback.go b/adapter/outboundgroup/fallback.go index 50427e53a..72d7e62a4 100644 --- a/adapter/outboundgroup/fallback.go +++ b/adapter/outboundgroup/fallback.go @@ -102,12 +102,12 @@ func (f *Fallback) findAliveProxy(touch bool) C.Proxy { proxies := f.GetProxies(touch) for _, proxy := range proxies { if len(f.selected) == 0 { - if proxy.AliveForTestUrl(f.testUrl) { + if proxy.Alive(f.testUrl) { return proxy } } else { if proxy.Name() == f.selected { - if proxy.AliveForTestUrl(f.testUrl) { + if proxy.Alive(f.testUrl) { return proxy } else { f.selected = "" @@ -133,7 +133,7 @@ func (f *Fallback) Set(name string) error { } f.selected = name - if !p.AliveForTestUrl(f.testUrl) { + if !p.Alive(f.testUrl) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(5000)) defer cancel() expectedStatus, _ := utils.NewIntRanges[uint16](f.expectedStatus) diff --git a/adapter/outboundgroup/loadbalance.go b/adapter/outboundgroup/loadbalance.go index 7fbc91a76..8bcd661f0 100644 --- a/adapter/outboundgroup/loadbalance.go +++ b/adapter/outboundgroup/loadbalance.go @@ -150,8 +150,7 @@ func strategyRoundRobin(url string) strategyFn { for ; i < length; i++ { id := (idx + i) % length proxy := proxies[id] - // if proxy.Alive() { - if proxy.AliveForTestUrl(url) { + if proxy.Alive(url) { i++ return proxy } @@ -169,16 +168,14 @@ func strategyConsistentHashing(url string) strategyFn { for i := 0; i < maxRetry; i, key = i+1, key+1 { idx := jumpHash(key, buckets) proxy := proxies[idx] - // if proxy.Alive() { - if proxy.AliveForTestUrl(url) { + if proxy.Alive(url) { return proxy } } // when availability is poor, traverse the entire list to get the available nodes for _, proxy := range proxies { - // if proxy.Alive() { - if proxy.AliveForTestUrl(url) { + if proxy.Alive(url) { return proxy } } @@ -204,8 +201,7 @@ func strategyStickySessions(url string) strategyFn { nowIdx := idx for i := 1; i < maxRetry; i++ { proxy := proxies[nowIdx] - // if proxy.Alive() { - if proxy.AliveForTestUrl(url) { + if proxy.Alive(url) { if nowIdx != idx { lruCache.Delete(key) lruCache.Set(key, nowIdx) diff --git a/adapter/outboundgroup/urltest.go b/adapter/outboundgroup/urltest.go index bdac909f4..2e533415b 100644 --- a/adapter/outboundgroup/urltest.go +++ b/adapter/outboundgroup/urltest.go @@ -101,7 +101,7 @@ func (u *URLTest) fast(touch bool) C.Proxy { proxies := u.GetProxies(touch) if u.selected != "" { for _, proxy := range proxies { - if !proxy.AliveForTestUrl(u.testUrl) { + if !proxy.Alive(u.testUrl) { continue } if proxy.Name() == u.selected { @@ -121,7 +121,7 @@ func (u *URLTest) fast(touch bool) C.Proxy { fastNotExist = false } - if !proxy.AliveForTestUrl(u.testUrl) { + if !proxy.Alive(u.testUrl) { continue } @@ -133,7 +133,7 @@ func (u *URLTest) fast(touch bool) C.Proxy { } // tolerance - if u.fastNode == nil || fastNotExist || !u.fastNode.AliveForTestUrl(u.testUrl) || u.fastNode.LastDelayForTestUrl(u.testUrl) > fast.LastDelayForTestUrl(u.testUrl)+u.tolerance { + if u.fastNode == nil || fastNotExist || !u.fastNode.Alive(u.testUrl) || u.fastNode.LastDelayForTestUrl(u.testUrl) > fast.LastDelayForTestUrl(u.testUrl)+u.tolerance { u.fastNode = fast } return u.fastNode, nil diff --git a/adapter/provider/healthcheck.go b/adapter/provider/healthcheck.go index d8e56192e..111185016 100644 --- a/adapter/provider/healthcheck.go +++ b/adapter/provider/healthcheck.go @@ -202,7 +202,7 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex defer cancel() log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid) _, _ = p.URLTest(ctx, url, expectedStatus) - log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid) + log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.Alive(url), p.LastDelayForTestUrl(url), uid) return false, nil }) } diff --git a/adapter/provider/provider.go b/adapter/provider/provider.go index 37106e63a..01ae44eed 100644 --- a/adapter/provider/provider.go +++ b/adapter/provider/provider.go @@ -114,10 +114,6 @@ func (pp *proxySetProvider) RegisterHealthCheckTask(url string, expectedStatus u func (pp *proxySetProvider) setProxies(proxies []C.Proxy) { pp.proxies = proxies - for _, proxy := range pp.proxies { - proxy.OriginalHealthCheckUrl(pp.healthCheck.url) - } - pp.healthCheck.setProxy(proxies) if pp.healthCheck.auto() { go pp.healthCheck.check() diff --git a/constant/adapters.go b/constant/adapters.go index 4a8de89b5..58b2a92b0 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -147,15 +147,19 @@ type DelayHistory struct { Delay uint16 `json:"delay"` } +type ProxyState struct { + Alive bool `json:"alive"` + History []DelayHistory `json:"history"` +} + type DelayHistoryStoreType int type Proxy interface { ProxyAdapter - AliveForTestUrl(url string) bool + Alive(url string) bool DelayHistory() []DelayHistory - ExtraDelayHistory() map[string][]DelayHistory + ExtraDelayHistories() map[string]ProxyState LastDelayForTestUrl(url string) uint16 - OriginalHealthCheckUrl(url string) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (uint16, error) // Deprecated: use DialContext instead.