diff --git a/cmd/k8s-operator/proxy.go b/cmd/k8s-operator/proxy.go index 3d092fe34..e15d066dc 100644 --- a/cmd/k8s-operator/proxy.go +++ b/cmd/k8s-operator/proxy.go @@ -31,11 +31,10 @@ "tailscale.com/util/set" ) -var whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil)) - var ( // counterNumRequestsproxies counts the number of API server requests proxied via this proxy. counterNumRequestsProxied = clientmetric.NewCounter("k8s_auth_proxy_requests_proxied") + whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil)) ) type apiServerProxyMode int @@ -222,6 +221,12 @@ func (ap *apiserverProxy) serveExecWS(w http.ResponseWriter, r *http.Request) { } func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, proto ksr.Protocol) { + const ( + podNameKey = "pod" + namespaceNameKey = "namespace" + upgradeHeaderKey = "Upgrade" + ) + who, err := ap.whoIs(r) if err != nil { ap.authError(w, err) @@ -246,7 +251,7 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p } wantsHeader := upgradeHeaderForProto[proto] - if h := r.Header.Get("Upgrade"); h != wantsHeader { + if h := r.Header.Get(upgradeHeaderKey); h != wantsHeader { msg := fmt.Sprintf("[unexpected] unable to verify that streaming protocol is %s, wants Upgrade header %q, got: %q", proto, wantsHeader, h) if failOpen { msg = msg + "; failure mode is 'fail open'; continuing session without recording." @@ -268,8 +273,8 @@ func (ap *apiserverProxy) execForProto(w http.ResponseWriter, r *http.Request, p Who: who, Addrs: addrs, FailOpen: failOpen, - Pod: r.PathValue("pod"), - Namespace: r.PathValue("namespace"), + Pod: r.PathValue(podNameKey), + Namespace: r.PathValue(namespaceNameKey), Log: ap.log, } h := ksr.New(opts) @@ -309,9 +314,11 @@ func (h *apiserverProxy) addImpersonationHeadersAsRequired(r *http.Request) { log.Printf("failed to add impersonation headers: " + err.Error()) } } + func (ap *apiserverProxy) whoIs(r *http.Request) (*apitype.WhoIsResponse, error) { return ap.lc.WhoIs(r.Context(), r.RemoteAddr) } + func (ap *apiserverProxy) authError(w http.ResponseWriter, err error) { ap.log.Errorf("failed to authenticate caller: %v", err) http.Error(w, "failed to authenticate caller", http.StatusInternalServerError) diff --git a/k8s-operator/sessionrecording/hijacker.go b/k8s-operator/sessionrecording/hijacker.go index 2e7ec7598..1287a0a0c 100644 --- a/k8s-operator/sessionrecording/hijacker.go +++ b/k8s-operator/sessionrecording/hijacker.go @@ -126,7 +126,10 @@ func (h *Hijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) { func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) { const ( // https://docs.asciinema.org/manual/asciicast/v2/ - asciicastv2 = 2 + asciicastv2 = 2 + ttyKey = "tty" + commandKey = "command" + containerKey = "container" ) var ( wc io.WriteCloser @@ -153,18 +156,20 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, // TODO (irbekrm): log which recorder h.log.Info("successfully connected to a session recorder") cl := tstime.DefaultClock{} - rec := tsrecorder.New(wc, cl, cl.Now(), h.failOpen) + rec := tsrecorder.New(wc, cl, cl.Now(), h.failOpen, h.log) qp := h.req.URL.Query() + tty := strings.Join(qp[ttyKey], "") + hasTerm := (tty == "true") // session has terminal attached ch := sessionrecording.CastHeader{ Version: asciicastv2, Timestamp: cl.Now().Unix(), - Command: strings.Join(qp["command"], " "), + Command: strings.Join(qp[commandKey], " "), SrcNode: strings.TrimSuffix(h.who.Node.Name, "."), SrcNodeID: h.who.Node.StableID, Kubernetes: &sessionrecording.Kubernetes{ PodName: h.pod, Namespace: h.ns, - Container: strings.Join(qp["container"], " "), + Container: strings.Join(qp[containerKey], " "), }, } if !h.who.Node.IsTagged() { @@ -177,9 +182,9 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, var lc net.Conn switch h.proto { case SPDYProtocol: - lc = spdy.New(conn, rec, ch, h.log) + lc = spdy.New(conn, rec, ch, hasTerm, h.log) case WSProtocol: - lc = ws.New(conn, rec, ch, h.log) + lc = ws.New(conn, rec, ch, hasTerm, h.log) default: return nil, fmt.Errorf("unknown protocol: %s", h.proto) } diff --git a/k8s-operator/sessionrecording/spdy/conn.go b/k8s-operator/sessionrecording/spdy/conn.go index 19a01641e..455c2225a 100644 --- a/k8s-operator/sessionrecording/spdy/conn.go +++ b/k8s-operator/sessionrecording/spdy/conn.go @@ -28,14 +28,16 @@ // The hijacked connection is used to transmit SPDY streams between Kubernetes client ('kubectl') and the destination container. // Data read from the underlying network connection is data sent via one of the SPDY streams from the client to the container. // Data written to the underlying connection is data sent from the container to the client. -// We parse the data and send everything for the STDOUT/STDERR streams to the configured tsrecorder as an asciinema recording with the provided header. +// We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#background-remotecommand-subprotocol -func New(nc net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, log *zap.SugaredLogger) net.Conn { +func New(nc net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn { return &conn{ - Conn: nc, - rec: rec, - ch: ch, - log: log, + Conn: nc, + rec: rec, + ch: ch, + log: log, + hasTerm: hasTerm, + initialTermSizeSet: make(chan struct{}), } } @@ -47,7 +49,6 @@ type conn struct { net.Conn // rec knows how to send data written to it to a tsrecorder instance. rec *tsrecorder.Client - ch sessionrecording.CastHeader stdoutStreamID atomic.Uint32 stderrStreamID atomic.Uint32 @@ -56,8 +57,37 @@ type conn struct { wmu sync.Mutex // sequences writes closed bool - rmu sync.Mutex // sequences reads + rmu sync.Mutex // sequences reads + + // The following fields are related to sending asciinema CastHeader. + // CastHeader must be sent before any payload. If the session has a + // terminal attached, the CastHeader must have '.Width' and '.Height' + // fields set for the tsrecorder UI to be able to play the recording. + // For 'kubectl exec' sessions, terminal width and height are sent as a + // resize message on resize stream from the client when the session + // starts as well as at any time the client detects a terminal change. + // We can intercept the resize message on Read calls. As there is no + // guarantee that the resize message from client will be intercepted + // before server writes stdout messages that we must record, we need to + // ensure that parsing stdout/stderr messages written to the connection + // waits till a resize message has been received and a CastHeader with + // correct terminal dimensions can be written. + + // ch is the asciinema CastHeader for the current session. + // https://docs.asciinema.org/manual/asciicast/v2/#header + ch sessionrecording.CastHeader + // writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once. writeCastHeaderOnce sync.Once + hasTerm bool // whether the session had TTY attached + // initialTermSizeSet channel gets sent a value once, when the Read has + // received a resize message and set the initial terminal size. It must + // be set to a buffered channel to prevent Reads being blocked on the + // first stdout/stderr write reading from the channel. + initialTermSizeSet chan struct{} + // sendInitialTermSizeSetOnce is used to ensure that a value is sent to + // initialTermSizeSet channel only once, when the initial resize message + // is received. + sendinitialTermSizeSetOnce sync.Once zlibReqReader zlibReader // writeBuf is used to store data written to the connection that has not @@ -97,13 +127,28 @@ func (c *conn) Read(b []byte) (int, error) { if !sf.Ctrl { // data frame switch sf.StreamID { case c.resizeStreamID.Load(): - var err error + var msg spdyResizeMsg if err = json.Unmarshal(sf.Payload, &msg); err != nil { return 0, fmt.Errorf("error umarshalling resize msg: %w", err) } c.ch.Width = msg.Width c.ch.Height = msg.Height + + // If this is initial resize message, the width and + // height will be sent in the CastHeader. If this is a + // subsequent resize message, we need to send asciinema + // resize message. + var isInitialResize bool + c.sendinitialTermSizeSetOnce.Do(func() { + isInitialResize = true + close(c.initialTermSizeSet) // unblock sending of CastHeader + }) + if !isInitialResize { + if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil { + return 0, fmt.Errorf("error writing resize message: %w", err) + } + } } return n, nil } @@ -147,21 +192,21 @@ func (c *conn) Write(b []byte) (int, error) { case c.stdoutStreamID.Load(), c.stderrStreamID.Load(): var err error c.writeCastHeaderOnce.Do(func() { - var j []byte - j, err = json.Marshal(c.ch) - if err != nil { - return - } - j = append(j, '\n') - err = c.rec.WriteCastLine(j) - if err != nil { - c.log.Errorf("received error from recorder: %v", err) + // If this is a session with a terminal attached, + // we must wait for the terminal width and + // height to be parsed from a resize message + // before sending CastHeader, else tsrecorder + // will not be able to play this recording. + if c.hasTerm { + c.log.Debugf("write: waiting for the initial terminal size to be set before proceeding with sending the first payload") + <-c.initialTermSizeSet } + err = c.rec.WriteCastHeader(c.ch) }) if err != nil { return 0, fmt.Errorf("error writing CastHeader: %w", err) } - if err := c.rec.Write(sf.Payload); err != nil { + if err := c.rec.WriteOutput(sf.Payload); err != nil { return 0, fmt.Errorf("error sending payload to session recorder: %w", err) } } diff --git a/k8s-operator/sessionrecording/spdy/conn_test.go b/k8s-operator/sessionrecording/spdy/conn_test.go index 629536b2e..3485d61c4 100644 --- a/k8s-operator/sessionrecording/spdy/conn_test.go +++ b/k8s-operator/sessionrecording/spdy/conn_test.go @@ -29,13 +29,15 @@ func Test_Writes(t *testing.T) { } cl := tstest.NewClock(tstest.ClockOpts{}) tests := []struct { - name string - inputs [][]byte - wantForwarded []byte - wantRecorded []byte - firstWrite bool - width int - height int + name string + inputs [][]byte + wantForwarded []byte + wantRecorded []byte + firstWrite bool + width int + height int + sendInitialResize bool + hasTerm bool }{ { name: "single_write_control_frame_with_payload", @@ -76,7 +78,18 @@ func Test_Writes(t *testing.T) { wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl), }, { - name: "single_first_write_stdout_data_frame_with_payload", + name: "single_first_write_stdout_data_frame_with_payload_sess_has_terminal", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...), + width: 10, + height: 20, + hasTerm: true, + firstWrite: true, + sendInitialResize: true, + }, + { + name: "single_first_write_stdout_data_frame_with_payload_sess_does_not_have_terminal", inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...), @@ -89,7 +102,7 @@ func Test_Writes(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tc := &fakes.TestConn{} sr := &fakes.TestSessionRecorder{} - rec := tsrecorder.New(sr, cl, cl.Now(), true) + rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar()) c := &conn{ Conn: tc, @@ -99,15 +112,21 @@ func Test_Writes(t *testing.T) { Width: tt.width, Height: tt.height, }, + initialTermSizeSet: make(chan struct{}), + hasTerm: tt.hasTerm, } if !tt.firstWrite { // this test case does not intend to test that cast header gets written once c.writeCastHeaderOnce.Do(func() {}) } + if tt.sendInitialResize { + close(c.initialTermSizeSet) + } c.stdoutStreamID.Store(stdoutStreamID) c.stderrStreamID.Store(stderrStreamID) for i, input := range tt.inputs { + c.hasTerm = tt.hasTerm if _, err := c.Write(input); err != nil { t.Errorf("[%d] spdyRemoteConnRecorder.Write() unexpected error %v", i, err) } @@ -195,11 +214,12 @@ func Test_Reads(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tc := &fakes.TestConn{} sr := &fakes.TestSessionRecorder{} - rec := tsrecorder.New(sr, cl, cl.Now(), true) + rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar()) c := &conn{ - Conn: tc, - log: zl.Sugar(), - rec: rec, + Conn: tc, + log: zl.Sugar(), + rec: rec, + initialTermSizeSet: make(chan struct{}), } c.resizeStreamID.Store(tt.resizeStreamIDBeforeRead) diff --git a/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go b/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go index 30142e4bd..af5fcb8da 100644 --- a/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go +++ b/k8s-operator/sessionrecording/tsrecorder/tsrecorder.go @@ -14,10 +14,12 @@ "time" "github.com/pkg/errors" + "go.uber.org/zap" + "tailscale.com/sessionrecording" "tailscale.com/tstime" ) -func New(conn io.WriteCloser, clock tstime.Clock, start time.Time, failOpen bool) *Client { +func New(conn io.WriteCloser, clock tstime.Clock, start time.Time, failOpen bool, logger *zap.SugaredLogger) *Client { return &Client{ start: start, clock: clock, @@ -35,38 +37,66 @@ type Client struct { // failOpen specifies whether the session should be allowed to // continue if writing to the recording fails. failOpen bool + // failedOpen is set to true if the recording of this session failed and + // we should not attempt to send any more data. + failedOpen bool - // backOff is set to true if we've failed open and should stop - // attempting to write to tsrecorder. - backOff bool + logger *zap.SugaredLogger mu sync.Mutex // guards writes to conn conn io.WriteCloser // connection to a tsrecorder instance } -// Write appends timestamp to the provided bytes and sends them to the -// configured tsrecorder. -func (rec *Client) Write(p []byte) (err error) { +// WriteOutput sends terminal stdout and stderr to the tsrecorder. +// https://docs.asciinema.org/manual/asciicast/v2/#o-output-data-written-to-a-terminal +func (rec *Client) WriteOutput(p []byte) (err error) { + const outputEventCode = "o" if len(p) == 0 { return nil } - if rec.backOff { + return rec.write([]any{ + rec.clock.Now().Sub(rec.start).Seconds(), + outputEventCode, + string(p)}) +} + +// WriteResize writes an asciinema resize message. This can be called if +// terminal size has changed. +// https://docs.asciinema.org/manual/asciicast/v2/#r-resize +func (rec *Client) WriteResize(height, width int) (err error) { + const resizeEventCode = "r" + p := fmt.Sprintf("%dx%d", height, width) + return rec.write([]any{ + rec.clock.Now().Sub(rec.start).Seconds(), + resizeEventCode, + string(p)}) +} + +// WriteCastHeaders writes asciinema CastHeader. This must be called once, +// before any payload is sent to the tsrecorder. +// https://docs.asciinema.org/manual/asciicast/v2/#header +func (rec *Client) WriteCastHeader(ch sessionrecording.CastHeader) error { + return rec.write(ch) +} + +// write writes the data to session recorder. If recording fails and policy is +// 'fail open', sets the state to failed and does not attempt to write any more +// data during this session. +func (rec *Client) write(data any) error { + if rec.failedOpen { return nil } - j, err := json.Marshal([]any{ - rec.clock.Now().Sub(rec.start).Seconds(), - "o", - string(p), - }) + j, err := json.Marshal(data) if err != nil { - return fmt.Errorf("error marhalling payload: %w", err) + return fmt.Errorf("error marshalling data as json: %v", err) } j = append(j, '\n') - if err := rec.WriteCastLine(j); err != nil { + if err := rec.writeCastLine(j); err != nil { if !rec.failOpen { return fmt.Errorf("error writing payload to recorder: %w", err) } - rec.backOff = true + rec.logger.Infof("error writing to tsrecorder: %v. Failure policy is to fail open, so rest of session contents will not be recorded.", err) + rec.failedOpen = true } return nil } @@ -82,9 +112,9 @@ func (rec *Client) Close() error { return err } -// writeCastLine sends bytes to the tsrecorder. The bytes should be in +// writeToRecorder sends bytes to the tsrecorder. The bytes should be in // asciinema format. -func (c *Client) WriteCastLine(j []byte) error { +func (c *Client) writeCastLine(j []byte) error { c.mu.Lock() defer c.mu.Unlock() if c.conn == nil { diff --git a/k8s-operator/sessionrecording/ws/conn.go b/k8s-operator/sessionrecording/ws/conn.go index 82fd094d1..86029f67b 100644 --- a/k8s-operator/sessionrecording/ws/conn.go +++ b/k8s-operator/sessionrecording/ws/conn.go @@ -28,14 +28,16 @@ // The hijacked connection is used to transmit *.channel.k8s.io streams between Kubernetes client ('kubectl') and the destination proxy controlled by Kubernetes. // Data read from the underlying network connection is data sent via one of the streams from the client to the container. // Data written to the underlying connection is data sent from the container to the client. -// We parse the data and send everything for the STDOUT/STDERR streams to the configured tsrecorder as an asciinema recording with the provided header. +// We parse the data and send everything for the stdout/stderr streams to the configured tsrecorder as an asciinema recording with the provided header. // https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/4006-transition-spdy-to-websockets#proposal-new-remotecommand-sub-protocol-version---v5channelk8sio -func New(c net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, log *zap.SugaredLogger) net.Conn { +func New(c net.Conn, rec *tsrecorder.Client, ch sessionrecording.CastHeader, hasTerm bool, log *zap.SugaredLogger) net.Conn { return &conn{ - Conn: c, - rec: rec, - ch: ch, - log: log, + Conn: c, + rec: rec, + ch: ch, + hasTerm: hasTerm, + log: log, + initialTermSizeSet: make(chan struct{}, 1), } } @@ -49,8 +51,37 @@ type conn struct { net.Conn // rec knows how to send data to a tsrecorder instance. rec *tsrecorder.Client - // ch is the asiinema CastHeader for a session. - ch sessionrecording.CastHeader + + // The following fields are related to sending asciinema CastHeader. + // CastHeader must be sent before any payload. If the session has a + // terminal attached, the CastHeader must have '.Width' and '.Height' + // fields set for the tsrecorder UI to be able to play the recording. + // For 'kubectl exec' sessions, terminal width and height are sent as a + // resize message on resize stream from the client when the session + // starts as well as at any time the client detects a terminal change. + // We can intercept the resize message on Read calls. As there is no + // guarantee that the resize message from client will be intercepted + // before server writes stdout messages that we must record, we need to + // ensure that parsing stdout/stderr messages written to the connection + // waits till a resize message has been received and a CastHeader with + // correct terminal dimensions can be written. + + // ch is asciinema CastHeader for the current session. + // https://docs.asciinema.org/manual/asciicast/v2/#header + ch sessionrecording.CastHeader + // writeCastHeaderOnce is used to ensure CastHeader gets sent to tsrecorder once. + writeCastHeaderOnce sync.Once + hasTerm bool // whether the session has TTY attached + // initialTermSizeSet channel gets sent a value once, when the Read has + // received a resize message and set the initial terminal size. It must + // be set to a buffered channel to prevent Reads being blocked on the + // first stdout/stderr write reading from the channel. + initialTermSizeSet chan struct{} + // sendInitialTermSizeSetOnce is used to ensure that a value is sent to + // initialTermSizeSet channel only once, when the initial resize message + // is received. + sendInitialTermSizeSetOnce sync.Once + log *zap.SugaredLogger rmu sync.Mutex // sequences reads @@ -63,9 +94,8 @@ type conn struct { // the original byte array. readBuf bytes.Buffer - wmu sync.Mutex // sequences writes - writeCastHeaderOnce sync.Once - closed bool // connection is closed + wmu sync.Mutex // sequences writes + closed bool // connection is closed // writeBuf contains bytes for a currently parsed binary data message // being written to the underlying conn. If the message is masked, it is // unmasked in place, so having this buffer allows us to avoid modifying @@ -140,17 +170,32 @@ func (c *conn) Read(b []byte) (int, error) { } c.readBuf.Next(len(readMsg.raw)) - if readMsg.isFinalized { + if readMsg.isFinalized && !c.readMsgIsIncomplete() { // Stream IDs for websocket streams are static. // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 if readMsg.streamID.Load() == remotecommand.StreamResize { - var err error var msg tsrecorder.ResizeMsg if err = json.Unmarshal(readMsg.payload, &msg); err != nil { return 0, fmt.Errorf("error umarshalling resize message: %w", err) } + c.ch.Width = msg.Width c.ch.Height = msg.Height + + // If this is initial resize message, the width and + // height will be sent in the CastHeader. If this is a + // subsequent resize message, we need to send asciinema + // resize message. + var isInitialResize bool + c.sendInitialTermSizeSetOnce.Do(func() { + isInitialResize = true + close(c.initialTermSizeSet) // unblock sending of CastHeader + }) + if !isInitialResize { + if err := c.rec.WriteResize(c.ch.Height, c.ch.Width); err != nil { + return 0, fmt.Errorf("error writing resize message: %w", err) + } + } } } c.currentReadMsg = readMsg @@ -209,22 +254,21 @@ func (c *conn) Write(b []byte) (int, error) { if writeMsg.streamID.Load() == remotecommand.StreamStdOut || writeMsg.streamID.Load() == remotecommand.StreamStdErr { var err error c.writeCastHeaderOnce.Do(func() { - var j []byte - j, err = json.Marshal(c.ch) - if err != nil { - c.log.Errorf("error marhsalling conn: %v", err) - return - } - j = append(j, '\n') - err = c.rec.WriteCastLine(j) - if err != nil { - c.log.Errorf("received error from recorder: %v", err) + // If this is a session with a terminal attached, + // we must wait for the terminal width and + // height to be parsed from a resize message + // before sending CastHeader, else tsrecorder + // will not be able to play this recording. + if c.hasTerm { + c.log.Debug("waiting for terminal size to be set before starting to send recorded data") + <-c.initialTermSizeSet } + err = c.rec.WriteCastHeader(c.ch) }) if err != nil { return 0, fmt.Errorf("error writing CastHeader: %w", err) } - if err := c.rec.Write(writeMsg.payload); err != nil { + if err := c.rec.WriteOutput(writeMsg.payload); err != nil { return 0, fmt.Errorf("error writing message to recorder: %v", err) } } diff --git a/k8s-operator/sessionrecording/ws/conn_test.go b/k8s-operator/sessionrecording/ws/conn_test.go index 2fcbeb7ca..11174480b 100644 --- a/k8s-operator/sessionrecording/ws/conn_test.go +++ b/k8s-operator/sessionrecording/ws/conn_test.go @@ -65,6 +65,7 @@ func Test_conn_Read(t *testing.T) { log: zl.Sugar(), } for i, input := range tt.inputs { + c.initialTermSizeSet = make(chan struct{}) if err := tc.WriteReadBufBytes(input); err != nil { t.Fatalf("writing bytes to test conn: %v", err) } @@ -93,13 +94,15 @@ func Test_conn_Write(t *testing.T) { } cl := tstest.NewClock(tstest.ClockOpts{}) tests := []struct { - name string - inputs [][]byte - wantForwarded []byte - wantRecorded []byte - firstWrite bool - width int - height int + name string + inputs [][]byte + wantForwarded []byte + wantRecorded []byte + firstWrite bool + width int + height int + hasTerm bool + sendInitialResize bool }{ { name: "single_write_control_frame", @@ -144,12 +147,23 @@ func Test_conn_Write(t *testing.T) { wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5}, wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl), }, + { + name: "three_writes_stdout_data_message_with_split_fragment_cast_header_with_terminal", + inputs: [][]byte{{0x2, 0x3, 0x1, 0x7, 0x8}, {0x80, 0x6, 0x1, 0x1, 0x2, 0x3}, {0x4, 0x5}}, + wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl)...), + height: 20, + width: 10, + hasTerm: true, + firstWrite: true, + sendInitialResize: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tc := &fakes.TestConn{} sr := &fakes.TestSessionRecorder{} - rec := tsrecorder.New(sr, cl, cl.Now(), true) + rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar()) c := &conn{ Conn: tc, log: zl.Sugar(), @@ -157,12 +171,17 @@ func Test_conn_Write(t *testing.T) { Width: tt.width, Height: tt.height, }, - rec: rec, + rec: rec, + initialTermSizeSet: make(chan struct{}), + hasTerm: tt.hasTerm, } if !tt.firstWrite { // This test case does not intend to test that cast header gets written once. c.writeCastHeaderOnce.Do(func() {}) } + if tt.sendInitialResize { + close(c.initialTermSizeSet) + } for i, input := range tt.inputs { _, err := c.Write(input) if err != nil { @@ -221,7 +240,7 @@ func Test_conn_WriteRand(t *testing.T) { } cl := tstest.NewClock(tstest.ClockOpts{}) sr := &fakes.TestSessionRecorder{} - rec := tsrecorder.New(sr, cl, cl.Now(), true) + rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar()) for i := range 100 { tc := &fakes.TestConn{} c := &conn{