Compare commits

...

27 Commits

Author SHA1 Message Date
cc198e22d3 version: bump to v3.0.17 2017-01-20 11:02:34 -08:00
fcf813427b lease/leasehttp: pass min TTL in TestRenewHTTP 2017-01-20 11:02:00 -08:00
518efab61c etcdctlv3: snapshot restore works with lease key 2017-01-20 10:36:02 -08:00
42f9a5ef74 etcdserver, lease: tie lease min ttl to election timeout 2017-01-20 10:35:46 -08:00
21509633ba version: bump to v3.0.16+git 2017-01-20 10:19:14 -08:00
a23109a0c6 version: bump to v3.0.16 2017-01-13 11:29:12 -08:00
219a4e9ad5 clientv3: don't reset keepalive stream on grant failure
Was triggering cancelation errors on outstanding KeepAlives if Grant
had to retry.
2017-01-13 11:28:17 -08:00
3d050630f4 v3api, rpctypes: add ErrTimeoutDueToConnectionLost
Lack of GRPC code was causing this to look like a halting error to the client.
2017-01-13 11:27:56 -08:00
9c66ed2798 clientv3: don't reset stream on keepaliveonce or revoke failure
Would cause the keepalive loop to cancel out.

Fixes #7082
2017-01-13 10:42:01 -08:00
a9e2d3d4d3 *: remove 'tools/etcd-top' to drop pcap.h 2016-12-07 10:34:34 -08:00
41e329cd35 *: drop breaking-changes from master branch 2016-12-07 10:34:28 -08:00
3a8b524d36 travis, test: use Go 1.6.4, skip 'gosimple' 2016-12-07 10:28:53 -08:00
11668f53db integration: use RequireLeader for TestV3LeaseFailover
Giving Renew() the default request timeout causes TestV3LeaseFailover
to miss its timing constraints. Since it only needs to wait until the
leader recognizes the leader is lost, use RequireLeader to cancel the
keepalive stream before the request times out.
2016-12-07 10:06:28 -08:00
7ceca7e046 clientv3/integration: test lease keepalive works following quorum loss 2016-12-07 10:06:28 -08:00
395bd2313c v3rpc, etcdserver, leasehttp: ctxize Renew with request timeout
Would retry a few times before returning a not primary error that
the client should never see. Instead, use proper timeouts and
then return a request timeout error on failure.

Fixes #6922
2016-12-07 10:06:19 -08:00
b357569bc6 version: bump to v3.0.15+git 2016-11-11 11:17:31 -08:00
fc00305a2e version: bump to v3.0.15 2016-11-10 13:12:43 -08:00
f322fe7f0d clientv3, ctlv3: document range end requirement 2016-11-10 13:10:18 -08:00
049fcd30ea integration: test wrong watcher range 2016-11-10 13:09:13 -08:00
1b702e79db mvcc: return -1 for wrong watcher range key >= end
Fix https://github.com/coreos/etcd/issues/6819.
2016-11-10 13:08:51 -08:00
b87190d9dc integration: test canceling a watcher on disconnected stream 2016-11-10 13:07:24 -08:00
83b493f945 clientv3: let watchers cancel when reconnecting 2016-11-10 13:06:47 -08:00
9b69cbd989 version: bump to v3.0.14+git 2016-11-04 13:06:36 -07:00
8a37349097 version: bump to v3.0.14 2016-11-04 10:54:14 -07:00
9a0e4dfe4f ctlv3: fix migration 2016-11-03 09:47:41 -07:00
f60469af16 ctlv3: Add a no-ttl flag to etcdctl migrate to discard keys on transform. 2016-11-03 09:47:39 -07:00
932370d8ca version: bump to v3.0.13+git 2016-10-24 11:22:50 -07:00
36 changed files with 465 additions and 1474 deletions

View File

@ -4,8 +4,7 @@ go_import_path: github.com/coreos/etcd
sudo: false
go:
- 1.6
- tip
- 1.6.4
env:
global:

View File

@ -455,3 +455,46 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
clus.Members[0].Restart(t)
}
// TestLeaseRenewLostQuorum ensures keepalives work after losing quorum
// for a while.
func TestLeaseRenewLostQuorum(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cli := clus.Client(0)
r, err := cli.Grant(context.TODO(), 4)
if err != nil {
t.Fatal(err)
}
kctx, kcancel := context.WithCancel(context.Background())
defer kcancel()
ka, err := cli.KeepAlive(kctx, r.ID)
if err != nil {
t.Fatal(err)
}
// consume first keepalive so next message sends when cluster is down
<-ka
// force keepalive stream message to timeout
clus.Members[1].Stop(t)
clus.Members[2].Stop(t)
// Use TTL-1 since the client closes the keepalive channel if no
// keepalive arrives before the lease deadline.
// The cluster has 1 second to recover and reply to the keepalive.
time.Sleep(time.Duration(r.TTL-1) * time.Second)
clus.Members[1].Restart(t)
clus.Members[2].Restart(t)
select {
case _, ok := <-ka:
if !ok {
t.Fatalf("keepalive closed")
}
case <-time.After(time.Duration(r.TTL) * time.Second):
t.Fatalf("timed out waiting for keepalive")
}
}

View File

@ -782,3 +782,22 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
<-donec
clus.TakeClient(0)
}
// TestWatchCancelDisconnected ensures canceling a watcher works when
// its grpc stream is disconnected / reconnecting.
func TestWatchCancelDisconnected(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.Client(0)
ctx, cancel := context.WithCancel(context.Background())
// add more watches than can be resumed before the cancel
wch := cli.Watch(ctx, "abc")
clus.Members[0].Stop(t)
cancel()
select {
case <-wch:
case <-time.After(time.Second):
t.Fatal("took too long to cancel disconnected watcher")
}
}

View File

