*: implement Endpoint Watch and new Resolver

limeng01
2021-02-05 16:48:11 +08:00
parent dae29bb719
commit 8feb55f65c
7 changed files with 197 additions and 95 deletions
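For context, a sketch of how the pieces introduced by this commit are meant to fit together: a service registers an endpoint through the endpoints.Manager, and a gRPC client dials through the etcd resolver, which is fed by the WatchChannel implemented below. The resolver package path, its NewBuilder constructor, and the "etcd" URI scheme are assumptions inferred from the commit title; that code is not shown in these hunks.

package main

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/naming/endpoints"
	"go.etcd.io/etcd/client/v3/naming/resolver" // assumed location of the new resolver
	"google.golang.org/grpc"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Register one endpoint under the service prefix that clients resolve.
	em, err := endpoints.NewManager(cli, "foo/service")
	if err != nil {
		log.Fatal(err)
	}
	if err := em.AddEndpoint(context.TODO(), "foo/service/e1", endpoints.Endpoint{Addr: "1.2.3.4:8080"}); err != nil {
		log.Fatal(err)
	}

	// Dial through the etcd resolver; address updates arrive via the
	// WatchChannel implemented in this commit.
	b, err := resolver.NewBuilder(cli) // assumed constructor name
	if err != nil {
		log.Fatal(err)
	}
	conn, err := grpc.Dial("etcd:///foo/service", grpc.WithResolvers(b), grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}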


@@ -6,12 +6,12 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints/internal"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -78,73 +78,82 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
}
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
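// Use a serialized request so the initial listing still works if the
// target etcd server is partitioned away from the quorum.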
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
lg := m.client.GetLogger()
initUpdates := make([]*Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var iup internal.Update
if err := json.Unmarshal(kv.Value, &iup); err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
continue
}
up := &Update{
Op: Add,
Key: string(kv.Key),
Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
}
initUpdates = append(initUpdates, up)
}
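// Buffer one batch so the initial endpoint set is delivered even before
// the caller starts receiving from the channel.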
upch := make(chan []*Update, 1)
if len(initUpdates) > 0 {
upch <- initUpdates
}
go m.watch(ctx, resp.Header.Revision+1, upch)
return upch, nil
}
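A minimal sketch of a consumer for the channel returned above, reusing the imports from the earlier sketch; the consume helper and its log output are hypothetical, while endpoints.Manager, Update, Add, and Delete are the types defined in this package.

func consume(ctx context.Context, em endpoints.Manager) {
	wch, err := em.NewWatchChannel(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// The first batch lists the current endpoints as Add updates; later
	// batches are the deltas emitted by watch() below.
	for ups := range wch {
		for _, up := range ups {
			switch up.Op {
			case endpoints.Add:
				log.Printf("add %s -> %s", up.Key, up.Endpoint.Addr)
			case endpoints.Delete:
				log.Printf("delete %s", up.Key)
			}
		}
	}
	// The loop exits when ctx is canceled or the underlying watch fails.
}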
func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
defer close(upch)
lg := m.client.GetLogger()
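// Resume from the revision just after the initial Get so that no update
// between that Get and this Watch is missed or delivered twice.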
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...)
for {
select {
case <-ctx.Done():
return
case wresp, ok := <-wch:
if !ok {
lg.Warn("watch closed", zap.String("target", m.target))
return
}
if wresp.Err() != nil {
lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
return
}
deltaUps := make([]*Update, 0, len(wresp.Events))
for _, e := range wresp.Events {
var iup internal.Update
var err error
var op Operation
switch e.Type {
case clientv3.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &iup)
op = Add
if err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
continue
}
case clientv3.EventTypeDelete:
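// WithPrevKV is not requested, so the deleted endpoint's stored value is
// unavailable; the update identifies it by key with a zero-value Endpoint.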
iup = internal.Update{Op: internal.Delete}
op = Delete
default:
continue
}
up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
deltaUps = append(deltaUps, up)
}
if len(deltaUps) > 0 {
upch <- deltaUps
}
}
}
}
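Note that watch closes the channel on any failure rather than retrying internally, so a long-lived subscriber has to re-create the channel itself. A hypothetical wrapper under the same assumptions as the sketches above (adds a "time" import); since the first batch after each reconnect is a full re-listing of Add updates, the handler must reconcile its state against that batch instead of waiting for Delete events that were lost while disconnected.

func watchForever(ctx context.Context, em endpoints.Manager, handle func([]*endpoints.Update)) error {
	for {
		wch, err := em.NewWatchChannel(ctx)
		if err != nil {
			return err
		}
		for ups := range wch {
			handle(ups)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second): // crude fixed backoff before re-watching
		}
	}
}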
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {