control/controlclient: add Auto.updateRoutine
Instead of having updates replace the map polls, create a third goroutine
which is solely responsible for making sure that control is aware of the
latest client state. This also means that the streaming map polls are now
broken only when there are auth changes or the client is paused.

Updates tailscale/corp#5761

Signed-off-by: Maisem Ali <maisem@tailscale.com>
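For readers skimming the diff below: the mechanism being introduced is a
generation counter plus a capacity-1 wakeup channel, so any burst of local
changes collapses into at most one pending send. What follows is a minimal,
self-contained sketch of that pattern only; the identifiers (coalesce,
notifier, noteChange, run) are illustrative, not Tailscale APIs, and merely
mirror the shape of updateCh, lastUpdateGen, and lastUpdateGenInformed in
the real code.

// Package coalesce sketches the update-coalescing pattern adopted by
// this commit. Illustrative only: none of these names are Tailscale APIs.
package coalesce

import (
	"sync"
	"sync/atomic"
)

var gen atomic.Int64 // monotonically increasing change generation

type notifier struct {
	mu       sync.Mutex
	latest   int64         // newest generation produced locally
	informed int64         // newest generation the server has seen
	updateCh chan struct{} // capacity 1: a wakeup poke, not a queue
}

func newNotifier() *notifier {
	return &notifier{updateCh: make(chan struct{}, 1)}
}

// noteChange records that local state changed and pokes the sender
// without blocking; bursts of calls collapse into one pending poke.
func (n *notifier) noteChange() {
	g := gen.Add(1)
	n.mu.Lock()
	if g > n.latest {
		n.latest = g
	}
	n.mu.Unlock()
	select {
	case n.updateCh <- struct{}{}:
	default: // a poke is already pending; one is enough
	}
}

// run sends at most one update per generation gap, no matter how many
// noteChange calls arrived while a send was in flight.
func (n *notifier) run(send func(int64) error, quit <-chan struct{}) {
	for {
		n.mu.Lock()
		g := n.latest
		need := g > 0 && g != n.informed
		n.mu.Unlock()
		if !need {
			select {
			case <-quit:
				return
			case <-n.updateCh:
				continue
			}
		}
		if err := send(g); err != nil {
			continue // the real routine backs off before retrying
		}
		n.mu.Lock()
		n.informed = g
		n.mu.Unlock()
	}
}

The capacity-1 channel is the key design choice: writers never block, and the
generation comparison, not the channel, decides whether another send is needed.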
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"net/http"
 	"sync"
+	"sync/atomic"
 	"time"

 	"tailscale.com/health"
@@ -46,6 +47,91 @@ func (g *LoginGoal) sendLogoutError(err error) {

 var _ Client = (*Auto)(nil)

+// waitUnpause waits until the client is unpaused then returns. It only
+// returns an error if the client is closed.
+func (c *Auto) waitUnpause(routineLogName string) error {
+	c.mu.Lock()
+	if !c.paused {
+		c.mu.Unlock()
+		return nil
+	}
+	unpaused := c.unpausedChanLocked()
+	c.mu.Unlock()
+
+	c.logf("%s: awaiting unpause", routineLogName)
+	select {
+	case <-unpaused:
+		c.logf("%s: unpaused", routineLogName)
+		return nil
+	case <-c.quit:
+		return errors.New("quit")
+	}
+}
+
+// updateRoutine is responsible for informing the server of worthy changes to
+// our local state. It runs in its own goroutine.
+func (c *Auto) updateRoutine() {
+	defer close(c.updateDone)
+	bo := backoff.NewBackoff("updateRoutine", c.logf, 30*time.Second)
+	for {
+		if err := c.waitUnpause("updateRoutine"); err != nil {
+			c.logf("updateRoutine: exiting")
+			return
+		}
+		c.mu.Lock()
+		gen := c.lastUpdateGen
+		ctx := c.mapCtx
+		needUpdate := gen > 0 && gen != c.lastUpdateGenInformed && c.loggedIn
+		c.mu.Unlock()
+
+		if needUpdate {
+			select {
+			case <-c.quit:
+				c.logf("updateRoutine: exiting")
+				return
+			default:
+			}
+		} else {
+			// Nothing to do, wait for a signal.
+			select {
+			case <-c.quit:
+				c.logf("updateRoutine: exiting")
+				return
+			case <-c.updateCh:
+				continue
+			}
+		}
+
+		t0 := c.clock.Now()
+		err := c.direct.SendUpdate(ctx)
+		d := time.Since(t0).Round(time.Millisecond)
+		if err != nil {
+			if ctx.Err() == nil {
+				c.direct.logf("lite map update error after %v: %v", d, err)
+			}
+			bo.BackOff(ctx, err)
+			continue
+		}
+		bo.BackOff(ctx, nil)
+		c.direct.logf("[v1] successful lite map update in %v", d)
+
+		c.mu.Lock()
+		c.lastUpdateGenInformed = gen
+		c.mu.Unlock()
+	}
+}
+
+// atomicGen is an atomic int64 generator. It is used to generate monotonically
+// increasing numbers for updateGen.
+var atomicGen atomic.Int64
+
+func nextUpdateGen() updateGen {
+	return updateGen(atomicGen.Add(1))
+}
+
+// updateGen is a monotonically increasing number that represents a particular
+// update to the local state.
+type updateGen int64
+
 // Auto connects to a tailcontrol server for a node.
 // It's a concrete implementation of the Client interface.
 type Auto struct {
@@ -54,6 +140,7 @@ type Auto struct {
 	logf       logger.Logf
 	expiry     *time.Time
 	closed     bool
+	updateCh   chan struct{} // readable when we should inform the server of a change
 	newMapCh   chan struct{} // readable when we must restart a map request
 	statusFunc func(Status)  // called to update Client status; always non-nil

@@ -61,25 +148,29 @@ type Auto struct {

 	mu sync.Mutex // mutex guards the following fields

-	paused               bool // whether we should stop making HTTP requests
-	unpauseWaiters       []chan struct{}
-	loggedIn             bool       // true if currently logged in
-	loginGoal            *LoginGoal // non-nil if some login activity is desired
-	synced               bool       // true if our netmap is up-to-date
-	inPollNetMap         bool // true if currently running a PollNetMap
-	inLiteMapUpdate      bool // true if a lite (non-streaming) map request is outstanding
-	liteMapUpdateCancel  context.CancelFunc // cancels a lite map update, may be nil
-	liteMapUpdateCancels int // how many times we've canceled a lite map update
-	inSendStatus         int // number of sendStatus calls currently in progress
-	state                State
+	// lastUpdateGen is the gen of the last update we had that was worth
+	// sending to the server.
+	lastUpdateGen updateGen
+	// lastUpdateGenInformed is the value of lastUpdateGen that we've
+	// successfully informed the server of.
+	lastUpdateGenInformed updateGen
+
+	paused         bool // whether we should stop making HTTP requests
+	unpauseWaiters []chan struct{}
+	loggedIn       bool       // true if currently logged in
+	loginGoal      *LoginGoal // non-nil if some login activity is desired
+	synced         bool       // true if our netmap is up-to-date
+	inSendStatus   int        // number of sendStatus calls currently in progress
+	state          State

 	authCtx    context.Context // context used for auth requests
-	mapCtx     context.Context // context used for netmap requests
-	authCancel func()          // cancel the auth context
-	mapCancel  func()          // cancel the netmap context
+	mapCtx     context.Context // context used for netmap and update requests
+	authCancel func()          // cancel authCtx
+	mapCancel  func()          // cancel mapCtx
 	quit       chan struct{}   // when closed, goroutines should all exit
-	authDone   chan struct{}   // when closed, auth goroutine is done
-	mapDone    chan struct{}   // when closed, map goroutine is done
+	authDone   chan struct{}   // when closed, authRoutine is done
+	mapDone    chan struct{}   // when closed, mapRoutine is done
+	updateDone chan struct{}   // when closed, updateRoutine is done
 }

 // New creates and starts a new Auto.
@@ -116,10 +207,12 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
 		direct:     direct,
 		clock:      opts.Clock,
 		logf:       opts.Logf,
+		updateCh:   make(chan struct{}, 1),
 		newMapCh:   make(chan struct{}, 1),
 		quit:       make(chan struct{}),
 		authDone:   make(chan struct{}),
 		mapDone:    make(chan struct{}),
+		updateDone: make(chan struct{}),
 		statusFunc: opts.Status,
 	}
 	c.authCtx, c.authCancel = context.WithCancel(context.Background())
@@ -162,85 +255,34 @@ func (c *Auto) SetPaused(paused bool) {
 func (c *Auto) Start() {
 	go c.authRoutine()
 	go c.mapRoutine()
+	go c.updateRoutine()
 }

-// sendNewMapRequest either sends a new OmitPeers, non-streaming map request
-// (to just send Hostinfo/Netinfo/Endpoints info, while keeping an existing
-// streaming response open), or start a new streaming one if necessary.
+// updateControl sends a new OmitPeers, non-streaming map request (to just send
+// Hostinfo/Netinfo/Endpoints info, while keeping an existing streaming response
+// open).
 //
 // It should be called whenever there's something new to tell the server.
-func (c *Auto) sendNewMapRequest() {
+func (c *Auto) updateControl() {
+	gen := nextUpdateGen()
 	c.mu.Lock()
-
-	// If we're not already streaming a netmap, then tear down everything
-	// and start a new stream (which starts by sending a new map request)
-	if !c.inPollNetMap || !c.loggedIn {
+	if gen < c.lastUpdateGen {
+		// This update is out of date.
 		c.mu.Unlock()
-		c.cancelMapSafely()
 		return
 	}
+	c.lastUpdateGen = gen
+	c.mu.Unlock()

-	// If we are already in process of doing a LiteMapUpdate, cancel it and
-	// try a new one. If this is the 10th time we have done this
-	// cancelation, tear down everything and start again.
-	const maxLiteMapUpdateAttempts = 10
-	if c.inLiteMapUpdate {
-		// Always cancel the in-flight lite map update, regardless of
-		// whether we cancel the streaming map request or not.
-		c.liteMapUpdateCancel()
-		c.inLiteMapUpdate = false
-
-		if c.liteMapUpdateCancels >= maxLiteMapUpdateAttempts {
-			// Not making progress
-			c.mu.Unlock()
-			c.cancelMapSafely()
-			return
-		}
-
-		// Increment our cancel counter and continue below to start a
-		// new lite update.
-		c.liteMapUpdateCancels++
-	}
-
-	// Otherwise, send a lite update that doesn't keep a
-	// long-running stream response.
-	defer c.mu.Unlock()
-	c.inLiteMapUpdate = true
-	ctx, cancel := context.WithTimeout(c.mapCtx, 10*time.Second)
-	c.liteMapUpdateCancel = cancel
-	go func() {
-		defer cancel()
-		t0 := c.clock.Now()
-		err := c.direct.SendLiteMapUpdate(ctx)
-		d := time.Since(t0).Round(time.Millisecond)
-
-		c.mu.Lock()
-		c.inLiteMapUpdate = false
-		c.liteMapUpdateCancel = nil
-		if err == nil {
-			c.liteMapUpdateCancels = 0
-		}
-		c.mu.Unlock()
-
-		if err == nil {
-			c.logf("[v1] successful lite map update in %v", d)
-			return
-		}
-		if ctx.Err() == nil {
-			c.logf("lite map update after %v: %v", d, err)
-		}
-		if !errors.Is(ctx.Err(), context.Canceled) {
-			// Fall back to restarting the long-polling map
-			// request (the old heavy way) if the lite update
-			// failed for reasons other than the context being
-			// canceled.
-			c.cancelMapSafely()
-		}
-	}()
+	select {
+	case c.updateCh <- struct{}{}:
+	default:
+	}
 }

 func (c *Auto) cancelAuth() {
 	c.mu.Lock()
+	defer c.mu.Unlock()
 	if c.authCancel != nil {
 		c.authCancel()
 	}
@@ -248,9 +290,9 @@ func (c *Auto) cancelAuth() {
 		c.authCtx, c.authCancel = context.WithCancel(context.Background())
 		c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, c.logf)
 	}
-	c.mu.Unlock()
 }

+// cancelMapLocked is like cancelMap, but assumes the caller holds c.mu.
 func (c *Auto) cancelMapLocked() {
 	if c.mapCancel != nil {
 		c.mapCancel()
@@ -258,56 +300,33 @@ func (c *Auto) cancelMapLocked() {
 	if !c.closed {
 		c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
 		c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, c.logf)
-
 	}
 }

-func (c *Auto) cancelMapUnsafely() {
-	c.mu.Lock()
-	c.cancelMapLocked()
-	c.mu.Unlock()
-}
-
-func (c *Auto) cancelMapSafely() {
+// cancelMap cancels the existing mapPoll and liteUpdates.
+func (c *Auto) cancelMap() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+	c.cancelMapLocked()
+}

-	// Always reset our lite map cancels counter if we're canceling
-	// everything, since we're about to restart with a new map update; this
-	// allows future calls to sendNewMapRequest to retry sending lite
-	// updates.
-	c.liteMapUpdateCancels = 0
+// restartMap cancels the existing mapPoll and liteUpdates, and then starts a
+// new one.
+func (c *Auto) restartMap() {
+	c.cancelMap()

-	c.logf("[v1] cancelMapSafely: synced=%v", c.synced)
+	c.logf("[v1] restartMap: synced=%v", c.synced)

-	if c.inPollNetMap {
-		// received at least one netmap since the last
-		// interruption. That means the server has already
-		// fully processed our last request, which might
-		// include UpdateEndpoints(). Interrupt it and try
-		// again.
-		c.cancelMapLocked()
-	} else {
-		// !synced means we either haven't done a netmap
-		// request yet, or it hasn't answered yet. So the
-		// server is in an undefined state. If we send
-		// another netmap request too soon, it might race
-		// with the last one, and if we're very unlucky,
-		// the new request will be applied before the old one,
-		// and the wrong endpoints will get registered. We
-		// have to tell the client to abort politely, only
-		// after it receives a response to its existing netmap
-		// request.
-		select {
-		case c.newMapCh <- struct{}{}:
-			c.logf("[v1] cancelMapSafely: wrote to channel")
-		default:
-			// if channel write failed, then there was already
-			// an outstanding newMapCh request. One is enough,
-			// since it'll always use the latest endpoints.
-			c.logf("[v1] cancelMapSafely: channel was full")
-		}
+	select {
+	case c.newMapCh <- struct{}{}:
+		c.logf("[v1] restartMap: wrote to channel")
+	default:
+		// if channel write failed, then there was already
+		// an outstanding newMapCh request. One is enough,
+		// since it'll always use the latest endpoints.
+		c.logf("[v1] restartMap: channel was full")
 	}
+	c.updateControl()
 }

 func (c *Auto) authRoutine() {
@@ -428,7 +447,7 @@ func (c *Auto) authRoutine() {
 		c.mu.Unlock()

 		c.sendStatus("authRoutine-success", nil, "", nil)
-		c.cancelMapSafely()
+		c.restartMap()
 		bo.BackOff(ctx, nil)
 	}
 }
@@ -458,25 +477,19 @@ func (c *Auto) unpausedChanLocked() <-chan struct{} {
 	return unpaused
 }

+// mapRoutine is responsible for keeping a read-only streaming connection to the
+// control server, and keeping the netmap up to date.
 func (c *Auto) mapRoutine() {
 	defer close(c.mapDone)
 	bo := backoff.NewBackoff("mapRoutine", c.logf, 30*time.Second)

 	for {
-		c.mu.Lock()
-		if c.paused {
-			unpaused := c.unpausedChanLocked()
-			c.mu.Unlock()
-			c.logf("mapRoutine: awaiting unpause")
-			select {
-			case <-unpaused:
-				c.logf("mapRoutine: unpaused")
-			case <-c.quit:
-				c.logf("mapRoutine: quit")
-				return
-			}
-			continue
+		if err := c.waitUnpause("mapRoutine"); err != nil {
+			c.logf("mapRoutine: exiting")
+			return
 		}
+
+		c.mu.Lock()
 		c.logf("[v1] mapRoutine: %s", c.state)
 		loggedIn := c.loggedIn
 		ctx := c.mapCtx
@@ -513,43 +526,21 @@ func (c *Auto) mapRoutine() {
 				c.logf("[v1] mapRoutine: new map needed while idle.")
 			}
 		} else {
-			// Be sure this is false when we're not inside
-			// PollNetMap, so that cancelMapSafely() can notify
-			// us correctly.
-			c.mu.Lock()
-			c.inPollNetMap = false
-			c.mu.Unlock()
 			health.SetInPollNetMap(false)

 			err := c.direct.PollNetMap(ctx, func(nm *netmap.NetworkMap) {
 				health.SetInPollNetMap(true)

 				c.mu.Lock()
-
-				select {
-				case <-c.newMapCh:
-					c.logf("[v1] mapRoutine: new map request during PollNetMap. canceling.")
-					c.cancelMapLocked()
-
-					// Don't emit this netmap; we're
-					// about to request a fresh one.
-					c.mu.Unlock()
-					return
-				default:
-				}
-
 				c.synced = true
-				c.inPollNetMap = true
 				if c.loggedIn {
 					c.state = StateSynchronized
 				}
-				exp := nm.Expiry
-				c.expiry = &exp
+				c.expiry = ptr.To(nm.Expiry)
 				stillAuthed := c.loggedIn
+				state := c.state
+				c.mu.Unlock()

-				c.logf("[v1] mapRoutine: netmap received: %s", c.state)
-				c.mu.Unlock()
-
+				c.logf("[v1] mapRoutine: netmap received: %s", state)
 				if stillAuthed {
 					c.sendStatus("mapRoutine-got-netmap", nil, "", nm)
 				}
@@ -560,7 +551,6 @@ func (c *Auto) mapRoutine() {
 		health.SetInPollNetMap(false)
 		c.mu.Lock()
 		c.synced = false
-		c.inPollNetMap = false
 		if c.state == StateSynchronized {
 			c.state = StateAuthenticated
 		}
@@ -602,7 +592,7 @@ func (c *Auto) SetHostinfo(hi *tailcfg.Hostinfo) {
 	}

 	// Send new Hostinfo to server
-	c.sendNewMapRequest()
+	c.updateControl()
 }

 func (c *Auto) SetNetInfo(ni *tailcfg.NetInfo) {
@@ -614,12 +604,17 @@ func (c *Auto) SetNetInfo(ni *tailcfg.NetInfo) {
 	}

 	// Send new NetInfo to server
-	c.sendNewMapRequest()
+	c.updateControl()
 }

 // SetTKAHead updates the TKA head hash that map-request infrastructure sends.
 func (c *Auto) SetTKAHead(headHash string) {
-	c.direct.SetTKAHead(headHash)
+	if !c.direct.SetTKAHead(headHash) {
+		return
+	}
+
+	// Send new TKAHead to server
+	c.updateControl()
 }

 func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkMap) {
@@ -728,7 +723,7 @@ func (c *Auto) SetExpirySooner(ctx context.Context, expiry time.Time) error {
 func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
 	changed := c.direct.SetEndpoints(endpoints)
 	if changed {
-		c.sendNewMapRequest()
+		c.updateControl()
 	}
 }

@@ -750,8 +745,9 @@ func (c *Auto) Shutdown() {
 		close(c.quit)
 		c.cancelAuth()
 		<-c.authDone
-		c.cancelMapUnsafely()
+		c.cancelMap()
 		<-c.mapDone
+		<-c.updateDone
 		if direct != nil {
 			direct.Close()
 		}
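One design note on the Shutdown ordering in the final hunk: close(c.quit) is a
one-to-many broadcast, and each routine's done channel is its acknowledgment,
which is why the new updateRoutine gets its own updateDone that Shutdown must
now drain. A simplified sketch of that handshake, assuming a hypothetical
worker type rather than the real Auto:

// Package lifecycle sketches the quit/done handshake used by Shutdown
// above. Simplified and illustrative; not the actual Auto implementation.
package lifecycle

type worker struct {
	quit       chan struct{} // closed once to ask every goroutine to exit
	authDone   chan struct{} // each goroutine closes its own done channel
	mapDone    chan struct{}
	updateDone chan struct{} // the third goroutine added by this commit
}

func (w *worker) shutdown() {
	close(w.quit) // broadcast: every select on quit fires
	<-w.authDone  // then wait for each routine to acknowledge, in turn
	<-w.mapDone
	<-w.updateDone
}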