@ -143,9 +143,6 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err
if isHaltErr(cctx, err) {
return nil, toErr(ctx, err)
}
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
}
@ -164,9 +161,6 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse,
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
}
@ -213,10 +207,6 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
if isHaltErr(ctx, err) {
return nil, toErr(ctx, err)
}
if nerr := l.newStream(); nerr != nil {
return nil, nerr
}
}
}
@ -312,10 +302,23 @@ func (l *lessor) recvKeepAliveLoop() {
// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
if err := l.newStream(); err != nil {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
if err = toErr(sctx, err); err != nil {
cancel()
return nil, err
}
stream := l.getKeepAliveStream()
l.mu.Lock()
defer l.mu.Unlock()
if l.stream != nil && l.streamCancel != nil {
l.stream.CloseSend()
l.streamCancel()
}
l.streamCancel = cancel
l.stream = stream
go l.sendKeepAliveLoop(stream)
return stream, nil
}
@ -411,32 +414,6 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
}
}
func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
l.mu.Lock()
defer l.mu.Unlock()
return l.stream
}
func (l *lessor) newStream() error {
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false))
if err != nil {
cancel()
return toErr(sctx, err)
}
l.mu.Lock()
defer l.mu.Unlock()
if l.stream != nil && l.streamCancel != nil {
l.stream.CloseSend()
l.streamCancel()
}
l.streamCancel = cancel
l.stream = stream
return nil
}
func (ka *keepAlive) Close() {
close(ka.donec)
for _, ch := range ka.chs {

View File

@ -215,14 +215,15 @@ func WithPrefix() OpOption {
}
}
// WithRange specifies the range of 'Get' or 'Delete' requests.
// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
// For example, 'Get' requests with 'WithRange(end)' returns
// the keys in the range [key, end).
// endKey must be lexicographically greater than start key.
func WithRange(endKey string) OpOption {
return func(op *Op) { op.end = []byte(endKey) }
}
// WithFromKey specifies the range of 'Get' or 'Delete' requests
// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
// to be equal or greater than the key in the argument.
func WithFromKey() OpOption { return WithRange("\x00") }

View File

@ -125,8 +125,6 @@ type watchGrpcStream struct {
reqc chan *watchRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// stopc is sent to the main goroutine to stop all processing
stopc chan struct{}
// donec closes to broadcast shutdown
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
@ -204,7 +202,6 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
@ -300,7 +297,7 @@ func (w *watcher) Close() (err error) {
}
func (w *watchGrpcStream) Close() (err error) {
close(w.stopc)
w.cancel()
<-w.donec
select {
case err = <-w.errc:
@ -347,7 +344,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
} else {
} else if ws.outc != nil {
close(ws.outc)
}
if ws.id != -1 {
@ -472,7 +469,7 @@ func (w *watchGrpcStream) run() {
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
case <-w.stopc:
case <-w.ctx.Done():
return
case ws := <-w.closingc:
w.closeSubstream(ws)
@ -597,6 +594,8 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
ws.initReq.rev = nextRev
case <-w.ctx.Done():
return
case <-ws.initReq.ctx.Done():
return
case <-resumec:
@ -608,34 +607,78 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
// connect to grpc stream
// mark all substreams as resuming
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
// strip out nils, if any
var resuming []*watcherStream
for _, ws := range w.resuming {
if ws != nil {
resuming = append(resuming, ws)
}
}
w.resuming = resuming
w.substreams = make(map[int64]*watcherStream)
// connect to grpc stream while accepting watcher cancelation
stopc := make(chan struct{})
donec := w.waitCancelSubstreams(stopc)
wc, err := w.openWatchClient()
close(stopc)
<-donec
// serve all non-closing streams, even if there's a client error
// so that the teardown path can shutdown the streams as expected.
for _, ws := range w.resuming {
if ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
if err != nil {
return nil, v3rpc.Error(err)
}
// mark all substreams as resuming
if len(w.substreams)+len(w.resuming) > 0 {
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
for _, ws := range w.resuming {
if ws == nil || ws.closing {
continue
}
ws.donec = make(chan struct{})
go w.serveSubstream(ws, w.resumec)
}
}
w.substreams = make(map[int64]*watcherStream)
// receive data from new grpc stream
go w.serveWatchClient(wc)
return wc, nil
}
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
var wg sync.WaitGroup
wg.Add(len(w.resuming))
donec := make(chan struct{})
for i := range w.resuming {
go func(ws *watcherStream) {
defer wg.Done()
if ws.closing {
return
}
select {
case <-ws.initReq.ctx.Done():
// closed ws will be removed from resuming
ws.closing = true
close(ws.outc)
ws.outc = nil
go func() { w.closingc <- ws }()
case <-stopc:
}
}(w.resuming[i])
}
go func() {
defer close(donec)
wg.Wait()
}()
return donec
}
// joinSubstream waits for all substream goroutines to complete
func (w *watchGrpcStream) joinSubstreams() {
for _, ws := range w.substreams {
@ -652,9 +695,9 @@ func (w *watchGrpcStream) joinSubstreams() {
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
for {
select {
case <-w.stopc:
case <-w.ctx.Done():
if err == nil {
return nil, context.Canceled
return nil, w.ctx.Err()
}
return nil, err
default:

View File

@ -11,10 +11,6 @@
"Comment": "null-5",
"Rev": "'75cd24fc2f2c2a2088577d12123ddee5f54e0675'"
},
{
"ImportPath": "github.com/akrennmair/gopcap",
"Rev": "00e11033259acb75598ba416495bb708d864a010"
},
{
"ImportPath": "github.com/beorn7/perks/quantile",
"Rev": "b965b613227fddccbfffe13eae360ed3fa822f8d"

View File

@ -1,5 +0,0 @@
#*
*~
/tools/pass/pass
/tools/pcaptest/pcaptest
/tools/tcpdump/tcpdump

View File

@ -1,27 +0,0 @@
Copyright (c) 2009-2011 Andreas Krennmair. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Andreas Krennmair nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,11 +0,0 @@
# PCAP
This is a simple wrapper around libpcap for Go. Originally written by Andreas
Krennmair <ak@synflood.at> and only minorly touched up by Mark Smith <mark@qq.is>.
Please see the included pcaptest.go and tcpdump.go programs for instructions on
how to use this library.
Miek Gieben <miek@miek.nl> has created a more Go-like package and replaced functionality
with standard functions from the standard library. The package has also been renamed to
pcap.

View File

@ -1,527 +0,0 @@
package pcap
import (
"encoding/binary"
"fmt"
"net"
"reflect"
"strings"
)
const (
TYPE_IP = 0x0800
TYPE_ARP = 0x0806
TYPE_IP6 = 0x86DD
TYPE_VLAN = 0x8100
IP_ICMP = 1
IP_INIP = 4
IP_TCP = 6
IP_UDP = 17
)
const (
ERRBUF_SIZE = 256
// According to pcap-linktype(7).
LINKTYPE_NULL = 0
LINKTYPE_ETHERNET = 1
LINKTYPE_TOKEN_RING = 6
LINKTYPE_ARCNET = 7
LINKTYPE_SLIP = 8
LINKTYPE_PPP = 9
LINKTYPE_FDDI = 10
LINKTYPE_ATM_RFC1483 = 100
LINKTYPE_RAW = 101
LINKTYPE_PPP_HDLC = 50
LINKTYPE_PPP_ETHER = 51
LINKTYPE_C_HDLC = 104
LINKTYPE_IEEE802_11 = 105
LINKTYPE_FRELAY = 107
LINKTYPE_LOOP = 108
LINKTYPE_LINUX_SLL = 113
LINKTYPE_LTALK = 104
LINKTYPE_PFLOG = 117
LINKTYPE_PRISM_HEADER = 119
LINKTYPE_IP_OVER_FC = 122
LINKTYPE_SUNATM = 123
LINKTYPE_IEEE802_11_RADIO = 127
LINKTYPE_ARCNET_LINUX = 129
LINKTYPE_LINUX_IRDA = 144
LINKTYPE_LINUX_LAPD = 177
)
type addrHdr interface {
SrcAddr() string
DestAddr() string
Len() int
}
type addrStringer interface {
String(addr addrHdr) string
}
func decodemac(pkt []byte) uint64 {
mac := uint64(0)
for i := uint(0); i < 6; i++ {
mac = (mac << 8) + uint64(pkt[i])
}
return mac
}
// Decode decodes the headers of a Packet.
func (p *Packet) Decode() {
if len(p.Data) <= 14 {
return
}
p.Type = int(binary.BigEndian.Uint16(p.Data[12:14]))
p.DestMac = decodemac(p.Data[0:6])
p.SrcMac = decodemac(p.Data[6:12])
if len(p.Data) >= 15 {
p.Payload = p.Data[14:]
}
switch p.Type {
case TYPE_IP:
p.decodeIp()
case TYPE_IP6:
p.decodeIp6()
case TYPE_ARP:
p.decodeArp()
case TYPE_VLAN:
p.decodeVlan()
}
}
func (p *Packet) headerString(headers []interface{}) string {
// If there's just one header, return that.
if len(headers) == 1 {
if hdr, ok := headers[0].(fmt.Stringer); ok {
return hdr.String()
}
}
// If there are two headers (IPv4/IPv6 -> TCP/UDP/IP..)
if len(headers) == 2 {
// Commonly the first header is an address.
if addr, ok := p.Headers[0].(addrHdr); ok {
if hdr, ok := p.Headers[1].(addrStringer); ok {
return fmt.Sprintf("%s %s", p.Time, hdr.String(addr))
}
}
}
// For IP in IP, we do a recursive call.
if len(headers) >= 2 {
if addr, ok := headers[0].(addrHdr); ok {
if _, ok := headers[1].(addrHdr); ok {
return fmt.Sprintf("%s > %s IP in IP: ",
addr.SrcAddr(), addr.DestAddr(), p.headerString(headers[1:]))
}
}
}
var typeNames []string
for _, hdr := range headers {
typeNames = append(typeNames, reflect.TypeOf(hdr).String())
}
return fmt.Sprintf("unknown [%s]", strings.Join(typeNames, ","))
}
// String prints a one-line representation of the packet header.
// The output is suitable for use in a tcpdump program.
func (p *Packet) String() string {
// If there are no headers, print "unsupported protocol".
if len(p.Headers) == 0 {
return fmt.Sprintf("%s unsupported protocol %d", p.Time, int(p.Type))
}
return fmt.Sprintf("%s %s", p.Time, p.headerString(p.Headers))
}
// Arphdr is a ARP packet header.
type Arphdr struct {
Addrtype uint16
Protocol uint16
HwAddressSize uint8
ProtAddressSize uint8
Operation uint16
SourceHwAddress []byte
SourceProtAddress []byte
DestHwAddress []byte
DestProtAddress []byte
}
func (arp *Arphdr) String() (s string) {
switch arp.Operation {
case 1:
s = "ARP request"
case 2:
s = "ARP Reply"
}
if arp.Addrtype == LINKTYPE_ETHERNET && arp.Protocol == TYPE_IP {
s = fmt.Sprintf("%012x (%s) > %012x (%s)",
decodemac(arp.SourceHwAddress), arp.SourceProtAddress,
decodemac(arp.DestHwAddress), arp.DestProtAddress)
} else {
s = fmt.Sprintf("addrtype = %d protocol = %d", arp.Addrtype, arp.Protocol)
}
return
}
func (p *Packet) decodeArp() {
if len(p.Payload) < 8 {
return
}
pkt := p.Payload
arp := new(Arphdr)
arp.Addrtype = binary.BigEndian.Uint16(pkt[0:2])
arp.Protocol = binary.BigEndian.Uint16(pkt[2:4])
arp.HwAddressSize = pkt[4]
arp.ProtAddressSize = pkt[5]
arp.Operation = binary.BigEndian.Uint16(pkt[6:8])
if len(pkt) < int(8+2*arp.HwAddressSize+2*arp.ProtAddressSize) {
return
}
arp.SourceHwAddress = pkt[8 : 8+arp.HwAddressSize]
arp.SourceProtAddress = pkt[8+arp.HwAddressSize : 8+arp.HwAddressSize+arp.ProtAddressSize]
arp.DestHwAddress = pkt[8+arp.HwAddressSize+arp.ProtAddressSize : 8+2*arp.HwAddressSize+arp.ProtAddressSize]
arp.DestProtAddress = pkt[8+2*arp.HwAddressSize+arp.ProtAddressSize : 8+2*arp.HwAddressSize+2*arp.ProtAddressSize]
p.Headers = append(p.Headers, arp)
if len(pkt) >= int(8+2*arp.HwAddressSize+2*arp.ProtAddressSize) {
p.Payload = p.Payload[8+2*arp.HwAddressSize+2*arp.ProtAddressSize:]
}
}
// IPadr is the header of an IP packet.
type Iphdr struct {
Version uint8
Ihl uint8
Tos uint8
Length uint16
Id uint16
Flags uint8
FragOffset uint16
Ttl uint8
Protocol uint8
Checksum uint16
SrcIp []byte
DestIp []byte
}
func (p *Packet) decodeIp() {
if len(p.Payload) < 20 {
return
}
pkt := p.Payload
ip := new(Iphdr)
ip.Version = uint8(pkt[0]) >> 4
ip.Ihl = uint8(pkt[0]) & 0x0F
ip.Tos = pkt[1]
ip.Length = binary.BigEndian.Uint16(pkt[2:4])
ip.Id = binary.BigEndian.Uint16(pkt[4:6])
flagsfrags := binary.BigEndian.Uint16(pkt[6:8])
ip.Flags = uint8(flagsfrags >> 13)
ip.FragOffset = flagsfrags & 0x1FFF
ip.Ttl = pkt[8]
ip.Protocol = pkt[9]
ip.Checksum = binary.BigEndian.Uint16(pkt[10:12])
ip.SrcIp = pkt[12:16]
ip.DestIp = pkt[16:20]
pEnd := int(ip.Length)
if pEnd > len(pkt) {
pEnd = len(pkt)
}
if len(pkt) >= pEnd && int(ip.Ihl*4) < pEnd {
p.Payload = pkt[ip.Ihl*4 : pEnd]
} else {
p.Payload = []byte{}
}
p.Headers = append(p.Headers, ip)
p.IP = ip
switch ip.Protocol {
case IP_TCP:
p.decodeTcp()
case IP_UDP:
p.decodeUdp()
case IP_ICMP:
p.decodeIcmp()
case IP_INIP:
p.decodeIp()
}
}
func (ip *Iphdr) SrcAddr() string { return net.IP(ip.SrcIp).String() }
func (ip *Iphdr) DestAddr() string { return net.IP(ip.DestIp).String() }
func (ip *Iphdr) Len() int { return int(ip.Length) }
type Vlanhdr struct {
Priority byte
DropEligible bool
VlanIdentifier int
Type int // Not actually part of the vlan header, but the type of the actual packet
}
func (v *Vlanhdr) String() {
fmt.Sprintf("VLAN Priority:%d Drop:%v Tag:%d", v.Priority, v.DropEligible, v.VlanIdentifier)
}
func (p *Packet) decodeVlan() {
pkt := p.Payload
vlan := new(Vlanhdr)
if len(pkt) < 4 {
return
}
vlan.Priority = (pkt[2] & 0xE0) >> 13
vlan.DropEligible = pkt[2]&0x10 != 0
vlan.VlanIdentifier = int(binary.BigEndian.Uint16(pkt[:2])) & 0x0FFF
vlan.Type = int(binary.BigEndian.Uint16(p.Payload[2:4]))
p.Headers = append(p.Headers, vlan)
if len(pkt) >= 5 {
p.Payload = p.Payload[4:]
}
switch vlan.Type {
case TYPE_IP:
p.decodeIp()
case TYPE_IP6:
p.decodeIp6()
case TYPE_ARP:
p.decodeArp()
}
}
type Tcphdr struct {
SrcPort uint16
DestPort uint16
Seq uint32
Ack uint32
DataOffset uint8
Flags uint16
Window uint16
Checksum uint16
Urgent uint16
Data []byte
}
const (
TCP_FIN = 1 << iota
TCP_SYN
TCP_RST
TCP_PSH
TCP_ACK
TCP_URG
TCP_ECE
TCP_CWR
TCP_NS
)
func (p *Packet) decodeTcp() {
if len(p.Payload) < 20 {
return
}
pkt := p.Payload
tcp := new(Tcphdr)
tcp.SrcPort = binary.BigEndian.Uint16(pkt[0:2])
tcp.DestPort = binary.BigEndian.Uint16(pkt[2:4])
tcp.Seq = binary.BigEndian.Uint32(pkt[4:8])
tcp.Ack = binary.BigEndian.Uint32(pkt[8:12])
tcp.DataOffset = (pkt[12] & 0xF0) >> 4
tcp.Flags = binary.BigEndian.Uint16(pkt[12:14]) & 0x1FF
tcp.Window = binary.BigEndian.Uint16(pkt[14:16])
tcp.Checksum = binary.BigEndian.Uint16(pkt[16:18])
tcp.Urgent = binary.BigEndian.Uint16(pkt[18:20])
if len(pkt) >= int(tcp.DataOffset*4) {
p.Payload = pkt[tcp.DataOffset*4:]
}
p.Headers = append(p.Headers, tcp)
p.TCP = tcp
}
func (tcp *Tcphdr) String(hdr addrHdr) string {
return fmt.Sprintf("TCP %s:%d > %s:%d %s SEQ=%d ACK=%d LEN=%d",
hdr.SrcAddr(), int(tcp.SrcPort), hdr.DestAddr(), int(tcp.DestPort),
tcp.FlagsString(), int64(tcp.Seq), int64(tcp.Ack), hdr.Len())
}
func (tcp *Tcphdr) FlagsString() string {
var sflags []string
if 0 != (tcp.Flags & TCP_SYN) {
sflags = append(sflags, "syn")
}
if 0 != (tcp.Flags & TCP_FIN) {
sflags = append(sflags, "fin")
}
if 0 != (tcp.Flags & TCP_ACK) {
sflags = append(sflags, "ack")
}
if 0 != (tcp.Flags & TCP_PSH) {
sflags = append(sflags, "psh")
}
if 0 != (tcp.Flags & TCP_RST) {
sflags = append(sflags, "rst")
}
if 0 != (tcp.Flags & TCP_URG) {
sflags = append(sflags, "urg")
}
if 0 != (tcp.Flags & TCP_NS) {
sflags = append(sflags, "ns")
}
if 0 != (tcp.Flags & TCP_CWR) {
sflags = append(sflags, "cwr")
}
if 0 != (tcp.Flags & TCP_ECE) {
sflags = append(sflags, "ece")
}
return fmt.Sprintf("[%s]", strings.Join(sflags, " "))
}
type Udphdr struct {
SrcPort uint16
DestPort uint16
Length uint16
Checksum uint16
}
func (p *Packet) decodeUdp() {
if len(p.Payload) < 8 {
return
}
pkt := p.Payload
udp := new(Udphdr)
udp.SrcPort = binary.BigEndian.Uint16(pkt[0:2])
udp.DestPort = binary.BigEndian.Uint16(pkt[2:4])
udp.Length = binary.BigEndian.Uint16(pkt[4:6])
udp.Checksum = binary.BigEndian.Uint16(pkt[6:8])
p.Headers = append(p.Headers, udp)
p.UDP = udp
if len(p.Payload) >= 8 {
p.Payload = pkt[8:]
}
}
func (udp *Udphdr) String(hdr addrHdr) string {
return fmt.Sprintf("UDP %s:%d > %s:%d LEN=%d CHKSUM=%d",
hdr.SrcAddr(), int(udp.SrcPort), hdr.DestAddr(), int(udp.DestPort),
int(udp.Length), int(udp.Checksum))
}
type Icmphdr struct {
Type uint8
Code uint8
Checksum uint16
Id uint16
Seq uint16
Data []byte
}
func (p *Packet) decodeIcmp() *Icmphdr {
if len(p.Payload) < 8 {
return nil
}
pkt := p.Payload
icmp := new(Icmphdr)
icmp.Type = pkt[0]
icmp.Code = pkt[1]
icmp.Checksum = binary.BigEndian.Uint16(pkt[2:4])
icmp.Id = binary.BigEndian.Uint16(pkt[4:6])
icmp.Seq = binary.BigEndian.Uint16(pkt[6:8])
p.Payload = pkt[8:]
p.Headers = append(p.Headers, icmp)
return icmp
}
func (icmp *Icmphdr) String(hdr addrHdr) string {
return fmt.Sprintf("ICMP %s > %s Type = %d Code = %d ",
hdr.SrcAddr(), hdr.DestAddr(), icmp.Type, icmp.Code)
}
func (icmp *Icmphdr) TypeString() (result string) {
switch icmp.Type {
case 0:
result = fmt.Sprintf("Echo reply seq=%d", icmp.Seq)
case 3:
switch icmp.Code {
case 0:
result = "Network unreachable"
case 1:
result = "Host unreachable"
case 2:
result = "Protocol unreachable"
case 3:
result = "Port unreachable"
default:
result = "Destination unreachable"
}
case 8:
result = fmt.Sprintf("Echo request seq=%d", icmp.Seq)
case 30:
result = "Traceroute"
}
return
}
type Ip6hdr struct {
// http://www.networksorcery.com/enp/protocol/ipv6.htm
Version uint8 // 4 bits
TrafficClass uint8 // 8 bits
FlowLabel uint32 // 20 bits
Length uint16 // 16 bits
NextHeader uint8 // 8 bits, same as Protocol in Iphdr
HopLimit uint8 // 8 bits
SrcIp []byte // 16 bytes
DestIp []byte // 16 bytes
}
func (p *Packet) decodeIp6() {
if len(p.Payload) < 40 {
return
}
pkt := p.Payload
ip6 := new(Ip6hdr)
ip6.Version = uint8(pkt[0]) >> 4
ip6.TrafficClass = uint8((binary.BigEndian.Uint16(pkt[0:2]) >> 4) & 0x00FF)
ip6.FlowLabel = binary.BigEndian.Uint32(pkt[0:4]) & 0x000FFFFF
ip6.Length = binary.BigEndian.Uint16(pkt[4:6])
ip6.NextHeader = pkt[6]
ip6.HopLimit = pkt[7]
ip6.SrcIp = pkt[8:24]
ip6.DestIp = pkt[24:40]
if len(p.Payload) >= 40 {
p.Payload = pkt[40:]
}
p.Headers = append(p.Headers, ip6)
switch ip6.NextHeader {
case IP_TCP:
p.decodeTcp()
case IP_UDP:
p.decodeUdp()
case IP_ICMP:
p.decodeIcmp()
case IP_INIP:
p.decodeIp()
}
}
func (ip6 *Ip6hdr) SrcAddr() string { return net.IP(ip6.SrcIp).String() }
func (ip6 *Ip6hdr) DestAddr() string { return net.IP(ip6.DestIp).String() }
func (ip6 *Ip6hdr) Len() int { return int(ip6.Length) }

View File

@ -1,206 +0,0 @@
package pcap
import (
"encoding/binary"
"fmt"
"io"
"time"
)
// FileHeader is the parsed header of a pcap file.
// http://wiki.wireshark.org/Development/LibpcapFileFormat
type FileHeader struct {
MagicNumber uint32
VersionMajor uint16
VersionMinor uint16
TimeZone int32
SigFigs uint32
SnapLen uint32
Network uint32
}
type PacketTime struct {
Sec int32
Usec int32
}
// Convert the PacketTime to a go Time struct.
func (p *PacketTime) Time() time.Time {
return time.Unix(int64(p.Sec), int64(p.Usec)*1000)
}
// Packet is a single packet parsed from a pcap file.
//
// Convenient access to IP, TCP, and UDP headers is provided after Decode()
// is called if the packet is of the appropriate type.
type Packet struct {
Time time.Time // packet send/receive time
Caplen uint32 // bytes stored in the file (caplen <= len)
Len uint32 // bytes sent/received
Data []byte // packet data
Type int // protocol type, see LINKTYPE_*
DestMac uint64
SrcMac uint64
Headers []interface{} // decoded headers, in order
Payload []byte // remaining non-header bytes
IP *Iphdr // IP header (for IP packets, after decoding)
TCP *Tcphdr // TCP header (for TCP packets, after decoding)
UDP *Udphdr // UDP header (for UDP packets after decoding)
}
// Reader parses pcap files.
type Reader struct {
flip bool
buf io.Reader
err error
fourBytes []byte
twoBytes []byte
sixteenBytes []byte
Header FileHeader
}
// NewReader reads pcap data from an io.Reader.
func NewReader(reader io.Reader) (*Reader, error) {
r := &Reader{
buf: reader,
fourBytes: make([]byte, 4),
twoBytes: make([]byte, 2),
sixteenBytes: make([]byte, 16),
}
switch magic := r.readUint32(); magic {
case 0xa1b2c3d4:
r.flip = false
case 0xd4c3b2a1:
r.flip = true
default:
return nil, fmt.Errorf("pcap: bad magic number: %0x", magic)
}
r.Header = FileHeader{
MagicNumber: 0xa1b2c3d4,
VersionMajor: r.readUint16(),
VersionMinor: r.readUint16(),
TimeZone: r.readInt32(),
SigFigs: r.readUint32(),
SnapLen: r.readUint32(),
Network: r.readUint32(),
}
return r, nil
}
// Next returns the next packet or nil if no more packets can be read.
func (r *Reader) Next() *Packet {
d := r.sixteenBytes
r.err = r.read(d)
if r.err != nil {
return nil
}
timeSec := asUint32(d[0:4], r.flip)
timeUsec := asUint32(d[4:8], r.flip)
capLen := asUint32(d[8:12], r.flip)
origLen := asUint32(d[12:16], r.flip)
data := make([]byte, capLen)
if r.err = r.read(data); r.err != nil {
return nil
}
return &Packet{
Time: time.Unix(int64(timeSec), int64(timeUsec)),
Caplen: capLen,
Len: origLen,
Data: data,
}
}
func (r *Reader) read(data []byte) error {
var err error
n, err := r.buf.Read(data)
for err == nil && n != len(data) {
var chunk int
chunk, err = r.buf.Read(data[n:])
n += chunk
}
if len(data) == n {
return nil
}
return err
}
func (r *Reader) readUint32() uint32 {
data := r.fourBytes
if r.err = r.read(data); r.err != nil {
return 0
}
return asUint32(data, r.flip)
}
func (r *Reader) readInt32() int32 {
data := r.fourBytes
if r.err = r.read(data); r.err != nil {
return 0
}
return int32(asUint32(data, r.flip))
}
func (r *Reader) readUint16() uint16 {
data := r.twoBytes
if r.err = r.read(data); r.err != nil {
return 0
}
return asUint16(data, r.flip)
}
// Writer writes a pcap file.
type Writer struct {
writer io.Writer
buf []byte
}
// NewWriter creates a Writer that stores output in an io.Writer.
// The FileHeader is written immediately.
func NewWriter(writer io.Writer, header *FileHeader) (*Writer, error) {
w := &Writer{
writer: writer,
buf: make([]byte, 24),
}
binary.LittleEndian.PutUint32(w.buf, header.MagicNumber)
binary.LittleEndian.PutUint16(w.buf[4:], header.VersionMajor)
binary.LittleEndian.PutUint16(w.buf[6:], header.VersionMinor)
binary.LittleEndian.PutUint32(w.buf[8:], uint32(header.TimeZone))
binary.LittleEndian.PutUint32(w.buf[12:], header.SigFigs)
binary.LittleEndian.PutUint32(w.buf[16:], header.SnapLen)
binary.LittleEndian.PutUint32(w.buf[20:], header.Network)
if _, err := writer.Write(w.buf); err != nil {
return nil, err
}
return w, nil
}
// Writer writes a packet to the underlying writer.
func (w *Writer) Write(pkt *Packet) error {
binary.LittleEndian.PutUint32(w.buf, uint32(pkt.Time.Unix()))
binary.LittleEndian.PutUint32(w.buf[4:], uint32(pkt.Time.Nanosecond()))
binary.LittleEndian.PutUint32(w.buf[8:], uint32(pkt.Time.Unix()))
binary.LittleEndian.PutUint32(w.buf[12:], pkt.Len)
if _, err := w.writer.Write(w.buf[:16]); err != nil {
return err
}
_, err := w.writer.Write(pkt.Data)
return err
}
func asUint32(data []byte, flip bool) uint32 {
if flip {
return binary.BigEndian.Uint32(data)
}
return binary.LittleEndian.Uint32(data)
}
func asUint16(data []byte, flip bool) uint16 {
if flip {
return binary.BigEndian.Uint16(data)
}
return binary.LittleEndian.Uint16(data)
}

View File

@ -1,266 +0,0 @@
// Interface to both live and offline pcap parsing.
package pcap
/*
#cgo linux LDFLAGS: -lpcap
#cgo freebsd LDFLAGS: -lpcap
#cgo darwin LDFLAGS: -lpcap
#cgo windows CFLAGS: -I C:/WpdPack/Include
#cgo windows,386 LDFLAGS: -L C:/WpdPack/Lib -lwpcap
#cgo windows,amd64 LDFLAGS: -L C:/WpdPack/Lib/x64 -lwpcap
#include <stdlib.h>
#include <pcap.h>
// Workaround for not knowing how to cast to const u_char**
int hack_pcap_next_ex(pcap_t *p, struct pcap_pkthdr **pkt_header,
u_char **pkt_data) {
return pcap_next_ex(p, pkt_header, (const u_char **)pkt_data);
}
*/
import "C"
import (
"errors"
"net"
"syscall"
"time"
"unsafe"
)
type Pcap struct {
cptr *C.pcap_t
}
type Stat struct {
PacketsReceived uint32
PacketsDropped uint32
PacketsIfDropped uint32
}
type Interface struct {
Name string
Description string
Addresses []IFAddress
// TODO: add more elements
}
type IFAddress struct {
IP net.IP
Netmask net.IPMask
// TODO: add broadcast + PtP dst ?
}
func (p *Pcap) Next() (pkt *Packet) {
rv, _ := p.NextEx()
return rv
}
// Openlive opens a device and returns a *Pcap handler
func Openlive(device string, snaplen int32, promisc bool, timeout_ms int32) (handle *Pcap, err error) {
var buf *C.char
buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1))
h := new(Pcap)
var pro int32
if promisc {
pro = 1
}
dev := C.CString(device)
defer C.free(unsafe.Pointer(dev))
h.cptr = C.pcap_open_live(dev, C.int(snaplen), C.int(pro), C.int(timeout_ms), buf)
if nil == h.cptr {
handle = nil
err = errors.New(C.GoString(buf))
} else {
handle = h
}
C.free(unsafe.Pointer(buf))
return
}
func Openoffline(file string) (handle *Pcap, err error) {
var buf *C.char
buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1))
h := new(Pcap)
cf := C.CString(file)
defer C.free(unsafe.Pointer(cf))
h.cptr = C.pcap_open_offline(cf, buf)
if nil == h.cptr {
handle = nil
err = errors.New(C.GoString(buf))
} else {
handle = h
}
C.free(unsafe.Pointer(buf))
return
}
func (p *Pcap) NextEx() (pkt *Packet, result int32) {
var pkthdr *C.struct_pcap_pkthdr
var buf_ptr *C.u_char
var buf unsafe.Pointer
result = int32(C.hack_pcap_next_ex(p.cptr, &pkthdr, &buf_ptr))
buf = unsafe.Pointer(buf_ptr)
if nil == buf {
return
}
pkt = new(Packet)
pkt.Time = time.Unix(int64(pkthdr.ts.tv_sec), int64(pkthdr.ts.tv_usec)*1000)
pkt.Caplen = uint32(pkthdr.caplen)
pkt.Len = uint32(pkthdr.len)
pkt.Data = C.GoBytes(buf, C.int(pkthdr.caplen))
return
}
func (p *Pcap) Close() {
C.pcap_close(p.cptr)
}
func (p *Pcap) Geterror() error {
return errors.New(C.GoString(C.pcap_geterr(p.cptr)))
}
func (p *Pcap) Getstats() (stat *Stat, err error) {
var cstats _Ctype_struct_pcap_stat
if -1 == C.pcap_stats(p.cptr, &cstats) {
return nil, p.Geterror()
}
stats := new(Stat)
stats.PacketsReceived = uint32(cstats.ps_recv)
stats.PacketsDropped = uint32(cstats.ps_drop)
stats.PacketsIfDropped = uint32(cstats.ps_ifdrop)
return stats, nil
}
func (p *Pcap) Setfilter(expr string) (err error) {
var bpf _Ctype_struct_bpf_program
cexpr := C.CString(expr)
defer C.free(unsafe.Pointer(cexpr))
if -1 == C.pcap_compile(p.cptr, &bpf, cexpr, 1, 0) {
return p.Geterror()
}
if -1 == C.pcap_setfilter(p.cptr, &bpf) {
C.pcap_freecode(&bpf)
return p.Geterror()
}
C.pcap_freecode(&bpf)
return nil
}
func Version() string {
return C.GoString(C.pcap_lib_version())
}
func (p *Pcap) Datalink() int {
return int(C.pcap_datalink(p.cptr))
}
func (p *Pcap) Setdatalink(dlt int) error {
if -1 == C.pcap_set_datalink(p.cptr, C.int(dlt)) {
return p.Geterror()
}
return nil
}
func DatalinkValueToName(dlt int) string {
if name := C.pcap_datalink_val_to_name(C.int(dlt)); name != nil {
return C.GoString(name)
}
return ""
}
func DatalinkValueToDescription(dlt int) string {
if desc := C.pcap_datalink_val_to_description(C.int(dlt)); desc != nil {
return C.GoString(desc)
}
return ""
}
func Findalldevs() (ifs []Interface, err error) {
var buf *C.char
buf = (*C.char)(C.calloc(ERRBUF_SIZE, 1))
defer C.free(unsafe.Pointer(buf))
var alldevsp *C.pcap_if_t
if -1 == C.pcap_findalldevs((**C.pcap_if_t)(&alldevsp), buf) {
return nil, errors.New(C.GoString(buf))
}
defer C.pcap_freealldevs((*C.pcap_if_t)(alldevsp))
dev := alldevsp
var i uint32
for i = 0; dev != nil; dev = (*C.pcap_if_t)(dev.next) {
i++
}
ifs = make([]Interface, i)
dev = alldevsp
for j := uint32(0); dev != nil; dev = (*C.pcap_if_t)(dev.next) {
var iface Interface
iface.Name = C.GoString(dev.name)
iface.Description = C.GoString(dev.description)
iface.Addresses = findalladdresses(dev.addresses)
// TODO: add more elements
ifs[j] = iface
j++
}
return
}
func findalladdresses(addresses *_Ctype_struct_pcap_addr) (retval []IFAddress) {
// TODO - make it support more than IPv4 and IPv6?
retval = make([]IFAddress, 0, 1)
for curaddr := addresses; curaddr != nil; curaddr = (*_Ctype_struct_pcap_addr)(curaddr.next) {
var a IFAddress
var err error
if a.IP, err = sockaddr_to_IP((*syscall.RawSockaddr)(unsafe.Pointer(curaddr.addr))); err != nil {
continue
}
if a.Netmask, err = sockaddr_to_IP((*syscall.RawSockaddr)(unsafe.Pointer(curaddr.addr))); err != nil {
continue
}
retval = append(retval, a)
}
return
}
func sockaddr_to_IP(rsa *syscall.RawSockaddr) (IP []byte, err error) {
switch rsa.Family {
case syscall.AF_INET:
pp := (*syscall.RawSockaddrInet4)(unsafe.Pointer(rsa))
IP = make([]byte, 4)
for i := 0; i < len(IP); i++ {
IP[i] = pp.Addr[i]
}
return
case syscall.AF_INET6:
pp := (*syscall.RawSockaddrInet6)(unsafe.Pointer(rsa))
IP = make([]byte, 16)
for i := 0; i < len(IP); i++ {
IP[i] = pp.Addr[i]
}
return
}
err = errors.New("Unsupported address type")
return
}
func (p *Pcap) Inject(data []byte) (err error) {
buf := (*C.char)(C.malloc((C.size_t)(len(data))))
for i := 0; i < len(data); i++ {
*(*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(buf)) + uintptr(i))) = data[i]
}
if -1 == C.pcap_sendpacket(p.cptr, (*C.u_char)(unsafe.Pointer(buf)), (C.int)(len(data))) {
err = p.Geterror()
}
C.free(unsafe.Pointer(buf))
return
}

View File

@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
}
if resp.Kvs[0].CreateRevision != 4 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision)
if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
}
}

