From e85379dcf5e95e3096293dd46e0a5cc1809b5c1a Mon Sep 17 00:00:00 2001 From: Jonathan Jenne Date: Tue, 23 Jan 2024 13:33:44 +0100 Subject: [PATCH] Jobs: Rewrite GraphQL Job --- Jobs/GraphQL/GraphQLJob.vb | 133 +++--------------------- Jobs/GraphQL/GraphQLModel.vb | 86 +++++++++++++++ Jobs/GraphQL/GraphQLWriter.vb | 178 ++++++++++++++++++++++++++++++++ Jobs/JobConfig.vb | 2 + Jobs/Jobs.vbproj | 2 + Jobs/My Project/AssemblyInfo.vb | 6 +- 6 files changed, 286 insertions(+), 121 deletions(-) create mode 100644 Jobs/GraphQL/GraphQLModel.vb create mode 100644 Jobs/GraphQL/GraphQLWriter.vb diff --git a/Jobs/GraphQL/GraphQLJob.vb b/Jobs/GraphQL/GraphQLJob.vb index 0e4f3826..54dbcc86 100644 --- a/Jobs/GraphQL/GraphQLJob.vb +++ b/Jobs/GraphQL/GraphQLJob.vb @@ -4,6 +4,7 @@ Imports System.Collections.Generic Imports System.Data Imports System.IO Imports System.Linq +Imports System.Reflection Imports DigitalData.Modules.Base Imports DigitalData.Modules.Config Imports DigitalData.Modules.Database @@ -17,6 +18,8 @@ Public Class GraphQLJob Implements IJob(Of GraphQLArgs) Private _GraphQL As GraphQLInterface = Nothing + Private _Model As GraphQLModel + Private _Writer As GraphQLWriter Private Const PLACEHOLDER_STATIC = "STATIC:" Private Const JOB_NAME = "GraphQL Job" @@ -34,6 +37,9 @@ Public Class GraphQLJob _GraphQL = New GraphQLInterface(_LogConfig, .BaseUrl, .Email, .Password, .CertificateFingerprint) End With + _Model = New GraphQLModel(_LogConfig, _MSSQL) + _Writer = New GraphQLWriter(_LogConfig, _MSSQL) + ' Login to get cookie _Logger.Debug("Logging in") Dim oLoginResponse = _GraphQL.Login() @@ -43,41 +49,7 @@ Public Class GraphQLJob _Logger.Debug("Loading Queries") - ' Load query data from TBCUST_JOBRUNNER_QUERY - Dim oQueryTable As DataTable = _MSSQL.GetDatatable("SELECT * FROM TBCUST_JOBRUNNER_QUERY ORDER BY SEQUENCE") - Dim oQueryList As New List(Of Query) - - ' Save query data to business objects - For Each oRow As DataRow In oQueryTable.Rows - Dim oQuery As New Query With { - .Id = oRow.Item("GUID"), - .Name = oRow.Item("TITLE"), - .ClearBeforeFill = oRow.ItemEx("CLEAR_BEFORE_FILL", False), - .ConnectionId = oRow.ItemEx("CON_ID", 1), ' TODO: Connection String? - .DestinationTable = oRow.ItemEx("DESTINATION_TABLE", String.Empty), - .OperationName = oRow.ItemEx("OPERATION_NAME", String.Empty), - .MappingBasePath = oRow.ItemEx("MAPPING_BASE_PATH", String.Empty), - .QueryString = oRow.ItemEx("QUERY_STRING", String.Empty) - } - - If oQuery.DestinationTable = String.Empty Then - _Logger.Warn("Value [DestinationTable] could not be read. Configuration incomplete.") - End If - - If oQuery.OperationName = String.Empty Then - _Logger.Warn("Value [OperationName] could not be read. Configuration incomplete.") - End If - - If oQuery.MappingBasePath = String.Empty Then - _Logger.Warn("Value [MappingBasePath] could not be read. Configuration incomplete.") - End If - - If oQuery.QueryString = String.Empty Then - _Logger.Warn("Value [QueryString] could not be read. Configuration incomplete.") - End If - - oQueryList.Add(oQuery) - Next + Dim oQueryList = _Model.GetQueryList() _Logger.Debug("Running [{0}] queries.", oQueryList.Count) @@ -132,8 +104,8 @@ Public Class GraphQLJob End If End If - ' Reset all records to status = 0 - If pQuery.ClearBeforeFill = False Then + ' Reset all records to status = 0 + If pQuery.ClearBeforeFill = False Then _Logger.Info("Resetting data for Query [{0}]", pQuery.Name) If UpdateWithStatus(pQuery, 0) = False Then Throw New ApplicationException($"Error while resetting status of current Records for Query [{pQuery.Name}]") @@ -141,31 +113,22 @@ Public Class GraphQLJob End If ' get the data from GraphQL - _Logger.Info("Getting data..", pQuery.Name) + _Logger.Info("Getting data from GraphQL..", pQuery.Name) Dim oDataResponse = _GraphQL.GetData(pQuery.QueryString, pQuery.OperationName) - Dim oResult As String + Dim oJsonResult As String ' write data to string Using oStream = oDataResponse.GetResponseStream() Using oReader As New StreamReader(oStream) - oResult = oReader.ReadToEnd() + oJsonResult = oReader.ReadToEnd() End Using End Using - ' Fill the query object with field mapping data from TBCUST_JOBRUNNER_QUERY_MAPPING - Dim oSQL As String = "SELECT t2.* FROM TBCUST_JOBRUNNER_QUERY_MAPPING t - JOIN TBCUST_JOBRUNNER_MAPPING t2 ON t.MAPPING_ID = t2.GUID - WHERE t.QUERY_ID = {0}" - Dim oMappingTable As DataTable = _MSSQL.GetDatatable(String.Format(oSQL, pQuery.Id)) - For Each oMapping As DataRow In oMappingTable.Rows - pQuery.MappingFields.Add(New GraphQL.FieldMapping With { - .DestinationColumn = oMapping.Item("DestinationColumn"), - .SourcePath = oMapping.Item("SourcePath") - }) - Next + _Logger.Debug("Writing JSON data to database..") ' Handle the response from GraphQL and insert Data - Dim oWriteDataResult As GraphQL.Query = WriteNewQueryData(oResult, pQuery, oDatabase) + 'Dim oWriteDataResult As Query = WriteNewQueryData(oResult, pQuery, oDatabase) + Dim oWriteDataResult As Query = _Writer.WriteNewQueryData(oJsonResult, pQuery, JOB_NAME) If IsNothing(oWriteDataResult) Then Throw New ApplicationException($"Error while handling Result of Query [{pQuery.Name}]") @@ -241,72 +204,6 @@ Public Class GraphQLJob Return _MSSQL.ExecuteNonQuery(oResetSQL) End Function - Private Function WriteNewQueryData(JsonString As String, QueryData As GraphQL.Query, DB As Database.MSSQLServer) As GraphQL.Query - Dim oObj As JObject = JObject.Parse(JsonString) - Dim oResultList As JToken - - If _GraphQL.ReadJSONPathFragmented(oObj, QueryData.MappingBasePath) = False Then - _Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", QueryData.Name, QueryData.MappingBasePath) - End If - - Try - oResultList = oObj.SelectToken(QueryData.MappingBasePath, errorWhenNoMatch:=True) - Catch ex As Exception - _Logger.Warn("WriteNewQueryData: Could not find BasePath: [{0}] for query [{1}]", QueryData.MappingBasePath, QueryData.Name) - _Logger.Error(ex) - Return Nothing - End Try - - If oResultList Is Nothing Then - _Logger.Warn("WriteNewQueryData: Could not find BasePath: [{0}] for query [{1}]", QueryData.MappingBasePath, QueryData.Name) - Return Nothing - End If - - _Logger.Info("WriteNewQueryData: Processing Queue [{0}] with [{1}] Items", QueryData.Name, oResultList.Count) - - For Each oResultItem As JToken In oResultList - Try - ' ADDED_WHO, ADDED_QUERY_ID are system fields which are used to correctly fill - ' and delete rows in the destination table without touching rows from other queries - Dim oKeys As New List(Of String) From {"ADDED_WHO", "ADDED_QUERY_ID", "STATUS"} - Dim oValues As New List(Of String) From {JOB_NAME, QueryData.Id, "1"} - - For Each oMapping In QueryData.MappingFields - Dim oValue As String = String.Empty - - If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then - oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty) - Else - Dim oToken = oResultItem.SelectToken(oMapping.SourcePath) - - If oToken Is Nothing Then - _Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath) - oValue = String.Empty - Else - oValue = oToken.ToString - End If - End If - - oValues.Add(oValue) - oKeys.Add(oMapping.DestinationColumn) - Next - - Dim oColumnString = String.Join(",", oKeys.ToArray) - - Dim oValueList = oValues.Select(Function(Value) $"'{Value.EscapeForSQL}'").ToList() - Dim oValueString = String.Join(",", oValueList) - - Dim oSQL As String = $"INSERT INTO {QueryData.DestinationTable} ({oColumnString}) VALUES ({oValueString})" - - DB.ExecuteNonQuery(oSQL) - Catch ex As Exception - _Logger.Error(ex) - End Try - Next - - Return QueryData - End Function - Public Function ShouldStart(Arguments As GraphQLArgs) As Boolean Implements IJob(Of GraphQLArgs).ShouldStart Return Arguments.Enabled End Function diff --git a/Jobs/GraphQL/GraphQLModel.vb b/Jobs/GraphQL/GraphQLModel.vb new file mode 100644 index 00000000..723ec4f9 --- /dev/null +++ b/Jobs/GraphQL/GraphQLModel.vb @@ -0,0 +1,86 @@ +Imports System.Collections.Generic +Imports System.Data +Imports DigitalData.Modules.Base +Imports DigitalData.Modules.Database +Imports DigitalData.Modules.Jobs.GraphQL +Imports DigitalData.Modules.Logging + +Public Class GraphQLModel + Private Database As MSSQLServer + Private LogConfig As LogConfig + Private Logger As Logger + + Public Sub New(pLogConfig As LogConfig, pDatabase As MSSQLServer) + Database = pDatabase + LogConfig = pLogConfig + Logger = pLogConfig.GetLogger() + End Sub + + Public Function GetQueryList() As List(Of Query) + Try + Dim oQueryTable As DataTable = Database.GetDatatable("SELECT * FROM TBCUST_JOBRUNNER_QUERY ORDER BY SEQUENCE") + Dim oQueryList As New List(Of Query) + + For Each oRow As DataRow In oQueryTable.Rows + Dim oQuery As New Query With { + .Id = oRow.Item("GUID"), + .Name = oRow.Item("TITLE"), + .ClearBeforeFill = oRow.ItemEx("CLEAR_BEFORE_FILL", False), + .ConnectionId = oRow.ItemEx("CON_ID", 1), ' TODO: Connection String? + .DestinationTable = oRow.ItemEx("DESTINATION_TABLE", String.Empty), + .OperationName = oRow.ItemEx("OPERATION_NAME", String.Empty), + .MappingBasePath = oRow.ItemEx("MAPPING_BASE_PATH", String.Empty), + .QueryString = oRow.ItemEx("QUERY_STRING", String.Empty) + } + + If oQuery.DestinationTable = String.Empty Then + Logger.Warn("Value [DestinationTable] could not be read. Configuration incomplete.") + End If + + If oQuery.OperationName = String.Empty Then + Logger.Warn("Value [OperationName] could not be read. Configuration incomplete.") + End If + + If oQuery.MappingBasePath = String.Empty Then + Logger.Warn("Value [MappingBasePath] could not be read. Configuration incomplete.") + End If + + If oQuery.QueryString = String.Empty Then + Logger.Warn("Value [QueryString] could not be read. Configuration incomplete.") + End If + + oQuery.MappingFields = GetQueryMapping(oQuery.Id) + + oQueryList.Add(oQuery) + Next + + Return oQueryList + Catch ex As Exception + Logger.Error(ex) + Return New List(Of Query) + End Try + End Function + + Public Function GetQueryMapping(pQueryId As Integer) As List(Of FieldMapping) + Try + Dim oSQL As String = "SELECT t2.* FROM TBCUST_JOBRUNNER_QUERY_MAPPING t + JOIN TBCUST_JOBRUNNER_MAPPING t2 ON t.MAPPING_ID = t2.GUID + WHERE t.QUERY_ID = {0}" + Dim oMappingTable As DataTable = Database.GetDatatable(String.Format(oSQL, pQueryId)) + Dim oMappings As New List(Of FieldMapping) + + For Each oMapping As DataRow In oMappingTable.Rows + oMappings.Add(New FieldMapping With { + .DestinationColumn = oMapping.Item("DestinationColumn"), + .SourcePath = oMapping.Item("SourcePath") + }) + Next + + Return oMappings + Catch ex As Exception + Logger.Error(ex) + Return New List(Of FieldMapping) + End Try + End Function + +End Class diff --git a/Jobs/GraphQL/GraphQLWriter.vb b/Jobs/GraphQL/GraphQLWriter.vb new file mode 100644 index 00000000..20b46a97 --- /dev/null +++ b/Jobs/GraphQL/GraphQLWriter.vb @@ -0,0 +1,178 @@ +Imports System +Imports System.Collections.Generic +Imports System.Data +Imports System.Data.SqlClient +Imports System.Linq +Imports DigitalData.Modules.Database +Imports DigitalData.Modules.Jobs.GraphQL +Imports DigitalData.Modules.Logging +Imports Newtonsoft.Json.Linq + +Public Class GraphQLWriter + Private ReadOnly Database As MSSQLServer + Private ReadOnly LogConfig As LogConfig + Private ReadOnly Logger As Logger + + Private Const PLACEHOLDER_STATIC = "STATIC:" + + Public Sub New(pLogConfig As LogConfig, pDatabase As MSSQLServer) + Database = pDatabase + LogConfig = pLogConfig + Logger = pLogConfig.GetLogger() + End Sub + + Public Function WriteNewQueryData(pJsonString As String, pQueryData As Query, pJobName As String) As GraphQL.Query + Try + Logger.Debug("Parsing JSON...") + + Dim oObj As JObject = JObject.Parse(pJsonString) + Dim oResultList As JToken + + If ValidateJSONPath(oObj, pQueryData.MappingBasePath) = False Then + Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", pQueryData.Name, pQueryData.MappingBasePath) + End If + + Try + oResultList = oObj.SelectToken(pQueryData.MappingBasePath, errorWhenNoMatch:=True) + Catch ex As Exception + Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name) + Logger.Error(ex) + Return Nothing + End Try + + If oResultList Is Nothing Then + Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name) + Return Nothing + End If + + Logger.Info("Processing Query [{0}] with [{1}] Items", pQueryData.Name, oResultList.Count) + + Dim oTable As New DataTable + Dim oColumnList = pQueryData.MappingFields.Select(Function(f) f.DestinationColumn).ToList() + + For Each oColumnName In oColumnList + oTable.Columns.Add(oColumnName) + Next + + oTable.Columns.Add("ADDED_WHO") + oTable.Columns.Add("ADDED_QUERY_ID") + oTable.Columns.Add("STATUS") + + Logger.Debug("Creating DataTable..") + + For Each oResultItem As JToken In oResultList + Dim oRow = FillRowFromJson(pQueryData, oResultItem, pJobName, oTable) + + If oRow Is Nothing Then + Logger.Error("DataRow could not be created!") + Continue For + End If + + oTable.Rows.Add(oRow) + Next + oTable.AcceptChanges() + + Logger.Debug("Starting Bulk Insert..") + + 'Bulk insert + Dim oBulkResult = BulkInsert(oTable, pQueryData.DestinationTable, oColumnList) + + If oBulkResult = False Then + Logger.Error("Bulk Insert for Query [{0}] failed!", pQueryData.Name) + End If + + Logger.Info("Bulk Insert finished. [{0}] rows inserted.", oTable.Rows.Count) + + Return pQueryData + + Catch ex As Exception + Logger.Error(ex) + Return Nothing + + End Try + End Function + + Private Function FillRowFromJson(pQueryData As Query, pToken As JToken, pJobName As String, pTable As DataTable) As DataRow + Try + Dim oValuesNew As New Dictionary(Of String, String) + Dim oRow As DataRow = pTable.NewRow() + + For Each oMapping In pQueryData.MappingFields + Dim oValue As String = String.Empty + + If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then + oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty) + Else + Dim oToken = pToken.SelectToken(oMapping.SourcePath) + + If oToken Is Nothing Then + Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath) + oValue = String.Empty + Else + oValue = oToken.ToString + End If + End If + + oValuesNew.Add(oMapping.DestinationColumn, oValue) + Next + + oValuesNew.Add("ADDED_WHO", pJobName) + oValuesNew.Add("ADDED_QUERY_ID", pQueryData.Id) + oValuesNew.Add("STATUS", "1") + + For Each oColumn As DataColumn In pTable.Columns + oRow.Item(oColumn.ColumnName) = oValuesNew.Item(oColumn.ColumnName) + Next + + Return oRow + Catch ex As Exception + Logger.Error(ex) + Return Nothing + End Try + End Function + + Private Function BulkInsert(pTable As DataTable, pDestinationTable As String, pColumns As List(Of String)) As Boolean + Using oConnection = Database.GetConnection() + Using oBulkCopy = New SqlBulkCopy(oConnection) + + oBulkCopy.DestinationTableName = pDestinationTable + For Each oColumn In pColumns + oBulkCopy.ColumnMappings.Add(New SqlBulkCopyColumnMapping(oColumn, oColumn)) + Next + + Try + oBulkCopy.WriteToServer(pTable) + Catch ex As Exception + Logger.Error(ex) + Return False + End Try + End Using + End Using + + Return True + End Function + + Public Function ValidateJSONPath(pObject As Newtonsoft.Json.Linq.JObject, pJsonPath As String) As Boolean + Dim oSplitPath As List(Of String) = pJsonPath.Split(".").ToList() + Dim oCurrentPath As String = String.Empty + + For Each oPart In oSplitPath + If oCurrentPath = String.Empty Then + oCurrentPath = oPart + Else + oCurrentPath &= "." & oPart + End If + + Logger.Debug("Selecting Path Fragment [{0}]", oCurrentPath) + + Try + pObject.SelectToken(oCurrentPath, errorWhenNoMatch:=True) + Catch ex As Exception + Logger.Warn("Path Fragment [{0}] did not return a valid token", oCurrentPath) + Return False + End Try + Next + + Return True + End Function +End Class diff --git a/Jobs/JobConfig.vb b/Jobs/JobConfig.vb index 99bc851f..19ef04d9 100644 --- a/Jobs/JobConfig.vb +++ b/Jobs/JobConfig.vb @@ -4,6 +4,8 @@ Public Class JobConfig Public Property Name As JobType Public Property Enabled As Boolean = False Public Property StartWithoutDelay As Boolean = False + + ' https://www.quartz-scheduler.net/documentation/quartz-3.x/how-tos/crontrigger.html Public Property CronSchedule As String = "" Public Property ArgsString As String = "" diff --git a/Jobs/Jobs.vbproj b/Jobs/Jobs.vbproj index 30ed392e..197c297a 100644 --- a/Jobs/Jobs.vbproj +++ b/Jobs/Jobs.vbproj @@ -92,7 +92,9 @@ + + diff --git a/Jobs/My Project/AssemblyInfo.vb b/Jobs/My Project/AssemblyInfo.vb index bb949f68..1dc2d01c 100644 --- a/Jobs/My Project/AssemblyInfo.vb +++ b/Jobs/My Project/AssemblyInfo.vb @@ -13,7 +13,7 @@ Imports System.Runtime.InteropServices - + @@ -30,5 +30,5 @@ Imports System.Runtime.InteropServices ' Sie können alle Werte angeben oder die standardmäßigen Build- und Revisionsnummern ' übernehmen, indem Sie "*" eingeben: - - + +