Compare commits

...

13 Commits

Author SHA1 Message Date
b3f16d6691 version: 3.5.0-beta.4
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-05-25 20:48:48 -04:00
16214c3443 Merge pull request #13037 from ptabor/cherry-picks-3.5
Backport-3.5: seq reset + bucket as object
2021-05-25 01:27:48 -07:00
e6baf6d751 Represent bucket as object instead of []byte name.
Thanks to this change:
  - all the maps bucket -> buffer are indexed by int's instead of
string. No need to do: byte[] -> string -> hash conversion on each
access.
  - buckets are strongly typed in backend/mvcc API.
2021-05-25 09:22:25 +02:00
8bddbdc1d6 Rename seq to bucket2seq. 2021-05-25 09:21:07 +02:00
d3b3228c1f Minor formatting fix on top of https://github.com/etcd-io/etcd/pull/12568 2021-05-25 09:21:07 +02:00
261f8b1daf backend: set seq flag for each bucket buffer 2021-05-25 09:21:06 +02:00
dd22bd747e Merge pull request #13036 from gyuho/cherry-pick
[release-3.5] server: set multiple concurrentReadTx instances share one txReadBuffer.
2021-05-24 18:00:09 -07:00
7a7d6f94a7 server: set multiple concurrentReadTx instances share one txReadBuffer.
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2021-05-24 16:59:42 -07:00
c0d1450b45 Merge pull request #13003 from gyuho/cherry-pick
[backport release-3.5] applyV2 should reapply on backend only once
2021-05-18 23:50:10 -07:00
c4ebac0c57 applyV2 should reapply on backend only once
During review of:  https://github.com/etcd-io/etcd/pull/12988 spotted
that PUT is actially writing to v3-backend.
If we are replaying WAL log, it might happened that backend's
applied_index is > than the WAL's log entry. In such situation we should
skip applying on backend V3.
I think both the methods (setVersion, setMembersAttributes) are in
practice idempotent so its not that 'serious' problem, but for
formal correctness adding the proper checks.
2021-05-18 23:16:59 -07:00
379c361bc6 version: 3.5.0-beta.3
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-05-18 09:44:35 -04:00
163d9fd6f4 Merge pull request #12998 from hexfusion/cp-12997
Manual cherry pick of #12997
2021-05-18 08:20:41 -04:00
d06c97209c scripts: add missing etcdutl to release pipeline
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2021-05-18 07:57:27 -04:00
49 changed files with 537 additions and 345 deletions

View File

@ -127,6 +127,7 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=

View File

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.5.0-beta.2"
Version = "3.5.0-beta.4"
APIVersion = "unknown"
// Git SHA Value will be set during build

View File

@ -89,6 +89,7 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -5,8 +5,8 @@ go 1.16
require (
github.com/json-iterator/go v1.1.10
github.com/modern-go/reflect2 v1.0.1
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
)
replace (

View File

@ -137,6 +137,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=

View File

@ -6,8 +6,8 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/prometheus/client_golang v1.5.1
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19
google.golang.org/grpc v1.37.0
sigs.k8s.io/yaml v1.2.0

View File

@ -9,12 +9,12 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/urfave/cli v1.22.4
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/v2 v2.305.0-beta.2
go.etcd.io/etcd/client/v3 v3.5.0-beta.2
go.etcd.io/etcd/etcdutl/v3 v3.5.0-beta.2
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/v2 v2.305.0-beta.4
go.etcd.io/etcd/client/v3 v3.5.0-beta.4
go.etcd.io/etcd/etcdutl/v3 v3.5.0-beta.4
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.4
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.37.0

View File

@ -181,9 +181,11 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
@ -351,6 +353,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -493,6 +496,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.28 h1:n1tBJnnK2r7g9OW2btFH91V92STTUevLXYFb8gy9EMk=
gopkg.in/cheggaaa/pb.v1 v1.0.28/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=

View File

@ -25,11 +25,11 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6-0.20210426205525-9c92be978ae0
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/v3 v3.5.0-beta.2
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/raft/v3 v3.5.0-beta.2
go.etcd.io/etcd/server/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/v3 v3.5.0-beta.4
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/raft/v3 v3.5.0-beta.4
go.etcd.io/etcd/server/v3 v3.5.0-beta.4
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19
)

View File

@ -177,9 +177,11 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
@ -341,6 +343,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -483,6 +486,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=

20
go.mod
View File

@ -20,16 +20,16 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/spf13/cobra v1.1.3
go.etcd.io/bbolt v1.3.6-0.20210426205525-9c92be978ae0
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/v2 v2.305.0-beta.2
go.etcd.io/etcd/client/v3 v3.5.0-beta.2
go.etcd.io/etcd/etcdctl/v3 v3.5.0-beta.2
go.etcd.io/etcd/etcdutl/v3 v3.5.0-beta.2
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/raft/v3 v3.5.0-beta.2
go.etcd.io/etcd/server/v3 v3.5.0-beta.2
go.etcd.io/etcd/tests/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/v2 v2.305.0-beta.4
go.etcd.io/etcd/client/v3 v3.5.0-beta.4
go.etcd.io/etcd/etcdctl/v3 v3.5.0-beta.4
go.etcd.io/etcd/etcdutl/v3 v3.5.0-beta.4
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/raft/v3 v3.5.0-beta.4
go.etcd.io/etcd/server/v3 v3.5.0-beta.4
go.etcd.io/etcd/tests/v3 v3.5.0-beta.4
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.37.0

1
go.sum
View File

@ -364,6 +364,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View File

@ -9,7 +9,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19
google.golang.org/grpc v1.37.0
)

View File

@ -8,7 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/pkg/errors v0.9.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
)
// Bad imports are sometimes causing attempts to pull that code.

View File

