mirror of
https://gitclone.com/github.com/MetaCubeX/Clash.Meta
synced 2025-05-13 21:48:02 +08:00
In the original batch implementation, Go() always started a new goroutine, which then waited for a free concurrency slot; that is unnecessary for the current code. x/sync/errgroup instead blocks the caller of Go() while the concurrency limit is reached, so no more goroutines exist than the limit allows, which effectively reduces memory usage. In addition, the original batch always stored the return value of each Go() call, but the current code never uses it, which also wasted a lot of memory in high-concurrency scenarios.
237 lines
5.7 KiB
Go
237 lines
5.7 KiB
Go
package provider
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/metacubex/mihomo/common/atomic"
|
|
"github.com/metacubex/mihomo/common/singledo"
|
|
"github.com/metacubex/mihomo/common/utils"
|
|
C "github.com/metacubex/mihomo/constant"
|
|
"github.com/metacubex/mihomo/log"
|
|
|
|
"github.com/dlclark/regexp2"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// HealthCheckOption carries the user-facing health check settings.
type HealthCheckOption struct {
	// URL is the address probed by the health check.
	URL string
	// Interval is the period between checks.
	// NOTE(review): presumably seconds, matching NewHealthCheck's interval
	// argument — confirm against the callers.
	Interval uint
}
|
|
|
|
type extraOption struct {
|
|
expectedStatus utils.IntRanges[uint16]
|
|
filters map[string]struct{}
|
|
}
|
|
|
|
type HealthCheck struct {
|
|
ctx context.Context
|
|
ctxCancel context.CancelFunc
|
|
url string
|
|
extra map[string]*extraOption
|
|
mu sync.Mutex
|
|
started atomic.Bool
|
|
proxies []C.Proxy
|
|
interval time.Duration
|
|
lazy bool
|
|
expectedStatus utils.IntRanges[uint16]
|
|
lastTouch atomic.TypedValue[time.Time]
|
|
singleDo *singledo.Single[struct{}]
|
|
timeout time.Duration
|
|
}
|
|
|
|
func (hc *HealthCheck) process() {
|
|
if hc.started.Load() {
|
|
log.Warnln("Skip start health check timer due to it's started")
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(hc.interval)
|
|
hc.start()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
lastTouch := hc.lastTouch.Load()
|
|
since := time.Since(lastTouch)
|
|
if !hc.lazy || since < hc.interval {
|
|
hc.check()
|
|
} else {
|
|
log.Debugln("Skip once health check because we are lazy")
|
|
}
|
|
case <-hc.ctx.Done():
|
|
ticker.Stop()
|
|
hc.stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (hc *HealthCheck) setProxy(proxies []C.Proxy) {
|
|
hc.proxies = proxies
|
|
}
|
|
|
|
func (hc *HealthCheck) registerHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) {
|
|
url = strings.TrimSpace(url)
|
|
if len(url) == 0 || url == hc.url {
|
|
log.Debugln("ignore invalid health check url: %s", url)
|
|
return
|
|
}
|
|
|
|
hc.mu.Lock()
|
|
defer hc.mu.Unlock()
|
|
|
|
// if the provider has not set up health checks, then modify it to be the same as the group's interval
|
|
if hc.interval == 0 {
|
|
hc.interval = time.Duration(interval) * time.Second
|
|
}
|
|
|
|
if hc.extra == nil {
|
|
hc.extra = make(map[string]*extraOption)
|
|
}
|
|
|
|
// prioritize the use of previously registered configurations, especially those from provider
|
|
if _, ok := hc.extra[url]; ok {
|
|
// provider default health check does not set filter
|
|
if url != hc.url && len(filter) != 0 {
|
|
splitAndAddFiltersToExtra(filter, hc.extra[url])
|
|
}
|
|
|
|
log.Debugln("health check url: %s exists", url)
|
|
return
|
|
}
|
|
|
|
option := &extraOption{filters: map[string]struct{}{}, expectedStatus: expectedStatus}
|
|
splitAndAddFiltersToExtra(filter, option)
|
|
hc.extra[url] = option
|
|
|
|
if hc.auto() && !hc.started.Load() {
|
|
go hc.process()
|
|
}
|
|
}
|
|
|
|
func splitAndAddFiltersToExtra(filter string, option *extraOption) {
|
|
filter = strings.TrimSpace(filter)
|
|
if len(filter) != 0 {
|
|
for _, regex := range strings.Split(filter, "`") {
|
|
regex = strings.TrimSpace(regex)
|
|
if len(regex) != 0 {
|
|
option.filters[regex] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (hc *HealthCheck) auto() bool {
|
|
return hc.interval != 0
|
|
}
|
|
|
|
func (hc *HealthCheck) touch() {
|
|
hc.lastTouch.Store(time.Now())
|
|
}
|
|
|
|
func (hc *HealthCheck) start() {
|
|
hc.started.Store(true)
|
|
}
|
|
|
|
func (hc *HealthCheck) stop() {
|
|
hc.started.Store(false)
|
|
}
|
|
|
|
func (hc *HealthCheck) check() {
|
|
if len(hc.proxies) == 0 {
|
|
return
|
|
}
|
|
|
|
_, _, _ = hc.singleDo.Do(func() (struct{}, error) {
|
|
id := utils.NewUUIDV4().String()
|
|
log.Debugln("Start New Health Checking {%s}", id)
|
|
b := new(errgroup.Group)
|
|
b.SetLimit(10)
|
|
|
|
// execute default health check
|
|
option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus}
|
|
hc.execute(b, hc.url, id, option)
|
|
|
|
// execute extra health check
|
|
if len(hc.extra) != 0 {
|
|
for url, option := range hc.extra {
|
|
hc.execute(b, url, id, option)
|
|
}
|
|
}
|
|
_ = b.Wait()
|
|
log.Debugln("Finish A Health Checking {%s}", id)
|
|
return struct{}{}, nil
|
|
})
|
|
}
|
|
|
|
func (hc *HealthCheck) execute(b *errgroup.Group, url, uid string, option *extraOption) {
|
|
url = strings.TrimSpace(url)
|
|
if len(url) == 0 {
|
|
log.Debugln("Health Check has been skipped due to testUrl is empty, {%s}", uid)
|
|
return
|
|
}
|
|
|
|
var filterReg *regexp2.Regexp
|
|
var expectedStatus utils.IntRanges[uint16]
|
|
if option != nil {
|
|
expectedStatus = option.expectedStatus
|
|
if len(option.filters) != 0 {
|
|
filters := make([]string, 0, len(option.filters))
|
|
for filter := range option.filters {
|
|
filters = append(filters, filter)
|
|
}
|
|
|
|
filterReg = regexp2.MustCompile(strings.Join(filters, "|"), regexp2.None)
|
|
}
|
|
}
|
|
|
|
for _, proxy := range hc.proxies {
|
|
// skip proxies that do not require health check
|
|
if filterReg != nil {
|
|
if match, _ := filterReg.MatchString(proxy.Name()); !match {
|
|
continue
|
|
}
|
|
}
|
|
|
|
p := proxy
|
|
b.Go(func() error {
|
|
ctx, cancel := context.WithTimeout(hc.ctx, hc.timeout)
|
|
defer cancel()
|
|
log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid)
|
|
_, _ = p.URLTest(ctx, url, expectedStatus)
|
|
log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid)
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
func (hc *HealthCheck) close() {
|
|
hc.ctxCancel()
|
|
}
|
|
|
|
func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck {
|
|
if url == "" {
|
|
expectedStatus = nil
|
|
interval = 0
|
|
}
|
|
if timeout == 0 {
|
|
timeout = 5000
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &HealthCheck{
|
|
ctx: ctx,
|
|
ctxCancel: cancel,
|
|
proxies: proxies,
|
|
url: url,
|
|
timeout: time.Duration(timeout) * time.Millisecond,
|
|
extra: map[string]*extraOption{},
|
|
interval: time.Duration(interval) * time.Second,
|
|
lazy: lazy,
|
|
expectedStatus: expectedStatus,
|
|
singleDo: singledo.NewSingle[struct{}](time.Second),
|
|
}
|
|
}
|