Merge pull request #4868 from heyitsanthony/api-quota

etcdserver: storage quotas
Anthony Romano
2016-03-28 15:15:57 -07:00
21 changed files with 1823 additions and 499 deletions
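
Storage quotas bound how much data the backend may hold: the server weighs a write's cost against the quota before accepting it, and once a member runs out of space it raises a NOSPACE alarm and rejects further puts with ErrNoSpace. Roughly, the shape of such a check might look like the sketch below (Quota, backendQuota, and maxBytes are illustrative names for this sketch, not necessarily the identifiers this commit adds):

type Quota interface {
	// Available reports whether applying req keeps the backend within quota.
	Available(req interface{}) bool
}

type backendQuota struct {
	be       backend.Backend
	maxBytes int64
}

func (q *backendQuota) Available(req interface{}) bool {
	p, ok := req.(*pb.PutRequest)
	if !ok {
		return true // only puts are charged in this sketch
	}
	// charge the key/value payload against the remaining backend space
	return q.be.Size()+int64(len(p.Key)+len(p.Value)) < q.maxBytes
}

When such a check fails, the RPC layer can fail fast with rpctypes.ErrNoSpace while the apply path raises a NOSPACE alarm so the condition outlives the in-flight request; the new tests below probe both layers.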

@@ -757,6 +757,8 @@ type grpcAPI struct {
Lease pb.LeaseClient
// Watch is the watch API for the client's connection.
Watch pb.WatchClient
// Maintenance is the maintenance API for the client's connection.
Maintenance pb.MaintenanceClient
}

func toGRPC(c *clientv3.Client) grpcAPI {
@@ -765,5 +767,6 @@ func toGRPC(c *clientv3.Client) grpcAPI {
pb.NewKVClient(c.ActiveConnection()),
pb.NewLeaseClient(c.ActiveConnection()),
pb.NewWatchClient(c.ActiveConnection()),
pb.NewMaintenanceClient(c.ActiveConnection()),
}
}

@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/storage/backend"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@@ -455,6 +456,142 @@ func TestV3Hash(t *testing.T) {
}
}

// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
func TestV3StorageQuotaAPI(t *testing.T) {
oldSize := backend.InitialMmapSize
defer func() {
backend.InitialMmapSize = oldSize
testutil.AfterTest(t)
}()
backend.InitialMmapSize = 64 * 1024
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := toGRPC(clus.RandClient()).KV
key := []byte("abc")
// test small put that fits in quota
smallbuf := make([]byte, 512)
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
t.Fatal(err)
}
// test big put
bigbuf := make([]byte, 64*1024)
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
if err == nil || err != rpctypes.ErrNoSpace {
t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrNoSpace)
}
// test big txn
puttxn := &pb.RequestUnion{
Request: &pb.RequestUnion_RequestPut{
RequestPut: &pb.PutRequest{
Key: key,
Value: bigbuf,
},
},
}
txnreq := &pb.TxnRequest{}
txnreq.Success = append(txnreq.Success, puttxn)
_, txnerr := kvc.Txn(context.TODO(), txnreq)
if txnerr == nil || txnerr != rpctypes.ErrNoSpace {
t.Fatalf("big txn got %v, expected %v", txnerr, rpctypes.ErrNoSpace)
}
}

// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
func TestV3StorageQuotaApply(t *testing.T) {
oldSize := backend.InitialMmapSize
defer func() {
backend.InitialMmapSize = oldSize
testutil.AfterTest(t)
}()
clus := NewClusterV3(t, &ClusterConfig{Size: 2})
defer clus.Terminate(t)
kvc0 := toGRPC(clus.Client(0)).KV
kvc1 := toGRPC(clus.Client(1)).KV
// force a node to have a different quota
backend.InitialMmapSize = 64 * 1024
clus.Members[0].Stop(t)
clus.Members[0].Restart(t)
clus.waitLeader(t, clus.Members)
key := []byte("abc")
// test small put still works
smallbuf := make([]byte, 1024)
_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
if serr != nil {
t.Fatal(serr)
}
// test big put
bigbuf := make([]byte, 64*1024)
_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
if err != nil {
t.Fatal(err)
}
// small quota machine should reject put
// first, synchronize with the cluster via quorum get
kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
t.Fatalf("past-quota instance should reject put")
}
// large quota machine should reject put
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
t.Fatalf("past-quota instance should reject put")
}
// reset large quota node to ensure alarm persisted
backend.InitialMmapSize = oldSize
clus.Members[1].Stop(t)
clus.Members[1].Restart(t)
clus.waitLeader(t, clus.Members)
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
t.Fatalf("alarmed instance should reject put after reset")
}
}
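
The stop/restart of the large-quota member above checks persistence: once raised, the NOSPACE alarm, not the live size check, is what keeps a member read-only, so a member restarted with a roomy quota must still refuse puts until the alarm is explicitly cleared. A rough sketch of that gating, with alarmGate as a purely illustrative name (assumes the test file's imports plus sync):

type alarmGate struct {
	mu     sync.Mutex
	active map[pb.AlarmType]bool
}

func newAlarmGate() *alarmGate {
	return &alarmGate{active: make(map[pb.AlarmType]bool)}
}

// allowWrite is consulted on every mutating request; while NOSPACE is
// active the member rejects puts no matter how much space it has.
func (g *alarmGate) allowWrite() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.active[pb.AlarmType_NOSPACE] {
		return rpctypes.ErrNoSpace
	}
	return nil
}

// apply consumes AlarmRequests. This sketch keeps the set in memory; a
// real server would also persist it to the backend and reload it on
// boot, which is the behavior the restart in the test above verifies.
func (g *alarmGate) apply(req *pb.AlarmRequest) {
	g.mu.Lock()
	defer g.mu.Unlock()
	switch req.Action {
	case pb.AlarmRequest_ACTIVATE:
		g.active[req.Alarm] = true
	case pb.AlarmRequest_DEACTIVATE:
		delete(g.active, req.Alarm)
	}
}
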
// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
func TestV3AlarmDeactivate(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := toGRPC(clus.RandClient()).KV
mt := toGRPC(clus.RandClient()).Maintenance
alarmReq := &pb.AlarmRequest{
MemberID: 123,
Action: pb.AlarmRequest_ACTIVATE,
Alarm: pb.AlarmType_NOSPACE,
}
if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
t.Fatal(err)
}
key := []byte("abc")
smallbuf := make([]byte, 512)
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
if err == nil || err != rpctypes.ErrNoSpace {
t.Fatalf("put got %v, expected %v", err, rpctypes.ErrNoSpace)
}
alarmReq.Action = pb.AlarmRequest_DEACTIVATE
if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
t.Fatal(err)
}
if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
t.Fatal(err)
}
}

func TestV3RangeRequest(t *testing.T) {
defer testutil.AfterTest(t)
tests := []struct {