Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
43b75072bf | |||
78141fae60 | |||
3be37f042e | |||
7c896098d2 | |||
30f4e36de4 | |||
557abbe437 | |||
4b448c209b |
@ -347,7 +347,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchResumeComapcted checks that the watcher gracefully closes in case
|
func TestWatchResumeInitRev(t *testing.T) {
|
||||||
|
defer testutil.AfterTest(t)
|
||||||
|
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
cli := clus.Client(0)
|
||||||
|
if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// if resume is broken, it'll pick up this key first instead of a=3
|
||||||
|
if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
|
||||||
|
if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
|
||||||
|
t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
|
||||||
|
}
|
||||||
|
// pause wch
|
||||||
|
clus.Members[0].DropConnections()
|
||||||
|
clus.Members[0].PauseConnections()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp, ok := <-wch:
|
||||||
|
t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
// resume wch
|
||||||
|
clus.Members[0].UnpauseConnections()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp, ok := <-wch:
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("unexpected watch close")
|
||||||
|
}
|
||||||
|
if len(resp.Events) == 0 {
|
||||||
|
t.Fatal("expected event on watch")
|
||||||
|
}
|
||||||
|
if string(resp.Events[0].Kv.Value) != "3" {
|
||||||
|
t.Fatalf("expected value=3, got event %+v", resp.Events[0])
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("watch timed out")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestWatchResumeCompacted checks that the watcher gracefully closes in case
|
||||||
// that it tries to resume to a revision that's been compacted out of the store.
|
// that it tries to resume to a revision that's been compacted out of the store.
|
||||||
// Since the watcher's server restarts with stale data, the watcher will receive
|
// Since the watcher's server restarts with stale data, the watcher will receive
|
||||||
// either a compaction error or all keys by staying in sync before the compaction
|
// either a compaction error or all keys by staying in sync before the compaction
|
||||||
|
@ -616,10 +616,24 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|||||||
if ws.initReq.createdNotify {
|
if ws.initReq.createdNotify {
|
||||||
ws.outc <- *wr
|
ws.outc <- *wr
|
||||||
}
|
}
|
||||||
|
// once the watch channel is returned, a current revision
|
||||||
|
// watch must resume at the store revision. This is necessary
|
||||||
|
// for the following case to work as expected:
|
||||||
|
// wch := m1.Watch("a")
|
||||||
|
// m2.Put("a", "b")
|
||||||
|
// <-wch
|
||||||
|
// If the revision is only bound on the first observed event,
|
||||||
|
// if wch is disconnected before the Put is issued, then reconnects
|
||||||
|
// after it is committed, it'll miss the Put.
|
||||||
|
if ws.initReq.rev == 0 {
|
||||||
|
nextRev = wr.Header.Revision
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// current progress of watch; <= store revision
|
||||||
|
nextRev = wr.Header.Revision
|
||||||
|
}
|
||||||
|
|
||||||
nextRev = wr.Header.Revision
|
|
||||||
if len(wr.Events) > 0 {
|
if len(wr.Events) > 0 {
|
||||||
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
|
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ func leaseGrantCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, fmt.Errorf("failed to grant lease (%v)\n", err))
|
ExitWithError(ExitError, fmt.Errorf("failed to grant lease (%v)\n", err))
|
||||||
}
|
}
|
||||||
fmt.Printf("lease %016x granted with TTL(%ds)\n", resp.ID, resp.TTL)
|
display.Grant(*resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLeaseRevokeCommand returns the cobra command for "lease revoke".
|
// NewLeaseRevokeCommand returns the cobra command for "lease revoke".
|
||||||
@ -90,12 +90,12 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
id := leaseFromArgs(args[0])
|
id := leaseFromArgs(args[0])
|
||||||
ctx, cancel := commandCtx(cmd)
|
ctx, cancel := commandCtx(cmd)
|
||||||
_, err := mustClientFromCmd(cmd).Revoke(ctx, id)
|
resp, err := mustClientFromCmd(cmd).Revoke(ctx, id)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ExitWithError(ExitError, fmt.Errorf("failed to revoke lease (%v)\n", err))
|
ExitWithError(ExitError, fmt.Errorf("failed to revoke lease (%v)\n", err))
|
||||||
}
|
}
|
||||||
fmt.Printf("lease %016x revoked\n", id)
|
display.Revoke(id, *resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
var timeToLiveKeys bool
|
var timeToLiveKeys bool
|
||||||
@ -154,10 +154,13 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for resp := range respc {
|
for resp := range respc {
|
||||||
fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
|
display.KeepAlive(*resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := (display).(*simplePrinter); ok {
|
||||||
fmt.Printf("lease %016x expired or revoked.\n", id)
|
fmt.Printf("lease %016x expired or revoked.\n", id)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func leaseFromArgs(arg string) v3.LeaseID {
|
func leaseFromArgs(arg string) v3.LeaseID {
|
||||||
id, err := strconv.ParseInt(arg, 16, 64)
|
id, err := strconv.ParseInt(arg, 16, 64)
|
||||||
|
@ -32,6 +32,9 @@ type printer interface {
|
|||||||
Txn(v3.TxnResponse)
|
Txn(v3.TxnResponse)
|
||||||
Watch(v3.WatchResponse)
|
Watch(v3.WatchResponse)
|
||||||
|
|
||||||
|
Grant(r v3.LeaseGrantResponse)
|
||||||
|
Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse)
|
||||||
|
KeepAlive(r v3.LeaseKeepAliveResponse)
|
||||||
TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool)
|
TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool)
|
||||||
|
|
||||||
MemberAdd(v3.MemberAddResponse)
|
MemberAdd(v3.MemberAddResponse)
|
||||||
@ -86,7 +89,12 @@ func (p *printerRPC) Get(r v3.GetResponse) { p.p((
|
|||||||
func (p *printerRPC) Put(r v3.PutResponse) { p.p((*pb.PutResponse)(&r)) }
|
func (p *printerRPC) Put(r v3.PutResponse) { p.p((*pb.PutResponse)(&r)) }
|
||||||
func (p *printerRPC) Txn(r v3.TxnResponse) { p.p((*pb.TxnResponse)(&r)) }
|
func (p *printerRPC) Txn(r v3.TxnResponse) { p.p((*pb.TxnResponse)(&r)) }
|
||||||
func (p *printerRPC) Watch(r v3.WatchResponse) { p.p(&r) }
|
func (p *printerRPC) Watch(r v3.WatchResponse) { p.p(&r) }
|
||||||
|
|
||||||
|
func (p *printerRPC) Grant(r v3.LeaseGrantResponse) { p.p(r) }
|
||||||
|
func (p *printerRPC) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) { p.p(r) }
|
||||||
|
func (p *printerRPC) KeepAlive(r v3.LeaseKeepAliveResponse) { p.p(r) }
|
||||||
func (p *printerRPC) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) { p.p(&r) }
|
func (p *printerRPC) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) { p.p(&r) }
|
||||||
|
|
||||||
func (p *printerRPC) MemberAdd(r v3.MemberAddResponse) { p.p((*pb.MemberAddResponse)(&r)) }
|
func (p *printerRPC) MemberAdd(r v3.MemberAddResponse) { p.p((*pb.MemberAddResponse)(&r)) }
|
||||||
func (p *printerRPC) MemberRemove(id uint64, r v3.MemberRemoveResponse) {
|
func (p *printerRPC) MemberRemove(id uint64, r v3.MemberRemoveResponse) {
|
||||||
p.p((*pb.MemberRemoveResponse)(&r))
|
p.p((*pb.MemberRemoveResponse)(&r))
|
||||||
|
@ -92,6 +92,22 @@ func (p *fieldsPrinter) Watch(resp v3.WatchResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *fieldsPrinter) Grant(r v3.LeaseGrantResponse) {
|
||||||
|
p.hdr(r.ResponseHeader)
|
||||||
|
fmt.Println(`"ID" :`, r.ID)
|
||||||
|
fmt.Println(`"TTL" :`, r.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *fieldsPrinter) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) {
|
||||||
|
p.hdr(r.Header)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *fieldsPrinter) KeepAlive(r v3.LeaseKeepAliveResponse) {
|
||||||
|
p.hdr(r.ResponseHeader)
|
||||||
|
fmt.Println(`"ID" :`, r.ID)
|
||||||
|
fmt.Println(`"TTL" :`, r.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *fieldsPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
|
func (p *fieldsPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
|
||||||
p.hdr(r.ResponseHeader)
|
p.hdr(r.ResponseHeader)
|
||||||
fmt.Println(`"ID" :`, r.ID)
|
fmt.Println(`"ID" :`, r.ID)
|
||||||
|
@ -79,6 +79,18 @@ func (s *simplePrinter) Watch(resp v3.WatchResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *simplePrinter) Grant(resp v3.LeaseGrantResponse) {
|
||||||
|
fmt.Printf("lease %016x granted with TTL(%ds)\n", resp.ID, resp.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *simplePrinter) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) {
|
||||||
|
fmt.Printf("lease %016x revoked\n", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *simplePrinter) KeepAlive(resp v3.LeaseKeepAliveResponse) {
|
||||||
|
fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *simplePrinter) TimeToLive(resp v3.LeaseTimeToLiveResponse, keys bool) {
|
func (s *simplePrinter) TimeToLive(resp v3.LeaseTimeToLiveResponse, keys bool) {
|
||||||
txt := fmt.Sprintf("lease %016x granted with TTL(%ds), remaining(%ds)", resp.ID, resp.GrantedTTL, resp.TTL)
|
txt := fmt.Sprintf("lease %016x granted with TTL(%ds), remaining(%ds)", resp.ID, resp.GrantedTTL, resp.TTL)
|
||||||
if keys {
|
if keys {
|
||||||
|
@ -32,6 +32,7 @@ type bridge struct {
|
|||||||
conns map[*bridgeConn]struct{}
|
conns map[*bridgeConn]struct{}
|
||||||
|
|
||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
|
pausec chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) {
|
|||||||
inaddr: addr + "0",
|
inaddr: addr + "0",
|
||||||
outaddr: addr,
|
outaddr: addr,
|
||||||
conns: make(map[*bridgeConn]struct{}),
|
conns: make(map[*bridgeConn]struct{}),
|
||||||
stopc: make(chan struct{}, 1),
|
stopc: make(chan struct{}),
|
||||||
|
pausec: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
close(b.pausec)
|
||||||
|
|
||||||
l, err := transport.NewUnixListener(b.inaddr)
|
l, err := transport.NewUnixListener(b.inaddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
|
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
|
||||||
@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr }
|
|||||||
|
|
||||||
func (b *bridge) Close() {
|
func (b *bridge) Close() {
|
||||||
b.l.Close()
|
b.l.Close()
|
||||||
|
b.mu.Lock()
|
||||||
select {
|
select {
|
||||||
case b.stopc <- struct{}{}:
|
case <-b.stopc:
|
||||||
default:
|
default:
|
||||||
|
close(b.stopc)
|
||||||
}
|
}
|
||||||
|
b.mu.Unlock()
|
||||||
b.wg.Wait()
|
b.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,6 +82,22 @@ func (b *bridge) Reset() {
|
|||||||
b.conns = make(map[*bridgeConn]struct{})
|
b.conns = make(map[*bridgeConn]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *bridge) Pause() {
|
||||||
|
b.mu.Lock()
|
||||||
|
b.pausec = make(chan struct{})
|
||||||
|
b.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bridge) Unpause() {
|
||||||
|
b.mu.Lock()
|
||||||
|
select {
|
||||||
|
case <-b.pausec:
|
||||||
|
default:
|
||||||
|
close(b.pausec)
|
||||||
|
}
|
||||||
|
b.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (b *bridge) serveListen() {
|
func (b *bridge) serveListen() {
|
||||||
defer func() {
|
defer func() {
|
||||||
b.l.Close()
|
b.l.Close()
|
||||||
@ -91,13 +114,22 @@ func (b *bridge) serveListen() {
|
|||||||
if ierr != nil {
|
if ierr != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
b.mu.Lock()
|
||||||
|
pausec := b.pausec
|
||||||
|
b.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-b.stopc:
|
||||||
|
return
|
||||||
|
case <-pausec:
|
||||||
|
}
|
||||||
|
|
||||||
outc, oerr := net.Dial("unix", b.outaddr)
|
outc, oerr := net.Dial("unix", b.outaddr)
|
||||||
if oerr != nil {
|
if oerr != nil {
|
||||||
inc.Close()
|
inc.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bc := &bridgeConn{inc, outc}
|
bc := &bridgeConn{inc, outc, make(chan struct{})}
|
||||||
b.wg.Add(1)
|
b.wg.Add(1)
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
b.conns[bc] = struct{}{}
|
b.conns[bc] = struct{}{}
|
||||||
@ -108,6 +140,7 @@ func (b *bridge) serveListen() {
|
|||||||
|
|
||||||
func (b *bridge) serveConn(bc *bridgeConn) {
|
func (b *bridge) serveConn(bc *bridgeConn) {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
close(bc.donec)
|
||||||
bc.Close()
|
bc.Close()
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
delete(b.conns, bc)
|
delete(b.conns, bc)
|
||||||
@ -131,9 +164,11 @@ func (b *bridge) serveConn(bc *bridgeConn) {
|
|||||||
type bridgeConn struct {
|
type bridgeConn struct {
|
||||||
in net.Conn
|
in net.Conn
|
||||||
out net.Conn
|
out net.Conn
|
||||||
|
donec chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bc *bridgeConn) Close() {
|
func (bc *bridgeConn) Close() {
|
||||||
bc.in.Close()
|
bc.in.Close()
|
||||||
bc.out.Close()
|
bc.out.Close()
|
||||||
|
<-bc.donec
|
||||||
}
|
}
|
||||||
|
@ -533,6 +533,8 @@ func (m *member) electionTimeout() time.Duration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *member) DropConnections() { m.grpcBridge.Reset() }
|
func (m *member) DropConnections() { m.grpcBridge.Reset() }
|
||||||
|
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
|
||||||
|
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
|
||||||
|
|
||||||
// NewClientV3 creates a new grpc client connection to the member
|
// NewClientV3 creates a new grpc client connection to the member
|
||||||
func NewClientV3(m *member) (*clientv3.Client, error) {
|
func NewClientV3(m *member) (*clientv3.Client, error) {
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "3.0.0"
|
MinClusterVersion = "3.0.0"
|
||||||
Version = "3.1.6"
|
Version = "3.1.7"
|
||||||
APIVersion = "unknown"
|
APIVersion = "unknown"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
|
Reference in New Issue
Block a user