@ -30,7 +30,7 @@ IMAGEDIR=${BUILDDIR}/image-docker
mkdir -p "${IMAGEDIR}"/var/etcd
mkdir -p "${IMAGEDIR}"/var/lib/etcd
cp "${BINARYDIR}"/etcd "${BINARYDIR}"/etcdctl "${IMAGEDIR}"
cp "${BINARYDIR}"/etcd "${BINARYDIR}"/etcdctl "${BINARYDIR}"/etcdutl "${IMAGEDIR}"
cat ./"${DOCKERFILE}" > "${IMAGEDIR}"/Dockerfile

View File

@ -193,6 +193,7 @@ main() {
# Sanity checks.
"./release/etcd-${RELEASE_VERSION}-$(go env GOOS)-amd64/etcd" --version | grep -q "etcd Version: ${VERSION}" || true
"./release/etcd-${RELEASE_VERSION}-$(go env GOOS)-amd64/etcdctl" version | grep -q "etcdctl version: ${VERSION}" || true
"./release/etcd-${RELEASE_VERSION}-$(go env GOOS)-amd64/etcdutl" version | grep -q "etcdutl version: ${VERSION}" || true
# Generate SHA256SUMS
log_callout "Generating sha256sums of release artifacts."

View File

@ -30,6 +30,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
@ -45,10 +46,6 @@ var (
revisionKey = []byte("authRevision")
authBucketName = []byte("auth")
authUsersBucketName = []byte("authUsers")
authRolesBucketName = []byte("authRoles")
ErrRootUserNotExist = errors.New("auth: root user does not exist")
ErrRootRoleNotExist = errors.New("auth: root user does not have root role")
ErrUserAlreadyExist = errors.New("auth: user already exists")
@ -240,7 +237,7 @@ func (as *authStore) AuthEnable() error {
return ErrRootRoleNotExist
}
tx.UnsafePut(authBucketName, enableFlagKey, authEnabled)
tx.UnsafePut(buckets.Auth, enableFlagKey, authEnabled)
as.enabled = true
as.tokenProvider.enable()
@ -262,7 +259,7 @@ func (as *authStore) AuthDisable() {
b := as.be
tx := b.BatchTx()
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled)
as.commitRevision(tx)
tx.Unlock()
b.ForceCommit()
@ -357,7 +354,7 @@ func (as *authStore) Recover(be backend.Backend) {
as.be = be
tx := be.BatchTx()
tx.Lock()
_, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0)
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
@ -906,7 +903,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
}
func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
_, vs := tx.UnsafeRange(authUsersBucketName, []byte(username), nil, 0)
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0)
if len(vs) == 0 {
return nil
}
@ -924,7 +921,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User {
}
func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User {
_, vs := tx.UnsafeRange(authUsersBucketName, []byte{0}, []byte{0xff}, -1)
_, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
@ -946,15 +943,15 @@ func putUser(lg *zap.Logger, tx backend.BatchTx, user *authpb.User) {
if err != nil {
lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err))
}
tx.UnsafePut(authUsersBucketName, user.Name, b)
tx.UnsafePut(buckets.AuthUsers, user.Name, b)
}
func delUser(tx backend.BatchTx, username string) {
tx.UnsafeDelete(authUsersBucketName, []byte(username))
tx.UnsafeDelete(buckets.AuthUsers, []byte(username))
}
func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte(rolename), nil, 0)
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0)
if len(vs) == 0 {
return nil
}
@ -968,7 +965,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role {
}
func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role {
_, vs := tx.UnsafeRange(authRolesBucketName, []byte{0}, []byte{0xff}, -1)
_, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1)
if len(vs) == 0 {
return nil
}
@ -995,11 +992,11 @@ func putRole(lg *zap.Logger, tx backend.BatchTx, role *authpb.Role) {
)
}
tx.UnsafePut(authRolesBucketName, role.Name, b)
tx.UnsafePut(buckets.AuthRoles, role.Name, b)
}
func delRole(tx backend.BatchTx, rolename string) {
tx.UnsafeDelete(authRolesBucketName, []byte(rolename))
tx.UnsafeDelete(buckets.AuthRoles, []byte(rolename))
}
func (as *authStore) IsAuthEnabled() bool {
@ -1028,12 +1025,12 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
tx := be.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(authBucketName)
tx.UnsafeCreateBucket(authUsersBucketName)
tx.UnsafeCreateBucket(authRolesBucketName)
tx.UnsafeCreateBucket(buckets.Auth)
tx.UnsafeCreateBucket(buckets.AuthUsers)
tx.UnsafeCreateBucket(buckets.AuthRoles)
enabled := false
_, vs := tx.UnsafeRange(authBucketName, enableFlagKey, nil, 0)
_, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
enabled = true
@ -1076,11 +1073,11 @@ func (as *authStore) commitRevision(tx backend.BatchTx) {
atomic.AddUint64(&as.revision, 1)
revBytes := make([]byte, revBytesLen)
binary.BigEndian.PutUint64(revBytes, as.Revision())
tx.UnsafePut(authBucketName, revisionKey, revBytes)
tx.UnsafePut(buckets.Auth, revisionKey, revBytes)
}
func getRevision(tx backend.BatchTx) uint64 {
_, vs := tx.UnsafeRange(authBucketName, revisionKey, nil, 0)
_, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
return 0

View File

@ -34,6 +34,7 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"github.com/coreos/go-semver/semver"
"github.com/prometheus/client_golang/prometheus"
@ -700,7 +701,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
keys, vals := tx.UnsafeRange(clusterBucketName, ckey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0)
if len(keys) == 0 {
return nil
}
@ -719,7 +720,7 @@ func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
keys, vals := tx.UnsafeRange(clusterBucketName, dkey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0)
if len(keys) == 0 {
return nil
}

View File

@ -19,8 +19,8 @@ import (
"log"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -36,13 +36,13 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
}
tx.UnsafePut(mvcc.MetaBucketName, confStateKey, confStateBytes)
tx.UnsafePut(buckets.Meta, confStateKey, confStateBytes)
}
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(mvcc.MetaBucketName, confStateKey, nil, 0)
keys, vals := tx.UnsafeRange(buckets.Meta, confStateKey, nil, 0)
if len(keys) == 0 {
return nil
}

View File

@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
@ -36,10 +37,6 @@ const (
)
var (
membersBucketName = []byte("members")
membersRemovedBucketName = []byte("members_removed")
clusterBucketName = []byte("cluster")
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
@ -54,7 +51,7 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(membersBucketName, mkey, mvalue)
tx.UnsafePut(buckets.Members, mkey, mvalue)
}
// TrimClusterFromBackend removes all information about cluster (versions)
@ -63,7 +60,7 @@ func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDeleteBucket(clusterBucketName)
tx.UnsafeDeleteBucket(buckets.Cluster)
return nil
}
@ -73,8 +70,8 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDelete(membersBucketName, mkey)
tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
}
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
@ -84,7 +81,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
memberId := mustParseMemberIDFromBytes(lg, k)
m := &Member{ID: memberId}
if err := json.Unmarshal(v, &m); err != nil {
@ -97,7 +94,7 @@ func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*M
return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
}
err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
memberId := mustParseMemberIDFromBytes(lg, k)
removed[memberId] = true
return nil
@ -123,8 +120,8 @@ func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
tx.UnsafeDelete(membersBucketName, k)
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
tx.UnsafeDelete(buckets.Members, k)
lg.Debug("Removed member from the backend",
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
return nil
@ -132,8 +129,8 @@ func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
if err != nil {
return err
}
return tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
tx.UnsafeDelete(membersRemovedBucketName, k)
return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
tx.UnsafeDelete(buckets.MembersRemoved, k)
lg.Debug("Removed removed_member from the backend",
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
return nil
@ -168,7 +165,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(clusterBucketName, ckey, []byte(ver.String()))
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
}
// The field is populated since etcd v3.5.
@ -181,7 +178,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(clusterBucketName, dkey, dvalue)
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
}
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
@ -300,9 +297,9 @@ func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(membersBucketName)
tx.UnsafeCreateBucket(membersRemovedBucketName)
tx.UnsafeCreateBucket(clusterBucketName)
tx.UnsafeCreateBucket(buckets.Members)
tx.UnsafeCreateBucket(buckets.MembersRemoved)
tx.UnsafeCreateBucket(buckets.Cluster)
}
func MemberStoreKey(id types.ID) string {

View File

@ -21,14 +21,11 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
var (
alarmBucketName = []byte("alarm")
)
type BackendGetter interface {
Backend() backend.Backend
}
@ -69,7 +66,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().UnsafePut(alarmBucketName, v, nil)
b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
b.BatchTx().Unlock()
return newAlarm
@ -98,7 +95,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
b := a.bg.Backend()
b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(alarmBucketName, v)
b.BatchTx().UnsafeDelete(buckets.Alarm, v)
b.BatchTx().Unlock()
return m
@ -126,8 +123,8 @@ func (a *AlarmStore) restore() error {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(alarmBucketName)
err := tx.UnsafeForEach(alarmBucketName, func(k, v []byte) error {
tx.UnsafeCreateBucket(buckets.Alarm)
err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
var m pb.AlarmMember
if err := m.Unmarshal(k); err != nil {
return err

View File

@ -36,7 +36,7 @@ const v2Version = "v2"
type ApplierV2 interface {
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2) Response
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}
@ -67,7 +67,7 @@ func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}
func (a *applierV2store) Put(r *RequestV2) Response {
func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
@ -89,7 +89,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, true)
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// return an empty response since there is no consumer.
return Response{}
@ -98,7 +98,7 @@ func (a *applierV2store) Put(r *RequestV2) Response {
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth)
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return Response{}
}
@ -117,7 +117,7 @@ func (a *applierV2store) Sync(r *RequestV2) Response {
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
stringer := panicAlternativeStringer{
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
@ -132,7 +132,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r)
return s.applyV2.Put(r, shouldApplyV3)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":

View File

@ -20,13 +20,7 @@ import (
"sync/atomic"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
var (
MetaBucketName = []byte("meta")
ConsistentIndexKeyName = []byte("consistent_index")
TermKeyName = []byte("term")
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
type Backend interface {
@ -125,26 +119,26 @@ func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
tx.UnsafeCreateBucket(buckets.Meta)
}
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
func CreateMetaBucket(tx backend.BatchTx) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(MetaBucketName)
tx.UnsafeCreateBucket(buckets.Meta)
}
// unsafeGetConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
_, ts := tx.UnsafeRange(MetaBucketName, TermKeyName, nil, 0)
_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
@ -180,11 +174,11 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64,
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs1)
tx.UnsafePut(buckets.Meta, buckets.MetaConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(MetaBucketName, TermKeyName, bs2)
tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2)
}
}

View File

@ -2181,14 +2181,14 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
return
}
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.w.Trigger(req.ID, s.applyV2Request(req))
s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
return
}

