grpcproxy, etcdmain, integration: add close channel to kv proxy
ccache launches goroutines that need to be explicitly stopped. Fixes #7158
This commit is contained in:

committed by
Gyu-Ho Lee

parent
6c8f1986c8
commit
c2e8d06eec
@ -103,7 +103,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
kvp := grpcproxy.NewKvProxy(client)
|
kvp, _ := grpcproxy.NewKvProxy(client)
|
||||||
watchp, _ := grpcproxy.NewWatchProxy(client)
|
watchp, _ := grpcproxy.NewWatchProxy(client)
|
||||||
clusterp := grpcproxy.NewClusterProxy(client)
|
clusterp := grpcproxy.NewClusterProxy(client)
|
||||||
leasep := grpcproxy.NewLeaseProxy(client)
|
leasep := grpcproxy.NewLeaseProxy(client)
|
||||||
|
@ -30,8 +30,9 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type grpcClientProxy struct {
|
type grpcClientProxy struct {
|
||||||
grpc grpcAPI
|
grpc grpcAPI
|
||||||
wdonec <-chan struct{}
|
wdonec <-chan struct{}
|
||||||
|
kvdonec <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toGRPC(c *clientv3.Client) grpcAPI {
|
func toGRPC(c *clientv3.Client) grpcAPI {
|
||||||
@ -43,26 +44,30 @@ func toGRPC(c *clientv3.Client) grpcAPI {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wp, wpch := grpcproxy.NewWatchProxy(c)
|
wp, wpch := grpcproxy.NewWatchProxy(c)
|
||||||
|
kvp, kvpch := grpcproxy.NewKvProxy(c)
|
||||||
grpc := grpcAPI{
|
grpc := grpcAPI{
|
||||||
pb.NewClusterClient(c.ActiveConnection()),
|
pb.NewClusterClient(c.ActiveConnection()),
|
||||||
grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
|
grpcproxy.KvServerToKvClient(kvp),
|
||||||
pb.NewLeaseClient(c.ActiveConnection()),
|
pb.NewLeaseClient(c.ActiveConnection()),
|
||||||
grpcproxy.WatchServerToWatchClient(wp),
|
grpcproxy.WatchServerToWatchClient(wp),
|
||||||
pb.NewMaintenanceClient(c.ActiveConnection()),
|
pb.NewMaintenanceClient(c.ActiveConnection()),
|
||||||
pb.NewAuthClient(c.ActiveConnection()),
|
pb.NewAuthClient(c.ActiveConnection()),
|
||||||
}
|
}
|
||||||
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch}
|
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch}
|
||||||
return grpc
|
return grpc
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchCloser struct {
|
type proxyCloser struct {
|
||||||
clientv3.Watcher
|
clientv3.Watcher
|
||||||
wdonec <-chan struct{}
|
wdonec <-chan struct{}
|
||||||
|
kvdonec <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wc *watchCloser) Close() error {
|
func (pc *proxyCloser) Close() error {
|
||||||
err := wc.Watcher.Close()
|
// client ctx is canceled before calling close, so kv will close out
|
||||||
<-wc.wdonec
|
<-pc.kvdonec
|
||||||
|
err := pc.Watcher.Close()
|
||||||
|
<-pc.wdonec
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,9 +79,10 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
|
|||||||
rpc := toGRPC(c)
|
rpc := toGRPC(c)
|
||||||
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
|
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
|
||||||
pmu.Lock()
|
pmu.Lock()
|
||||||
c.Watcher = &watchCloser{
|
c.Watcher = &proxyCloser{
|
||||||
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
|
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
|
||||||
wdonec: proxies[c].wdonec,
|
wdonec: proxies[c].wdonec,
|
||||||
|
kvdonec: proxies[c].kvdonec,
|
||||||
}
|
}
|
||||||
pmu.Unlock()
|
pmu.Unlock()
|
||||||
return c, nil
|
return c, nil
|
||||||
|
3
proxy/grpcproxy/cache/store.go
vendored
3
proxy/grpcproxy/cache/store.go
vendored
@ -39,6 +39,7 @@ type Cache interface {
|
|||||||
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
|
Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||||
Compact(revision int64)
|
Compact(revision int64)
|
||||||
Invalidate(key []byte, endkey []byte)
|
Invalidate(key []byte, endkey []byte)
|
||||||
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
|
// keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
|
||||||
@ -58,6 +59,8 @@ func NewCache(maxCacheEntries int) Cache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cache) Close() { c.lru.Stop() }
|
||||||
|
|
||||||
// cache implements Cache
|
// cache implements Cache
|
||||||
type cache struct {
|
type cache struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
@ -27,11 +27,18 @@ type kvProxy struct {
|
|||||||
cache cache.Cache
|
cache cache.Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKvProxy(c *clientv3.Client) pb.KVServer {
|
func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
|
||||||
return &kvProxy{
|
kv := &kvProxy{
|
||||||
kv: c.KV,
|
kv: c.KV,
|
||||||
cache: cache.NewCache(cache.DefaultMaxEntries),
|
cache: cache.NewCache(cache.DefaultMaxEntries),
|
||||||
}
|
}
|
||||||
|
donec := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(donec)
|
||||||
|
<-c.Ctx().Done()
|
||||||
|
kv.cache.Close()
|
||||||
|
}()
|
||||||
|
return kv, donec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||||
|
@ -76,7 +76,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kvp := NewKvProxy(client)
|
kvp, _ := NewKvProxy(client)
|
||||||
|
|
||||||
kvts := &kvproxyTestServer{
|
kvts := &kvproxyTestServer{
|
||||||
kp: kvp,
|
kp: kvp,
|
||||||
|
Reference in New Issue
Block a user