View File

@ -33,10 +33,18 @@ func snapshotTest(cx ctlCtx) {
}
}
leaseID, err := ctlV3LeaseGrant(cx, 100)
if err != nil {
cx.t.Fatalf("snapshot: ctlV3LeaseGrant error (%v)", err)
}
if err = ctlV3Put(cx, "withlease", "withlease", leaseID); err != nil {
cx.t.Fatalf("snapshot: ctlV3Put error (%v)", err)
}
fpath := "test.snapshot"
defer os.RemoveAll(fpath)
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
if err = ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}
@ -44,11 +52,11 @@ func snapshotTest(cx ctlCtx) {
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
if st.Revision != 5 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
if st.TotalKey < 4 {
cx.t.Fatalf("expected at least 4, got %d", st.TotalKey)
}
}

View File

@ -221,7 +221,7 @@ OK
### WATCH [options] [key or prefix] [range_end]
Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user.
Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `range-end` is given. The watch command runs until it encounters an error or is terminated by the user. If range_end is given, it must be lexicographically greater than key or "\x00".
#### Options

View File

@ -27,11 +27,14 @@ import (
"github.com/coreos/etcd/client"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
@ -42,9 +45,10 @@ import (
)
var (
migrateDatadir string
migrateWALdir string
migrateTransformer string
migrateExcludeTTLKey bool
migrateDatadir string
migrateWALdir string
migrateTransformer string
)
// NewMigrateCommand returns the cobra command for "migrate".
@ -55,6 +59,7 @@ func NewMigrateCommand() *cobra.Command {
Run: migrateCommandFunc,
}
mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
@ -74,18 +79,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer()
}
st := rebuildStoreV2()
st, index := rebuildStoreV2()
be := prepareBackend()
defer be.Close()
maxIndexc := make(chan uint64, 1)
go func() {
maxIndexc <- writeStore(writer, st)
writeStore(writer, st)
writer.Close()
}()
readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, <-maxIndexc)
mvcc.UpdateConsistentIndex(be, index)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
@ -106,7 +110,10 @@ func prepareBackend() backend.Backend {
return be
}
func rebuildStoreV2() store.Store {
func rebuildStoreV2() (store.Store, uint64) {
var index uint64
cl := membership.NewCluster("")
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
@ -122,6 +129,7 @@ func rebuildStoreV2() store.Store {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
}
w, err := wal.OpenForRead(waldir, walsnap)
@ -143,9 +151,15 @@ func rebuildStoreV2() store.Store {
}
}
applier := etcdserver.NewApplierV2(st, nil)
cl.SetStore(st)
cl.Recover(api.UpdateCapability)
applier := etcdserver.NewApplierV2(st, cl)
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue
}
@ -160,9 +174,34 @@ func rebuildStoreV2() store.Store {
applyRequest(req, applier)
}
}
if ent.Index > index {
index = ent.Index
}
}
return st
return st, index
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
}
}
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
@ -216,11 +255,13 @@ func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
if n.Dir {
n.Nodes = nil
}
b, err := json.Marshal(n)
if err != nil {
ExitWithError(ExitError, err)
if !migrateExcludeTTLKey || n.TTL == 0 {
b, err := json.Marshal(n)
if err != nil {
ExitWithError(ExitError, err)
}
fmt.Fprint(w, string(b))
}
fmt.Fprint(w, string(b))
for _, nn := range nodes {
max := writeKeys(w, nn)
if max > maxIndex {

View File

@ -21,6 +21,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path"
"reflect"
@ -30,6 +31,7 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/fileutil"
@ -371,7 +373,10 @@ func makeDB(snapdir, dbfile string, commit int) {
// update consistentIndex so applies go through on etcdserver despite
// having a new raft instance
be := backend.NewDefaultBackend(dbpath)
s := mvcc.NewStore(be, nil, (*initIndex)(&commit))
// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
id := s.TxnBegin()
btx := be.BatchTx()
del := func(k, v []byte) error {

View File

@ -73,14 +73,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
}
if err != nil {
return err
return togRPCError(err)
}
resp.TTL = ttl

View File

@ -49,9 +49,13 @@ var (
ErrGRPCRoleNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role is not granted to the user")
ErrGRPCPermissionNotGranted = grpc.Errorf(codes.FailedPrecondition, "etcdserver: permission is not granted to the role")
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
ErrGRPCTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out")
ErrGRPCTimeoutDueToLeaderFail = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure")
ErrGRPCTimeoutDueToConnectionLost = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost")
ErrGRPCUnhealthy = grpc.Errorf(codes.Unavailable, "etcdserver: unhealthy cluster")
errStringToError = map[string]error{
grpc.ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
@ -82,9 +86,13 @@ var (
grpc.ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
grpc.ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
grpc.ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
grpc.ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
grpc.ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
grpc.ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
}
// client-side error
@ -116,9 +124,13 @@ var (
ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrNoLeader = Error(ErrGRPCNoLeader)
ErrNotCapable = Error(ErrGRPCNotCapable)
ErrStopped = Error(ErrGRPCStopped)
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
)
// EtcdError defines gRPC server errors.

View File

@ -38,6 +38,17 @@ func togRPCError(err error) error {
case etcdserver.ErrNoSpace:
return rpctypes.ErrGRPCNoSpace
case etcdserver.ErrNoLeader:
return rpctypes.ErrGRPCNoLeader
case etcdserver.ErrStopped:
return rpctypes.ErrGRPCStopped
case etcdserver.ErrTimeout:
return rpctypes.ErrGRPCTimeout
case etcdserver.ErrTimeoutDueToLeaderFail:
return rpctypes.ErrGRPCTimeoutDueToLeaderFail
case etcdserver.ErrTimeoutDueToConnectionLost:
return rpctypes.ErrGRPCTimeoutDueToConnectionLost
case auth.ErrRootUserNotExist:
return rpctypes.ErrGRPCRootUserNotExist
case auth.ErrRootRoleNotExist:

View File

@ -18,6 +18,7 @@ import (
"encoding/json"
"expvar"
"fmt"
"math"
"math/rand"
"net/http"
"os"
@ -398,11 +399,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
srv.be = be
srv.lessor = lease.NewLessor(srv.be)
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be)
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()

View File

@ -20,6 +20,7 @@ import (
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
@ -54,7 +55,7 @@ type Lessor interface {
// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(id lease.LeaseID) (int64, error)
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
}
type Authenticator interface {
@ -218,7 +219,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return result.resp.(*pb.LeaseRevokeResponse), nil
}
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
ttl, err := s.lessor.Renew(id)
if err == nil {
return ttl, nil
@ -228,29 +229,44 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
}
// renewals don't go through raft; forward to leader manually
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
leader, lerr := s.waitLeader(cctx)
if lerr != nil {
return -1, lerr
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
}
}
return -1, ErrTimeout
}
func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
for leader == nil {
// wait an election
dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
select {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
case <-s.done:
return -1, ErrStopped
return nil, ErrStopped
case <-ctx.Done():
return nil, ErrNoLeader
}
}
if leader == nil || len(leader.PeerURLs) == 0 {
return -1, ErrNoLeader
return nil, ErrNoLeader
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
if err == nil {
break
}
}
return ttl, err
return leader, nil
}
func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {

View File

@ -19,11 +19,13 @@ import (
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/testutil"
"golang.org/x/net/context"
)
// TestV3LeasePrmote ensures the newly elected leader can promote itself
@ -332,7 +334,9 @@ func TestV3LeaseFailover(t *testing.T) {
lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
ctx, cancel := context.WithCancel(context.Background())
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
mctx := metadata.NewContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
defer cancel()
lac, err := lc.LeaseKeepAlive(ctx)
if err != nil {

View File

@ -348,6 +348,51 @@ func TestV3WatchFutureRevision(t *testing.T) {
}
}
// TestV3WatchWrongRange tests wrong range does not create watchers.
func TestV3WatchWrongRange(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
wAPI := toGRPC(clus.RandClient()).Watch
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
wStream, err := wAPI.Watch(ctx)
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
}
tests := []struct {
key []byte
end []byte
canceled bool
}{
{[]byte("a"), []byte("a"), true}, // wrong range end
{[]byte("b"), []byte("a"), true}, // wrong range end
{[]byte("foo"), []byte{0}, false}, // watch request with 'WithFromKey'
}
for i, tt := range tests {
if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
CreateRequest: &pb.WatchCreateRequest{Key: tt.key, RangeEnd: tt.end, StartRevision: 1}}}); err != nil {
t.Fatalf("#%d: wStream.Send error: %v", i, err)
}
cresp, err := wStream.Recv()
if err != nil {
t.Fatalf("#%d: wStream.Recv error: %v", i, err)
}
if !cresp.Created {
t.Fatalf("#%d: create %v, want %v", i, cresp.Created, true)
}
if cresp.Canceled != tt.canceled {
t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled)
}
if tt.canceled && cresp.WatchId != -1 {
t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId)
}
}
}
// TestV3WatchCancelSynced tests Watch APIs cancellation from synced map.
func TestV3WatchCancelSynced(t *testing.T) {
defer testutil.AfterTest(t)

View File

@ -19,10 +19,10 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
"golang.org/x/net/context"
)
// NewHandler returns an http Handler for lease renewals
@ -75,15 +75,22 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// RenewHTTP renews a lease at a given primary server.
// TODO: Batch request in future?
func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) {
func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
// will post lreq protobuf to leader
lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
if err != nil {
return -1, err
}
cc := &http.Client{Transport: rt, Timeout: timeout}
resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
cc := &http.Client{Transport: rt}
req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
if err != nil {
return -1, err
}
req.Header.Set("Content-Type", "application/protobuf")
req.Cancel = ctx.Done()
resp, err := cc.Do(req)
if err != nil {
// TODO detect if leader failed and retry?
return -1, err

View File

@ -0,0 +1,51 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leasehttp
import (
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"golang.org/x/net/context"
)
func TestRenewHTTP(t *testing.T) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()
le := lease.NewLessor(be, 3)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}
ts := httptest.NewServer(NewHandler(le))
defer ts.Close()
ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+"/leases", http.DefaultTransport)
if err != nil {
t.Fatal(err)
}
if ttl != 5 {
t.Fatalf("ttl expected 5, got %d", ttl)
}
}

