etcdserver/api/v3rpc: document, clean up snapshot sends

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee
2020-05-14 18:39:42 -07:00
parent f1179fd70d
commit 4ddcc36057

View File

@ -18,7 +18,9 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"io" "io"
"time"
"github.com/dustin/go-humanize"
"go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver" "go.etcd.io/etcd/v3/etcdserver"
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
@ -94,6 +96,9 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
return &pb.DefragmentResponse{}, nil return &pb.DefragmentResponse{}, nil
} }
// big enough size to hold >1 OS pages in the buffer
const snapshotSendBufferSize = 32 * 1024
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
snap := ms.bg.Backend().Snapshot() snap := ms.bg.Backend().Snapshot()
pr, pw := io.Pipe() pr, pw := io.Pipe()
@ -108,19 +113,41 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
pw.Close() pw.Close()
}() }()
// send file data // record SHA digest of snapshot data
// used for integrity checks during snapshot restore operation
h := sha256.New() h := sha256.New()
br := int64(0)
buf := make([]byte, 32*1024) // buffer just holds read bytes from stream
sz := snap.Size() // response size is multiple of OS page size, fetched in boltdb
for br < sz { // e.g. 4*1024
buf := make([]byte, snapshotSendBufferSize)
sent := int64(0)
total := snap.Size()
size := humanize.Bytes(uint64(total))
start := time.Now()
ms.lg.Info("sending database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
)
for total-sent > 0 {
n, err := io.ReadFull(pr, buf) n, err := io.ReadFull(pr, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return togRPCError(err) return togRPCError(err)
} }
br += int64(n) sent += int64(n)
// if total is x * snapshotSendBufferSize. it is possible that
// resp.RemainingBytes == 0
// resp.Blob == zero byte but not nil
// does this make server response sent to client nil in proto
// and client stops receiving from snapshot stream before
// server sends snapshot SHA?
// No, the client will still receive non-nil response
// until server closes the stream with EOF
resp := &pb.SnapshotResponse{ resp := &pb.SnapshotResponse{
RemainingBytes: uint64(sz - br), RemainingBytes: uint64(total - sent),
Blob: buf[:n], Blob: buf[:n],
} }
if err = srv.Send(resp); err != nil { if err = srv.Send(resp); err != nil {
@ -129,13 +156,24 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
h.Write(buf[:n]) h.Write(buf[:n])
} }
// send sha // send SHA digest for integrity checks
// during snapshot restore operation
sha := h.Sum(nil) sha := h.Sum(nil)
ms.lg.Info("sending database sha256 checksum to client",
zap.Int64("total-bytes", total),
zap.Int("checksum-size", len(sha)),
)
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha} hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
if err := srv.Send(hresp); err != nil { if err := srv.Send(hresp); err != nil {
return togRPCError(err) return togRPCError(err)
} }
ms.lg.Info("successfully sent database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("took", humanize.Time(start)),
)
return nil return nil
} }