functional-tester: share limiter among stresser
Otherwise, adding more members stresses the cluster with more ops.
This commit is contained in:
@ -25,6 +25,7 @@ import (
|
|||||||
|
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
|
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -99,6 +100,7 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
|
|||||||
// 'out of memory' error. Put rate limits in server side.
|
// 'out of memory' error. Put rate limits in server side.
|
||||||
stressN := 100
|
stressN := 100
|
||||||
c.Stressers = make([]Stresser, len(members))
|
c.Stressers = make([]Stresser, len(members))
|
||||||
|
limiter := rate.NewLimiter(rate.Limit(c.stressQPS), c.stressQPS)
|
||||||
for i, m := range members {
|
for i, m := range members {
|
||||||
if c.v2Only {
|
if c.v2Only {
|
||||||
c.Stressers[i] = &stresserV2{
|
c.Stressers[i] = &stresserV2{
|
||||||
@ -113,8 +115,8 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
|
|||||||
keySize: c.stressKeySize,
|
keySize: c.stressKeySize,
|
||||||
keySuffixRange: c.stressKeySuffixRange,
|
keySuffixRange: c.stressKeySuffixRange,
|
||||||
keyRangeLimit: c.stressKeyRangeLimit,
|
keyRangeLimit: c.stressKeyRangeLimit,
|
||||||
qps: c.stressQPS,
|
|
||||||
N: stressN,
|
N: stressN,
|
||||||
|
rateLimiter: limiter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
go c.Stressers[i].Stress()
|
go c.Stressers[i].Stress()
|
||||||
|
@ -33,7 +33,7 @@ func main() {
|
|||||||
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
|
stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
|
||||||
stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.")
|
stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.")
|
||||||
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
|
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
|
||||||
stressQPS := flag.Int("stress-qps", 3000, "maximum number of stresser requests per second.")
|
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
|
||||||
schedCases := flag.String("schedule-cases", "", "test case schedule")
|
schedCases := flag.String("schedule-cases", "", "test case schedule")
|
||||||
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
|
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
|
||||||
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
|
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
|
||||||
|
@ -148,8 +148,7 @@ type stresser struct {
|
|||||||
keySuffixRange int
|
keySuffixRange int
|
||||||
keyRangeLimit int
|
keyRangeLimit int
|
||||||
|
|
||||||
qps int
|
N int
|
||||||
N int
|
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
@ -166,6 +165,10 @@ type stresser struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *stresser) Stress() error {
|
func (s *stresser) Stress() error {
|
||||||
|
if s.rateLimiter == nil {
|
||||||
|
panic("expect rateLimiter to be set")
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: add backoff option
|
// TODO: add backoff option
|
||||||
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
|
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -180,7 +183,6 @@ func (s *stresser) Stress() error {
|
|||||||
s.conn = conn
|
s.conn = conn
|
||||||
s.cancel = cancel
|
s.cancel = cancel
|
||||||
s.wg = wg
|
s.wg = wg
|
||||||
s.rateLimiter = rate.NewLimiter(rate.Limit(s.qps), s.qps)
|
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
kvc := pb.NewKVClient(conn)
|
kvc := pb.NewKVClient(conn)
|
||||||
|
Reference in New Issue
Block a user