首页 > 编程 > Java > 正文

Spring boot定时任务的原理及动态创建详解

2019-11-26 09:14:33
字体:
来源:转载
供稿:网友

v一、前言

定时任务一般是项目中都需要用到的,可以用于定时处理一些特殊的任务。这篇文章主要给大家介绍了关于Spring boot定时任务的原理及动态创建的相关内容,下面来一起看看详细的介绍吧

上周工作遇到了一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到SFTP服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一,服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置,后台感知到后,动态生成定时任务。

v二、Springboot引入定时任务核心配置

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(SchedulingConfiguration.class)@Documentedpublic @interface EnableScheduling {}@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); }}

接下来主要看一下这个核心后置处理器:ScheduledAnnotationBeanPostProcessor 。

@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||  bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,  (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {   Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(    method, Scheduled.class, Schedules.class);   return (!scheduledMethods.isEmpty() ? scheduledMethods : null);  }); if (annotatedMethods.isEmpty()) {  this.nonAnnotatedClasses.add(targetClass);  if (logger.isTraceEnabled()) {  logger.trace("No @Scheduled annotations found on bean class: " + targetClass);  } } else {  // Non-empty set of methods  annotatedMethods.forEach((method, scheduledMethods) ->   scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));  if (logger.isTraceEnabled()) {  logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +   "': " + annotatedMethods);  } } } return bean;}

1、处理Scheduled注解,通过ScheduledTaskRegistrar注册定时任务。

private void finishRegistration() { if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans =  ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) {  configurer.configureTasks(this.registrar); } } if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try {  // Search for TaskScheduler bean...  this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) {  logger.trace("Could not find unique TaskScheduler bean", ex);  try {  this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));  }  catch (NoSuchBeanDefinitionException ex2) {  if (logger.isInfoEnabled()) {   logger.info("More than one TaskScheduler bean exists within the context, and " +    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +    ex.getBeanNamesFound());  }  } } catch (NoSuchBeanDefinitionException ex) {  logger.trace("Could not find default TaskScheduler bean", ex);  // Search for ScheduledExecutorService bean next...  try {  this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));  }  catch (NoUniqueBeanDefinitionException ex2) {  logger.trace("Could not find unique ScheduledExecutorService bean", ex2);  try {   this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));  }  catch (NoSuchBeanDefinitionException ex3) {   if (logger.isInfoEnabled()) {   logger.info("More than one ScheduledExecutorService bean exists within the context, and " +    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +    ex2.getBeanNamesFound());   }  }  }  catch (NoSuchBeanDefinitionException ex2) {  logger.trace("Could not find default ScheduledExecutorService bean", ex2);  // Giving up -> falling back to default scheduler within the registrar...  logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");  } } } this.registrar.afterPropertiesSet();}

  1、通过一系列的SchedulingConfigurer动态配置ScheduledTaskRegistrar。

  2、向ScheduledTaskRegistrar注册一个TaskScheduler(用于对Runnable的任务进行调度,它包含有多种触发规则)。

  3、registrar.afterPropertiesSet(),在这开始安排所有的定时任务开始执行了。

protected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) {  addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) {  addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) {  addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) {  addScheduledTask(scheduleFixedDelayTask(task)); } }}

  1、TriggerTask:动态定时任务。通过Trigger#nextExecutionTime 给定的触发上下文确定下一个执行时间。

  2、CronTask:动态定时任务,TriggerTask子类。通过cron表达式确定的时间触发下一个任务执行。

  3、IntervalTask:一定时间延迟之后,周期性执行的任务。

  4、taskScheduler 如果为空,默认是ConcurrentTaskScheduler,并使用默认单线程的ScheduledExecutor。

v三、主要看一下CronTask工作原理

ScheduledTaskRegistrar.java@Nullablepublic ScheduledTask scheduleCronTask(CronTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } if (this.taskScheduler != null) { scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null);}ConcurrentTaskScheduler.java@Override@Nullablepublic ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { try { if (this.enterpriseConcurrentScheduler) {  return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger); } else {  ErrorHandler errorHandler =   (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));  return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule(); } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); }}ReschedulingRunnable.java@Nullablepublic ScheduledFuture<?> schedule() { synchronized (this.triggerContextMonitor) { this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) {  return null; } long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; }}private ScheduledFuture<?> obtainCurrentFuture() { Assert.state(this.currentFuture != null, "No scheduled future"); return this.currentFuture;}@Overridepublic void run() { Date actualExecutionTime = new Date(); super.run(); Date completionTime = new Date(); synchronized (this.triggerContextMonitor) { Assert.state(this.scheduledExecutionTime != null, "No scheduled execution"); this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); if (!obtainCurrentFuture().isCancelled()) {  schedule(); } }}

  1、最终将task和trigger都封装到了ReschedulingRunnable中。

  2、ReschedulingRunnable实现了任务重复调度(schedule方法中调用调度器executor并传入自身对象,executor会调用run方法,run方法又调用了schedule方法)。

  3、ReschedulingRunnable schedule方法加了同步锁,只能有一个线程拿到下次执行时间并加入执行器的调度。

  4、不同的ReschedulingRunnable对象之间在线程池够用的情况下是不会相互影响的,也就是说满足线程池的条件下,TaskScheduler的schedule方法的多次调用是可以交叉执行的。