View File

@ -472,7 +472,7 @@ func TestApplyRequest(t *testing.T) {
v2store: st,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
resp := srv.applyV2Request((*RequestV2)(&tt.req))
resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth)
if !reflect.DeepEqual(resp, tt.wresp) {
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
@ -500,7 +500,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
Path: membership.MemberAttributesStorePath(1),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
}
srv.applyV2Request((*RequestV2)(&req))
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w)

View File

@ -19,6 +19,7 @@ import (
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
)
@ -52,7 +53,7 @@ func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, e
}
func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Put(r), nil
return a.applier.Put(r, membership.ApplyBoth), nil
}
func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {

View File

@ -27,12 +27,12 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.6-0.20210426205525-9c92be978ae0
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/v2 v2.305.0-beta.2
go.etcd.io/etcd/client/v3 v3.5.0-beta.2
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/raft/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/v2 v2.305.0-beta.4
go.etcd.io/etcd/client/v3 v3.5.0-beta.4
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/raft/v3 v3.5.0-beta.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/otlp v0.20.0

View File

@ -353,6 +353,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View File

@ -27,6 +27,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -39,8 +40,6 @@ const MaxLeaseTTL = 9000000000
var (
forever = time.Time{}
leaseBucketName = []byte("lease")
// maximum number of leases to revoke per second; configurable for tests
leaseRevokeRate = 1000
@ -337,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
txn.End()
@ -771,8 +770,8 @@ func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(leaseBucketName)
_, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
@ -831,7 +830,7 @@ func (l *Lease) persistTo(b backend.Backend) {
}
b.BatchTx().Lock()
b.BatchTx().UnsafePut(leaseBucketName, key, val)
b.BatchTx().UnsafePut(buckets.Lease, key, val)
b.BatchTx().Unlock()
}

View File

@ -28,6 +28,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -92,7 +93,7 @@ func TestLessorGrant(t *testing.T) {
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 1 {
t.Errorf("len(vs) = %d, want 1", len(vs))
}
@ -195,7 +196,7 @@ func TestLessorRevoke(t *testing.T) {
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(leaseBucketName, int64ToBytes(int64(l.ID)), nil, 0)
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 0 {
t.Errorf("len(vs) = %d, want 0", len(vs))
}

View File

@ -53,7 +53,7 @@ type Backend interface {
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
@ -79,6 +79,12 @@ type Snapshot interface {
Close() error
}
type txReadBufferCache struct {
mu sync.Mutex
buf *txReadBuffer
bufVersion uint64
}
type backend struct {
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
@ -102,6 +108,11 @@ type backend struct {
batchTx *batchTxBuffered
readTx *readTx
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
// When creating "concurrentReadTx":
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
txReadBufferCache txReadBufferCache
stopc chan struct{}
donec chan struct{}
@ -183,19 +194,26 @@ func newBackend(bcfg BackendConfig) *backend {
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[string]*bolt.Bucket),
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
lg: bcfg.Logger,
}
b.batchTx = newBatchTxBuffered(b)
// We set it after newBatchTxBuffered to skip the 'empty' commit.
b.hooks = bcfg.Hooks
@ -221,10 +239,68 @@ func (b *backend) ConcurrentReadTx() ReadTx {
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
// inspect/update cache recency iff there's no ongoing update to the cache
// this falls through if there's no cache update
// by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations
// which requires write lock to update "readTx.baseReadTx.buf".
// Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
// by avoiding copying "readTx.baseReadTx.buf".
b.txReadBufferCache.mu.Lock()
curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion
isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer
var buf *txReadBuffer
switch {
case isEmptyCache:
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
// this is only supposed to run once so there won't be much overhead
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
// to maximize the concurrency, try unsafe copy of buffer
// release the lock while copying buffer -- cache may become stale again and
// get overwritten by someone else.
// therefore, we need to check the readTx buffer version again
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
// neither empty nor stale cache, just use the current buffer
buf = curCache
}
// txReadBufferCache.bufVersion can be modified when we doing an unsafeCopy()
// as a result, curCacheVer could be no longer the same as
// txReadBufferCache.bufVersion
// if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
// then the cache became stale while copying "readTx.baseReadTx.buf".
// It is safe to not update "txReadBufferCache.buf", because the next following
// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}
b.txReadBufferCache.mu.Unlock()
// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: b.readTx.buf.unsafeCopy(),
buf: *buf,
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
@ -282,12 +358,7 @@ func (b *backend) Snapshot() Snapshot {
return &snapshot{tx, stopc, donec}
}
type IgnoreKey struct {
Bucket string
Key string
}
func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
b.mu.RLock()
@ -301,8 +372,7 @@ func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
}
h.Write(next)
b.ForEach(func(k, v []byte) error {
bk := IgnoreKey{Bucket: string(next), Key: string(k)}
if _, ok := ignores[bk]; !ok {
if ignores != nil && !ignores(next, k) {
h.Write(k)
h.Write(v)
}
@ -511,7 +581,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
if berr != nil {
return berr
}
tmpb.FillPercent = 0.9 // for seq write in for each
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
if err = b.ForEach(func(k, v []byte) error {
count++
@ -525,7 +595,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return err
}
tmpb = tmptx.Bucket(next)
tmpb.FillPercent = 0.9 // for seq write in for each
tmpb.FillPercent = 0.9 // for bucket2seq write in for each
count = 0
}

View File

@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func BenchmarkBackendPut(b *testing.B) {
@ -41,13 +42,13 @@ func BenchmarkBackendPut(b *testing.B) {
batchTx := backend.BatchTx()
batchTx.Lock()
batchTx.UnsafeCreateBucket([]byte("test"))
batchTx.UnsafeCreateBucket(buckets.Test)
batchTx.Unlock()
b.ResetTimer()
for i := 0; i < b.N; i++ {
batchTx.Lock()
batchTx.UnsafePut([]byte("test"), keys[i], value)
batchTx.UnsafePut(buckets.Test, keys[i], value)
batchTx.Unlock()
}
}

View File

@ -25,6 +25,7 @@ import (
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func TestBackendClose(t *testing.T) {
@ -52,8 +53,8 @@ func TestBackendSnapshot(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
@ -77,7 +78,7 @@ func TestBackendSnapshot(t *testing.T) {
newTx := nb.BatchTx()
newTx.Lock()
ks, _ := newTx.UnsafeRange([]byte("test"), []byte("foo"), []byte("goo"), 0)
ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0)
if len(ks) != 1 {
t.Errorf("len(kvs) = %d, want 1", len(ks))
}
@ -94,8 +95,8 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
for i := 0; i < 10; i++ {
@ -126,9 +127,9 @@ func TestBackendDefrag(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafeCreateBucket(buckets.Test)
for i := 0; i < backend.DefragLimitForTest()+100; i++ {
tx.UnsafePut([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
}
tx.Unlock()
b.ForceCommit()
@ -137,7 +138,7 @@ func TestBackendDefrag(t *testing.T) {
tx = b.BatchTx()
tx.Lock()
for i := 0; i < 50; i++ {
tx.UnsafeDelete([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)))
tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)))
}
tx.Unlock()
b.ForceCommit()
@ -171,8 +172,8 @@ func TestBackendDefrag(t *testing.T) {
// try put more keys after shrink.
tx = b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("more"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
}
@ -184,15 +185,15 @@ func TestBackendWriteback(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar"))
tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz"))
tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar"))
tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz"))
tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
tx.Unlock()
// overwrites should be propagated too
tx.Lock()
tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
tx.Unlock()
keys := []struct {
@ -242,12 +243,14 @@ func TestBackendWriteback(t *testing.T) {
}
rtx := b.ReadTx()
for i, tt := range keys {
rtx.RLock()
k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit)
rtx.RUnlock()
if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
}
func() {
rtx.RLock()
defer rtx.RUnlock()
k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit)
if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
}
}()
}
}
@ -258,20 +261,20 @@ func TestConcurrentReadTx(t *testing.T) {
wtx1 := b.BatchTx()
wtx1.Lock()
wtx1.UnsafeCreateBucket([]byte("key"))
wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC"))
wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
wtx1.UnsafeCreateBucket(buckets.Key)
wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC"))
wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
wtx1.Unlock()
wtx2 := b.BatchTx()
wtx2.Lock()
wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF"))
wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF"))
wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
wtx2.Unlock()
rtx := b.ConcurrentReadTx()
rtx.RLock() // no-op
k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0)
k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0)
rtx.RUnlock()
wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
@ -288,10 +291,10 @@ func TestBackendWritebackForEach(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafeCreateBucket(buckets.Key)
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
tx.UnsafePut(buckets.Key, k, []byte("bar"))
}
tx.Unlock()
@ -299,10 +302,10 @@ func TestBackendWritebackForEach(t *testing.T) {
b.ForceCommit()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafeCreateBucket(buckets.Key)
for i := 5; i < 20; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut([]byte("key"), k, []byte("bar"))
tx.UnsafePut(buckets.Key, k, []byte("bar"))
}
tx.Unlock()
@ -313,7 +316,7 @@ func TestBackendWritebackForEach(t *testing.T) {
}
rtx := b.ReadTx()
rtx.RLock()
assert.NoError(t, rtx.UnsafeForEach([]byte("key"), getSeq))
assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq))
rtx.RUnlock()
partialSeq := seq
@ -322,7 +325,7 @@ func TestBackendWritebackForEach(t *testing.T) {
b.ForceCommit()
tx.Lock()
assert.NoError(t, tx.UnsafeForEach([]byte("key"), getSeq))
assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq))
tx.Unlock()
if seq != partialSeq {

View File

@ -25,13 +25,30 @@ import (
"go.uber.org/zap"
)
type BucketID int
type Bucket interface {
// ID returns a unique identifier of a bucket.
// The id must NOT be persisted and can be used as lightweight identificator
// in the in-memory maps.
ID() BucketID
Name() []byte
// String implements Stringer (human readable name).
String() string
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// is known to never overwrite any key so range is safe.
IsSafeRangeBucket() bool
}
type BatchTx interface {
ReadTx
UnsafeCreateBucket(name []byte)
UnsafeDeleteBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
@ -69,24 +86,24 @@ func (t *batchTx) RUnlock() {
panic("unexpected RUnlock")
}
func (t *batchTx) UnsafeCreateBucket(name []byte) {
_, err := t.tx.CreateBucket(name)
func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
_, err := t.tx.CreateBucket(bucket.Name())
if err != nil && err != bolt.ErrBucketExists {
t.backend.lg.Fatal(
"failed to create a bucket",
zap.String("bucket-name", string(name)),
zap.Stringer("bucket-name", bucket),
zap.Error(err),
)
}
t.pending++
}
func (t *batchTx) UnsafeDeleteBucket(name []byte) {
err := t.tx.DeleteBucket(name)
func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
err := t.tx.DeleteBucket(bucket.Name())
if err != nil && err != bolt.ErrBucketNotFound {
t.backend.lg.Fatal(
"failed to delete a bucket",
zap.String("bucket-name", string(name)),
zap.Stringer("bucket-name", bucket),
zap.Error(err),
)
}
@ -94,21 +111,21 @@ func (t *batchTx) UnsafeDeleteBucket(name []byte) {
}
// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, false)
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, false)
}
// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.unsafePut(bucketName, key, value, true)
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.unsafePut(bucket, key, value, true)
}
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
bucket := t.tx.Bucket(bucketType.Name())
if bucket == nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Stack("stack"),
)
}
@ -120,7 +137,7 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
if err := bucket.Put(key, value); err != nil {
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
@ -128,12 +145,12 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
}
// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
bucket := t.tx.Bucket(bucketType.Name())
if bucket == nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Stack("stack"),
)
}
@ -163,12 +180,12 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
}
// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName)
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
bucket := t.tx.Bucket(bucketType.Name())
if bucket == nil {
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Stack("stack"),
)
}
@ -176,7 +193,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
if err != nil {
t.backend.lg.Fatal(
"failed to delete a key",
zap.String("bucket-name", string(bucketName)),
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
@ -184,12 +201,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
}
// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return unsafeForEach(t.tx, bucketName, visitor)
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
return unsafeForEach(t.tx, bucket, visitor)
}
func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
if b := tx.Bucket(bucket); b != nil {
func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
if b := tx.Bucket(bucket.Name()); b != nil {
return b.ForEach(visitor)
}
return nil
@ -253,8 +270,8 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered {
tx := &batchTxBuffered{
batchTx: batchTx{backend: backend},
buf: txWriteBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
seq: true,
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bucket2seq: make(map[BucketID]bool),
},
}
tx.Commit()
@ -316,12 +333,12 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}
}
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafePut(bucket, key, value)
t.buf.put(bucket, key, value)
}
func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucketName, key, value)
t.buf.putSeq(bucketName, key, value)
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}

