rafthttp: remove the newPipeline func
Using struct to initialize pipeline is better when we have many fields to file in.
This commit is contained in:
@ -120,6 +120,17 @@ type peer struct {
|
|||||||
func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
|
func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
|
||||||
status := newPeerStatus(to)
|
status := newPeerStatus(to)
|
||||||
picker := newURLPicker(urls)
|
picker := newURLPicker(urls)
|
||||||
|
pipeline := &pipeline{
|
||||||
|
to: to,
|
||||||
|
tr: transport,
|
||||||
|
picker: picker,
|
||||||
|
status: status,
|
||||||
|
followerStats: fs,
|
||||||
|
raft: r,
|
||||||
|
errorc: errorc,
|
||||||
|
}
|
||||||
|
pipeline.start()
|
||||||
|
|
||||||
p := &peer{
|
p := &peer{
|
||||||
id: to,
|
id: to,
|
||||||
r: r,
|
r: r,
|
||||||
@ -127,7 +138,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
|
|||||||
picker: picker,
|
picker: picker,
|
||||||
msgAppV2Writer: startStreamWriter(to, status, fs, r),
|
msgAppV2Writer: startStreamWriter(to, status, fs, r),
|
||||||
writer: startStreamWriter(to, status, fs, r),
|
writer: startStreamWriter(to, status, fs, r),
|
||||||
pipeline: newPipeline(transport, picker, local, to, cid, status, fs, r, errorc),
|
pipeline: pipeline,
|
||||||
snapSender: newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
|
snapSender: newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
|
||||||
sendc: make(chan raftpb.Message),
|
sendc: make(chan raftpb.Message),
|
||||||
recvc: make(chan raftpb.Message, recvBufSize),
|
recvc: make(chan raftpb.Message, recvBufSize),
|
||||||
|
@ -41,15 +41,15 @@ const (
|
|||||||
var errStopped = errors.New("stopped")
|
var errStopped = errors.New("stopped")
|
||||||
|
|
||||||
type pipeline struct {
|
type pipeline struct {
|
||||||
from, to types.ID
|
to types.ID
|
||||||
cid types.ID
|
|
||||||
|
|
||||||
tr *Transport
|
tr *Transport
|
||||||
picker *urlPicker
|
picker *urlPicker
|
||||||
status *peerStatus
|
status *peerStatus
|
||||||
fs *stats.FollowerStats
|
raft Raft
|
||||||
r Raft
|
|
||||||
errorc chan error
|
errorc chan error
|
||||||
|
// deprecate when we depercate v2 API
|
||||||
|
followerStats *stats.FollowerStats
|
||||||
|
|
||||||
msgc chan raftpb.Message
|
msgc chan raftpb.Message
|
||||||
// wait for the handling routines
|
// wait for the handling routines
|
||||||
@ -57,25 +57,13 @@ type pipeline struct {
|
|||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPipeline(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
|
func (p *pipeline) start() {
|
||||||
p := &pipeline{
|
p.stopc = make(chan struct{})
|
||||||
from: from,
|
p.msgc = make(chan raftpb.Message, pipelineBufSize)
|
||||||
to: to,
|
|
||||||
cid: cid,
|
|
||||||
tr: tr,
|
|
||||||
picker: picker,
|
|
||||||
status: status,
|
|
||||||
fs: fs,
|
|
||||||
r: r,
|
|
||||||
errorc: errorc,
|
|
||||||
stopc: make(chan struct{}),
|
|
||||||
msgc: make(chan raftpb.Message, pipelineBufSize),
|
|
||||||
}
|
|
||||||
p.wg.Add(connPerPipeline)
|
p.wg.Add(connPerPipeline)
|
||||||
for i := 0; i < connPerPipeline; i++ {
|
for i := 0; i < connPerPipeline; i++ {
|
||||||
go p.handle()
|
go p.handle()
|
||||||
}
|
}
|
||||||
return p
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pipeline) stop() {
|
func (p *pipeline) stop() {
|
||||||
@ -96,22 +84,22 @@ func (p *pipeline) handle() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
|
p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
|
||||||
|
|
||||||
if m.Type == raftpb.MsgApp && p.fs != nil {
|
if m.Type == raftpb.MsgApp && p.followerStats != nil {
|
||||||
p.fs.Fail()
|
p.followerStats.Fail()
|
||||||
}
|
}
|
||||||
p.r.ReportUnreachable(m.To)
|
p.raft.ReportUnreachable(m.To)
|
||||||
if isMsgSnap(m) {
|
if isMsgSnap(m) {
|
||||||
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
p.status.activate()
|
p.status.activate()
|
||||||
if m.Type == raftpb.MsgApp && p.fs != nil {
|
if m.Type == raftpb.MsgApp && p.followerStats != nil {
|
||||||
p.fs.Succ(end.Sub(start))
|
p.followerStats.Succ(end.Sub(start))
|
||||||
}
|
}
|
||||||
if isMsgSnap(m) {
|
if isMsgSnap(m) {
|
||||||
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
|
p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
|
||||||
}
|
}
|
||||||
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
|
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
|
||||||
case <-p.stopc:
|
case <-p.stopc:
|
||||||
@ -124,7 +112,7 @@ func (p *pipeline) handle() {
|
|||||||
// error on any failure.
|
// error on any failure.
|
||||||
func (p *pipeline) post(data []byte) (err error) {
|
func (p *pipeline) post(data []byte) (err error) {
|
||||||
u := p.picker.pick()
|
u := p.picker.pick()
|
||||||
req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.from, p.cid)
|
req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
|
||||||
|
|
||||||
done := make(chan struct{}, 1)
|
done := make(chan struct{}, 1)
|
||||||
cancel := httputil.RequestCanceler(p.tr.pipelineRt, req)
|
cancel := httputil.RequestCanceler(p.tr.pipelineRt, req)
|
||||||
|
@ -36,9 +36,8 @@ import (
|
|||||||
func TestPipelineSend(t *testing.T) {
|
func TestPipelineSend(t *testing.T) {
|
||||||
tr := &roundTripperRecorder{}
|
tr := &roundTripperRecorder{}
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
fs := &stats.FollowerStats{}
|
|
||||||
tp := &Transport{pipelineRt: tr}
|
tp := &Transport{pipelineRt: tr}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
p := startTestPipeline(tp, picker)
|
||||||
|
|
||||||
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
||||||
testutil.WaitSchedule()
|
testutil.WaitSchedule()
|
||||||
@ -47,10 +46,8 @@ func TestPipelineSend(t *testing.T) {
|
|||||||
if tr.Request() == nil {
|
if tr.Request() == nil {
|
||||||
t.Errorf("sender fails to post the data")
|
t.Errorf("sender fails to post the data")
|
||||||
}
|
}
|
||||||
fs.Lock()
|
if p.followerStats.Counts.Success != 1 {
|
||||||
defer fs.Unlock()
|
t.Errorf("success = %d, want 1", p.followerStats.Counts.Success)
|
||||||
if fs.Counts.Success != 1 {
|
|
||||||
t.Errorf("success = %d, want 1", fs.Counts.Success)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -59,9 +56,8 @@ func TestPipelineSend(t *testing.T) {
|
|||||||
func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
||||||
tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
|
tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
fs := &stats.FollowerStats{}
|
|
||||||
tp := &Transport{pipelineRt: tr}
|
tp := &Transport{pipelineRt: tr}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
p := startTestPipeline(tp, picker)
|
||||||
defer p.stop()
|
defer p.stop()
|
||||||
|
|
||||||
for i := 0; i < 50; i++ {
|
for i := 0; i < 50; i++ {
|
||||||
@ -77,9 +73,9 @@ func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
|||||||
func TestPipelineExceedMaximumServing(t *testing.T) {
|
func TestPipelineExceedMaximumServing(t *testing.T) {
|
||||||
tr := newRoundTripperBlocker()
|
tr := newRoundTripperBlocker()
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
fs := &stats.FollowerStats{}
|
|
||||||
tp := &Transport{pipelineRt: tr}
|
tp := &Transport{pipelineRt: tr}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
p := startTestPipeline(tp, picker)
|
||||||
|
defer p.stop()
|
||||||
|
|
||||||
// keep the sender busy and make the buffer full
|
// keep the sender busy and make the buffer full
|
||||||
// nothing can go out as we block the sender
|
// nothing can go out as we block the sender
|
||||||
@ -111,33 +107,29 @@ func TestPipelineExceedMaximumServing(t *testing.T) {
|
|||||||
default:
|
default:
|
||||||
t.Errorf("failed to send out message")
|
t.Errorf("failed to send out message")
|
||||||
}
|
}
|
||||||
p.stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestPipelineSendFailed tests that when send func meets the post error,
|
// TestPipelineSendFailed tests that when send func meets the post error,
|
||||||
// it increases fail count in stats.
|
// it increases fail count in stats.
|
||||||
func TestPipelineSendFailed(t *testing.T) {
|
func TestPipelineSendFailed(t *testing.T) {
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
fs := &stats.FollowerStats{}
|
|
||||||
tp := &Transport{pipelineRt: newRespRoundTripper(0, errors.New("blah"))}
|
tp := &Transport{pipelineRt: newRespRoundTripper(0, errors.New("blah"))}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
p := startTestPipeline(tp, picker)
|
||||||
|
|
||||||
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
||||||
testutil.WaitSchedule()
|
testutil.WaitSchedule()
|
||||||
p.stop()
|
p.stop()
|
||||||
|
|
||||||
fs.Lock()
|
if p.followerStats.Counts.Fail != 1 {
|
||||||
defer fs.Unlock()
|
t.Errorf("fail = %d, want 1", p.followerStats.Counts.Fail)
|
||||||
if fs.Counts.Fail != 1 {
|
|
||||||
t.Errorf("fail = %d, want 1", fs.Counts.Fail)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPipelinePost(t *testing.T) {
|
func TestPipelinePost(t *testing.T) {
|
||||||
tr := &roundTripperRecorder{}
|
tr := &roundTripperRecorder{}
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
tp := &Transport{pipelineRt: tr}
|
tp := &Transport{ClusterID: types.ID(1), pipelineRt: tr}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
|
p := startTestPipeline(tp, picker)
|
||||||
if err := p.post([]byte("some data")); err != nil {
|
if err := p.post([]byte("some data")); err != nil {
|
||||||
t.Fatalf("unexpected post error: %v", err)
|
t.Fatalf("unexpected post error: %v", err)
|
||||||
}
|
}
|
||||||
@ -185,7 +177,7 @@ func TestPipelinePostBad(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
picker := mustNewURLPicker(t, []string{tt.u})
|
picker := mustNewURLPicker(t, []string{tt.u})
|
||||||
tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
|
tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, make(chan error))
|
p := startTestPipeline(tp, picker)
|
||||||
err := p.post([]byte("some data"))
|
err := p.post([]byte("some data"))
|
||||||
p.stop()
|
p.stop()
|
||||||
|
|
||||||
@ -205,13 +197,12 @@ func TestPipelinePostErrorc(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
picker := mustNewURLPicker(t, []string{tt.u})
|
picker := mustNewURLPicker(t, []string{tt.u})
|
||||||
errorc := make(chan error, 1)
|
|
||||||
tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
|
tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, errorc)
|
p := startTestPipeline(tp, picker)
|
||||||
p.post([]byte("some data"))
|
p.post([]byte("some data"))
|
||||||
p.stop()
|
p.stop()
|
||||||
select {
|
select {
|
||||||
case <-errorc:
|
case <-p.errorc:
|
||||||
default:
|
default:
|
||||||
t.Fatalf("#%d: cannot receive from errorc", i)
|
t.Fatalf("#%d: cannot receive from errorc", i)
|
||||||
}
|
}
|
||||||
@ -221,7 +212,7 @@ func TestPipelinePostErrorc(t *testing.T) {
|
|||||||
func TestStopBlockedPipeline(t *testing.T) {
|
func TestStopBlockedPipeline(t *testing.T) {
|
||||||
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
||||||
tp := &Transport{pipelineRt: newRoundTripperBlocker()}
|
tp := &Transport{pipelineRt: newRoundTripperBlocker()}
|
||||||
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
|
p := startTestPipeline(tp, picker)
|
||||||
// send many messages that most of them will be blocked in buffer
|
// send many messages that most of them will be blocked in buffer
|
||||||
for i := 0; i < connPerPipeline*10; i++ {
|
for i := 0; i < connPerPipeline*10; i++ {
|
||||||
p.msgc <- raftpb.Message{}
|
p.msgc <- raftpb.Message{}
|
||||||
@ -307,3 +298,18 @@ type nopReadCloser struct{}
|
|||||||
|
|
||||||
func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
|
func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
|
||||||
func (n *nopReadCloser) Close() error { return nil }
|
func (n *nopReadCloser) Close() error { return nil }
|
||||||
|
|
||||||
|
func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
|
||||||
|
p := &pipeline{
|
||||||
|
tr: tr,
|
||||||
|
picker: picker,
|
||||||
|
|
||||||
|
to: types.ID(1),
|
||||||
|
status: newPeerStatus(types.ID(1)),
|
||||||
|
raft: &fakeRaft{},
|
||||||
|
followerStats: &stats.FollowerStats{},
|
||||||
|
errorc: make(chan error, 1),
|
||||||
|
}
|
||||||
|
p.start()
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
@ -28,10 +28,20 @@ type remote struct {
|
|||||||
func startRemote(tr *Transport, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
|
func startRemote(tr *Transport, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
|
||||||
picker := newURLPicker(urls)
|
picker := newURLPicker(urls)
|
||||||
status := newPeerStatus(to)
|
status := newPeerStatus(to)
|
||||||
|
pipeline := &pipeline{
|
||||||
|
to: to,
|
||||||
|
tr: tr,
|
||||||
|
picker: picker,
|
||||||
|
status: status,
|
||||||
|
raft: r,
|
||||||
|
errorc: errorc,
|
||||||
|
}
|
||||||
|
pipeline.start()
|
||||||
|
|
||||||
return &remote{
|
return &remote{
|
||||||
id: to,
|
id: to,
|
||||||
status: status,
|
status: status,
|
||||||
pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc),
|
pipeline: pipeline,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user