redistemplate批量读取_RedisTemplate实现消息队列并且批量插入数据。

更新时间:2023-06-27 18:52:20 阅读: 评论:0

redistemplate批量读取_RedisTemplate实现消息队列并且批
量插⼊数据。
早期由于⽣产环境业务量⼩。所以⽇志是⼀条⼀条commit的。运⾏也没出过问题。
后来随着业务扩⼤并发量上来后,⽇志写⼊因为频繁与数据库打交道导致数据库连接池经常占满,直⾄程序崩溃。
因为⽇志并⾮需要实时响应。所以考虑改⽤异步+批量提交的⽅式。形容瘦
为了缓解jvm内存压⼒。采⽤redis做消息队列(因为原项⽬有集成过redis,公司不想使⽤其他mq增加维护成本)。
所以在⽹上找了篇springboot整合redistemplate做消息队列的资料。稍微改了⼀下。
⾸先是redisConfig。
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.tion.RedisConnectionFactory;
import org.*;
import org.dis.listener.RedisMessageListenerContainer;
import org.dis.rializer.Jackson2JsonRedisSerializer;
import org.dis.rializer.StringRedisSerializer;
@Configuration
@EnableCaching //开启注解
public class RedisConfig {
/**
* retemplate相关配置
* @param factory
* @return
*/
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
RedisTemplate template = new RedisTemplate<>();
// 配置连接⼯⼚
template.tConnectionFactory(factory);
/
/使⽤Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使⽤JDK的序列化⽅式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和t,以及修饰符范围,ANY是都有包括private和public
om.tVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输⼊的类型,类必须是⾮final修饰的,final修饰的类,⽐如String,Integer等会跑出异常
图片叠加jacksonSeial.tObjectMapper(om);
// 值采⽤json序列化
template.tValueSerializer(jacksonSeial);
//使⽤StringRedisSerializer来序列化和反序列化redis的key值
template.tKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.tHashKeySerializer(new StringRedisSerializer());
template.tHashValueSerializer(jacksonSeial);
等待下一个春天template.afterPropertiesSet();
return template;
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.tConnectionFactory(redisConnectionFactory);
return container;
}
}
消息实体Message
import com.alibaba.fastjson.JSON;
import lombok.Data;
import java.util.UUID;
@Data
public class Message {
private String id;
private Integer retryCount;
private String content;
private Integer status;
private String topic;
public Message() {
}
public Message(String topic, Object object) {
表格排版this.id = UUID.randomUUID().toString().replace("-", "");
this.status = 0;
}
}
吃什么解辣Redis订阅管理,采⽤观察者模式。
import org.springframework.beans.factory.annotation.Autowired; import org.RedisTemplate; import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@Component
public class TopicSubscriber {
private final Map> subscriberMap = new HashMap();
@Autowired
private RedisTemplate redisTemplate;
// 观察者模式实现消费者注册。
public Boolean addConsumer(String topic, String consumer) { Set consumerList = (topic);
if (consumerList == null) {
consumerList = new HashSet<>();
}
Boolean b = consumerList.add(consumer);
subscriberMap.put(topic, consumerList);
return b;
}
public Boolean removeConsumer(String topic, String comsumer) { Set consumerList = (topic);
Boolean b = fal;
if (consumerList != null) {
b = ve(comsumer);
subscriberMap.put(topic, consumerList);
}
return b;
}
//消息⼴播
public void broadcast(String topic, String id) {
if ((topic) != null) {
for (String consumer : (topic)) {
String key = String.join("_", topic, consumer, id);
if (!redisTemplate.hasKey("fail_" + key)) {
redisTemplate.opsForValue().t(key, id);
redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic); }
}
}
黄家驹经典歌曲
}
}
然后是Redis发布者
import com.alibaba.fastjson.JSON;
dis.mq.subscriber.TopicSubscriber;
import ioty.util.ChartUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class RedisPublisher {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
TopicSubscriber subscriber;
@PostConstruct
public void init() throws Exception {
// todo test thread
/*new Thread(() -> {
int count = 0;
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (count < 14) {
try {
Thread.sleep(100l);
博物馆的意义Generate generate = new Generate();
generate.tIdNo("" + count);
this.publish("GenerateLog", generate);
count++;
} catch (Exception e) {
}
}蚂蝗
}).start();*/
}
public void publish(String topic, Object content) {  //消息发布到redis
Message message = new Message(topic, content);
subscriber.broadcast(topic, Id());
);
}
}
Redis消费者。实现MessageListener的onMessage就可以。为了易于扩展,这⾥使⽤了泛型。import com.alibaba.fastjson.JSON;

本文发布于:2023-06-27 18:52:20,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1057599.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   实现   序列化   队列   配置   响应   观察者   数据库
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图