View File

@ -22,6 +22,7 @@ import (
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func TestBatchTxPut(t *testing.T) {
@ -33,18 +34,18 @@ func TestBatchTxPut(t *testing.T) {
tx.Lock()
// create bucket
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafeCreateBucket(buckets.Test)
// put
v := []byte("bar")
tx.UnsafePut([]byte("test"), []byte("foo"), v)
tx.UnsafePut(buckets.Test, []byte("foo"), v)
tx.Unlock()
// check put result before and after tx is committed
for k := 0; k < 2; k++ {
tx.Lock()
_, gv := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0)
_, gv := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
tx.Unlock()
if !reflect.DeepEqual(gv[0], v) {
t.Errorf("v = %s, want %s", string(gv[0]), string(v))
@ -61,12 +62,12 @@ func TestBatchTxRange(t *testing.T) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafeCreateBucket(buckets.Test)
// put keys
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")}
for i := range allKeys {
tx.UnsafePut([]byte("test"), allKeys[i], allVals[i])
tx.UnsafePut(buckets.Test, allKeys[i], allVals[i])
}
tests := []struct {
@ -114,7 +115,7 @@ func TestBatchTxRange(t *testing.T) {
},
}
for i, tt := range tests {
keys, vals := tx.UnsafeRange([]byte("test"), tt.key, tt.endKey, tt.limit)
keys, vals := tx.UnsafeRange(buckets.Test, tt.key, tt.endKey, tt.limit)
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys)
}
@ -131,17 +132,17 @@ func TestBatchTxDelete(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeDelete([]byte("test"), []byte("foo"))
tx.UnsafeDelete(buckets.Test, []byte("foo"))
tx.Unlock()
// check put result before and after tx is committed
for k := 0; k < 2; k++ {
tx.Lock()
ks, _ := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0)
ks, _ := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
tx.Unlock()
if len(ks) != 0 {
t.Errorf("keys on foo = %v, want nil", ks)
@ -156,15 +157,15 @@ func TestBatchTxCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()
// check whether put happens via db view
backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("test"))
bucket := tx.Bucket(buckets.Test.Name())
if bucket == nil {
t.Errorf("bucket test does not exit")
return nil
@ -185,14 +186,14 @@ func TestBatchTxBatchLimitCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
// batch limit commit should have been triggered
// check whether put happens via db view
backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte("test"))
bucket := tx.Bucket(buckets.Test.Name())
if bucket == nil {
t.Errorf("bucket test does not exit")
return nil

View File

@ -22,10 +22,11 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
var (
bucket = []byte("bucket")
bucket = buckets.Test
key = []byte("key")
)

View File

@ -15,17 +15,15 @@
package backend
import (
"bytes"
"math"
"sync"
bolt "go.etcd.io/bbolt"
)
// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
// overwrites on a bucket should only fetch with limit=1, but IsSafeRangeBucket
// is known to never overwrite any key so range is safe.
var safeRangeBucket = []byte("key")
type ReadTx interface {
Lock()
@ -33,8 +31,8 @@ type ReadTx interface {
RLock()
RUnlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
// Base type for readTx and concurrentReadTx to eliminate duplicate functions between these
@ -47,12 +45,12 @@ type baseReadTx struct {
// txMu protects accesses to buckets and tx on Range requests.
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
buckets map[BucketID]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}
func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
@ -64,19 +62,19 @@ func (baseReadTx *baseReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v
}
return visitor(k, v)
}
if err := baseReadTx.buf.ForEach(bucketName, getDups); err != nil {
if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
return err
}
baseReadTx.txMu.Lock()
err := unsafeForEach(baseReadTx.tx, bucketName, visitNoDup)
err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
baseReadTx.txMu.Unlock()
if err != nil {
return err
}
return baseReadTx.buf.ForEach(bucketName, visitor)
return baseReadTx.buf.ForEach(bucket, visitor)
}
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
@ -84,16 +82,16 @@ func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
if limit > 1 && !bucketType.IsSafeRangeBucket() {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}
// find/cache bucket
bn := string(bucketName)
bn := bucketType.ID()
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
@ -101,7 +99,7 @@ func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit
if !ok {
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketName)
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
}
@ -133,7 +131,7 @@ func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.buckets = make(map[BucketID]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}

View File

@ -19,9 +19,11 @@ import (
"sort"
)
const bucketBufferInitialSize = 512
// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
buckets map[BucketID]*bucketBuffer
}
func (txb *txBuffer) reset() {
@ -37,23 +39,42 @@ func (txb *txBuffer) reset() {
// txWriteBuffer buffers writes of pending updates that have not yet committed.
type txWriteBuffer struct {
txBuffer
seq bool
// Map from bucket ID into information whether this bucket is edited
// sequentially (i.e. keys are growing monotonically).
bucket2seq map[BucketID]bool
}
func (txw *txWriteBuffer) put(bucket, k, v []byte) {
txw.seq = false
txw.putSeq(bucket, k, v)
func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
txw.bucket2seq[bucket.ID()] = false
txw.putInternal(bucket, k, v)
}
func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
b, ok := txw.buckets[string(bucket)]
func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) {
// TODO: Add (in tests?) verification whether k>b[len(b)]
txw.putInternal(bucket, k, v)
}
func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
b, ok := txw.buckets[bucket.ID()]
if !ok {
b = newBucketBuffer()
txw.buckets[string(bucket)] = b
txw.buckets[bucket.ID()] = b
}
b.add(k, v)
}
func (txw *txWriteBuffer) reset() {
txw.txBuffer.reset()
for k := range txw.bucket2seq {
v, ok := txw.buckets[k]
if !ok {
delete(txw.bucket2seq, k)
} else if v.used == 0 {
txw.bucket2seq[k] = true
}
}
}
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
for k, wb := range txw.buckets {
rb, ok := txr.buckets[k]
@ -62,27 +83,33 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
txr.buckets[k] = wb
continue
}
if !txw.seq && wb.used > 1 {
if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
// assume no duplicate keys
sort.Sort(wb)
}
rb.merge(wb)
}
txw.reset()
// increase the buffer version
txr.bufVersion++
}
// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
type txReadBuffer struct {
txBuffer
// bufVersion is used to check if the buffer is modified recently
bufVersion uint64
}
func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[bucket.ID()]; b != nil {
return b.Range(key, endKey, limit)
}
return nil, nil
}
func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
if b := txr.buckets[string(bucketName)]; b != nil {
func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {
if b := txr.buckets[bucket.ID()]; b != nil {
return b.ForEach(visitor)
}
return nil
@ -92,8 +119,9 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txrCopy := txReadBuffer{
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)),
},
bufVersion: 0,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
@ -114,7 +142,7 @@ type bucketBuffer struct {
}
func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
}
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {

View File

@ -0,0 +1,80 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"bytes"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
var (
keyBucketName = []byte("key")
metaBucketName = []byte("meta")
leaseBucketName = []byte("lease")
alarmBucketName = []byte("alarm")
clusterBucketName = []byte("cluster")
membersBucketName = []byte("members")
membersRemovedBucketName = []byte("members_removed")
authBucketName = []byte("auth")
authUsersBucketName = []byte("authUsers")
authRolesBucketName = []byte("authRoles")
testBucketName = []byte("test")
)
var (
Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
Meta = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
Lease = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
Alarm = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
Members = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})
Auth = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})
Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
)
type bucket struct {
id backend.BucketID
name []byte
safeRangeBucket bool
}
func (b bucket) ID() backend.BucketID { return b.id }
func (b bucket) Name() []byte { return b.name }
func (b bucket) String() string { return string(b.Name()) }
func (b bucket) IsSafeRangeBucket() bool { return b.safeRangeBucket }
var (
MetaConsistentIndexKeyName = []byte("consistent_index")
MetaTermKeyName = []byte("term")
)
// DefaultIgnores defines buckets & keys to ignore in hash checking.
func DefaultIgnores(bucket, key []byte) bool {
// consistent index & term might be changed due to v2 internal sync, which
// is not controllable by the user.
return bytes.Compare(bucket, Meta.Name()) == 0 &&
(bytes.Compare(key, MetaTermKeyName) == 0 || bytes.Compare(key, MetaConsistentIndexKeyName) == 0)
}

