prober: export probe class and metrics from bandwidth prober
- Wrap each prober function into a probe class that allows associating metric labels and custom metrics with a given probe; - Make sure all existing probe classes set a `class` metric label; - Move bandwidth probe size from being a metric label to a separate gauge metric; this will make it possible to use it to calculate average used bandwidth using a PromQL query; - Also export transfer time for the bandwidth prober (more accurate than the total probe time, since it excludes connection establishment time). Updates tailscale/corp#17912 Signed-off-by: Anton Tolchanov <anton@tailscale.com>
This commit is contained in:
parent
21671ca374
commit
5336362e64
106
prober/derp.go
106
prober/derp.go
@ -10,9 +10,9 @@
|
|||||||
crand "crypto/rand"
|
crand "crypto/rand"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"maps"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -20,6 +20,7 @@
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"tailscale.com/derp"
|
"tailscale.com/derp"
|
||||||
"tailscale.com/derp/derphttp"
|
"tailscale.com/derp/derphttp"
|
||||||
"tailscale.com/net/stun"
|
"tailscale.com/net/stun"
|
||||||
@ -42,11 +43,14 @@ type derpProber struct {
|
|||||||
bwInterval time.Duration
|
bwInterval time.Duration
|
||||||
bwProbeSize int64
|
bwProbeSize int64
|
||||||
|
|
||||||
// Probe functions that can be overridden for testing.
|
// Probe class for fetching & updating the DERP map.
|
||||||
tlsProbeFn func(string) ProbeFunc
|
ProbeMap ProbeClass
|
||||||
udpProbeFn func(string, int) ProbeFunc
|
|
||||||
meshProbeFn func(string, string) ProbeFunc
|
// Probe classes for probing individual derpers.
|
||||||
bwProbeFn func(string, string, int64) ProbeFunc
|
tlsProbeFn func(string) ProbeClass
|
||||||
|
udpProbeFn func(string, int) ProbeClass
|
||||||
|
meshProbeFn func(string, string) ProbeClass
|
||||||
|
bwProbeFn func(string, string, int64) ProbeClass
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
lastDERPMap *tailcfg.DERPMap
|
lastDERPMap *tailcfg.DERPMap
|
||||||
@ -100,6 +104,10 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) {
|
|||||||
nodes: make(map[string]*tailcfg.DERPNode),
|
nodes: make(map[string]*tailcfg.DERPNode),
|
||||||
probes: make(map[string]*Probe),
|
probes: make(map[string]*Probe),
|
||||||
}
|
}
|
||||||
|
d.ProbeMap = ProbeClass{
|
||||||
|
Probe: d.probeMapFn,
|
||||||
|
Class: "derp_map",
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(d)
|
o(d)
|
||||||
}
|
}
|
||||||
@ -109,10 +117,10 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) {
|
|||||||
return d, nil
|
return d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProbeMap fetches the DERPMap and creates/destroys probes for each
|
// probeMapFn fetches the DERPMap and creates/destroys probes for each
|
||||||
// DERP server as necessary. It should get regularly executed as a
|
// DERP server as necessary. It should get regularly executed as a
|
||||||
// probe function itself.
|
// probe function itself.
|
||||||
func (d *derpProber) ProbeMap(ctx context.Context) error {
|
func (d *derpProber) probeMapFn(ctx context.Context) error {
|
||||||
if err := d.updateMap(ctx); err != nil {
|
if err := d.updateMap(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -123,7 +131,7 @@ func (d *derpProber) ProbeMap(ctx context.Context) error {
|
|||||||
|
|
||||||
for _, region := range d.lastDERPMap.Regions {
|
for _, region := range d.lastDERPMap.Regions {
|
||||||
for _, server := range region.Nodes {
|
for _, server := range region.Nodes {
|
||||||
labels := map[string]string{
|
labels := Labels{
|
||||||
"region": region.RegionCode,
|
"region": region.RegionCode,
|
||||||
"region_id": strconv.Itoa(region.RegionID),
|
"region_id": strconv.Itoa(region.RegionID),
|
||||||
"hostname": server.HostName,
|
"hostname": server.HostName,
|
||||||
@ -169,18 +177,11 @@ func (d *derpProber) ProbeMap(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if d.bwInterval > 0 && d.bwProbeSize > 0 {
|
if d.bwInterval > 0 && d.bwProbeSize > 0 {
|
||||||
bwLabels := maps.Clone(labels)
|
|
||||||
bwLabels["probe_size_bytes"] = fmt.Sprintf("%d", d.bwProbeSize)
|
|
||||||
if server.Name == to.Name {
|
|
||||||
bwLabels["derp_path"] = "single"
|
|
||||||
} else {
|
|
||||||
bwLabels["derp_path"] = "mesh"
|
|
||||||
}
|
|
||||||
n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name)
|
n := fmt.Sprintf("derp/%s/%s/%s/bw", region.RegionCode, server.Name, to.Name)
|
||||||
wantProbes[n] = true
|
wantProbes[n] = true
|
||||||
if d.probes[n] == nil {
|
if d.probes[n] == nil {
|
||||||
log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval)
|
log.Printf("adding DERP bandwidth probe for %s->%s (%s) %v bytes every %v", server.Name, to.Name, region.RegionName, d.bwProbeSize, d.bwInterval)
|
||||||
d.probes[n] = d.p.Run(n, d.bwInterval, bwLabels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize))
|
d.probes[n] = d.p.Run(n, d.bwInterval, labels, d.bwProbeFn(server.Name, to.Name, d.bwProbeSize))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -198,32 +199,55 @@ func (d *derpProber) ProbeMap(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// probeMesh returs a probe func that sends a test packet through a pair of DERP
|
// probeMesh returs a probe class that sends a test packet through a pair of DERP
|
||||||
// servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to'
|
// servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to'
|
||||||
// are expected to be names (DERPNode.Name) of two DERP servers in the same region.
|
// are expected to be names (DERPNode.Name) of two DERP servers in the same region.
|
||||||
func (d *derpProber) probeMesh(from, to string) ProbeFunc {
|
func (d *derpProber) probeMesh(from, to string) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
derpPath := "mesh"
|
||||||
fromN, toN, err := d.getNodePair(from, to)
|
if from == to {
|
||||||
if err != nil {
|
derpPath = "single"
|
||||||
return err
|
}
|
||||||
}
|
return ProbeClass{
|
||||||
|
Probe: func(ctx context.Context) error {
|
||||||
|
fromN, toN, err := d.getNodePair(from, to)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
dm := d.lastDERPMap
|
dm := d.lastDERPMap
|
||||||
return derpProbeNodePair(ctx, dm, fromN, toN)
|
return derpProbeNodePair(ctx, dm, fromN, toN)
|
||||||
|
},
|
||||||
|
Class: "derp_mesh",
|
||||||
|
Labels: Labels{"derp_path": derpPath},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// probeBandwidth returs a probe func that sends a payload of a given size
|
// probeBandwidth returs a probe class that sends a payload of a given size
|
||||||
// through a pair of DERP servers (or just one server, if 'from' and 'to' are
|
// through a pair of DERP servers (or just one server, if 'from' and 'to' are
|
||||||
// the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two
|
// the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two
|
||||||
// DERP servers in the same region.
|
// DERP servers in the same region.
|
||||||
func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeFunc {
|
func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
derpPath := "mesh"
|
||||||
fromN, toN, err := d.getNodePair(from, to)
|
if from == to {
|
||||||
if err != nil {
|
derpPath = "single"
|
||||||
return err
|
}
|
||||||
}
|
var transferTime expvar.Float
|
||||||
return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size)
|
return ProbeClass{
|
||||||
|
Probe: func(ctx context.Context) error {
|
||||||
|
fromN, toN, err := d.getNodePair(from, to)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return derpProbeBandwidth(ctx, d.lastDERPMap, fromN, toN, size, &transferTime)
|
||||||
|
},
|
||||||
|
Class: "derp_bw",
|
||||||
|
Labels: Labels{"derp_path": derpPath},
|
||||||
|
Metrics: func(l prometheus.Labels) []prometheus.Metric {
|
||||||
|
return []prometheus.Metric{
|
||||||
|
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_probe_size_bytes", "Payload size of the bandwidth prober", nil, l), prometheus.GaugeValue, float64(size)),
|
||||||
|
prometheus.MustNewConstMetric(prometheus.NewDesc("derp_bw_transfer_time_seconds_total", "Time it took to transfer data", nil, l), prometheus.CounterValue, transferTime.Value()),
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -289,9 +313,12 @@ func (d *derpProber) updateMap(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeFunc {
|
func (d *derpProber) ProbeUDP(ipaddr string, port int) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
return ProbeClass{
|
||||||
return derpProbeUDP(ctx, ipaddr, port)
|
Probe: func(ctx context.Context) error {
|
||||||
|
return derpProbeUDP(ctx, ipaddr, port)
|
||||||
|
},
|
||||||
|
Class: "derp_udp",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -347,7 +374,7 @@ func derpProbeUDP(ctx context.Context, ipStr string, port int) error {
|
|||||||
|
|
||||||
// derpProbeBandwidth sends a payload of a given size between two local
|
// derpProbeBandwidth sends a payload of a given size between two local
|
||||||
// DERP clients connected to two DERP servers.
|
// DERP clients connected to two DERP servers.
|
||||||
func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64) (err error) {
|
func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tailcfg.DERPNode, size int64, transferTime *expvar.Float) (err error) {
|
||||||
// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
|
// This probe uses clients with isProber=false to avoid spamming the derper logs with every packet
|
||||||
// sent by the bandwidth probe.
|
// sent by the bandwidth probe.
|
||||||
fromc, err := newConn(ctx, dm, from, false)
|
fromc, err := newConn(ctx, dm, from, false)
|
||||||
@ -368,6 +395,9 @@ func derpProbeBandwidth(ctx context.Context, dm *tailcfg.DERPMap, from, to *tail
|
|||||||
time.Sleep(100 * time.Millisecond) // pretty arbitrary
|
time.Sleep(100 * time.Millisecond) // pretty arbitrary
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer func() { transferTime.Add(time.Since(start).Seconds()) }()
|
||||||
|
|
||||||
if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, size); err != nil {
|
if err := runDerpProbeNodePair(ctx, from, to, fromc, toc, size); err != nil {
|
||||||
// Record pubkeys on failed probes to aid investigation.
|
// Record pubkeys on failed probes to aid investigation.
|
||||||
return fmt.Errorf("%s -> %s: %w",
|
return fmt.Errorf("%s -> %s: %w",
|
||||||
|
@ -60,16 +60,16 @@ func TestDerpProber(t *testing.T) {
|
|||||||
p: p,
|
p: p,
|
||||||
derpMapURL: srv.URL,
|
derpMapURL: srv.URL,
|
||||||
tlsInterval: time.Second,
|
tlsInterval: time.Second,
|
||||||
tlsProbeFn: func(_ string) ProbeFunc { return func(context.Context) error { return nil } },
|
tlsProbeFn: func(_ string) ProbeClass { return FuncProbe(func(context.Context) error { return nil }) },
|
||||||
udpInterval: time.Second,
|
udpInterval: time.Second,
|
||||||
udpProbeFn: func(_ string, _ int) ProbeFunc { return func(context.Context) error { return nil } },
|
udpProbeFn: func(_ string, _ int) ProbeClass { return FuncProbe(func(context.Context) error { return nil }) },
|
||||||
meshInterval: time.Second,
|
meshInterval: time.Second,
|
||||||
meshProbeFn: func(_, _ string) ProbeFunc { return func(context.Context) error { return nil } },
|
meshProbeFn: func(_, _ string) ProbeClass { return FuncProbe(func(context.Context) error { return nil }) },
|
||||||
nodes: make(map[string]*tailcfg.DERPNode),
|
nodes: make(map[string]*tailcfg.DERPNode),
|
||||||
probes: make(map[string]*Probe),
|
probes: make(map[string]*Probe),
|
||||||
}
|
}
|
||||||
if err := dp.ProbeMap(context.Background()); err != nil {
|
if err := dp.probeMapFn(context.Background()); err != nil {
|
||||||
t.Errorf("unexpected ProbeMap() error: %s", err)
|
t.Errorf("unexpected probeMapFn() error: %s", err)
|
||||||
}
|
}
|
||||||
if len(dp.nodes) != 2 || dp.nodes["n1"] == nil || dp.nodes["n2"] == nil {
|
if len(dp.nodes) != 2 || dp.nodes["n1"] == nil || dp.nodes["n2"] == nil {
|
||||||
t.Errorf("unexpected nodes: %+v", dp.nodes)
|
t.Errorf("unexpected nodes: %+v", dp.nodes)
|
||||||
@ -89,8 +89,8 @@ func TestDerpProber(t *testing.T) {
|
|||||||
IPv4: "1.1.1.1",
|
IPv4: "1.1.1.1",
|
||||||
IPv6: "::1",
|
IPv6: "::1",
|
||||||
})
|
})
|
||||||
if err := dp.ProbeMap(context.Background()); err != nil {
|
if err := dp.probeMapFn(context.Background()); err != nil {
|
||||||
t.Errorf("unexpected ProbeMap() error: %s", err)
|
t.Errorf("unexpected probeMapFn() error: %s", err)
|
||||||
}
|
}
|
||||||
if len(dp.nodes) != 3 {
|
if len(dp.nodes) != 3 {
|
||||||
t.Errorf("unexpected nodes: %+v", dp.nodes)
|
t.Errorf("unexpected nodes: %+v", dp.nodes)
|
||||||
@ -102,8 +102,8 @@ func TestDerpProber(t *testing.T) {
|
|||||||
|
|
||||||
// Remove 2 nodes and check that probes have been destroyed.
|
// Remove 2 nodes and check that probes have been destroyed.
|
||||||
dm.Regions[0].Nodes = dm.Regions[0].Nodes[:1]
|
dm.Regions[0].Nodes = dm.Regions[0].Nodes[:1]
|
||||||
if err := dp.ProbeMap(context.Background()); err != nil {
|
if err := dp.probeMapFn(context.Background()); err != nil {
|
||||||
t.Errorf("unexpected ProbeMap() error: %s", err)
|
t.Errorf("unexpected probeMapFn() error: %s", err)
|
||||||
}
|
}
|
||||||
if len(dp.nodes) != 1 {
|
if len(dp.nodes) != 1 {
|
||||||
t.Errorf("unexpected nodes: %+v", dp.nodes)
|
t.Errorf("unexpected nodes: %+v", dp.nodes)
|
||||||
|
@ -35,8 +35,12 @@ type ForEachAddrOpts struct {
|
|||||||
// every time a new IP is discovered. The Probes returned will be closed if an
|
// every time a new IP is discovered. The Probes returned will be closed if an
|
||||||
// IP address is no longer in the DNS record for the given hostname. This can
|
// IP address is no longer in the DNS record for the given hostname. This can
|
||||||
// be used to healthcheck every IP address that a hostname resolves to.
|
// be used to healthcheck every IP address that a hostname resolves to.
|
||||||
func ForEachAddr(host string, makeProbes func(netip.Addr) []*Probe, opts ForEachAddrOpts) ProbeFunc {
|
func ForEachAddr(host string, makeProbes func(netip.Addr) []*Probe, opts ForEachAddrOpts) ProbeClass {
|
||||||
return makeForEachAddr(host, makeProbes, opts).run
|
feap := makeForEachAddr(host, makeProbes, opts)
|
||||||
|
return ProbeClass{
|
||||||
|
Probe: feap.run,
|
||||||
|
Class: "dns_each_addr",
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeForEachAddr(host string, makeProbes func(netip.Addr) []*Probe, opts ForEachAddrOpts) *forEachAddrProbe {
|
func makeForEachAddr(host string, makeProbes func(netip.Addr) []*Probe, opts ForEachAddrOpts) *forEachAddrProbe {
|
||||||
|
@ -89,11 +89,13 @@ func ExampleForEachAddr() {
|
|||||||
<-sigCh
|
<-sigCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func probeLogWrapper(logf logger.Logf, pf prober.ProbeFunc) prober.ProbeFunc {
|
func probeLogWrapper(logf logger.Logf, pc prober.ProbeClass) prober.ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
return prober.ProbeClass{
|
||||||
logf("starting probe")
|
Probe: func(ctx context.Context) error {
|
||||||
err := pf(ctx)
|
logf("starting probe")
|
||||||
logf("probe finished with %v", err)
|
err := pc.Probe(ctx)
|
||||||
return err
|
logf("probe finished with %v", err)
|
||||||
|
return err
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,9 +57,9 @@ func TestForEachAddr(t *testing.T) {
|
|||||||
registered = append(registered, addr)
|
registered = append(registered, addr)
|
||||||
|
|
||||||
// Return a probe that does nothing; we don't care about what this does.
|
// Return a probe that does nothing; we don't care about what this does.
|
||||||
probe := p.Run(fmt.Sprintf("website/%s", addr), probeInterval, nil, func(_ context.Context) error {
|
probe := p.Run(fmt.Sprintf("website/%s", addr), probeInterval, nil, FuncProbe(func(_ context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
return []*Probe{probe}
|
return []*Probe{probe}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,14 +13,17 @@
|
|||||||
|
|
||||||
const maxHTTPBody = 4 << 20 // MiB
|
const maxHTTPBody = 4 << 20 // MiB
|
||||||
|
|
||||||
// HTTP returns a Probe that healthchecks an HTTP URL.
|
// HTTP returns a ProbeClass that healthchecks an HTTP URL.
|
||||||
//
|
//
|
||||||
// The ProbeFunc sends a GET request for url, expects an HTTP 200
|
// The probe function sends a GET request for url, expects an HTTP 200
|
||||||
// response, and verifies that want is present in the response
|
// response, and verifies that want is present in the response
|
||||||
// body.
|
// body.
|
||||||
func HTTP(url, wantText string) ProbeFunc {
|
func HTTP(url, wantText string) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
return ProbeClass{
|
||||||
return probeHTTP(ctx, url, []byte(wantText))
|
Probe: func(ctx context.Context) error {
|
||||||
|
return probeHTTP(ctx, url, []byte(wantText))
|
||||||
|
},
|
||||||
|
Class: "http",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"log"
|
"log"
|
||||||
|
"maps"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -19,10 +20,33 @@
|
|||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProbeFunc is a function that probes something and reports whether
|
// ProbeClass defines a probe of a specific type: a probing function that will
|
||||||
// the probe succeeded. The provided context's deadline must be obeyed
|
// be regularly ran, and metric labels that will be added automatically to all
|
||||||
// for correct probe scheduling.
|
// probes using this class.
|
||||||
type ProbeFunc func(context.Context) error
|
type ProbeClass struct {
|
||||||
|
// Probe is a function that probes something and reports whether the Probe
|
||||||
|
// succeeded. The provided context's deadline must be obeyed for correct
|
||||||
|
// Probe scheduling.
|
||||||
|
Probe func(context.Context) error
|
||||||
|
|
||||||
|
// Class defines a user-facing name of the probe class that will be used
|
||||||
|
// in the `class` metric label.
|
||||||
|
Class string
|
||||||
|
|
||||||
|
// Labels defines a set of metric labels that will be added to all metrics
|
||||||
|
// exposed by this probe class.
|
||||||
|
Labels Labels
|
||||||
|
|
||||||
|
// Metrics allows a probe class to export custom Metrics. Can be nil.
|
||||||
|
Metrics func(prometheus.Labels) []prometheus.Metric
|
||||||
|
}
|
||||||
|
|
||||||
|
// FuncProbe wraps a simple probe function in a ProbeClass.
|
||||||
|
func FuncProbe(fn func(context.Context) error) ProbeClass {
|
||||||
|
return ProbeClass{
|
||||||
|
Probe: fn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// a Prober manages a set of probes and keeps track of their results.
|
// a Prober manages a set of probes and keeps track of their results.
|
||||||
type Prober struct {
|
type Prober struct {
|
||||||
@ -61,17 +85,23 @@ func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Pro
|
|||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run executes fun every interval, and exports probe results under probeName.
|
// Run executes probe class function every interval, and exports probe results under probeName.
|
||||||
//
|
//
|
||||||
// Registering a probe under an already-registered name panics.
|
// Registering a probe under an already-registered name panics.
|
||||||
func (p *Prober) Run(name string, interval time.Duration, labels map[string]string, fun ProbeFunc) *Probe {
|
func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc ProbeClass) *Probe {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
if _, ok := p.probes[name]; ok {
|
if _, ok := p.probes[name]; ok {
|
||||||
panic(fmt.Sprintf("probe named %q already registered", name))
|
panic(fmt.Sprintf("probe named %q already registered", name))
|
||||||
}
|
}
|
||||||
|
|
||||||
l := prometheus.Labels{"name": name}
|
l := prometheus.Labels{
|
||||||
|
"name": name,
|
||||||
|
"class": pc.Class,
|
||||||
|
}
|
||||||
|
for k, v := range pc.Labels {
|
||||||
|
l[k] = v
|
||||||
|
}
|
||||||
for k, v := range labels {
|
for k, v := range labels {
|
||||||
l[k] = v
|
l[k] = v
|
||||||
}
|
}
|
||||||
@ -84,10 +114,11 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri
|
|||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
|
|
||||||
name: name,
|
name: name,
|
||||||
doProbe: fun,
|
probeClass: pc,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
initialDelay: initialDelay(name, interval),
|
initialDelay: initialDelay(name, interval),
|
||||||
metrics: prometheus.NewRegistry(),
|
metrics: prometheus.NewRegistry(),
|
||||||
|
metricLabels: l,
|
||||||
mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, l),
|
mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, l),
|
||||||
mStartTime: prometheus.NewDesc("start_secs", "Latest probe start time (seconds since epoch)", nil, l),
|
mStartTime: prometheus.NewDesc("start_secs", "Latest probe start time (seconds since epoch)", nil, l),
|
||||||
mEndTime: prometheus.NewDesc("end_secs", "Latest probe end time (seconds since epoch)", nil, l),
|
mEndTime: prometheus.NewDesc("end_secs", "Latest probe end time (seconds since epoch)", nil, l),
|
||||||
@ -177,7 +208,7 @@ type Probe struct {
|
|||||||
stopped chan struct{} // closed when shutdown is complete
|
stopped chan struct{} // closed when shutdown is complete
|
||||||
|
|
||||||
name string
|
name string
|
||||||
doProbe ProbeFunc
|
probeClass ProbeClass
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
initialDelay time.Duration
|
initialDelay time.Duration
|
||||||
tick ticker
|
tick ticker
|
||||||
@ -185,14 +216,15 @@ type Probe struct {
|
|||||||
// metrics is a Prometheus metrics registry for metrics exported by this probe.
|
// metrics is a Prometheus metrics registry for metrics exported by this probe.
|
||||||
// Using a separate registry allows cleanly removing metrics exported by this
|
// Using a separate registry allows cleanly removing metrics exported by this
|
||||||
// probe when it gets unregistered.
|
// probe when it gets unregistered.
|
||||||
metrics *prometheus.Registry
|
metrics *prometheus.Registry
|
||||||
mInterval *prometheus.Desc
|
metricLabels prometheus.Labels
|
||||||
mStartTime *prometheus.Desc
|
mInterval *prometheus.Desc
|
||||||
mEndTime *prometheus.Desc
|
mStartTime *prometheus.Desc
|
||||||
mLatency *prometheus.Desc
|
mEndTime *prometheus.Desc
|
||||||
mResult *prometheus.Desc
|
mLatency *prometheus.Desc
|
||||||
mAttempts *prometheus.CounterVec
|
mResult *prometheus.Desc
|
||||||
mSeconds *prometheus.CounterVec
|
mAttempts *prometheus.CounterVec
|
||||||
|
mSeconds *prometheus.CounterVec
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
start time.Time // last time doProbe started
|
start time.Time // last time doProbe started
|
||||||
@ -268,7 +300,7 @@ func (p *Probe) run() {
|
|||||||
ctx, cancel := context.WithTimeout(p.ctx, timeout)
|
ctx, cancel := context.WithTimeout(p.ctx, timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
err := p.doProbe(ctx)
|
err := p.probeClass.Probe(ctx)
|
||||||
p.recordEnd(start, err)
|
p.recordEnd(start, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("probe %s: %v", p.name, err)
|
log.Printf("probe %s: %v", p.name, err)
|
||||||
@ -349,6 +381,11 @@ func (p *Probe) Describe(ch chan<- *prometheus.Desc) {
|
|||||||
ch <- p.mLatency
|
ch <- p.mLatency
|
||||||
p.mAttempts.Describe(ch)
|
p.mAttempts.Describe(ch)
|
||||||
p.mSeconds.Describe(ch)
|
p.mSeconds.Describe(ch)
|
||||||
|
if p.probeClass.Metrics != nil {
|
||||||
|
for _, m := range p.probeClass.Metrics(p.metricLabels) {
|
||||||
|
ch <- m.Desc()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect implements prometheus.Collector.
|
// Collect implements prometheus.Collector.
|
||||||
@ -373,6 +410,11 @@ func (p *Probe) Collect(ch chan<- prometheus.Metric) {
|
|||||||
}
|
}
|
||||||
p.mAttempts.Collect(ch)
|
p.mAttempts.Collect(ch)
|
||||||
p.mSeconds.Collect(ch)
|
p.mSeconds.Collect(ch)
|
||||||
|
if p.probeClass.Metrics != nil {
|
||||||
|
for _, m := range p.probeClass.Metrics(p.metricLabels) {
|
||||||
|
ch <- m
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ticker wraps a time.Ticker in a way that can be faked for tests.
|
// ticker wraps a time.Ticker in a way that can be faked for tests.
|
||||||
@ -401,3 +443,12 @@ func initialDelay(seed string, interval time.Duration) time.Duration {
|
|||||||
r := rand.New(rand.NewSource(int64(h.Sum64()))).Float64()
|
r := rand.New(rand.NewSource(int64(h.Sum64()))).Float64()
|
||||||
return time.Duration(float64(interval) * r)
|
return time.Duration(float64(interval) * r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Labels is a set of metric labels used by a prober.
|
||||||
|
type Labels map[string]string
|
||||||
|
|
||||||
|
func (l Labels) With(k, v string) Labels {
|
||||||
|
new := maps.Clone(l)
|
||||||
|
new[k] = v
|
||||||
|
return new
|
||||||
|
}
|
||||||
|
@ -51,10 +51,10 @@ func TestProberTiming(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Run("test-probe", probeInterval, nil, func(context.Context) error {
|
p.Run("test-probe", probeInterval, nil, FuncProbe(func(context.Context) error {
|
||||||
invoked <- struct{}{}
|
invoked <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
|
|
||||||
waitActiveProbes(t, p, clk, 1)
|
waitActiveProbes(t, p, clk, 1)
|
||||||
|
|
||||||
@ -93,10 +93,10 @@ func TestProberTimingSpread(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
probe := p.Run("test-spread-probe", probeInterval, nil, func(context.Context) error {
|
probe := p.Run("test-spread-probe", probeInterval, nil, FuncProbe(func(context.Context) error {
|
||||||
invoked <- struct{}{}
|
invoked <- struct{}{}
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
|
|
||||||
waitActiveProbes(t, p, clk, 1)
|
waitActiveProbes(t, p, clk, 1)
|
||||||
|
|
||||||
@ -156,12 +156,12 @@ func TestProberRun(t *testing.T) {
|
|||||||
var probes []*Probe
|
var probes []*Probe
|
||||||
|
|
||||||
for i := 0; i < startingProbes; i++ {
|
for i := 0; i < startingProbes; i++ {
|
||||||
probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, nil, func(context.Context) error {
|
probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, nil, FuncProbe(func(context.Context) error {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
cnt++
|
cnt++
|
||||||
return nil
|
return nil
|
||||||
}))
|
})))
|
||||||
}
|
}
|
||||||
|
|
||||||
checkCnt := func(want int) {
|
checkCnt := func(want int) {
|
||||||
@ -207,13 +207,13 @@ func TestPrometheus(t *testing.T) {
|
|||||||
p := newForTest(clk.Now, clk.NewTicker).WithMetricNamespace("probe")
|
p := newForTest(clk.Now, clk.NewTicker).WithMetricNamespace("probe")
|
||||||
|
|
||||||
var succeed atomic.Bool
|
var succeed atomic.Bool
|
||||||
p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error {
|
p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, FuncProbe(func(context.Context) error {
|
||||||
clk.Advance(aFewMillis)
|
clk.Advance(aFewMillis)
|
||||||
if succeed.Load() {
|
if succeed.Load() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return errors.New("failing, as instructed by test")
|
return errors.New("failing, as instructed by test")
|
||||||
})
|
}))
|
||||||
|
|
||||||
waitActiveProbes(t, p, clk, 1)
|
waitActiveProbes(t, p, clk, 1)
|
||||||
|
|
||||||
@ -221,16 +221,16 @@ func TestPrometheus(t *testing.T) {
|
|||||||
want := fmt.Sprintf(`
|
want := fmt.Sprintf(`
|
||||||
# HELP probe_interval_secs Probe interval in seconds
|
# HELP probe_interval_secs Probe interval in seconds
|
||||||
# TYPE probe_interval_secs gauge
|
# TYPE probe_interval_secs gauge
|
||||||
probe_interval_secs{label="value",name="testprobe"} %f
|
probe_interval_secs{class="",label="value",name="testprobe"} %f
|
||||||
# HELP probe_start_secs Latest probe start time (seconds since epoch)
|
# HELP probe_start_secs Latest probe start time (seconds since epoch)
|
||||||
# TYPE probe_start_secs gauge
|
# TYPE probe_start_secs gauge
|
||||||
probe_start_secs{label="value",name="testprobe"} %d
|
probe_start_secs{class="",label="value",name="testprobe"} %d
|
||||||
# HELP probe_end_secs Latest probe end time (seconds since epoch)
|
# HELP probe_end_secs Latest probe end time (seconds since epoch)
|
||||||
# TYPE probe_end_secs gauge
|
# TYPE probe_end_secs gauge
|
||||||
probe_end_secs{label="value",name="testprobe"} %d
|
probe_end_secs{class="",label="value",name="testprobe"} %d
|
||||||
# HELP probe_result Latest probe result (1 = success, 0 = failure)
|
# HELP probe_result Latest probe result (1 = success, 0 = failure)
|
||||||
# TYPE probe_result gauge
|
# TYPE probe_result gauge
|
||||||
probe_result{label="value",name="testprobe"} 0
|
probe_result{class="",label="value",name="testprobe"} 0
|
||||||
`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix())
|
`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix())
|
||||||
return testutil.GatherAndCompare(p.metrics, strings.NewReader(want),
|
return testutil.GatherAndCompare(p.metrics, strings.NewReader(want),
|
||||||
"probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_result")
|
"probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_result")
|
||||||
@ -248,19 +248,19 @@ func TestPrometheus(t *testing.T) {
|
|||||||
want := fmt.Sprintf(`
|
want := fmt.Sprintf(`
|
||||||
# HELP probe_interval_secs Probe interval in seconds
|
# HELP probe_interval_secs Probe interval in seconds
|
||||||
# TYPE probe_interval_secs gauge
|
# TYPE probe_interval_secs gauge
|
||||||
probe_interval_secs{label="value",name="testprobe"} %f
|
probe_interval_secs{class="",label="value",name="testprobe"} %f
|
||||||
# HELP probe_start_secs Latest probe start time (seconds since epoch)
|
# HELP probe_start_secs Latest probe start time (seconds since epoch)
|
||||||
# TYPE probe_start_secs gauge
|
# TYPE probe_start_secs gauge
|
||||||
probe_start_secs{label="value",name="testprobe"} %d
|
probe_start_secs{class="",label="value",name="testprobe"} %d
|
||||||
# HELP probe_end_secs Latest probe end time (seconds since epoch)
|
# HELP probe_end_secs Latest probe end time (seconds since epoch)
|
||||||
# TYPE probe_end_secs gauge
|
# TYPE probe_end_secs gauge
|
||||||
probe_end_secs{label="value",name="testprobe"} %d
|
probe_end_secs{class="",label="value",name="testprobe"} %d
|
||||||
# HELP probe_latency_millis Latest probe latency (ms)
|
# HELP probe_latency_millis Latest probe latency (ms)
|
||||||
# TYPE probe_latency_millis gauge
|
# TYPE probe_latency_millis gauge
|
||||||
probe_latency_millis{label="value",name="testprobe"} %d
|
probe_latency_millis{class="",label="value",name="testprobe"} %d
|
||||||
# HELP probe_result Latest probe result (1 = success, 0 = failure)
|
# HELP probe_result Latest probe result (1 = success, 0 = failure)
|
||||||
# TYPE probe_result gauge
|
# TYPE probe_result gauge
|
||||||
probe_result{label="value",name="testprobe"} 1
|
probe_result{class="",label="value",name="testprobe"} 1
|
||||||
`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds())
|
`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds())
|
||||||
return testutil.GatherAndCompare(p.metrics, strings.NewReader(want),
|
return testutil.GatherAndCompare(p.metrics, strings.NewReader(want),
|
||||||
"probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_latency_millis", "probe_result")
|
"probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_latency_millis", "probe_result")
|
||||||
@ -274,14 +274,14 @@ func TestOnceMode(t *testing.T) {
|
|||||||
clk := newFakeTime()
|
clk := newFakeTime()
|
||||||
p := newForTest(clk.Now, clk.NewTicker).WithOnce(true)
|
p := newForTest(clk.Now, clk.NewTicker).WithOnce(true)
|
||||||
|
|
||||||
p.Run("probe1", probeInterval, nil, func(context.Context) error { return nil })
|
p.Run("probe1", probeInterval, nil, FuncProbe(func(context.Context) error { return nil }))
|
||||||
p.Run("probe2", probeInterval, nil, func(context.Context) error { return fmt.Errorf("error2") })
|
p.Run("probe2", probeInterval, nil, FuncProbe(func(context.Context) error { return fmt.Errorf("error2") }))
|
||||||
p.Run("probe3", probeInterval, nil, func(context.Context) error {
|
p.Run("probe3", probeInterval, nil, FuncProbe(func(context.Context) error {
|
||||||
p.Run("probe4", probeInterval, nil, func(context.Context) error {
|
p.Run("probe4", probeInterval, nil, FuncProbe(func(context.Context) error {
|
||||||
return fmt.Errorf("error4")
|
return fmt.Errorf("error4")
|
||||||
})
|
}))
|
||||||
return nil
|
return nil
|
||||||
})
|
}))
|
||||||
|
|
||||||
p.Wait()
|
p.Wait()
|
||||||
wantCount := 4
|
wantCount := 4
|
||||||
|
@ -12,9 +12,12 @@
|
|||||||
// TCP returns a Probe that healthchecks a TCP endpoint.
|
// TCP returns a Probe that healthchecks a TCP endpoint.
|
||||||
//
|
//
|
||||||
// The ProbeFunc reports whether it can successfully connect to addr.
|
// The ProbeFunc reports whether it can successfully connect to addr.
|
||||||
func TCP(addr string) ProbeFunc {
|
func TCP(addr string) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
return ProbeClass{
|
||||||
return probeTCP(ctx, addr)
|
Probe: func(ctx context.Context) error {
|
||||||
|
return probeTCP(ctx, addr)
|
||||||
|
},
|
||||||
|
Class: "tcp",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,22 +27,28 @@
|
|||||||
// The ProbeFunc connects to a hostPort (host:port string), does a TLS
|
// The ProbeFunc connects to a hostPort (host:port string), does a TLS
|
||||||
// handshake, verifies that the hostname matches the presented certificate,
|
// handshake, verifies that the hostname matches the presented certificate,
|
||||||
// checks certificate validity time and OCSP revocation status.
|
// checks certificate validity time and OCSP revocation status.
|
||||||
func TLS(hostPort string) ProbeFunc {
|
func TLS(hostPort string) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
return ProbeClass{
|
||||||
certDomain, _, err := net.SplitHostPort(hostPort)
|
Probe: func(ctx context.Context) error {
|
||||||
if err != nil {
|
certDomain, _, err := net.SplitHostPort(hostPort)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
return probeTLS(ctx, certDomain, hostPort)
|
}
|
||||||
|
return probeTLS(ctx, certDomain, hostPort)
|
||||||
|
},
|
||||||
|
Class: "tls",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TLSWithIP is like TLS, but dials the provided dialAddr instead
|
// TLSWithIP is like TLS, but dials the provided dialAddr instead
|
||||||
// of using DNS resolution. The certDomain is the expected name in
|
// of using DNS resolution. The certDomain is the expected name in
|
||||||
// the cert (and the SNI name to send).
|
// the cert (and the SNI name to send).
|
||||||
func TLSWithIP(certDomain string, dialAddr netip.AddrPort) ProbeFunc {
|
func TLSWithIP(certDomain string, dialAddr netip.AddrPort) ProbeClass {
|
||||||
return func(ctx context.Context) error {
|
return ProbeClass{
|
||||||
return probeTLS(ctx, certDomain, dialAddr.String())
|
Probe: func(ctx context.Context) error {
|
||||||
|
return probeTLS(ctx, certDomain, dialAddr.String())
|
||||||
|
},
|
||||||
|
Class: "tls",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user