首页 > 作文

Java实现FIFO任务调度队列策略

更新时间:2023-04-04 07:27:07 阅读: 评论:0

目录
前言fifo任务调度器架构示例代码

前言

在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟悉分布式的话,应该了解yarn的任务调度算法。本文主要用java实现一个fifo(先进先出调度器),这也是常见的一种调度方式。

fifo任务调度器架构

主要实现的逻辑可以归纳为:

1、任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行。

2、如果任务无法获得资源,则将任务塞回队列原位置。

示例代码

maven依赖如下:

      <dependency>            <groupid>org.projectlombok</groupid>            <artifactid>lombok</artifactid>            <optional>true</optional>        </dependency>                <dependency>            <groupid>cn.hutool</groupid>            <artifactid>hutool-all</artifactid>            <version>5.5.2</version>        </dependency>

具体的原理就不细说了,通过代码我们看看fifo任务调度策略是什么玩的吧。下面的代码也可以作为参考。我们会使用到一个双向阻塞队列linkedblockingdeque。后面的代码说明会提到。

package ai.guiji.csdn.dispatch;import cn.hutool.core.thread.threadutil;import lombok.builder;import lombok.data;import lombok.extern.slf4j.slf4j;import org.springframework.scheduling.concurrent.customizablethreadfactory;import java.util.random;import java.util.concurrent.*;import java.util.concurrent.atomic.atomicinteger;import java.util.stream.intstream;/** * @program: csdn @classname: fifodemo @author: 剑客阿良_aliang @date: 2021-12-24 21:21 @description: * fifo队列 @version: v1.0 */@slf4jpublic class fifodemo {  private static final linkedblockingdeque<task> task_queue = new linkedblockingdeque<>();  private static final concurrenthashmap<integer, linkedblockingqueue<resource>> resource_map =      new concurrenthashmap<>();  private static final executorrvice task_pool =      new threadpoolexecutor(          8,          16,          0l,          timeunit.milliconds,          new linkedblockingqueue<>(),          new customizablethreadfactory("task-thread-"),          new threadpoolexecutor.abortpolicy());  private static final scheduledexecutorrvice engine_pool =      executors.newsinglethreadscheduledexecutor(new customizablethreadfactory("engine-"));  private static final atomicinteger code_builder = new atomicinteger(0);  @data  @builder  private static class resource {    private integer rid;    private type type;  }  @data  @builder  private static class task implements runnable {    private integer tid;    private runnable work;    private type type;    private resource resource;    @override    public void run() {      log.info("[{}]任务,使用资源编号:[{}]", tid, resource.getrid());      try {        work.run();      } catch (exception exception) {        exception.printstacktrace();      } finally {        log.info("[{}]任务结束,回归资源", tid);        returnresource(resource);      }    }  }  private enum type {    /** 资源类型 */    a("a资源", 1),    b("b资源", 2),    c("c资源", 3);    private final string desc;    private final integer code;    type(string desc, integer code) {      this.desc = desc;      this.code = code;    }    public string getdesc() {      return desc;    }    public integer getcode() {      return code;    }  }  public static void initresource() {    random random = new random();    int acount = random.nextint(10) + 1;    int bcount = random.nextint(10) + 1;    int ccount = random.nextint(10) + 1;    resource_map.put(type.a.getcode(), new linkedblockingqueue<>());    resource_map.put(type.b.getcode(), new linkedblockingqueue<>());    resource_map.put(type.c.getcode(), new linkedblockingqueue<>());    intstream.rangeclod(1, acount)        .foreach(            a ->                resource_map                    .get(type.a.getcode())                    .add(resource.builder().rid(a).type(type.a).build()));    intstream.rangeclod(1, bcount)        .foreach(            a ->                resource_map                    .get(type.b.getcode())                    .add(resource.builder().rid(a).type(type.b).build()));    intstream.rangeclod(1, ccount)        .foreach(            a ->                resource_map                    .get(type.c.getcode())                    .add(resource.builder().rid(a).type(type.c).build()));    log.info("初始化资源a数量:{},资源b数量:{},资源c数量:{}", acount, bcount, ccount);  }  public static resource extractresource(type type) {    return resource_map.get(type.getcode()).poll();  }  public static void returnresource(resource resource) {    log.info("开始归还资源,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc());    resource_map.get(resource.gettype().code).add(resource);    log.info("归还资源完成,rid:{},资源类型:{}", resource.getrid(), resource.gettype().getdesc());  }  public static void engindo() {    engine_pool.sche广场交谊舞桑巴duleatfixedrate(        () -> {          task task = task_queue.poll();          if (task == null) {            log.info("任务队列为空,无需要执行的任务");          } el {            resource resource = extractreso毕业生登记表特长urce(task.gettype());     三角形高公式       if (resource == null) {              log.info("[{}]任务无法获取[{}],返回队列", task.gettid(), task.gettype().getdesc());              task_queue.addfirst(task);            } el {              task.tresource(resource);              task_pool.submi海蚀崖t(task);            }          }        },        0,        1,        timeunit.conds);  }  public static void addtask(runnable runnable, type type) {    integer tid = code_builder.incrementandget();    task task = task.builder().tid(tid).type(type).work(runnable).build();    log.info("提交任务[{}]到任务队列", tid);    task_queue.add(task);  }  public static void main(string[] args) {    initresource();    engindo();    random random = new random();    threadutil.sleep(5000);    intstream.range(0, 10)        .foreach(            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.conds), type.a));    intstream.range(0, 10)        .foreach(            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.conds), type.b));    intstream.range(0, 10)        .foreach(            a -> addtask(() -> threadutil.sleep(random.nextint(10) + 1, timeunit.conds), type.c));  }}

代码说明:

1、首先我们构造了任务队列,使用的是linkedblockingdeque,使用双向队列的原因是如果任务无法获取资源,还需要塞到文章多大队首,保证任务的有序性。

2、使用concurrenthashmap作为资源映射表,为了保证资源队列使用的均衡性,一旦使用完成的资源会塞到对应资源的队尾处。

3、其中实现了添加任务、提取资源、回归资源几个方法。

4、initresource方法可以初始化资源队列,这里面只是简单的随机了几个资源到a、b、c三种资源,塞入各类别队列。

5、任务私有类有自己的任务标识以及执行完后调用回归资源方法。

6、main方法中会分别提交需要3中资源的10个任务,看看调度情况。

执行结果

我们可以通过结果发现任务有序调度,使用完任务后回归队列。

以上就是java实现fifo任务调度队列策略的详细内容,更多关于java fifo任务调度的资料请关注www.887551.com其它相关文章!

本文发布于:2023-04-04 07:27:05,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/e2e0bd699356576de7ba1fe6133ac8bb.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:Java实现FIFO任务调度队列策略.doc

本文 PDF 下载地址:Java实现FIFO任务调度队列策略.pdf

标签:队列   资源   代码   几个
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图