285 lines
12 KiB
VB.net
285 lines
12 KiB
VB.net
Option Explicit On
|
|
|
|
Imports System.Collections.Generic
|
|
Imports System.Data
|
|
Imports System.IO
|
|
Imports System.Linq
|
|
Imports DigitalData.Modules.Base
|
|
Imports DigitalData.Modules.Config
|
|
Imports DigitalData.Modules.Database
|
|
Imports DigitalData.Modules.Interfaces
|
|
Imports DigitalData.Modules.Jobs.GraphQL
|
|
Imports DigitalData.Modules.Language
|
|
Imports DigitalData.Modules.Logging
|
|
Imports Newtonsoft.Json.Linq
|
|
|
|
Public Class GraphQLJob
|
|
Inherits JobBase
|
|
Implements IJob(Of GraphQLArgs)
|
|
|
|
Private _GraphQL As GraphQLInterface = Nothing
|
|
|
|
Private Const PLACEHOLDER_STATIC = "STATIC:"
|
|
Private Const JOB_NAME = "GraphQL Job"
|
|
|
|
Public Sub New(LogConfig As LogConfig, MSSQL As MSSQLServer)
|
|
MyBase.New(LogConfig, Nothing, MSSQL)
|
|
End Sub
|
|
|
|
Public Sub Start(Args As GraphQLArgs) Implements IJob(Of GraphQLArgs).Start
|
|
Try
|
|
Dim oConfigPath As String = Args.QueryConfigPath
|
|
Dim oConfigManager As New ConfigManager(Of GraphQLConfig)(_LogConfig, oConfigPath)
|
|
|
|
With oConfigManager.Config
|
|
_GraphQL = New GraphQLInterface(_LogConfig, .BaseUrl, .Email, .Password, .CertificateFingerprint)
|
|
End With
|
|
|
|
' Login to get cookie
|
|
_Logger.Debug("Logging in")
|
|
Dim oLoginResponse = _GraphQL.Login()
|
|
|
|
' save cookie for future requests
|
|
_GraphQL.SaveCookies(oLoginResponse.Cookies.Item(0))
|
|
|
|
_Logger.Debug("Loading Queries")
|
|
|
|
' Load query data from TBCUST_JOBRUNNER_QUERY
|
|
Dim oQueryTable As DataTable = _MSSQL.GetDatatable("SELECT * FROM TBCUST_JOBRUNNER_QUERY ORDER BY SEQUENCE")
|
|
Dim oQueryList As New List(Of Query)
|
|
|
|
' Save query data to business objects
|
|
For Each oRow As DataRow In oQueryTable.Rows
|
|
Dim oQuery As New Query With {
|
|
.Id = oRow.Item("GUID"),
|
|
.Name = oRow.Item("TITLE"),
|
|
.ClearBeforeFill = oRow.ItemEx("CLEAR_BEFORE_FILL", False),
|
|
.ConnectionId = oRow.ItemEx("CON_ID", 1), ' TODO: Connection String?
|
|
.DestinationTable = oRow.ItemEx("DESTINATION_TABLE", String.Empty),
|
|
.OperationName = oRow.ItemEx("OPERATION_NAME", String.Empty),
|
|
.MappingBasePath = oRow.ItemEx("MAPPING_BASE_PATH", String.Empty),
|
|
.QueryString = oRow.ItemEx("QUERY_STRING", String.Empty)
|
|
}
|
|
|
|
If oQuery.DestinationTable = String.Empty Then
|
|
_Logger.Warn("Value [DestinationTable] could not be read. Configuration incomplete.")
|
|
End If
|
|
|
|
If oQuery.OperationName = String.Empty Then
|
|
_Logger.Warn("Value [OperationName] could not be read. Configuration incomplete.")
|
|
End If
|
|
|
|
If oQuery.MappingBasePath = String.Empty Then
|
|
_Logger.Warn("Value [MappingBasePath] could not be read. Configuration incomplete.")
|
|
End If
|
|
|
|
If oQuery.QueryString = String.Empty Then
|
|
_Logger.Warn("Value [QueryString] could not be read. Configuration incomplete.")
|
|
End If
|
|
|
|
oQueryList.Add(oQuery)
|
|
Next
|
|
|
|
_Logger.Debug("Running [{0}] queries.", oQueryList.Count)
|
|
|
|
' run
|
|
For Each oQuery As Query In oQueryList
|
|
_Logger.Debug("Running Query [{0}].", oQuery.Name)
|
|
Dim oQueryResult = RunQuery(oQuery)
|
|
_Logger.Info("Query [{0}] finished with Result [{1}]", oQuery.Name, oQueryResult)
|
|
Next
|
|
|
|
' logout
|
|
_Logger.Debug("Logging out")
|
|
Dim oLogoutResponse = _GraphQL.Logout()
|
|
|
|
_Logger.Info("Finished GraphQL Job")
|
|
|
|
Catch ex As Exception
|
|
_Logger.Warn("Finished GraphQL Job with errors")
|
|
_Logger.Error(ex)
|
|
Throw ex
|
|
End Try
|
|
End Sub
|
|
|
|
Private Function RunQuery(pQuery As Query)
|
|
Try
|
|
_Logger.Info("Executing Query [{0}]", pQuery.Name)
|
|
|
|
Dim oConnectionId As Integer = pQuery.ConnectionId
|
|
Dim oConnectionString = _MSSQL.Get_ConnectionStringforID(oConnectionId)
|
|
|
|
If oConnectionString = String.Empty Then
|
|
_Logger.Warn("Could not get Connection String for ConnectionId [{0}]", oConnectionId)
|
|
End If
|
|
|
|
Dim oDatabase As New MSSQLServer(_LogConfig, oConnectionString)
|
|
|
|
'TODO: ONly set status when clear before fill is false
|
|
'TODO: ADDED_WHO which contains the query id which inserted the rows
|
|
|
|
' Clear Table before inserting
|
|
If pQuery.ClearBeforeFill = True Then
|
|
If DeleteWithQueryName(pQuery) = False Then
|
|
Throw New ApplicationException($"Error while clearing table before fill for Query [{pQuery.Name}]")
|
|
End If
|
|
End If
|
|
|
|
' Reset all records to status = 0
|
|
If pQuery.ClearBeforeFill = False Then
|
|
_Logger.Info("Resetting data for Query [{0}]", pQuery.Name)
|
|
If UpdateWithStatus(pQuery, 0) = False Then
|
|
Throw New ApplicationException($"Error while resetting status of current Records for Query [{pQuery.Name}]")
|
|
End If
|
|
End If
|
|
|
|
' get the data from GraphQL
|
|
_Logger.Info("Getting data..", pQuery.Name)
|
|
Dim oDataResponse = _GraphQL.GetData(pQuery.QueryString, pQuery.OperationName)
|
|
Dim oResult As String
|
|
|
|
' write data to string
|
|
Using oStream = oDataResponse.GetResponseStream()
|
|
Using oReader As New StreamReader(oStream)
|
|
oResult = oReader.ReadToEnd()
|
|
End Using
|
|
End Using
|
|
|
|
' Fill the query object with field mapping data from TBCUST_JOBRUNNER_QUERY_MAPPING
|
|
Dim oSQL As String = "SELECT t2.* FROM TBCUST_JOBRUNNER_QUERY_MAPPING t
|
|
JOIN TBCUST_JOBRUNNER_MAPPING t2 ON t.MAPPING_ID = t2.GUID
|
|
WHERE t.QUERY_ID = {0}"
|
|
Dim oMappingTable As DataTable = _MSSQL.GetDatatable(String.Format(oSQL, pQuery.Id))
|
|
For Each oMapping As DataRow In oMappingTable.Rows
|
|
pQuery.MappingFields.Add(New GraphQL.FieldMapping With {
|
|
.DestinationColumn = oMapping.Item("DestinationColumn"),
|
|
.SourcePath = oMapping.Item("SourcePath")
|
|
})
|
|
Next
|
|
|
|
' Handle the response from GraphQL and insert Data
|
|
Dim oWriteDataResult As GraphQL.Query = WriteNewQueryData(oResult, pQuery, oDatabase)
|
|
|
|
If IsNothing(oWriteDataResult) Then
|
|
Throw New ApplicationException($"Error while handling Result of Query [{pQuery.Name}]")
|
|
End If
|
|
|
|
_Logger.Info("New Data successfully inserted for Query [{0}]", pQuery.Name)
|
|
|
|
' Finally delete all old records
|
|
If pQuery.ClearBeforeFill = False Then
|
|
_Logger.Info("Deleting old records for Query [{0}].", pQuery.Name)
|
|
If DeleteWithStatus(pQuery, 0) = False Then
|
|
Throw New ApplicationException($"Error while deleting current Records for Query [{pQuery.Name}]")
|
|
End If
|
|
End If
|
|
|
|
Return True
|
|
|
|
Catch ex As Exception
|
|
_Logger.Warn("Error while getting Data for Name/OperationName [{0}]/[{1}]", pQuery.Name, pQuery.OperationName)
|
|
_Logger.Error(ex)
|
|
|
|
' If a crash happens, delete all records which were inserted in this run,
|
|
' thus going back to the previous state
|
|
_Logger.Info("Failure, deleting new records..", pQuery.Name)
|
|
|
|
If pQuery.ClearBeforeFill = False Then
|
|
If DeleteWithStatus(pQuery, 1) = False Then
|
|
Throw New ApplicationException($"Error while deleting new Records for Query [{pQuery.Name}]")
|
|
End If
|
|
End If
|
|
|
|
Return False
|
|
|
|
Finally
|
|
_Logger.Debug("Finished running Query [{0}].", pQuery.Name)
|
|
End Try
|
|
End Function
|
|
|
|
Private Function DeleteWithQueryName(pQuery)
|
|
Dim oDeleteSQL = $"TRUNCATE TABLE {pQuery.DestinationTable}"
|
|
Return _MSSQL.ExecuteNonQuery(oDeleteSQL)
|
|
End Function
|
|
|
|
Private Function DeleteWithStatus(pQuery As Query, pStatus As Integer)
|
|
Dim oDeleteSQL = $"DELETE FROM {pQuery.DestinationTable} WHERE STATUS = {pStatus} AND ADDED_QUERY_ID = '{pQuery.Id}'"
|
|
Return _MSSQL.ExecuteNonQuery(oDeleteSQL)
|
|
End Function
|
|
|
|
Private Function UpdateWithStatus(pQuery As Query, pStatus As Integer)
|
|
Dim oResetSQL = $"UPDATE {pQuery.DestinationTable} SET STATUS = {pStatus} WHERE ADDED_QUERY_ID = '{pQuery.Id}'"
|
|
Return _MSSQL.ExecuteNonQuery(oResetSQL)
|
|
End Function
|
|
|
|
Private Function WriteNewQueryData(JsonString As String, QueryData As GraphQL.Query, DB As Database.MSSQLServer) As GraphQL.Query
|
|
Dim oObj As JObject = JObject.Parse(JsonString)
|
|
Dim oResultList As JToken
|
|
|
|
If _GraphQL.ReadJSONPathFragmented(oObj, QueryData.MappingBasePath) = False Then
|
|
_Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", QueryData.Name, QueryData.MappingBasePath)
|
|
End If
|
|
|
|
Try
|
|
oResultList = oObj.SelectToken(QueryData.MappingBasePath, errorWhenNoMatch:=True)
|
|
Catch ex As Exception
|
|
_Logger.Warn("WriteNewQueryData: Could not find BasePath: [{0}] for query [{1}]", QueryData.MappingBasePath, QueryData.Name)
|
|
_Logger.Error(ex)
|
|
Return Nothing
|
|
End Try
|
|
|
|
If oResultList Is Nothing Then
|
|
_Logger.Warn("WriteNewQueryData: Could not find BasePath: [{0}] for query [{1}]", QueryData.MappingBasePath, QueryData.Name)
|
|
Return Nothing
|
|
End If
|
|
|
|
_Logger.Info("WriteNewQueryData: Processing Queue [{0}] with [{1}] Items", QueryData.Name, oResultList.Count)
|
|
|
|
For Each oResultItem As JToken In oResultList
|
|
Try
|
|
' ADDED_WHO, ADDED_QUERY_ID are system fields which are used to correctly fill
|
|
' and delete rows in the destination table without touching rows from other queries
|
|
Dim oKeys As New List(Of String) From {"ADDED_WHO", "ADDED_QUERY_ID", "STATUS"}
|
|
Dim oValues As New List(Of String) From {JOB_NAME, QueryData.Id, "1"}
|
|
|
|
For Each oMapping In QueryData.MappingFields
|
|
Dim oValue As String = String.Empty
|
|
|
|
If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then
|
|
oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty)
|
|
Else
|
|
Dim oToken = oResultItem.SelectToken(oMapping.SourcePath)
|
|
|
|
If oToken Is Nothing Then
|
|
_Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath)
|
|
oValue = String.Empty
|
|
Else
|
|
oValue = oToken.ToString
|
|
End If
|
|
End If
|
|
|
|
oValues.Add(oValue)
|
|
oKeys.Add(oMapping.DestinationColumn)
|
|
Next
|
|
|
|
Dim oColumnString = String.Join(",", oKeys.ToArray)
|
|
|
|
Dim oValueList = oValues.Select(Function(Value) $"'{Value.EscapeForSQL}'").ToList()
|
|
Dim oValueString = String.Join(",", oValueList)
|
|
|
|
Dim oSQL As String = $"INSERT INTO {QueryData.DestinationTable} ({oColumnString}) VALUES ({oValueString})"
|
|
|
|
DB.ExecuteNonQuery(oSQL)
|
|
Catch ex As Exception
|
|
_Logger.Error(ex)
|
|
End Try
|
|
Next
|
|
|
|
Return QueryData
|
|
End Function
|
|
|
|
Public Function ShouldStart(Arguments As GraphQLArgs) As Boolean Implements IJob(Of GraphQLArgs).ShouldStart
|
|
Return Arguments.Enabled
|
|
End Function
|
|
End Class
|