functional-tester/tester: shuffle failure cases with coprime

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee
2018-03-31 18:35:06 -07:00
parent ffabe55a25
commit 2b9c810fa4
4 changed files with 99 additions and 25 deletions

View File

@ -19,6 +19,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"path/filepath" "path/filepath"
"strings" "strings"
@ -234,6 +235,33 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
} }
go clus.serveTesterServer() go clus.serveTesterServer()
clus.updateFailures()
clus.rateLimiter = rate.NewLimiter(
rate.Limit(int(clus.Tester.StressQPS)),
int(clus.Tester.StressQPS),
)
clus.updateStresserChecker()
return clus, nil
}
// serveTesterServer runs the tester HTTP server and logs its lifecycle.
//
// It logs the configured tester address, calls ListenAndServe on
// clus.testerHTTPServer, and logs again (including the returned error) once
// that call returns. A nil error or http.ErrServerClosed is treated as a
// normal return; any other error is reported through logger.Fatal.
func (clus *Cluster) serveTesterServer() {
	clus.logger.Info(
		"started tester HTTP server",
		zap.String("tester-address", clus.Tester.TesterAddr),
	)

	serveErr := clus.testerHTTPServer.ListenAndServe()

	clus.logger.Info(
		"tester HTTP server returned",
		zap.String("tester-address", clus.Tester.TesterAddr),
		zap.Error(serveErr),
	)

	// Nothing more to do when the server stopped without a real failure.
	if serveErr == nil || serveErr == http.ErrServerClosed {
		return
	}
	clus.logger.Fatal("tester HTTP errored", zap.Error(serveErr))
}
func (clus *Cluster) updateFailures() {
for _, cs := range clus.Tester.FailureCases { for _, cs := range clus.Tester.FailureCases {
switch cs { switch cs {
case "KILL_ONE_FOLLOWER": case "KILL_ONE_FOLLOWER":
@ -270,34 +298,52 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
clus.failures = append(clus.failures, newFailureNoOp()) clus.failures = append(clus.failures, newFailureNoOp())
case "EXTERNAL": case "EXTERNAL":
clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath)) clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
default: }
return nil, fmt.Errorf("unknown failure %q", cs)
} }
} }
// shuffleFailures replaces clus.failures with a reordered copy of itself.
//
// On every call it reseeds math/rand from the current time, draws an offset
// from rand.Intn(1000), and asks coprime for a multiplier cp based on the
// number of failures n. Slot i of the new slice is taken from index
// (cp*i+offset)%n of the old slice. It logs the total before and after.
//
// NOTE(review): this is a side-by-side diff rendering; the leading tokens on
// some of the lines below belong to the removed (old) column, not to this
// function.
clus.rateLimiter = rate.NewLimiter( func (clus *Cluster) shuffleFailures() {
rate.Limit(int(clus.Tester.StressQPS)), rand.Seed(time.Now().UnixNano())
int(clus.Tester.StressQPS), offset := rand.Intn(1000)
) n := len(clus.failures)
clus.updateStresserChecker() cp := coprime(n)
return clus, nil
clus.logger.Info("shuffling test failure cases", zap.Int("total", n))
// Build the new order in a fresh slice so reads from clus.failures are not
// affected by writes made earlier in the loop.
fs := make([]Failure, n)
for i := 0; i < n; i++ {
// With gcd(cp, n) == 1, i -> (cp*i+offset)%n visits each index in [0, n)
// exactly once, so no failure is dropped or repeated.
fs[i] = clus.failures[(cp*i+offset)%n]
}
clus.failures = fs
clus.logger.Info("shuffled test failure cases", zap.Int("total", n))
} }
func (clus *Cluster) serveTesterServer() { /*
clus.logger.Info( x and y of GCD 1 are coprime to each other
"started tester HTTP server",
zap.String("tester-address", clus.Tester.TesterAddr), x1 = ( coprime of n * idx1 + offset ) % n
) x2 = ( coprime of n * idx2 + offset ) % n
err := clus.testerHTTPServer.ListenAndServe() (x2 - x1) == coprime of n * (idx2 - idx1) (mod n)
clus.logger.Info( which is non-zero (mod n) whenever 0 < |idx2 - idx1| < n, because gcd(coprime of n, n) == 1
"tester HTTP server returned",
zap.String("tester-address", clus.Tester.TesterAddr), Consecutive x's are guaranteed to be distinct
zap.Error(err), */
// coprime returns the first integer i in [n/2, n) for which gcd(i, n) == 1.
// If the scan finds no such i (for example when n <= 0 and the loop body
// never runs), it returns the initial value 1. For n == 1 the result is 0,
// because the scan starts at 0 and gcd(0, 1) == 1.
//
// NOTE(review): this is a side-by-side diff rendering; the leading tokens on
// some of the lines below belong to the removed (old) column, not to this
// function.
) func coprime(n int) int {
if err != nil && err != http.ErrServerClosed { coprime := 1
// Scan upward from n/2 and stop at the first value sharing no factor with n.
clus.logger.Fatal("tester HTTP errored", zap.Error(err)) for i := n / 2; i < n; i++ {
if gcd(i, n) == 1 {
coprime = i
break
} }
} }
return coprime
}
// gcd returns the greatest common divisor of x and y, computed with the
// Euclidean algorithm: the pair (x, y) is repeatedly replaced by (y, x%y)
// until y reaches zero, at which point x holds the result.
// In particular gcd(x, 0) == x and gcd(0, y) == y.
func gcd(x, y int) int {
	for y != 0 {
		x, y = y, x%y
	}
	return x
}
func (clus *Cluster) updateStresserChecker() { func (clus *Cluster) updateStresserChecker() {
clus.logger.Info( clus.logger.Info(

View File

@ -131,6 +131,7 @@ func Test_newCluster(t *testing.T) {
"DELAY_PEER_PORT_TX_RX_LEADER", "DELAY_PEER_PORT_TX_RX_LEADER",
"DELAY_PEER_PORT_TX_RX_ALL", "DELAY_PEER_PORT_TX_RX_ALL",
}, },
FailureShuffle: true,
FailpointCommands: []string{`panic("etcd-tester")`}, FailpointCommands: []string{`panic("etcd-tester")`},
RunnerExecPath: "/etcd-runner", RunnerExecPath: "/etcd-runner",
ExternalExecPath: "", ExternalExecPath: "",
@ -159,4 +160,30 @@ func Test_newCluster(t *testing.T) {
if !reflect.DeepEqual(exp, cfg) { if !reflect.DeepEqual(exp, cfg) {
t.Fatalf("expected %+v, got %+v", exp, cfg) t.Fatalf("expected %+v, got %+v", exp, cfg)
} }
cfg.logger = logger
cfg.updateFailures()
fs1 := make([]string, len(cfg.failures))
for i := range cfg.failures {
fs1[i] = cfg.failures[i].Desc()
}
cfg.shuffleFailures()
fs2 := make([]string, len(cfg.failures))
for i := range cfg.failures {
fs2[i] = cfg.failures[i].Desc()
}
if reflect.DeepEqual(fs1, fs2) {
t.Fatalf("expected shuffled failure cases, got %q", fs2)
}
cfg.shuffleFailures()
fs3 := make([]string, len(cfg.failures))
for i := range cfg.failures {
fs3[i] = cfg.failures[i].Desc()
}
if reflect.DeepEqual(fs2, fs3) {
t.Fatalf("expected reshuffled failure cases from %q, got %q", fs2, fs3)
}
} }

View File

@ -106,6 +106,9 @@ func (clus *Cluster) StartTester() {
} }
func (clus *Cluster) doRound(round int) error { func (clus *Cluster) doRound(round int) error {
if clus.Tester.FailureShuffle {
clus.shuffleFailures()
}
for i, f := range clus.failures { for i, f := range clus.failures {
clus.cs = i clus.cs = i

View File

@ -98,9 +98,7 @@ tester-config:
- DELAY_PEER_PORT_TX_RX_LEADER - DELAY_PEER_PORT_TX_RX_LEADER
- DELAY_PEER_PORT_TX_RX_ALL - DELAY_PEER_PORT_TX_RX_ALL
# TODO: shuffle failure-shuffle: true
# fail-shuffle: true
failpoint-commands: failpoint-commands:
- panic("etcd-tester") - panic("etcd-tester")
# failpoint-commands: # failpoint-commands: