From c2af1cd9e347abbe7fa7ef52ca21df3230abbfe1 Mon Sep 17 00:00:00 2001
From: Anton Tolchanov
Date: Wed, 29 Jan 2025 15:35:37 +0000
Subject: [PATCH] prober: support multiple probes running concurrently

Some probes might need to run for longer than their scheduling interval,
so this change relaxes the 1-at-a-time restriction, allowing us to
configure probe concurrency and timeout separately.

The default values remain the same (concurrency of 1; timeout of 80% of
interval).

Updates tailscale/corp#25479

Signed-off-by: Anton Tolchanov
---
 prober/prober.go      | 48 ++++++++++++++++++++---------
 prober/prober_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 105 insertions(+), 15 deletions(-)

diff --git a/prober/prober.go b/prober/prober.go
index e3860e7b9..d80db773a 100644
--- a/prober/prober.go
+++ b/prober/prober.go
@@ -7,6 +7,7 @@
 package prober
 
 import (
+	"cmp"
 	"container/ring"
 	"context"
 	"encoding/json"
@@ -20,6 +21,7 @@
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"tailscale.com/syncs"
 	"tailscale.com/tsweb"
 )
 
@@ -44,6 +46,14 @@ type ProbeClass struct {
 	// exposed by this probe class.
 	Labels Labels
 
+	// Timeout is the maximum time the probe function is allowed to run before
+	// its context is cancelled. Defaults to 80% of the scheduling interval.
+	Timeout time.Duration
+
+	// Concurrency is the maximum number of concurrent probe executions
+	// allowed for this probe class. Defaults to 1.
+	Concurrency int
+
 	// Metrics allows a probe class to export custom Metrics. Can be nil.
 	Metrics func(prometheus.Labels) []prometheus.Metric
 }
