diff --git a/storage/kvstore.go b/storage/kvstore.go index 97d8edfe8..7cc4c19c1 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -70,7 +70,9 @@ func newStore(path string) *store { func (s *store) Put(key, value []byte) int64 { id := s.TxnBegin() s.put(key, value, s.currentRev.main+1) - s.TxnEnd(id) + s.txnEnd(id) + + putCounter.Inc() return int64(s.currentRev.main) } @@ -78,7 +80,9 @@ func (s *store) Put(key, value []byte) int64 { func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { id := s.TxnBegin() kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev) - s.TxnEnd(id) + s.txnEnd(id) + + rangeCounter.Inc() return kvs, rev, err } @@ -86,7 +90,9 @@ func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.K func (s *store) DeleteRange(key, end []byte) (n, rev int64) { id := s.TxnBegin() n = s.deleteRange(key, end, s.currentRev.main+1) - s.TxnEnd(id) + s.txnEnd(id) + + deleteCounter.Inc() return n, int64(s.currentRev.main) } @@ -102,6 +108,18 @@ func (s *store) TxnBegin() int64 { } func (s *store) TxnEnd(txnID int64) error { + err := s.txnEnd(txnID) + if err != nil { + return err + } + + txnCounter.Inc() + return nil +} + +// txnEnd is used for unlocking an internal txn. It does +// not increase the txnCounter. +func (s *store) txnEnd(txnID int64) error { s.tmu.Lock() defer s.tmu.Unlock() if txnID != s.txnID { @@ -162,6 +180,8 @@ func (s *store) Compact(rev int64) error { return ErrFutureRev } + start := time.Now() + s.compactMainRev = rev rbytes := newRevBytes() @@ -178,6 +198,8 @@ func (s *store) Compact(rev int64) error { s.wg.Add(1) go s.scheduleCompaction(rev, keep) + + indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond)) return nil } diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index 7f1d307de..d63e25f42 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -7,6 +7,10 @@ import ( func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) { defer s.wg.Done() + + totalStart := time.Now() + defer dbCompactionTotalDurations.Observe(float64(time.Now().Sub(totalStart) / time.Millisecond)) + end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) @@ -15,6 +19,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc for { var rev revision + start := time.Now() tx := s.b.BatchTx() tx.Lock() @@ -37,6 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc // update last revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last) tx.Unlock() + dbCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond)) select { case <-time.After(100 * time.Millisecond): diff --git a/storage/metrics.go b/storage/metrics.go new file mode 100644 index 000000000..d677c879b --- /dev/null +++ b/storage/metrics.go @@ -0,0 +1,92 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" +) + +var ( + rangeCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "range_total", + Help: "Total number of ranges seen by this member.", + }) + + putCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "put_total", + Help: "Total number of puts seen by this member.", + }) + + deleteCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "delete_total", + Help: "Total number of deletes seen by this member.", + }) + + txnCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "txn_total", + Help: "Total number of txns seen by this member.", + }) + + indexCompactionPauseDurations = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "index_compaction_pause_duration_milliseconds", + Help: "Bucketed histogram of index compaction puase duration.", + // 0.5ms -> 1second + Buckets: prometheus.ExponentialBuckets(0.5, 2, 12), + }) + + dbCompactionPauseDurations = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "db_compaction_pause_duration_milliseconds", + Help: "Bucketed histogram of db compaction puase duration.", + // 1ms -> 4second + Buckets: prometheus.ExponentialBuckets(1, 2, 13), + }) + + dbCompactionTotalDurations = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "db_compaction_total_duration_milliseconds", + Help: "Bucketed histogram of db compaction total duration.", + // 100ms -> 800second + Buckets: prometheus.ExponentialBuckets(100, 2, 14), + }) +) + +func init() { + prometheus.MustRegister(rangeCounter) + prometheus.MustRegister(putCounter) + prometheus.MustRegister(deleteCounter) + prometheus.MustRegister(indexCompactionPauseDurations) + prometheus.MustRegister(dbCompactionPauseDurations) + prometheus.MustRegister(dbCompactionTotalDurations) +}