CronScheduler.kt

  1. package com.hexagontk.scheduler

  2. import com.hexagontk.core.fail
  3. import com.cronutils.model.CronType.QUARTZ
  4. import com.cronutils.model.definition.CronDefinitionBuilder.instanceDefinitionFor as cronDefinition
  5. import com.cronutils.model.time.ExecutionTime
  6. import com.cronutils.parser.CronParser
  7. import com.hexagontk.core.error
  8. import com.hexagontk.core.loggerOf

  9. import java.lang.Runtime.getRuntime
  10. import java.lang.System.Logger
  11. import java.time.ZonedDateTime
  12. import java.util.concurrent.ScheduledThreadPoolExecutor
  13. import java.util.concurrent.TimeUnit.SECONDS

  14. /**
  15.  * Scheduler to execute tasks repeatedly. After using it, you should call the [shutdown] method. If
  16.  * the JVM finishes without calling [shutdown], it will be called upon JVM termination.
  17.  *
  18.  * @param threads Number of threads used by the thread pool. By default, it is equals to the number
  19.  *  of processors.
  20.  *
  21.  * @sample com.hexagontk.scheduler.CronSchedulerTest.callbackExecutedProperly
  22.  */
  23. class CronScheduler(threads: Int = getRuntime().availableProcessors()) {
  24.     private val log: Logger = loggerOf(this::class)

  25.     private val scheduler = ScheduledThreadPoolExecutor(threads)
  26.     private val cronParser = CronParser(cronDefinition(QUARTZ))

  27.     init {
  28.         getRuntime().addShutdownHook(Thread { shutdown() })
  29.     }

  30.     /**
  31.      * Schedules a block of code to be executed repeatedly by a
  32.      * [Cron](https://en.wikipedia.org/wiki/Cron) expression.
  33.      *
  34.      * @param cronExpression Periodicity of the task in Cron format.
  35.      * @param callback Task code to be executed periodically.
  36.      */
  37.     fun schedule(cronExpression: String, callback: () -> Unit) {
  38.         val cron = cronParser.parse(cronExpression)
  39.         val cronExecution = ExecutionTime.forCron(cron)

  40.         scheduler.schedule({ function(callback, cronExecution) }, delay(cronExecution), SECONDS)
  41.     }

  42.     /**
  43.      * Shuts down this scheduler's thread pool. Calling over an already closed scheduler does not
  44.      * have any effect. It is called by the JVM when it is shut down.
  45.      */
  46.     fun shutdown() {
  47.         scheduler.shutdown()
  48.     }

  49.     private fun delay(cronExecution: ExecutionTime): Long =
  50.         cronExecution.timeToNextExecution(ZonedDateTime.now()).orElseThrow { fail }.seconds

  51.     private fun function(callback: () -> Unit, cronExecution: ExecutionTime) {
  52.         try {
  53.             callback()
  54.         }
  55.         catch (e: Exception) {
  56.             log.error(e) { "Error executing cron job" }
  57.         }

  58.         scheduler.schedule({ function(callback, cronExecution) }, delay(cronExecution), SECONDS)
  59.     }
  60. }