Compare commits


7 Commits

Author SHA1 Message Date
43b75072bf version: bump up to 3.1.7
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-27 08:25:50 -07:00
78141fae60 clientv3: set current revision to create rev regardless of CreateNotify
Turns out the optimization to ignore setting the init rev for
current revision watches breaks some ordering assumptions. Since
Watch only returns a channel once it gets a response, it should
bind the revision at the time of the first create response.

Was causing TestWatchReconnInit to fail.
2017-04-25 10:54:39 -07:00
3be37f042e integration: add pause/unpause to client bridge
Resetting connections sometimes isn't enough; need to stop/resume
accepting connections for some tests while keeping the member up.
2017-04-25 10:54:15 -07:00
7c896098d2 clientv3/integration: test watch resume with disconnect before first event
2017-04-25 10:53:58 -07:00
30f4e36de4 clientv3: only update initReq.rev == 0 with creation watch revision
Always updating the initReq.rev on watch create will resume from the wrong
revision if initReq is ever nonzero.
2017-04-25 10:53:37 -07:00
557abbe437 ctlv3: use printer for lease command results
Fixes #7783
2017-04-20 10:39:36 -07:00
4b448c209b version: bump up to 3.1.6+git
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
2017-04-20 10:39:18 -07:00
9 changed files with 162 additions and 22 deletions

clientv3/integration/watch_test.go View File

@@ -347,7 +347,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
 	}
 }
 
+func TestWatchResumeInitRev(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+	if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
+		t.Fatal(err)
+	}
+	// if resume is broken, it'll pick up this key first instead of a=3
+	if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
+		t.Fatal(err)
+	}
+
+	wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
+	if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
+		t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
+	}
+
+	// pause wch
+	clus.Members[0].DropConnections()
+	clus.Members[0].PauseConnections()
+	select {
+	case resp, ok := <-wch:
+		t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	// resume wch
+	clus.Members[0].UnpauseConnections()
+	select {
+	case resp, ok := <-wch:
+		if !ok {
+			t.Fatal("unexpected watch close")
+		}
+		if len(resp.Events) == 0 {
+			t.Fatal("expected event on watch")
+		}
+		if string(resp.Events[0].Kv.Value) != "3" {
+			t.Fatalf("expected value=3, got event %+v", resp.Events[0])
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("watch timed out")
+	}
+}
+
 // TestWatchResumeCompacted checks that the watcher gracefully closes in case
 // that it tries to resume to a revision that's been compacted out of the store.
 // Since the watcher's server restarts with stale data, the watcher will receive
 // either a compaction error or all keys by staying in sync before the compaction
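The test above leans on clientv3.WithCreatedNotify, which makes the first response on the watch channel an empty creation notification whose header carries the store revision at watch-creation time. A minimal standalone sketch of that contract, assuming a reachable etcd endpoint at localhost:2379 (the endpoint and key are illustrative, not part of the change):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// The first response is a creation notification: no events, Created
	// set, and Header.Revision holding the current store revision. This
	// is the revision the test asserts (rev=4) before any events arrive.
	wch := cli.Watch(context.Background(), "a",
		clientv3.WithRev(1), clientv3.WithCreatedNotify())
	created := <-wch
	fmt.Println(created.Created, created.Header.Revision)
}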

clientv3/watch.go View File

@@ -616,10 +616,24 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
 			if ws.initReq.createdNotify {
 				ws.outc <- *wr
 			}
+			// once the watch channel is returned, a current revision
+			// watch must resume at the store revision. This is necessary
+			// for the following case to work as expected:
+			//	wch := m1.Watch("a")
+			//	m2.Put("a", "b")
+			//	<-wch
+			// If the revision is only bound on the first observed event,
+			// if wch is disconnected before the Put is issued, then reconnects
+			// after it is committed, it'll miss the Put.
+			if ws.initReq.rev == 0 {
+				nextRev = wr.Header.Revision
+			}
+		} else {
+			// current progress of watch; <= store revision
+			nextRev = wr.Header.Revision
 		}
-
-		nextRev = wr.Header.Revision
 		if len(wr.Events) > 0 {
 			nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
 		}

etcdctl/ctlv3/command/lease_command.go View File

@@ -67,7 +67,7 @@ func leaseGrantCommandFunc(cmd *cobra.Command, args []string) {
 	if err != nil {
 		ExitWithError(ExitError, fmt.Errorf("failed to grant lease (%v)\n", err))
 	}
-	fmt.Printf("lease %016x granted with TTL(%ds)\n", resp.ID, resp.TTL)
+	display.Grant(*resp)
 }
 
 // NewLeaseRevokeCommand returns the cobra command for "lease revoke".
@@ -90,12 +90,12 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
 	id := leaseFromArgs(args[0])
 	ctx, cancel := commandCtx(cmd)
-	_, err := mustClientFromCmd(cmd).Revoke(ctx, id)
+	resp, err := mustClientFromCmd(cmd).Revoke(ctx, id)
 	cancel()
 	if err != nil {
 		ExitWithError(ExitError, fmt.Errorf("failed to revoke lease (%v)\n", err))
 	}
-	fmt.Printf("lease %016x revoked\n", id)
+	display.Revoke(id, *resp)
 }
 
 var timeToLiveKeys bool
@@ -154,10 +154,13 @@ func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
 	}
 
 	for resp := range respc {
-		fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
+		display.KeepAlive(*resp)
 	}
-	fmt.Printf("lease %016x expired or revoked.\n", id)
+
+	if _, ok := (display).(*simplePrinter); ok {
+		fmt.Printf("lease %016x expired or revoked.\n", id)
+	}
 }
 
 func leaseFromArgs(arg string) v3.LeaseID {
 	id, err := strconv.ParseInt(arg, 16, 64)
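The point of the refactor is that the lease commands now hand typed responses to the display interface, so simple, JSON, protobuf, and fields output all share one command path. A self-contained sketch of that printer-dispatch pattern; the types here are stand-ins, not etcdctl's actual ones:

package main

import "fmt"

// grantResp stands in for v3.LeaseGrantResponse.
type grantResp struct {
	ID  int64
	TTL int64
}

// printer mirrors the display interface the commands call into.
type printer interface {
	Grant(r grantResp)
}

type simplePrinter struct{}

func (s simplePrinter) Grant(r grantResp) {
	fmt.Printf("lease %016x granted with TTL(%ds)\n", r.ID, r.TTL)
}

type fieldsPrinter struct{}

func (p fieldsPrinter) Grant(r grantResp) {
	fmt.Println(`"ID" :`, r.ID)
	fmt.Println(`"TTL" :`, r.TTL)
}

func main() {
	// In etcdctl the -w flag picks the printer; the command path is shared.
	for _, display := range []printer{simplePrinter{}, fieldsPrinter{}} {
		display.Grant(grantResp{ID: 0x694d77aa9e38260b, TTL: 60})
	}
}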

etcdctl/ctlv3/command/printer.go View File

@@ -32,6 +32,9 @@ type printer interface {
 	Txn(v3.TxnResponse)
 	Watch(v3.WatchResponse)
 
+	Grant(r v3.LeaseGrantResponse)
+	Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse)
+	KeepAlive(r v3.LeaseKeepAliveResponse)
 	TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool)
 
 	MemberAdd(v3.MemberAddResponse)
@@ -86,7 +89,12 @@ func (p *printerRPC) Get(r v3.GetResponse) { p.p((
 func (p *printerRPC) Put(r v3.PutResponse)   { p.p((*pb.PutResponse)(&r)) }
 func (p *printerRPC) Txn(r v3.TxnResponse)   { p.p((*pb.TxnResponse)(&r)) }
 func (p *printerRPC) Watch(r v3.WatchResponse) { p.p(&r) }
+
+func (p *printerRPC) Grant(r v3.LeaseGrantResponse)                  { p.p(r) }
+func (p *printerRPC) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) { p.p(r) }
+func (p *printerRPC) KeepAlive(r v3.LeaseKeepAliveResponse)          { p.p(r) }
+
 func (p *printerRPC) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) { p.p(&r) }
 func (p *printerRPC) MemberAdd(r v3.MemberAddResponse) { p.p((*pb.MemberAddResponse)(&r)) }
 func (p *printerRPC) MemberRemove(id uint64, r v3.MemberRemoveResponse) {
 	p.p((*pb.MemberRemoveResponse)(&r))

etcdctl/ctlv3/command/printer_fields.go View File

@@ -92,6 +92,22 @@ func (p *fieldsPrinter) Watch(resp v3.WatchResponse) {
 	}
 }
 
+func (p *fieldsPrinter) Grant(r v3.LeaseGrantResponse) {
+	p.hdr(r.ResponseHeader)
+	fmt.Println(`"ID" :`, r.ID)
+	fmt.Println(`"TTL" :`, r.TTL)
+}
+
+func (p *fieldsPrinter) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) {
+	p.hdr(r.Header)
+}
+
+func (p *fieldsPrinter) KeepAlive(r v3.LeaseKeepAliveResponse) {
+	p.hdr(r.ResponseHeader)
+	fmt.Println(`"ID" :`, r.ID)
+	fmt.Println(`"TTL" :`, r.TTL)
+}
+
 func (p *fieldsPrinter) TimeToLive(r v3.LeaseTimeToLiveResponse, keys bool) {
 	p.hdr(r.ResponseHeader)
 	fmt.Println(`"ID" :`, r.ID)

etcdctl/ctlv3/command/printer_simple.go View File

@@ -79,6 +79,18 @@ func (s *simplePrinter) Watch(resp v3.WatchResponse) {
 	}
 }
 
+func (s *simplePrinter) Grant(resp v3.LeaseGrantResponse) {
+	fmt.Printf("lease %016x granted with TTL(%ds)\n", resp.ID, resp.TTL)
+}
+
+func (p *simplePrinter) Revoke(id v3.LeaseID, r v3.LeaseRevokeResponse) {
+	fmt.Printf("lease %016x revoked\n", id)
+}
+
+func (p *simplePrinter) KeepAlive(resp v3.LeaseKeepAliveResponse) {
+	fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
+}
+
 func (s *simplePrinter) TimeToLive(resp v3.LeaseTimeToLiveResponse, keys bool) {
 	txt := fmt.Sprintf("lease %016x granted with TTL(%ds), remaining(%ds)", resp.ID, resp.GrantedTTL, resp.TTL)
 	if keys {

integration/bridge.go View File

@@ -32,6 +32,7 @@ type bridge struct {
 	conns map[*bridgeConn]struct{}
 
 	stopc  chan struct{}
+	pausec chan struct{}
 
 	wg sync.WaitGroup
 	mu sync.Mutex
@@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) {
 		inaddr:  addr + "0",
 		outaddr: addr,
 		conns:   make(map[*bridgeConn]struct{}),
-		stopc:   make(chan struct{}, 1),
+		stopc:   make(chan struct{}),
+		pausec:  make(chan struct{}),
 	}
+	close(b.pausec)
+
 	l, err := transport.NewUnixListener(b.inaddr)
 	if err != nil {
 		return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
@@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr }
 
 func (b *bridge) Close() {
 	b.l.Close()
+	b.mu.Lock()
 	select {
-	case b.stopc <- struct{}{}:
+	case <-b.stopc:
 	default:
+		close(b.stopc)
 	}
+	b.mu.Unlock()
 	b.wg.Wait()
 }
 
@@ -75,6 +82,22 @@ func (b *bridge) Reset() {
 	b.conns = make(map[*bridgeConn]struct{})
 }
 
+func (b *bridge) Pause() {
+	b.mu.Lock()
+	b.pausec = make(chan struct{})
+	b.mu.Unlock()
+}
+
+func (b *bridge) Unpause() {
+	b.mu.Lock()
+	select {
+	case <-b.pausec:
+	default:
+		close(b.pausec)
+	}
+	b.mu.Unlock()
+}
+
 func (b *bridge) serveListen() {
 	defer func() {
 		b.l.Close()
@@ -91,13 +114,22 @@ func (b *bridge) serveListen() {
 		if ierr != nil {
 			return
 		}
+		b.mu.Lock()
+		pausec := b.pausec
+		b.mu.Unlock()
+		select {
+		case <-b.stopc:
+			return
+		case <-pausec:
+		}
+
 		outc, oerr := net.Dial("unix", b.outaddr)
 		if oerr != nil {
 			inc.Close()
 			return
 		}
-		bc := &bridgeConn{inc, outc}
+		bc := &bridgeConn{inc, outc, make(chan struct{})}
 		b.wg.Add(1)
 		b.mu.Lock()
 		b.conns[bc] = struct{}{}
@@ -108,6 +140,7 @@ func (b *bridge) serveListen() {
 func (b *bridge) serveConn(bc *bridgeConn) {
 	defer func() {
+		close(bc.donec)
 		bc.Close()
 		b.mu.Lock()
 		delete(b.conns, bc)
@@ -131,9 +164,11 @@ func (b *bridge) serveConn(bc *bridgeConn) {
 type bridgeConn struct {
 	in    net.Conn
 	out   net.Conn
+	donec chan struct{}
 }
 
 func (bc *bridgeConn) Close() {
 	bc.in.Close()
 	bc.out.Close()
+	<-bc.donec
 }
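The bridge's pause support rests on a small channel idiom: a receive from a closed channel never blocks, so a channel that starts closed is an open gate, swapping in a fresh unclosed channel shuts the gate, and closing it again (guarded against double-close) reopens it. A standalone sketch of the idiom; the gate type is illustrative, not the bridge itself:

package main

import (
	"fmt"
	"sync"
	"time"
)

type gate struct {
	mu     sync.Mutex
	pausec chan struct{}
}

func newGate() *gate {
	g := &gate{pausec: make(chan struct{})}
	close(g.pausec) // start open: receives proceed immediately
	return g
}

func (g *gate) Pause() {
	g.mu.Lock()
	g.pausec = make(chan struct{}) // unclosed channel: receivers now block
	g.mu.Unlock()
}

func (g *gate) Unpause() {
	g.mu.Lock()
	select {
	case <-g.pausec: // already closed; closing again would panic
	default:
		close(g.pausec)
	}
	g.mu.Unlock()
}

func (g *gate) Wait() {
	g.mu.Lock()
	pausec := g.pausec // snapshot under the lock, as serveListen does
	g.mu.Unlock()
	<-pausec
}

func main() {
	g := newGate()
	g.Pause()
	go func() {
		time.Sleep(100 * time.Millisecond)
		g.Unpause()
	}()
	g.Wait() // blocks until Unpause closes the gate
	fmt.Println("resumed")
}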

integration/cluster.go View File

@@ -533,6 +533,8 @@ func (m *member) electionTimeout() time.Duration {
 }
 
 func (m *member) DropConnections()    { m.grpcBridge.Reset() }
+func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
+func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
 
 // NewClientV3 creates a new grpc client connection to the member
 func NewClientV3(m *member) (*clientv3.Client, error) {

version/version.go View File

@@ -26,7 +26,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "3.0.0"
-	Version           = "3.1.6"
+	Version           = "3.1.7"
 	APIVersion        = "unknown"
 
 	// Git SHA Value will be set during build