RedisTemplate和RedissonClient的各种操作以及BlockingQu。。。注⼊redisTemplate
@Autowired
private RedisTemplate<String,String> redisTemplate;
0 数字⾃增⾃降
Long number = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2",1);
Long number = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2",5);
Long number = redisTemplate.opsForValue().increment("SAVE_APP_USER_RECORD2",-5);
//第⼆个参数传正数就是+多少,传负数就是减多少
redisTemplate.delete("SAVE_APP_USER_RECORD2");
1 保存和读取Set
SetOperations<String, String> t = redisTemplate.opsForSet();
t.add("t1","22");
t.add("t1","33");
t.add("t1","44");
Set<String> resultSet =redisTemplate.opsForSet().members("t1");
System.out.println("resultSet:"+resultSet);
运⾏结果为:
resultSet:[[t3, t2, t1]] jedis
2、Hash结构,保存和读取map:
Map<String,String> map=new HashMap<String,String>();
map.put("key1","value1");
map.put("key2","value2");
map.put("key3","value3");
map.put("key4","value4");
map.put("key5","value5");
redisTemplate.opsForHash().putAll("map1",map);
Map<String,String> resultMap= redisTemplate.opsForHash().entries("map1");
List<String>reslutMapList=redisTemplate.opsForHash().values("map1");
Set<String>resultMapSet=redisTemplate.opsForHash().keys("map1");
String value=(String)redisTemplate.opsForHash().get("map1","key1");
System.out.println("value:"+value);
System.out.println("resultMapSet:"+resultMapSet);泰戈尔简介及作品
System.out.println("resultMap:"+resultMap);
System.out.println("resulreslutMapListtMap:"+reslutMapList);
redisTemplate.opsForHash().delete("map1");
redisTemplate.opsForHash().delete("map1","key1");
运⾏结果为:
value:value1
tournamentresultMapSet:[key1, key2, key5, key3, key4]
resultMap:{key3=value3, key2=value2, key1=value1, key5=value5, key4=value4}
resulreslutMapListtMap:[value1, value2, value5, value3, value4]
3、保存和读取list
List<String> list1=new ArrayList<String>();
list1.add("a1");
list1.add("a2");
list1.add("a3");
List<String> list2=new ArrayList<String>();
list2.add("b1");peer to peer
list2.add("b2");
list2.add("b3");
redisTemplate.opsForList().leftPush("listkey1",list1);
redisTemplate.opsForList().rightPush("listkey2",list2);
List<String> resultList1=(List<String>)redisTemplate.opsForList().leftPop("listkey1");
List<String> resultList2=(List<String>)redisTemplate.opsForList().rightPop("listkey2");
System.out.println("resultList1:"+resultList1);
System.out.println("resultList2:"+resultList2);
运⾏结果:
resultList1:[a1, a2, a3]
resultList2:[b1, b2, b3]
这⾥需要解释⼀下:不管是leftPush还是rightPush都可以⽤leftPop或者rightPoP任意⼀种获取到其中的值,不过就是获取的遍历⽅向不⼀样。有学过数据结构的⼈都知道⾥⾯循环链表是可以前后遍历的,就和这⾥的场景是⼀样的。如果还有不懂的话可以去看看这部分的源代码,其实就是遍历⽅向不同,所以效率也不同。所以最好leftPush⽤leftPoP遍历,rightPush⽤rightPoP遍历
4、保存和读取String(最常⽤的)
System.out.println("缓存正在设置。。。。。。。。。");
redisTemplate.opsForValue().t("key1","value1");
出纳工作内容redisTemplate.opsForValue().t("key2","value2");
redisTemplate.opsForValue().t("key3","value3");
redisTemplate.opsForValue().t("key4","value4");
System.out.println("缓存已经设置完毕。。。。。。。");
String result1=redisTemplate.opsForValue().get("key1").toString();
String result2=redisTemplate.opsForValue().get("key2").toString();
String result3=redisTemplate.opsForValue().get("key3").toString();
System.out.println("缓存结果为:result:"+result1+" "+result2+" "+result3);
5 redis 锁 + redis hash结构以及BlockingQueue队列代码
package;
import MyRejectedExecutionHandler;
import ThreadExceptionHandle;
import Qualifier;
import Bean;
import Configuration;
import Primary;
import EnableAsync;
import ThreadPoolTaskExecutor;
import Executor;
/**
* 线程池配置
*
* @Author YJX
* @Date 2018/3/24
*/
@Configuration
@EnableAsync
public class ExecutorConfig {
//@Autowired
//private TraceableThreadFactory traceableThreadFactory;
static{
Thread.tDefaultUncaughtExceptionHandler(ThreadExceptionHandle.INSTANCE);
}
/**
* 默认的线程池
*
* @return
*/
@Bean
@Primary
@Qualifier("defaultExecutor")
japaneschoolchild21public Executor defaultExecutor(){
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
executor.tCorePoolSize(7);/*核⼼线程数*/
executor.tMaxPoolSize(13);/*最⼤线程数*/
executor.tQueueCapacity(10000);/*队列⼤⼩*/
executor.tKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/
executor.tAllowCoreThreadTimeOut(true);// KeepAliveSeconds 设置也作⽤于【核⼼线程数】 executor.tThreadNamePrefix("defaultExecutor-");
//executor.tThreadFactory(traceableThreadFactory);
//executor.tAwaitTerminationSeconds(3);
executor.tRejectedExecutionHandler(new MyRejectedExecutionHandler());
executor.tWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
/**
* App⽤户埋点持久化线程池
*
* @return
*/
@Bean
@Qualifier("appUreRecordExecutor")
public Executor appUreRecordExecutor(){
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
executor.tCorePoolSize(7);/*核⼼线程数*/
executor.tMaxPoolSize(13);/*最⼤线程数*/
executor.tQueueCapacity(30000);/*队列⼤⼩*/
executor.tKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/
executor.tAllowCoreThreadTimeOut(true);// KeepAliveSeconds 设置也作⽤于【核⼼线程数】 executor.tThreadNamePrefix("appUreRecordExecutor-");
executor.tRejectedExecutionHandler(new MyRejectedExecutionHandler());
executor.tWaitForTasksToCompleteOnShutdown(true);
西安作品集培训
executor.initialize();
return executor;
}
/**
* App⽤户埋点持久化线程池alllike
*
* @return
*/
@Bean
@Qualifier("appUreRecordExecutor2")
public Executor appUreRecordExecutor2(){
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
executor.tCorePoolSize(7);/*核⼼线程数*/
executor.tMaxPoolSize(13);/*最⼤线程数*/
executor.tQueueCapacity(30000);/*队列⼤⼩*/
executor.tKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/
executor.tAllowCoreThreadTimeOut(true);// KeepAliveSeconds 设置也作⽤于【核⼼线程数】 executor.tThreadNamePrefix("appUreRecordExecutor2-");
executor.tRejectedExecutionHandler(new MyRejectedExecutionHandler());
executor.tWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
/*
* Copyright 2020 Wicrenet, Inc. All rights rerved.
*/
package;
import JSON;
import AppUrRecordEntity;
import RedisRouteKeyEnum;
import AppUrRecordDao;
import AppUrRecordService2;
import ServiceImpl;
import CollectionUtils;
import StringUtils;
import LocalDateTime;
import Logger;
import LoggerFactory;
import InitializingBean;
import Autowired;衣架英文
import Qualifier;
import RedisTemplate;
import Scheduled;
import ThreadPoolTaskExecutor;
import Service;
kite的音标import ArrayList;
import Date;
import List;
import BlockingQueue;
import Executor;
import TimeUnit;
/
**
* 【埋点统⼀接⼝】
*
* @author YJX
* Created on 2020/2/15 18:18
*/
@Service("app_ur_record_rvice_impl2")
public class AppUrRecordServiceImpl2 extends ServiceImpl<AppUrRecordDao, AppUrRecordEntity>implements AppUrRecordService2, Initializin gBean {
private static final Logger logger = Logger(AppUrRecordServiceImpl2.class);
//持久化的队列
private volatile BlockingQueue appUrRecordEntitieQueue2;
@Autowired
@Qualifier("appUreRecordExecutor2")
private Executor executor2;
www cum com@Autowired
private AppUrRecordDao appUrRecordDao;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public Boolean saveAppUrRecord2(List<AppUrRecordEntity> appUrRecordEntity){
try{
// ⼼跳处理时间逻辑
appUrRecordEntity.stream()
appUrRecordEntity.stream()
.filter(item -> MatterId()!=null)
.forEach(item ->{
Eventaction().toString()){
// 0-开始、1-结束、2-⼼跳
ca"0":
//存Queue
item.tStaringTime(new Date());
item.tEndTime(new Date());
item.tDuration(0L);
appUrRecordEntitieQueue2.add(item);
break;
ca"1":
//存Queue
item.tEndTime(new Date());
appUrRecordEntitieQueue2.add(item);
break;
ca"2":
//⼼跳处理存⼊redis并实时更新,5分钟未刷新结束时间则加⼊持久化队列
String aperationCode =(String) redisTemplate.opsForHash().get(RedisRouteKeyEnum.APP_Key(), Apera tionCode());
if(StringUtils.isNotBlank(aperationCode)){
AppUrRecordEntity recordEntity = JSON.parObject(aperationCode, AppUrRecordEntity.class);
recordEntity.Duration()+10);
redisTemplate.opsForHash().put(RedisRouteKeyEnum.APP_Key(), AperationCode(), JSONStri ng(recordEntity));
}el{
redisTemplate.opsForHash().put(RedisRouteKeyEnum.APP_Key(), AperationCode(), JSONStri ng(item));
}
break;
default:
<("未知的事件类型:Eventaction:{}", JSONString(item));
}
});
return true;
}catch(Exception e){
<("⽤户记录埋点数据进⼊队列异常:{}", e);
return fal;
}
}
/**
* 定时持久化队列的 appUrRecordEntities数据
* 每30s执⾏⼀次
*/
@Scheduled(cron ="0/30 * * * * ? ")
public void execute(){
// redis数据处理:结束时间超过5分钟则将数据加⼊持久化队列
List<Object> objects = redisTemplate.opsForHash().values(RedisRouteKeyEnum.APP_Key());
if(CollectionUtils.isNotEmpty(objects)){
//获取锁,
boolean lock =getLock(RedisRouteKeyEnum.APP_USER_RECORD_Key(), RedisRouteKeyEnum.APP_USER_RECORD_LOCK_ID.g etTimeToLive());
if(lock){
Long size = redisTemplate.opsForHash().size(RedisRouteKeyEnum.APP_Key());
objects.forEach(item ->{
AppUrRecordEntity appUrRecordEntity = JSON.String(), AppUrRecordEntity.class);
if(new EndTime()).plusMinutes(5).toDate().getTime()<= w().toDate().getTime()){
//加⼊队列
appUrRecordEntitieQueue2.add(appUrRecordEntity);
redisTemplate.opsForHash().delete(RedisRouteKeyEnum.APP_Key(), AperationCode());
}
});
Long size2 = redisTemplate.opsForHash().size(RedisRouteKeyEnum.APP_Key());