cmd/k8s-operator,k8s-operator/sessionrecording,sessionrecording,ssh/tailssh: refactor session recording functionality (#12945)

cmd/k8s-operator,k8s-operator/sessionrecording,sessionrecording,ssh/tailssh: refactor session recording functionality

Refactor SSH session recording functionality (mostly the bits related to
Kubernetes API server proxy 'kubectl exec' session recording):

- move the session recording bits used by both Tailscale SSH
and the Kubernetes API server proxy into a shared sessionrecording package,
to avoid having the operator to import ssh/tailssh

- move the Kubernetes API server proxy session recording functionality
into a k8s-operator/sessionrecording package, add some abstractions
in preparation for adding support for a second streaming protocol (WebSockets)

Updates tailscale/corp#19821

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
This commit is contained in:
Irbe Krumina
2024-07-29 15:57:11 +03:00
committed by GitHub
parent 1bf7ed0348
commit a21bf100f3
17 changed files with 376 additions and 296 deletions

View File

@ -1,136 +0,0 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tailssh
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptrace"
"net/netip"
"time"
"tailscale.com/tailcfg"
"tailscale.com/util/multierr"
)
// ConnectToRecorder connects to the recorder at any of the provided addresses.
// It returns the first successful response, or a multierr if all attempts fail.
//
// On success, it returns a WriteCloser that can be used to upload the
// recording, and a channel that will be sent an error (or nil) when the upload
// fails or completes.
//
// In both cases, a slice of SSHRecordingAttempts is returned which detail the
// attempted recorder IP and the error message, if the attempt failed. The
// attempts are in order the recorder(s) was attempted. If successful a
// successful connection is made, the last attempt in the slice is the
// attempt for connected recorder.
func ConnectToRecorder(ctx context.Context, recs []netip.AddrPort, dial func(context.Context, string, string) (net.Conn, error)) (io.WriteCloser, []*tailcfg.SSHRecordingAttempt, <-chan error, error) {
if len(recs) == 0 {
return nil, nil, nil, errors.New("no recorders configured")
}
// We use a special context for dialing the recorder, so that we can
// limit the time we spend dialing to 30 seconds and still have an
// unbounded context for the upload.
dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second)
defer dialCancel()
hc, err := SessionRecordingClientForDialer(dialCtx, dial)
if err != nil {
return nil, nil, nil, err
}
var errs []error
var attempts []*tailcfg.SSHRecordingAttempt
for _, ap := range recs {
attempt := &tailcfg.SSHRecordingAttempt{
Recorder: ap,
}
attempts = append(attempts, attempt)
// We dial the recorder and wait for it to send a 100-continue
// response before returning from this function. This ensures that
// the recorder is ready to accept the recording.
// got100 is closed when we receive the 100-continue response.
got100 := make(chan struct{})
ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
Got100Continue: func() {
close(got100)
},
})
pr, pw := io.Pipe()
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s:%d/record", ap.Addr(), ap.Port()), pr)
if err != nil {
err = fmt.Errorf("recording: error starting recording: %w", err)
attempt.FailureMessage = err.Error()
errs = append(errs, err)
continue
}
// We set the Expect header to 100-continue, so that the recorder
// will send a 100-continue response before it starts reading the
// request body.
req.Header.Set("Expect", "100-continue")
// errChan is used to indicate the result of the request.
errChan := make(chan error, 1)
go func() {
resp, err := hc.Do(req)
if err != nil {
errChan <- fmt.Errorf("recording: error starting recording: %w", err)
return
}
if resp.StatusCode != 200 {
errChan <- fmt.Errorf("recording: unexpected status: %v", resp.Status)
return
}
errChan <- nil
}()
select {
case <-got100:
case err := <-errChan:
// If we get an error before we get the 100-continue response,
// we need to try another recorder.
if err == nil {
// If the error is nil, we got a 200 response, which
// is unexpected as we haven't sent any data yet.
err = errors.New("recording: unexpected EOF")
}
attempt.FailureMessage = err.Error()
errs = append(errs, err)
continue
}
return pw, attempts, errChan, nil
}
return nil, attempts, nil, multierr.New(errs...)
}
// SessionRecordingClientForDialer returns an http.Client that uses a clone of
// the provided Dialer's PeerTransport to dial connections. This is used to make
// requests to the session recording server to upload session recordings. It
// uses the provided dialCtx to dial connections, and limits a single dial to 5
// seconds.
func SessionRecordingClientForDialer(dialCtx context.Context, dial func(context.Context, string, string) (net.Conn, error)) (*http.Client, error) {
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
perAttemptCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
go func() {
select {
case <-perAttemptCtx.Done():
case <-dialCtx.Done():
cancel()
}
}()
return dial(perAttemptCtx, network, addr)
}
return &http.Client{
Transport: tr,
}, nil
}

