etcdserver: add id generator
This commit is contained in:
@ -18,14 +18,13 @@ package command
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||||
"github.com/coreos/etcd/etcdserver"
|
|
||||||
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
"github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
"github.com/coreos/etcd/etcdserver/idutil"
|
||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/snap"
|
||||||
"github.com/coreos/etcd/wal"
|
"github.com/coreos/etcd/wal"
|
||||||
@ -78,9 +77,9 @@ func handleBackup(c *cli.Context) {
|
|||||||
}
|
}
|
||||||
var metadata etcdserverpb.Metadata
|
var metadata etcdserverpb.Metadata
|
||||||
pbutil.MustUnmarshal(&metadata, wmetadata)
|
pbutil.MustUnmarshal(&metadata, wmetadata)
|
||||||
rand.Seed(time.Now().UnixNano())
|
idgen := idutil.NewGenerator(0, time.Now())
|
||||||
metadata.NodeID = etcdserver.GenID()
|
metadata.NodeID = idgen.Next()
|
||||||
metadata.ClusterID = etcdserver.GenID()
|
metadata.ClusterID = idgen.Next()
|
||||||
|
|
||||||
neww, err := wal.Create(destWAL, pbutil.MustMarshal(&metadata))
|
neww, err := wal.Create(destWAL, pbutil.MustMarshal(&metadata))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -102,7 +102,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
|
rr, err := parseKeyRequest(r, clockwork.NewRealClock())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, err)
|
writeError(w, err)
|
||||||
return
|
return
|
||||||
@ -279,7 +279,7 @@ func serveVersion(w http.ResponseWriter, r *http.Request) {
|
|||||||
// parseKeyRequest converts a received http.Request on keysPrefix to
|
// parseKeyRequest converts a received http.Request on keysPrefix to
|
||||||
// a server Request, performing validation of supplied fields as appropriate.
|
// a server Request, performing validation of supplied fields as appropriate.
|
||||||
// If any validation fails, an empty Request and non-nil error is returned.
|
// If any validation fails, an empty Request and non-nil error is returned.
|
||||||
func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
|
func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, error) {
|
||||||
emptyReq := etcdserverpb.Request{}
|
emptyReq := etcdserverpb.Request{}
|
||||||
|
|
||||||
err := r.ParseForm()
|
err := r.ParseForm()
|
||||||
@ -394,7 +394,6 @@ func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdser
|
|||||||
}
|
}
|
||||||
|
|
||||||
rr := etcdserverpb.Request{
|
rr := etcdserverpb.Request{
|
||||||
ID: id,
|
|
||||||
Method: r.Method,
|
Method: r.Method,
|
||||||
Path: p,
|
Path: p,
|
||||||
Val: r.FormValue("value"),
|
Val: r.FormValue("value"),
|
||||||
|
@ -312,7 +312,7 @@ func TestBadParseRequest(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
got, err := parseKeyRequest(tt.in, 1234, clockwork.NewFakeClock())
|
got, err := parseKeyRequest(tt.in, clockwork.NewFakeClock())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("#%d: unexpected nil error!", i)
|
t.Errorf("#%d: unexpected nil error!", i)
|
||||||
continue
|
continue
|
||||||
@ -343,7 +343,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// good prefix, all other values default
|
// good prefix, all other values default
|
||||||
mustNewRequest(t, "foo"),
|
mustNewRequest(t, "foo"),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
},
|
},
|
||||||
@ -356,7 +355,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"value": []string{"some_value"}},
|
url.Values{"value": []string{"some_value"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Val: "some_value",
|
Val: "some_value",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -370,7 +368,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"prevIndex": []string{"98765"}},
|
url.Values{"prevIndex": []string{"98765"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
PrevIndex: 98765,
|
PrevIndex: 98765,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -384,7 +381,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"recursive": []string{"true"}},
|
url.Values{"recursive": []string{"true"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -398,7 +394,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"sorted": []string{"true"}},
|
url.Values{"sorted": []string{"true"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Sorted: true,
|
Sorted: true,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -412,7 +407,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"quorum": []string{"true"}},
|
url.Values{"quorum": []string{"true"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Quorum: true,
|
Quorum: true,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -422,7 +416,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// wait specified
|
// wait specified
|
||||||
mustNewRequest(t, "foo?wait=true"),
|
mustNewRequest(t, "foo?wait=true"),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Wait: true,
|
Wait: true,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -432,7 +425,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// empty TTL specified
|
// empty TTL specified
|
||||||
mustNewRequest(t, "foo?ttl="),
|
mustNewRequest(t, "foo?ttl="),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
Expiration: 0,
|
Expiration: 0,
|
||||||
@ -442,7 +434,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// non-empty TTL specified
|
// non-empty TTL specified
|
||||||
mustNewRequest(t, "foo?ttl=5678"),
|
mustNewRequest(t, "foo?ttl=5678"),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
Expiration: fc.Now().Add(5678 * time.Second).UnixNano(),
|
Expiration: fc.Now().Add(5678 * time.Second).UnixNano(),
|
||||||
@ -452,7 +443,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// zero TTL specified
|
// zero TTL specified
|
||||||
mustNewRequest(t, "foo?ttl=0"),
|
mustNewRequest(t, "foo?ttl=0"),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
Expiration: fc.Now().UnixNano(),
|
Expiration: fc.Now().UnixNano(),
|
||||||
@ -462,7 +452,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// dir specified
|
// dir specified
|
||||||
mustNewRequest(t, "foo?dir=true"),
|
mustNewRequest(t, "foo?dir=true"),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Dir: true,
|
Dir: true,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -472,7 +461,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
// dir specified negatively
|
// dir specified negatively
|
||||||
mustNewRequest(t, "foo?dir=false"),
|
mustNewRequest(t, "foo?dir=false"),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
Dir: false,
|
Dir: false,
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -486,7 +474,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"prevExist": []string{"true"}},
|
url.Values{"prevExist": []string{"true"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
PrevExist: boolp(true),
|
PrevExist: boolp(true),
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -500,7 +487,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{"prevExist": []string{"false"}},
|
url.Values{"prevExist": []string{"false"}},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
PrevExist: boolp(false),
|
PrevExist: boolp(false),
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -518,7 +504,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
PrevExist: boolp(true),
|
PrevExist: boolp(true),
|
||||||
PrevValue: "previous value",
|
PrevValue: "previous value",
|
||||||
@ -534,7 +519,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
url.Values{},
|
url.Values{},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
PrevValue: "woof",
|
PrevValue: "woof",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -550,7 +534,6 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
etcdserverpb.Request{
|
etcdserverpb.Request{
|
||||||
ID: 1234,
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
PrevValue: "miaow",
|
PrevValue: "miaow",
|
||||||
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
|
||||||
@ -559,7 +542,7 @@ func TestGoodParseRequest(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
got, err := parseKeyRequest(tt.in, 1234, fc)
|
got, err := parseKeyRequest(tt.in, fc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("#%d: err = %v, want %v", i, err, nil)
|
t.Errorf("#%d: err = %v, want %v", i, err, nil)
|
||||||
}
|
}
|
||||||
|
75
etcdserver/idutil/id.go
Normal file
75
etcdserver/idutil/id.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 CoreOS, Inc.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package idutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
tsLen = 5 * 8
|
||||||
|
cntLen = 2 * 8
|
||||||
|
suffixLen = tsLen + cntLen
|
||||||
|
)
|
||||||
|
|
||||||
|
// The initial id is in this format:
|
||||||
|
// High order byte is memberID, next 5 bytes are from timestamp,
|
||||||
|
// and low order 2 bytes are 0s.
|
||||||
|
// | prefix | suffix |
|
||||||
|
// | 1 byte | 5 bytes | 2 bytes |
|
||||||
|
// | memberID | timestamp | cnt |
|
||||||
|
//
|
||||||
|
// The timestamp 5 bytes is different when the machine is restart
|
||||||
|
// after 1 ms and before 35 years.
|
||||||
|
//
|
||||||
|
// It increases suffix to generate the next id.
|
||||||
|
// The count field may overflow to timestamp field, which is intentional.
|
||||||
|
// It helps to extend the event window to 2^56. This doesn't break that
|
||||||
|
// id generated after restart is unique because etcd throughput is <<
|
||||||
|
// 65536req/ms.
|
||||||
|
type Generator struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
// high order byte
|
||||||
|
prefix uint64
|
||||||
|
// low order 7 bytes
|
||||||
|
suffix uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGenerator(memberID uint8, now time.Time) *Generator {
|
||||||
|
prefix := uint64(memberID) << suffixLen
|
||||||
|
unixMilli := uint64(now.UnixNano()) / uint64(time.Millisecond/time.Nanosecond)
|
||||||
|
suffix := lowbit(unixMilli, tsLen) << cntLen
|
||||||
|
return &Generator{
|
||||||
|
prefix: prefix,
|
||||||
|
suffix: suffix,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next generates a id that is unique.
|
||||||
|
func (g *Generator) Next() uint64 {
|
||||||
|
g.mu.Lock()
|
||||||
|
defer g.mu.Unlock()
|
||||||
|
g.suffix++
|
||||||
|
id := g.prefix | lowbit(g.suffix, suffixLen)
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
|
func lowbit(x uint64, n uint) uint64 {
|
||||||
|
return x & (math.MaxUint64 >> (64 - n))
|
||||||
|
}
|
57
etcdserver/idutil/id_test.go
Normal file
57
etcdserver/idutil/id_test.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 CoreOS, Inc.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package idutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewGenerator(t *testing.T) {
|
||||||
|
g := NewGenerator(0x12, time.Unix(0, 0).Add(0x3456*time.Millisecond))
|
||||||
|
id := g.Next()
|
||||||
|
wid := uint64(0x1200000034560001)
|
||||||
|
if id != wid {
|
||||||
|
t.Errorf("id = %x, want %x", id, wid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewGeneratorUnique(t *testing.T) {
|
||||||
|
g := NewGenerator(0, time.Time{})
|
||||||
|
id := g.Next()
|
||||||
|
// different server generates different ID
|
||||||
|
g1 := NewGenerator(1, time.Time{})
|
||||||
|
if gid := g1.Next(); id == gid {
|
||||||
|
t.Errorf("generate the same id %x using different server ID", id)
|
||||||
|
}
|
||||||
|
// restarted server generates different ID
|
||||||
|
g2 := NewGenerator(0, time.Now())
|
||||||
|
if gid := g2.Next(); id == gid {
|
||||||
|
t.Errorf("generate the same id %x after restart", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNext(t *testing.T) {
|
||||||
|
g := NewGenerator(0x12, time.Unix(0, 0).Add(0x3456*time.Millisecond))
|
||||||
|
wid := uint64(0x1200000034560001)
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
id := g.Next()
|
||||||
|
if id != wid+uint64(i) {
|
||||||
|
t.Errorf("id = %x, want %x", id, wid+uint64(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/coreos/etcd/discovery"
|
"github.com/coreos/etcd/discovery"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
"github.com/coreos/etcd/etcdserver/idutil"
|
||||||
"github.com/coreos/etcd/etcdserver/stats"
|
"github.com/coreos/etcd/etcdserver/stats"
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
@ -160,6 +161,8 @@ type EtcdServer struct {
|
|||||||
raftTerm uint64
|
raftTerm uint64
|
||||||
|
|
||||||
raftLead uint64
|
raftLead uint64
|
||||||
|
|
||||||
|
reqIDGen *idutil.Generator
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||||
@ -270,6 +273,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
|||||||
Ticker: time.Tick(100 * time.Millisecond),
|
Ticker: time.Tick(100 * time.Millisecond),
|
||||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||||
snapCount: cfg.SnapCount,
|
snapCount: cfg.SnapCount,
|
||||||
|
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
||||||
}
|
}
|
||||||
tr := &rafthttp.Transport{
|
tr := &rafthttp.Transport{
|
||||||
RoundTripper: cfg.Transport,
|
RoundTripper: cfg.Transport,
|
||||||
@ -474,9 +478,7 @@ func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
|
|||||||
// respective operation. Do will block until an action is performed or there is
|
// respective operation. Do will block until an action is performed or there is
|
||||||
// an error.
|
// an error.
|
||||||
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
|
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
|
||||||
if r.ID == 0 {
|
r.ID = s.reqIDGen.Next()
|
||||||
log.Panicf("request ID should never be 0")
|
|
||||||
}
|
|
||||||
if r.Method == "GET" && r.Quorum {
|
if r.Method == "GET" && r.Quorum {
|
||||||
r.Method = "QGET"
|
r.Method = "QGET"
|
||||||
}
|
}
|
||||||
@ -543,7 +545,6 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cc := raftpb.ConfChange{
|
cc := raftpb.ConfChange{
|
||||||
ID: GenID(),
|
|
||||||
Type: raftpb.ConfChangeAddNode,
|
Type: raftpb.ConfChangeAddNode,
|
||||||
NodeID: uint64(memb.ID),
|
NodeID: uint64(memb.ID),
|
||||||
Context: b,
|
Context: b,
|
||||||
@ -553,7 +554,6 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
|
|||||||
|
|
||||||
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
|
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
|
||||||
cc := raftpb.ConfChange{
|
cc := raftpb.ConfChange{
|
||||||
ID: GenID(),
|
|
||||||
Type: raftpb.ConfChangeRemoveNode,
|
Type: raftpb.ConfChangeRemoveNode,
|
||||||
NodeID: id,
|
NodeID: id,
|
||||||
}
|
}
|
||||||
@ -566,7 +566,6 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cc := raftpb.ConfChange{
|
cc := raftpb.ConfChange{
|
||||||
ID: GenID(),
|
|
||||||
Type: raftpb.ConfChangeUpdateNode,
|
Type: raftpb.ConfChangeUpdateNode,
|
||||||
NodeID: uint64(memb.ID),
|
NodeID: uint64(memb.ID),
|
||||||
Context: b,
|
Context: b,
|
||||||
@ -588,6 +587,7 @@ func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) }
|
|||||||
// then waits for it to be applied to the server. It
|
// then waits for it to be applied to the server. It
|
||||||
// will block until the change is performed or there is an error.
|
// will block until the change is performed or there is an error.
|
||||||
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
|
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
|
||||||
|
cc.ID = s.reqIDGen.Next()
|
||||||
ch := s.w.Register(cc.ID)
|
ch := s.w.Register(cc.ID)
|
||||||
if err := s.node.ProposeConfChange(ctx, cc); err != nil {
|
if err := s.node.ProposeConfChange(ctx, cc); err != nil {
|
||||||
s.w.Trigger(cc.ID, nil)
|
s.w.Trigger(cc.ID, nil)
|
||||||
@ -617,7 +617,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
|
|||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
req := pb.Request{
|
req := pb.Request{
|
||||||
Method: "SYNC",
|
Method: "SYNC",
|
||||||
ID: GenID(),
|
ID: s.reqIDGen.Next(),
|
||||||
Time: time.Now().UnixNano(),
|
Time: time.Now().UnixNano(),
|
||||||
}
|
}
|
||||||
data := pbutil.MustMarshal(&req)
|
data := pbutil.MustMarshal(&req)
|
||||||
@ -641,7 +641,6 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
req := pb.Request{
|
req := pb.Request{
|
||||||
ID: GenID(),
|
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Path: MemberAttributesStorePath(s.id),
|
Path: MemberAttributesStorePath(s.id),
|
||||||
Val: string(b),
|
Val: string(b),
|
||||||
@ -987,15 +986,6 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string {
|
|||||||
return us
|
return us
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: move the function to /id pkg maybe?
|
|
||||||
// GenID generates a random id that is not equal to 0.
|
|
||||||
func GenID() (n uint64) {
|
|
||||||
for n == 0 {
|
|
||||||
n = uint64(rand.Int63())
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseCtxErr(err error) error {
|
func parseCtxErr(err error) error {
|
||||||
switch err {
|
switch err {
|
||||||
case context.Canceled:
|
case context.Canceled:
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||||
|
"github.com/coreos/etcd/etcdserver/idutil"
|
||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
@ -113,7 +114,10 @@ func TestDoLocalAction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
st := &storeRecorder{}
|
st := &storeRecorder{}
|
||||||
srv := &EtcdServer{store: st}
|
srv := &EtcdServer{
|
||||||
|
store: st,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
|
}
|
||||||
resp, err := srv.Do(context.TODO(), tt.req)
|
resp, err := srv.Do(context.TODO(), tt.req)
|
||||||
|
|
||||||
if err != tt.werr {
|
if err != tt.werr {
|
||||||
@ -153,7 +157,10 @@ func TestDoBadLocalAction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
st := &errStoreRecorder{err: storeErr}
|
st := &errStoreRecorder{err: storeErr}
|
||||||
srv := &EtcdServer{store: st}
|
srv := &EtcdServer{
|
||||||
|
store: st,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
|
}
|
||||||
resp, err := srv.Do(context.Background(), tt.req)
|
resp, err := srv.Do(context.Background(), tt.req)
|
||||||
|
|
||||||
if err != storeErr {
|
if err != storeErr {
|
||||||
@ -579,6 +586,7 @@ func testServer(t *testing.T, ns uint64) {
|
|||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
Ticker: tk.C,
|
Ticker: tk.C,
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(uint8(i), time.Time{}),
|
||||||
}
|
}
|
||||||
ss[i] = srv
|
ss[i] = srv
|
||||||
}
|
}
|
||||||
@ -591,7 +599,6 @@ func testServer(t *testing.T, ns uint64) {
|
|||||||
for i := 1; i <= 10; i++ {
|
for i := 1; i <= 10; i++ {
|
||||||
r := pb.Request{
|
r := pb.Request{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
ID: uint64(i),
|
|
||||||
Path: "/foo",
|
Path: "/foo",
|
||||||
Val: "bar",
|
Val: "bar",
|
||||||
}
|
}
|
||||||
@ -654,6 +661,7 @@ func TestDoProposal(t *testing.T) {
|
|||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
Ticker: tk,
|
Ticker: tk,
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
srv.start()
|
srv.start()
|
||||||
resp, err := srv.Do(ctx, tt)
|
resp, err := srv.Do(ctx, tt)
|
||||||
@ -686,12 +694,13 @@ func TestDoProposalCancelled(t *testing.T) {
|
|||||||
raftStorage: s,
|
raftStorage: s,
|
||||||
store: st,
|
store: st,
|
||||||
w: wait,
|
w: wait,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
var err error
|
var err error
|
||||||
go func() {
|
go func() {
|
||||||
_, err = srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
_, err = srv.Do(ctx, pb.Request{Method: "PUT"})
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
cancel()
|
cancel()
|
||||||
@ -704,7 +713,7 @@ func TestDoProposalCancelled(t *testing.T) {
|
|||||||
if err != ErrCanceled {
|
if err != ErrCanceled {
|
||||||
t.Fatalf("err = %v, want %v", err, ErrCanceled)
|
t.Fatalf("err = %v, want %v", err, ErrCanceled)
|
||||||
}
|
}
|
||||||
w := []action{action{name: "Register1"}, action{name: "Trigger1"}}
|
w := []action{action{name: "Register"}, action{name: "Trigger"}}
|
||||||
if !reflect.DeepEqual(wait.action, w) {
|
if !reflect.DeepEqual(wait.action, w) {
|
||||||
t.Errorf("wait.action = %+v, want %+v", wait.action, w)
|
t.Errorf("wait.action = %+v, want %+v", wait.action, w)
|
||||||
}
|
}
|
||||||
@ -713,8 +722,9 @@ func TestDoProposalCancelled(t *testing.T) {
|
|||||||
func TestDoProposalTimeout(t *testing.T) {
|
func TestDoProposalTimeout(t *testing.T) {
|
||||||
ctx, _ := context.WithTimeout(context.Background(), 0)
|
ctx, _ := context.WithTimeout(context.Background(), 0)
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
node: &nodeRecorder{},
|
node: &nodeRecorder{},
|
||||||
w: &waitRecorder{},
|
w: &waitRecorder{},
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
_, err := srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
_, err := srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
||||||
if err != ErrTimeout {
|
if err != ErrTimeout {
|
||||||
@ -743,6 +753,7 @@ func TestDoProposalStopped(t *testing.T) {
|
|||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
Ticker: tk,
|
Ticker: tk,
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
srv.start()
|
srv.start()
|
||||||
|
|
||||||
@ -768,7 +779,8 @@ func TestDoProposalStopped(t *testing.T) {
|
|||||||
func TestSync(t *testing.T) {
|
func TestSync(t *testing.T) {
|
||||||
n := &nodeProposeDataRecorder{}
|
n := &nodeProposeDataRecorder{}
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
node: n,
|
node: n,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@ -802,7 +814,8 @@ func TestSync(t *testing.T) {
|
|||||||
func TestSyncTimeout(t *testing.T) {
|
func TestSyncTimeout(t *testing.T) {
|
||||||
n := &nodeProposalBlockerRecorder{}
|
n := &nodeProposalBlockerRecorder{}
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
node: n,
|
node: n,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@ -854,6 +867,7 @@ func TestSyncTrigger(t *testing.T) {
|
|||||||
transport: &nopTransporter{},
|
transport: &nopTransporter{},
|
||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
SyncTicker: st,
|
SyncTicker: st,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
srv.start()
|
srv.start()
|
||||||
// trigger the server to become a leader and accept sync requests
|
// trigger the server to become a leader and accept sync requests
|
||||||
@ -943,6 +957,7 @@ func TestTriggerSnap(t *testing.T) {
|
|||||||
raftStorage: s,
|
raftStorage: s,
|
||||||
snapCount: 10,
|
snapCount: 10,
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.start()
|
srv.start()
|
||||||
@ -1089,6 +1104,7 @@ func TestAddMember(t *testing.T) {
|
|||||||
transport: &nopTransporter{},
|
transport: &nopTransporter{},
|
||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
s.start()
|
s.start()
|
||||||
m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
|
m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
|
||||||
@ -1124,6 +1140,7 @@ func TestRemoveMember(t *testing.T) {
|
|||||||
transport: &nopTransporter{},
|
transport: &nopTransporter{},
|
||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
s.start()
|
s.start()
|
||||||
err := s.RemoveMember(context.TODO(), 1234)
|
err := s.RemoveMember(context.TODO(), 1234)
|
||||||
@ -1158,6 +1175,7 @@ func TestUpdateMember(t *testing.T) {
|
|||||||
transport: &nopTransporter{},
|
transport: &nopTransporter{},
|
||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
Cluster: cl,
|
Cluster: cl,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
s.start()
|
s.start()
|
||||||
wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
|
wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
|
||||||
@ -1193,6 +1211,7 @@ func TestPublish(t *testing.T) {
|
|||||||
Cluster: &Cluster{},
|
Cluster: &Cluster{},
|
||||||
node: n,
|
node: n,
|
||||||
w: w,
|
w: w,
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
srv.publish(time.Hour)
|
srv.publish(time.Hour)
|
||||||
|
|
||||||
@ -1229,6 +1248,7 @@ func TestPublishStopped(t *testing.T) {
|
|||||||
w: &waitRecorder{},
|
w: &waitRecorder{},
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
close(srv.done)
|
close(srv.done)
|
||||||
srv.publish(time.Hour)
|
srv.publish(time.Hour)
|
||||||
@ -1238,9 +1258,10 @@ func TestPublishStopped(t *testing.T) {
|
|||||||
func TestPublishRetry(t *testing.T) {
|
func TestPublishRetry(t *testing.T) {
|
||||||
n := &nodeRecorder{}
|
n := &nodeRecorder{}
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
node: n,
|
node: n,
|
||||||
w: &waitRecorder{},
|
w: &waitRecorder{},
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
}
|
}
|
||||||
time.AfterFunc(500*time.Microsecond, func() { close(srv.done) })
|
time.AfterFunc(500*time.Microsecond, func() { close(srv.done) })
|
||||||
srv.publish(10 * time.Nanosecond)
|
srv.publish(10 * time.Nanosecond)
|
||||||
@ -1338,19 +1359,6 @@ func TestGetBool(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGenID(t *testing.T) {
|
|
||||||
// Sanity check that the GenID function has been seeded appropriately
|
|
||||||
// (math/rand is seeded with 1 by default)
|
|
||||||
r := rand.NewSource(int64(1))
|
|
||||||
var n uint64
|
|
||||||
for n == 0 {
|
|
||||||
n = uint64(r.Int63())
|
|
||||||
}
|
|
||||||
if n == GenID() {
|
|
||||||
t.Fatalf("GenID's rand seeded with 1!")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type action struct {
|
type action struct {
|
||||||
name string
|
name string
|
||||||
params []interface{}
|
params []interface{}
|
||||||
@ -1475,11 +1483,11 @@ type waitRecorder struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *waitRecorder) Register(id uint64) <-chan interface{} {
|
func (w *waitRecorder) Register(id uint64) <-chan interface{} {
|
||||||
w.action = append(w.action, action{name: fmt.Sprint("Register", id)})
|
w.action = append(w.action, action{name: "Register"})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (w *waitRecorder) Trigger(id uint64, x interface{}) {
|
func (w *waitRecorder) Trigger(id uint64, x interface{}) {
|
||||||
w.action = append(w.action, action{name: fmt.Sprint("Trigger", id)})
|
w.action = append(w.action, action{name: "Trigger"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func boolp(b bool) *bool { return &b }
|
func boolp(b bool) *bool { return &b }
|
||||||
|
2
test
2
test
@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
|
|||||||
source ./build
|
source ./build
|
||||||
|
|
||||||
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
|
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
|
||||||
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/netutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
|
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb etcdserver/idutil integration migrate pkg/fileutil pkg/flags pkg/ioutils pkg/netutil pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal"
|
||||||
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
|
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
|
||||||
|
|
||||||
# user has not provided PKG override
|
# user has not provided PKG override
|
||||||
|
Reference in New Issue
Block a user