ScheduledThreadPoolExecutor.javapublic ScheduledFuture<?> schedule(Runnable command,     long delay,     TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,     triggerTime(delay, unit))); delayedExecute(t); return t;}private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() &&  !canRunInCurrentRunState(task.isPeriodic()) &&  remove(task))  task.cancel(false); else  ensurePrestart(); }}

  ScheduledFutureTask 工作原理如下图所示【太懒了,不想画图了,盗图一张】。

 

  1、ScheduledFutureTask会放入优先阻塞队列:ScheduledThreadPoolExecutor.DelayedWorkQueue(二叉最小堆实现)

  2、上图中的Thread对象即ThreadPoolExecutor.Worker,实现了Runnable接口

/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() { runWorker(this);}

  1、Worker中维护了Thread对象,Thread对象的Runnable实例即Worker自身

  2、ThreadPoolExecutor#addWorker方法中会创建Worker对象,然后拿到Worker中的thread实例并start,这样就创建了线程池中的一个线程实例

  3、Worker的run方法会调用ThreadPoolExecutor#runWorker方法,这才是任务最终被执行的地方,该方法示意如下

  (1)首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向  workQueue队列中offer任务。
getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束,返回的是null,runWorker循环结束,执行processWorkerExit方法。
至此,该线程结束自己的使命,从线程池中“消失”。

  (2)在开始执行任务之前,会调用Worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。

  (3)beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。

我们可以在beforeExecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedAbruptly的值是true,表示the worker died due to user exception,会用decrementWorkerCount调整wc。

  (4)因为Runnable的run方法不能抛出Throwables异常,所以这里重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterExecute中对异常做一些处理。

  (5)afterExecute方法也可能抛出异常,也可能使当前线程死掉。

v四、动态创建定时任务

v  TaskConfiguration 配置类

@Configuration@EnableScheduling@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class TaskConfiguration { @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledExecutorService scheduledAnnotationProcessor() { return Executors.newScheduledThreadPool(5, new DefaultThreadFactory()); } private static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() {  SecurityManager s = System.getSecurityManager();  group = (s != null) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();  namePrefix = "pool-" +   poolNumber.getAndIncrement() +   "-schedule-"; } @Override public Thread newThread(Runnable r) {  Thread t = new Thread(group, r,   namePrefix + threadNumber.getAndIncrement(),   0);  if (t.isDaemon()) {  t.setDaemon(false);  }  if (t.getPriority() != Thread.NORM_PRIORITY) {  t.setPriority(Thread.NORM_PRIORITY);  }  return t; } }}

  1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor,而是corePoolSize=5的线程池

  2、自定义线程池工厂类

v  DynamicTask 动态定时任务

