06-12-2022

This commit is contained in:
Jonathan Jenne
2022-12-06 14:08:20 +01:00
parent c867e4e3a6
commit 248be23804
46 changed files with 1513 additions and 347 deletions

View File

@@ -1,9 +1,8 @@
Imports System.Collections.Specialized
Imports System.Threading
Imports DigitalData.Modules.Database
Imports DigitalData.Modules.Logging
Imports ECM.JobRunner.Common
Imports Quartz
Imports Quartz.Logging.OperationName
Namespace Scheduler
Public Class JobScheduler
@@ -44,44 +43,7 @@ Namespace Scheduler
Await Scheduler.Start()
' load job Config and setup job schedules
Logger.Info("Loading [{0}] Job Definitions..", State.JobDefinitions.Count)
For Each oJob In State.JobDefinitions
Logger.Debug("Loading Job Definition [{0}]", oJob.Name)
Select Case oJob.TypeId
Case JOB_TYPE_IMPORT
Await ScheduleJob(Of Jobs.FileImportJob)(New JobConfig With {
.Name = oJob.Name,
.Enabled = True,
.Arguments = New Dictionary(Of String, String) From {
{"Name", oJob.Name}
},
.CronSchedule = oJob.CronSchedule
})
Case JOB_TYPE_INDEX
Await ScheduleJob(Of Jobs.FileIndexJob)(New JobConfig With {
.Name = oJob.Name,
.Enabled = True,
.Arguments = New Dictionary(Of String, String) From {
{"Name", oJob.Name}
},
.CronSchedule = oJob.CronSchedule
})
End Select
Next
' setup debug job
Await ScheduleJob(Of Jobs.DebugJob)(New JobConfig With {
.Name = "Debug Job",
.Enabled = True,
.Arguments = New Dictionary(Of String, String) From {{"Arg1", "My awesome argument"}},
.CronSchedule = "0 * * * * ?"
})
ScheduleJobs()
Return True
Catch ex As Exception
@@ -91,57 +53,141 @@ Namespace Scheduler
End Try
End Function
Public Sub Reload()
' first reload the data from db
State.Reload()
' now reschedule jobs
ScheduleJobs()
End Sub
Public Async Function Shutdown() As Task
Await Scheduler.Shutdown()
End Function
Public Async Function ScheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
Dim oJobName As String = pJobConfig.Name
Dim oTriggerName As String = $"{oJobName}-TRIGGER"
Public Async Function GetRunningJobs() As Task(Of IReadOnlyCollection(Of IJobExecutionContext))
Return Await Scheduler.GetCurrentlyExecutingJobs()
End Function
pJobConfig.Name = oJobName
Private Async Sub ScheduleJobs()
Dim oJobData As New JobDataMap From {
{Constants.Scheduler.JOB_CONFIG_LOGCONFIG, LogConfig},
{Constants.Scheduler.JOB_CONFIG_ARGUMENTS, pJobConfig.Arguments},
{Constants.Scheduler.JOB_CONFIG_DATABASE, Database}
Logger.Info("Loading [{0}] Job Definitions..", State.JobDefinitions.Count)
For Each oJob In State.JobDefinitions
Logger.Debug("Loading Job Definition [{0}]", oJob.Name)
Logger.Debug("Job Type is [{0}]", oJob.Type.Name)
Select Case oJob.TypeId
Case JOB_TYPE_IMPORT
Dim oJobConfig = BuildJobConfig(Of Jobs.FileImportJob)(oJob)
Await ScheduleJob(Of Jobs.FileImportJob)(oJobConfig)
Case JOB_TYPE_INDEX
Dim oJobConfig = BuildJobConfig(Of Jobs.FileIndexJob)(oJob)
Await ScheduleJob(Of Jobs.FileIndexJob)(oJobConfig)
Case Else
Logger.Warn("Job for TypeId [{0}] is not implemented!", oJob.TypeId)
End Select
Next
End Sub
Private Function BuildJobConfig(Of TJob As IJob)(pJob As JobDefinition) As JobConfig
Return New JobConfig With {
.Name = pJob.Name,
.Enabled = pJob.Active,
.Arguments = New Dictionary(Of String, String) From {
{"Name", pJob.Name}
},
.CronSchedule = pJob.CronSchedule
}
End Function
Dim oJob = JobBuilder.Create(Of T)().
WithIdentity(oJobName).
UsingJobData(oJobData).
Build()
Private Async Function ScheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
If Await Scheduler.CheckExists(New JobKey(GetJobName(pJobConfig))) Then
Logger.Debug("Job already exists, rescheduling..")
Await DoRescheduleJob(Of T)(pJobConfig)
Else
Logger.Debug("Job does not exist, scheduling..")
Await DoScheduleJob(Of T)(pJobConfig)
End If
End Function
Dim oTrigger = TriggerBuilder.Create().
WithIdentity(oTriggerName).
StartNow().
WithCronSchedule(pJobConfig.CronSchedule).
Build()
''' <summary>
''' Updates the Trigger and Schedule for a given JobConfig
''' </summary>
''' <typeparam name="T"></typeparam>
''' <param name="pJobConfig"></param>
''' <returns></returns>
Private Async Function DoRescheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
Dim oJobKey As New JobKey(pJobConfig.Name)
Dim oTriggerKey As New TriggerKey(GetTriggerName(pJobConfig))
Dim oTrigger = BuildTrigger(pJobConfig)
Await Scheduler.RescheduleJob(oTriggerKey, oTrigger)
End Function
Private Async Function DoScheduleJob(Of T As IJob)(pJobConfig As JobConfig) As Task
Dim oJobName As String = GetJobName(pJobConfig)
Dim oTriggerName As String = GetTriggerName(pJobConfig)
Dim oJobData As JobDataMap = BuildJobData(pJobConfig)
Dim oJob As IJobDetail = BuildJob(Of T)(pJobConfig, oJobData)
Dim oTrigger As ITrigger = BuildTrigger(pJobConfig)
If pJobConfig.Enabled Then
Await Scheduler.ScheduleJob(oJob, oTrigger)
Logger.Info("Job {0} scheduled.", oJobName)
Logger.Info("Job [{0}] scheduled.", oJobName)
Else
Logger.Info("Job {0} is disabled.", oJobName)
Logger.Info("Job [{0}] is disabled.", oJobName)
End If
If pJobConfig.StartWithoutDelay Then
Dim oDebugJob = JobBuilder.Create(Of T)().
WithIdentity(oJobName & "-DEBUG").
UsingJobData(oJobData).
Build()
Dim oDebugTrigger = TriggerBuilder.Create().
WithIdentity(oTriggerName & "-DEBUG").
Dim oNoDelayTrigger = TriggerBuilder.Create().
WithIdentity(oTriggerName & "-NO-DELAY").
StartAt(DateBuilder.FutureDate(10, IntervalUnit.Second)).
Build()
Logger.Info("Job {0} will start in 10 Seconds.", oJobName)
Await Scheduler.ScheduleJob(oDebugJob, oDebugTrigger)
Await Scheduler.ScheduleJob(oJob, oNoDelayTrigger)
End If
End Function
Private Function BuildJobData(pJobConfig As JobConfig) As JobDataMap
Return New JobDataMap From {
{Constants.Scheduler.JOB_CONFIG_LOGCONFIG, LogConfig},
{Constants.Scheduler.JOB_CONFIG_ARGUMENTS, pJobConfig.Arguments},
{Constants.Scheduler.JOB_CONFIG_DATABASE, Database},
{Constants.Scheduler.JOB_CONFIG_STATE, State}
}
End Function
Private Function BuildJob(Of T As IJob)(pJobConfig As JobConfig, pJobData As JobDataMap) As IJobDetail
Dim oJobName = GetJobName(pJobConfig)
Return JobBuilder.Create(Of T)().
WithIdentity(pJobConfig.Name).
UsingJobData(pJobData).
Build()
End Function
Private Function BuildTrigger(pJobConfig As JobConfig) As ITrigger
Dim oTriggerName As String = GetTriggerName(pJobConfig)
Return TriggerBuilder.Create().
WithIdentity(oTriggerName).
WithCronSchedule(pJobConfig.CronSchedule).
StartNow().
Build()
End Function
Private Function GetJobName(pJobConfig As JobConfig) As String
Return pJobConfig.Name
End Function
Private Function GetTriggerName(pJobConfig As JobConfig) As String
Dim oJobName As String = pJobConfig.Name
Return $"{oJobName}-TRIGGER"
End Function
End Class
End Namespace

View File

@@ -0,0 +1,65 @@
Imports DigitalData.Modules.Base
Imports DigitalData.Modules.Logging
Imports ECM.JobRunner.Common
Public Class JobStatus
Inherits BaseClass
Public ReadOnly Entries As New List(Of StatusItem)
Public Sub New(pLogConfig As LogConfig)
MyBase.New(pLogConfig)
End Sub
Public Sub Start(pJob As Quartz.IJobExecutionContext)
Dim oStatus = GetJobStatus(pJob)
Logger.Info("Starting Job [{0}]", oStatus.Id)
If oStatus IsNot Nothing Then
oStatus.Name = pJob.JobDetail.Key.Name
oStatus.StartTime = Date.Now
oStatus.Executing = True
End If
End Sub
Public Sub Update(pJob As Quartz.IJobExecutionContext, pCurrent As Integer, pTotal As Integer)
Dim oStatus = GetJobStatus(pJob)
Logger.Debug("Updating Job [{0}] with Status [{1}/{2}]", oStatus.Id, pCurrent, pTotal)
If oStatus IsNot Nothing Then
oStatus.ProgressTotal = pTotal
oStatus.ProgressCurrent = pCurrent
oStatus.ExecutionTime = pJob.JobRunTime
End If
End Sub
Public Sub Complete(pJob As Quartz.IJobExecutionContext)
Dim oStatus = GetJobStatus(pJob)
Logger.Info("Completing Job [{0}]", oStatus.Id)
If oStatus IsNot Nothing Then
oStatus.ProgressCurrent = oStatus.ProgressTotal
oStatus.ExecutionTime = pJob.JobRunTime
oStatus.Executing = False
oStatus.CompleteTime = Date.Now
End If
End Sub
Private Function GetJobStatus(pJob As Quartz.IJobExecutionContext) As StatusItem
Dim oJobId = GetJobId(pJob)
Dim oExists = Entries.Where(Function(e) e.Id = oJobId).Any()
If Not oExists Then
Entries.Add(New StatusItem With {.Id = oJobId})
End If
Return Entries.Where(Function(e) e.Id = oJobId).SingleOrDefault()
End Function
Private Function GetJobId(pJob As Quartz.IJobExecutionContext) As String
Return pJob.JobDetail.Key.ToString() & pJob.FireTimeUtc.ToString("u")
End Function
End Class

View File

@@ -1,5 +1,6 @@
Imports DigitalData.Modules.Database
Imports DigitalData.Modules.Logging
Imports ECM.JobRunner.Common
Imports Quartz
Namespace Scheduler.Jobs
@@ -7,15 +8,30 @@ Namespace Scheduler.Jobs
Friend LogConfig As LogConfig
Friend Logger As Logger
Friend Database As MSSQLServer
Friend State As State
Private ctx As IJobExecutionContext
Public Function InitializeJob(context As IJobExecutionContext) As Dictionary(Of String, String)
ctx = context
Dim oJobData = context.MergedJobDataMap
LogConfig = oJobData.Item(Constants.Scheduler.JOB_CONFIG_LOGCONFIG)
Database = oJobData.Item(Constants.Scheduler.JOB_CONFIG_DATABASE)
State = oJobData.Item(Constants.Scheduler.JOB_CONFIG_STATE)
Logger = LogConfig.GetLogger()
State.JobStatus.Start(ctx)
Return oJobData.Item(Constants.Scheduler.JOB_CONFIG_ARGUMENTS)
End Function
Public Sub UpdateProgress(pCurrentValue As Integer, pTotalValue As Integer)
State.JobStatus.Update(ctx, pCurrentValue, pTotalValue)
End Sub
Public Sub CompleteJob()
State.JobStatus.Complete(ctx)
End Sub
End Class
End Namespace

View File

@@ -19,6 +19,7 @@ Namespace Scheduler.Jobs
context.Result = oResult
CompleteJob()
Return Task.FromResult(True)
End Function
End Class

View File

@@ -13,12 +13,19 @@ Namespace Scheduler.Jobs
Logger.Info("Running File Import [{0}]", oName)
Dim oMax = 100
For index = 1 To oMax
UpdateProgress(index, oMax)
Threading.Thread.Sleep(100)
Next
Dim oResult = New JobResult() With {
.Description = $"File Import Job [{oName}] completed!"
}
context.Result = oResult
CompleteJob()
Return Task.FromResult(True)
End Function
End Class

View File

@@ -18,6 +18,7 @@ Namespace Scheduler.Jobs
context.Result = oResult
CompleteJob()
Return Task.FromResult(True)
End Function
End Class