mirror of
https://gitclone.com/github.com/MetaCubeX/Clash.Meta
synced 2025-05-14 22:18:02 +08:00
chore: fetcher will change duration to achieve fast retry when the update failed with a 2x factor step from 1s to interval
This commit is contained in:
parent
f328203bc1
commit
cad26ac6a8
@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/metacubex/mihomo/common/utils"
|
"github.com/metacubex/mihomo/common/utils"
|
||||||
|
"github.com/metacubex/mihomo/component/slowdown"
|
||||||
types "github.com/metacubex/mihomo/constant/provider"
|
types "github.com/metacubex/mihomo/constant/provider"
|
||||||
"github.com/metacubex/mihomo/log"
|
"github.com/metacubex/mihomo/log"
|
||||||
|
|
||||||
@ -29,6 +30,7 @@ type Fetcher[V any] struct {
|
|||||||
onUpdate func(V)
|
onUpdate func(V)
|
||||||
watcher *fswatch.Watcher
|
watcher *fswatch.Watcher
|
||||||
loadBufMutex sync.Mutex
|
loadBufMutex sync.Mutex
|
||||||
|
backoff slowdown.Backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Fetcher[V]) Name() string {
|
func (f *Fetcher[V]) Name() string {
|
||||||
@ -83,6 +85,7 @@ func (f *Fetcher[V]) Initial() (V, error) {
|
|||||||
func (f *Fetcher[V]) Update() (V, bool, error) {
|
func (f *Fetcher[V]) Update() (V, bool, error) {
|
||||||
buf, hash, err := f.vehicle.Read(f.ctx, f.hash)
|
buf, hash, err := f.vehicle.Read(f.ctx, f.hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
f.backoff.AddAttempt() // add a failed attempt to backoff
|
||||||
return lo.Empty[V](), false, err
|
return lo.Empty[V](), false, err
|
||||||
}
|
}
|
||||||
return f.loadBuf(buf, hash, f.vehicle.Type() != types.File)
|
return f.loadBuf(buf, hash, f.vehicle.Type() != types.File)
|
||||||
@ -111,8 +114,10 @@ func (f *Fetcher[V]) loadBuf(buf []byte, hash utils.HashType, updateFile bool) (
|
|||||||
|
|
||||||
contents, err := f.parser(buf)
|
contents, err := f.parser(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
f.backoff.AddAttempt() // add a failed attempt to backoff
|
||||||
return lo.Empty[V](), false, err
|
return lo.Empty[V](), false, err
|
||||||
}
|
}
|
||||||
|
f.backoff.Reset() // no error, reset backoff
|
||||||
|
|
||||||
if updateFile {
|
if updateFile {
|
||||||
if err = f.vehicle.Write(buf); err != nil {
|
if err = f.vehicle.Write(buf); err != nil {
|
||||||
@ -147,14 +152,25 @@ func (f *Fetcher[V]) pullLoop(forceUpdate bool) {
|
|||||||
log.Warnln("[Provider] %s not updated for a long time, force refresh", f.Name())
|
log.Warnln("[Provider] %s not updated for a long time, force refresh", f.Name())
|
||||||
f.updateWithLog()
|
f.updateWithLog()
|
||||||
}
|
}
|
||||||
|
if attempt := f.backoff.Attempt(); attempt > 0 { // f.Update() was failed, decrease the interval from backoff to achieve fast retry
|
||||||
|
if duration := f.backoff.ForAttempt(attempt); duration < initialInterval {
|
||||||
|
initialInterval = duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
timer := time.NewTimer(initialInterval)
|
timer := time.NewTimer(initialInterval)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
timer.Reset(f.interval)
|
|
||||||
f.updateWithLog()
|
f.updateWithLog()
|
||||||
|
interval := f.interval
|
||||||
|
if attempt := f.backoff.Attempt(); attempt > 0 { // f.Update() was failed, decrease the interval from backoff to achieve fast retry
|
||||||
|
if duration := f.backoff.ForAttempt(attempt); duration < interval {
|
||||||
|
interval = duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
timer.Reset(interval)
|
||||||
case <-f.ctx.Done():
|
case <-f.ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -212,5 +228,11 @@ func NewFetcher[V any](name string, interval time.Duration, vehicle types.Vehicl
|
|||||||
parser: parser,
|
parser: parser,
|
||||||
onUpdate: onUpdate,
|
onUpdate: onUpdate,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
|
backoff: slowdown.Backoff{
|
||||||
|
Factor: 2,
|
||||||
|
Jitter: false,
|
||||||
|
Min: time.Second,
|
||||||
|
Max: interval,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,9 +4,10 @@ package slowdown
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/metacubex/randv2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Backoff is a time.Duration counter, starting at Min. After every call to
|
// Backoff is a time.Duration counter, starting at Min. After every call to
|
||||||
@ -63,7 +64,7 @@ func (b *Backoff) ForAttempt(attempt float64) time.Duration {
|
|||||||
minf := float64(min)
|
minf := float64(min)
|
||||||
durf := minf * math.Pow(factor, attempt)
|
durf := minf * math.Pow(factor, attempt)
|
||||||
if b.Jitter {
|
if b.Jitter {
|
||||||
durf = rand.Float64()*(durf-minf) + minf
|
durf = randv2.Float64()*(durf-minf) + minf
|
||||||
}
|
}
|
||||||
//ensure float64 wont overflow int64
|
//ensure float64 wont overflow int64
|
||||||
if durf > maxInt64 {
|
if durf > maxInt64 {
|
||||||
@ -90,6 +91,11 @@ func (b *Backoff) Attempt() float64 {
|
|||||||
return float64(b.attempt.Load())
|
return float64(b.attempt.Load())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddAttempt adds one to the attempt counter.
|
||||||
|
func (b *Backoff) AddAttempt() {
|
||||||
|
b.attempt.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
// Copy returns a backoff with equals constraints as the original
|
// Copy returns a backoff with equals constraints as the original
|
||||||
func (b *Backoff) Copy() *Backoff {
|
func (b *Backoff) Copy() *Backoff {
|
||||||
return &Backoff{
|
return &Backoff{
|
||||||
|
Loading…
Reference in New Issue
Block a user