diff --git a/rafthttp/metrics.go b/rafthttp/metrics.go new file mode 100644 index 000000000..1112aa8cc --- /dev/null +++ b/rafthttp/metrics.go @@ -0,0 +1,45 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +var ( + msgWriteDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "rafthttp_message_sending_latency_microseconds", + Help: "message sending latency distributions.", + }, + []string{"channel", "remoteID", "msgType"}, + ) +) + +func init() { + prometheus.MustRegister(msgWriteDuration) +} + +func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) { + typ := m.Type.String() + if isLinkHeartbeatMessage(m) { + typ = "MsgLinkHeartbeat" + } + msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond))) +} diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 5ef6ff21a..ce3253122 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -121,6 +121,7 @@ func (p *pipeline) handle() { if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFinish) } + reportSendingDuration(pipelineMsg, m, time.Since(start)) } p.Unlock() } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 446abe918..9d03d0564 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -100,6 +100,7 @@ func (cw *streamWriter) run() { for { select { case <-heartbeatc: + start := time.Now() if err := enc.encode(linkHeartbeatMessage); err != nil { log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() @@ -107,6 +108,7 @@ func (cw *streamWriter) run() { continue } flusher.Flush() + reportSendingDuration(string(t), linkHeartbeatMessage, time.Since(start)) case m := <-msgc: if t == streamTypeMsgApp && m.Term != msgAppTerm { // TODO: reasonable retry logic @@ -116,6 +118,7 @@ func (cw *streamWriter) run() { } continue } + start := time.Now() if err := enc.encode(m); err != nil { log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() @@ -124,6 +127,7 @@ func (cw *streamWriter) run() { continue } flusher.Flush() + reportSendingDuration(string(t), m, time.Since(start)) case conn := <-cw.connc: cw.resetCloser() t = conn.t