Curator应⽤场景(⼀)-分布式计数器DistributedAtomicInteger
⽂章⽬录
curator-recipes功能简介
curator-recipes包中包含了对zookeeper场景应⽤场景的封装,好的项⽬源码让⼈从包名就能看出其功能,下⾯先看下recipes的包结构
简单介绍下不同包及其对应功能
包名功能简介
atomic分布式计数器(DistributedAtomicLong),能在分布式环境下实现原⼦⾃增
barriers分布式屏障(DistributedBarrier),使⽤屏障来阻塞分布式环境中进程的运⾏,直到满⾜特定的条件
cache 监听机制,分为NodeCache(监听节点数据变化),PathChildrenCache(监听节点的⼦节点数据变化),TreeCache(既能监听⾃⾝节点数据
变化也能监听⼦节点数据变化)
leader leader选举
locks分布式锁
nodes提供持久化节点(PersistentNode)服务,即使客户端与zk服务的连接或者会话断开queue分布式队列(包括优先级队列DistributedPriorityQueue,延迟队列DistributedDelayQueue等) shared分布式计数器SharedCount
在介绍curator的分布式原⼦计数器之前,先抛出⼀个经典的⾯试问题,i++在多线程环境下是否存在问题?
先来看⼀段测试代码,其中CountDownLatch是为了让主线程阻塞直到所有⼦线程执⾏完,其应⽤场景如下:CountDownLatch调⽤await()⽅法的线程将⼀直阻塞等待,直到这个CountDownLatch对象的计数值减到0(每次调⽤countDown⽅法计数器减⼀)为⽌。例⼦⾥每个⼦线程⾃增100000次后调⽤countDown()⽅法将计数器减⼀,初始化数值10,10个线程全部跑完⾃增后,主线程await⽅法不再阻塞,输出count值
葡萄英文
static int count = 0;
static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) {
//创建10个线程每个线程内部⾃增100000次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 100000; j++) {
count++;
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
执⾏⼏次后,发现结果不总是10*100000,故可以证明i++确实存在线程安全问题
Java中i++⾃增线程安全问题的由来
先来了解下⼏个基本概念
Java内存模型
1. Java内存模型规定所有的变量都是存在主存中
2. 每个线程都有⾃⼰的⼯作内存
3. 线程对变量的操作都必须在线程内部⼯作内存中进⾏,不能直接对主存中变量进⾏操作
⽹上⼀张图描绘的很形象
avupdate
现在可以联想到i++操作,线程对共享变量的修改总是分为 读-改-写 这3步,即
1. 线程先从主存中读取变量到本地⼯作内存中(读)
2. 线程在本地⼯作内存中对变量进⾏+1操作(改)
3. 线程将本地内存中变量的值写回主存(写)
再想,多线程⼜是可以并⾏执⾏的
假设初始count值为0,如果n个线程同时读到了0这个值,那么更新后count值是多少?
这就是i++出现线程安全问题的核⼼原因 ,i++操作分为三步,其中只有读取和写⼊操作是原⼦的,读改写三步⼀起执⾏时,⽆法保证原⼦性
原⼦性
原⼦性就是指该操作是不可再分的,要么全部执⾏,要么全部不执⾏。最经典的就是银⾏转账问题,对应到i++问题中就是上⾯所述的:只有读取和写⼊操作是原⼦的,⽽读改写合并操作不能保证原⼦性。
Java中保证原⼦性操作的有:
1. Synchronized和Lock,加锁使得同⼀时刻只有⼀个线程能访问共享变量,操作⾃然是原⼦的
2. urrent.atomic下的原⼦操作类,如AtomicInteger,AtomicReference,基于Cas算法实现了类似乐观锁版本更新控
制的原⼦操作
这两种⽅法,下⾯会分别介绍如何使⽤来解决i++原⼦性问题
内存可见性
可见性是指当多个线程访问同⼀个变量时,⼀个线程修改了这个变量的值,其他线程能够⽴即看得到修改后变量的值
Java⾥提供了volatile关键字来修饰变量,使得线程对⼀个变量值的修改会⽴马更新到主存之中,其它线程会重新从主存中去获取最新的变量值。
那么volatile关键字是如何使得其它线程能⽴马获取到最新的变量值呢?
如果对声明了Volatile变量进⾏写操作,JVM就会向处理器发送⼀条Lock前缀的指令,将这个变量所在缓存⾏的数据写回到系统内存。但是就算写回到内存,如果其他(线程)处理器缓存的值还是旧的,再执⾏计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是⼀致的,就会实现缓存
⼀致性协议,每个处理器通过嗅探在总线上传播的数据来检查⾃⼰缓存的值是不是过期了,当处理器发现⾃⼰缓存⾏对应的内存地址被修改,就会将当前处理器的缓存⾏设置成⽆效状态,当处理器要对这个数据进⾏修改操作的时候,会强制重新从系统内存⾥把数据读到处理器缓存⾥
核⼼就是两点
1. Lock前缀指令会引起处理器缓存回写到内存
essays
2. ⼀个处理器的缓存回写到内存会导致其他处理器的缓存⽆效(嗅探机制)
那么问题来了,volatile关键字是否能解决i++⾃增的线程安全问题呢?不是每个线程修改后的值都能⽴马被其他线程看到么?
看官们⾃⼰先思考3分钟?
举个例⼦,假设volatile变量count初始值为0,A,B线程同时对变量count进⾏+1操作
1. A线程读取到了count值,然后进⾏+1操作
2. A线程还没将变量写⼊主存时(即没有触发volatile关键字特性时),B线程读取到了主存中的值(还是0)
3. B也对count进⾏+1,然后A,B分别将变量写⼊主存,count值可能就为1。
所以,volatile关键字不能保证原⼦性,保证原⼦性还得通过synchronized,Lock和
urrent.atomic下的原⼦操作类
当然Java⾥通过synchronized和Lock也能够保证可见性,synchronized和Lock能保证同⼀时刻只有⼀个线程获取锁然后执⾏同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中,这样下个线程获取变量值时,就能获取到最⼩的值
单机版线程安全原⼦⾃增
Synchronized
将i++例⼦修改如下,发现结果输出每次都是10*100000
static int count = 0;
static CountDownLatch countDownLatch = new CountDownLatch(10);
static Object lock = new Object();
社交口才训练技巧
makar
public static void main(String[] args) {
//创建10个线程每个线程内部⾃增100000次
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 100000; j++) {
//锁住lock对象,lock对象所有线程共享
synchronized (lock) {
count++;
}
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
然后问题⼜来了,synchronized关键字⼜是如何实现只有⼀个线程进⾏同步操作的呢?
先写⼀个⼩demo,例⼦如下
also的用法
public class SynchronizedDemo {
public static void main(String[] args) {
//锁住SynchronizedDemo类对象
synchronized (SynchronizedDemo.class) {
夏奇拉最好听的歌
}
//调⽤SynchronizedDemo类中静态⽅法,也是锁住SynchronizedDemo类对象
method();
}
private static synchronized void method() {
}
}
两步操作,第⼀步synchronized (SynchronizedDemo.class),锁当前类对象,执⾏完后再执⾏synchronized静态⽅法,锁的也是静态对象
下⾯来看看这部分代码的字节码⽂件,编译⽂件后,在target⽬录下找到SynchronizedDemo.class⽂件所在位置,执⾏
javap -v SynchronizedDemo.class
结果输出如下
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class com/company/project/core/SynchronizedDemo
dual life2: dup
3: astore_1
4: monitorenterconvince
5: aload_1
6: monitorexit
7: goto 15
10: astore_2递归英文
11: aload_1
12: monitorexit
13: aload_2
14: athrow
15: invokestatic #3 // Method method:()V
18: return
可以看到字节码中有monitorenter,monitorexit两个指令,指令功能如下
monitorenter 获取对象监视器
monitorexit 释放对象监视器
任意⼀个对象都拥有⾃⼰的监视器,当这个对象由同步块或者这个对象的同步⽅法调⽤时,执⾏⽅法的线程必须先获取该对象的监视器才能进⼊同步块和同步⽅法(monitorenter),退出后释放监视器(monitorexit),这就是synchronized关键字能实现同步的原理
Lock
代码如下