diff --git a/raft/multinode.go b/raft/multinode.go new file mode 100644 index 000000000..1d3a05362 --- /dev/null +++ b/raft/multinode.go @@ -0,0 +1,449 @@ +package raft + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/raft/raftpb" +) + +// MultiNode represents a node that is participating in multiple consensus groups. +// A MultiNode is more efficient than a collection of Nodes. +// The methods of this interface correspond to the methods of Node and are described +// more fully there. +type MultiNode interface { + // CreateGroup adds a new group to the MultiNode. The application must call CreateGroup + // on each particpating node with the same group ID; it may create groups on demand as it + // receives messages. If the given storage contains existing log entries the list of peers + // may be empty. + CreateGroup(group uint64, peers []Peer, storage Storage) error + // RemoveGroup removes a group from the MultiNode. + RemoveGroup(group uint64) error + // Tick advances the internal logical clock by a single tick. + Tick() + // Campaign causes this MultiNode to transition to candidate state in the given group. + Campaign(ctx context.Context, group uint64) error + // Propose proposes that data be appended to the given group's log. + Propose(ctx context.Context, group uint64, data []byte) error + // ProposeConfChange proposes a config change. + ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error + // ApplyConfChange applies a config change to the local node. + ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState + // Step advances the state machine using the given message. + Step(ctx context.Context, group uint64, msg pb.Message) error + // Ready returns a channel that returns the current point-in-time state of any ready + // groups. Only groups with something to report will appear in the map. + Ready() <-chan map[uint64]Ready + // Advance notifies the node that the application has applied and saved progress in the + // last Ready results. It must be called with the last value returned from the Ready() + // channel. + Advance(map[uint64]Ready) + // Status returns the current status of the given group. + Status(group uint64) Status + // Stop performs any necessary termination of the MultiNode. + Stop() +} + +// StartMultiNode creates a MultiNode and starts its background goroutine. +// The id identifies this node and will be used as its node ID in all groups. +// The election and heartbeat timers are in units of ticks. +func StartMultiNode(id uint64, election, heartbeat int) MultiNode { + mn := newMultiNode(id, election, heartbeat) + go mn.run() + return &mn +} + +// TODO(bdarnell): add group ID to the underlying protos? +type multiMessage struct { + group uint64 + msg pb.Message +} + +type multiConfChange struct { + group uint64 + msg pb.ConfChange + ch chan pb.ConfState +} + +type multiStatus struct { + group uint64 + ch chan Status +} + +type groupCreation struct { + id uint64 + peers []Peer + storage Storage + // TODO(bdarnell): do we really need the done channel here? It's + // unlike the rest of this package, but we need the group creation + // to be complete before any Propose or other calls. + done chan struct{} +} + +type groupRemoval struct { + id uint64 + // TODO(bdarnell): see comment on groupCreation.done + done chan struct{} +} + +type multiNode struct { + id uint64 + election int + heartbeat int + groupc chan groupCreation + rmgroupc chan groupRemoval + propc chan multiMessage + recvc chan multiMessage + confc chan multiConfChange + readyc chan map[uint64]Ready + advancec chan map[uint64]Ready + tickc chan struct{} + stop chan struct{} + done chan struct{} + status chan multiStatus +} + +func newMultiNode(id uint64, election, heartbeat int) multiNode { + return multiNode{ + id: id, + election: election, + heartbeat: heartbeat, + groupc: make(chan groupCreation), + rmgroupc: make(chan groupRemoval), + propc: make(chan multiMessage), + recvc: make(chan multiMessage), + confc: make(chan multiConfChange), + readyc: make(chan map[uint64]Ready), + advancec: make(chan map[uint64]Ready), + tickc: make(chan struct{}), + stop: make(chan struct{}), + done: make(chan struct{}), + status: make(chan multiStatus), + } +} + +type groupState struct { + id uint64 + raft *raft + prevSoftSt *SoftState + prevHardSt pb.HardState + prevSnapi uint64 +} + +func (g *groupState) newReady() Ready { + return newReady(g.raft, g.prevSoftSt, g.prevHardSt) +} + +func (g *groupState) commitReady(rd Ready) { + if rd.SoftState != nil { + g.prevSoftSt = rd.SoftState + } + if !IsEmptyHardState(rd.HardState) { + g.prevHardSt = rd.HardState + } + if !IsEmptySnap(rd.Snapshot) { + g.prevSnapi = rd.Snapshot.Metadata.Index + g.raft.raftLog.stableSnapTo(g.prevSnapi) + } + if len(rd.Entries) > 0 { + // TODO(bdarnell): stableTo(rd.Snapshot.Index) if any + e := rd.Entries[len(rd.Entries)-1] + g.raft.raftLog.stableTo(e.Index, e.Term) + } + + // TODO(bdarnell): in node.go, Advance() ignores CommittedEntries and calls + // appliedTo with HardState.Commit, but this causes problems in multinode/cockroach. + // The two should be the same except for the special-casing of the initial ConfChange + // entries. + if len(rd.CommittedEntries) > 0 { + g.raft.raftLog.appliedTo(rd.CommittedEntries[len(rd.CommittedEntries)-1].Index) + } + //g.raft.raftLog.appliedTo(rd.HardState.Commit) +} + +func (mn *multiNode) run() { + groups := map[uint64]*groupState{} + rds := map[uint64]Ready{} + var advancec chan map[uint64]Ready + for { + // Only select readyc if we have something to report and we are not + // currently waiting for an advance. + readyc := mn.readyc + if len(rds) == 0 || advancec != nil { + readyc = nil + } + + // group points to the group that was touched on this iteration (if any) + var group *groupState + select { + case gc := <-mn.groupc: + // TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it? + r := newRaft(mn.id, nil, mn.election, mn.heartbeat, gc.storage, 0) + group = &groupState{ + id: gc.id, + raft: r, + prevSoftSt: r.softState(), + prevHardSt: r.HardState, + } + groups[gc.id] = group + lastIndex, err := gc.storage.LastIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + // If the log is empty, this is a new group (like StartNode); otherwise it's + // restoring an existing group (like RestartNode). + // TODO(bdarnell): rethink group initialization and whether the application needs + // to be able to tell us when it expects the group to exist. + if lastIndex == 0 { + r.becomeFollower(1, None) + ents := make([]pb.Entry, len(gc.peers)) + for i, peer := range gc.peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} + data, err := cc.Marshal() + if err != nil { + panic("unexpected marshal error") + } + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} + } + r.raftLog.append(ents...) + r.raftLog.committed = uint64(len(ents)) + for _, peer := range gc.peers { + r.addNode(peer.ID) + } + } + close(gc.done) + + case gr := <-mn.rmgroupc: + delete(groups, gr.id) + delete(rds, gr.id) + close(gr.done) + + case mm := <-mn.propc: + // TODO(bdarnell): single-node impl doesn't read from propc unless the group + // has a leader; we can't do that since we have one propc for many groups. + // We'll have to buffer somewhere on a group-by-group basis, or just let + // raft.Step drop any such proposals on the floor. + mm.msg.From = mn.id + group = groups[mm.group] + group.raft.Step(mm.msg) + + case mm := <-mn.recvc: + group = groups[mm.group] + if _, ok := group.raft.prs[mm.msg.From]; ok || !IsResponseMsg(mm.msg) { + group.raft.Step(mm.msg) + } + + case mcc := <-mn.confc: + group = groups[mcc.group] + if mcc.msg.NodeID == None { + group.raft.resetPendingConf() + select { + case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}: + case <-mn.done: + } + break + } + switch mcc.msg.Type { + case pb.ConfChangeAddNode: + group.raft.addNode(mcc.msg.NodeID) + case pb.ConfChangeRemoveNode: + group.raft.removeNode(mcc.msg.NodeID) + case pb.ConfChangeUpdateNode: + group.raft.resetPendingConf() + default: + panic("unexpected conf type") + } + select { + case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}: + case <-mn.done: + } + + case <-mn.tickc: + // TODO(bdarnell): instead of calling every group on every tick, + // we should have a priority queue of groups based on their next + // time-based event. + for _, g := range groups { + g.raft.tick() + rd := g.newReady() + if rd.containsUpdates() { + rds[g.id] = rd + } + } + + case readyc <- rds: + // Clear outgoing messages as soon as we've passed them to the application. + for g := range rds { + groups[g].raft.msgs = nil + } + rds = map[uint64]Ready{} + advancec = mn.advancec + + case advs := <-advancec: + for groupID, rd := range advs { + group, ok := groups[groupID] + if !ok { + continue + } + group.commitReady(rd) + + // We've been accumulating new entries in rds which may now be obsolete. + // Drop the old Ready object and create a new one if needed. + delete(rds, groupID) + newRd := group.newReady() + if newRd.containsUpdates() { + rds[groupID] = newRd + } + } + advancec = nil + + case ms := <-mn.status: + ms.ch <- getStatus(groups[ms.group].raft) + + case <-mn.stop: + close(mn.done) + return + } + + if group != nil { + rd := group.newReady() + if rd.containsUpdates() { + rds[group.id] = rd + } + } + } +} + +func (mn *multiNode) CreateGroup(id uint64, peers []Peer, storage Storage) error { + gc := groupCreation{ + id: id, + peers: peers, + storage: storage, + done: make(chan struct{}), + } + mn.groupc <- gc + select { + case <-gc.done: + return nil + case <-mn.done: + return ErrStopped + } +} + +func (mn *multiNode) RemoveGroup(id uint64) error { + gr := groupRemoval{ + id: id, + done: make(chan struct{}), + } + mn.rmgroupc <- gr + select { + case <-gr.done: + return nil + case <-mn.done: + return ErrStopped + } +} + +func (mn *multiNode) Stop() { + select { + case mn.stop <- struct{}{}: + case <-mn.done: + } + <-mn.done +} + +func (mn *multiNode) Tick() { + select { + case mn.tickc <- struct{}{}: + case <-mn.done: + } +} + +func (mn *multiNode) Campaign(ctx context.Context, group uint64) error { + return mn.step(ctx, multiMessage{group, + pb.Message{ + Type: pb.MsgHup, + }, + }) +} + +func (mn *multiNode) Propose(ctx context.Context, group uint64, data []byte) error { + return mn.step(ctx, multiMessage{group, + pb.Message{ + Type: pb.MsgProp, + Entries: []pb.Entry{ + {Data: data}, + }, + }}) +} + +func (mn *multiNode) ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error { + data, err := cc.Marshal() + if err != nil { + return err + } + return mn.Step(ctx, group, + pb.Message{ + Type: pb.MsgProp, + Entries: []pb.Entry{ + {Type: pb.EntryConfChange, Data: data}, + }, + }) +} + +func (mn *multiNode) step(ctx context.Context, m multiMessage) error { + ch := mn.recvc + if m.msg.Type == pb.MsgProp { + ch = mn.propc + } + + select { + case ch <- m: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-mn.done: + return ErrStopped + } +} + +func (mn *multiNode) ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState { + mcc := multiConfChange{group, cc, make(chan pb.ConfState)} + select { + case mn.confc <- mcc: + case <-mn.done: + } + select { + case cs := <-mcc.ch: + return &cs + case <-mn.done: + // Per comments on Node.ApplyConfChange, this method should never return nil. + return &pb.ConfState{} + } +} + +func (mn *multiNode) Step(ctx context.Context, group uint64, m pb.Message) error { + // ignore unexpected local messages receiving over network + if IsLocalMsg(m) { + // TODO: return an error? + return nil + } + return mn.step(ctx, multiMessage{group, m}) +} + +func (mn *multiNode) Ready() <-chan map[uint64]Ready { + return mn.readyc +} + +func (mn *multiNode) Advance(rds map[uint64]Ready) { + select { + case mn.advancec <- rds: + case <-mn.done: + } +} + +func (mn *multiNode) Status(group uint64) Status { + ms := multiStatus{ + group: group, + ch: make(chan Status), + } + mn.status <- ms + return <-ms.ch +} diff --git a/raft/multinode_test.go b/raft/multinode_test.go new file mode 100644 index 000000000..9fd1f88b1 --- /dev/null +++ b/raft/multinode_test.go @@ -0,0 +1,394 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "bytes" + "reflect" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/raft/raftpb" +) + +// TestMultiNodeStep ensures that multiNode.Step sends MsgProp to propc +// chan and other kinds of messages to recvc chan. +func TestMultiNodeStep(t *testing.T) { + for i, msgn := range raftpb.MessageType_name { + mn := &multiNode{ + propc: make(chan multiMessage, 1), + recvc: make(chan multiMessage, 1), + } + msgt := raftpb.MessageType(i) + mn.Step(context.TODO(), 1, raftpb.Message{Type: msgt}) + // Proposal goes to proc chan. Others go to recvc chan. + if msgt == raftpb.MsgProp { + select { + case <-mn.propc: + default: + t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn) + } + } else { + if msgt == raftpb.MsgBeat || msgt == raftpb.MsgHup || msgt == raftpb.MsgUnreachable || msgt == raftpb.MsgSnapStatus { + select { + case <-mn.recvc: + t.Errorf("%d: step should ignore %s", msgt, msgn) + default: + } + } else { + select { + case <-mn.recvc: + default: + t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn) + } + } + } + } +} + +// Cancel and Stop should unblock Step() +func TestMultiNodeStepUnblock(t *testing.T) { + // a node without buffer to block step + mn := &multiNode{ + propc: make(chan multiMessage), + done: make(chan struct{}), + } + + ctx, cancel := context.WithCancel(context.Background()) + stopFunc := func() { close(mn.done) } + + tests := []struct { + unblock func() + werr error + }{ + {stopFunc, ErrStopped}, + {cancel, context.Canceled}, + } + + for i, tt := range tests { + errc := make(chan error, 1) + go func() { + err := mn.Step(ctx, 1, raftpb.Message{Type: raftpb.MsgProp}) + errc <- err + }() + tt.unblock() + select { + case err := <-errc: + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + //clean up side-effect + if ctx.Err() != nil { + ctx = context.TODO() + } + select { + case <-mn.done: + mn.done = make(chan struct{}) + default: + } + case <-time.After(time.Millisecond * 100): + t.Errorf("#%d: failed to unblock step", i) + } + } +} + +// TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft. +func TestMultiNodePropose(t *testing.T) { + mn := newMultiNode(1, 10, 1) + go mn.run() + s := NewMemoryStorage() + mn.CreateGroup(1, []Peer{{ID: 1}}, s) + mn.Campaign(context.TODO(), 1) + proposed := false + for { + rds := <-mn.Ready() + rd := rds[1] + s.Append(rd.Entries) + // Once we are the leader, propose a command. + if !proposed && rd.SoftState.Lead == mn.id { + mn.Propose(context.TODO(), 1, []byte("somedata")) + proposed = true + } + mn.Advance(rds) + + // Exit when we have three entries: one ConfChange, one no-op for the election, + // and our proposed command. + lastIndex, err := s.LastIndex() + if err != nil { + t.Fatal(err) + } + if lastIndex >= 3 { + break + } + } + mn.Stop() + + lastIndex, err := s.LastIndex() + if err != nil { + t.Fatal(err) + } + entries, err := s.Entries(lastIndex, lastIndex+1) + if err != nil { + t.Fatal(err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want %d", len(entries), 1) + } + if !bytes.Equal(entries[0].Data, []byte("somedata")) { + t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) + } +} + +// TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange +// sends the given configuration proposal to the underlying raft. +func TestMultiNodeProposeConfig(t *testing.T) { + mn := newMultiNode(1, 10, 1) + go mn.run() + s := NewMemoryStorage() + mn.CreateGroup(1, []Peer{{ID: 1}}, s) + mn.Campaign(context.TODO(), 1) + proposed := false + var lastIndex uint64 + var ccdata []byte + for { + rds := <-mn.Ready() + rd := rds[1] + s.Append(rd.Entries) + // change the step function to appendStep until this raft becomes leader + if !proposed && rd.SoftState.Lead == mn.id { + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + var err error + ccdata, err = cc.Marshal() + if err != nil { + t.Fatal(err) + } + mn.ProposeConfChange(context.TODO(), 1, cc) + proposed = true + } + mn.Advance(rds) + + var err error + lastIndex, err = s.LastIndex() + if err != nil { + t.Fatal(err) + } + if lastIndex >= 3 { + break + } + } + mn.Stop() + + entries, err := s.Entries(lastIndex, lastIndex+1) + if err != nil { + t.Fatal(err) + } + if len(entries) != 1 { + t.Fatalf("len(entries) = %d, want %d", len(entries), 1) + } + if entries[0].Type != raftpb.EntryConfChange { + t.Fatalf("type = %v, want %v", entries[0].Type, raftpb.EntryConfChange) + } + if !bytes.Equal(entries[0].Data, ccdata) { + t.Errorf("data = %v, want %v", entries[0].Data, ccdata) + } +} + +// TestBlockProposal from node_test.go has no equivalent in multiNode +// because we cannot block proposals based on individual group leader status. + +// TestNodeTick from node_test.go has no equivalent in multiNode because +// it reaches into the raft object which is not exposed. + +// TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped +// processing, and that it is idempotent +func TestMultiNodeStop(t *testing.T) { + mn := newMultiNode(1, 10, 1) + donec := make(chan struct{}) + + go func() { + mn.run() + close(donec) + }() + + mn.Tick() + mn.Stop() + + select { + case <-donec: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for node to stop!") + } + + // Further ticks should have no effect, the node is stopped. + // There is no way to verify this in multinode but at least we can test + // it doesn't block or panic. + mn.Tick() + // Subsequent Stops should have no effect. + mn.Stop() +} + +// TestMultiNodeStart ensures that a node can be started correctly. The node should +// start with correct configuration change entries, and can accept and commit +// proposals. +func TestMultiNodeStart(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + ccdata, err := cc.Marshal() + if err != nil { + t.Fatalf("unexpected marshal error: %v", err) + } + wants := []Ready{ + { + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: raftpb.HardState{Term: 2, Commit: 2}, + Entries: []raftpb.Entry{ + {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, + {Term: 2, Index: 2}, + }, + CommittedEntries: []raftpb.Entry{ + {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, + {Term: 2, Index: 2}, + }, + }, + { + HardState: raftpb.HardState{Term: 2, Commit: 3}, + Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + }, + } + mn := StartMultiNode(1, 10, 1) + storage := NewMemoryStorage() + mn.CreateGroup(1, []Peer{{ID: 1}}, storage) + mn.Campaign(ctx, 1) + gs := <-mn.Ready() + g := gs[1] + if !reflect.DeepEqual(g, wants[0]) { + t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) + } else { + storage.Append(g.Entries) + mn.Advance(gs) + } + + mn.Propose(ctx, 1, []byte("foo")) + if gs2 := <-mn.Ready(); !reflect.DeepEqual(gs2[1], wants[1]) { + t.Errorf("#%d: g = %+v,\n w %+v", 2, gs2[1], wants[1]) + } else { + storage.Append(gs2[1].Entries) + mn.Advance(gs2) + } + + select { + case rd := <-mn.Ready(): + t.Errorf("unexpected Ready: %+v", rd) + case <-time.After(time.Millisecond): + } +} + +func TestMultiNodeRestart(t *testing.T) { + entries := []raftpb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2, Data: []byte("foo")}, + } + st := raftpb.HardState{Term: 1, Commit: 1} + + want := Ready{ + HardState: emptyState, + // commit up to index commit index in st + CommittedEntries: entries[:st.Commit], + } + + storage := NewMemoryStorage() + storage.SetHardState(st) + storage.Append(entries) + mn := StartMultiNode(1, 10, 1) + mn.CreateGroup(1, nil, storage) + gs := <-mn.Ready() + if !reflect.DeepEqual(gs[1], want) { + t.Errorf("g = %+v,\n w %+v", gs[1], want) + } + mn.Advance(gs) + + select { + case rd := <-mn.Ready(): + t.Errorf("unexpected Ready: %+v", rd) + case <-time.After(time.Millisecond): + } + mn.Stop() +} + +func TestMultiNodeRestartFromSnapshot(t *testing.T) { + snap := raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + ConfState: raftpb.ConfState{Nodes: []uint64{1, 2}}, + Index: 2, + Term: 1, + }, + } + entries := []raftpb.Entry{ + {Term: 1, Index: 3, Data: []byte("foo")}, + } + st := raftpb.HardState{Term: 1, Commit: 3} + + want := Ready{ + HardState: emptyState, + // commit up to index commit index in st + CommittedEntries: entries, + } + + s := NewMemoryStorage() + s.SetHardState(st) + s.ApplySnapshot(snap) + s.Append(entries) + mn := StartMultiNode(1, 10, 1) + mn.CreateGroup(1, nil, s) + if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) { + t.Errorf("g = %+v,\n w %+v", gs[1], want) + } else { + mn.Advance(gs) + } + + select { + case rd := <-mn.Ready(): + t.Errorf("unexpected Ready: %+v", rd) + case <-time.After(time.Millisecond): + } +} + +func TestMultiNodeAdvance(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + storage := NewMemoryStorage() + mn := StartMultiNode(1, 10, 1) + mn.CreateGroup(1, []Peer{{ID: 1}}, storage) + mn.Campaign(ctx, 1) + rd1 := <-mn.Ready() + mn.Propose(ctx, 1, []byte("foo")) + select { + case rd2 := <-mn.Ready(): + t.Fatalf("unexpected Ready before Advance: %+v", rd2) + case <-time.After(time.Millisecond): + } + storage.Append(rd1[1].Entries) + mn.Advance(rd1) + select { + case <-mn.Ready(): + case <-time.After(time.Millisecond): + t.Errorf("expect Ready after Advance, but there is no Ready available") + } +}