View File

@ -26,17 +26,14 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/schedule"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
var (
keyBucketName = []byte("key")
MetaBucketName = cindex.MetaBucketName
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
@ -123,8 +120,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(keyBucketName)
cindex.UnsafeCreateMetaBucket(tx)
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafeCreateBucket(buckets.Meta)
tx.Unlock()
s.b.ForceCommit()
@ -162,7 +159,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
start := time.Now()
s.b.ForceCommit()
h, err := s.b.Hash(DefaultIgnores)
h, err := s.b.Hash(buckets.DefaultIgnores)
hashSec.Observe(time.Since(start).Seconds())
return h, s.currentRev, err
@ -198,8 +195,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
lower := revision{main: compactRev + 1}
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(keyBucketName)
err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
h.Write(buckets.Key.Name())
err = tx.UnsafeForEach(buckets.Key, func(k, v []byte) error {
kr := bytesToRev(k)
if !upper.GreaterThan(kr) {
return nil
@ -242,7 +239,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafePut(MetaBucketName, scheduledCompactKeyName, rbytes)
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
tx.Unlock()
// ensure that desired compaction is persisted
s.b.ForceCommit()
@ -297,18 +294,6 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
return s.compact(trace, rev)
}
// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}
func init() {
DefaultIgnores = map[backend.IgnoreKey]struct{}{
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {},
{Bucket: string(MetaBucketName), Key: string(cindex.TermKeyName)}: {},
}
}
func (s *store) Commit() {
s.mu.Lock()
defer s.mu.Unlock()
@ -352,20 +337,20 @@ func (s *store) restore() error {
tx := s.b.BatchTx()
tx.Lock()
_, finishedCompactBytes := tx.UnsafeRange(MetaBucketName, finishedCompactKeyName, nil, 0)
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
s.revMu.Lock()
s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
s.lg.Info(
"restored last compact revision",
zap.String("meta-bucket-name", string(MetaBucketName)),
zap.Stringer("meta-bucket-name", buckets.Meta),
zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
s.revMu.Unlock()
}
_, scheduledCompactBytes := tx.UnsafeRange(MetaBucketName, scheduledCompactKeyName, nil, 0)
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
@ -375,7 +360,7 @@ func (s *store) restore() error {
keysGauge.Set(0)
rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
@ -436,7 +421,7 @@ func (s *store) restore() error {
s.lg.Info(
"resume scheduled compaction",
zap.String("meta-bucket-name", string(MetaBucketName)),
zap.Stringer("meta-bucket-name", buckets.Meta),
zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)

View File

@ -18,6 +18,7 @@ import (
"encoding/binary"
"time"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -39,11 +40,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
tx := s.b.BatchTx()
tx.Lock()
keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
for _, key := range keys {
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(keyBucketName, key)
tx.UnsafeDelete(buckets.Key, key)
keyCompactions++
}
}
@ -51,7 +52,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
if len(keys) < s.cfg.CompactionBatchLimit {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(MetaBucketName, finishedCompactKeyName, rbytes)
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
tx.Unlock()
s.lg.Info(
"finished scheduled compaction",

View File

@ -24,6 +24,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -74,7 +75,7 @@ func TestScheduleCompaction(t *testing.T) {
ibytes := newRevBytes()
for _, rev := range revs {
revToBytes(rev, ibytes)
tx.UnsafePut(keyBucketName, ibytes, []byte("bar"))
tx.UnsafePut(buckets.Key, ibytes, []byte("bar"))
}
tx.Unlock()
@ -83,12 +84,12 @@ func TestScheduleCompaction(t *testing.T) {
tx.Lock()
for _, rev := range tt.wrevs {
revToBytes(rev, ibytes)
keys, _ := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
keys, _ := tx.UnsafeRange(buckets.Key, ibytes, nil, 0)
if len(keys) != 1 {
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))
}
}
_, vals := tx.UnsafeRange(MetaBucketName, finishedCompactKeyName, nil, 0)
_, vals := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
revToBytes(revision{main: tt.rev}, ibytes)
if w := [][]byte{ibytes}; !reflect.DeepEqual(vals, w) {
t.Errorf("#%d: vals on %v = %+v, want %+v", i, finishedCompactKeyName, vals, w)

View File

@ -37,6 +37,7 @@ import (
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -148,12 +149,12 @@ func TestStorePut(t *testing.T) {
}
wact := []testutil.Action{
{Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}},
{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
}
if tt.rr != nil {
wact = []testutil.Action{
{Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}},
{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
}
}
@ -228,7 +229,7 @@ func TestStoreRange(t *testing.T) {
wstart := newRevBytes()
revToBytes(tt.idxr.revs[0], wstart)
wact := []testutil.Action{
{Name: "range", Params: []interface{}{keyBucketName, wstart, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Key, wstart, []byte(nil), int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@ -303,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) {
t.Errorf("#%d: marshal err = %v, want nil", i, err)
}
wact := []testutil.Action{
{Name: "seqput", Params: []interface{}{keyBucketName, tt.wkey, data}},
{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@ -342,10 +343,10 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "put", Params: []interface{}{MetaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{keyBucketName, key2}},
{Name: "put", Params: []interface{}{MetaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
{Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -398,9 +399,9 @@ func TestStoreRestore(t *testing.T) {
t.Errorf("current rev = %v, want 5", s.currentRev)
}
wact := []testutil.Action{
{Name: "range", Params: []interface{}{MetaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{MetaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -484,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
tx.UnsafePut(MetaBucketName, scheduledCompactKeyName, rbytes)
tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
tx.Unlock()
s0.Close()
@ -513,7 +514,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
for i := 0; i < 5; i++ {
tx = s.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)
@ -870,27 +871,27 @@ type fakeBatchTx struct {
rangeRespc chan rangeResp
}
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
func (b *fakeBatchTx) RUnlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
func (b *fakeBatchTx) UnsafeDeleteBucket(name []byte) {}
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) RLock() {}
func (b *fakeBatchTx) RUnlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(bucket backend.Bucket) {}
func (b *fakeBatchTx) UnsafeDeleteBucket(bucket backend.Bucket) {}
func (b *fakeBatchTx) UnsafePut(bucket backend.Bucket, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucket, key, value}})
}
func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}})
func (b *fakeBatchTx) UnsafeSeqPut(bucket backend.Bucket, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucket, key, value}})
}
func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}})
func (b *fakeBatchTx) UnsafeRange(bucket backend.Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucket, key, endKey, limit}})
r := <-b.rangeRespc
return r.keys, r.vals
}
func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) {
b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}})
func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucket, key}})
}
func (b *fakeBatchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
return nil
}
func (b *fakeBatchTx) Commit() {}
@ -900,17 +901,17 @@ type fakeBackend struct {
tx *fakeBatchTx
}
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
func (b *fakeBackend) OpenReadTxN() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(func(bucketName, keyName []byte) bool) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
func (b *fakeBackend) OpenReadTxN() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
type indexGetResp struct {
rev revision

View File

@ -21,6 +21,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -159,7 +160,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
default:
}
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
_, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)
if len(vs) != 1 {
tr.s.lg.Fatal(
"range failed to find revision pair",
@ -214,7 +215,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
}
tw.trace.Step("marshal mvccpb.KeyValue")
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")
@ -275,7 +276,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
)
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
tw.storeTxnRead.s.lg.Fatal(

View File

@ -19,6 +19,7 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
)
func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
@ -31,6 +32,6 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
}
be.BatchTx().Lock()
be.BatchTx().UnsafePut(keyBucketName, ibytes, d)
be.BatchTx().UnsafePut(buckets.Key, ibytes, d)
be.BatchTx().Unlock()
}

