server: Move setting storage version to schema
This commit is contained in:
@ -292,8 +292,8 @@ type EtcdServer struct {
|
|||||||
|
|
||||||
*AccessController
|
*AccessController
|
||||||
|
|
||||||
// Ensure that storage version is updated only once.
|
// Ensure that storage schema is updated only once.
|
||||||
storageVersionUpdated sync.Once
|
updateStorageSchema sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
type backendHooks struct {
|
type backendHooks struct {
|
||||||
@ -2136,8 +2136,8 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
"saved snapshot",
|
"saved snapshot",
|
||||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||||
)
|
)
|
||||||
s.storageVersionUpdated.Do(func() {
|
s.updateStorageSchema.Do(func() {
|
||||||
err := serverversion.UpdateStorageVersion(s.lg, s.be.BatchTx())
|
err := schema.UpdateStorageSchema(s.lg, s.be.BatchTx())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.lg.Warn("failed to update storage version", zap.Error(err))
|
s.lg.Warn("failed to update storage version", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package version
|
package schema
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -21,7 +21,6 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -29,8 +28,8 @@ var (
|
|||||||
V3_6 = semver.Version{Major: 3, Minor: 6}
|
V3_6 = semver.Version{Major: 3, Minor: 6}
|
||||||
)
|
)
|
||||||
|
|
||||||
// UpdateStorageVersion updates storage version.
|
// UpdateStorageSchema updates storage version.
|
||||||
func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
|
func UpdateStorageSchema(lg *zap.Logger, tx backend.BatchTx) error {
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
v, err := detectStorageVersion(lg, tx)
|
v, err := detectStorageVersion(lg, tx)
|
||||||
@ -41,7 +40,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
|
|||||||
case V3_5:
|
case V3_5:
|
||||||
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
|
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
|
||||||
// All meta keys introduced in v3.6 should be filled in here.
|
// All meta keys introduced in v3.6 should be filled in here.
|
||||||
schema.UnsafeSetStorageVersion(tx, &V3_6)
|
UnsafeSetStorageVersion(tx, &V3_6)
|
||||||
case V3_6:
|
case V3_6:
|
||||||
default:
|
default:
|
||||||
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
|
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
|
||||||
@ -50,17 +49,17 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
|
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
|
||||||
v := schema.UnsafeReadStorageVersion(tx)
|
v := UnsafeReadStorageVersion(tx)
|
||||||
if v != nil {
|
if v != nil {
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
confstate := schema.UnsafeConfStateFromBackend(lg, tx)
|
confstate := UnsafeConfStateFromBackend(lg, tx)
|
||||||
if confstate == nil {
|
if confstate == nil {
|
||||||
return nil, fmt.Errorf("missing %q key", schema.MetaConfStateName)
|
return nil, fmt.Errorf("missing %q key", MetaConfStateName)
|
||||||
}
|
}
|
||||||
_, term := schema.UnsafeReadConsistentIndex(tx)
|
_, term := UnsafeReadConsistentIndex(tx)
|
||||||
if term == 0 {
|
if term == 0 {
|
||||||
return nil, fmt.Errorf("missing %q key", schema.MetaTermKeyName)
|
return nil, fmt.Errorf("missing %q key", MetaTermKeyName)
|
||||||
}
|
}
|
||||||
copied := V3_5
|
copied := V3_5
|
||||||
return &copied, nil
|
return &copied, nil
|
@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package version
|
package schema
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
@ -24,7 +24,6 @@ import (
|
|||||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -47,7 +46,7 @@ func TestUpdateStorageVersion(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: `Backend before 3.6 without "term" should be rejected`,
|
name: `Backend before 3.6 without "term" should be rejected`,
|
||||||
version: "",
|
version: "",
|
||||||
metaKeys: [][]byte{schema.MetaConfStateName},
|
metaKeys: [][]byte{MetaConfStateName},
|
||||||
expectVersion: nil,
|
expectVersion: nil,
|
||||||
expectError: true,
|
expectError: true,
|
||||||
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
|
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
|
||||||
@ -55,25 +54,25 @@ func TestUpdateStorageVersion(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
|
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
|
||||||
version: "",
|
version: "",
|
||||||
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName},
|
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName},
|
||||||
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Backend in 3.6.0 should be skipped",
|
name: "Backend in 3.6.0 should be skipped",
|
||||||
version: "3.6.0",
|
version: "3.6.0",
|
||||||
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName},
|
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
|
||||||
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Backend with current version should be skipped",
|
name: "Backend with current version should be skipped",
|
||||||
version: version.Version,
|
version: version.Version,
|
||||||
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName},
|
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName},
|
||||||
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
expectVersion: &semver.Version{Major: 3, Minor: 6},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Backend in 3.7.0 should be skipped",
|
name: "Backend in 3.7.0 should be skipped",
|
||||||
version: "3.7.0",
|
version: "3.7.0",
|
||||||
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName, []byte("future-key")},
|
metaKeys: [][]byte{MetaTermKeyName, MetaConfStateName, MetaStorageVersionName, []byte("future-key")},
|
||||||
expectVersion: &semver.Version{Major: 3, Minor: 7},
|
expectVersion: &semver.Version{Major: 3, Minor: 7},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -86,19 +85,19 @@ func TestUpdateStorageVersion(t *testing.T) {
|
|||||||
t.Fatal("batch tx is nil")
|
t.Fatal("batch tx is nil")
|
||||||
}
|
}
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
schema.UnsafeCreateMetaBucket(tx)
|
UnsafeCreateMetaBucket(tx)
|
||||||
for _, k := range tc.metaKeys {
|
for _, k := range tc.metaKeys {
|
||||||
switch string(k) {
|
switch string(k) {
|
||||||
case string(schema.MetaConfStateName):
|
case string(MetaConfStateName):
|
||||||
schema.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
|
MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
|
||||||
case string(schema.MetaTermKeyName):
|
case string(MetaTermKeyName):
|
||||||
schema.UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||||
default:
|
default:
|
||||||
tx.UnsafePut(schema.Meta, k, []byte{})
|
tx.UnsafePut(Meta, k, []byte{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if tc.version != "" {
|
if tc.version != "" {
|
||||||
schema.UnsafeSetStorageVersion(tx, semver.New(tc.version))
|
UnsafeSetStorageVersion(tx, semver.New(tc.version))
|
||||||
}
|
}
|
||||||
tx.Unlock()
|
tx.Unlock()
|
||||||
be.ForceCommit()
|
be.ForceCommit()
|
||||||
@ -106,14 +105,14 @@ func TestUpdateStorageVersion(t *testing.T) {
|
|||||||
|
|
||||||
b := backend.NewDefaultBackend(tmpPath)
|
b := backend.NewDefaultBackend(tmpPath)
|
||||||
defer b.Close()
|
defer b.Close()
|
||||||
err := UpdateStorageVersion(lg, b.BatchTx())
|
err := UpdateStorageSchema(lg, b.BatchTx())
|
||||||
if (err != nil) != tc.expectError {
|
if (err != nil) != tc.expectError {
|
||||||
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
|
t.Errorf("UpgradeStorage(...) = %+v, expected error: %v", err, tc.expectError)
|
||||||
}
|
}
|
||||||
if err != nil && err.Error() != tc.expectedErrorMsg {
|
if err != nil && err.Error() != tc.expectedErrorMsg {
|
||||||
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
|
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
|
||||||
}
|
}
|
||||||
v := schema.UnsafeReadStorageVersion(b.BatchTx())
|
v := UnsafeReadStorageVersion(b.BatchTx())
|
||||||
assert.Equal(t, tc.expectVersion, v)
|
assert.Equal(t, tc.expectVersion, v)
|
||||||
})
|
})
|
||||||
}
|
}
|
Reference in New Issue
Block a user