diff --git a/cmd/containerboot/healthz.go b/cmd/containerboot/healthz.go index 895290733..6d03bd6d3 100644 --- a/cmd/containerboot/healthz.go +++ b/cmd/containerboot/healthz.go @@ -6,9 +6,12 @@ package main import ( + "fmt" "log" "net/http" "sync" + + "tailscale.com/kube/kubetypes" ) // healthz is a simple health check server, if enabled it returns 200 OK if @@ -17,6 +20,7 @@ type healthz struct { sync.Mutex hasAddrs bool + podIPv4 string } func (h *healthz) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -24,7 +28,10 @@ func (h *healthz) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer h.Unlock() if h.hasAddrs { - w.Write([]byte("ok")) + w.Header().Add(kubetypes.PodIPv4Header, h.podIPv4) + if _, err := w.Write([]byte("ok")); err != nil { + http.Error(w, fmt.Sprintf("error writing status: %v", err), http.StatusInternalServerError) + } } else { http.Error(w, "node currently has no tailscale IPs", http.StatusServiceUnavailable) } @@ -43,8 +50,8 @@ func (h *healthz) update(healthy bool) { // healthHandlers registers a simple health handler at /healthz. // A containerized tailscale instance is considered healthy if // it has at least one tailnet IP address. -func healthHandlers(mux *http.ServeMux) *healthz { - h := &healthz{} +func healthHandlers(mux *http.ServeMux, podIPv4 string) *healthz { + h := &healthz{podIPv4: podIPv4} mux.Handle("GET /healthz", h) return h } diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index 895be108b..0aca27f5f 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -191,17 +191,18 @@ func main() { defer killTailscaled() var healthCheck *healthz + ep := &egressProxy{} if cfg.HealthCheckAddrPort != "" { mux := http.NewServeMux() log.Printf("Running healthcheck endpoint at %s/healthz", cfg.HealthCheckAddrPort) - healthCheck = healthHandlers(mux) + healthCheck = healthHandlers(mux, cfg.PodIPv4) close := runHTTPServer(mux, cfg.HealthCheckAddrPort) defer close() } - if cfg.localMetricsEnabled() || cfg.localHealthEnabled() { + if cfg.localMetricsEnabled() || cfg.localHealthEnabled() || cfg.egressSvcsTerminateEPEnabled() { mux := http.NewServeMux() if cfg.localMetricsEnabled() { @@ -211,7 +212,11 @@ func main() { if cfg.localHealthEnabled() { log.Printf("Running healthcheck endpoint at %s/healthz", cfg.LocalAddrPort) - healthCheck = healthHandlers(mux) + healthCheck = healthHandlers(mux, cfg.PodIPv4) + } + if cfg.EgressProxiesCfgPath != "" { + log.Printf("Running preshutdown hook at %s%s", cfg.LocalAddrPort, kubetypes.EgessServicesPreshutdownEP) + ep.registerHandlers(mux) } close := runHTTPServer(mux, cfg.LocalAddrPort) @@ -639,20 +644,21 @@ func main() { // will then continuously monitor the config file and netmap updates and // reconfigure the firewall rules as needed. If any of its operations fail, it // will crash this node. 
- if cfg.EgressSvcsCfgPath != "" { - log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressSvcsCfgPath) + if cfg.EgressProxiesCfgPath != "" { + log.Printf("configuring egress proxy using configuration file at %s", cfg.EgressProxiesCfgPath) egressSvcsNotify = make(chan ipn.Notify) - ep := egressProxy{ - cfgPath: cfg.EgressSvcsCfgPath, + opts := egressProxyRunOpts{ + cfgPath: cfg.EgressProxiesCfgPath, nfr: nfr, kc: kc, + tsClient: client, stateSecret: cfg.KubeSecret, netmapChan: egressSvcsNotify, podIPv4: cfg.PodIPv4, tailnetAddrs: addrs, } go func() { - if err := ep.run(ctx, n); err != nil { + if err := ep.run(ctx, n, opts); err != nil { egressSvcsErrorChan <- err } }() diff --git a/cmd/containerboot/main_test.go b/cmd/containerboot/main_test.go index dacfb5bc6..c8066f2c1 100644 --- a/cmd/containerboot/main_test.go +++ b/cmd/containerboot/main_test.go @@ -32,6 +32,8 @@ "golang.org/x/sys/unix" "tailscale.com/ipn" "tailscale.com/kube/egressservices" + "tailscale.com/kube/kubeclient" + "tailscale.com/kube/kubetypes" "tailscale.com/tailcfg" "tailscale.com/tstest" "tailscale.com/types/netmap" @@ -54,20 +56,9 @@ func TestContainerBoot(t *testing.T) { defer kube.Close() tailscaledConf := &ipn.ConfigVAlpha{AuthKey: ptr.To("foo"), Version: "alpha0"} - tailscaledConfBytes, err := json.Marshal(tailscaledConf) - if err != nil { - t.Fatalf("error unmarshaling tailscaled config: %v", err) - } serveConf := ipn.ServeConfig{TCP: map[uint16]*ipn.TCPPortHandler{80: {HTTP: true}}} - serveConfBytes, err := json.Marshal(serveConf) - if err != nil { - t.Fatalf("error unmarshaling serve config: %v", err) - } - egressSvcsCfg := egressservices.Configs{"foo": {TailnetTarget: egressservices.TailnetTarget{FQDN: "foo.tailnetxyx.ts.net"}}} - egressSvcsCfgBytes, err := json.Marshal(egressSvcsCfg) - if err != nil { - t.Fatalf("error unmarshaling egress services config: %v", err) - } + egressCfg := egressSvcConfig("foo", "foo.tailnetxyz.ts.net") + egressStatus := egressSvcStatus("foo", "foo.tailnetxyz.ts.net") dirs := []string{ "var/lib", @@ -84,16 +75,17 @@ func TestContainerBoot(t *testing.T) { } } files := map[string][]byte{ - "usr/bin/tailscaled": fakeTailscaled, - "usr/bin/tailscale": fakeTailscale, - "usr/bin/iptables": fakeTailscale, - "usr/bin/ip6tables": fakeTailscale, - "dev/net/tun": []byte(""), - "proc/sys/net/ipv4/ip_forward": []byte("0"), - "proc/sys/net/ipv6/conf/all/forwarding": []byte("0"), - "etc/tailscaled/cap-95.hujson": tailscaledConfBytes, - "etc/tailscaled/serve-config.json": serveConfBytes, - "etc/tailscaled/egress-services-config.json": egressSvcsCfgBytes, + "usr/bin/tailscaled": fakeTailscaled, + "usr/bin/tailscale": fakeTailscale, + "usr/bin/iptables": fakeTailscale, + "usr/bin/ip6tables": fakeTailscale, + "dev/net/tun": []byte(""), + "proc/sys/net/ipv4/ip_forward": []byte("0"), + "proc/sys/net/ipv6/conf/all/forwarding": []byte("0"), + "etc/tailscaled/cap-95.hujson": mustJSON(t, tailscaledConf), + "etc/tailscaled/serve-config.json": mustJSON(t, serveConf), + filepath.Join("etc/tailscaled/", egressservices.KeyEgressServices): mustJSON(t, egressCfg), + filepath.Join("etc/tailscaled/", egressservices.KeyHEPPings): []byte("4"), } resetFiles := func() { for path, content := range files { @@ -132,6 +124,9 @@ func TestContainerBoot(t *testing.T) { healthURL := func(port int) string { return fmt.Sprintf("http://127.0.0.1:%d/healthz", port) } + egressSvcTerminateURL := func(port int) string { + return fmt.Sprintf("http://127.0.0.1:%d%s", port, kubetypes.EgessServicesPreshutdownEP) 
+ } capver := fmt.Sprintf("%d", tailcfg.CurrentCapabilityVersion) @@ -896,9 +891,10 @@ type phase struct { { Name: "egress_svcs_config_kube", Env: map[string]string{ - "KUBERNETES_SERVICE_HOST": kube.Host, - "KUBERNETES_SERVICE_PORT_HTTPS": kube.Port, - "TS_EGRESS_SERVICES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled/egress-services-config.json"), + "KUBERNETES_SERVICE_HOST": kube.Host, + "KUBERNETES_SERVICE_PORT_HTTPS": kube.Port, + "TS_EGRESS_PROXIES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled"), + "TS_LOCAL_ADDR_PORT": fmt.Sprintf("[::]:%d", localAddrPort), }, KubeSecret: map[string]string{ "authkey": "tskey-key", @@ -912,28 +908,35 @@ type phase struct { WantKubeSecret: map[string]string{ "authkey": "tskey-key", }, + EndpointStatuses: map[string]int{ + egressSvcTerminateURL(localAddrPort): 200, + }, }, { Notify: runningNotify, WantKubeSecret: map[string]string{ + "egress-services": mustBase64(t, egressStatus), "authkey": "tskey-key", "device_fqdn": "test-node.test.ts.net", "device_id": "myID", "device_ips": `["100.64.0.1"]`, "tailscale_capver": capver, }, + EndpointStatuses: map[string]int{ + egressSvcTerminateURL(localAddrPort): 200, + }, }, }, }, { Name: "egress_svcs_config_no_kube", Env: map[string]string{ - "TS_EGRESS_SERVICES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled/egress-services-config.json"), - "TS_AUTHKEY": "tskey-key", + "TS_EGRESS_PROXIES_CONFIG_PATH": filepath.Join(d, "etc/tailscaled"), + "TS_AUTHKEY": "tskey-key", }, Phases: []phase{ { - WantFatalLog: "TS_EGRESS_SERVICES_CONFIG_PATH is only supported for Tailscale running on Kubernetes", + WantFatalLog: "TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes", }, }, }, @@ -1394,13 +1397,31 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) { panic(fmt.Sprintf("json decode failed: %v. Body:\n\n%s", err, string(bs))) } for _, op := range req { - if op.Op != "remove" { + if op.Op == "remove" { + if !strings.HasPrefix(op.Path, "/data/") { + panic(fmt.Sprintf("unsupported json-patch path %q", op.Path)) + } + delete(k.secret, strings.TrimPrefix(op.Path, "/data/")) + } else if op.Op == "replace" { + path, ok := strings.CutPrefix(op.Path, "/data/") + if !ok { + panic(fmt.Sprintf("unsupported json-patch path %q", op.Path)) + } + req := make([]kubeclient.JSONPatch, 0) + if err := json.Unmarshal(bs, &req); err != nil { + panic(fmt.Sprintf("json decode failed: %v. Body:\n\n%s", err, string(bs))) + } + + for _, patch := range req { + val, ok := patch.Value.(string) + if !ok { + panic(fmt.Sprintf("unsupported json patch value %v: cannot be converted to string", patch.Value)) + } + k.secret[path] = val + } + } else { panic(fmt.Sprintf("unsupported json-patch op %q", op.Op)) } - if !strings.HasPrefix(op.Path, "/data/") { - panic(fmt.Sprintf("unsupported json-patch path %q", op.Path)) - } - delete(k.secret, strings.TrimPrefix(op.Path, "/data/")) } case "application/strategic-merge-patch+json": req := struct { @@ -1419,3 +1440,41 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) { panic(fmt.Sprintf("unhandled HTTP method %q", r.Method)) } } + +func mustBase64(t *testing.T, v any) string { + b := mustJSON(t, v) + s := base64.StdEncoding.WithPadding('=').EncodeToString(b) + return s +} + +func mustJSON(t *testing.T, v any) []byte { + b, err := json.Marshal(v) + if err != nil { + t.Fatalf("error converting %v to json: %v", v, err) + } + return b +} + +// egress services status given one named tailnet target specified by FQDN. 
As written by the proxy to its state Secret. +func egressSvcStatus(name, fqdn string) egressservices.Status { + return egressservices.Status{ + Services: map[string]*egressservices.ServiceStatus{ + name: { + TailnetTarget: egressservices.TailnetTarget{ + FQDN: fqdn, + }, + }, + }, + } +} + +// egress config given one named tailnet target specified by FQDN. +func egressSvcConfig(name, fqdn string) egressservices.Configs { + return egressservices.Configs{ + name: egressservices.Config{ + TailnetTarget: egressservices.TailnetTarget{ + FQDN: fqdn, + }, + }, + } +} diff --git a/cmd/containerboot/services.go b/cmd/containerboot/services.go index aed00250d..177cb2d50 100644 --- a/cmd/containerboot/services.go +++ b/cmd/containerboot/services.go @@ -11,18 +11,24 @@ "errors" "fmt" "log" + "net/http" "net/netip" "os" "path/filepath" "reflect" + "strconv" "strings" "time" "github.com/fsnotify/fsnotify" + "tailscale.com/client/tailscale" "tailscale.com/ipn" "tailscale.com/kube/egressservices" "tailscale.com/kube/kubeclient" + "tailscale.com/kube/kubetypes" + "tailscale.com/syncs" "tailscale.com/tailcfg" + "tailscale.com/util/httpm" "tailscale.com/util/linuxfw" "tailscale.com/util/mak" ) @@ -37,13 +43,15 @@ // egressProxy knows how to configure firewall rules to route cluster traffic to // one or more tailnet services. type egressProxy struct { - cfgPath string // path to egress service config file + cfgPath string // path to a directory with egress services config files nfr linuxfw.NetfilterRunner // never nil kc kubeclient.Client // never nil stateSecret string // name of the kube state Secret + tsClient *tailscale.LocalClient // never nil + netmapChan chan ipn.Notify // chan to receive netmap updates on podIPv4 string // never empty string, currently only IPv4 is supported @@ -55,15 +63,29 @@ type egressProxy struct { // memory at all. targetFQDNs map[string][]netip.Prefix - // used to configure firewall rules. - tailnetAddrs []netip.Prefix + tailnetAddrs []netip.Prefix // tailnet IPs of this tailnet device + + // shortSleep is the backoff sleep between healthcheck endpoint calls - can be overridden in tests. + shortSleep time.Duration + // longSleep is the time to sleep after the routing rules are updated to increase the chance that kube + // proxies on all nodes have updated their routing configuration. It can be configured to 0 in + // tests. + longSleep time.Duration + // client is a client that can send HTTP requests. + client httpClient +} + +// httpClient is a client that can send HTTP requests and can be mocked in tests. 
+type httpClient interface { + Do(*http.Request) (*http.Response, error) } // run configures egress proxy firewall rules and ensures that the firewall rules are reconfigured when: // - the mounted egress config has changed // - the proxy's tailnet IP addresses have changed // - tailnet IPs have changed for any backend targets specified by tailnet FQDN -func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error { +func (ep *egressProxy) run(ctx context.Context, n ipn.Notify, opts egressProxyRunOpts) error { + ep.configure(opts) var tickChan <-chan time.Time var eventChan <-chan fsnotify.Event // TODO (irbekrm): take a look if this can be pulled into a single func @@ -75,7 +97,7 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error { tickChan = ticker.C } else { defer w.Close() - if err := w.Add(filepath.Dir(ep.cfgPath)); err != nil { + if err := w.Add(ep.cfgPath); err != nil { return fmt.Errorf("failed to add fsnotify watch: %w", err) } eventChan = w.Events @@ -85,28 +107,52 @@ func (ep *egressProxy) run(ctx context.Context, n ipn.Notify) error { return err } for { - var err error select { case <-ctx.Done(): return nil case <-tickChan: - err = ep.sync(ctx, n) + log.Printf("periodic sync, ensuring firewall config is up to date...") case <-eventChan: log.Printf("config file change detected, ensuring firewall config is up to date...") - err = ep.sync(ctx, n) case n = <-ep.netmapChan: shouldResync := ep.shouldResync(n) - if shouldResync { - log.Printf("netmap change detected, ensuring firewall config is up to date...") - err = ep.sync(ctx, n) + if !shouldResync { + continue } + log.Printf("netmap change detected, ensuring firewall config is up to date...") } - if err != nil { + if err := ep.sync(ctx, n); err != nil { return fmt.Errorf("error syncing egress service config: %w", err) } } } +type egressProxyRunOpts struct { + cfgPath string + nfr linuxfw.NetfilterRunner + kc kubeclient.Client + tsClient *tailscale.LocalClient + stateSecret string + netmapChan chan ipn.Notify + podIPv4 string + tailnetAddrs []netip.Prefix +} + +// configure configures the egress proxy using the provided options. +func (ep *egressProxy) configure(opts egressProxyRunOpts) { + ep.cfgPath = opts.cfgPath + ep.nfr = opts.nfr + ep.kc = opts.kc + ep.tsClient = opts.tsClient + ep.stateSecret = opts.stateSecret + ep.netmapChan = opts.netmapChan + ep.podIPv4 = opts.podIPv4 + ep.tailnetAddrs = opts.tailnetAddrs + ep.client = &http.Client{} // default HTTP client + ep.shortSleep = time.Second + ep.longSleep = time.Second * 10 +} + // sync triggers an egress proxy config resync. The resync calculates the diff between config and status to determine if // any firewall rules need to be updated. Currently using status in state Secret as a reference for what is the current // firewall configuration is good enough because - the status is keyed by the Pod IP - we crash the Pod on errors such @@ -327,7 +373,8 @@ func (ep *egressProxy) deleteUnnecessaryServices(cfgs *egressservices.Configs, s // getConfigs gets the mounted egress service configuration.
func (ep *egressProxy) getConfigs() (*egressservices.Configs, error) { - j, err := os.ReadFile(ep.cfgPath) + svcsCfg := filepath.Join(ep.cfgPath, egressservices.KeyEgressServices) + j, err := os.ReadFile(svcsCfg) if os.IsNotExist(err) { return nil, nil } @@ -569,3 +616,142 @@ func servicesStatusIsEqual(st, st1 *egressservices.Status) bool { st1.PodIPv4 = "" return reflect.DeepEqual(*st, *st1) } + +// registerHandlers adds a new handler to the provided ServeMux that can be called as a Kubernetes prestop hook to +// delay shutdown till it's safe to do so. +func (ep *egressProxy) registerHandlers(mux *http.ServeMux) { + mux.Handle(fmt.Sprintf("GET %s", kubetypes.EgessServicesPreshutdownEP), ep) +} + +// ServeHTTP serves the /internal-egress-services-preshutdown endpoint. When it receives a request, it periodically polls +// the configured health check endpoint for each egress service until the health check endpoint no longer hits this +// proxy Pod. It uses the Pod-IPv4 header to verify whether the health check response was received from this Pod. +func (ep *egressProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + cfgs, err := ep.getConfigs() + if err != nil { + http.Error(w, fmt.Sprintf("error retrieving egress services configs: %v", err), http.StatusInternalServerError) + return + } + if cfgs == nil { + if _, err := w.Write([]byte("safe to terminate")); err != nil { + http.Error(w, fmt.Sprintf("error writing termination status: %v", err), http.StatusInternalServerError) + return + } + } + hp, err := ep.getHEPPings() + if err != nil { + http.Error(w, fmt.Sprintf("error determining the number of times health check endpoint should be pinged: %v", err), http.StatusInternalServerError) + return + } + ep.waitTillSafeToShutdown(r.Context(), cfgs, hp) +} + +// waitTillSafeToShutdown looks up all egress targets configured to be proxied via this instance and, for each target +// whose configuration includes a healthcheck endpoint, pings the endpoint till none of the responses +// are returned by this instance or till the HTTP request times out. In practice, the endpoint will be a Kubernetes Service for which one of the backends +// would normally be this Pod. When this Pod is being deleted, the operator should have removed it from the Service +// backends and eventually kube proxy routing rules should be updated to no longer route traffic for the Service to this +// Pod.
+func (ep *egressProxy) waitTillSafeToShutdown(ctx context.Context, cfgs *egressservices.Configs, hp int) { + if cfgs == nil || len(*cfgs) == 0 { // avoid sleeping if no services are configured + return + } + log.Printf("Ensuring that cluster traffic for egress targets is no longer routed via this Pod...") + wg := syncs.WaitGroup{} + + for s, cfg := range *cfgs { + hep := cfg.HealthCheckEndpoint + if hep == "" { + log.Printf("Tailnet target %q does not have a cluster healthcheck specified, unable to verify if cluster traffic for the target is still routed via this Pod", s) + continue + } + svc := s + wg.Go(func() { + log.Printf("Ensuring that cluster traffic is no longer routed to %q via this Pod...", svc) + for { + if ctx.Err() != nil { // kubelet's HTTP request timeout + log.Printf("Cluster traffic for %s did not stop being routed to this Pod.", svc) + return + } + found, err := lookupPodRoute(ctx, hep, ep.podIPv4, hp, ep.client) + if err != nil { + log.Printf("unable to reach endpoint %q, assuming the routing rules for this Pod have been deleted: %v", hep, err) + break + } + if !found { + log.Printf("service %q is no longer routed through this Pod", svc) + break + } + log.Printf("service %q is still routed through this Pod, waiting...", svc) + time.Sleep(ep.shortSleep) + } + }) + } + wg.Wait() + // The check above really only checked that the routing rules are updated on this node. Sleep for a bit to + // ensure that the routing rules are updated on other nodes. TODO(irbekrm): this may or may not be good enough. + // If it's not good enough, we'd probably want to do something more complex, where the proxies check each other. + log.Printf("Sleeping for %s before shutdown to ensure that kube proxies on all nodes have updated routing configuration", ep.longSleep) + time.Sleep(ep.longSleep) +} + +// lookupPodRoute calls the healthcheck endpoint repeat times and returns true if the endpoint returns with the podIP +// header at least once. +func lookupPodRoute(ctx context.Context, hep, podIP string, repeat int, client httpClient) (bool, error) { + for range repeat { + f, err := lookup(ctx, hep, podIP, client) + if err != nil { + return false, err + } + if f { + return true, nil + } + } + return false, nil +} + +// lookup calls the healthcheck endpoint and returns true if the response contains the podIP header. +func lookup(ctx context.Context, hep, podIP string, client httpClient) (bool, error) { + req, err := http.NewRequestWithContext(ctx, httpm.GET, hep, nil) + if err != nil { + return false, fmt.Errorf("error creating new HTTP request: %v", err) + } + + // Close the TCP connection to ensure that the next request is routed to a different backend. + req.Close = true + + resp, err := client.Do(req) + if err != nil { + log.Printf("Endpoint %q can not be reached: %v, likely because there are no (more) healthy backends", hep, err) + return true, nil + } + defer resp.Body.Close() + gotIP := resp.Header.Get(kubetypes.PodIPv4Header) + return strings.EqualFold(podIP, gotIP), nil +} + +// getHEPPings gets the number of pings that should be sent to a health check endpoint to ensure that each configured +// backend is hit. This assumes that a health check endpoint is a Kubernetes Service and traffic to backend Pods is +// round robin load balanced. 
+func (ep *egressProxy) getHEPPings() (int, error) { + hepPingsPath := filepath.Join(ep.cfgPath, egressservices.KeyHEPPings) + j, err := os.ReadFile(hepPingsPath) + if os.IsNotExist(err) { + return 0, nil + } + if err != nil { + return -1, err + } + if len(j) == 0 || string(j) == "" { + return 0, nil + } + hp, err := strconv.Atoi(string(j)) + if err != nil { + return -1, fmt.Errorf("error parsing hep pings as int: %v", err) + } + if hp < 0 { + log.Printf("[unexpected] hep pings is negative: %d", hp) + return 0, nil + } + return hp, nil +} diff --git a/cmd/containerboot/services_test.go b/cmd/containerboot/services_test.go index 46f6db1cf..724626b07 100644 --- a/cmd/containerboot/services_test.go +++ b/cmd/containerboot/services_test.go @@ -6,11 +6,18 @@ package main import ( + "context" + "fmt" + "io" + "net/http" "net/netip" "reflect" + "strings" + "sync" "testing" "tailscale.com/kube/egressservices" + "tailscale.com/kube/kubetypes" ) func Test_updatesForSvc(t *testing.T) { @@ -173,3 +180,145 @@ func Test_updatesForSvc(t *testing.T) { }) } } + +// A failure of this test will most likely look like a timeout. +func TestWaitTillSafeToShutdown(t *testing.T) { + podIP := "10.0.0.1" + anotherIP := "10.0.0.2" + + tests := []struct { + name string + // services is a map of service name to the number of calls to make to the healthcheck endpoint before + // returning a response that does NOT contain this Pod's IP in headers. + services map[string]int + replicas int + healthCheckSet bool + }{ + { + name: "no_configs", + }, + { + name: "one_service_immediately_safe_to_shutdown", + services: map[string]int{ + "svc1": 0, + }, + replicas: 2, + healthCheckSet: true, + }, + { + name: "multiple_services_immediately_safe_to_shutdown", + services: map[string]int{ + "svc1": 0, + "svc2": 0, + "svc3": 0, + }, + replicas: 2, + healthCheckSet: true, + }, + { + name: "multiple_services_no_healthcheck_endpoints", + services: map[string]int{ + "svc1": 0, + "svc2": 0, + "svc3": 0, + }, + replicas: 2, + }, + { + name: "one_service_eventually_safe_to_shutdown", + services: map[string]int{ + "svc1": 3, // After 3 calls to health check endpoint, no longer returns this Pod's IP + }, + replicas: 2, + healthCheckSet: true, + }, + { + name: "multiple_services_eventually_safe_to_shutdown", + services: map[string]int{ + "svc1": 1, // After 1 call to health check endpoint, no longer returns this Pod's IP + "svc2": 3, // After 3 calls to health check endpoint, no longer returns this Pod's IP + "svc3": 5, // After 5 calls to the health check endpoint, no longer returns this Pod's IP + }, + replicas: 2, + healthCheckSet: true, + }, + { + name: "multiple_services_eventually_safe_to_shutdown_with_higher_replica_count", + services: map[string]int{ + "svc1": 7, + "svc2": 10, + }, + replicas: 5, + healthCheckSet: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfgs := &egressservices.Configs{} + switches := make(map[string]int) + + for svc, callsToSwitch := range tt.services { + endpoint := fmt.Sprintf("http://%s.local", svc) + if tt.healthCheckSet { + (*cfgs)[svc] = egressservices.Config{ + HealthCheckEndpoint: endpoint, + } + } + switches[endpoint] = callsToSwitch + } + + ep := &egressProxy{ + podIPv4: podIP, + client: &mockHTTPClient{ + podIP: podIP, + anotherIP: anotherIP, + switches: switches, + }, + } + + ep.waitTillSafeToShutdown(context.Background(), cfgs, tt.replicas) + }) + } +} + +// mockHTTPClient is a client that receives an HTTP call for an egress service endpoint and returns a 
response with an +// IP address in a 'Pod-IPv4' header. It can be configured to return one IP address for N calls, then switch to another +// IP address to simulate a scenario where an IP is eventually no longer a backend for an endpoint. +// TODO(irbekrm): to test this more thoroughly, we should have the client take into account the number of replicas and +// return as if traffic was round robin load balanced across different Pods. +type mockHTTPClient struct { + // podIP - initial IP address to return, that matches the current proxy's IP address. + podIP string + anotherIP string + // after how many calls to an endpoint, the client should start returning 'anotherIP' instead of 'podIP. + switches map[string]int + mu sync.Mutex // protects the following + // calls tracks the number of calls received. + calls map[string]int +} + +func (m *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { + m.mu.Lock() + if m.calls == nil { + m.calls = make(map[string]int) + } + + endpoint := req.URL.String() + m.calls[endpoint]++ + calls := m.calls[endpoint] + m.mu.Unlock() + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader("")), + } + + if calls <= m.switches[endpoint] { + resp.Header.Set(kubetypes.PodIPv4Header, m.podIP) // Pod is still routable + } else { + resp.Header.Set(kubetypes.PodIPv4Header, m.anotherIP) // Pod is no longer routable + } + return resp, nil +} diff --git a/cmd/containerboot/settings.go b/cmd/containerboot/settings.go index 5fc6cc3f0..0da18e52c 100644 --- a/cmd/containerboot/settings.go +++ b/cmd/containerboot/settings.go @@ -64,16 +64,16 @@ type settings struct { // when setting up rules to proxy cluster traffic to cluster ingress // target. // Deprecated: use PodIPv4, PodIPv6 instead to support dual stack clusters - PodIP string - PodIPv4 string - PodIPv6 string - PodUID string - HealthCheckAddrPort string - LocalAddrPort string - MetricsEnabled bool - HealthCheckEnabled bool - DebugAddrPort string - EgressSvcsCfgPath string + PodIP string + PodIPv4 string + PodIPv6 string + PodUID string + HealthCheckAddrPort string + LocalAddrPort string + MetricsEnabled bool + HealthCheckEnabled bool + DebugAddrPort string + EgressProxiesCfgPath string } func configFromEnv() (*settings, error) { @@ -107,7 +107,7 @@ func configFromEnv() (*settings, error) { MetricsEnabled: defaultBool("TS_ENABLE_METRICS", false), HealthCheckEnabled: defaultBool("TS_ENABLE_HEALTH_CHECK", false), DebugAddrPort: defaultEnv("TS_DEBUG_ADDR_PORT", ""), - EgressSvcsCfgPath: defaultEnv("TS_EGRESS_SERVICES_CONFIG_PATH", ""), + EgressProxiesCfgPath: defaultEnv("TS_EGRESS_PROXIES_CONFIG_PATH", ""), PodUID: defaultEnv("POD_UID", ""), } podIPs, ok := os.LookupEnv("POD_IPS") @@ -186,7 +186,7 @@ func (s *settings) validate() error { return fmt.Errorf("error parsing TS_HEALTHCHECK_ADDR_PORT value %q: %w", s.HealthCheckAddrPort, err) } } - if s.localMetricsEnabled() || s.localHealthEnabled() { + if s.localMetricsEnabled() || s.localHealthEnabled() || s.EgressProxiesCfgPath != "" { if _, err := netip.ParseAddrPort(s.LocalAddrPort); err != nil { return fmt.Errorf("error parsing TS_LOCAL_ADDR_PORT value %q: %w", s.LocalAddrPort, err) } @@ -199,8 +199,8 @@ func (s *settings) validate() error { if s.HealthCheckEnabled && s.HealthCheckAddrPort != "" { return errors.New("TS_HEALTHCHECK_ADDR_PORT is deprecated and will be removed in 1.82.0, use TS_ENABLE_HEALTH_CHECK and optionally TS_LOCAL_ADDR_PORT") } - if s.EgressSvcsCfgPath != "" && 
!(s.InKubernetes && s.KubeSecret != "") { - return errors.New("TS_EGRESS_SERVICES_CONFIG_PATH is only supported for Tailscale running on Kubernetes") + if s.EgressProxiesCfgPath != "" && !(s.InKubernetes && s.KubeSecret != "") { + return errors.New("TS_EGRESS_PROXIES_CONFIG_PATH is only supported for Tailscale running on Kubernetes") } return nil } @@ -291,7 +291,7 @@ func isOneStepConfig(cfg *settings) bool { // as an L3 proxy, proxying to an endpoint provided via one of the config env // vars. func isL3Proxy(cfg *settings) bool { - return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressSvcsCfgPath != "" + return cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" || cfg.AllowProxyingClusterTrafficViaIngress || cfg.EgressProxiesCfgPath != "" } // hasKubeStateStore returns true if the state must be stored in a Kubernetes @@ -308,6 +308,10 @@ func (cfg *settings) localHealthEnabled() bool { return cfg.LocalAddrPort != "" && cfg.HealthCheckEnabled } +func (cfg *settings) egressSvcsTerminateEPEnabled() bool { + return cfg.LocalAddrPort != "" && cfg.EgressProxiesCfgPath != "" +} + // defaultEnv returns the value of the given envvar name, or defVal if // unset. func defaultEnv(name, defVal string) string { diff --git a/cmd/k8s-operator/egress-eps.go b/cmd/k8s-operator/egress-eps.go index 85992abed..3441e12ba 100644 --- a/cmd/k8s-operator/egress-eps.go +++ b/cmd/k8s-operator/egress-eps.go @@ -20,7 +20,6 @@ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" - tsoperator "tailscale.com/k8s-operator" "tailscale.com/kube/egressservices" "tailscale.com/types/ptr" ) @@ -71,25 +70,27 @@ func (er *egressEpsReconciler) Reconcile(ctx context.Context, req reconcile.Requ if err != nil { return res, fmt.Errorf("error retrieving ExternalName Service: %w", err) } - if !tsoperator.EgressServiceIsValidAndConfigured(svc) { - l.Infof("Cluster resources for ExternalName Service %s/%s are not yet configured", svc.Namespace, svc.Name) - return res, nil - } // TODO(irbekrm): currently this reconcile loop runs all the checks every time it's triggered, which is // wasteful. Once we have a Ready condition for ExternalName Services for ProxyGroup, use the condition to // determine if a reconcile is needed. oldEps := eps.DeepCopy() - proxyGroupName := eps.Labels[labelProxyGroup] tailnetSvc := tailnetSvcName(svc) l = l.With("tailnet-service-name", tailnetSvc) // Retrieve the desired tailnet service configuration from the ConfigMap. + proxyGroupName := eps.Labels[labelProxyGroup] _, cfgs, err := egressSvcsConfigs(ctx, er.Client, proxyGroupName, er.tsNamespace) if err != nil { return res, fmt.Errorf("error retrieving tailnet services configuration: %w", err) } + if cfgs == nil { + // TODO(irbekrm): this path would be hit if egress service was once exposed on a ProxyGroup that later + // got deleted. Probably the EndpointSlices then need to be deleted too- need to rethink this flow. 
+ l.Debugf("No egress config found, likely because ProxyGroup has not been created") + return res, nil + } cfg, ok := (*cfgs)[tailnetSvc] if !ok { l.Infof("[unexpected] configuration for tailnet service %s not found", tailnetSvc) diff --git a/cmd/k8s-operator/egress-services.go b/cmd/k8s-operator/egress-services.go index 55003ee91..cf218ba4f 100644 --- a/cmd/k8s-operator/egress-services.go +++ b/cmd/k8s-operator/egress-services.go @@ -59,6 +59,8 @@ maxPorts = 1000 indexEgressProxyGroup = ".metadata.annotations.egress-proxy-group" + + tsHealthCheckPortName = "tailscale-health-check" ) var gaugeEgressServices = clientmetric.NewGauge(kubetypes.MetricEgressServiceCount) @@ -229,15 +231,16 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s found := false for _, wantsPM := range svc.Spec.Ports { if wantsPM.Port == pm.Port && strings.EqualFold(string(wantsPM.Protocol), string(pm.Protocol)) { - // We don't use the port name to distinguish this port internally, but Kubernetes - // require that, for Service ports with more than one name each port is uniquely named. - // So we can always pick the port name from the ExternalName Service as at this point we - // know that those are valid names because Kuberentes already validated it once. Note - // that users could have changed an unnamed port to a named port and might have changed - // port names- this should still work. + // We want to both preserve the user set port names for ease of debugging, but also + // ensure that we name all unnamed ports as the ClusterIP Service that we create will + // always have at least two ports. // https://kubernetes.io/docs/concepts/services-networking/service/#multi-port-services // See also https://github.com/tailscale/tailscale/issues/13406#issuecomment-2507230388 - clusterIPSvc.Spec.Ports[i].Name = wantsPM.Name + if wantsPM.Name != "" { + clusterIPSvc.Spec.Ports[i].Name = wantsPM.Name + } else { + clusterIPSvc.Spec.Ports[i].Name = "tailscale-unnamed" + } found = true break } @@ -252,6 +255,12 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s // ClusterIP Service produce new target port and add a portmapping to // the ClusterIP Service. for _, wantsPM := range svc.Spec.Ports { + // Because we add a healthcheck port of our own, we will always have at least two ports. That + // means that we cannot have ports with name not set. 
+ // https://kubernetes.io/docs/concepts/services-networking/service/#multi-port-services + if wantsPM.Name == "" { + wantsPM.Name = "tailscale-unnamed" + } found := false for _, gotPM := range clusterIPSvc.Spec.Ports { if wantsPM.Port == gotPM.Port && strings.EqualFold(string(wantsPM.Protocol), string(gotPM.Protocol)) { @@ -278,6 +287,25 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s }) } } + var healthCheckPort int32 = defaultLocalAddrPort + + for { + if !slices.ContainsFunc(svc.Spec.Ports, func(p corev1.ServicePort) bool { + return p.Port == healthCheckPort + }) { + break + } + healthCheckPort++ + if healthCheckPort > 10002 { + return nil, false, fmt.Errorf("unable to find a free port for internal health check in range [9002, 10002]") + } + } + clusterIPSvc.Spec.Ports = append(clusterIPSvc.Spec.Ports, corev1.ServicePort{ + Name: tsHealthCheckPortName, + Port: healthCheckPort, + TargetPort: intstr.FromInt(defaultLocalAddrPort), + Protocol: "TCP", + }) if !reflect.DeepEqual(clusterIPSvc, oldClusterIPSvc) { if clusterIPSvc, err = createOrUpdate(ctx, esr.Client, esr.tsNamespace, clusterIPSvc, func(svc *corev1.Service) { svc.Labels = clusterIPSvc.Labels @@ -320,7 +348,7 @@ func (esr *egressSvcsReconciler) provision(ctx context.Context, proxyGroupName s } tailnetSvc := tailnetSvcName(svc) gotCfg := (*cfgs)[tailnetSvc] - wantsCfg := egressSvcCfg(svc, clusterIPSvc) + wantsCfg := egressSvcCfg(svc, clusterIPSvc, esr.tsNamespace, l) if !reflect.DeepEqual(gotCfg, wantsCfg) { l.Debugf("updating egress services ConfigMap %s", cm.Name) mak.Set(cfgs, tailnetSvc, wantsCfg) @@ -504,10 +532,8 @@ func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, s return false, nil } if !tsoperator.ProxyGroupIsReady(pg) { - l.Infof("ProxyGroup %s is not ready, waiting...", proxyGroupName) tsoperator.SetServiceCondition(svc, tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, esr.clock, l) tsoperator.RemoveServiceCondition(svc, tsapi.EgressSvcConfigured) - return false, nil } l.Debugf("egress service is valid") @@ -515,6 +541,24 @@ func (esr *egressSvcsReconciler) validateClusterResources(ctx context.Context, s return true, nil } +func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service, ns string, l *zap.SugaredLogger) egressservices.Config { + d := retrieveClusterDomain(ns, l) + tt := tailnetTargetFromSvc(externalNameSvc) + hep := healthCheckForSvc(clusterIPSvc, d) + cfg := egressservices.Config{ + TailnetTarget: tt, + HealthCheckEndpoint: hep, + } + for _, svcPort := range clusterIPSvc.Spec.Ports { + if svcPort.Name == tsHealthCheckPortName { + continue // exclude healthcheck from egress svcs configs + } + pm := portMap(svcPort) + mak.Set(&cfg.Ports, pm, struct{}{}) + } + return cfg +} + func validateEgressService(svc *corev1.Service, pg *tsapi.ProxyGroup) []string { violations := validateService(svc) @@ -584,16 +628,6 @@ func tailnetTargetFromSvc(svc *corev1.Service) egressservices.TailnetTarget { } } -func egressSvcCfg(externalNameSvc, clusterIPSvc *corev1.Service) egressservices.Config { - tt := tailnetTargetFromSvc(externalNameSvc) - cfg := egressservices.Config{TailnetTarget: tt} - for _, svcPort := range clusterIPSvc.Spec.Ports { - pm := portMap(svcPort) - mak.Set(&cfg.Ports, pm, struct{}{}) - } - return cfg -} - func portMap(p corev1.ServicePort) egressservices.PortMap { // TODO (irbekrm): out of bounds check? 
return egressservices.PortMap{Protocol: string(p.Protocol), MatchPort: uint16(p.TargetPort.IntVal), TargetPort: uint16(p.Port)} @@ -618,7 +652,11 @@ func egressSvcsConfigs(ctx context.Context, cl client.Client, proxyGroupName, ts Namespace: tsNamespace, }, } - if err := cl.Get(ctx, client.ObjectKeyFromObject(cm), cm); err != nil { + err = cl.Get(ctx, client.ObjectKeyFromObject(cm), cm) + if apierrors.IsNotFound(err) { // ProxyGroup resources have not been created (yet) + return nil, nil, nil + } + if err != nil { return nil, nil, fmt.Errorf("error retrieving egress services ConfigMap %s: %v", name, err) } cfgs = &egressservices.Configs{} @@ -740,3 +778,17 @@ func (esr *egressSvcsReconciler) updateSvcSpec(ctx context.Context, svc *corev1. svc.Status = *st return err } + +// healthCheckForSvc return the URL of the containerboot's health check endpoint served by this Service or empty string. +func healthCheckForSvc(svc *corev1.Service, clusterDomain string) string { + // This version of the operator always sets health check port on the egress Services. However, it is possible + // that this reconcile loops runs during a proxy upgrade from a version that did not set the health check port + // and parses a Service that does not have the port set yet. + i := slices.IndexFunc(svc.Spec.Ports, func(port corev1.ServicePort) bool { + return port.Name == tsHealthCheckPortName + }) + if i == -1 { + return "" + } + return fmt.Sprintf("http://%s.%s.svc.%s:%d/healthz", svc.Name, svc.Namespace, clusterDomain, svc.Spec.Ports[i].Port) +} diff --git a/cmd/k8s-operator/egress-services_test.go b/cmd/k8s-operator/egress-services_test.go index ab0008ca0..d8a5dfd32 100644 --- a/cmd/k8s-operator/egress-services_test.go +++ b/cmd/k8s-operator/egress-services_test.go @@ -18,6 +18,7 @@ discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" @@ -78,42 +79,16 @@ func TestTailscaleEgressServices(t *testing.T) { Selector: nil, Ports: []corev1.ServicePort{ { - Name: "http", Protocol: "TCP", Port: 80, }, - { - Name: "https", - Protocol: "TCP", - Port: 443, - }, }, }, } - t.Run("proxy_group_not_ready", func(t *testing.T) { + t.Run("service_one_unnamed_port", func(t *testing.T) { mustCreate(t, fc, svc) expectReconciled(t, esr, "default", "test") - // Service should have EgressSvcValid condition set to Unknown. 
- svc.Status.Conditions = []metav1.Condition{condition(tsapi.EgressSvcValid, metav1.ConditionUnknown, reasonProxyGroupNotReady, reasonProxyGroupNotReady, clock)} - expectEqual(t, fc, svc) - }) - - t.Run("proxy_group_ready", func(t *testing.T) { - mustUpdateStatus(t, fc, "", "foo", func(pg *tsapi.ProxyGroup) { - pg.Status.Conditions = []metav1.Condition{ - condition(tsapi.ProxyGroupReady, metav1.ConditionTrue, "", "", clock), - } - }) - expectReconciled(t, esr, "default", "test") - validateReadyService(t, fc, esr, svc, clock, zl, cm) - }) - t.Run("service_retain_one_unnamed_port", func(t *testing.T) { - svc.Spec.Ports = []corev1.ServicePort{{Protocol: "TCP", Port: 80}} - mustUpdate(t, fc, "default", "test", func(s *corev1.Service) { - s.Spec.Ports = svc.Spec.Ports - }) - expectReconciled(t, esr, "default", "test") validateReadyService(t, fc, esr, svc, clock, zl, cm) }) t.Run("service_add_two_named_ports", func(t *testing.T) { @@ -164,7 +139,7 @@ func validateReadyService(t *testing.T, fc client.WithWatch, esr *egressSvcsReco // Verify that an EndpointSlice has been created. expectEqual(t, fc, endpointSlice(name, svc, clusterSvc)) // Verify that ConfigMap contains configuration for the new egress service. - mustHaveConfigForSvc(t, fc, svc, clusterSvc, cm) + mustHaveConfigForSvc(t, fc, svc, clusterSvc, cm, zl) r := svcConfiguredReason(svc, true, zl.Sugar()) // Verify that the user-created ExternalName Service has Configured set to true and ExternalName pointing to the // CluterIP Service. @@ -203,6 +178,23 @@ func findGenNameForEgressSvcResources(t *testing.T, client client.Client, svc *c func clusterIPSvc(name string, extNSvc *corev1.Service) *corev1.Service { labels := egressSvcChildResourceLabels(extNSvc) + ports := make([]corev1.ServicePort, len(extNSvc.Spec.Ports)) + for i, port := range extNSvc.Spec.Ports { + ports[i] = corev1.ServicePort{ // Copy the port to avoid modifying the original. 
+ Name: port.Name, + Port: port.Port, + Protocol: port.Protocol, + } + if port.Name == "" { + ports[i].Name = "tailscale-unnamed" + } + } + ports = append(ports, corev1.ServicePort{ + Name: "tailscale-health-check", + Port: 9002, + TargetPort: intstr.FromInt(9002), + Protocol: "TCP", + }) return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -212,7 +204,7 @@ func clusterIPSvc(name string, extNSvc *corev1.Service) *corev1.Service { }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Ports: extNSvc.Spec.Ports, + Ports: ports, }, } } @@ -257,9 +249,9 @@ func portsForEndpointSlice(svc *corev1.Service) []discoveryv1.EndpointPort { return ports } -func mustHaveConfigForSvc(t *testing.T, cl client.Client, extNSvc, clusterIPSvc *corev1.Service, cm *corev1.ConfigMap) { +func mustHaveConfigForSvc(t *testing.T, cl client.Client, extNSvc, clusterIPSvc *corev1.Service, cm *corev1.ConfigMap, l *zap.Logger) { t.Helper() - wantsCfg := egressSvcCfg(extNSvc, clusterIPSvc) + wantsCfg := egressSvcCfg(extNSvc, clusterIPSvc, clusterIPSvc.Namespace, l.Sugar()) if err := cl.Get(context.Background(), client.ObjectKeyFromObject(cm), cm); err != nil { t.Fatalf("Error retrieving ConfigMap: %v", err) } diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index f349e7848..6631c4f98 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -777,7 +777,7 @@ func proxyClassHandlerForConnector(cl client.Client, logger *zap.SugaredLogger) } } -// proxyClassHandlerForConnector returns a handler that, for a given ProxyClass, +// proxyClassHandlerForProxyGroup returns a handler that, for a given ProxyClass, // returns a list of reconcile requests for all Connectors that have // .spec.proxyClass set. func proxyClassHandlerForProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc { @@ -998,7 +998,7 @@ func reconcileRequestsForPG(pg string, cl client.Client, ns string) []reconcile. // egressSvcsFromEgressProxyGroup is an event handler for egress ProxyGroups. It returns reconcile requests for all // user-created ExternalName Services that should be exposed on this ProxyGroup. func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc { - return func(_ context.Context, o client.Object) []reconcile.Request { + return func(ctx context.Context, o client.Object) []reconcile.Request { pg, ok := o.(*tsapi.ProxyGroup) if !ok { logger.Infof("[unexpected] ProxyGroup handler triggered for an object that is not a ProxyGroup") @@ -1008,7 +1008,7 @@ func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) return nil } svcList := &corev1.ServiceList{} - if err := cl.List(context.Background(), svcList, client.MatchingFields{indexEgressProxyGroup: pg.Name}); err != nil { + if err := cl.List(ctx, svcList, client.MatchingFields{indexEgressProxyGroup: pg.Name}); err != nil { logger.Infof("error listing Services: %v, skipping a reconcile for event on ProxyGroup %s", err, pg.Name) return nil } @@ -1028,7 +1028,7 @@ func egressSvcsFromEgressProxyGroup(cl client.Client, logger *zap.SugaredLogger) // epsFromExternalNameService is an event handler for ExternalName Services that define a Tailscale egress service that // should be exposed on a ProxyGroup. It returns reconcile requests for EndpointSlices created for this Service. 
func epsFromExternalNameService(cl client.Client, logger *zap.SugaredLogger, ns string) handler.MapFunc { - return func(_ context.Context, o client.Object) []reconcile.Request { + return func(ctx context.Context, o client.Object) []reconcile.Request { svc, ok := o.(*corev1.Service) if !ok { logger.Infof("[unexpected] Service handler triggered for an object that is not a Service") @@ -1038,7 +1038,7 @@ func epsFromExternalNameService(cl client.Client, logger *zap.SugaredLogger, ns return nil } epsList := &discoveryv1.EndpointSliceList{} - if err := cl.List(context.Background(), epsList, client.InNamespace(ns), + if err := cl.List(ctx, epsList, client.InNamespace(ns), client.MatchingLabels(egressSvcChildResourceLabels(svc))); err != nil { logger.Infof("error listing EndpointSlices: %v, skipping a reconcile for event on Service %s", err, svc.Name) return nil diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go index f6de31727..4b17d3470 100644 --- a/cmd/k8s-operator/proxygroup.go +++ b/cmd/k8s-operator/proxygroup.go @@ -32,6 +32,7 @@ "tailscale.com/ipn" tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" + "tailscale.com/kube/egressservices" "tailscale.com/kube/kubetypes" "tailscale.com/tailcfg" "tailscale.com/tstime" @@ -166,6 +167,7 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ r.recorder.Eventf(pg, corev1.EventTypeWarning, reasonProxyGroupCreationFailed, err.Error()) return setStatusReady(pg, metav1.ConditionFalse, reasonProxyGroupCreationFailed, err.Error()) } + validateProxyClassForPG(logger, pg, proxyClass) if !tsoperator.ProxyClassIsReady(proxyClass) { message := fmt.Sprintf("the ProxyGroup's ProxyClass %s is not yet in a ready state, waiting...", proxyClassName) logger.Info(message) @@ -204,6 +206,31 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ return setStatusReady(pg, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady) } +// validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup. +func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) { + if pg.Spec.Type == tsapi.ProxyGroupTypeIngress { + return + } + // Our custom logic for ensuring minimum downtime ProxyGroup update rollouts relies on the local health check + // being accessible on the replica Pod IP:9002. This address can also be modified by users via the + // TS_LOCAL_ADDR_PORT env var. + // + // Currently TS_LOCAL_ADDR_PORT controls Pod's health check and metrics address. _Probably_ there is no need for + // users to set this to a custom value. Users who want to consume metrics should integrate with the metrics + // Service and/or ServiceMonitor, rather than Pods directly. The health check is likely not useful to integrate + // directly with for operator proxies (and we should aim for unified lifecycle logic in the operator, users + // shouldn't need to set their own). + // + // TODO(irbekrm): maybe disallow configuring this env var in future (in Tailscale 1.84 or later).
+ if hasLocalAddrPortSet(pc) { + msg := fmt.Sprintf("ProxyClass %s applied to an egress ProxyGroup has TS_LOCAL_ADDR_PORT env var set to a custom value."+ " This will disable the ProxyGroup graceful failover mechanism, so you might experience downtime when ProxyGroup pods are restarted."+ " In future we will remove the ability to set custom TS_LOCAL_ADDR_PORT for egress ProxyGroups."+ " Please raise an issue if you expect that this will cause issues for your workflow.", pc.Name) + logger.Warn(msg) + } +} + func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) error { logger := r.logger(pg.Name) r.mu.Lock() @@ -253,10 +280,11 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro return fmt.Errorf("error provisioning RoleBinding: %w", err) } if pg.Spec.Type == tsapi.ProxyGroupTypeEgress { - cm := pgEgressCM(pg, r.tsNamespace) + cm, hp := pgEgressCM(pg, r.tsNamespace) if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, cm, func(existing *corev1.ConfigMap) { existing.ObjectMeta.Labels = cm.ObjectMeta.Labels existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences + mak.Set(&existing.BinaryData, egressservices.KeyHEPPings, hp) }); err != nil { return fmt.Errorf("error provisioning egress ConfigMap %q: %w", cm.Name, err) } @@ -270,7 +298,7 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro return fmt.Errorf("error provisioning ingress ConfigMap %q: %w", cm.Name, err) } } - ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode) + ss, err := pgStatefulSet(pg, r.tsNamespace, r.proxyImage, r.tsFirewallMode, proxyClass) if err != nil { return fmt.Errorf("error generating StatefulSet spec: %w", err) } diff --git a/cmd/k8s-operator/proxygroup_specs.go b/cmd/k8s-operator/proxygroup_specs.go index 556a2ed76..1ea91004b 100644 --- a/cmd/k8s-operator/proxygroup_specs.go +++ b/cmd/k8s-operator/proxygroup_specs.go @@ -7,11 +7,14 @@ import ( "fmt" + "slices" + "strconv" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/yaml" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/kube/egressservices" @@ -19,9 +22,12 @@ "tailscale.com/types/ptr" ) +// deletionGracePeriodSeconds is set to 6 minutes to ensure that the pre-stop hook of these proxies has enough time to terminate gracefully. +const deletionGracePeriodSeconds int64 = 360 + // Returns the base StatefulSet definition for a ProxyGroup. A ProxyClass may be // applied over the top after. -func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string) (*appsv1.StatefulSet, error) { +func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string, proxyClass *tsapi.ProxyClass) (*appsv1.StatefulSet, error) { ss := new(appsv1.StatefulSet) if err := yaml.Unmarshal(proxyYaml, &ss); err != nil { return nil, fmt.Errorf("failed to unmarshal proxy spec: %w", err) @@ -145,15 +151,25 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string } if pg.Spec.Type == tsapi.ProxyGroupTypeEgress { - envs = append(envs, corev1.EnvVar{ - Name: "TS_EGRESS_SERVICES_CONFIG_PATH", - Value: fmt.Sprintf("/etc/proxies/%s", egressservices.KeyEgressServices), - }, + envs = append(envs, + // TODO(irbekrm): in 1.80 we deprecated TS_EGRESS_SERVICES_CONFIG_PATH in favour of + // TS_EGRESS_PROXIES_CONFIG_PATH.
Remove it in 1.84. + corev1.EnvVar{ + Name: "TS_EGRESS_SERVICES_CONFIG_PATH", + Value: fmt.Sprintf("/etc/proxies/%s", egressservices.KeyEgressServices), + }, + corev1.EnvVar{ + Name: "TS_EGRESS_PROXIES_CONFIG_PATH", + Value: "/etc/proxies", + }, corev1.EnvVar{ Name: "TS_INTERNAL_APP", Value: kubetypes.AppProxyGroupEgress, }, - ) + corev1.EnvVar{ + Name: "TS_ENABLE_HEALTH_CHECK", + Value: "true", + }) } else { // ingress envs = append(envs, corev1.EnvVar{ Name: "TS_INTERNAL_APP", @@ -167,6 +183,25 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string return append(c.Env, envs...) }() + // The pre-stop hook is used to ensure that a replica does not get terminated while cluster traffic for egress + // services is still being routed to it. + // + // This mechanism currently (2025-01-26) rely on the local health check being accessible on the Pod's + // IP, so they are not supported for ProxyGroups where users have configured TS_LOCAL_ADDR_PORT to a custom + // value. + if pg.Spec.Type == tsapi.ProxyGroupTypeEgress && !hasLocalAddrPortSet(proxyClass) { + c.Lifecycle = &corev1.Lifecycle{ + PreStop: &corev1.LifecycleHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: kubetypes.EgessServicesPreshutdownEP, + Port: intstr.FromInt(defaultLocalAddrPort), + }, + }, + } + // Set the deletion grace period to 6 minutes to ensure that the pre-stop hook has enough time to terminate + // gracefully. + ss.Spec.Template.DeletionGracePeriodSeconds = ptr.To(deletionGracePeriodSeconds) + } return ss, nil } @@ -258,7 +293,9 @@ func pgStateSecrets(pg *tsapi.ProxyGroup, namespace string) (secrets []*corev1.S return secrets } -func pgEgressCM(pg *tsapi.ProxyGroup, namespace string) *corev1.ConfigMap { +func pgEgressCM(pg *tsapi.ProxyGroup, namespace string) (*corev1.ConfigMap, []byte) { + hp := hepPings(pg) + hpBs := []byte(strconv.Itoa(hp)) return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: pgEgressCMName(pg.Name), @@ -266,8 +303,10 @@ func pgEgressCM(pg *tsapi.ProxyGroup, namespace string) *corev1.ConfigMap { Labels: pgLabels(pg.Name, nil), OwnerReferences: pgOwnerReference(pg), }, - } + BinaryData: map[string][]byte{egressservices.KeyHEPPings: hpBs}, + }, hpBs } + func pgIngressCM(pg *tsapi.ProxyGroup, namespace string) *corev1.ConfigMap { return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -313,3 +352,23 @@ func pgReplicas(pg *tsapi.ProxyGroup) int32 { func pgEgressCMName(pg string) string { return fmt.Sprintf("%s-egress-config", pg) } + +// hasLocalAddrPortSet returns true if the proxyclass has the TS_LOCAL_ADDR_PORT env var set. For egress ProxyGroups, +// currently (2025-01-26) this means that the ProxyGroup does not support graceful failover. +func hasLocalAddrPortSet(proxyClass *tsapi.ProxyClass) bool { + if proxyClass == nil || proxyClass.Spec.StatefulSet == nil || proxyClass.Spec.StatefulSet.Pod == nil || proxyClass.Spec.StatefulSet.Pod.TailscaleContainer == nil { + return false + } + return slices.ContainsFunc(proxyClass.Spec.StatefulSet.Pod.TailscaleContainer.Env, func(env tsapi.Env) bool { + return env.Name == envVarTSLocalAddrPort + }) +} + +// hepPings returns the number of times a health check endpoint exposed by a Service fronting ProxyGroup replicas should +// be pinged to ensure that all currently configured backend replicas are hit. 
+func hepPings(pg *tsapi.ProxyGroup) int { + rc := pgReplicas(pg) + // Assuming a Service implemented using round robin load balancing, number-of-replica-times should be enough, but in + // practice, we cannot assume that the requests will be load balanced perfectly. + return int(rc) * 3 +} diff --git a/cmd/k8s-operator/proxygroup_test.go b/cmd/k8s-operator/proxygroup_test.go index e7c85d387..29100de1d 100644 --- a/cmd/k8s-operator/proxygroup_test.go +++ b/cmd/k8s-operator/proxygroup_test.go @@ -19,13 +19,13 @@ rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "tailscale.com/client/tailscale" tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" - "tailscale.com/kube/egressservices" "tailscale.com/kube/kubetypes" "tailscale.com/tstest" "tailscale.com/types/ptr" @@ -97,7 +97,7 @@ func TestProxyGroup(t *testing.T) { tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "the ProxyGroup's ProxyClass default-pc is not yet in a ready state, waiting...", 0, cl, zl.Sugar()) expectEqual(t, fc, pg) - expectProxyGroupResources(t, fc, pg, false, "") + expectProxyGroupResources(t, fc, pg, false, "", pc) }) t.Run("observe_ProxyGroupCreating_status_reason", func(t *testing.T) { @@ -118,11 +118,11 @@ func TestProxyGroup(t *testing.T) { tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "0/2 ProxyGroup pods running", 0, cl, zl.Sugar()) expectEqual(t, fc, pg) - expectProxyGroupResources(t, fc, pg, true, "") + expectProxyGroupResources(t, fc, pg, true, "", pc) if expected := 1; reconciler.egressProxyGroups.Len() != expected { t.Fatalf("expected %d egress ProxyGroups, got %d", expected, reconciler.egressProxyGroups.Len()) } - expectProxyGroupResources(t, fc, pg, true, "") + expectProxyGroupResources(t, fc, pg, true, "", pc) keyReq := tailscale.KeyCapabilities{ Devices: tailscale.KeyDeviceCapabilities{ Create: tailscale.KeyDeviceCreateCapabilities{ @@ -154,7 +154,7 @@ func TestProxyGroup(t *testing.T) { } tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady, 0, cl, zl.Sugar()) expectEqual(t, fc, pg) - expectProxyGroupResources(t, fc, pg, true, initialCfgHash) + expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc) }) t.Run("scale_up_to_3", func(t *testing.T) { @@ -165,7 +165,7 @@ func TestProxyGroup(t *testing.T) { expectReconciled(t, reconciler, "", pg.Name) tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "2/3 ProxyGroup pods running", 0, cl, zl.Sugar()) expectEqual(t, fc, pg) - expectProxyGroupResources(t, fc, pg, true, initialCfgHash) + expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc) addNodeIDToStateSecrets(t, fc, pg) expectReconciled(t, reconciler, "", pg.Name) @@ -175,7 +175,7 @@ func TestProxyGroup(t *testing.T) { TailnetIPs: []string{"1.2.3.4", "::1"}, }) expectEqual(t, fc, pg) - expectProxyGroupResources(t, fc, pg, true, initialCfgHash) + expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc) }) t.Run("scale_down_to_1", func(t *testing.T) { @@ -188,7 +188,7 @@ func TestProxyGroup(t *testing.T) { pg.Status.Devices = 
 		pg.Status.Devices = pg.Status.Devices[:1] // truncate to only the first device.
 		expectEqual(t, fc, pg)
-		expectProxyGroupResources(t, fc, pg, true, initialCfgHash)
+		expectProxyGroupResources(t, fc, pg, true, initialCfgHash, pc)
 	})
 
 	t.Run("trigger_config_change_and_observe_new_config_hash", func(t *testing.T) {
@@ -202,7 +202,7 @@ func TestProxyGroup(t *testing.T) {
 		expectReconciled(t, reconciler, "", pg.Name)
 
 		expectEqual(t, fc, pg)
-		expectProxyGroupResources(t, fc, pg, true, "518a86e9fae64f270f8e0ec2a2ea6ca06c10f725035d3d6caca132cd61e42a74")
+		expectProxyGroupResources(t, fc, pg, true, "518a86e9fae64f270f8e0ec2a2ea6ca06c10f725035d3d6caca132cd61e42a74", pc)
 	})
 
 	t.Run("enable_metrics", func(t *testing.T) {
@@ -246,12 +246,29 @@ func TestProxyGroup(t *testing.T) {
 		// The fake client does not clean up objects whose owner has been
 		// deleted, so we can't test for the owned resources getting deleted.
 	})
+
 }
 
 func TestProxyGroupTypes(t *testing.T) {
+	pc := &tsapi.ProxyClass{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:       "test",
+			Generation: 1,
+		},
+		Spec: tsapi.ProxyClassSpec{},
+	}
 	fc := fake.NewClientBuilder().
 		WithScheme(tsapi.GlobalScheme).
+		WithObjects(pc).
+		WithStatusSubresource(pc).
 		Build()
+	mustUpdateStatus(t, fc, "", pc.Name, func(p *tsapi.ProxyClass) {
+		p.Status.Conditions = []metav1.Condition{{
+			Type:               string(tsapi.ProxyClassReady),
+			Status:             metav1.ConditionTrue,
+			ObservedGeneration: 1,
+		}}
+	})
 	zl, _ := zap.NewDevelopment()
 	reconciler := &ProxyGroupReconciler{
@@ -274,9 +291,7 @@ func TestProxyGroupTypes(t *testing.T) {
 				Replicas: ptr.To[int32](0),
 			},
 		}
-		if err := fc.Create(context.Background(), pg); err != nil {
-			t.Fatal(err)
-		}
+		mustCreate(t, fc, pg)
 		expectReconciled(t, reconciler, "", pg.Name)
 		verifyProxyGroupCounts(t, reconciler, 0, 1)
 
@@ -286,7 +301,8 @@ func TestProxyGroupTypes(t *testing.T) {
 			t.Fatalf("failed to get StatefulSet: %v", err)
 		}
 		verifyEnvVar(t, sts, "TS_INTERNAL_APP", kubetypes.AppProxyGroupEgress)
-		verifyEnvVar(t, sts, "TS_EGRESS_SERVICES_CONFIG_PATH", fmt.Sprintf("/etc/proxies/%s", egressservices.KeyEgressServices))
+		verifyEnvVar(t, sts, "TS_EGRESS_PROXIES_CONFIG_PATH", "/etc/proxies")
+		verifyEnvVar(t, sts, "TS_ENABLE_HEALTH_CHECK", "true")
 
 		// Verify that egress configuration has been set up.
 		cm := &corev1.ConfigMap{}
@@ -323,6 +339,57 @@ func TestProxyGroupTypes(t *testing.T) {
 		if diff := cmp.Diff(expectedVolumeMounts, sts.Spec.Template.Spec.Containers[0].VolumeMounts); diff != "" {
 			t.Errorf("unexpected volume mounts (-want +got):\n%s", diff)
 		}
+
+		expectedLifecycle := corev1.Lifecycle{
+			PreStop: &corev1.LifecycleHandler{
+				HTTPGet: &corev1.HTTPGetAction{
+					Path: kubetypes.EgessServicesPreshutdownEP,
+					Port: intstr.FromInt(defaultLocalAddrPort),
+				},
+			},
+		}
+		if diff := cmp.Diff(expectedLifecycle, *sts.Spec.Template.Spec.Containers[0].Lifecycle); diff != "" {
+			t.Errorf("unexpected lifecycle (-want +got):\n%s", diff)
+		}
+		if *sts.Spec.Template.DeletionGracePeriodSeconds != deletionGracePeriodSeconds {
+			t.Errorf("unexpected deletion grace period seconds %d, want %d", *sts.Spec.Template.DeletionGracePeriodSeconds, deletionGracePeriodSeconds)
+		}
+	})
+	t.Run("egress_type_no_lifecycle_hook_when_local_addr_port_set", func(t *testing.T) {
+		pg := &tsapi.ProxyGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "test-egress-no-lifecycle",
+				UID:  "test-egress-no-lifecycle-uid",
+			},
+			Spec: tsapi.ProxyGroupSpec{
+				Type:       tsapi.ProxyGroupTypeEgress,
+				Replicas:   ptr.To[int32](0),
+				ProxyClass: "test",
+			},
+		}
+		mustCreate(t, fc, pg)
+		mustUpdate(t, fc, "", pc.Name, func(p *tsapi.ProxyClass) {
+			p.Spec.StatefulSet = &tsapi.StatefulSet{
+				Pod: &tsapi.Pod{
+					TailscaleContainer: &tsapi.Container{
+						Env: []tsapi.Env{{
+							Name:  "TS_LOCAL_ADDR_PORT",
+							Value: "127.0.0.1:8080",
+						}},
+					},
+				},
+			}
+		})
+		expectReconciled(t, reconciler, "", pg.Name)
+
+		sts := &appsv1.StatefulSet{}
+		if err := fc.Get(context.Background(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
+			t.Fatalf("failed to get StatefulSet: %v", err)
+		}
+
+		if sts.Spec.Template.Spec.Containers[0].Lifecycle != nil {
+			t.Error("lifecycle hook was set when TS_LOCAL_ADDR_PORT was configured via ProxyClass")
+		}
 	})
 
 	t.Run("ingress_type", func(t *testing.T) {
@@ -341,7 +408,7 @@ func TestProxyGroupTypes(t *testing.T) {
 		}
 		expectReconciled(t, reconciler, "", pg.Name)
-		verifyProxyGroupCounts(t, reconciler, 1, 1)
+		verifyProxyGroupCounts(t, reconciler, 1, 2)
 
 		sts := &appsv1.StatefulSet{}
 		if err := fc.Get(context.Background(), client.ObjectKey{Namespace: tsNamespace, Name: pg.Name}, sts); err != nil {
@@ -402,13 +469,13 @@ func verifyEnvVar(t *testing.T, sts *appsv1.StatefulSet, name, expectedValue str
 	t.Errorf("%s environment variable not found", name)
 }
 
-func expectProxyGroupResources(t *testing.T, fc client.WithWatch, pg *tsapi.ProxyGroup, shouldExist bool, cfgHash string) {
+func expectProxyGroupResources(t *testing.T, fc client.WithWatch, pg *tsapi.ProxyGroup, shouldExist bool, cfgHash string, proxyClass *tsapi.ProxyClass) {
 	t.Helper()
 
 	role := pgRole(pg, tsNamespace)
 	roleBinding := pgRoleBinding(pg, tsNamespace)
 	serviceAccount := pgServiceAccount(pg, tsNamespace)
-	statefulSet, err := pgStatefulSet(pg, tsNamespace, testProxyImage, "auto")
+	statefulSet, err := pgStatefulSet(pg, tsNamespace, testProxyImage, "auto", proxyClass)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/cmd/k8s-operator/sts.go b/cmd/k8s-operator/sts.go
index fce6bfdd7..c1d13f33d 100644
--- a/cmd/k8s-operator/sts.go
+++ b/cmd/k8s-operator/sts.go
@@ -101,6 +101,9 @@
 	proxyTypeIngressResource = "ingress_resource"
 	proxyTypeConnector       = "connector"
 	proxyTypeProxyGroup      = "proxygroup"
+
+	envVarTSLocalAddrPort = "TS_LOCAL_ADDR_PORT"
+	defaultLocalAddrPort  = 9002 // metrics and health check port
 )
 
 var (
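As a side note, the pre-stop hook configured above is a plain HTTP GET against the proxy's local address (defaultLocalAddrPort, 9002, unless TS_LOCAL_ADDR_PORT overrides it). A small standalone sketch like the one below, assumed to be run from inside a proxy Pod (for example via kubectl exec or an ephemeral debug container), could be used to exercise the endpoint by hand when a termination appears stuck; it is illustrative only and not part of this change:

package main

import (
	"fmt"
	"log"
	"net/http"

	"tailscale.com/kube/kubetypes"
)

func main() {
	// 9002 is defaultLocalAddrPort, the metrics and health check port used when TS_LOCAL_ADDR_PORT is not overridden.
	url := fmt.Sprintf("http://127.0.0.1:%d%s", 9002, kubetypes.EgessServicesPreshutdownEP)
	resp, err := http.Get(url)
	if err != nil {
		log.Fatalf("calling preshutdown endpoint: %v", err)
	}
	defer resp.Body.Close()
	// The handler is expected to return only once it is safe for this replica to be terminated.
	log.Printf("GET %s: %s", url, resp.Status)
}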
diff --git a/k8s-operator/conditions.go b/k8s-operator/conditions.go
index 1ecedfc07..abe8f7f9c 100644
--- a/k8s-operator/conditions.go
+++ b/k8s-operator/conditions.go
@@ -75,16 +75,6 @@ func RemoveServiceCondition(svc *corev1.Service, conditionType tsapi.ConditionTy
 	})
 }
 
-func EgressServiceIsValidAndConfigured(svc *corev1.Service) bool {
-	for _, typ := range []tsapi.ConditionType{tsapi.EgressSvcValid, tsapi.EgressSvcConfigured} {
-		cond := GetServiceCondition(svc, typ)
-		if cond == nil || cond.Status != metav1.ConditionTrue {
-			return false
-		}
-	}
-	return true
-}
-
 // SetRecorderCondition ensures that Recorder status has a condition with the
 // given attributes. LastTransitionTime gets set every time condition's status
 // changes.
diff --git a/kube/egressservices/egressservices.go b/kube/egressservices/egressservices.go
index 04a1c362b..2515f1bf3 100644
--- a/kube/egressservices/egressservices.go
+++ b/kube/egressservices/egressservices.go
@@ -13,9 +13,15 @@
 	"net/netip"
 )
 
-// KeyEgressServices is name of the proxy state Secret field that contains the
-// currently applied egress proxy config.
-const KeyEgressServices = "egress-services"
+const (
+	// KeyEgressServices is the name of the proxy state Secret field that contains the
+	// currently applied egress proxy config.
+	KeyEgressServices = "egress-services"
+
+	// KeyHEPPings is the name of the field that contains the number of times an egress service health check endpoint
+	// needs to be pinged to ensure that each currently configured backend is hit. In practice this depends on the
+	// number of ProxyGroup replicas.
+	KeyHEPPings = "hep-pings"
+)
 
 // Configs contains the desired configuration for egress services keyed by
 // service name.
@@ -24,6 +30,7 @@
 // Config is an egress service configuration.
 // TODO(irbekrm): version this?
 type Config struct {
+	HealthCheckEndpoint string `json:"healthCheckEndpoint"`
 	// TailnetTarget is the target to which cluster traffic for this service
 	// should be proxied.
 	TailnetTarget TailnetTarget `json:"tailnetTarget"`
diff --git a/kube/egressservices/egressservices_test.go b/kube/egressservices/egressservices_test.go
index d6f952ea0..806ad91be 100644
--- a/kube/egressservices/egressservices_test.go
+++ b/kube/egressservices/egressservices_test.go
@@ -55,7 +55,7 @@ func Test_jsonMarshalConfig(t *testing.T) {
 			protocol:   "tcp",
 			matchPort:  4003,
 			targetPort: 80,
-			wantsBs:    []byte(`{"tailnetTarget":{"ip":"","fqdn":""},"ports":[{"protocol":"tcp","matchPort":4003,"targetPort":80}]}`),
+			wantsBs:    []byte(`{"healthCheckEndpoint":"","tailnetTarget":{"ip":"","fqdn":""},"ports":[{"protocol":"tcp","matchPort":4003,"targetPort":80}]}`),
 		},
 	}
 	for _, tt := range tests {
diff --git a/kube/kubetypes/types.go b/kube/kubetypes/types.go
index afc489018..894cbb41d 100644
--- a/kube/kubetypes/types.go
+++ b/kube/kubetypes/types.go
@@ -43,4 +43,9 @@
 	// that cluster workloads behind the Ingress can now be accessed via the given DNS name over HTTPS.
 	KeyHTTPSEndpoint string = "https_endpoint"
 	ValueNoHTTPS     string = "no-https"
+
+	// PodIPv4Header is the key of the header containing the Pod's IPv4 address, as returned by containerboot's health check endpoint.
+	PodIPv4Header string = "Pod-IPv4"
+
+	EgessServicesPreshutdownEP = "/internal-egress-services-preshutdown"
 )
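Putting the pieces together, the intent of these changes is roughly: the operator records a per-service healthCheckEndpoint and a hep-pings count in the egress configuration, the proxy's health check responses carry the Pod-IPv4 header, and the preshutdown endpoint targeted by the pre-stop hook holds a replica's termination until cluster traffic is no longer routed to it. The sketch below shows one way such a drain-wait could look; it is a hypothetical illustration written against the types added in this diff (the package, function name, timeout and retry policy are invented here), not the actual containerboot implementation:

package egressdrain

import (
	"fmt"
	"net/http"
	"time"

	"tailscale.com/kube/egressservices"
	"tailscale.com/kube/kubetypes"
)

// waitUntilNotABackend pings an egress service's health check endpoint (which is load balanced across all
// ProxyGroup replicas) the given number of times and returns once none of the responses were served by this
// Pod, i.e. once cluster traffic is no longer routed to this replica.
func waitUntilNotABackend(cfg egressservices.Config, podIPv4 string, pings int) error {
	client := &http.Client{Timeout: 5 * time.Second} // timeout chosen arbitrarily for this sketch
	for {
		routedHere := false
		for i := 0; i < pings; i++ {
			resp, err := client.Get(cfg.HealthCheckEndpoint)
			if err != nil {
				return fmt.Errorf("pinging %q: %w", cfg.HealthCheckEndpoint, err)
			}
			resp.Body.Close()
			if resp.Header.Get(kubetypes.PodIPv4Header) == podIPv4 {
				routedHere = true
				break
			}
		}
		if !routedHere {
			return nil
		}
		time.Sleep(time.Second)
	}
}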