View File

@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -353,7 +354,7 @@ func (s *watchableStore) syncWatchers() int {
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
tx.RUnlock()
evs := kvsToEvents(s.store.lg, wg, revs, vs)

View File

@ -28,14 +28,14 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
go.etcd.io/bbolt v1.3.6-0.20210426205525-9c92be978ae0
go.etcd.io/etcd/api/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/client/v2 v2.305.0-beta.2
go.etcd.io/etcd/client/v3 v3.5.0-beta.2
go.etcd.io/etcd/etcdutl/v3 v3.5.0-beta.2
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.2
go.etcd.io/etcd/raft/v3 v3.5.0-beta.2
go.etcd.io/etcd/server/v3 v3.5.0-beta.2
go.etcd.io/etcd/api/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/v2 v2.305.0-beta.4
go.etcd.io/etcd/client/v3 v3.5.0-beta.4
go.etcd.io/etcd/etcdutl/v3 v3.5.0-beta.4
go.etcd.io/etcd/pkg/v3 v3.5.0-beta.4
go.etcd.io/etcd/raft/v3 v3.5.0-beta.4
go.etcd.io/etcd/server/v3 v3.5.0-beta.4
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9

View File

@ -358,6 +358,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View File

@ -20,10 +20,10 @@ import (
"path/filepath"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
bolt "go.etcd.io/bbolt"
@ -163,7 +163,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error)
func getHash(dbPath string) (hash uint32, err error) {
b := backend.NewDefaultBackend(dbPath)
return b.Hash(mvcc.DefaultIgnores)
return b.Hash(buckets.DefaultIgnores)
}
// TODO: revert by revision and find specified hash value