Jobs: WIP GraphQL Job, fix logic errors, improve logging
This commit is contained in:
parent
8267ecb72d
commit
ca92abbee5
@ -1,16 +1,17 @@
|
||||
Option Explicit On
|
||||
|
||||
Imports System.IO
|
||||
Imports DigitalData.Modules.Interfaces
|
||||
Imports DigitalData.Modules.Config
|
||||
Imports DigitalData.Modules.Logging
|
||||
Imports DigitalData.Modules.Language
|
||||
Imports Newtonsoft.Json.Linq
|
||||
Imports System.Collections.Generic
|
||||
Imports System.Linq
|
||||
Imports System.Text.RegularExpressions
|
||||
Imports DigitalData.Modules.Database
|
||||
Imports System.Data
|
||||
Imports System.IO
|
||||
Imports System.Linq
|
||||
Imports System.Net.NetworkInformation
|
||||
Imports DigitalData.Modules.Config
|
||||
Imports DigitalData.Modules.Database
|
||||
Imports DigitalData.Modules.Interfaces
|
||||
Imports DigitalData.Modules.Jobs.GraphQL
|
||||
Imports DigitalData.Modules.Language
|
||||
Imports DigitalData.Modules.Logging
|
||||
Imports Newtonsoft.Json.Linq
|
||||
|
||||
Public Class GraphQLJob
|
||||
Inherits JobBase
|
||||
@ -19,6 +20,7 @@ Public Class GraphQLJob
|
||||
Private _GraphQL As GraphQLInterface = Nothing
|
||||
|
||||
Private Const PLACEHOLDER_STATIC = "STATIC:"
|
||||
Private Const JOB_NAME = "GraphQL Job"
|
||||
|
||||
Public Sub New(LogConfig As LogConfig, MSSQL As MSSQLServer)
|
||||
MyBase.New(LogConfig, Nothing, MSSQL)
|
||||
@ -44,11 +46,11 @@ Public Class GraphQLJob
|
||||
|
||||
' Load query data from TBCUST_JOBRUNNER_QUERY
|
||||
Dim oQueryTable As DataTable = _MSSQL.GetDatatable("SELECT * FROM TBCUST_JOBRUNNER_QUERY ORDER BY SEQUENCE")
|
||||
Dim oQueryList As New List(Of GraphQL.Query)
|
||||
Dim oQueryList As New List(Of Query)
|
||||
|
||||
' Save query data to business objects
|
||||
For Each oRow As DataRow In oQueryTable.Rows
|
||||
Dim oQuery As New GraphQL.Query With {
|
||||
Dim oQuery As New Query With {
|
||||
.Id = oRow.Item("GUID"),
|
||||
.Name = oRow.Item("TITLE"),
|
||||
.ClearBeforeFill = oRow.ItemEx("CLEAR_BEFORE_FILL", False),
|
||||
@ -56,8 +58,7 @@ Public Class GraphQLJob
|
||||
.DestinationTable = oRow.ItemEx("DESTINATION_TABLE", String.Empty),
|
||||
.OperationName = oRow.ItemEx("OPERATION_NAME", String.Empty),
|
||||
.MappingBasePath = oRow.ItemEx("MAPPING_BASE_PATH", String.Empty),
|
||||
.QueryString = oRow.ItemEx("QUERY_STRING", String.Empty),
|
||||
.QueryConstraint = oRow.ItemEx("QUERY_CONSTRAINT", String.Empty)
|
||||
.QueryString = oRow.ItemEx("QUERY_STRING", String.Empty)
|
||||
}
|
||||
|
||||
If oQuery.DestinationTable = String.Empty Then
|
||||
@ -76,107 +77,142 @@ Public Class GraphQLJob
|
||||
_Logger.Warn("Value [QueryString] could not be read. Configuration incomplete.")
|
||||
End If
|
||||
|
||||
If oQuery.QueryConstraint = String.Empty Then
|
||||
_Logger.Warn("Value [QueryConstraint] could not be read. Configuration incomplete.")
|
||||
End If
|
||||
|
||||
oQueryList.Add(oQuery)
|
||||
Next
|
||||
|
||||
_Logger.Debug("Getting the data from GraphQL")
|
||||
_Logger.Debug("Running [{0}] queries.", oQueryList.Count)
|
||||
|
||||
For Each oQuery As GraphQL.Query In oQueryList
|
||||
Try
|
||||
_Logger.NewBlock($"Query [{oQuery.Name}]")
|
||||
|
||||
Dim oConnectionId As Integer = oQuery.ConnectionId
|
||||
Dim oConnectionString = _MSSQL.Get_ConnectionStringforID(oConnectionId)
|
||||
|
||||
If oConnectionString = String.Empty Then
|
||||
_Logger.Warn("Could not get Connection String for ConnectionId [{0}]", oConnectionId)
|
||||
End If
|
||||
|
||||
Dim oDatabase As New MSSQLServer(_LogConfig, oConnectionString)
|
||||
|
||||
' Reset all records to status = 0
|
||||
_Logger.Info("Resetting data with constraint [{1}]", oQuery.Name, oQuery.QueryConstraint)
|
||||
|
||||
Dim oResetSQL = $"UPDATE {oQuery.DestinationTable} SET STATUS = 0"
|
||||
If oQuery.QueryConstraint <> String.Empty Then
|
||||
oResetSQL &= $" WHERE {oQuery.QueryConstraint}"
|
||||
End If
|
||||
_MSSQL.ExecuteNonQuery(oResetSQL)
|
||||
|
||||
_Logger.Info("Getting data..", oQuery.Name)
|
||||
|
||||
' get the data from GraphQL
|
||||
Dim oDataResponse = _GraphQL.GetData(oQuery.QueryString, oQuery.OperationName)
|
||||
Dim oResult As String
|
||||
|
||||
' write data to string
|
||||
Using oStream = oDataResponse.GetResponseStream()
|
||||
Using oReader As New StreamReader(oStream)
|
||||
oResult = oReader.ReadToEnd()
|
||||
End Using
|
||||
End Using
|
||||
|
||||
' Fill the query object with field mapping data from TBCUST_JOBRUNNER_QUERY_MAPPING
|
||||
Dim oSQL As String = "SELECT t2.* FROM TBCUST_JOBRUNNER_QUERY_MAPPING t
|
||||
JOIN TBCUST_JOBRUNNER_MAPPING t2 ON t.MAPPING_ID = t2.GUID
|
||||
WHERE t.QUERY_ID = {0}"
|
||||
Dim oMappingTable As DataTable = _MSSQL.GetDatatable(String.Format(oSQL, oQuery.Id))
|
||||
|
||||
For Each oMapping As DataRow In oMappingTable.Rows
|
||||
oQuery.MappingFields.Add(New GraphQL.FieldMapping With {
|
||||
.DestinationColumn = oMapping.Item("DestinationColumn"),
|
||||
.SourcePath = oMapping.Item("SourcePath")
|
||||
})
|
||||
Next
|
||||
|
||||
' Handle the response from GraphQL and insert Data
|
||||
Dim oQueryHandleResult = HandleResponse(oResult, oQuery, oDatabase)
|
||||
|
||||
If IsNothing(oQueryHandleResult) Then
|
||||
Continue For
|
||||
End If
|
||||
|
||||
' Finally delete all old records
|
||||
Dim oDeleteSQL = $"DELETE FROM {oQuery.DestinationTable} WHERE STATUS = 0"
|
||||
If oQuery.QueryConstraint <> String.Empty Then
|
||||
oDeleteSQL &= $" AND {oQuery.QueryConstraint}"
|
||||
End If
|
||||
|
||||
_Logger.Info("Success, deleting old records..", oQuery.Name)
|
||||
_MSSQL.ExecuteNonQuery(oDeleteSQL)
|
||||
|
||||
Catch ex As Exception
|
||||
_Logger.Warn("Error while getting Data for Name/OperationName [{0}]/[{1}]", oQuery.Name, oQuery.OperationName)
|
||||
_Logger.Error(ex)
|
||||
|
||||
_Logger.Info("Failure, deleting new records..", oQuery.Name)
|
||||
|
||||
' If a crash happens, delete all records which were inserted in this run,
|
||||
' thus going back to the previous state
|
||||
Dim oDeleteSQL = $"DELETE FROM {oQuery.DestinationTable} WHERE STATUS = 1"
|
||||
If oQuery.QueryConstraint <> String.Empty Then
|
||||
oDeleteSQL &= $" AND {oQuery.QueryConstraint}"
|
||||
End If
|
||||
_MSSQL.ExecuteNonQuery(oDeleteSQL)
|
||||
Finally
|
||||
_Logger.EndBlock()
|
||||
End Try
|
||||
' run
|
||||
For Each oQuery As Query In oQueryList
|
||||
_Logger.Debug("Running Query [{0}].", oQuery.Name)
|
||||
Dim oQueryResult = RunQuery(oQuery)
|
||||
_Logger.Info("Query [{0}] finished with Result [{1}]", oQuery.Name, oQueryResult)
|
||||
Next
|
||||
|
||||
' logout
|
||||
_Logger.Debug("Logging out")
|
||||
Dim oLogoutResponse = _GraphQL.Logout()
|
||||
|
||||
_Logger.Info("Finished GraphQL Job")
|
||||
|
||||
Catch ex As Exception
|
||||
_Logger.Warn("Finished GraphQL Job with errors")
|
||||
_Logger.Error(ex)
|
||||
Throw ex
|
||||
End Try
|
||||
End Sub
|
||||
|
||||
Private Function HandleResponse(JsonString As String, QueryData As GraphQL.Query, DB As Database.MSSQLServer) As GraphQL.Query
|
||||
Private Function RunQuery(pQuery As Query)
|
||||
Try
|
||||
_Logger.Info("Executing Query [{0}]", pQuery.Name)
|
||||
|
||||
Dim oConnectionId As Integer = pQuery.ConnectionId
|
||||
Dim oConnectionString = _MSSQL.Get_ConnectionStringforID(oConnectionId)
|
||||
|
||||
If oConnectionString = String.Empty Then
|
||||
_Logger.Warn("Could not get Connection String for ConnectionId [{0}]", oConnectionId)
|
||||
End If
|
||||
|
||||
Dim oDatabase As New MSSQLServer(_LogConfig, oConnectionString)
|
||||
|
||||
'TODO: ONly set status when clear before fill is false
|
||||
'TODO: ADDED_WHO which contains the query id which inserted the rows
|
||||
|
||||
' Clear Table before inserting
|
||||
If pQuery.ClearBeforeFill = True Then
|
||||
If DeleteWithQueryName(pQuery) Then
|
||||
Throw New ApplicationException($"Error while clearing table before fill for Query [{pQuery.Name}]")
|
||||
End If
|
||||
End If
|
||||
|
||||
' Reset all records to status = 0
|
||||
If pQuery.ClearBeforeFill = False Then
|
||||
_Logger.Info("Resetting data for Query [{0}]", pQuery.Name)
|
||||
If UpdateWithStatus(pQuery, 0) = False Then
|
||||
Throw New ApplicationException($"Error while resetting status of current Records for Query [{pQuery.Name}]")
|
||||
End If
|
||||
End If
|
||||
|
||||
' get the data from GraphQL
|
||||
_Logger.Info("Getting data..", pQuery.Name)
|
||||
Dim oDataResponse = _GraphQL.GetData(pQuery.QueryString, pQuery.OperationName)
|
||||
Dim oResult As String
|
||||
|
||||
' write data to string
|
||||
Using oStream = oDataResponse.GetResponseStream()
|
||||
Using oReader As New StreamReader(oStream)
|
||||
oResult = oReader.ReadToEnd()
|
||||
End Using
|
||||
End Using
|
||||
|
||||
' Fill the query object with field mapping data from TBCUST_JOBRUNNER_QUERY_MAPPING
|
||||
Dim oSQL As String = "SELECT t2.* FROM TBCUST_JOBRUNNER_QUERY_MAPPING t
|
||||
JOIN TBCUST_JOBRUNNER_MAPPING t2 ON t.MAPPING_ID = t2.GUID
|
||||
WHERE t.QUERY_ID = {0}"
|
||||
Dim oMappingTable As DataTable = _MSSQL.GetDatatable(String.Format(oSQL, pQuery.Id))
|
||||
For Each oMapping As DataRow In oMappingTable.Rows
|
||||
pQuery.MappingFields.Add(New GraphQL.FieldMapping With {
|
||||
.DestinationColumn = oMapping.Item("DestinationColumn"),
|
||||
.SourcePath = oMapping.Item("SourcePath")
|
||||
})
|
||||
Next
|
||||
|
||||
' Handle the response from GraphQL and insert Data
|
||||
Dim oWriteDataResult As GraphQL.Query = WriteNewQueryData(oResult, pQuery, oDatabase)
|
||||
|
||||
If IsNothing(oWriteDataResult) Then
|
||||
Throw New ApplicationException($"Error while handling Result of Query [{pQuery.Name}]")
|
||||
End If
|
||||
|
||||
_Logger.Info("New Data successfully inserted for Query [{0}]", pQuery.Name)
|
||||
|
||||
' Finally delete all old records
|
||||
If pQuery.ClearBeforeFill = False Then
|
||||
_Logger.Info("Deleting old records for Query [{0}].", pQuery.Name)
|
||||
If DeleteWithStatus(pQuery, 0) = False Then
|
||||
Throw New ApplicationException($"Error while deleting current Records for Query [{pQuery.Name}]")
|
||||
End If
|
||||
End If
|
||||
|
||||
Return True
|
||||
|
||||
Catch ex As Exception
|
||||
_Logger.Warn("Error while getting Data for Name/OperationName [{0}]/[{1}]", pQuery.Name, pQuery.OperationName)
|
||||
_Logger.Error(ex)
|
||||
|
||||
' If a crash happens, delete all records which were inserted in this run,
|
||||
' thus going back to the previous state
|
||||
_Logger.Info("Failure, deleting new records..", pQuery.Name)
|
||||
|
||||
If pQuery.ClearBeforeFill = False Then
|
||||
If DeleteWithStatus(pQuery, 1) = False Then
|
||||
Throw New ApplicationException($"Error while deleting new Records for Query [{pQuery.Name}]")
|
||||
End If
|
||||
End If
|
||||
|
||||
Return False
|
||||
|
||||
Finally
|
||||
_Logger.Debug("Finished running Query [{0}].", pQuery.Name)
|
||||
End Try
|
||||
End Function
|
||||
|
||||
Private Function DeleteWithQueryName(pQuery)
|
||||
Dim oDeleteSQL = $"DELETE FROM {pQuery.DestinationTable}"
|
||||
Return _MSSQL.ExecuteNonQuery(oDeleteSQL)
|
||||
End Function
|
||||
|
||||
Private Function DeleteWithStatus(pQuery As Query, pStatus As Integer)
|
||||
Dim oDeleteSQL = $"DELETE FROM {pQuery.DestinationTable} WHERE STATUS = {pStatus} AND ADDED_QUERY_ID = '{pQuery.Id}'"
|
||||
Return _MSSQL.ExecuteNonQuery(oDeleteSQL)
|
||||
End Function
|
||||
|
||||
Private Function UpdateWithStatus(pQuery As Query, pStatus As Integer)
|
||||
Dim oResetSQL = $"UPDATE {pQuery.DestinationTable} SET STATUS = {pStatus} WHERE ADDED_WHERY_ID = '{pQuery.Id}'"
|
||||
Return _MSSQL.ExecuteNonQuery(oResetSQL)
|
||||
End Function
|
||||
|
||||
Private Function WriteNewQueryData(JsonString As String, QueryData As GraphQL.Query, DB As Database.MSSQLServer) As GraphQL.Query
|
||||
Dim oObj As JObject = JObject.Parse(JsonString)
|
||||
Dim oResultList As JToken
|
||||
|
||||
@ -201,8 +237,10 @@ Public Class GraphQLJob
|
||||
|
||||
For Each oResultItem As JToken In oResultList
|
||||
Try
|
||||
Dim oValues As New List(Of String)
|
||||
Dim oKeys As New List(Of String)
|
||||
' ADDED_WHO, ADDED_QUERY_ID are system fields which are used to correctly fill
|
||||
' and delete rows in the destination table without touching rows from other queries
|
||||
Dim oKeys As New List(Of String) From {"ADDED_WHO", "ADDED_QUERY_ID"}
|
||||
Dim oValues As New List(Of String) From {JOB_NAME, QueryData.Id}
|
||||
|
||||
For Each oMapping In QueryData.MappingFields
|
||||
Dim oValue As String = String.Empty
|
||||
@ -224,14 +262,12 @@ Public Class GraphQLJob
|
||||
oKeys.Add(oMapping.DestinationColumn)
|
||||
Next
|
||||
|
||||
Dim oColumnValues = oValues.
|
||||
Select(Function(Value) Regex.Replace(Value, "'", "''")).
|
||||
Select(Function(Value) $"'{Value}'").
|
||||
ToList()
|
||||
Dim oValueString = String.Join(",", oColumnValues)
|
||||
Dim oColumnString = String.Join(",", oKeys.ToArray)
|
||||
|
||||
Dim oColumns = String.Join(",", oKeys.ToArray)
|
||||
Dim oSQL As String = $"INSERT INTO {QueryData.DestinationTable} ({oColumns}) VALUES ({oValueString})"
|
||||
Dim oValueList = oValues.Select(Function(Value) $"'{Value.EscapeForSQL}'").ToList()
|
||||
Dim oValueString = String.Join(",", oValueList)
|
||||
|
||||
Dim oSQL As String = $"INSERT INTO {QueryData.DestinationTable} ({oColumnString}) VALUES ({oValueString})"
|
||||
|
||||
DB.ExecuteNonQuery(oSQL)
|
||||
Catch ex As Exception
|
||||
|
||||
@ -6,9 +6,7 @@ Namespace GraphQL
|
||||
Public Property Name As String
|
||||
Public Property ConnectionId As String = ""
|
||||
Public Property ClearBeforeFill As Boolean = False
|
||||
Public Property ClearCommand As String = ""
|
||||
Public Property QueryString As String = ""
|
||||
Public Property QueryConstraint As String = ""
|
||||
Public Property OperationName As String = ""
|
||||
Public Property DestinationTable As String = ""
|
||||
|
||||
|
||||
@ -7,6 +7,8 @@ Public Class JobConfig
|
||||
Public Property CronSchedule As String = ""
|
||||
|
||||
Public Property ArgsString As String = ""
|
||||
|
||||
<Xml.Serialization.XmlIgnore>
|
||||
Public Property Args As New Dictionary(Of String, String)
|
||||
|
||||
Public Enum JobType
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user