@@ -131,9 +141,12 @@ func newProbe(p *Prober, name string, interval time.Duration, l prometheus.Label
 		cancel:  cancel,
 		stopped: make(chan struct{}),
 
+		runSema: syncs.NewSemaphore(cmp.Or(pc.Concurrency, 1)),
+
 		name:         name,
 		probeClass:   pc,
 		interval:     interval,
+		timeout:      cmp.Or(pc.Timeout, time.Duration(float64(interval)*0.8)),
 		initialDelay: initialDelay(name, interval),
 		successHist:  ring.New(recentHistSize),
 		latencyHist:  ring.New(recentHistSize),
@@ -226,11 +239,12 @@ type Probe struct {
 	ctx     context.Context
 	cancel  context.CancelFunc // run to initiate shutdown
 	stopped chan struct{}      // closed when shutdown is complete
-	runMu   sync.Mutex         // ensures only one probe runs at a time
+	runSema syncs.Semaphore    // restricts concurrency per probe
 
 	name         string
 	probeClass   ProbeClass
 	interval     time.Duration
+	timeout      time.Duration
 	initialDelay time.Duration
 	tick         ticker
 
@@ -282,17 +296,15 @@ func (p *Probe) loop() {
 		t := p.prober.newTicker(p.initialDelay)
 		select {
 		case <-t.Chan():
-			p.run()
 		case <-p.ctx.Done():
 			t.Stop()
 			return
 		}
 		t.Stop()
-	} else {
-		p.run()
 	}
 
 	if p.prober.once {
+		p.run()
 		return
 	}
 
@@ -315,9 +327,12 @@
 	p.tick = p.prober.newTicker(p.interval)
 	defer p.tick.Stop()
 	for {
+		// Run the probe in a new goroutine every tick. Default concurrency & timeout
+		// settings will ensure that only one probe is running at a time.
+		go p.run()
+
 		select {
 		case <-p.tick.Chan():
-			p.run()
 		case <-p.ctx.Done():
 			return
 		}
@@ -331,8 +346,13 @@
 // that the probe either succeeds or fails before the next cycle is scheduled to
 // start.
 func (p *Probe) run() (pi ProbeInfo, err error) {
-	p.runMu.Lock()
-	defer p.runMu.Unlock()
+	// Probes are scheduled each p.interval, so we don't wait longer than that.
+	semaCtx, cancel := context.WithTimeout(p.ctx, p.interval)
+	defer cancel()
+	if !p.runSema.AcquireContext(semaCtx) {
+		return pi, fmt.Errorf("probe %s: context cancelled", p.name)
+	}
+	defer p.runSema.Release()
 
 	p.recordStart()
 	defer func() {
@@ -344,19 +364,21 @@ func (p *Probe) run() (pi ProbeInfo, err error) {
 		if r := recover(); r != nil {
 			log.Printf("probe %s panicked: %v", p.name, r)
 			err = fmt.Errorf("panic: %v", r)
-			p.recordEnd(err)
+			p.recordEndLocked(err)
 		}
 	}()
 
 	ctx := p.ctx
 	if !p.IsContinuous() {
-		timeout := time.Duration(float64(p.interval) * 0.8)
 		var cancel func()
-		ctx, cancel = context.WithTimeout(ctx, timeout)
+		ctx, cancel = context.WithTimeout(ctx, p.timeout)
 		defer cancel()
 	}
 	err = p.probeClass.Probe(ctx)
-	p.recordEnd(err)
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.recordEndLocked(err)
 	if err != nil {
 		log.Printf("probe %s: %v", p.name, err)
 	}
@@ -370,10 +392,8 @@ func (p *Probe) recordStart() {
 	p.mu.Unlock()
 }
 
-func (p *Probe) recordEnd(err error) {
+func (p *Probe) recordEndLocked(err error) {
 	end := p.prober.now()
-	p.mu.Lock()
-	defer p.mu.Unlock()
 	p.end = end
 	p.succeeded = err == nil
 	p.lastErr = err
diff --git a/prober/prober_test.go b/prober/prober_test.go
index 3905bfbc9..109953b65 100644
--- a/prober/prober_test.go
+++ b/prober/prober_test.go
@@ -149,6 +149,74 @@ func TestProberTimingSpread(t *testing.T) {
 	notCalled()
 }
 
+func TestProberTimeout(t *testing.T) {
+	clk := newFakeTime()
+	p := newForTest(clk.Now, clk.NewTicker)
+
+	var done sync.WaitGroup
+	done.Add(1)
+	pfunc := FuncProbe(func(ctx context.Context) error {
+		defer done.Done()
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	})
+	pfunc.Timeout = time.Microsecond
+	probe := p.Run("foo", 30*time.Second, nil, pfunc)
+	waitActiveProbes(t, p, clk, 1)
+	done.Wait()
+	probe.mu.Lock()
+	info := probe.probeInfoLocked()
+	probe.mu.Unlock()
+	wantInfo := ProbeInfo{
+		Name:            "foo",
+		Interval:        30 * time.Second,
+		Labels:          map[string]string{"class": "", "name": "foo"},
+		Status:          ProbeStatusFailed,
+		Error:           "context deadline exceeded",
+		RecentResults:   []bool{false},
+		RecentLatencies: nil,
+	}
+	if diff := cmp.Diff(wantInfo, info, cmpopts.IgnoreFields(ProbeInfo{}, "Start", "End", "Latency")); diff != "" {
+		t.Fatalf("unexpected ProbeInfo (-want +got):\n%s", diff)
+	}
+	if got := info.Latency; got > time.Second {
+		t.Errorf("info.Latency = %v, want at most 1s", got)
+	}
+}
+
+func TestProberConcurrency(t *testing.T) {
+	clk := newFakeTime()
+	p := newForTest(clk.Now, clk.NewTicker)
+
+	var ran atomic.Int64
+	stopProbe := make(chan struct{})
+	pfunc := FuncProbe(func(ctx context.Context) error {
+		ran.Add(1)
+		<-stopProbe
+		return nil
+	})
+	pfunc.Timeout = time.Hour
+	pfunc.Concurrency = 3
+	p.Run("foo", time.Second, nil, pfunc)
+	waitActiveProbes(t, p, clk, 1)
+
+	for range 50 {
+		clk.Advance(time.Second)
+	}
+
+	if err := tstest.WaitFor(convergenceTimeout, func() error {
+		if got, want := ran.Load(), int64(3); got != want {
+			return fmt.Errorf("expected %d probes to run concurrently, got %d", want, got)
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+	close(stopProbe)
+}
+
 func TestProberRun(t *testing.T) {
 	clk := newFakeTime()
 	p := newForTest(clk.Now, clk.NewTicker)
@@ -450,9 +518,11 @@ type probeResult struct {
 			for _, r := range tt.results {
 				probe.recordStart()
 				clk.Advance(r.latency)
-				probe.recordEnd(r.err)
+				probe.recordEndLocked(r.err)
 			}
+			probe.mu.Lock()
 			info := probe.probeInfoLocked()
+			probe.mu.Unlock()
 			if diff := cmp.Diff(tt.wantProbeInfo, info, cmpopts.IgnoreFields(ProbeInfo{}, "Start", "End", "Interval")); diff != "" {
 				t.Fatalf("unexpected ProbeInfo (-want +got):\n%s", diff)
 			}