From 82afadbcc66de232e133ea01f9e9c3a6a192881b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 8 Aug 2015 05:29:18 -0700 Subject: [PATCH 1/7] etcdserverpb: update proto --- etcdserver/etcdserverpb/etcdserver.pb.go | 143 +----- etcdserver/etcdserverpb/etcdserver.proto | 9 - etcdserver/etcdserverpb/raft_internal.pb.go | 463 ++++++++++++++++++++ etcdserver/etcdserverpb/raft_internal.proto | 24 + 4 files changed, 488 insertions(+), 151 deletions(-) create mode 100644 etcdserver/etcdserverpb/raft_internal.pb.go create mode 100644 etcdserver/etcdserverpb/raft_internal.proto diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 3fc62b854..6626e706e 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -7,12 +7,12 @@ It is generated from these files: etcdserver.proto + raft_internal.proto rpc.proto It has these top-level messages: Request Metadata - InternalRaftRequest */ package etcdserverpb @@ -62,17 +62,6 @@ func (m *Metadata) Reset() { *m = Metadata{} } func (m *Metadata) String() string { return proto.CompactTextString(m) } func (*Metadata) ProtoMessage() {} -// An InternalRaftRequest is the union of all requests which can be -// sent via raft. -type InternalRaftRequest struct { - V2 *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } -func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) } -func (*InternalRaftRequest) ProtoMessage() {} - func init() { } func (m *Request) Unmarshal(data []byte) error { @@ -474,76 +463,6 @@ func (m *Metadata) Unmarshal(data []byte) error { return nil } -func (m *InternalRaftRequest) Unmarshal(data []byte) error { - l := len(data) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.V2 == nil { - m.V2 = &Request{} - } - if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - iNdEx -= sizeOfWire - skippy, err := skipEtcdserver(data[iNdEx:]) - if err != nil { - return err - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - return nil -} func skipEtcdserver(data []byte) (n int, err error) { l := len(data) iNdEx := 0 @@ -628,22 +547,6 @@ func skipEtcdserver(data []byte) (n int, err error) { } panic("unreachable") } -func (this *InternalRaftRequest) GetValue() interface{} { - if this.V2 != nil { - return this.V2 - } - return nil -} - -func (this *InternalRaftRequest) SetValue(value interface{}) bool { - switch vt := value.(type) { - case *Request: - this.V2 = vt - default: - return false - } - return true -} func (m *Request) Size() (n int) { var l int _ = l @@ -686,19 +589,6 @@ func (m *Metadata) Size() (n int) { return n } -func (m *InternalRaftRequest) Size() (n int) { - var l int - _ = l - if m.V2 != nil { - l = m.V2.Size() - n += 1 + l + sovEtcdserver(uint64(l)) - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - func sovEtcdserver(x uint64) (n int) { for { n++ @@ -851,37 +741,6 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) { return i, nil } -func (m *InternalRaftRequest) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if m.V2 != nil { - data[i] = 0xa - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.V2.Size())) - n1, err := m.V2.MarshalTo(data[i:]) - if err != nil { - return 0, err - } - i += n1 - } - if m.XXX_unrecognized != nil { - i += copy(data[i:], m.XXX_unrecognized) - } - return i, nil -} - func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index ee989d9c3..bfc29625c 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -31,12 +31,3 @@ message Metadata { optional uint64 NodeID = 1 [(gogoproto.nullable) = false]; optional uint64 ClusterID = 2 [(gogoproto.nullable) = false]; } - -// An InternalRaftRequest is the union of all requests which can be -// sent via raft. -message InternalRaftRequest { - option (gogoproto.onlyone) = true; - oneof value { - Request v2 = 1; - } -} diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go new file mode 100644 index 000000000..c3073178a --- /dev/null +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -0,0 +1,463 @@ +// Code generated by protoc-gen-gogo. +// source: raft_internal.proto +// DO NOT EDIT! + +package etcdserverpb + +import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + +// discarding unused import gogoproto "github.com/gogo/protobuf/gogoproto/gogo.pb" + +import io "io" +import fmt "fmt" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal + +// An InternalRaftRequest is the union of all requests which can be +// sent via raft. +type InternalRaftRequest struct { + V2 *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"` + Range *RangeRequest `protobuf:"bytes,2,opt,name=range" json:"range,omitempty"` + Put *PutRequest `protobuf:"bytes,3,opt,name=put" json:"put,omitempty"` + DeleteRange *DeleteRangeRequest `protobuf:"bytes,4,opt,name=delete_range" json:"delete_range,omitempty"` + Txn *TxnRequest `protobuf:"bytes,5,opt,name=txn" json:"txn,omitempty"` +} + +func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } +func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) } +func (*InternalRaftRequest) ProtoMessage() {} + +func init() { +} +func (m *InternalRaftRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.V2 == nil { + m.V2 = &Request{} + } + if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Range == nil { + m.Range = &RangeRequest{} + } + if err := m.Range.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Put", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Put == nil { + m.Put = &PutRequest{} + } + if err := m.Put.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DeleteRange", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.DeleteRange == nil { + m.DeleteRange = &DeleteRangeRequest{} + } + if err := m.DeleteRange.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Txn == nil { + m.Txn = &TxnRequest{} + } + if err := m.Txn.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRaftInternal(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func skipRaftInternal(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRaftInternal(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} +func (this *InternalRaftRequest) GetValue() interface{} { + if this.V2 != nil { + return this.V2 + } + if this.Range != nil { + return this.Range + } + if this.Put != nil { + return this.Put + } + if this.DeleteRange != nil { + return this.DeleteRange + } + if this.Txn != nil { + return this.Txn + } + return nil +} + +func (this *InternalRaftRequest) SetValue(value interface{}) bool { + switch vt := value.(type) { + case *Request: + this.V2 = vt + case *RangeRequest: + this.Range = vt + case *PutRequest: + this.Put = vt + case *DeleteRangeRequest: + this.DeleteRange = vt + case *TxnRequest: + this.Txn = vt + default: + return false + } + return true +} +func (m *InternalRaftRequest) Size() (n int) { + var l int + _ = l + if m.V2 != nil { + l = m.V2.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.Range != nil { + l = m.Range.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.Put != nil { + l = m.Put.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.DeleteRange != nil { + l = m.DeleteRange.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.Txn != nil { + l = m.Txn.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + return n +} + +func sovRaftInternal(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRaftInternal(x uint64) (n int) { + return sovRaftInternal(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *InternalRaftRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.V2 != nil { + data[i] = 0xa + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.V2.Size())) + n1, err := m.V2.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.Range != nil { + data[i] = 0x12 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Range.Size())) + n2, err := m.Range.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if m.Put != nil { + data[i] = 0x1a + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Put.Size())) + n3, err := m.Put.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n3 + } + if m.DeleteRange != nil { + data[i] = 0x22 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.DeleteRange.Size())) + n4, err := m.DeleteRange.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n4 + } + if m.Txn != nil { + data[i] = 0x2a + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Txn.Size())) + n5, err := m.Txn.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n5 + } + return i, nil +} + +func encodeFixed64RaftInternal(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32RaftInternal(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintRaftInternal(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} diff --git a/etcdserver/etcdserverpb/raft_internal.proto b/etcdserver/etcdserverpb/raft_internal.proto new file mode 100644 index 000000000..4fb496bfd --- /dev/null +++ b/etcdserver/etcdserverpb/raft_internal.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; +package etcdserverpb; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "etcdserver.proto"; +import "rpc.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// An InternalRaftRequest is the union of all requests which can be +// sent via raft. +message InternalRaftRequest { + option (gogoproto.onlyone) = true; + oneof value { + Request v2 = 1; + RangeRequest range = 2; + PutRequest put = 3; + DeleteRangeRequest delete_range = 4; + TxnRequest txn = 5; + } +} From f004b4dac7558befa7f0209beb33e12f83659dc5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 8 Aug 2015 05:58:29 -0700 Subject: [PATCH 2/7] *: etcdserver supports v3 demo --- etcdmain/config.go | 5 ++++ etcdmain/etcd.go | 21 +++++++++++++ etcdserver/api/v3rpc/key.go | 54 +++++++++++++++++++++++++++++++++ etcdserver/config.go | 2 ++ etcdserver/server.go | 12 ++++++++ etcdserver/v3demo_server.go | 59 +++++++++++++++++++++++++++++++++++++ 6 files changed, 153 insertions(+) create mode 100644 etcdserver/api/v3rpc/key.go create mode 100644 etcdserver/v3demo_server.go diff --git a/etcdmain/config.go b/etcdmain/config.go index 117ba4a9b..30d3c15f5 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -115,6 +115,8 @@ type config struct { printVersion bool + v3demo bool + ignored []string } @@ -208,6 +210,9 @@ func NewConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit") + // demo flag + fs.BoolVar(&cfg.v3demo, "v3demo", false, "Enable v3 demo") + // backwards-compatibility with v0.4.6 fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.") fs.Var(&flags.IPAddressPort{}, "bind-addr", "DEPRECATED: Use -listen-client-urls instead.") diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 3c925bf64..77619237d 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -31,9 +31,12 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/osutil" @@ -233,6 +236,15 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { clns = append(clns, l) } + var v3l net.Listener + if cfg.v3demo { + v3l, err = net.Listen("tcp", "127.0.0.1:12379") + if err != nil { + plog.Fatal(err) + } + plog.Infof("listening for client rpc on 127.0.0.1:12379") + } + srvcfg := &etcdserver.ServerConfig{ Name: cfg.name, ClientURLs: cfg.acurls, @@ -250,6 +262,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Transport: pt, TickMs: cfg.TickMs, ElectionTicks: cfg.electionTicks(), + V3demo: cfg.v3demo, } var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(srvcfg) @@ -281,6 +294,14 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Fatal(serveHTTP(l, ch, 0)) }(l) } + + if cfg.v3demo { + // set up v3 demo rpc + grpcServer := grpc.NewServer() + etcdserverpb.RegisterEtcdServer(grpcServer, v3rpc.New(s)) + go plog.Fatal(grpcServer.Serve(v3l)) + } + return s.StopNotify(), nil } diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go new file mode 100644 index 000000000..ff179c8f0 --- /dev/null +++ b/etcdserver/api/v3rpc/key.go @@ -0,0 +1,54 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type handler struct { + server etcdserver.Server +} + +func New(s etcdserver.Server) pb.EtcdServer { + return &handler{s} +} + +func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) + return resp.(*pb.RangeResponse), nil +} + +func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) + return resp.(*pb.PutResponse), nil +} + +func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) + return resp.(*pb.DeleteRangeResponse), nil +} + +func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + panic("not implemented") + return nil, nil +} + +func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + panic("not implemented") + return nil, nil +} diff --git a/etcdserver/config.go b/etcdserver/config.go index 9b6132670..b19eae375 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -43,6 +43,8 @@ type ServerConfig struct { TickMs uint ElectionTicks int + + V3demo bool } // VerifyBootstrapConfig sanity-checks the initial config for bootstrap case diff --git a/etcdserver/server.go b/etcdserver/server.go index 330029bda..646d56de2 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -20,6 +20,7 @@ import ( "fmt" "math/rand" "net/http" + "os" "path" "regexp" "sync/atomic" @@ -27,6 +28,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" @@ -43,6 +45,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/snap" + dstorage "github.com/coreos/etcd/storage" "github.com/coreos/etcd/store" "github.com/coreos/etcd/version" "github.com/coreos/etcd/wal" @@ -106,6 +109,7 @@ type Server interface { Leader() types.ID // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) + V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. Process(ctx context.Context, m raftpb.Message) error @@ -156,6 +160,7 @@ type EtcdServer struct { cluster *cluster store store.Store + kv dstorage.KV stats *stats.ServerStats lstats *stats.LeaderStats @@ -313,6 +318,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { forceVersionC: make(chan struct{}), } + if cfg.V3demo { + srv.kv = dstorage.New(path.Join(cfg.DataDir, "member", "v3demo")) + } else { + // we do not care about the error of the removal + os.RemoveAll(path.Join(cfg.DataDir, "member", "v3demo")) + } + // TODO: move transport initialization near the definition of remote tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats) // add all remotes into transport diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go new file mode 100644 index 000000000..80272a67a --- /dev/null +++ b/etcdserver/v3demo_server.go @@ -0,0 +1,59 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { + switch { + case r.Range != nil: + rr := r.Range + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0) + if err != nil { + panic("not handled error") + } + + resp.Header.Index = rev + for i := range kvs { + resp.Kvs = append(resp.Kvs, &kvs[i]) + } + return resp + case r.Put != nil: + rp := r.Put + resp := &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + rev := s.kv.Put(rp.Key, rp.Value) + resp.Header.Index = rev + return resp + case r.DeleteRange != nil: + rd := r.DeleteRange + resp := &pb.DeleteRangeResponse{} + resp.Header = &pb.ResponseHeader{} + _, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd) + resp.Header.Index = rev + return resp + case r.Txn != nil: + panic("not implemented") + default: + panic("not implemented") + } + return nil +} From 523567bcc76b16ac6580cc099c5a6fb49ca1981f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 8 Aug 2015 05:58:58 -0700 Subject: [PATCH 3/7] v3etcdctl: initial v3 ctl support --- v3etcdctl/command/delete_range_command.go | 61 +++++++++++++++++++++++ v3etcdctl/command/put_command.go | 53 ++++++++++++++++++++ v3etcdctl/command/range_command.go | 58 +++++++++++++++++++++ v3etcdctl/main.go | 37 ++++++++++++++ 4 files changed, 209 insertions(+) create mode 100644 v3etcdctl/command/delete_range_command.go create mode 100644 v3etcdctl/command/put_command.go create mode 100644 v3etcdctl/command/range_command.go create mode 100644 v3etcdctl/main.go diff --git a/v3etcdctl/command/delete_range_command.go b/v3etcdctl/command/delete_range_command.go new file mode 100644 index 000000000..a3aee77b3 --- /dev/null +++ b/v3etcdctl/command/delete_range_command.go @@ -0,0 +1,61 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewDeleteRangeCommand returns the CLI command for "deleteRange". +func NewDeleteRangeCommand() cli.Command { + return cli.Command{ + Name: "delete-range", + Action: func(c *cli.Context) { + deleteRangeCommandFunc(c) + }, + } +} + +// deleteRangeCommandFunc executes the "delegeRange" command. +func deleteRangeCommandFunc(c *cli.Context) { + if len(c.Args()) == 0 { + panic("bad arg") + } + + var rangeEnd []byte + key := []byte(c.Args()[0]) + if len(c.Args()) > 1 { + rangeEnd = []byte(c.Args()[1]) + } + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.DeleteRangeRequest{Key: key, RangeEnd: rangeEnd} + + etcd.DeleteRange(context.Background(), req) + + if rangeEnd != nil { + fmt.Printf("range [%s, %s) is deleted\n", string(key), string(rangeEnd)) + } else { + fmt.Printf("key %s is deleted\n", string(key)) + } +} diff --git a/v3etcdctl/command/put_command.go b/v3etcdctl/command/put_command.go new file mode 100644 index 000000000..515e65e66 --- /dev/null +++ b/v3etcdctl/command/put_command.go @@ -0,0 +1,53 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewPutCommand returns the CLI command for "put". +func NewPutCommand() cli.Command { + return cli.Command{ + Name: "put", + Action: func(c *cli.Context) { + putCommandFunc(c) + }, + } +} + +// putCommandFunc executes the "put" command. +func putCommandFunc(c *cli.Context) { + if len(c.Args()) != 2 { + panic("bad arg") + } + + key := []byte(c.Args()[0]) + value := []byte(c.Args()[1]) + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.PutRequest{Key: key, Value: value} + + etcd.Put(context.Background(), req) + fmt.Printf("%s %s\n", key, value) +} diff --git a/v3etcdctl/command/range_command.go b/v3etcdctl/command/range_command.go new file mode 100644 index 000000000..0b77dd698 --- /dev/null +++ b/v3etcdctl/command/range_command.go @@ -0,0 +1,58 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewRangeCommand returns the CLI command for "range". +func NewRangeCommand() cli.Command { + return cli.Command{ + Name: "range", + Action: func(c *cli.Context) { + rangeCommandFunc(c) + }, + } +} + +// rangeCommandFunc executes the "range" command. +func rangeCommandFunc(c *cli.Context) { + if len(c.Args()) == 0 { + panic("bad arg") + } + + var rangeEnd []byte + key := []byte(c.Args()[0]) + if len(c.Args()) > 1 { + rangeEnd = []byte(c.Args()[1]) + } + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.RangeRequest{Key: key, RangeEnd: rangeEnd} + + resp, err := etcd.Range(context.Background(), req) + for _, kv := range resp.Kvs { + fmt.Printf("%s %s\n", string(kv.Key), string(kv.Value)) + } +} diff --git a/v3etcdctl/main.go b/v3etcdctl/main.go new file mode 100644 index 000000000..acbcd8c2d --- /dev/null +++ b/v3etcdctl/main.go @@ -0,0 +1,37 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/v3etcdctl/command" + "github.com/coreos/etcd/version" +) + +func main() { + app := cli.NewApp() + app.Name = "v3etcdctl" + app.Version = version.Version + app.Usage = "A simple command line client for etcd3." + app.Commands = []cli.Command{ + command.NewRangeCommand(), + command.NewPutCommand(), + command.NewDeleteRangeCommand(), + } + + app.Run(os.Args) +} From 9ff7075ce80b791a120cb29f2b908178207dfcf7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 8 Aug 2015 10:14:42 -0700 Subject: [PATCH 4/7] etcdserver: use v3server interface --- etcdserver/api/v3rpc/key.go | 4 ++-- etcdserver/server.go | 2 -- etcdserver/v3demo_server.go | 5 ++++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index ff179c8f0..b2af39908 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -21,10 +21,10 @@ import ( ) type handler struct { - server etcdserver.Server + server etcdserver.V3DemoServer } -func New(s etcdserver.Server) pb.EtcdServer { +func New(s etcdserver.V3DemoServer) pb.EtcdServer { return &handler{s} } diff --git a/etcdserver/server.go b/etcdserver/server.go index 646d56de2..a4752cf49 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -28,7 +28,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" @@ -109,7 +108,6 @@ type Server interface { Leader() types.ID // Do takes a request and attempts to fulfill it, returning a Response. Do(ctx context.Context, r pb.Request) (Response, error) - V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. Process(ctx context.Context, m raftpb.Message) error diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 80272a67a..409a56c17 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -20,6 +20,10 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) +type V3DemoServer interface { + V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message +} + func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { switch { case r.Range != nil: @@ -55,5 +59,4 @@ func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) pro default: panic("not implemented") } - return nil } From c1e0b19f9faee80d304f128dee35386b8250561a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 10 Aug 2015 09:53:17 -0700 Subject: [PATCH 5/7] *: better flag --- Documentation/configuration.md | 6 ++++++ etcdmain/config.go | 2 +- etcdmain/help.go | 6 ++++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Documentation/configuration.md b/Documentation/configuration.md index d785079eb..6ce7fa46f 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -193,6 +193,12 @@ Follow the instructions when using these flags. + Force to create a new one-member cluster. It commits configuration changes in force to remove all existing members in the cluster and add itself. It needs to be set to [restore a backup][restore]. + default: false +### Experimental Flags + +##### -experimental-v3demo ++ Enable experimental v3 demo API ++ default: false + ### Miscellaneous Flags ##### -version diff --git a/etcdmain/config.go b/etcdmain/config.go index 30d3c15f5..55ebdb163 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -211,7 +211,7 @@ func NewConfig() *config { fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit") // demo flag - fs.BoolVar(&cfg.v3demo, "v3demo", false, "Enable v3 demo") + fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API") // backwards-compatibility with v0.4.6 fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.") diff --git a/etcdmain/help.go b/etcdmain/help.go index 404160509..987fe22fd 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -121,5 +121,11 @@ given by the consensus protocol. --force-new-cluster 'false' force to create a new one-member cluster. + + +experimental flags: + + --experimental-v3demo 'false' + enable experimental v3 demo API ` ) From c32919e6d18188947fb9d092a8b1f9f90a59beca Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 10 Aug 2015 11:21:37 -0700 Subject: [PATCH 6/7] *: rename v3etcdctl to etcdctlv3 --- {v3etcdctl => etcdctlv3}/command/delete_range_command.go | 0 {v3etcdctl => etcdctlv3}/command/put_command.go | 0 {v3etcdctl => etcdctlv3}/command/range_command.go | 0 {v3etcdctl => etcdctlv3}/main.go | 4 ++-- 4 files changed, 2 insertions(+), 2 deletions(-) rename {v3etcdctl => etcdctlv3}/command/delete_range_command.go (100%) rename {v3etcdctl => etcdctlv3}/command/put_command.go (100%) rename {v3etcdctl => etcdctlv3}/command/range_command.go (100%) rename {v3etcdctl => etcdctlv3}/main.go (93%) diff --git a/v3etcdctl/command/delete_range_command.go b/etcdctlv3/command/delete_range_command.go similarity index 100% rename from v3etcdctl/command/delete_range_command.go rename to etcdctlv3/command/delete_range_command.go diff --git a/v3etcdctl/command/put_command.go b/etcdctlv3/command/put_command.go similarity index 100% rename from v3etcdctl/command/put_command.go rename to etcdctlv3/command/put_command.go diff --git a/v3etcdctl/command/range_command.go b/etcdctlv3/command/range_command.go similarity index 100% rename from v3etcdctl/command/range_command.go rename to etcdctlv3/command/range_command.go diff --git a/v3etcdctl/main.go b/etcdctlv3/main.go similarity index 93% rename from v3etcdctl/main.go rename to etcdctlv3/main.go index acbcd8c2d..bf7a5d07b 100644 --- a/v3etcdctl/main.go +++ b/etcdctlv3/main.go @@ -18,13 +18,13 @@ import ( "os" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" - "github.com/coreos/etcd/v3etcdctl/command" + "github.com/coreos/etcd/etcdctlv3/command" "github.com/coreos/etcd/version" ) func main() { app := cli.NewApp() - app.Name = "v3etcdctl" + app.Name = "etcdctlv3" app.Version = version.Version app.Usage = "A simple command line client for etcd3." app.Commands = []cli.Command{ From b0ea4ab3b10416e6708c1a91f8c756a45780fb71 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 10 Aug 2015 11:22:55 -0700 Subject: [PATCH 7/7] doc: link to v3 api doc --- Documentation/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Documentation/configuration.md b/Documentation/configuration.md index 6ce7fa46f..d340028ee 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -196,7 +196,7 @@ Follow the instructions when using these flags. ### Experimental Flags ##### -experimental-v3demo -+ Enable experimental v3 demo API ++ Enable experimental [v3 demo API](rfc/v3api.proto). + default: false ### Miscellaneous Flags