ZooKeeper:Curator框架重试策略和SessionAPI介绍ZooKeeper : Curator框架重试策略和Session API介绍
在学习Curator框架API之前,可以先了解Java客户端原⽣API,这样不仅可以更好的理解Curator框架API,还可以突出Curator框架的⽅便和强⼤。
Curator是⼀个⽐较完善的ZooKeeper客户端框架,通过封装的⼀套⾼级API简化了ZooKeeper的操作。Curator框架主要解决了三类问题:封装ZooKeeper Client与ZooKeeper Server之间的连接处理(提供连接重试机制等)。
摘要英文翻译提供了⼀套Fluent风格的API,并且在Java客户端原⽣API的基础上进⾏了增强(创捷多层节点、删除多层节点等)。
ahb提供ZooKeeper各种应⽤场景(分布式锁、leader选举、共享计数器、分布式队列等)的抽象封装。
博主将Curator框架API分为Session、Znode、ACL、Watcher和Transaction这⼏个部分来进⾏介绍,限于篇幅原因,本篇博客只介
绍Session API以及其中的重试策略,之后的博客会介绍其他API的使⽤。博主使⽤的Curator框架版本是5.2.0,ZooKeeper版本是3.6.3。
<dependency>
英语等级考试网
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
新概念英语第一册下载
5.2.0版本的Curator使⽤3.
6.3版本的ZooKeeper。arbitrary
重试策略
在Java客户端原⽣API中,客户端与服务端的连接是没有提供连接重试机制的,如果客户端需要重连,就只能将上⼀次连接的Session
ID与Session Password发送给服务端进⾏重连。⽽Curator框架提供了客户端与服务端的连接重试机制,并且可以通过Fluent风格的API来给连接添加重试策略。
RetryPolicy接⼝是重试策略的抽象,allowRetry⽅法⽤来判断是否允许重试。
package;
import KeeperException;
/**
* 重连策略的抽象
*/
public interface RetryPolicy
prentday
{
/**
* 当操作由于某种原因失败时调⽤,此⽅法应返回true以进⾏另⼀次尝试
儒森教育* retryCount – 到⽬前为⽌重试的次数(第⼀次为0)
* elapdTimeMs – ⾃尝试操作以来经过的时间(以毫秒为单位)
* sleeper – 使⽤它来睡眠,不要调⽤Thread.sleep
*/
boolean allowRetry(int retryCount,long elapdTimeMs, RetrySleeper sleeper);
/**
* 当操作因特定异常⽽失败时调⽤,此⽅法应返回true以进⾏另⼀次尝试
*/
default boolean allowRetry(Throwable exception)
{
if( exception instanceof KeeperException)
{
final int rc =((KeeperException) exception).code().intValue();
return(rc == KeeperException.Code.CONNECTIONLOSS.intValue())|| (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue())||
(rc == KeeperException.Code.SESSIONMOVED.intValue())||
(rc == KeeperException.Code.SESSIONEXPIRED.intValue());
}
return fal;
}
}
RetryPolicy接⼝的关系图如下图所⽰:
SleepingRetry抽象类(实现了RetryPolicy接⼝):
package;
import RetryPolicy;
import RetrySleeper;
import TimeUnit;
abstract class SleepingRetry implements RetryPolicy
{
private final int n;
山光水色
protected SleepingRetry(int n)
{
this.n = n;
}
public int getN()
{
return n;
}
public boolean allowRetry(int retryCount,long elapdTimeMs, RetrySleeper sleeper)
{
if( retryCount < n )
{
try
{
sleeper.sleepFor(getSleepTimeMs(retryCount, elapdTimeMs), TimeUnit.MILLISECONDS);
}
catch( InterruptedException e )
{
Thread.currentThread().interrupt();
return fal;
}
return true;
}
return fal;
altek}
protected abstract long getSleepTimeMs(int retryCount,long elapdTimeMs);
}
重试n次(有次数限制),重试之前先进⾏睡眠,睡眠时间由getSleepTimeMs⽅法得到(抽象⽅法)。SessionFailedRetryPolicy类(实现了RetryPolicy接⼝):
package;
import KeeperException;
/**
* Session过期导致操作失败时的重连策略
*/
public class SessionFailedRetryPolicy implements RetryPolicy
{
private final RetryPolicy delegatePolicy;
public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
{
this.delegatePolicy = delegatePolicy;
}
@Override
public boolean allowRetry(int retryCount,long elapdTimeMs, RetrySleeper sleeper)
{
return delegatePolicy.allowRetry(retryCount, elapdTimeMs, sleeper);
}
@Override
public boolean allowRetry(Throwable exception)
{
if( exception instanceof KeeperException.SessionExpiredException )
{
return fal;
}
el
{
return delegatePolicy.allowRetry(exception);
}
}
}
这⾥只是增加了对SessionExpiredException这种异常的判断,当遇到Session过期异常时,不再进⾏重连,即返回fal。⽽其他的所有业务全部委托给delegatePolicy实例。因为RetryPolicy接⼝的allowRetry(Throwable exception)⽅法有默认实现:
default boolean allowRetry(Throwable exception)
{
if( exception instanceof KeeperException)
{
final int rc =((KeeperException) exception).code().intValue();
return(rc == KeeperException.Code.CONNECTIONLOSS.intValue())||
(rc == KeeperException.Code.OPERATIONTIMEOUT.intValue())||
(rc == KeeperException.Code.SESSIONMOVED.intValue())||
(rc == KeeperException.Code.SESSIONEXPIRED.intValue());
}
return fal;
}
当遇到Session过期异常时,允许进⾏重连(rc == KeeperException.Code.SESSIONEXPIRED.intValsoccer是什么意思
ue())。SessionFailedRetryPolicy这种重连策略⽤的不多,这⾥就不详细介绍了。
RetryForever类(实现了RetryPolicy接⼝):