From e80d2b4ad1e427c7700264a05d4bc8a6d95e29d7 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 5 Mar 2025 19:37:03 -0800 Subject: [PATCH] util/eventbus: add debug hooks to snoop on bus traffic Updates #15160 Signed-off-by: David Anderson --- util/eventbus/bus.go | 45 ++++++++++++++++++++++++++++---------- util/eventbus/client.go | 5 +++-- util/eventbus/publish.go | 12 ++-------- util/eventbus/subscribe.go | 22 ++++++++++++++----- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index b479f3940..a9b6f0dec 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -8,17 +8,28 @@ "reflect" "slices" "sync" - "time" "tailscale.com/util/set" ) +type publishedEvent struct { + Event any + From *Client +} + +type routedEvent struct { + Event any + From *Client + To []*Client +} + // Bus is an event bus that distributes published events to interested // subscribers. type Bus struct { - router *worker - write chan publishedEvent - snapshot chan chan []publishedEvent + router *worker + write chan publishedEvent + snapshot chan chan []publishedEvent + routeDebug hook[routedEvent] topicsMu sync.Mutex // guards everything below. topics map[reflect.Type][]*subscribeState @@ -94,13 +105,23 @@ func (b *Bus) pump(ctx context.Context) { for !vals.Empty() { val := vals.Peek() dests := b.dest(reflect.ValueOf(val.Event).Type()) - routed := time.Now() + + if b.routeDebug.active() { + clients := make([]*Client, len(dests)) + for i := range len(dests) { + clients[i] = dests[i].client + } + b.routeDebug.run(routedEvent{ + Event: val.Event, + From: val.From, + To: clients, + }) + } + for _, d := range dests { evt := queuedEvent{ - Event: val.Event, - From: val.From, - Published: val.Published, - Routed: routed, + Event: val.Event, + From: val.From, } deliverOne: for { @@ -113,6 +134,7 @@ func (b *Bus) pump(ctx context.Context) { break deliverOne case in := <-acceptCh(): vals.Add(in) + in.From.publishDebug.run(in) case <-ctx.Done(): return case ch := <-b.snapshot: @@ -129,8 +151,9 @@ func (b *Bus) pump(ctx context.Context) { select { case <-ctx.Done(): return - case val := <-b.write: - vals.Add(val) + case in := <-b.write: + vals.Add(in) + in.From.publishDebug.run(in) case ch := <-b.snapshot: ch <- nil } diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 174cc5ea5..17f7e8608 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -17,8 +17,9 @@ // Subscribers that share the same client receive events one at a // time, in the order they were published. type Client struct { - name string - bus *Bus + name string + bus *Bus + publishDebug hook[publishedEvent] mu sync.Mutex pub set.Set[publisher] diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index fdabdcb23..b228708ac 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -5,15 +5,8 @@ import ( "reflect" - "time" ) -type publishedEvent struct { - Event any - From *Client - Published time.Time -} - // publisher is a uniformly typed wrapper around Publisher[T], so that // debugging facilities can look at active publishers. type publisher interface { @@ -60,9 +53,8 @@ func (p *Publisher[T]) Publish(v T) { } evt := publishedEvent{ - Event: v, - From: p.client, - Published: time.Now(), + Event: v, + From: p.client, } select { diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 71201aa40..c38949d9d 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -8,14 +8,17 @@ "fmt" "reflect" "sync" - "time" ) +type deliveredEvent struct { + Event any + From *Client + To *Client +} + type queuedEvent struct { - Event any - From *Client - Published time.Time - Routed time.Time + Event any + From *Client } // subscriber is a uniformly typed wrapper around Subscriber[T], so @@ -46,6 +49,7 @@ type subscribeState struct { dispatcher *worker write chan queuedEvent snapshot chan chan []queuedEvent + debug hook[deliveredEvent] outputsMu sync.Mutex outputs map[reflect.Type]subscriber @@ -82,6 +86,14 @@ func (q *subscribeState) pump(ctx context.Context) { if !sub.dispatch(ctx, &vals, acceptCh) { return } + + if q.debug.active() { + q.debug.run(deliveredEvent{ + Event: val.Event, + From: val.From, + To: q.client, + }) + } } else { // Keep the cases in this select in sync with // Subscriber.dispatch below. The only different should be