etcdserver: Fix 64 KB websocket notification message limit

This fixes etcd being unable to send any message longer than 64 KB as
a notification over the websocket. This was because the older version
of grpc-websocket-proxy was used and WithMaxRespBodyBufferSize option
wasn't set.
This commit is contained in:
Vitaliy Filippov
2020-10-19 15:14:47 +03:00
parent d51c6c689b
commit a40f14d92c
4 changed files with 106 additions and 9 deletions

View File

@ -2,9 +2,11 @@ package wsproxy
import (
"bufio"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
@ -26,11 +28,16 @@ type RequestMutatorFunc func(incoming *http.Request, outgoing *http.Request) *ht
// Proxy provides websocket transport upgrade to compatible endpoints.
type Proxy struct {
h http.Handler
logger Logger
methodOverrideParam string
tokenCookieName string
requestMutator RequestMutatorFunc
h http.Handler
logger Logger
maxRespBodyBufferBytes int
methodOverrideParam string
tokenCookieName string
requestMutator RequestMutatorFunc
headerForwarder func(header string) bool
pingInterval time.Duration
pingWait time.Duration
pongWait time.Duration
}
// Logger collects log messages.
@ -50,6 +57,15 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Option allows customization of the proxy.
type Option func(*Proxy)
// WithMaxRespBodyBufferSize allows specification of a custom size for the
// buffer used while reading the response body. By default, the bufio.Scanner
// used to read the response body sets the maximum token size to MaxScanTokenSize.
func WithMaxRespBodyBufferSize(nBytes int) Option {
return func(p *Proxy) {
p.maxRespBodyBufferBytes = nBytes
}
}
// WithMethodParamOverride allows specification of the special http parameter that is used in the proxied streaming request.
func WithMethodParamOverride(param string) Option {
return func(p *Proxy) {
@ -71,6 +87,13 @@ func WithRequestMutator(fn RequestMutatorFunc) Option {
}
}
// WithForwardedHeaders allows controlling which headers are forwarded.
func WithForwardedHeaders(fn func(header string) bool) Option {
return func(p *Proxy) {
p.headerForwarder = fn
}
}
// WithLogger allows a custom FieldLogger to be supplied
func WithLogger(logger Logger) Option {
return func(p *Proxy) {
@ -78,6 +101,28 @@ func WithLogger(logger Logger) Option {
}
}
// WithPingControl allows specification of ping pong control. The interval
// parameter specifies the pingInterval between pings. The allowed wait time
// for a pong response is (pingInterval * 10) / 9.
func WithPingControl(interval time.Duration) Option {
return func(proxy *Proxy) {
proxy.pingInterval = interval
proxy.pongWait = (interval * 10) / 9
proxy.pingWait = proxy.pongWait / 6
}
}
var defaultHeadersToForward = map[string]bool{
"Origin": true,
"origin": true,
"Referer": true,
"referer": true,
}
func defaultHeaderForwarder(header string) bool {
return defaultHeadersToForward[header]
}
// WebsocketProxy attempts to expose the underlying handler as a bidi websocket stream with newline-delimited
// JSON as the content encoding.
//
@ -96,6 +141,7 @@ func WebsocketProxy(h http.Handler, opts ...Option) http.Handler {
logger: logrus.New(),
methodOverrideParam: MethodOverrideParam,
tokenCookieName: TokenCookieName,
headerForwarder: defaultHeaderForwarder,
}
for _, o := range opts {
o(p)
@ -144,7 +190,12 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
return
}
if swsp := r.Header.Get("Sec-WebSocket-Protocol"); swsp != "" {
request.Header.Set("Authorization", strings.Replace(swsp, "Bearer, ", "Bearer ", 1))
request.Header.Set("Authorization", transformSubProtocolHeader(swsp))
}
for header := range r.Header {
if p.headerForwarder(header) {
request.Header.Set(header, r.Header.Get(header))
}
}
// If token cookie is present, populate Authorization header from the cookie instead.
if cookie, err := r.Cookie(p.tokenCookieName); err == nil {
@ -175,6 +226,10 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
// read loop -- take messages from websocket and write to http request
go func() {
if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
conn.SetReadDeadline(time.Now().Add(p.pongWait))
conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(p.pongWait)); return nil })
}
defer func() {
cancelFn()
}()
@ -206,8 +261,38 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
}
}
}()
// ping write loop
if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
go func() {
ticker := time.NewTicker(p.pingInterval)
defer func() {
ticker.Stop()
conn.Close()
}()
for {
select {
case <-ctx.Done():
p.logger.Debugln("ping loop done")
return
case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(p.pingWait))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}()
}
// write loop -- take messages from response and write to websocket
scanner := bufio.NewScanner(responseBodyR)
// if maxRespBodyBufferSize has been specified, use custom buffer for scanner
var scannerBuf []byte
if p.maxRespBodyBufferBytes > 0 {
scannerBuf = make([]byte, 0, 64*1024)
scanner.Buffer(scannerBuf, p.maxRespBodyBufferBytes)
}
for scanner.Scan() {
if len(scanner.Bytes()) == 0 {
p.logger.Warnln("[write] empty scan", scanner.Err())
@ -239,6 +324,17 @@ func newInMemoryResponseWriter(w io.Writer) *inMemoryResponseWriter {
}
}
// IE and Edge do not delimit Sec-WebSocket-Protocol strings with spaces
func transformSubProtocolHeader(header string) string {
tokens := strings.SplitN(header, "Bearer,", 2)
if len(tokens) < 2 {
return ""
}
return fmt.Sprintf("Bearer %v", strings.Trim(tokens[1], " "))
}
func (w *inMemoryResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}