View File

@ -31,8 +31,6 @@ const (
)
var (
minLeaseTTL = int64(5)
leaseBucketName = []byte("lease")
// do not use maxInt64 since it can overflow time which will add
// the offset of unix time (1970yr to seconds).
@ -143,6 +141,10 @@ type lessor struct {
// The leased items can be recovered by iterating all the keys in kv.
b backend.Backend
// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
// requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64
expiredC chan []*Lease
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct{}
@ -150,14 +152,15 @@ type lessor struct {
doneC chan struct{}
}
func NewLessor(b backend.Backend) Lessor {
return newLessor(b)
func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
return newLessor(b, minLeaseTTL)
}
func newLessor(b backend.Backend) *lessor {
func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
b: b,
leaseMap: make(map[LeaseID]*Lease),
b: b,
minLeaseTTL: minLeaseTTL,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
@ -193,6 +196,10 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
return nil, ErrLeaseExists
}
if l.TTL < le.minLeaseTTL {
l.TTL = le.minLeaseTTL
}
if le.primary {
l.refresh(0)
} else {
@ -425,6 +432,9 @@ func (le *lessor) initAndRecover() {
panic("failed to unmarshal lease proto item")
}
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
}
le.leaseMap[ID] = &Lease{
ID: ID,
TTL: lpb.TTL,
@ -464,19 +474,11 @@ func (l Lease) persistTo(b backend.Backend) {
// refresh refreshes the expiry of the lease.
func (l *Lease) refresh(extend time.Duration) {
if l.TTL < minLeaseTTL {
l.TTL = minLeaseTTL
}
l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL))
}
// forever sets the expiry of lease to be forever.
func (l *Lease) forever() {
if l.TTL < minLeaseTTL {
l.TTL = minLeaseTTL
}
l.expiry = forever
}
func (l *Lease) forever() { l.expiry = forever }
type LeaseItem struct {
Key string

View File

@ -26,6 +26,8 @@ import (
"github.com/coreos/etcd/mvcc/backend"
)
const minLeaseTTL = int64(5)
// TestLessorGrant ensures Lessor can grant wanted lease.
// The granted lease should have a unique ID with a term
// that is greater than minLeaseTTL.
@ -34,7 +36,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be)
le := newLessor(be, minLeaseTTL)
le.Promote(0)
l, err := le.Grant(1, 1)
@ -82,7 +84,7 @@ func TestLessorRevoke(t *testing.T) {
fd := &fakeDeleter{}
le := newLessor(be)
le := newLessor(be, minLeaseTTL)
le.SetRangeDeleter(fd)
// grant a lease with long term (100 seconds) to
@ -129,10 +131,10 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
le := newLessor(be)
le := newLessor(be, minLeaseTTL)
le.Promote(0)
l, err := le.Grant(1, 5)
l, err := le.Grant(1, minLeaseTTL)
if err != nil {
t.Fatalf("failed to grant lease (%v)", err)
}
@ -160,7 +162,7 @@ func TestLessorDetach(t *testing.T) {
fd := &fakeDeleter{}
le := newLessor(be)
le := newLessor(be, minLeaseTTL)
le.SetRangeDeleter(fd)
// grant a lease with long term (100 seconds) to
@ -199,7 +201,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
le := newLessor(be)
le := newLessor(be, minLeaseTTL)
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
if err1 != nil || err2 != nil {
@ -207,7 +209,7 @@ func TestLessorRecover(t *testing.T) {
}
// Create a new lessor with the same backend
nle := newLessor(be)
nle := newLessor(be, minLeaseTTL)
nl1 := nle.get(l1.ID)
if nl1 == nil || nl1.TTL != l1.TTL {
t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)

View File

@ -15,6 +15,7 @@
package mvcc
import (
"bytes"
"errors"
"sync"
@ -96,6 +97,12 @@ type watchStream struct {
// Watch creates a new watcher in the stream and returns its WatchID.
// TODO: return error if ws is closed?
func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1
}
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {

View File

@ -153,6 +153,28 @@ func TestWatcherWatchPrefix(t *testing.T) {
}
}
// TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range
// does not create watcher, which panics when canceling in range tree.
func TestWatcherWatchWrongRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
defer w.Close()
if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 {
t.Fatalf("key == end range given; id expected -1, got %d", id)
}
if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 {
t.Fatalf("key > end range given; id expected -1, got %d", id)
}
// watch request with 'WithFromKey' has empty-byte range end
if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 {
t.Fatalf("\x00 is range given; id expected 0, got %d", id)
}
}
func TestWatchDeleteRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)

9
test
View File

@ -125,15 +125,6 @@ function fmt_tests {
fi
if which gosimple >/dev/null; then
echo "Checking gosimple..."
for path in $GOSIMPLE_UNUSED_PATHS; do
simplResult=`gosimple $REPO_PATH/${path} || true`
if [ -n "${simplResult}" ]; then
echo -e "gosimple checking ${path} failed:\n${simplResult}"
exit 255
fi
done
else
echo "Skipping gosimple..."
fi

View File

@ -1,23 +0,0 @@
# etcd-top
etcd realtime workload analyzer. Useful for rapid diagnosis of production usage issues and analysis of production request distributions.
usage:
```
-iface="eth0": interface for sniffing traffic on
-period=1: seconds between submissions
-ports="2379": etcd listening ports
-promiscuous=true: whether to perform promiscuous sniffing or not.
-topk=10: submit stats for the top <K> sniffed paths
```
result:
```
go run etcd-top.go --period=1 -topk=3
1440035702 sniffed 1074 requests over last 1 seconds
Top 3 most popular http requests:
Sum Rate Verb Path
1305 22 GET /v2/keys/c
1302 8 GET /v2/keys/S
1297 10 GET /v2/keys/h
```

View File

@ -1,16 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// etcd-top is a utility for analyzing etcd v2 API workload traffic.
package main

View File

@ -1,229 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"math"
"net/http"
"os"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/akrennmair/gopcap"
"github.com/spacejam/loghisto"
)
type nameSum struct {
Name string
Sum float64
Rate float64
}
type nameSums []nameSum
func (n nameSums) Len() int {
return len(n)
}
func (n nameSums) Less(i, j int) bool {
return n[i].Sum > n[j].Sum
}
func (n nameSums) Swap(i, j int) {
n[i], n[j] = n[j], n[i]
}
// This function listens for periodic metrics from the loghisto metric system,
// and upon receipt of a batch of them it will print out the desired topK.
func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) {
for m := range metricStream {
requestCounter := float64(0)
nvs := nameSums{}
for k, v := range m.Metrics {
// loghisto adds _rate suffixed metrics for counters and histograms
if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") {
continue
}
nvs = append(nvs, nameSum{
Name: k,
Sum: v,
Rate: m.Metrics[k+"_rate"],
})
requestCounter += m.Metrics[k+"_rate"]
}
fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(),
uint(requestCounter), period)
if len(nvs) == 0 {
continue
}
sort.Sort(nvs)
fmt.Printf("Top %d most popular http requests:\n", topK)
fmt.Println("Total Sum Period Sum Verb Path")
for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] {
fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name)
}
}
}
// packetDecoder decodes packets and hands them off to the streamRouter
func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) {
for pkt := range packetsIn {
pkt.Decode()
select {
case packetsOut <- pkt:
default:
fmt.Fprint(os.Stderr, "shedding at decoder!")
}
}
}
// processor tries to parse an http request from each packet, and if
// successful it records metrics about it in the loghisto metric system.
func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) {
for pkt := range packetsIn {
req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload)))
if reqErr == nil {
ms.Counter(req.Method+" "+req.URL.Path, 1)
}
}
}
// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests
// and responses for this particular TCP connection. This allows the processor to own a local map
// of requests so that it can avoid coordinating with other goroutines to perform analysis.
func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) {
for pkt := range parsedPackets {
if pkt.TCP == nil {
continue
}
clientPort := uint16(0)
for _, p := range ports {
if pkt.TCP.SrcPort == p {
clientPort = pkt.TCP.DestPort
break
}
if pkt.TCP.DestPort == p {
clientPort = pkt.TCP.SrcPort
break
}
}
if clientPort != 0 {
// client Port can be assumed to have sufficient entropy for
// distribution among processors, and we want the same
// tcp stream to go to the same processor every time
// so that if we do proper packet reconstruction it will
// be easier.
select {
case processors[int(clientPort)%len(processors)] <- pkt:
default:
fmt.Fprint(os.Stderr, "Shedding load at router!")
}
}
}
}
// 1. parse args
// 2. start the loghisto metric system
// 3. start the processing and printing goroutines
// 4. open the pcap handler
// 5. hand off packets from the handler to the decoder
func main() {
portsArg := flag.String("ports", "2379", "etcd listening ports")
iface := flag.String("iface", "eth0", "interface for sniffing traffic on")
promisc := flag.Bool("promiscuous", true, "promiscuous mode")
period := flag.Uint("period", 1, "seconds between submissions")
topK := flag.Uint("topk", 10, "submit stats for the top <K> sniffed paths")
flag.Parse()
numCPU := runtime.NumCPU()
runtime.GOMAXPROCS(numCPU)
ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false)
ms.Start()
metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
ms.SubscribeToProcessedMetrics(metricStream)
defer ms.UnsubscribeFromProcessedMetrics(metricStream)
go statPrinter(metricStream, *topK, *period)
ports := []uint16{}
for _, p := range strings.Split(*portsArg, ",") {
port, err := strconv.Atoi(p)
if err == nil {
ports = append(ports, uint16(port))
} else {
fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err)
os.Exit(1)
}
}
if len(ports) == 0 {
fmt.Fprint(os.Stderr, "No ports given! Exiting.\n")
os.Exit(1)
}
// We choose 1518 for the snaplen because it's the default
// ethernet MTU at the link layer. We choose 1000 for the
// timeout based on a measurement for its impact on latency
// impact, but it is less precise.
h, err := pcap.Openlive(*iface, 1518, *promisc, 1000)
if err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
defer h.Close()
portArray := strings.Split(*portsArg, ",")
dst := strings.Join(portArray, " or dst port ")
src := strings.Join(portArray, " or src port ")
filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src)
fmt.Println("using bpf filter: ", filter)
if err := h.Setfilter(filter); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
unparsedPackets := make(chan *pcap.Packet, 16384)
parsedPackets := make(chan *pcap.Packet, 16384)
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
go packetDecoder(unparsedPackets, parsedPackets)
}
processors := []chan *pcap.Packet{}
for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ {
p := make(chan *pcap.Packet, 16384)
processors = append(processors, p)
go processor(ms, p)
}
go streamRouter(ports, parsedPackets, processors)
for {
pkt := h.Next()
if pkt != nil {
select {
case unparsedPackets <- pkt:
default:
fmt.Fprint(os.Stderr, "SHEDDING IN MAIN")
}
}
}
}

View File

@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.13"
Version = "3.0.17"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"