@Configurationpublic class DynamicTask implements SchedulingConfigurer { private static Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class); private static final ExecutorService es = new ThreadPoolExecutor(10, 20,   0L, TimeUnit.MILLISECONDS,   new LinkedBlockingQueue<>(10),   new DynamicTaskConsumeThreadFactory()); private volatile ScheduledTaskRegistrar registrar; private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>(); private volatile List<TaskConstant> taskConstants = Lists.newArrayList(); @Override public void configureTasks(ScheduledTaskRegistrar registrar) {  this.registrar = registrar;  this.registrar.addTriggerTask(() -> {     if (!CollectionUtils.isEmpty(taskConstants)) {      LOGGER.info("检测动态定时任务列表...");      List<TimingTask> tts = new ArrayList<>();      taskConstants        .forEach(taskConstant -> {         TimingTask tt = new TimingTask();         tt.setExpression(taskConstant.getCron());         tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());         tts.add(tt);        });      this.refreshTasks(tts);     }    }    , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext)); } public List<TaskConstant> getTaskConstants() {  return taskConstants; } private void refreshTasks(List<TimingTask> tasks) {  //取消已经删除的策略任务  Set<String> taskIds = scheduledFutures.keySet();  for (String taskId : taskIds) {   if (!exists(tasks, taskId)) {    scheduledFutures.get(taskId).cancel(false);   }  }  for (TimingTask tt : tasks) {   String expression = tt.getExpression();   if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {    LOGGER.error("定时任务DynamicTask cron表达式不合法: " + expression);    continue;   }   //如果配置一致,则不需要重新创建定时任务   if (scheduledFutures.containsKey(tt.getTaskId())     && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {    continue;   }   //如果策略执行时间发生了变化,则取消当前策略的任务   if (scheduledFutures.containsKey(tt.getTaskId())) {    scheduledFutures.remove(tt.getTaskId()).cancel(false);    cronTasks.remove(tt.getTaskId());   }   CronTask task = new CronTask(tt, expression);   ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());   cronTasks.put(tt.getTaskId(), task);   scheduledFutures.put(tt.getTaskId(), future);  } } private boolean exists(List<TimingTask> tasks, String taskId) {  for (TimingTask task : tasks) {   if (task.getTaskId().equals(taskId)) {    return true;   }  }  return false; } @PreDestroy public void destroy() {  this.registrar.destroy(); } public static class TaskConstant {  private String cron;  private String taskId;  public String getCron() {   return cron;  }  public void setCron(String cron) {   this.cron = cron;  }  public String getTaskId() {   return taskId;  }  public void setTaskId(String taskId) {   this.taskId = taskId;  } } private class TimingTask implements Runnable {  private String expression;  private String taskId;  public String getTaskId() {   return taskId;  }  public void setTaskId(String taskId) {   this.taskId = taskId;  }  @Override  public void run() {   //设置队列大小10   LOGGER.error("当前CronTask: " + this);   DynamicBlockingQueue queue = new DynamicBlockingQueue(3);   es.submit(() -> {    while (!queue.isDone() || !queue.isEmpty()) {     try {      String content = queue.poll(500, TimeUnit.MILLISECONDS);      if (StringUtils.isBlank(content)) {       return;      }      LOGGER.info("DynamicBlockingQueue 消费:" + content);      TimeUnit.MILLISECONDS.sleep(500);     } catch (InterruptedException e) {      e.printStackTrace();     }    }   });   //队列放入数据   for (int i = 0; i < 5; ++i) {    try {     queue.put(String.valueOf(i));     LOGGER.info("DynamicBlockingQueue 生产:" + i);    } catch (InterruptedException e) {     e.printStackTrace();    }   }   queue.setDone(true);  }  public String getExpression() {   return expression;  }  public void setExpression(String expression) {   this.expression = expression;  }  @Override  public String toString() {   return ReflectionToStringBuilder.toString(this     , ToStringStyle.JSON_STYLE     , false     , false     , TimingTask.class);  } } /**  * 队列消费线程工厂类  */ private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {  private static final AtomicInteger poolNumber = new AtomicInteger(1);  private final ThreadGroup group;  private final AtomicInteger threadNumber = new AtomicInteger(1);  private final String namePrefix;  DynamicTaskConsumeThreadFactory() {   SecurityManager s = System.getSecurityManager();   group = (s != null) ? s.getThreadGroup() :     Thread.currentThread().getThreadGroup();   namePrefix = "pool-" +     poolNumber.getAndIncrement() +     "-dynamic-task-";  }  @Override  public Thread newThread(Runnable r) {   Thread t = new Thread(group, r,     namePrefix + threadNumber.getAndIncrement(),     0);   if (t.isDaemon()) {    t.setDaemon(false);   }   if (t.getPriority() != Thread.NORM_PRIORITY) {    t.setPriority(Thread.NORM_PRIORITY);   }   return t;  } } private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {  DynamicBlockingQueue(int capacity) {   super(capacity);  }  private volatile boolean done = false;  public boolean isDone() {   return done;  }  public void setDone(boolean done) {   this.done = done;  } }}

  1、taskConstants 动态任务列表

  2、ScheduledTaskRegistrar#addTriggerTask 添加动态周期定时任务,检测动态任务列表的变化

CronTask task = new CronTask(tt, expression);ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());cronTasks.put(tt.getTaskId(), task);scheduledFutures.put(tt.getTaskId(), future);

  3、动态创建cron定时任务,拿到ScheduledFuture实例并缓存起来

  4、在刷新任务列表时,通过缓存的ScheduledFuture实例和CronTask实例,来决定是否取消、移除失效的动态定时任务。

v  DynamicTaskTest 动态定时任务测试类

@RunWith(SpringRunner.class)@SpringBootTestpublic class DynamicTaskTest { @Autowired private DynamicTask dynamicTask; @Test public void test() throws InterruptedException {  List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();  DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();  taskConstant.setCron("0/5 * * * * ?");  taskConstant.setTaskId("test1");  taskConstans.add(taskConstant);  DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();  taskConstant1.setCron("0/5 * * * * ?");  taskConstant1.setTaskId("test2");  taskConstans.add(taskConstant1);  DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();  taskConstant2.setCron("0/5 * * * * ?");  taskConstant2.setTaskId("test3");  taskConstans.add(taskConstant2);  TimeUnit.SECONDS.sleep(40);  //移除并添加新的配置  taskConstans.remove(taskConstans.size() - 1);  DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();  taskConstant3.setCron("0/5 * * * * ?");  taskConstant3.setTaskId("test4");  taskConstans.add(taskConstant3);//  TimeUnit.MINUTES.sleep(50); }}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对武林网的支持。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表