SpringBoot结合线程池解决多线程问题实录

更新时间:2023-05-05 18:23:44 阅读: 评论:0

SpringBoot结合线程池解决多线程问题实录
背景:
线上有⼀个接⼝,3台机器总共QPS在3000左右,单机QPS在1000左右,接⼝响应时间2ms。为了保证接⼝的任何改动在上线之前能够在⼤流量下能够没有问题。提出想法,搭建⼀套流量回放环境,上线之前把代码先部署到流量回放环境。然后将线上的流量导⼊到流量回放环境,⽤真实的业务请求来做模拟测试,这个过程我们称作是流量回放。
为了保证流量回放的时候,流量导⼊过程,不能影响正常的线上接⼝请求,也就是响应时间不能有变化。⾸先就要考虑启动⼀个线程来异步处理这个事情。好,按照这个想法,写了第⼀版本代码。(以下代码是有问题的,只怪我too young too sample)
第⼀版本代码写起来:
@RequestMapping(value ="/flags", method = RequestMethod.POST)
@ResponBody
public ServerRespon getFlagsPost(@RequestBody NewServerParam param){
//如果流量回放开关打开,就创建线程,将请求发送到流量回放环境
if(Constant.TRAFFIC_REPLAY_FLAG){
Runnable r =()->{
HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", JSONString(param),10000,"application/json");
};
Thread thread =new Thread(r);
thread.start();
}
long a = System.currentTimeMillis();
ServerRespon respon =new ServerRespon();
respon.Flags(param, Exp_key()));
long b = System.currentTimeMillis();
LOGGER.info(logService.parToJSon(param, respon,(b - a), LOGGER, Name()));
return respon;
}
线下测试没有问题,⼀上线,⼤流量上来,服务器瞬间报错上万条,马上回滚。
报错提⽰:
java.lang.OutOfMemoryError: Unable to create new native thread
这个错误的意思是:程序创建的线程数量已达到上限值
剖析错误
JVM向操作系统申请创建新的 native thread(原⽣线程)时, 就有可能会碰到 java.lang.OutOfMemoryError: Unable to create new native thread 错误. 如果底层操作系统创建新的 native thread 失败, JVM就会抛出相应的OutOfMemoryError. 原⽣线程的数量受到具体环境的限制, 通
过⼀些测试⽤例可以找出这些限制, 请参考下⽂的⽰例. 但总体来说, 导致 java.lang.OutOfMemoryError: Unable to create new native thread 错误的场景⼤多经历以下这些阶段:
1. java程序向jvm申请创建⼀个线程
2. jvm本地代码(native code)代理该请求,尝试创建⼀个操作系统级别的native Thread(原⽣线程)
3. 操作系统尝试创建⼀个新的native Thread,需要同时分配⼀些内存给该线程
4. 如果操作系统的虚拟内存已经耗尽,或者受到32位进程的地址空间限制(约2-4GB),OS就会拒绝本地内存分配
5. JVM抛出 java.lang.OutOfMemoryError: Unable to create new native thread 错误。
改进⽅案-springBoot整合线程池优化
错误很明显,就是创建线程数量过多,超过OS所能允许的最⼤空间。那这个问题,完全就可以⽤线程池去解决,⽤线程池维护⼀定数量的线程,防⽌⽆限制的创建线程,带来的内存开销过⼤。
代码改进:
1. 创建线程池配置类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.urrent.ThreadPoolTaskExecutor;
import urrent.Executor;
import urrent.ThreadPoolExecutor;
/**
* @description: 线程池配置类
**/
@Configuration//表⽰这个类是配置类
@EnableAsync//表⽰这个类是线程池配置类
public class ExecutorConfig {
private static final Logger logger = Logger(ExecutorConfig.class);
@Bean
public Executor asyncServiceExecutor(){
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor =new VisiableThreadPoolTaskExecutor();
int corePoolSize = Runtime().availableProcessors();
//配置核⼼线程数
executor.tCorePoolSize(corePoolSize*2);
//配置最⼤线程数
executor.tMaxPoolSize(corePoolSize*2);
//配置队列⼤⼩
executor.tQueueCapacity(99999);
//配置线程池中的线程的名称前缀
executor.tThreadNamePrefix("async-rvice-");
// rejection-policy:当pool已经达到max size的时候,并且队列已经满了,如何处理新任务
// CALLER_RUNS:不在新线程中执⾏任务,⽽是有调⽤者所在的线程来执⾏
//DiscardPolicy: 直接丢弃
executor.tRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
/
/执⾏初始化
executor.initialize();
// 等待所有任务结束后再关闭线程池
executor.tWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
2.创建线程信息打印的类,这样在执⾏线程池执⾏excute⽅法的时候,会把当前的任务的情况打印出来
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.urrent.ThreadPoolTaskExecutor;
import org.urrent.ListenableFuture;
import urrent.Callable;
import urrent.Future;
import urrent.ThreadPoolExecutor;
/**
* @description: 获取线程池的监控信息
**/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
private static final Logger logger = Logger(VisiableThreadPoolTaskExecutor.class);
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor =getThreadPoolExecutor();
if(null == threadPoolExecutor){
return;
}
logger.info("{},{},taskCount[{}],completedTaskCount[{}],activeCount[{}],queuesize[{}]", ThreadNamePrefix(),
prefix,
);
}
@Override
public void execute(Runnable task){
showThreadPoolInfo("1. do execute");
}
@Override
public void execute(Runnable task,long startTimeout){
showThreadPoolInfo("2. do execute");
}
@Override
public Future<?>submit(Runnable task){
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public<T> Future<T>submit(Callable<T> task){
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?>submitListenable(Runnable task){
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public<T> ListenableFuture<T>submitListenable(Callable<T> task){
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
3. 创建任务接⼝
import com.alibaba.fastjson.JSONObject;
/
**
* 异步任务接⼝
*/
public interface AsyncService {
void trfficRepalyForFlagV2(String param);
}
4. 创建任务实现类
``
import com.alibaba.fastjson.JSONObject;
import com.lianjia.platform.sampling.util.HttpClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @description: 异步任务实现类
**/
@Service
public class AsyncServiceImpl implements AsyncService {
private final Logger logger = Logger(AsyncServiceImpl.class);
@Value("${afficReplayUrl}")
private String trafficReplayUrl;//获取yml⽂件中配置的流量回放环境的URL
@Override
@Async("asyncServiceExecutor")//这⾥要使⽤定义的线程池配置的Bean的⽅法名
public void trfficRepalyForFlagV2(String param){
HttpClientUtil.post(trafficReplayUrl+"sample/v2/flags", param,10000,"application/json");
}
}
5. 使⽤任务
@RequestMapping(value ="/flags", method = RequestMethod.POST)
@ResponBody
public ServerRespon getFlagsPost(@RequestBody NewServerParam param){
/
/如果流量回放开关打开,就创建线程,将请求发送到流量回放环境
if(Constant.TRAFFIC_REPLAY_FLAG){
}
long a = System.currentTimeMillis();
ServerRespon respon =new ServerRespon();
respon.Flags(param, Exp_key()));
long b = System.currentTimeMillis();
LOGGER.info(logService.parToJSon(param, respon,(b - a), LOGGER, Name()));
return respon;
}
压测结果:
压测数据:并发数50,压测时间10min。并发数=QPS(1000)*响应时间(0.02s)。
之前因为上线之前没有做压测,导致了上线之后,⼤流量下报错。吃⼀堑,长⼀智。这次改完之后做了压测。压测之后,第⼀,打开流量开关之后,不报错了;第⼆,主线程平均响应耗时和不开启异步任务时候的平均响应耗时基本⼀致。证明⽅案是可以的。
插曲:拒绝策略使⽤不当导致主线程平均响应时间⾮常⼤。第⼀次在写线程池配置类的时候,使⽤了
executor.tRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());的拒绝策略。队列的⼤⼩设置了3000,在压测的时候发现并发数50(⼀般计算是并发数=QPS * 响应时间)左右,这个和线上单台机器的QPS基本接近,虽然不报错了主线程的接⼝耗时远远超出了不打开开关的接⼝耗时。通过打印信息来看,是因为提交的任务量⾮常⼤,队列中的任务已经把队列填满了,这个时候,从线程池原理来看,要去创建线程数达到maxpoolsize,我们这⾥设置的maxPoolsize和corePoolsize⼤⼩是⼀样的。意味着就不会再去创建线程了,只能⾛拒绝策略。这⾥的拒绝策略CallerRunsPolicy的含义是如果异步线程执⾏不了,就由调⽤线程处理,实际上就是主线程来处理,这样就会导致主线程的部分流量回放任务成了同步的了。这当然会增⼤主线程的接⼝响应时间了。因为我们只需要有部分线上流量了其实就可以了,因此,我把拒绝策略改为了直接丢弃。这样处理不了
的线程不进⼊队列,也不由主线程执⾏,保证主线程的响应时间不受影响。

本文发布于:2023-05-05 18:23:44,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/858158.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:线程   流量   创建   回放
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图