util/eventbus: initial implementation of an in-process event bus

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
Co-authored-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
David Anderson 2025-02-27 16:31:56 -08:00 committed by Dave Anderson
parent 8c2717f96a
commit ef906763ee
8 changed files with 856 additions and 0 deletions

1
go.mod
View File

@ -20,6 +20,7 @@ require (
github.com/coder/websocket v1.8.12 github.com/coder/websocket v1.8.12
github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6 github.com/coreos/go-iptables v0.7.1-0.20240112124308-65c67c9f46e6
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/creachadair/taskgroup v0.13.2
github.com/creack/pty v1.1.23 github.com/creack/pty v1.1.23
github.com/dblohm7/wingoes v0.0.0-20240119213807-a09d6be7affa github.com/dblohm7/wingoes v0.0.0-20240119213807-a09d6be7affa
github.com/digitalocean/go-smbios v0.0.0-20180907143718-390a4f403a8e github.com/digitalocean/go-smbios v0.0.0-20180907143718-390a4f403a8e

4
go.sum
View File

@ -231,6 +231,8 @@ github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creachadair/mds v0.17.1 h1:lXQbTGKmb3nE3aK6OEp29L1gCx6B5ynzlQ6c1KOBurc= github.com/creachadair/mds v0.17.1 h1:lXQbTGKmb3nE3aK6OEp29L1gCx6B5ynzlQ6c1KOBurc=
github.com/creachadair/mds v0.17.1/go.mod h1:4b//mUiL8YldH6TImXjmW45myzTLNS1LLjOmrk888eg= github.com/creachadair/mds v0.17.1/go.mod h1:4b//mUiL8YldH6TImXjmW45myzTLNS1LLjOmrk888eg=
github.com/creachadair/taskgroup v0.13.2 h1:3KyqakBuFsm3KkXi/9XIb0QcA8tEzLHLgaoidf0MdVc=
github.com/creachadair/taskgroup v0.13.2/go.mod h1:i3V1Zx7H8RjwljUEeUWYT30Lmb9poewSb2XI1yTwD0g=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.23 h1:4M6+isWdcStXEf15G/RbrMPOQj1dZ7HPZCGwE4kOeP0= github.com/creack/pty v1.1.23 h1:4M6+isWdcStXEf15G/RbrMPOQj1dZ7HPZCGwE4kOeP0=
github.com/creack/pty v1.1.23/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE= github.com/creack/pty v1.1.23/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE=
@ -298,6 +300,8 @@ github.com/firefart/nonamedreturns v1.0.4 h1:abzI1p7mAEPYuR4A+VLKn4eNDOycjYo2phm
github.com/firefart/nonamedreturns v1.0.4/go.mod h1:TDhe/tjI1BXo48CmYbUduTV7BdIga8MAO/xbKdcVsGI= github.com/firefart/nonamedreturns v1.0.4/go.mod h1:TDhe/tjI1BXo48CmYbUduTV7BdIga8MAO/xbKdcVsGI=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=

223
util/eventbus/bus.go Normal file
View File

@ -0,0 +1,223 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"context"
"reflect"
"slices"
"sync"
"tailscale.com/util/set"
)
// Bus is an event bus that distributes published events to interested
// subscribers.
type Bus struct {
write chan any
stop goroutineShutdownControl
snapshot chan chan []any
topicsMu sync.Mutex // guards everything below.
topics map[reflect.Type][]*Queue
// Used for introspection/debugging only, not in the normal event
// publishing path.
publishers set.Set[publisher]
queues set.Set[*Queue]
}
// New returns a new bus. Use [PublisherOf] to make event publishers,
// and [Bus.Queue] and [Subscribe] to make event subscribers.
func New() *Bus {
stopCtl, stopWorker := newGoroutineShutdown()
ret := &Bus{
write: make(chan any),
stop: stopCtl,
snapshot: make(chan chan []any),
topics: map[reflect.Type][]*Queue{},
publishers: set.Set[publisher]{},
queues: set.Set[*Queue]{},
}
go ret.pump(stopWorker)
return ret
}
func (b *Bus) pump(stop goroutineShutdownWorker) {
defer stop.Done()
var vals queue
acceptCh := func() chan any {
if vals.Full() {
return nil
}
return b.write
}
for {
// Drain all pending events. Note that while we're draining
// events into subscriber queues, we continue to
// opportunistically accept more incoming events, if we have
// queue space for it.
for !vals.Empty() {
val := vals.Peek()
dests := b.dest(reflect.ValueOf(val).Type())
for _, d := range dests {
deliverOne:
for {
select {
case d.write <- val:
break deliverOne
case <-d.stop.WaitChan():
// Queue closed, don't block but continue
// delivering to others.
break deliverOne
case in := <-acceptCh():
vals.Add(in)
case <-stop.Stop():
return
case ch := <-b.snapshot:
ch <- vals.Snapshot()
}
}
}
vals.Drop()
}
// Inbound queue empty, wait for at least 1 work item before
// resuming.
for vals.Empty() {
select {
case <-stop.Stop():
return
case val := <-b.write:
vals.Add(val)
case ch := <-b.snapshot:
ch <- nil
}
}
}
}
func (b *Bus) dest(t reflect.Type) []*Queue {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
return b.topics[t]
}
func (b *Bus) subscribe(t reflect.Type, q *Queue) (cancel func()) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.topics[t] = append(b.topics[t], q)
return func() {
b.unsubscribe(t, q)
}
}
func (b *Bus) unsubscribe(t reflect.Type, q *Queue) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
// Topic slices are accessed by pump without holding a lock, so we
// have to replace the entire slice when unsubscribing.
// Unsubscribing should be infrequent enough that this won't
// matter.
i := slices.Index(b.topics[t], q)
if i < 0 {
return
}
b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1)
}
func (b *Bus) Close() {
b.stop.StopAndWait()
}
// Queue returns a new queue with no subscriptions. Use [Subscribe] to
// atach subscriptions to it.
//
// The queue's name should be a short, human-readable string that
// identifies this queue. The name is only visible through debugging
// APIs.
func (b *Bus) Queue(name string) *Queue {
return newQueue(b, name)
}
func (b *Bus) addQueue(q *Queue) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.queues.Add(q)
}
func (b *Bus) deleteQueue(q *Queue) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.queues.Delete(q)
}
func (b *Bus) addPublisher(p publisher) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.publishers.Add(p)
}
func (b *Bus) deletePublisher(p publisher) {
b.topicsMu.Lock()
defer b.topicsMu.Unlock()
b.publishers.Delete(p)
}
func newGoroutineShutdown() (goroutineShutdownControl, goroutineShutdownWorker) {
ctx, cancel := context.WithCancel(context.Background())
ctl := goroutineShutdownControl{
startShutdown: cancel,
shutdownFinished: make(chan struct{}),
}
work := goroutineShutdownWorker{
startShutdown: ctx.Done(),
shutdownFinished: ctl.shutdownFinished,
}
return ctl, work
}
// goroutineShutdownControl is a helper type to manage the shutdown of
// a worker goroutine. The worker goroutine should use the
// goroutineShutdownWorker related to this controller.
type goroutineShutdownControl struct {
startShutdown context.CancelFunc
shutdownFinished chan struct{}
}
func (ctl *goroutineShutdownControl) Stop() {
ctl.startShutdown()
}
func (ctl *goroutineShutdownControl) Wait() {
<-ctl.shutdownFinished
}
func (ctl *goroutineShutdownControl) WaitChan() <-chan struct{} {
return ctl.shutdownFinished
}
func (ctl *goroutineShutdownControl) StopAndWait() {
ctl.Stop()
ctl.Wait()
}
// goroutineShutdownWorker is a helper type for a worker goroutine to
// be notified that it should shut down, and to report that shutdown
// has completed. The notification is triggered by the related
// goroutineShutdownControl.
type goroutineShutdownWorker struct {
startShutdown <-chan struct{}
shutdownFinished chan struct{}
}
func (work *goroutineShutdownWorker) Stop() <-chan struct{} {
return work.startShutdown
}
func (work *goroutineShutdownWorker) Done() {
close(work.shutdownFinished)
}

