Option Explicit On Imports System.Collections.Generic Imports System.Data Imports System.IO Imports System.Linq Imports DigitalData.Modules.Base Imports DigitalData.Modules.Config Imports DigitalData.Modules.Database Imports DigitalData.Modules.Interfaces Imports DigitalData.Modules.Jobs.GraphQL Imports DigitalData.Modules.Logging Imports Newtonsoft.Json.Linq Public Class GraphQLJob Inherits JobBase Implements IJob(Of GraphQLArgs) Private _GraphQL As GraphQLInterface = Nothing Private Const PLACEHOLDER_STATIC = "STATIC:" Private Const JOB_NAME = "GraphQL Job" Public Sub New(LogConfig As LogConfig, MSSQL As MSSQLServer) MyBase.New(LogConfig, Nothing, MSSQL) End Sub Public Sub Start(Args As GraphQLArgs) Implements IJob(Of GraphQLArgs).Start Try Dim oConfigPath As String = Args.QueryConfigPath Dim oConfigManager As New ConfigManager(Of GraphQLConfig)(_LogConfig, oConfigPath) With oConfigManager.Config _GraphQL = New GraphQLInterface(_LogConfig, .BaseUrl, .Email, .Password, .CertificateFingerprint) End With ' Login to get cookie _Logger.Debug("Logging in") Dim oLoginResponse = _GraphQL.Login() ' save cookie for future requests _GraphQL.SaveCookies(oLoginResponse.Cookies.Item(0)) _Logger.Debug("Loading Queries") ' Load query data from TBCUST_JOBRUNNER_QUERY Dim oQueryTable As DataTable = _MSSQL.GetDatatable("SELECT * FROM TBCUST_JOBRUNNER_QUERY ORDER BY SEQUENCE") Dim oQueryList As New List(Of Query) ' Save query data to business objects For Each oRow As DataRow In oQueryTable.Rows Dim oQuery As New Query With { .Id = oRow.Item("GUID"), .Name = oRow.Item("TITLE"), .ClearBeforeFill = oRow.ItemEx("CLEAR_BEFORE_FILL", False), .ConnectionId = oRow.ItemEx("CON_ID", 1), ' TODO: Connection String? .DestinationTable = oRow.ItemEx("DESTINATION_TABLE", String.Empty), .OperationName = oRow.ItemEx("OPERATION_NAME", String.Empty), .MappingBasePath = oRow.ItemEx("MAPPING_BASE_PATH", String.Empty), .QueryString = oRow.ItemEx("QUERY_STRING", String.Empty) } If oQuery.DestinationTable = String.Empty Then _Logger.Warn("Value [DestinationTable] could not be read. Configuration incomplete.") End If If oQuery.OperationName = String.Empty Then _Logger.Warn("Value [OperationName] could not be read. Configuration incomplete.") End If If oQuery.MappingBasePath = String.Empty Then _Logger.Warn("Value [MappingBasePath] could not be read. Configuration incomplete.") End If If oQuery.QueryString = String.Empty Then _Logger.Warn("Value [QueryString] could not be read. Configuration incomplete.") End If oQueryList.Add(oQuery) Next _Logger.Debug("Running [{0}] queries.", oQueryList.Count) ' run For Each oQuery As Query In oQueryList _Logger.Debug("Running Query [{0}].", oQuery.Name) Dim oQueryResult = RunQuery(oQuery) _Logger.Info("Query [{0}] finished with Result [{1}]", oQuery.Name, oQueryResult) Next ' logout _Logger.Debug("Logging out") Dim oLogoutResponse = _GraphQL.Logout() _Logger.Info("Finished GraphQL Job") Catch ex As Exception _Logger.Warn("Finished GraphQL Job with errors") _Logger.Error(ex) Throw ex End Try End Sub Private Function RunQuery(pQuery As Query) Try _Logger.Info("Executing Query [{0}]", pQuery.Name) Dim oConnectionId As Integer = pQuery.ConnectionId Dim oConnectionString = _MSSQL.Get_ConnectionStringforID(oConnectionId) If oConnectionString = String.Empty Then _Logger.Warn("Could not get Connection String for ConnectionId [{0}]", oConnectionId) End If Dim oDatabase As New MSSQLServer(_LogConfig, oConnectionString) 'TODO: ONly set status when clear before fill is false 'TODO: ADDED_WHO which contains the query id which inserted the rows ' Clear Table before inserting If pQuery.ClearBeforeFill = True Then If DeleteWithQueryName(pQuery) = False Then Throw New ApplicationException($"Error while dropping history table before fill for Query [{pQuery.Name}]") End If If CreateHistoryTable(pQuery) = False Then Throw New ApplicationException($"Error while creating history table before fill for Query [{pQuery.Name}]") End If End If ' Reset all records to status = 0 If pQuery.ClearBeforeFill = False Then _Logger.Info("Resetting data for Query [{0}]", pQuery.Name) If UpdateWithStatus(pQuery, 0) = False Then Throw New ApplicationException($"Error while resetting status of current Records for Query [{pQuery.Name}]") End If End If ' get the data from GraphQL _Logger.Info("Getting data..", pQuery.Name) Dim oDataResponse = _GraphQL.GetData(pQuery.QueryString, pQuery.OperationName) Dim oResult As String ' write data to string Using oStream = oDataResponse.GetResponseStream() Using oReader As New StreamReader(oStream) oResult = oReader.ReadToEnd() End Using End Using ' Fill the query object with field mapping data from TBCUST_JOBRUNNER_QUERY_MAPPING Dim oSQL As String = "SELECT t2.* FROM TBCUST_JOBRUNNER_QUERY_MAPPING t JOIN TBCUST_JOBRUNNER_MAPPING t2 ON t.MAPPING_ID = t2.GUID WHERE t.QUERY_ID = {0}" Dim oMappingTable As DataTable = _MSSQL.GetDatatable(String.Format(oSQL, pQuery.Id)) For Each oMapping As DataRow In oMappingTable.Rows pQuery.MappingFields.Add(New GraphQL.FieldMapping With { .DestinationColumn = oMapping.Item("DestinationColumn"), .SourcePath = oMapping.Item("SourcePath") }) Next ' Handle the response from GraphQL and insert Data Dim oWriteDataResult As GraphQL.Query = WriteNewQueryData(oResult, pQuery, oDatabase) If IsNothing(oWriteDataResult) Then Throw New ApplicationException($"Error while handling Result of Query [{pQuery.Name}]") End If _Logger.Info("New Data successfully inserted for Query [{0}]", pQuery.Name) ' Finally delete all old records If pQuery.ClearBeforeFill = False Then _Logger.Info("Deleting old records for Query [{0}].", pQuery.Name) If DeleteWithStatus(pQuery, 0) = False Then Throw New ApplicationException($"Error while deleting current Records for Query [{pQuery.Name}]") End If End If Return True Catch ex As Exception _Logger.Warn("Error while getting Data for Name/OperationName [{0}]/[{1}]", pQuery.Name, pQuery.OperationName) _Logger.Error(ex) ' If a crash happens, delete all records which were inserted in this run, ' thus going back to the previous state _Logger.Info("Failure, deleting new records..", pQuery.Name) If pQuery.ClearBeforeFill = False Then If DeleteWithStatus(pQuery, 1) = False Then Throw New ApplicationException($"Error while deleting new Records for Query [{pQuery.Name}]") End If End If Return False Finally _Logger.Debug("Finished running Query [{0}].", pQuery.Name) End Try End Function Private Function DeleteWithQueryName(pQuery As GraphQL.Query) As Boolean Dim oHistoryTableName = $"{pQuery.DestinationTable}_HISTORY" Dim oDeleteHistorySQL = $" IF (EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = '{oHistoryTableName}')) BEGIN DROP TABLE {oHistoryTableName}; END" Return _MSSQL.ExecuteNonQuery(oDeleteHistorySQL) End Function Private Function CreateHistoryTable(pQuery As Query) As Boolean Dim oHistoryTableName = $"{pQuery.DestinationTable}_HISTORY" Dim oFillHistorySQL = $"SELECT * INTO {oHistoryTableName} FROM {pQuery.DestinationTable}" Return _MSSQL.ExecuteNonQuery(oFillHistorySQL) End Function Private Function DeleteWithStatus(pQuery As Query, pStatus As Integer) As Boolean Dim oDeleteSQL = $"DELETE FROM {pQuery.DestinationTable} WHERE STATUS = {pStatus} AND ADDED_QUERY_ID = '{pQuery.Id}'" Return _MSSQL.ExecuteNonQuery(oDeleteSQL) End Function Private Function UpdateWithStatus(pQuery As Query, pStatus As Integer) Dim oResetSQL = $"UPDATE {pQuery.DestinationTable} SET STATUS = {pStatus} WHERE ADDED_QUERY_ID = '{pQuery.Id}'" Return _MSSQL.ExecuteNonQuery(oResetSQL) End Function Private Function WriteNewQueryData(JsonString As String, QueryData As GraphQL.Query, DB As Database.MSSQLServer) As GraphQL.Query Dim oObj As JObject = JObject.Parse(JsonString) Dim oResultList As JToken If _GraphQL.ReadJSONPathFragmented(oObj, QueryData.MappingBasePath) = False Then _Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", QueryData.Name, QueryData.MappingBasePath) End If Try oResultList = oObj.SelectToken(QueryData.MappingBasePath, errorWhenNoMatch:=True) Catch ex As Exception _Logger.Warn("WriteNewQueryData: Could not find BasePath: [{0}] for query [{1}]", QueryData.MappingBasePath, QueryData.Name) _Logger.Error(ex) Return Nothing End Try If oResultList Is Nothing Then _Logger.Warn("WriteNewQueryData: Could not find BasePath: [{0}] for query [{1}]", QueryData.MappingBasePath, QueryData.Name) Return Nothing End If _Logger.Info("WriteNewQueryData: Processing Queue [{0}] with [{1}] Items", QueryData.Name, oResultList.Count) For Each oResultItem As JToken In oResultList Try ' ADDED_WHO, ADDED_QUERY_ID are system fields which are used to correctly fill ' and delete rows in the destination table without touching rows from other queries Dim oKeys As New List(Of String) From {"ADDED_WHO", "ADDED_QUERY_ID", "STATUS"} Dim oValues As New List(Of String) From {JOB_NAME, QueryData.Id, "1"} For Each oMapping In QueryData.MappingFields Dim oValue As String = String.Empty If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty) Else Dim oToken = oResultItem.SelectToken(oMapping.SourcePath) If oToken Is Nothing Then _Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath) oValue = String.Empty Else oValue = oToken.ToString End If End If oValues.Add(oValue) oKeys.Add(oMapping.DestinationColumn) Next Dim oColumnString = String.Join(",", oKeys.ToArray) Dim oValueList = oValues.Select(Function(Value) $"'{Value.EscapeForSQL}'").ToList() Dim oValueString = String.Join(",", oValueList) Dim oSQL As String = $"INSERT INTO {QueryData.DestinationTable} ({oColumnString}) VALUES ({oValueString})" DB.ExecuteNonQuery(oSQL) Catch ex As Exception _Logger.Error(ex) End Try Next Return QueryData End Function Public Function ShouldStart(Arguments As GraphQLArgs) As Boolean Implements IJob(Of GraphQLArgs).ShouldStart Return Arguments.Enabled End Function End Class