使⽤etcd实现动态分布式选主
⽇常开发中经常会有后台运⾏的worker类任务,由于服务是分布式的,我们可能会有多个分布式的worker同时在运⾏,有时候我们需要分布式下只有⼀个worker在运⾏,这时候就可以⽤到etcd的分布式选主。
etcd中concurrency包下已经帮我们实现好了选主,我们只需要调⽤其api实现就可以了,下⾯我们分析下etcd是如何实现选主机制的。直接进⾏源码分析:
// Campaign puts a value as eligible for the election on the prefix
// key.
// Multiple ssions can participate in the election for the
// same pref马钱子的毒性
ix, but only one can be the leader at a time.
//
// If the context is 'context.TODO()/context.Background()', the Campaign
// will continue to be blocked for other keys to be deleted, unless rver
// returns a non-recoverable error (e.g. ErrCompacted).
// Otherwi, until the context is not cancelled or timed-out, Campaign will
// continue to be blocked until it becomes the leader.
// 多个etcd的ssion可以通过prefix来参与选举。但是只有⼀个ssion能成为leader。
// Campaign⽅法会阻塞,直到ssion成功成为leader才返回。
func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.ssion
client := e.ssion.Client()
// 根据前缀和租约创建当前key
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lea())
// 如果是第⼀次创建key,那么key的revision为0
// 这⾥⽤到了etcd的事务,如果if判断为true,那么put这个key,否则get这个key;最终都能获取到这个key的内容。 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v昆明旅游攻略
3.OpPut(k, val, v3.WithLea(s.Lea())))
txn = txn.El(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
// 这⾥是事务中if判断为fal,即执⾏了el
if !resp.Succeeded {恬的组词
kv := resp.Respons[0].GetResponRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val { // 判定val是否相同,不相同的话,在不更换leader的情况下,更新val
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}
// 等待prefix前缀下所有⽐当前key的revision⼩的其他key都被删除后,才返回,竞选为leader
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in ca of context cancel
lect {
ca <-ctx.Done():
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
return err
}
e.hdr = resp.Header
return nil
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wr v3.WatchRespon
// 这⾥watch指定的key,对于这个key所有events事件,都会收到服务端的推送。
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE { // 如果当前这个key被删除了,那么会退出这个⽅法,watch下⼀个key。
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponHeader, error) {
// option,获取createRevision不⼤于maxCreateRev的key,只取最后⼀个revision最⼤的
getOpts := append(v3.WithLastCreate(), v3.WithMaxC有关环保的手抄报
reateRev(maxCreateRev))
for {
// 获取前缀prefix下,所有⽐指定revision⼩的key
resp, err := client.Get(ctx, pfx, )
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
// 去watch revision最⼤的key,这⾥也会阻塞的watch。外层有循环判断,要等所有⽐revision⼩的key的没了,才退出。
// 下⽅有具体的说明
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
总体来说还是⽐较好理解的,主要是利⽤watch机制来实现了节点在不是leader的时候的阻塞机制。
可以看到,每个节点都创建了⾃⼰的key,但是这些key的前缀是⼀致的,选主是根据前缀去选主的。
如果有a,b,c三个节点同时去竞选,分别对应竞选的createRevision是0,1,2,那么每个节点会watch⽐⾃⼰createRevision⼩并且最⼤的节点,这是个循环的过程,等到所有⽐⾃⼰createRevision⼩的节点都被删除后,⾃⼰才成为leader伊曲康唑说明书
。
对应的,a节点会成为leader,b节点在watch a节点,c节点在watch b节点。如果b节点key被删除了,c节点会去watch a节点。
如果a节点key被删除了,b节点会成为leader。
TODO:后⾯补充如何使⽤……
好了,现在补充如何使⽤etcd提供的这个包来实现分布式下选主。
直接贴代码吧,然后注释⾥会讲解。
const CampaignPrefix = "/election-test-demo" // 这是选举的prefix
func Campaign(c *clientv3.Client, parentCtx context.Context, wg *sync.WaitGroup) (success <-chan struct{}) {
// 我们设置etcd的value为当前机器的ip,这个不是关键
ip, _ := getLocalIP()
/
/ 当外层的context关闭时,我们也会优雅的退出。
ctx, _ := context.WithCancel(parentCtx)
// ctx的作⽤是让外⾯通知我们要退出,wg的作⽤是我们通知外⾯已经完全退出了。当然外⾯要wg.Wait等待我们。
if wg != nil {
wg.Add(1)
}
// 创建⼀个信号channel,并返回,所有worker可以监听这个channel,这种实现可以让worker阻塞等待节点成为leader,⽽不是轮询是否是leader节点。 // 返回只读channel,所有worker可以阻塞在这。
notify := make(chan struct{}, 100)
go func() {
defer func() {
if wg != nil {
wg.Done()
}
}()
for {
lect {
ca <-ctx.Done(): // 如果是⾮leader节点,会阻塞在Campaign⽅法,context被cancel后,Campaign报错,最终会从这⾥退出。
return
default:
}
// 创建ssion,ssion参与选主,etcd的client需要⾃⼰传⼊。
// ssion中keepAlive机制会⼀直续租,如果keepAlive断掉,ssion.Done会收到退出信号。
s, err := concurrency.NewSession(c, concurrency.WithTTL(5))
if err != nil {
fmt.Println("NewSession", "error", "err", err)
time.Sleep(time.Second * 2)
continue
}
// 创建⼀个新的etcd选举election
e := concurrency.NewElection(s, CampaignPrefix)
//调⽤Campaign⽅法,成为leader的节点会运⾏出来,⾮leader节点会阻塞在⾥⾯。
if err = e.Campaign(ctx, ip); err != nil {
fmt.Println("Campaign",山黄鳝
"error", "err", err)
s.Clo()
time.Sleep(1 * time.Second) //不致于重试的频率太⾼
continue
}
// 运⾏到这的协程,成为leader,分布式下只有⼀个。
fmt.Println("campaign", "success", "ip", ip)
shouldBreak := fal
for !shouldBreak {
lect {
ca notify <- struct{}{}: // 不断向所有worker协程发信号
ca notify <- struct{}{}: // 不断向所有worker协程发信号
ca <-s.Done(): // 如果因为⽹络因素导致与etcd断开了keepAlive,这⾥break,重新创建ssion,重新选举 fmt.Println("campaign", "ssion has done")
shouldBreak = true
break
ca <-ctx.Done():
ctxTmp, _ := context.WithTimeout(context.Background(), time.Second*1)
e.Resign(ctxTmp)
s.Clo()
return
}
}
}
}()
return notify
}
// 获取本机⽹卡IP
func getLocalIP() (ipv4 string, err error) {
var (
addrs []net.Add电路板修复
r
addr net.Addr
ipNet *net.IPNet // IP地址
isIpNet bool
)
// 获取所有⽹卡
if addrs, err = net.InterfaceAddrs(); err烤箱生蚝
!= nil {
return
}
// 取第⼀个⾮lo的⽹卡IP
for _, addr = range addrs {
//fmt.Println(addr)
// 这个⽹络地址是IP地址: ipv4, ipv6
if ipNet, isIpNet = addr.(*net.IPNet); isIpNet && !ipNet.IP.IsLoopback() {
// 跳过IPV6
if ipNet.IP.To4() != nil {
ipv4 = ipNet.IP.String() // 192.168.1.1
return
}
}
}
err = errors.New("no local ip")
return
}