196
util/eventbus/bus_test.go Normal file
View File

@ -0,0 +1,196 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus_test
import (
"errors"
"fmt"
"testing"
"time"
"github.com/creachadair/taskgroup"
"github.com/google/go-cmp/cmp"
"tailscale.com/util/eventbus"
)
type EventA struct {
Counter int
}
type EventB struct {
Counter int
}
func TestBus(t *testing.T) {
b := eventbus.New()
defer b.Close()
q := b.Queue("TestBus")
defer q.Close()
s := eventbus.Subscribe[EventA](q)
go func() {
pa := eventbus.PublisherOf[EventA](b, "TestBusA")
defer pa.Close()
pb := eventbus.PublisherOf[EventB](b, "TestBusB")
defer pb.Close()
pa.Publish(EventA{1})
pb.Publish(EventB{2})
pa.Publish(EventA{3})
}()
want := expectEvents(t, EventA{1}, EventA{3})
for !want.Empty() {
select {
case got := <-s.Events():
want.Got(got)
case <-q.Done():
t.Fatalf("queue closed unexpectedly")
case <-time.After(time.Second):
t.Fatalf("timed out waiting for event")
}
}
}
func TestBusMultipleConsumers(t *testing.T) {
b := eventbus.New()
defer b.Close()
q1 := b.Queue("TestBusA")
defer q1.Close()
s1 := eventbus.Subscribe[EventA](q1)
q2 := b.Queue("TestBusAB")
defer q2.Close()
s2A := eventbus.Subscribe[EventA](q2)
s2B := eventbus.Subscribe[EventB](q2)
go func() {
pa := eventbus.PublisherOf[EventA](b, "TestBusA")
defer pa.Close()
pb := eventbus.PublisherOf[EventB](b, "TestBusB")
defer pb.Close()
pa.Publish(EventA{1})
pb.Publish(EventB{2})
pa.Publish(EventA{3})
}()
wantA := expectEvents(t, EventA{1}, EventA{3})
wantB := expectEvents(t, EventA{1}, EventB{2}, EventA{3})
for !wantA.Empty() || !wantB.Empty() {
select {
case got := <-s1.Events():
wantA.Got(got)
case got := <-s2A.Events():
wantB.Got(got)
case got := <-s2B.Events():
wantB.Got(got)
case <-q1.Done():
t.Fatalf("queue closed unexpectedly")
case <-q2.Done():
t.Fatalf("queue closed unexpectedly")
case <-time.After(time.Second):
t.Fatalf("timed out waiting for event")
}
}
}
func TestSpam(t *testing.T) {
b := eventbus.New()
defer b.Close()
const (
publishers = 100
eventsPerPublisher = 20
wantEvents = publishers * eventsPerPublisher
subscribers = 100
)
var g taskgroup.Group
received := make([][]EventA, subscribers)
for i := range subscribers {
q := b.Queue(fmt.Sprintf("Subscriber%d", i))
defer q.Close()
s := eventbus.Subscribe[EventA](q)
g.Go(func() error {
for range wantEvents {
select {
case evt := <-s.Events():
received[i] = append(received[i], evt)
case <-q.Done():
t.Errorf("queue done before expected number of events received")
return errors.New("queue prematurely closed")
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for expected bus event after %d events", len(received[i]))
return errors.New("timeout")
}
}
return nil
})
}
published := make([][]EventA, publishers)
for i := range publishers {
g.Run(func() {
p := eventbus.PublisherOf[EventA](b, fmt.Sprintf("Publisher%d", i))
for j := range eventsPerPublisher {
evt := EventA{i*eventsPerPublisher + j}
p.Publish(evt)
published[i] = append(published[i], evt)
}
})
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
var last []EventA
for i, got := range received {
if len(got) != wantEvents {
// Receiving goroutine already reported an error, we just need
// to fail early within the main test goroutine.
t.FailNow()
}
if last == nil {
continue
}
if diff := cmp.Diff(got, last); diff != "" {
t.Errorf("Subscriber %d did not see the same events as %d (-got+want):\n%s", i, i-1, diff)
}
last = got
}
for i, sent := range published {
if got := len(sent); got != eventsPerPublisher {
t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher)
}
}
// TODO: check that the published sequences are proper
// subsequences of the received slices.
}
type queueChecker struct {
t *testing.T
want []any
}
func expectEvents(t *testing.T, want ...any) *queueChecker {
return &queueChecker{t, want}
}
func (q *queueChecker) Got(v any) {
q.t.Helper()
if q.Empty() {
q.t.Fatalf("queue got unexpected %v", v)
}
if v != q.want[0] {
q.t.Fatalf("queue got %#v, want %#v", v, q.want[0])
}
q.want = q.want[1:]
}
func (q *queueChecker) Empty() bool {
return len(q.want) == 0
}

100
util/eventbus/doc.go Normal file
View File

@ -0,0 +1,100 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package eventbus provides an in-process event bus.
//
// The event bus connects publishers of typed events with subscribers
// interested in those events.
//
// # Usage
//
// To publish events, use [PublisherOf] to get a typed publisher for
// your event type, then call [Publisher.Publish] as needed. If your
// event is expensive to construct, you can optionally use
// [Publisher.ShouldPublish] to skip the work if nobody is listening
// for the event.
//
// To receive events, first use [Bus.Queue] to create an event
// delivery queue, then use [Subscribe] to get a [Subscriber] for each
// event type you're interested in. Receive the events themselves by
// selecting over all your [Subscriber.Chan] channels, as well as
// [Queue.Done] for shutdown notifications.
//
// # Concurrency properties
//
// The bus serializes all published events, and preserves that
// ordering when delivering to subscribers that are attached to the
// same Queue. In more detail:
//
// - An event is published to the bus at some instant between the
// start and end of the call to [Publisher.Publish].
// - Events cannot be published at the same instant, and so are
// totally ordered by their publication time. Given two events E1
// and E2, either E1 happens before E2, or E2 happens before E1.
// - Queues dispatch events to their Subscribers in publication
// order: if E1 happens before E2, the queue always delivers E1
// before E2.
// - Queues do not synchronize with each other: given queues Q1 and
// Q2, both subscribed to events E1 and E2, Q1 may deliver both E1
// and E2 before Q2 delivers E1.
//
// Less formally: there is one true timeline of all published events.
// If you make a Queue and subscribe to events on it, you will receive
// those events one at a time, in the same order as the one true
// timeline. You will "skip over" events you didn't subscribe to, but
// your view of the world always moves forward in time, never
// backwards, and you will observe events in the same order as
// everyone else.
//
// However, you cannot assume that what your subscribers on your queue
// see as "now" is the same as what other subscribers on other
// queues. Their queue may be further behind you in the timeline, or
// running ahead of you. This means you should be careful about
// reaching out to another component directly after receiving an
// event, as its view of the world may not yet (or ever) be exactly
// consistent with yours.
//
// To make your code more testable and understandable, you should try
// to structure it following the actor model: you have some local
// state over which you have authority, but your only way to interact
// with state elsewhere in the program is to receive and process
// events coming from elsewhere, or to emit events of your own.
//
// # Expected subscriber behavior
//
// Subscribers are expected to promptly receive their events on
// [Subscriber.Chan]. The bus has a small, fixed amount of internal
// buffering, meaning that a slow subscriber will eventually cause
// backpressure and block publication of all further events.
//
// In general, you should receive from your subscriber(s) in a loop,
// and only do fast state updates within that loop. Any heavier work
// should be offloaded to another goroutine.
//
// Causing publishers to block from backpressure is considered a bug
// in the slow subscriber causing the backpressure, and should be
// addressed there. Publishers should assume that Publish will not
// block for extended periods of time, and should not make exceptional
// effort to behave gracefully if they do get blocked.
//
// These blocking semantics are provisional and subject to
// change. Please speak up if this causes development pain, so that we
// can adapt the semantics to better suit our needs.
//
// # Debugging facilities
//
// (TODO, not implemented yet, sorry, I promise we're working on it next!)
//
// The bus comes with introspection facilities to help reason about
// the state of the client, and diagnose issues such as slow
// subscribers.
//
// The bus provide a tsweb debugging page that shows the current state
// of the bus, including all publishers, subscribers, and queued
// events.
//
// The bus also has a snooping and tracing facility, which lets you
// observe all events flowing through the bus, along with their
// source, destination(s) and timing information such as the time of
// delivery to each subscriber and end-to-end bus delays.
package eventbus

