server/embed: fix data race when start insecure grpc
There are two goroutines accessing the `gs` grpc server var. Before
insecure `gs` server start, the `gs` can be changed to secure server and
then the client will fail to connect to etcd with insecure request. It
is data-race. We should use argument for reference in the new goroutine.
fix: #15495
Signed-off-by: Wei Fu <fuweid89@gmail.com>
(cherry picked from commit a9988e2625
)
Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
parent
15b3756abd
commit
9e974792f9
@ -104,22 +104,26 @@ func (sctx *serveCtx) serve(
|
|||||||
servElection := v3election.NewElectionServer(v3c)
|
servElection := v3election.NewElectionServer(v3c)
|
||||||
servLock := v3lock.NewLockServer(v3c)
|
servLock := v3lock.NewLockServer(v3c)
|
||||||
|
|
||||||
var gs *grpc.Server
|
|
||||||
defer func() {
|
|
||||||
if err != nil && gs != nil {
|
|
||||||
gs.Stop()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if sctx.insecure {
|
if sctx.insecure {
|
||||||
gs = v3rpc.Server(s, nil, nil, gopts...)
|
gs := v3rpc.Server(s, nil, nil, gopts...)
|
||||||
v3electionpb.RegisterElectionServer(gs, servElection)
|
v3electionpb.RegisterElectionServer(gs, servElection)
|
||||||
v3lockpb.RegisterLockServer(gs, servLock)
|
v3lockpb.RegisterLockServer(gs, servLock)
|
||||||
if sctx.serviceRegister != nil {
|
if sctx.serviceRegister != nil {
|
||||||
sctx.serviceRegister(gs)
|
sctx.serviceRegister(gs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func(gs *grpc.Server) {
|
||||||
|
if err != nil {
|
||||||
|
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
|
||||||
|
gs.Stop()
|
||||||
|
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
|
||||||
|
}
|
||||||
|
}(gs)
|
||||||
|
|
||||||
grpcl := m.Match(cmux.HTTP2())
|
grpcl := m.Match(cmux.HTTP2())
|
||||||
go func() { errHandler(gs.Serve(grpcl)) }()
|
go func(gs *grpc.Server, grpcLis net.Listener) {
|
||||||
|
errHandler(gs.Serve(grpcLis))
|
||||||
|
}(gs, grpcl)
|
||||||
|
|
||||||
var gwmux *gw.ServeMux
|
var gwmux *gw.ServeMux
|
||||||
if s.Cfg.EnableGRPCGateway {
|
if s.Cfg.EnableGRPCGateway {
|
||||||
@ -140,7 +144,10 @@ func (sctx *serveCtx) serve(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
httpl := m.Match(cmux.HTTP1())
|
httpl := m.Match(cmux.HTTP1())
|
||||||
go func() { errHandler(srvhttp.Serve(httpl)) }()
|
|
||||||
|
go func(srvhttp *http.Server, httpLis net.Listener) {
|
||||||
|
errHandler(srvhttp.Serve(httpLis))
|
||||||
|
}(srvhttp, httpl)
|
||||||
|
|
||||||
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
|
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
|
||||||
sctx.lg.Info(
|
sctx.lg.Info(
|
||||||
@ -154,12 +161,22 @@ func (sctx *serveCtx) serve(
|
|||||||
if tlsErr != nil {
|
if tlsErr != nil {
|
||||||
return tlsErr
|
return tlsErr
|
||||||
}
|
}
|
||||||
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
|
|
||||||
|
gs := v3rpc.Server(s, tlscfg, nil, gopts...)
|
||||||
v3electionpb.RegisterElectionServer(gs, servElection)
|
v3electionpb.RegisterElectionServer(gs, servElection)
|
||||||
v3lockpb.RegisterLockServer(gs, servLock)
|
v3lockpb.RegisterLockServer(gs, servLock)
|
||||||
if sctx.serviceRegister != nil {
|
if sctx.serviceRegister != nil {
|
||||||
sctx.serviceRegister(gs)
|
sctx.serviceRegister(gs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func(gs *grpc.Server) {
|
||||||
|
if err != nil {
|
||||||
|
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
|
||||||
|
gs.Stop()
|
||||||
|
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
|
||||||
|
}
|
||||||
|
}(gs)
|
||||||
|
|
||||||
handler = grpcHandlerFunc(gs, handler)
|
handler = grpcHandlerFunc(gs, handler)
|
||||||
|
|
||||||
var gwmux *gw.ServeMux
|
var gwmux *gw.ServeMux
|
||||||
@ -192,7 +209,10 @@ func (sctx *serveCtx) serve(
|
|||||||
sctx.lg.Error("Configure https server failed", zap.Error(err))
|
sctx.lg.Error("Configure https server failed", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go func() { errHandler(srv.Serve(tlsl)) }()
|
|
||||||
|
go func(srvhttp *http.Server, tlsLis net.Listener) {
|
||||||
|
errHandler(srvhttp.Serve(tlsLis))
|
||||||
|
}(srv, tlsl)
|
||||||
|
|
||||||
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
|
sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
|
||||||
sctx.lg.Info(
|
sctx.lg.Info(
|
||||||
|
@ -142,6 +142,9 @@ func fetchGrpcGateway(endpoint string, httpVersion string, connType clientConnTy
|
|||||||
}
|
}
|
||||||
req := cURLReq{endpoint: "/v3/kv/range", value: string(rangeData), timeout: 5, httpVersion: httpVersion}
|
req := cURLReq{endpoint: "/v3/kv/range", value: string(rangeData), timeout: 5, httpVersion: httpVersion}
|
||||||
respData, err := curl(endpoint, "POST", req, connType)
|
respData, err := curl(endpoint, "POST", req, connType)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return validateGrpcgatewayRangeReponse([]byte(respData))
|
return validateGrpcgatewayRangeReponse([]byte(respData))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user