util/eventbus: add debug hooks to snoop on bus traffic
Updates #15160 Signed-off-by: David Anderson <dave@tailscale.com>
This commit is contained in:
parent
dd7166cb8e
commit
e80d2b4ad1
@ -8,17 +8,28 @@
|
||||
"reflect"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tailscale.com/util/set"
|
||||
)
|
||||
|
||||
type publishedEvent struct {
|
||||
Event any
|
||||
From *Client
|
||||
}
|
||||
|
||||
type routedEvent struct {
|
||||
Event any
|
||||
From *Client
|
||||
To []*Client
|
||||
}
|
||||
|
||||
// Bus is an event bus that distributes published events to interested
|
||||
// subscribers.
|
||||
type Bus struct {
|
||||
router *worker
|
||||
write chan publishedEvent
|
||||
snapshot chan chan []publishedEvent
|
||||
router *worker
|
||||
write chan publishedEvent
|
||||
snapshot chan chan []publishedEvent
|
||||
routeDebug hook[routedEvent]
|
||||
|
||||
topicsMu sync.Mutex // guards everything below.
|
||||
topics map[reflect.Type][]*subscribeState
|
||||
@ -94,13 +105,23 @@ func (b *Bus) pump(ctx context.Context) {
|
||||
for !vals.Empty() {
|
||||
val := vals.Peek()
|
||||
dests := b.dest(reflect.ValueOf(val.Event).Type())
|
||||
routed := time.Now()
|
||||
|
||||
if b.routeDebug.active() {
|
||||
clients := make([]*Client, len(dests))
|
||||
for i := range len(dests) {
|
||||
clients[i] = dests[i].client
|
||||
}
|
||||
b.routeDebug.run(routedEvent{
|
||||
Event: val.Event,
|
||||
From: val.From,
|
||||
To: clients,
|
||||
})
|
||||
}
|
||||
|
||||
for _, d := range dests {
|
||||
evt := queuedEvent{
|
||||
Event: val.Event,
|
||||
From: val.From,
|
||||
Published: val.Published,
|
||||
Routed: routed,
|
||||
Event: val.Event,
|
||||
From: val.From,
|
||||
}
|
||||
deliverOne:
|
||||
for {
|
||||
@ -113,6 +134,7 @@ func (b *Bus) pump(ctx context.Context) {
|
||||
break deliverOne
|
||||
case in := <-acceptCh():
|
||||
vals.Add(in)
|
||||
in.From.publishDebug.run(in)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ch := <-b.snapshot:
|
||||
@ -129,8 +151,9 @@ func (b *Bus) pump(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case val := <-b.write:
|
||||
vals.Add(val)
|
||||
case in := <-b.write:
|
||||
vals.Add(in)
|
||||
in.From.publishDebug.run(in)
|
||||
case ch := <-b.snapshot:
|
||||
ch <- nil
|
||||
}
|
||||
|
@ -17,8 +17,9 @@
|
||||
// Subscribers that share the same client receive events one at a
|
||||
// time, in the order they were published.
|
||||
type Client struct {
|
||||
name string
|
||||
bus *Bus
|
||||
name string
|
||||
bus *Bus
|
||||
publishDebug hook[publishedEvent]
|
||||
|
||||
mu sync.Mutex
|
||||
pub set.Set[publisher]
|
||||
|
@ -5,15 +5,8 @@
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
type publishedEvent struct {
|
||||
Event any
|
||||
From *Client
|
||||
Published time.Time
|
||||
}
|
||||
|
||||
// publisher is a uniformly typed wrapper around Publisher[T], so that
|
||||
// debugging facilities can look at active publishers.
|
||||
type publisher interface {
|
||||
@ -60,9 +53,8 @@ func (p *Publisher[T]) Publish(v T) {
|
||||
}
|
||||
|
||||
evt := publishedEvent{
|
||||
Event: v,
|
||||
From: p.client,
|
||||
Published: time.Now(),
|
||||
Event: v,
|
||||
From: p.client,
|
||||
}
|
||||
|
||||
select {
|
||||
|
@ -8,14 +8,17 @@
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type deliveredEvent struct {
|
||||
Event any
|
||||
From *Client
|
||||
To *Client
|
||||
}
|
||||
|
||||
type queuedEvent struct {
|
||||
Event any
|
||||
From *Client
|
||||
Published time.Time
|
||||
Routed time.Time
|
||||
Event any
|
||||
From *Client
|
||||
}
|
||||
|
||||
// subscriber is a uniformly typed wrapper around Subscriber[T], so
|
||||
@ -46,6 +49,7 @@ type subscribeState struct {
|
||||
dispatcher *worker
|
||||
write chan queuedEvent
|
||||
snapshot chan chan []queuedEvent
|
||||
debug hook[deliveredEvent]
|
||||
|
||||
outputsMu sync.Mutex
|
||||
outputs map[reflect.Type]subscriber
|
||||
@ -82,6 +86,14 @@ func (q *subscribeState) pump(ctx context.Context) {
|
||||
if !sub.dispatch(ctx, &vals, acceptCh) {
|
||||
return
|
||||
}
|
||||
|
||||
if q.debug.active() {
|
||||
q.debug.run(deliveredEvent{
|
||||
Event: val.Event,
|
||||
From: val.From,
|
||||
To: q.client,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Keep the cases in this select in sync with
|
||||
// Subscriber.dispatch below. The only different should be
|
||||
|
Loading…
Reference in New Issue
Block a user