79
util/eventbus/publish.go Normal file
View File

@ -0,0 +1,79 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"context"
"reflect"
)
// publisher is a uniformly typed wrapper around Publisher[T], so that
// debugging facilities can look at active publishers.
type publisher interface {
publisherName() string
}
// A Publisher publishes events on the bus.
type Publisher[T any] struct {
bus *Bus
name string
stopCtx context.Context
stop context.CancelFunc
}
// PublisherOf returns a publisher for event type T on the given bus.
//
// The publisher's name should be a short, human-readable string that
// identifies this event publisher. The name is only visible through
// debugging APIs.
func PublisherOf[T any](b *Bus, name string) *Publisher[T] {
ctx, cancel := context.WithCancel(context.Background())
ret := &Publisher[T]{
bus: b,
name: name,
stopCtx: ctx,
stop: cancel,
}
b.addPublisher(ret)
return ret
}
func (p *Publisher[T]) publisherName() string { return p.name }
// Publish publishes event v on the bus.
func (p *Publisher[T]) Publish(v T) {
// Check for just a stopped publisher or bus before trying to
// write, so that once closed Publish consistently does nothing.
select {
case <-p.stopCtx.Done():
return
case <-p.bus.stop.WaitChan():
return
default:
}
select {
case p.bus.write <- v:
case <-p.stopCtx.Done():
case <-p.bus.stop.WaitChan():
}
}
// ShouldPublish reports whether anyone is subscribed to events of
// type T.
//
// ShouldPublish can be used to skip expensive event construction if
// nobody seems to care. Publishers must not assume that someone will
// definitely receive an event if ShouldPublish returns true.
func (p *Publisher[T]) ShouldPublish() bool {
dests := p.bus.dest(reflect.TypeFor[T]())
return len(dests) > 0
}
// Close closes the publisher, indicating that no further events will
// be published with it.
func (p *Publisher[T]) Close() {
p.stop()
p.bus.deletePublisher(p)
}

