vendor: upgrade grpc to 1.2.1
This commit is contained in:
217
cmd/vendor/google.golang.org/grpc/server.go
generated
vendored
217
cmd/vendor/google.golang.org/grpc/server.go
generated
vendored
@ -54,6 +54,8 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/tap"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
@ -110,8 +112,11 @@ type options struct {
|
||||
maxMsgSize int
|
||||
unaryInt UnaryServerInterceptor
|
||||
streamInt StreamServerInterceptor
|
||||
inTapHandle tap.ServerInHandle
|
||||
statsHandler stats.Handler
|
||||
maxConcurrentStreams uint32
|
||||
useHandlerImpl bool // use http.Handler-based server
|
||||
unknownStreamDesc *StreamDesc
|
||||
}
|
||||
|
||||
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
|
||||
@ -186,6 +191,42 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
|
||||
}
|
||||
}
|
||||
|
||||
// InTapHandle returns a ServerOption that sets the tap handle for all the server
|
||||
// transport to be created. Only one can be installed.
|
||||
func InTapHandle(h tap.ServerInHandle) ServerOption {
|
||||
return func(o *options) {
|
||||
if o.inTapHandle != nil {
|
||||
panic("The tap handle has been set.")
|
||||
}
|
||||
o.inTapHandle = h
|
||||
}
|
||||
}
|
||||
|
||||
// StatsHandler returns a ServerOption that sets the stats handler for the server.
|
||||
func StatsHandler(h stats.Handler) ServerOption {
|
||||
return func(o *options) {
|
||||
o.statsHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
|
||||
// unknown service handler. The provided method is a bidi-streaming RPC service
|
||||
// handler that will be invoked instead of returning the the "unimplemented" gRPC
|
||||
// error whenever a request is received for an unregistered service or method.
|
||||
// The handling function has full access to the Context of the request and the
|
||||
// stream, and the invocation passes through interceptors.
|
||||
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
|
||||
return func(o *options) {
|
||||
o.unknownStreamDesc = &StreamDesc{
|
||||
StreamName: "unknown_service_handler",
|
||||
Handler: streamHandler,
|
||||
// We need to assume that the users of the streamHandler will want to use both.
|
||||
ClientStreams: true,
|
||||
ServerStreams: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewServer creates a gRPC server which has no service registered and has not
|
||||
// started to accept requests yet.
|
||||
func NewServer(opt ...ServerOption) *Server {
|
||||
@ -329,6 +370,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
|
||||
// read gRPC requests and then call the registered handlers to reply to them.
|
||||
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
|
||||
// this method returns.
|
||||
// Serve always returns non-nil error.
|
||||
func (s *Server) Serve(lis net.Listener) error {
|
||||
s.mu.Lock()
|
||||
s.printf("serving")
|
||||
@ -412,17 +454,23 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
|
||||
if s.opts.useHandlerImpl {
|
||||
s.serveUsingHandler(conn)
|
||||
} else {
|
||||
s.serveNewHTTP2Transport(conn, authInfo)
|
||||
s.serveHTTP2Transport(conn, authInfo)
|
||||
}
|
||||
}
|
||||
|
||||
// serveNewHTTP2Transport sets up a new http/2 transport (using the
|
||||
// serveHTTP2Transport sets up a http/2 transport (using the
|
||||
// gRPC http2 server transport in transport/http2_server.go) and
|
||||
// serves streams on it.
|
||||
// This is run in its own goroutine (it does network I/O in
|
||||
// transport.NewServerTransport).
|
||||
func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
|
||||
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
|
||||
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
|
||||
config := &transport.ServerConfig{
|
||||
MaxStreams: s.opts.maxConcurrentStreams,
|
||||
AuthInfo: authInfo,
|
||||
InTapHandle: s.opts.inTapHandle,
|
||||
StatsHandler: s.opts.statsHandler,
|
||||
}
|
||||
st, err := transport.NewServerTransport("http2", c, config)
|
||||
if err != nil {
|
||||
s.mu.Lock()
|
||||
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
|
||||
@ -448,6 +496,12 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
|
||||
defer wg.Done()
|
||||
s.handleStream(st, stream, s.traceInfo(st, stream))
|
||||
}()
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
if !EnableTracing {
|
||||
return ctx
|
||||
}
|
||||
tr := trace.New("grpc.Recv."+methodFamily(method), method)
|
||||
return trace.NewContext(ctx, tr)
|
||||
})
|
||||
wg.Wait()
|
||||
}
|
||||
@ -497,15 +551,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
|
||||
// If tracing is not enabled, it returns nil.
|
||||
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
|
||||
if !EnableTracing {
|
||||
tr, ok := trace.FromContext(stream.Context())
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
trInfo = &traceInfo{
|
||||
tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
|
||||
tr: tr,
|
||||
}
|
||||
trInfo.firstLine.client = false
|
||||
trInfo.firstLine.remoteAddr = st.RemoteAddr()
|
||||
stream.TraceContext(trInfo.tr)
|
||||
|
||||
if dl, ok := stream.Context().Deadline(); ok {
|
||||
trInfo.firstLine.deadline = dl.Sub(time.Now())
|
||||
}
|
||||
@ -532,11 +588,17 @@ func (s *Server) removeConn(c io.Closer) {
|
||||
}
|
||||
|
||||
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
|
||||
var cbuf *bytes.Buffer
|
||||
var (
|
||||
cbuf *bytes.Buffer
|
||||
outPayload *stats.OutPayload
|
||||
)
|
||||
if cp != nil {
|
||||
cbuf = new(bytes.Buffer)
|
||||
}
|
||||
p, err := encode(s.opts.codec, msg, cp, cbuf)
|
||||
if s.opts.statsHandler != nil {
|
||||
outPayload = &stats.OutPayload{}
|
||||
}
|
||||
p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
|
||||
if err != nil {
|
||||
// This typically indicates a fatal issue (e.g., memory
|
||||
// corruption or hardware faults) the application program
|
||||
@ -547,10 +609,33 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
|
||||
// the optimal option.
|
||||
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
|
||||
}
|
||||
return t.Write(stream, p, opts)
|
||||
err = t.Write(stream, p, opts)
|
||||
if err == nil && outPayload != nil {
|
||||
outPayload.SentTime = time.Now()
|
||||
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
|
||||
sh := s.opts.statsHandler
|
||||
if sh != nil {
|
||||
begin := &stats.Begin{
|
||||
BeginTime: time.Now(),
|
||||
}
|
||||
sh.HandleRPC(stream.Context(), begin)
|
||||
}
|
||||
defer func() {
|
||||
if sh != nil {
|
||||
end := &stats.End{
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
end.Error = toRPCErr(err)
|
||||
}
|
||||
sh.HandleRPC(stream.Context(), end)
|
||||
}
|
||||
}()
|
||||
if trInfo != nil {
|
||||
defer trInfo.tr.Finish()
|
||||
trInfo.firstLine.client = false
|
||||
@ -579,14 +664,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *rpcError:
|
||||
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
case transport.ConnectionError:
|
||||
// Nothing to do here.
|
||||
case transport.StreamError:
|
||||
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
|
||||
@ -597,20 +682,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
|
||||
switch err := err.(type) {
|
||||
case *rpcError:
|
||||
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
return err
|
||||
default:
|
||||
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
||||
}
|
||||
|
||||
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
|
||||
}
|
||||
}
|
||||
var inPayload *stats.InPayload
|
||||
if sh != nil {
|
||||
inPayload = &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
}
|
||||
return err
|
||||
}
|
||||
statusCode := codes.OK
|
||||
statusDesc := ""
|
||||
df := func(v interface{}) error {
|
||||
if inPayload != nil {
|
||||
inPayload.WireLength = len(req)
|
||||
}
|
||||
if pf == compressionMade {
|
||||
var err error
|
||||
req, err = s.opts.dc.Do(bytes.NewReader(req))
|
||||
@ -618,7 +712,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
|
||||
}
|
||||
return err
|
||||
return Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
if len(req) > s.opts.maxMsgSize {
|
||||
@ -630,6 +724,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if err := s.opts.codec.Unmarshal(req, v); err != nil {
|
||||
return err
|
||||
}
|
||||
if inPayload != nil {
|
||||
inPayload.Payload = v
|
||||
inPayload.Data = req
|
||||
inPayload.Length = len(req)
|
||||
sh.HandleRPC(stream.Context(), inPayload)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
|
||||
}
|
||||
@ -650,9 +750,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
}
|
||||
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
|
||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return Errorf(statusCode, statusDesc)
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(stringer("OK"), false)
|
||||
@ -677,23 +776,46 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
|
||||
}
|
||||
return t.WriteStatus(stream, statusCode, statusDesc)
|
||||
errWrite := t.WriteStatus(stream, statusCode, statusDesc)
|
||||
if statusCode != codes.OK {
|
||||
return Errorf(statusCode, statusDesc)
|
||||
}
|
||||
return errWrite
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
|
||||
sh := s.opts.statsHandler
|
||||
if sh != nil {
|
||||
begin := &stats.Begin{
|
||||
BeginTime: time.Now(),
|
||||
}
|
||||
sh.HandleRPC(stream.Context(), begin)
|
||||
}
|
||||
defer func() {
|
||||
if sh != nil {
|
||||
end := &stats.End{
|
||||
EndTime: time.Now(),
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
end.Error = toRPCErr(err)
|
||||
}
|
||||
sh.HandleRPC(stream.Context(), end)
|
||||
}
|
||||
}()
|
||||
if s.opts.cp != nil {
|
||||
stream.SetSendCompress(s.opts.cp.Type())
|
||||
}
|
||||
ss := &serverStream{
|
||||
t: t,
|
||||
s: stream,
|
||||
p: &parser{r: stream},
|
||||
codec: s.opts.codec,
|
||||
cp: s.opts.cp,
|
||||
dc: s.opts.dc,
|
||||
maxMsgSize: s.opts.maxMsgSize,
|
||||
trInfo: trInfo,
|
||||
t: t,
|
||||
s: stream,
|
||||
p: &parser{r: stream},
|
||||
codec: s.opts.codec,
|
||||
cp: s.opts.cp,
|
||||
dc: s.opts.dc,
|
||||
maxMsgSize: s.opts.maxMsgSize,
|
||||
trInfo: trInfo,
|
||||
statsHandler: sh,
|
||||
}
|
||||
if ss.cp != nil {
|
||||
ss.cbuf = new(bytes.Buffer)
|
||||
@ -712,15 +834,19 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
}()
|
||||
}
|
||||
var appErr error
|
||||
var server interface{}
|
||||
if srv != nil {
|
||||
server = srv.server
|
||||
}
|
||||
if s.opts.streamInt == nil {
|
||||
appErr = sd.Handler(srv.server, ss)
|
||||
appErr = sd.Handler(server, ss)
|
||||
} else {
|
||||
info := &StreamServerInfo{
|
||||
FullMethod: stream.Method(),
|
||||
IsClientStream: sd.ClientStreams,
|
||||
IsServerStream: sd.ServerStreams,
|
||||
}
|
||||
appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
|
||||
appErr = s.opts.streamInt(server, ss, info, sd.Handler)
|
||||
}
|
||||
if appErr != nil {
|
||||
if err, ok := appErr.(*rpcError); ok {
|
||||
@ -744,7 +870,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
}
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
||||
errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
||||
if ss.statusCode != codes.OK {
|
||||
return Errorf(ss.statusCode, ss.statusDesc)
|
||||
}
|
||||
return errWrite
|
||||
|
||||
}
|
||||
|
||||
@ -759,7 +889,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
|
||||
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
|
||||
if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
@ -775,11 +906,16 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
method := sm[pos+1:]
|
||||
srv, ok := s.m[service]
|
||||
if !ok {
|
||||
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
||||
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
||||
return
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
|
||||
errDesc := fmt.Sprintf("unknown service %v", service)
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
@ -804,7 +940,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
|
||||
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
||||
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
||||
return
|
||||
}
|
||||
errDesc := fmt.Sprintf("unknown method %v", method)
|
||||
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
|
Reference in New Issue
Block a user