在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟悉分布式的话,应该了解yarn的任务调度算法。本文主要用java实现一个fifo(先进先出调度器),这也是常见的一种调度方式。
主要实现的逻辑可以归纳为:
1、任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行。
2、如果任务无法获得资源,则将任务塞回队列原位置。
maven依赖如下:
<dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> <optional>true</optional> </dependency> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-all</artifactid> <version>5.5.2</version> </dependency>
具体的原理就不细说了,通过代码我们看看fifo任务调度策略是什么玩的吧。下面的代码也可以作为参考。我们会使用到一个双向阻塞队列linkedblockingdeque。后面的代码说明会提到。
package ai.guiji.csdn.dispatch;import cn.hutool.core.thread.threadutil;import lombok.builder;import lombok.data;import lombok.extern.slf4j.slf4j;import org.springframework.scheduling.concurrent.customizablethreadfactory;import java.util.random;import java.util.concurrent.*;import java.util.concurrent.atomic.atomicinteger;import java.util.stream.intstream;/** * @program: csdn @classname: fifodemo @author: 剑客阿良_aliang @date: 2021-12-24 21:21 @description: * fifo队列 @version: v1.0 */@slf4jpublic class fifodemo { private static final linkedblockingdeque<task> task_queue = new linkedblockingdeque<>(); private static final concurrenthashmap<integer, linkedblockingqueue<resource>> resource_map = new concurrenthashmap<>(); private static final executorrvice task_pool = new threadpoolexecutor( 8, 16, 0l, timeunit.milliconds, new linkedblockingqueue<>(), new customizablethreadfactory("task-thread-"), new threadpoolexecutor.abortpolicy()); private static final scheduledexecutorrvice engine_pool = executors.newsinglethreadscheduledexecutor(new customizablethreadfactory("engine-")); private static final atomicinteger code_builder = new atomicinteger(0); @data @builder private static class resource { private integer rid; private type type; } @data @builder private static class task implements runnable { private integer tid; private runnable work; private type type; private resource resource; @override public void run() { log.info("[{}]任务,使用资源编号:[{}]", tid, resource.getrid()); try { work.run(); } catch (exception exception) { exception.printstacktrace(); } finally { log.info("[{}]任务结束,回归资源", tid); returnresource(resource); } } } private enum type { /** 资源类型 */ a("a资源", 1), b("b资源", 2), c("c资源", 3); private final string desc; private final integer code; type(string desc, integer code) { this.desc = desc; this.code = code; } public string getdesc() { return desc; } public integer getcode() { return code; } } public static void initresource() { random random = new random(); int acount = random.nextint(10) + 1; int bcount = random.nextint(10) + 1; int ccount = random.nextint(10) + 1; resource_map.put(type.a.getcode(), new linkedblockingqueue<>()); resource_map.put(type.b.getcode(), new linkedblockingqueue<>()); resource_map.put(type.c.getcode(), new linkedblockingqueue<>()); intstream.rangeclod(1, acount) .foreach( a -> resource_map .get(type.a.getcode()) .add(resource.builder().rid(a).type(type.a).build())); intstream.rangeclod(1, bcount) .foreach( a -> resource_map .get(type.b.getcode()) .add(resource.builder().rid(a).type(type.b).build())); intstream.rangeclod(1, ccount) .foreach( a -> resource_map .get(type.c.getcode()) .add(resource.builder().rid(a).type(type.c).build())); log.info("初始化资源a数量:{},资源b数量:{},资源c数量:{}", acount, bcount, ccount); } public static resource extractresource(type type) { return resource_map.get(type.getcode()).poll(); } public static void returnresource(resource resource) { log.info("开始归还资源,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc()); resource_map.get(resource.gettype().code).add(resource); log.info("归还资源完成,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc()); } public static void engindo() { engine_pool.sche广场交谊舞桑巴duleatfixedrate( () -> { task task = task_queue.poll(); if (task == null) { log.info("任务队列为空,无需要执行的任务"); } el { resource resource = extractreso毕业生登记表特长urce(task.gettype()); 三角形高公式 if (resource == null) { log.info("[{}]任务无法获取[{}],返回队列", task.gettid(), task.gettype().getdesc()); task_queue.addfirst(task); } el { task.tresource(resource); task_pool.submi海蚀崖t(task); } } }, 0, 1, timeunit.conds); } public static void addtask(runnable runnable, type type) { integer tid = code_builder.incrementandget(); task task = task.builder().tid(tid).type(type).work(runnable).build(); log.info("提交任务[{}]到任务队列", tid); task_queue.add(task); } public static void main(string[] args) { initresource(); engindo(); random random = new random(); threadutil.sleep(5000); intstream.range(0, 10) .foreach( a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.conds), type.a)); intstream.range(0, 10) .foreach( a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.conds), type.b)); intstream.range(0, 10) .foreach( a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.conds), type.c)); }}
代码说明:
1、首先我们构造了任务队列,使用的是linkedblockingdeque,使用双向队列的原因是如果任务无法获取资源,还需要塞到文章多大队首,保证任务的有序性。
2、使用concurrenthashmap作为资源映射表,为了保证资源队列使用的均衡性,一旦使用完成的资源会塞到对应资源的队尾处。
3、其中实现了添加任务、提取资源、回归资源几个方法。
4、initresource方法可以初始化资源队列,这里面只是简单的随机了几个资源到a、b、c三种资源,塞入各类别队列。
5、任务私有类有自己的任务标识以及执行完后调用回归资源方法。
6、main方法中会分别提交需要3中资源的10个任务,看看调度情况。
执行结果
我们可以通过结果发现任务有序调度,使用完任务后回归队列。
以上就是java实现fifo任务调度队列策略的详细内容,更多关于java fifo任务调度的资料请关注www.887551.com其它相关文章!
本文发布于:2023-04-04 07:27:05,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/e2e0bd699356576de7ba1fe6133ac8bb.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:Java实现FIFO任务调度队列策略.doc
本文 PDF 下载地址:Java实现FIFO任务调度队列策略.pdf
留言与评论(共有 0 条评论) |