83
util/eventbus/queue.go Normal file
View File

@ -0,0 +1,83 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"slices"
)
const maxQueuedItems = 16
// queue is an ordered queue of length up to maxQueuedItems.
type queue struct {
vals []any
start int
}
// canAppend reports whether a value can be appended to q.vals without
// shifting values around.
func (q *queue) canAppend() bool {
return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals)
}
func (q *queue) Full() bool {
return q.start == 0 && !q.canAppend()
}
func (q *queue) Empty() bool {
return q.start == len(q.vals)
}
func (q *queue) Len() int {
return len(q.vals) - q.start
}
// Add adds v to the end of the queue. Blocks until append can be
// done.
func (q *queue) Add(v any) {
if !q.canAppend() {
if q.start == 0 {
panic("Add on a full queue")
}
// Slide remaining values back to the start of the array.
n := copy(q.vals, q.vals[q.start:])
toClear := len(q.vals) - n
clear(q.vals[len(q.vals)-toClear:])
q.vals = q.vals[:n]
q.start = 0
}
q.vals = append(q.vals, v)
}
// Peek returns the first value in the queue, without removing it from
// the queue, or nil if the queue is empty.
func (q *queue) Peek() any {
if q.Empty() {
return nil
}
return q.vals[q.start]
}
// Drop discards the first value in the queue, if any.
func (q *queue) Drop() {
if q.Empty() {
return
}
q.vals[q.start] = nil
q.start++
if q.Empty() {
// Reset cursor to start of array, it's free to do.
q.start = 0
q.vals = q.vals[:0]
}
}
// Snapshot returns a copy of the queue's contents.
func (q *queue) Snapshot() []any {
return slices.Clone(q.vals[q.start:])
}

170
util/eventbus/subscribe.go Normal file
View File

