functional/tester: add "EtcdClientEndpoints" to "Checker"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee
2018-04-11 19:22:54 -07:00
parent e9c4bad2d1
commit f574a9aaed
8 changed files with 62 additions and 32 deletions

View File

@ -20,6 +20,9 @@ import "github.com/coreos/etcd/functional/rpcpb"
type Checker interface { type Checker interface {
// Type returns the checker type. // Type returns the checker type.
Type() rpcpb.Checker Type() rpcpb.Checker
// EtcdClientEndpoints returns the client endpoints of
// all checker target nodes..
EtcdClientEndpoints() []string
// Check returns an error if the system fails a consistency check. // Check returns an error if the system fails a consistency check.
Check() error Check() error
} }

View File

@ -25,21 +25,16 @@ import (
const retries = 7 const retries = 7
type hashRevGetter interface {
getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
}
type kvHashChecker struct { type kvHashChecker struct {
ctype rpcpb.Checker ctype rpcpb.Checker
lg *zap.Logger lg *zap.Logger
hrg hashRevGetter clus *Cluster
} }
func newKVHashChecker(lg *zap.Logger, hrg hashRevGetter) Checker { func newKVHashChecker(clus *Cluster) Checker {
return &kvHashChecker{ return &kvHashChecker{
ctype: rpcpb.Checker_KV_HASH, ctype: rpcpb.Checker_KV_HASH,
lg: lg, clus: clus,
hrg: hrg,
} }
} }
@ -50,9 +45,9 @@ func (hc *kvHashChecker) checkRevAndHashes() (err error) {
) )
// retries in case of transient failure or etcd cluster has not stablized yet. // retries in case of transient failure or etcd cluster has not stablized yet.
for i := 0; i < retries; i++ { for i := 0; i < retries; i++ {
revs, hashes, err = hc.hrg.getRevisionHash() revs, hashes, err = hc.clus.getRevisionHash()
if err != nil { if err != nil {
hc.lg.Warn( hc.clus.lg.Warn(
"failed to get revision and hash", "failed to get revision and hash",
zap.Int("retries", i), zap.Int("retries", i),
zap.Error(err), zap.Error(err),
@ -63,7 +58,7 @@ func (hc *kvHashChecker) checkRevAndHashes() (err error) {
if sameRev && sameHashes { if sameRev && sameHashes {
return nil return nil
} }
hc.lg.Warn( hc.clus.lg.Warn(
"retrying; etcd cluster is not stable", "retrying; etcd cluster is not stable",
zap.Int("retries", i), zap.Int("retries", i),
zap.Bool("same-revisions", sameRev), zap.Bool("same-revisions", sameRev),
@ -86,6 +81,10 @@ func (hc *kvHashChecker) Type() rpcpb.Checker {
return hc.ctype return hc.ctype
} }
func (hc *kvHashChecker) EtcdClientEndpoints() []string {
return hc.clus.EtcdClientEndpoints()
}
func (hc *kvHashChecker) Check() error { func (hc *kvHashChecker) Check() error {
return hc.checkRevAndHashes() return hc.checkRevAndHashes()
} }

View File

@ -44,12 +44,14 @@ func newLeaseExpireChecker(ls *leaseStresser) Checker {
} }
} }
const leaseExpireCheckerTimeout = 10 * time.Second
func (lc *leaseExpireChecker) Type() rpcpb.Checker { func (lc *leaseExpireChecker) Type() rpcpb.Checker {
return lc.ctype return lc.ctype
} }
func (lc *leaseExpireChecker) EtcdClientEndpoints() []string {
return []string{lc.m.EtcdClientEndpoint}
}
func (lc *leaseExpireChecker) Check() error { func (lc *leaseExpireChecker) Check() error {
if lc.ls == nil { if lc.ls == nil {
return nil return nil
@ -81,6 +83,8 @@ func (lc *leaseExpireChecker) Check() error {
return lc.checkShortLivedLeases() return lc.checkShortLivedLeases()
} }
const leaseExpireCheckerTimeout = 10 * time.Second
// checkShortLivedLeases ensures leases expire. // checkShortLivedLeases ensures leases expire.
func (lc *leaseExpireChecker) checkShortLivedLeases() error { func (lc *leaseExpireChecker) checkShortLivedLeases() error {
ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout) ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)

View File

@ -18,6 +18,7 @@ import "github.com/coreos/etcd/functional/rpcpb"
type noCheck struct{} type noCheck struct{}
func newNoChecker() Checker { return &noCheck{} } func newNoChecker() Checker { return &noCheck{} }
func (nc *noCheck) Type() rpcpb.Checker { return rpcpb.Checker_NO_CHECK } func (nc *noCheck) Type() rpcpb.Checker { return rpcpb.Checker_NO_CHECK }
func (nc *noCheck) Check() error { return nil } func (nc *noCheck) EtcdClientEndpoints() []string { return nil }
func (nc *noCheck) Check() error { return nil }

View File

@ -17,14 +17,16 @@ package tester
import "github.com/coreos/etcd/functional/rpcpb" import "github.com/coreos/etcd/functional/rpcpb"
type runnerChecker struct { type runnerChecker struct {
ctype rpcpb.Checker ctype rpcpb.Checker
errc chan error etcdClientEndpoint string
errc chan error
} }
func newRunnerChecker(errc chan error) Checker { func newRunnerChecker(ep string, errc chan error) Checker {
return &runnerChecker{ return &runnerChecker{
ctype: rpcpb.Checker_RUNNER, ctype: rpcpb.Checker_RUNNER,
errc: errc, etcdClientEndpoint: ep,
errc: errc,
} }
} }
@ -32,6 +34,10 @@ func (rc *runnerChecker) Type() rpcpb.Checker {
return rc.ctype return rc.ctype
} }
func (rc *runnerChecker) EtcdClientEndpoints() []string {
return []string{rc.etcdClientEndpoint}
}
func (rc *runnerChecker) Check() error { func (rc *runnerChecker) Check() error {
select { select {
case err := <-rc.errc: case err := <-rc.errc:

View File

@ -123,6 +123,15 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
return clus, nil return clus, nil
} }
// EtcdClientEndpoints returns all etcd client endpoints.
func (clus *Cluster) EtcdClientEndpoints() (css []string) {
css = make([]string, len(clus.Members))
for i := range clus.Members {
css[i] = clus.Members[i].EtcdClientEndpoint
}
return css
}
func (clus *Cluster) serveTesterServer() { func (clus *Cluster) serveTesterServer() {
clus.lg.Info( clus.lg.Info(
"started tester HTTP server", "started tester HTTP server",
@ -297,7 +306,7 @@ func (clus *Cluster) setStresserChecker() {
for _, cs := range clus.Tester.Checkers { for _, cs := range clus.Tester.Checkers {
switch cs { switch cs {
case "KV_HASH": case "KV_HASH":
clus.checkers = append(clus.checkers, newKVHashChecker(clus.lg, hashRevGetter(clus))) clus.checkers = append(clus.checkers, newKVHashChecker(clus))
case "LEASE_EXPIRE": case "LEASE_EXPIRE":
for _, ls := range lss { for _, ls := range lss {
@ -306,7 +315,7 @@ func (clus *Cluster) setStresserChecker() {
case "RUNNER": case "RUNNER":
for _, rs := range rss { for _, rs := range rss {
clus.checkers = append(clus.checkers, newRunnerChecker(rs.errc)) clus.checkers = append(clus.checkers, newRunnerChecker(rs.etcdClientEndpoint, rs.errc))
} }
case "NO_CHECK": case "NO_CHECK":
@ -335,6 +344,7 @@ func (clus *Cluster) runCheckers() (err error) {
clus.lg.Warn( clus.lg.Warn(
"consistency check FAIL", "consistency check FAIL",
zap.String("checker", chk.Type().String()), zap.String("checker", chk.Type().String()),
zap.Strings("client-endpoints", chk.EtcdClientEndpoints()),
zap.Int("round", clus.rd), zap.Int("round", clus.rd),
zap.Int("case", clus.cs), zap.Int("case", clus.cs),
zap.Error(err), zap.Error(err),

View File

@ -85,6 +85,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
} }
stressers[i] = newRunnerStresser( stressers[i] = newRunnerStresser(
rpcpb.Stresser_ELECTION_RUNNER, rpcpb.Stresser_ELECTION_RUNNER,
m.EtcdClientEndpoint,
clus.lg, clus.lg,
clus.Tester.RunnerExecPath, clus.Tester.RunnerExecPath,
args, args,
@ -106,6 +107,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
} }
stressers[i] = newRunnerStresser( stressers[i] = newRunnerStresser(
rpcpb.Stresser_WATCH_RUNNER, rpcpb.Stresser_WATCH_RUNNER,
m.EtcdClientEndpoint,
clus.lg, clus.lg,
clus.Tester.RunnerExecPath, clus.Tester.RunnerExecPath,
args, args,
@ -125,6 +127,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
} }
stressers[i] = newRunnerStresser( stressers[i] = newRunnerStresser(
rpcpb.Stresser_LOCK_RACER_RUNNER, rpcpb.Stresser_LOCK_RACER_RUNNER,
m.EtcdClientEndpoint,
clus.lg, clus.lg,
clus.Tester.RunnerExecPath, clus.Tester.RunnerExecPath,
args, args,
@ -140,6 +143,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
} }
stressers[i] = newRunnerStresser( stressers[i] = newRunnerStresser(
rpcpb.Stresser_LEASE_RUNNER, rpcpb.Stresser_LEASE_RUNNER,
m.EtcdClientEndpoint,
clus.lg, clus.lg,
clus.Tester.RunnerExecPath, clus.Tester.RunnerExecPath,
args, args,

View File

@ -27,8 +27,9 @@ import (
) )
type runnerStresser struct { type runnerStresser struct {
stype rpcpb.Stresser stype rpcpb.Stresser
lg *zap.Logger etcdClientEndpoint string
lg *zap.Logger
cmd *exec.Cmd cmd *exec.Cmd
cmdStr string cmdStr string
@ -42,6 +43,7 @@ type runnerStresser struct {
func newRunnerStresser( func newRunnerStresser(
stype rpcpb.Stresser, stype rpcpb.Stresser,
ep string,
lg *zap.Logger, lg *zap.Logger,
cmdStr string, cmdStr string,
args []string, args []string,
@ -50,13 +52,14 @@ func newRunnerStresser(
) *runnerStresser { ) *runnerStresser {
rl.SetLimit(rl.Limit() - rate.Limit(reqRate)) rl.SetLimit(rl.Limit() - rate.Limit(reqRate))
return &runnerStresser{ return &runnerStresser{
stype: stype, stype: stype,
cmdStr: cmdStr, etcdClientEndpoint: ep,
args: args, cmdStr: cmdStr,
rl: rl, args: args,
reqRate: reqRate, rl: rl,
errc: make(chan error, 1), reqRate: reqRate,
donec: make(chan struct{}), errc: make(chan error, 1),
donec: make(chan struct{}),
} }
} }