Redis学习(一):redis集群之哨兵模式下的负载均衡

更新时间:2023-07-24 18:52:11 阅读: 评论:0

Redis学习(⼀):redis集群之哨兵模式下的负载均衡
说明
在学习研究Redis集群部署的过程中,发现以哨兵模式部署集群时,使⽤Jedis作为客户端只可以连接到主机,从机只作为备份保证⾼可⽤。这样读写都在主机,在读⽐较⾼的情况下对主机带来很⼤压⼒。通过阅读Jedis的JedisSentinelPool源码,在该类的基础上实现JedisSentinelMasterSlavePool类,通过该类实现redis 哨兵模式下的读操作负载均衡。
正⽂
基础知识
关于redis集群的基础知识,这⾥先不做总结,可以看以下资料进⾏学习和搭建:
在哨兵模式下,⼀般部署三个节点作为哨兵集群,保证哨兵的⾼可⽤。同时每个master节点都是采⽤主从复制的模式,写操作在master,再将数据同步到slave节点。这⾥的哨兵⽤来监测master节点的状态,保证在master节点⽆法正常⼯作时,能够⾃动故障转移从slave节点中选取新的master节点,当旧master节点恢复正常后,可以作为新的slave节点重新加⼊集群。
JedisSentinelPool
该类是Jedis⽀持redis 哨兵模式的连接池,在该类中持有个GenericObjectPool对象,初始化该类时会创建⼀个主机连接池,同时会创建⼀个MasterListener,当发⽣故障转移时会重新初始化主机连接池。在MasterListener监听器中主要订阅监听了+switch-master通道,当发⽣主机切换时,哨兵会通过此通道发送同名事件,通过监听该事件JedisSentinelPool实现连接池的重新初始化。
更多事件详见
this.j.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
JedisSentinelPool.this.log.debug("Sentinel {}:{} published: {}.", new Object[]{MasterListener.this.host, MasterListener.this.port, message});
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
if (MasterListener.this.masterName.equals(switchMasterMsg[0])) {
JedisSentinelPool.this.initPool(HostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} el {
素养教育JedisSentinelPool.this.log.debug("Ignoring message on +switch-master for master name {}, our master name is {}", switchMasterMsg[0], MasterL istener.this.masterName);
}
} el {
JedisSentinelPool.("Invalid message received on Sentinel {}:{} on channel +switch-master: {}", new Object[]{MasterListener.this.host, M asterListener.this.port, message});
}
}
}, new String[]{"+switch-master"});
基于以上特性,通过改写JedisSentinelPool使其在拥有主机连接池的情况下,携带从机连接池,并改写监听器监听其他事件,使能够主从机变化时主从连接池跟随变化。
JedisSentinelMasterSlavePool
在JedisSentinelPool类的基础上实现该类。添加了从机连接池地址列表slavesAddr,从机连接池集合Map<HostAndPort, GenericObjectPool<jedis>> slavePools,改写MasterListener,增加监听+slave, +sdown, -sdown事件,使得在发⽣主机切换,从机上下线时能⾃动改变连接池。同时添加⼀个ThreadLocal<GenericObjectPool<Jedis>> objectPoolThreadLocal变量,主要是为了能够归还资源,在通过主机获取jedis对象时设置了DataSource,关闭时jedis通过该变量获取连接池对象,连接池pool使⽤returnResource()⽅法归还资源。根据此特性,添加ThreadLocal变量,当从slave获取资源时保存该线程从哪个从机连接池获取的Jedis,归还资源时可以找到对应的连接池。
以这种⽅式实现读操作在从机上的负载均衡,当集群状态发⽣变化,连接池也跟随变化,可能造成⽆法获取连接的情况,需要做容错处理。
这⾥从机连接的获取使⽤了随机算法,也可以使⽤其他算法。
新增变量
private volatile Map<HostAndPort, GenericObjectPool<Jedis>> slavePools;
private volatile List<HostAndPort> slavesAddr;
private final Object changeSlavePoolLock;
private final ThreadLocal<GenericObjectPool<Jedis>> objectPoolThreadLocal = new ThreadLocal<>();
private volatile JedisFactory2 factory;
由于JedisFactory属于包内资源,要新建⼀个JedisFactory2。
使⽤随机算法获取从机连接
public Jedis getSlaveResource() {
try{
if (this.slavePools != null && this.slavePools.size() > 0) {
Random random = new Random();
HostAndPort slaveHP = (Int(slavePools.size()));
GenericObjectPool<Jedis> pool = (slaveHP);
this.log.info("Get a slave pool, the address is {}", slaveHP);
objectPoolThreadLocal.t(pool);
return pool.borrowObject();
}
}catch(Exception e){
this.log.debug("Could not get a resource form slave pools");
}
Resource();
}
归还从机连接池资源
public void cloSlaveJedis(Jedis jedis) {
GenericObjectPool<Jedis> pool = ();
//是否为从机的连接
if (pool != null) {
颜真卿楷书作品
} el {
jedis.clo();
}爵士风
}
修改监听器,添加其他监听事件
/
/重写监听机制当发现主机切换时,重新初始化主机的连接池。当发现新从机上线(作为旧主机故障恢复,重新上线成为从机),添加新从机到从机连接池
//还有从机的主观下线时需要将其删除
this.j.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
JedisSentinelMasterSlavePool.this.log.debug("Sentinel {}:{}, channel is {} == published: {}.", new Object[]{JedisSentinelMasterSlavePool.MasterListene r.this.host, JedisSentinelMasterSlavePool.MasterListener.this.port, channel, message});
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3 && channel.equals("+switch-master")) {
if (JedisSentinelMasterSlavePool.MasterListener.this.masterName.equals(switchMasterMsg[0])) {
JedisSentinelMasterSlavePool.this.log.debug("Listening messgae on +switch-master for master nam
e {}, the new master address is {} : {}", switc hMasterMsg[0], switchMasterMsg[3], switchMasterMsg[4]);
JedisSentinelMasterSlavePool.this.initPool(HostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMs g[4])));
}
}
if (switchMasterMsg.length > 5 && JedisSentinelMasterSlavePool.MasterListener.this.masterName.equals(switchMasterMsg[5])) {
if (channel.equals("+slave")) {
JedisSentinelMasterSlavePool.this.log.debug("Listening messgae on +slave for master name {}, the new slave address is {} : {}", switchMasterMs g[5], switchMasterMsg[2], switchMasterMsg[3]);
JedisSentinelMasterSlavePool.this.addSlavePool(switchMasterMsg);
}
if (channel.equals("+sdown")) {
if ("slave".equals(switchMasterMsg[0])) {
JedisSentinelMasterSlavePool.this.log.debug("Listening messgae on +sdown for master name {}, the slave is now in Subjectively Down state, r emove the slave, the address is {} : {}", switchMasterMsg[5], switchMasterMsg[2], switchMasterMsg[3]);
veSlavePool(switchMasterMsg);
}
}
if (channel.equals("-sdown")) {
if ("slave".equals(switchMasterMsg[0])) {
JedisSentinelMasterSlavePool.this.log.debug("Listening messgae on -sdown for master name {}, the slave is no logger in Subjectively Down st ate, readd the slave, the address is {} : {}", switchMasterMsg[5], switchMasterMsg[2], switchMasterMsg[3]);
经典爱情JedisSentinelMasterSlavePool.this.addSlavePool(switchMasterMsg);
}
}
}
}
}, new String[]{"+switch-master", "+slave", "+sdown", "-sdown"});
源码
public class JedisSentinelMasterSlavePool extends JedisPoolAbstract {
protected GenericObjectPoolConfig poolConfig;
protected int connectionTimeout;
protected int soTimeout;
protected String password;
protected int databa;
protected String clientName;
protected Set<MasterListener> masterListeners;
protected Logger log;
private volatile HostAndPort currentHostMaster;
初中必背古文private volatile JedisFactory2 factory;
private volatile Map<HostAndPort, GenericObjectPool<Jedis>> slavePools;
private volatile List<HostAndPort> slavesAddr;
星罗棋布的近义词
private final Set<String> ntinels;
private final Object initPoolLock;
private final Object changeSlavePoolLock;
private final ThreadLocal<GenericObjectPool<Jedis>> objectPoolThreadLocal = new ThreadLocal<>();
private final ThreadLocal<GenericObjectPool<Jedis>> objectPoolThreadLocal = new ThreadLocal<>();
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig) {
this(masterName, ntinels, poolConfig, 2000, (String)null, 0);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels) {
this(masterName, ntinels, new GenericObjectPoolConfig(), 2000, (String)null, 0);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, String password) {
this(masterName, ntinels, new GenericObjectPoolConfig(), 2000, password);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig, int timeout, String password) {
this(masterName, ntinels, poolConfig, timeout, password, 0);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig, int timeout) {
this(masterName, ntinels, poolConfig, timeout, (String)null, 0);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPool
Config poolConfig, String password) {
this(masterName, ntinels, poolConfig, 2000, password);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int d ataba) {
this(masterName, ntinels, poolConfig, timeout, timeout, password, databa);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig, int timeout, String password, int d ataba, String clientName) {
this(masterName, ntinels, poolConfig, timeout, timeout, password, databa, clientName);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig, int timeout, int soTimeout, String password, int databa) {
this(masterName, ntinels, poolConfig, timeout, soTimeout, password, databa, (String)null);
}
public JedisSentinelMasterSlavePool(String masterName, Set<String> ntinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTime out, String password, int databa, String clientName) {
this.soTimeout = 2000;
this.databa = 0;
入冬
this.masterListeners = new HashSet();
this.log = Logger(JedisSentinelMasterSlavePool.class);
this.initPoolLock = new Object();
this.changeSlavePoolLock = new Object();
this.poolConfig = poolConfig;
this.soTimeout = soTimeout;
this.password = password;
this.databa = databa;
this.clientName = clientName;
HostAndPort master = this.initSentinels(ntinels, masterName);
this.initPool(master, this.slavesAddr);
}
public void destroy() {
Iterator var1 = this.masterListeners.iterator();
while(var1.hasNext()) {
JedisSentinelMasterSlavePool.MasterListener m = (JedisSentinelMasterSlavePool.();
JedisSentinelMasterSlavePool.MasterListener m = (JedisSentinelMasterSlavePool.();
m.shutdown();
}
super.destroy();
//关闭从机连接池
this.destroySlavePool(this.slavePools);
}
private void destroySlavePool(Map<HostAndPort, GenericObjectPool<Jedis>> slavePools) {
for (GenericObjectPool pool : slavePools.values()) {
pool.clo();
}
}
public HostAndPort getCurrentHostMaster() {
return this.currentHostMaster;
}
private void initPool(HostAndPort master) {
Object var2 = this.initPoolLock;
synchronized(this.initPoolLock) {
if (!master.equals(this.currentHostMaster)) {
this.currentHostMaster = master;
if (this.factory == null) {
this.factory = new Host(), Port(), tionTimeout, this.soTimeout, this.password, this.databa, thi s.clientName);
this.initPool(this.poolConfig, this.factory);
} el {
this.factory.tHostAndPort(this.currentHostMaster);
this.internalPool.clear();
}
this.log.info("Rcreated JedisPool to master at " + master);
}
}
}
//重载 initPool⽅法添加从机连接池初始化
private void initPool(HostAndPort master, List<HostAndPort> slaves) {
Object var2 = this.initPoolLock;
synchronized(this.initPoolLock) {
if (!master.equals(this.currentHostMaster)) {
禁播美剧this.currentHostMaster = master;
this.slavesAddr = slaves;
if (this.factory == null) {
this.factory = new Host(), Port(), tionTimeout, this.soTimeout, this.password, this.databa, thi s.clientName);
this.initPool(this.poolConfig, this.factory);
//初始化从机连接池
this.initSlavePool(slaves);
} el {
this.factory.tHostAndPort(this.currentHostMaster);
this.internalPool.clear();
}
this.log.info("Created JedisPool to master at " + master);
}
}
}
//创建从机连接池
private void initSlavePool(List<HostAndPort> slaves) {
Map<HostAndPort, GenericObjectPool<Jedis>> slavePools = new HashMap<>();

本文发布于:2023-07-24 18:52:11,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1095045.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:哨兵   获取   资源   集群   监听   节点   事件
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图