@ -0,0 +1,170 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package eventbus
import (
"fmt"
"reflect"
"sync"
)
type dispatchFn func(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool
// A Queue receives events from a Bus.
//
// To receive events through the queue, see [Subscribe]. Subscribers
// that share the same Queue receive events one at time, in the order
// they were published.
type Queue struct {
bus *Bus
name string
write chan any
stop goroutineShutdownControl
snapshot chan chan []any
outputsMu sync.Mutex
outputs map[reflect.Type]dispatchFn
}
func newQueue(b *Bus, name string) *Queue {
stopCtl, stopWorker := newGoroutineShutdown()
ret := &Queue{
bus: b,
name: name,
write: make(chan any),
stop: stopCtl,
snapshot: make(chan chan []any),
outputs: map[reflect.Type]dispatchFn{},
}
b.addQueue(ret)
go ret.pump(stopWorker)
return ret
}
func (q *Queue) pump(stop goroutineShutdownWorker) {
defer stop.Done()
var vals queue
acceptCh := func() chan any {
if vals.Full() {
return nil
}
return q.write
}
for {
if !vals.Empty() {
val := vals.Peek()
fn := q.dispatchFn(val)
if fn == nil {
// Raced with unsubscribe.
vals.Drop()
continue
}
if !fn(&vals, stop, acceptCh) {
return
}
} else {
// Keep the cases in this select in sync with
// Subscriber.dispatch below. The only different should be
// that this select doesn't deliver queued values to
// anyone, and unconditionally accepts new values.
select {
case val := <-q.write:
vals.Add(val)
case <-stop.Stop():
return
case ch := <-q.snapshot:
ch <- vals.Snapshot()
}
}
}
}
// A Subscriber delivers one type of event from a [Queue].
type Subscriber[T any] struct {
recv *Queue
read chan T
}
func (s *Subscriber[T]) dispatch(vals *queue, stop goroutineShutdownWorker, acceptCh func() chan any) bool {
t := vals.Peek().(T)
for {
// Keep the cases in this select in sync with Queue.pump
// above. The only different should be that this select
// delivers a value on s.read.
select {
case s.read <- t:
vals.Drop()
return true
case val := <-acceptCh():
vals.Add(val)
case <-stop.Stop():
return false
case ch := <-s.recv.snapshot:
ch <- vals.Snapshot()
}
}
}
// Events returns a channel on which the subscriber's events are
// delivered.
func (s *Subscriber[T]) Events() <-chan T {
return s.read
}
// Close shuts down the Subscriber, indicating the caller no longer
// wishes to receive these events. After Close, receives on
// [Subscriber.Chan] block for ever.
func (s *Subscriber[T]) Close() {
t := reflect.TypeFor[T]()
s.recv.bus.unsubscribe(t, s.recv)
s.recv.deleteDispatchFn(t)
}
func (q *Queue) dispatchFn(val any) dispatchFn {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
return q.outputs[reflect.ValueOf(val).Type()]
}
func (q *Queue) addDispatchFn(t reflect.Type, fn dispatchFn) {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
if q.outputs[t] != nil {
panic(fmt.Errorf("double subscription for event %s", t))
}
q.outputs[t] = fn
}
func (q *Queue) deleteDispatchFn(t reflect.Type) {
q.outputsMu.Lock()
defer q.outputsMu.Unlock()
delete(q.outputs, t)
}
// Done returns a channel that is closed when the Queue is closed.
func (q *Queue) Done() <-chan struct{} {
return q.stop.WaitChan()
}
// Close closes the queue. All Subscribers attached to the queue are
// implicitly closed, and any pending events are discarded.
func (q *Queue) Close() {
q.stop.StopAndWait()
q.bus.deleteQueue(q)
}
// Subscribe requests delivery of events of type T through the given
// Queue. Panics if the queue already has a subscriber for T.
func Subscribe[T any](r *Queue) Subscriber[T] {
t := reflect.TypeFor[T]()
ret := Subscriber[T]{
recv: r,
read: make(chan T),
}
r.addDispatchFn(t, ret.dispatch)
r.bus.subscribe(t, r)
return ret
}