Imports System.Collections.Specialized
Imports DigitalData.Modules.Database
Imports DigitalData.Modules.Logging
Imports DigitalData.Modules.Windream
Imports ECM.JobRunner.Common
Imports Quartz
Imports Quartz.Logging.OperationName

Namespace Scheduler
    ''' <summary>
    ''' Wraps a Quartz scheduler: creates it, registers the job listener and
    ''' (re)schedules one Quartz job per job definition found in the shared State.
    ''' </summary>
    Public Class JobScheduler
        ' Quartz configuration handed to the scheduler factory.
        ' NameValueCollection stores strings, so the values are written as strings.
        Private ReadOnly Settings As New NameValueCollection From {
            {"quartz.serializer.type", "binary"},
            {"quartz.threadPool.threadCount", "10"}
        }

        Private ReadOnly LogConfig As LogConfig
        Private ReadOnly Logger As Logger
        Private ReadOnly Database As MSSQLServer
        Private ReadOnly Factory As Quartz.Impl.StdSchedulerFactory
        Private ReadOnly State As State
        Private ReadOnly Windream As Windream

        ' Stays Nothing until Start() has obtained a scheduler from the factory.
        Private Scheduler As IScheduler

        ' Values of JobDefinition.TypeId that PrepareScheduleJob knows how to handle.
        Private Const JOB_TYPE_IMPORT As Integer = 1
        Private Const JOB_TYPE_INDEX As Integer = 2

        ''' <summary>
        ''' Stores the shared services and creates the scheduler factory from Settings.
        ''' </summary>
        ''' <param name="pLogConfig">Logging configuration; also used to obtain this class' logger.</param>
        ''' <param name="pDatabase">Database handle that is passed on to every job.</param>
        ''' <param name="pState">Shared state that provides the job definitions and the job history.</param>
        ''' <param name="pWindream">Windream handle that is passed on to every job.</param>
        Public Sub New(pLogConfig As LogConfig, pDatabase As MSSQLServer, pState As State, pWindream As Windream)
            LogConfig = pLogConfig
            Logger = pLogConfig.GetLogger()
            Factory = New Impl.StdSchedulerFactory(Settings)
            Database = pDatabase
            Windream = pWindream
            State = pState
        End Sub

        ''' <summary>
        ''' Builds the data map that is attached to a job: the shared services plus the job's arguments.
        ''' </summary>
        Private Function BuildJobData(pJobConfig As JobConfig) As JobDataMap
            Return New JobDataMap From {
                {Constants.Scheduler.JOB_CONFIG_LOGCONFIG, LogConfig},
                {Constants.Scheduler.JOB_CONFIG_ARGUMENTS, pJobConfig.Arguments},
                {Constants.Scheduler.JOB_CONFIG_DATABASE, Database},
                {Constants.Scheduler.JOB_CONFIG_STATE, State},
                {Constants.Scheduler.JOB_CONFIG_WINDREAM, Windream}
            }
        End Function

        ''' <summary>
        ''' Creates and starts the scheduler, then schedules all known job definitions.
        ''' </summary>
        ''' <returns>
        ''' True when the scheduler was started; False when creating or starting it failed
        ''' (the exception is logged). A single job that cannot be scheduled is logged
        ''' and does not turn the result into False.
        ''' </returns>
        Public Async Function Start() As Task(Of Boolean)
            Try
                ' Log all quartz events into our standard log files
                Logging.LogProvider.SetCurrentLogProvider(New LogProvider(LogConfig))

                ' initialize the scheduler
                Scheduler = Await Factory.GetScheduler()
                Scheduler.ListenerManager.AddJobListener(New JobListener(LogConfig, State.JobHistory))

                ' start the scheduler
                Await Scheduler.Start()

                ' load job config and set up the job schedules; awaited so that the
                ' jobs are in place when Start() returns and failures stay inside this Try
                Await ScheduleJobsAsync()

                Return True
            Catch ex As Exception
                Logger.Error(ex)
                Return False
            End Try
        End Function

        ''' <summary>
        ''' Reloads the state and reschedules all jobs. Rescheduling runs in the
        ''' background; errors that occur there are logged.
        ''' </summary>
        Public Sub Reload()
            ' first reload the data from db
            State.Reload()

            ' now reschedule jobs
            ScheduleJobs()
        End Sub

        ''' <summary>
        ''' Shuts the scheduler down. Does nothing when the scheduler was never created.
        ''' </summary>
        Public Async Function Shutdown() As Task
            If Scheduler Is Nothing Then
                Logger.Debug("Scheduler was not started, nothing to shut down.")
                Return
            End If

            Await Scheduler.Shutdown()
        End Function

        ''' <summary>
        ''' Returns the execution contexts of the jobs that are running right now.
        ''' </summary>
        ''' <returns>The running jobs; an empty collection when the scheduler was never created.</returns>
        Public Async Function GetRunningJobs() As Task(Of IReadOnlyCollection(Of IJobExecutionContext))
            If Scheduler Is Nothing Then
                Return New List(Of IJobExecutionContext)()
            End If

            Return Await Scheduler.GetCurrentlyExecutingJobs()
        End Function

        ''' <summary>
        ''' Schedules (or reschedules) the job definition with the given id.
        ''' An unknown id is logged and otherwise ignored.
        ''' </summary>
        ''' <param name="pJobId">Id of the job definition to schedule.</param>
        Public Async Function ScheduleJob(pJobId As Integer) As Task
            Dim oJob = State.JobDefinitions.Where(Function(j) j.Id = pJobId).SingleOrDefault()

            If oJob IsNot Nothing Then
                Logger.Info("Scheduling Job [{0}] manually!", oJob.Name)
                Await PrepareScheduleJob(oJob)
            Else
                Logger.Warn("Job with Id [{0}] was not found, nothing scheduled.", pJobId)
            End If
        End Function

        ''' <summary>
        ''' Maps a job definition to its job class by TypeId and schedules it.
        ''' Unknown type ids are logged and skipped.
        ''' </summary>
        Private Async Function PrepareScheduleJob(pJob As JobDefinition) As Task
            Logger.Debug("Loading Job Definition [{0}]", pJob.Name)
            Logger.Debug("Job Type is [{0}]", pJob.Type.Name)

            Select Case pJob.TypeId
                Case JOB_TYPE_IMPORT
                    Dim oJobConfig = BuildJobConfig(Of Jobs.FileImportJob)(pJob)
                    Await ScheduleJob(Of Jobs.FileImportJob)(oJobConfig)

                Case JOB_TYPE_INDEX
                    Dim oJobConfig = BuildJobConfig(Of Jobs.FileIndexJob)(pJob)
                    Await ScheduleJob(Of Jobs.FileIndexJob)(oJobConfig)

                Case Else
                    Logger.Warn("Job for TypeId [{0}] is not implemented!", pJob.TypeId)
            End Select
        End Function

        ''' <summary>
        ''' Fire-and-forget wrapper around ScheduleJobsAsync for callers that cannot await.
        ''' Exceptions are caught and logged here because nobody can observe them
        ''' once they leave an Async Sub.
        ''' </summary>
        Private Async Sub ScheduleJobs()
            Try
                Await ScheduleJobsAsync()
            Catch ex As Exception
                Logger.Error(ex)
            End Try
        End Sub

        ''' <summary>
        ''' Schedules every job definition of the current state. A job that fails to
        ''' schedule is logged and does not prevent the remaining jobs from being scheduled.
        ''' </summary>
        Private Async Function ScheduleJobsAsync() As Task
            Logger.Info("Loading [{0}] Job Definitions..", State.JobDefinitions.Count)

            For Each oJob In State.JobDefinitions
                Try
                    Await PrepareScheduleJob(oJob)
                Catch ex As Exception
                    Logger.Warn("Job [{0}] could not be scheduled!", oJob.Name)
                    Logger.Error(ex)
                End Try
            Next
        End Function

        ''' <summary>
        ''' Translates a job definition into the JobConfig used for scheduling.
        ''' The type parameter is not used by the body.
        ''' </summary>
        Private Function BuildJobConfig(Of TJob As IJob)(pJob As JobDefinition) As JobConfig
            Return New JobConfig With {
                .Name = pJob.Name,
                .Enabled = pJob.Active,
                .Arguments = New Dictionary(Of String, String) From {
                    {"Id", pJob.Id},
                    {"Name", pJob.Name}
                },
                .CronSchedule = pJob.CronSchedule
            }
        End Function

        ''' <summary>
        ''' Schedules the job, or reschedules it when a job with the same name already exists.
        ''' </summary>
        Private Async Function ScheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
            If Await Scheduler.CheckExists(New JobKey(GetJobName(pJobConfig))) Then
                Logger.Debug("Job already exists, rescheduling..")
                Await DoRescheduleJob(Of T)(pJobConfig)
            Else
                Logger.Debug("Job does not exist, scheduling..")
                Await DoScheduleJob(Of T)(pJobConfig)
            End If
        End Function

        ''' <summary>
        ''' Updates the Trigger and Schedule for a given JobConfig.
        ''' A job that is no longer enabled is removed from the scheduler instead,
        ''' so that disabling a job takes effect on reload.
        ''' </summary>
        ''' <typeparam name="T">Job class; not used by the body.</typeparam>
        ''' <param name="pJobConfig">Configuration of the already existing job.</param>
        Private Async Function DoRescheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
            Dim oJobName As String = GetJobName(pJobConfig)

            If Not pJobConfig.Enabled Then
                ' DoScheduleJob never schedules a disabled job; keep an existing job
                ' consistent with that by deleting it together with its triggers.
                Await Scheduler.DeleteJob(New JobKey(oJobName))
                Logger.Info("Job [{0}] is disabled, removed from scheduler.", oJobName)
                Return
            End If

            Dim oTriggerKey As New TriggerKey(GetTriggerName(pJobConfig))
            Dim oTrigger = BuildTrigger(pJobConfig)
            Dim oNextFireTime = Await Scheduler.RescheduleJob(oTriggerKey, oTrigger)

            ' RescheduleJob yields no fire time when the trigger to replace does not exist
            If Not oNextFireTime.HasValue Then
                Logger.Warn("Trigger for Job [{0}] was not found, job was not rescheduled.", oJobName)
            End If
        End Function

        ''' <summary>
        ''' Schedules a job that does not exist in the scheduler yet. Enabled jobs get
        ''' their cron trigger; jobs with StartWithoutDelay additionally get a one-off
        ''' trigger that fires after 10 seconds.
        ''' </summary>
        Private Async Function DoScheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
            Dim oJobName As String = GetJobName(pJobConfig)
            Dim oTriggerName As String = GetTriggerName(pJobConfig)
            Dim oJobData As JobDataMap = BuildJobData(pJobConfig)

            Dim oJob As IJobDetail = BuildJob(Of T)(pJobConfig, oJobData)
            Dim oTrigger As ITrigger = BuildTrigger(pJobConfig)

            If pJobConfig.Enabled Then
                Await Scheduler.ScheduleJob(oJob, oTrigger)
                Logger.Info("Job [{0}] scheduled.", oJobName)
            Else
                Logger.Info("Job [{0}] is disabled.", oJobName)
            End If

            If pJobConfig.StartWithoutDelay Then
                Dim oNoDelayBuilder = TriggerBuilder.Create().
                    WithIdentity(oTriggerName & "-NO-DELAY").
                    StartAt(DateBuilder.FutureDate(10, IntervalUnit.Second))

                Logger.Info("Job {0} will start in 10 Seconds.", oJobName)

                If pJobConfig.Enabled Then
                    ' The job detail was stored above; storing it a second time is
                    ' rejected by the scheduler, so only attach the extra trigger.
                    Await Scheduler.ScheduleJob(oNoDelayBuilder.ForJob(oJob).Build())
                Else
                    ' NOTE(review): a disabled job is still started once here, as before - confirm this is intended.
                    Await Scheduler.ScheduleJob(oJob, oNoDelayBuilder.Build())
                End If
            End If
        End Function

        ''' <summary>
        ''' Builds the job detail for job class T, identified by the job name and carrying the given data map.
        ''' </summary>
        Private Function BuildJob(Of T As IJob)(pJobConfig As JobConfig, pJobData As JobDataMap) As IJobDetail
            Return JobBuilder.Create(Of T)().
                WithIdentity(GetJobName(pJobConfig)).
                UsingJobData(pJobData).
                Build()
        End Function

        ''' <summary>
        ''' Builds the cron trigger for a job from its CronSchedule.
        ''' </summary>
        Private Function BuildTrigger(pJobConfig As JobConfig) As ITrigger
            Dim oTriggerName As String = GetTriggerName(pJobConfig)

            Return TriggerBuilder.Create().
                WithIdentity(oTriggerName).
                WithCronSchedule(pJobConfig.CronSchedule).
                StartNow().
                Build()
        End Function

        ''' <summary>
        ''' Name under which the job is registered in the scheduler.
        ''' </summary>
        Private Function GetJobName(pJobConfig As JobConfig) As String
            Return pJobConfig.Name
        End Function

        ''' <summary>
        ''' Name of the job's cron trigger: the job name followed by "-TRIGGER".
        ''' </summary>
        Private Function GetTriggerName(pJobConfig As JobConfig) As String
            Dim oJobName As String = pJobConfig.Name
            Return $"{oJobName}-TRIGGER"
        End Function
    End Class
End Namespace