load all leases from backend
@@ -25,8 +25,10 @@ import (
	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.etcd.io/etcd/pkg/v3/traceutil"
	"go.etcd.io/etcd/server/v3/lease/leasepb"
	"go.etcd.io/etcd/server/v3/storage/backend"
	"go.etcd.io/etcd/server/v3/storage/mvcc"
	"go.etcd.io/etcd/server/v3/storage/schema"
	"go.etcd.io/etcd/tests/v3/framework/integration"
	"go.uber.org/zap/zaptest"
)
@@ -253,3 +255,99 @@ func TestV3CorruptAlarm(t *testing.T) {
	}
	t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
}

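// TestV3CorruptAlarmWithLeaseCorrupted corrupts the lease bucket on one
// member's backend and verifies that the periodic corruption check detects
// the divergence and rejects writes with rpctypes.ErrCorrupt.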
func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
	integration.BeforeTest(t)
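	// Small snapshot thresholds make the leader snapshot frequently, so a
	// member added later is likely brought up to date from a snapshot.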
	clus := integration.NewCluster(t, &integration.ClusterConfig{
		CorruptCheckTime:       time.Second,
		Size:                   3,
		SnapshotCount:          10,
		SnapshotCatchUpEntries: 5,
	})
	defer clus.Terminate(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

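	// Grant a lease with a fixed ID so the corrupted copy written to the
	// backend later can be compared against it.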
	lresp, err := integration.ToGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
	if err != nil {
		t.Errorf("could not create lease 1 (%v)", err)
	}
	if lresp.ID != 1 {
		t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
	}

	putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
	// Write enough keys to trigger a snapshot from the leader to the new member.
	for i := 0; i < 15; i++ {
		_, err := integration.ToGRPC(clus.RandClient()).KV.Put(ctx, putr)
		if err != nil {
			t.Errorf("#%d: couldn't put key (%v)", i, err)
		}
	}

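	// Cycle a member out and back in so the rejoining member restores its
	// state, including leases, from the leader's snapshot.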
	if err := clus.RemoveMember(t, clus.Client(1), uint64(clus.Members[2].ID())); err != nil {
		t.Fatal(err)
	}
	clus.WaitMembersForLeader(t, clus.Members)

	clus.AddMember(t)
	clus.WaitMembersForLeader(t, clus.Members)
	// Wait for the new member to catch up.
	integration.WaitClientV3(t, clus.Members[2].Client)

	// Corrupt member 2 by modifying its backend lease bucket offline.
	clus.Members[2].Stop(t)
	fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
	bcfg := backend.DefaultBackendConfig()
	bcfg.Path = fp
	bcfg.Logger = zaptest.NewLogger(t)
	be := backend.New(bcfg)

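	// Swap lease 1 for lease 2 so this member's lease bucket no longer
	// matches the rest of the cluster.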
	olpb := leasepb.Lease{ID: int64(1), TTL: 60}
	tx := be.BatchTx()
	schema.UnsafeDeleteLease(tx, &olpb)
	lpb := leasepb.Lease{ID: int64(2), TTL: 60}
	schema.MustUnsafePutLease(tx, &lpb)
	tx.Commit()

	if err := be.Close(); err != nil {
		t.Fatal(err)
	}

	if err := clus.Members[2].Restart(t); err != nil {
		t.Fatal(err)
	}

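	// Wait until both a healthy member and the corrupted member serve again.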
	clus.Members[1].WaitOK(t)
	clus.Members[2].WaitOK(t)

	// Revoking the lease should delete the attached key on every member
	// except the corrupted one, whose lease bucket no longer holds lease 1.
	_, err = integration.ToGRPC(clus.Members[0].Client).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
	if err != nil {
		t.Fatal(err)
	}
	resp0, err0 := clus.Members[1].Client.KV.Get(context.TODO(), "foo")
	if err0 != nil {
		t.Fatal(err0)
	}
	resp1, err1 := clus.Members[2].Client.KV.Get(context.TODO(), "foo")
	if err1 != nil {
		t.Fatal(err1)
	}

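	// Revisions should diverge: the healthy member applied the deletion
	// caused by the revoke, while the corrupted member did not.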
	if resp0.Header.Revision == resp1.Header.Revision {
		t.Fatalf("matching Revision values")
	}

	// Wait for CorruptCheckTime to elapse so the periodic check can run.
	time.Sleep(time.Second)
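	// If the put still succeeds, the corruption check has not fired yet; the
	// test tolerates that and only fails on an unexpected error.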
	presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
	if perr != nil {
		if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
			t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
		} else {
			return
		}
	}
}