View File

@ -36,6 +36,7 @@ import (
"tailscale.com/logtail/backoff"
"tailscale.com/net/tsaddr"
"tailscale.com/net/tsdial"
"tailscale.com/sessionrecording"
"tailscale.com/tailcfg"
"tailscale.com/tempfork/gliderlabs/ssh"
"tailscale.com/types/key"
@ -1428,61 +1429,6 @@ func randBytes(n int) []byte {
return b
}
// CastHeader is the header of an asciinema file.
type CastHeader struct {
// Version is the asciinema file format version.
Version int `json:"version"`
// Width is the terminal width in characters.
// It is non-zero for Pty sessions.
Width int `json:"width"`
// Height is the terminal height in characters.
// It is non-zero for Pty sessions.
Height int `json:"height"`
// Timestamp is the unix timestamp of when the recording started.
Timestamp int64 `json:"timestamp"`
// Env is the environment variables of the session.
// Only "TERM" is set (2023-03-22).
Env map[string]string `json:"env"`
// Command is the command that was executed.
// Typically empty for shell sessions.
Command string `json:"command,omitempty"`
// Tailscale-specific fields:
// SrcNode is the FQDN of the node originating the connection.
// It is also the MagicDNS name for the node.
// It does not have a trailing dot.
// e.g. "host.tail-scale.ts.net"
SrcNode string `json:"srcNode"`
// SrcNodeID is the node ID of the node originating the connection.
SrcNodeID tailcfg.StableNodeID `json:"srcNodeID"`
// SrcNodeTags is the list of tags on the node originating the connection (if any).
SrcNodeTags []string `json:"srcNodeTags,omitempty"`
// SrcNodeUserID is the user ID of the node originating the connection (if not tagged).
SrcNodeUserID tailcfg.UserID `json:"srcNodeUserID,omitempty"` // if not tagged
// SrcNodeUser is the LoginName of the node originating the connection (if not tagged).
SrcNodeUser string `json:"srcNodeUser,omitempty"`
// SSHUser is the username as presented by the client.
SSHUser string `json:"sshUser"` // as presented by the client
// LocalUser is the effective username on the server.
LocalUser string `json:"localUser"`
// ConnectionID uniquely identifies a connection made to the SSH server.
// It may be shared across multiple sessions over the same connection in
// case of SSH multiplexing.
ConnectionID string `json:"connectionID"`
}
func (ss *sshSession) openFileForRecording(now time.Time) (_ io.WriteCloser, err error) {
varRoot := ss.conn.srv.lb.TailscaleVarRoot()
if varRoot == "" {
@ -1548,7 +1494,7 @@ func (ss *sshSession) startNewRecording() (_ *recording, err error) {
} else {
var errChan <-chan error
var attempts []*tailcfg.SSHRecordingAttempt
rec.out, attempts, errChan, err = ConnectToRecorder(ctx, recorders, ss.conn.srv.lb.Dialer().UserDial)
rec.out, attempts, errChan, err = sessionrecording.ConnectToRecorder(ctx, recorders, ss.conn.srv.lb.Dialer().UserDial)
if err != nil {
if onFailure != nil && onFailure.NotifyURL != "" && len(attempts) > 0 {
eventType := tailcfg.SSHSessionRecordingFailed
@ -1598,7 +1544,7 @@ func (ss *sshSession) startNewRecording() (_ *recording, err error) {
}()
}
ch := CastHeader{
ch := sessionrecording.CastHeader{
Version: 2,
Width: w.Width,
Height: w.Height,

View File

@ -36,6 +36,7 @@ import (
"tailscale.com/ipn/store/mem"
"tailscale.com/net/memnet"
"tailscale.com/net/tsdial"
"tailscale.com/sessionrecording"
"tailscale.com/tailcfg"
"tailscale.com/tempfork/gliderlabs/ssh"
"tailscale.com/tsd"
@ -630,7 +631,7 @@ func TestSSHRecordingNonInteractive(t *testing.T) {
wg.Wait()
<-ctx.Done() // wait for recording to finish
var ch CastHeader
var ch sessionrecording.CastHeader
if err := json.NewDecoder(bytes.NewReader(recording)).Decode(&ch); err != nil {
t.Fatal(err)
}