Translate v2 requests into v3 ClusterMemberAttrSetRequest and ClusterVersionSetRequest
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@ -19,32 +19,46 @@ import (
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api"
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/membershippb"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
)
|
||||
|
||||
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) {
|
||||
func v2ToV3Request(lg *zap.Logger, r *RequestV2) pb.InternalRaftRequest {
|
||||
if r.Method != http.MethodPut || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) {
|
||||
s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
|
||||
lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
|
||||
}
|
||||
if storeMemberAttributeRegexp.MatchString(r.Path) {
|
||||
id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
|
||||
id := membership.MustParseMemberIDFromKey(lg, path.Dir(r.Path))
|
||||
var attr membership.Attributes
|
||||
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
||||
s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
|
||||
lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
|
||||
}
|
||||
if s.cluster != nil {
|
||||
s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
|
||||
return pb.InternalRaftRequest{
|
||||
Header: &pb.RequestHeader{
|
||||
ID: r.ID,
|
||||
},
|
||||
ClusterMemberAttrSet: &membershippb.ClusterMemberAttrSetRequest{
|
||||
Member_ID: uint64(id),
|
||||
MemberAttributes: &membershippb.Attributes{
|
||||
Name: attr.Name,
|
||||
ClientUrls: attr.ClientURLs,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
|
||||
if r.Path == membership.StoreClusterVersionKey() {
|
||||
if s.cluster != nil {
|
||||
// persist to backend given v2store can be very stale
|
||||
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
|
||||
return pb.InternalRaftRequest{
|
||||
Header: &pb.RequestHeader{
|
||||
ID: r.ID,
|
||||
},
|
||||
ClusterVersionSet: &membershippb.ClusterVersionSetRequest{
|
||||
Ver: r.Val,
|
||||
},
|
||||
}
|
||||
}
|
||||
lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
|
||||
return pb.InternalRaftRequest{}
|
||||
}
|
||||
|
@ -1945,17 +1945,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
|
||||
rp := &r
|
||||
pbutil.MustUnmarshal(rp, e.Data)
|
||||
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
|
||||
s.applyV2Request((*RequestV2)(rp), shouldApplyV3)
|
||||
s.w.Trigger(r.ID, Response{})
|
||||
return
|
||||
raftReq = v2ToV3Request(s.lg, (*RequestV2)(rp))
|
||||
}
|
||||
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
||||
|
||||
if raftReq.V2 != nil {
|
||||
req := (*RequestV2)(raftReq.V2)
|
||||
s.applyV2Request(req, shouldApplyV3)
|
||||
s.w.Trigger(req.ID, Response{})
|
||||
return
|
||||
raftReq = v2ToV3Request(s.lg, req)
|
||||
}
|
||||
|
||||
id := raftReq.ID
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
@ -52,6 +53,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
|
||||
apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/errors"
|
||||
@ -160,7 +162,15 @@ func TestV2SetMemberAttributes(t *testing.T) {
|
||||
lg: zaptest.NewLogger(t),
|
||||
v2store: mockstore.NewRecorder(),
|
||||
cluster: cl,
|
||||
consistIndex: cindex.NewConsistentIndex(be),
|
||||
w: wait.New(),
|
||||
}
|
||||
as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
srv.alarmStore = as
|
||||
srv.uberApply = srv.NewUberApplier()
|
||||
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
@ -168,7 +178,13 @@ func TestV2SetMemberAttributes(t *testing.T) {
|
||||
Path: membership.MemberAttributesStorePath(1),
|
||||
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
|
||||
}
|
||||
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
|
||||
data, err := proto.Marshal(&req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
srv.applyEntryNormal(&raftpb.Entry{
|
||||
Data: data,
|
||||
}, membership.ApplyV2storeOnly)
|
||||
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
|
||||
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
|
||||
t.Errorf("attributes = %v, want %v", g, w)
|
||||
@ -187,7 +203,15 @@ func TestV2SetClusterVersion(t *testing.T) {
|
||||
lg: zaptest.NewLogger(t),
|
||||
v2store: mockstore.NewRecorder(),
|
||||
cluster: cl,
|
||||
consistIndex: cindex.NewConsistentIndex(be),
|
||||
w: wait.New(),
|
||||
}
|
||||
as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
srv.alarmStore = as
|
||||
srv.uberApply = srv.NewUberApplier()
|
||||
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
@ -195,7 +219,13 @@ func TestV2SetClusterVersion(t *testing.T) {
|
||||
Path: membership.StoreClusterVersionKey(),
|
||||
Val: "3.5.0",
|
||||
}
|
||||
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
|
||||
data, err := proto.Marshal(&req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
srv.applyEntryNormal(&raftpb.Entry{
|
||||
Data: data,
|
||||
}, membership.ApplyV2storeOnly)
|
||||
if g := cl.Version(); !reflect.DeepEqual(*g, version.V3_5) {
|
||||
t.Errorf("attributes = %v, want %v", *g, version.V3_5)
|
||||
}
|
||||
|
Reference in New Issue
Block a user