Jobs: Rewrite GraphQL Job
This commit is contained in:
178
Jobs/GraphQL/GraphQLWriter.vb
Normal file
178
Jobs/GraphQL/GraphQLWriter.vb
Normal file
@@ -0,0 +1,178 @@
|
||||
Imports System
|
||||
Imports System.Collections.Generic
|
||||
Imports System.Data
|
||||
Imports System.Data.SqlClient
|
||||
Imports System.Linq
|
||||
Imports DigitalData.Modules.Database
|
||||
Imports DigitalData.Modules.Jobs.GraphQL
|
||||
Imports DigitalData.Modules.Logging
|
||||
Imports Newtonsoft.Json.Linq
|
||||
|
||||
Public Class GraphQLWriter
|
||||
Private ReadOnly Database As MSSQLServer
|
||||
Private ReadOnly LogConfig As LogConfig
|
||||
Private ReadOnly Logger As Logger
|
||||
|
||||
Private Const PLACEHOLDER_STATIC = "STATIC:"
|
||||
|
||||
Public Sub New(pLogConfig As LogConfig, pDatabase As MSSQLServer)
|
||||
Database = pDatabase
|
||||
LogConfig = pLogConfig
|
||||
Logger = pLogConfig.GetLogger()
|
||||
End Sub
|
||||
|
||||
Public Function WriteNewQueryData(pJsonString As String, pQueryData As Query, pJobName As String) As GraphQL.Query
|
||||
Try
|
||||
Logger.Debug("Parsing JSON...")
|
||||
|
||||
Dim oObj As JObject = JObject.Parse(pJsonString)
|
||||
Dim oResultList As JToken
|
||||
|
||||
If ValidateJSONPath(oObj, pQueryData.MappingBasePath) = False Then
|
||||
Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", pQueryData.Name, pQueryData.MappingBasePath)
|
||||
End If
|
||||
|
||||
Try
|
||||
oResultList = oObj.SelectToken(pQueryData.MappingBasePath, errorWhenNoMatch:=True)
|
||||
Catch ex As Exception
|
||||
Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name)
|
||||
Logger.Error(ex)
|
||||
Return Nothing
|
||||
End Try
|
||||
|
||||
If oResultList Is Nothing Then
|
||||
Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name)
|
||||
Return Nothing
|
||||
End If
|
||||
|
||||
Logger.Info("Processing Query [{0}] with [{1}] Items", pQueryData.Name, oResultList.Count)
|
||||
|
||||
Dim oTable As New DataTable
|
||||
Dim oColumnList = pQueryData.MappingFields.Select(Function(f) f.DestinationColumn).ToList()
|
||||
|
||||
For Each oColumnName In oColumnList
|
||||
oTable.Columns.Add(oColumnName)
|
||||
Next
|
||||
|
||||
oTable.Columns.Add("ADDED_WHO")
|
||||
oTable.Columns.Add("ADDED_QUERY_ID")
|
||||
oTable.Columns.Add("STATUS")
|
||||
|
||||
Logger.Debug("Creating DataTable..")
|
||||
|
||||
For Each oResultItem As JToken In oResultList
|
||||
Dim oRow = FillRowFromJson(pQueryData, oResultItem, pJobName, oTable)
|
||||
|
||||
If oRow Is Nothing Then
|
||||
Logger.Error("DataRow could not be created!")
|
||||
Continue For
|
||||
End If
|
||||
|
||||
oTable.Rows.Add(oRow)
|
||||
Next
|
||||
oTable.AcceptChanges()
|
||||
|
||||
Logger.Debug("Starting Bulk Insert..")
|
||||
|
||||
'Bulk insert
|
||||
Dim oBulkResult = BulkInsert(oTable, pQueryData.DestinationTable, oColumnList)
|
||||
|
||||
If oBulkResult = False Then
|
||||
Logger.Error("Bulk Insert for Query [{0}] failed!", pQueryData.Name)
|
||||
End If
|
||||
|
||||
Logger.Info("Bulk Insert finished. [{0}] rows inserted.", oTable.Rows.Count)
|
||||
|
||||
Return pQueryData
|
||||
|
||||
Catch ex As Exception
|
||||
Logger.Error(ex)
|
||||
Return Nothing
|
||||
|
||||
End Try
|
||||
End Function
|
||||
|
||||
Private Function FillRowFromJson(pQueryData As Query, pToken As JToken, pJobName As String, pTable As DataTable) As DataRow
|
||||
Try
|
||||
Dim oValuesNew As New Dictionary(Of String, String)
|
||||
Dim oRow As DataRow = pTable.NewRow()
|
||||
|
||||
For Each oMapping In pQueryData.MappingFields
|
||||
Dim oValue As String = String.Empty
|
||||
|
||||
If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then
|
||||
oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty)
|
||||
Else
|
||||
Dim oToken = pToken.SelectToken(oMapping.SourcePath)
|
||||
|
||||
If oToken Is Nothing Then
|
||||
Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath)
|
||||
oValue = String.Empty
|
||||
Else
|
||||
oValue = oToken.ToString
|
||||
End If
|
||||
End If
|
||||
|
||||
oValuesNew.Add(oMapping.DestinationColumn, oValue)
|
||||
Next
|
||||
|
||||
oValuesNew.Add("ADDED_WHO", pJobName)
|
||||
oValuesNew.Add("ADDED_QUERY_ID", pQueryData.Id)
|
||||
oValuesNew.Add("STATUS", "1")
|
||||
|
||||
For Each oColumn As DataColumn In pTable.Columns
|
||||
oRow.Item(oColumn.ColumnName) = oValuesNew.Item(oColumn.ColumnName)
|
||||
Next
|
||||
|
||||
Return oRow
|
||||
Catch ex As Exception
|
||||
Logger.Error(ex)
|
||||
Return Nothing
|
||||
End Try
|
||||
End Function
|
||||
|
||||
Private Function BulkInsert(pTable As DataTable, pDestinationTable As String, pColumns As List(Of String)) As Boolean
|
||||
Using oConnection = Database.GetConnection()
|
||||
Using oBulkCopy = New SqlBulkCopy(oConnection)
|
||||
|
||||
oBulkCopy.DestinationTableName = pDestinationTable
|
||||
For Each oColumn In pColumns
|
||||
oBulkCopy.ColumnMappings.Add(New SqlBulkCopyColumnMapping(oColumn, oColumn))
|
||||
Next
|
||||
|
||||
Try
|
||||
oBulkCopy.WriteToServer(pTable)
|
||||
Catch ex As Exception
|
||||
Logger.Error(ex)
|
||||
Return False
|
||||
End Try
|
||||
End Using
|
||||
End Using
|
||||
|
||||
Return True
|
||||
End Function
|
||||
|
||||
Public Function ValidateJSONPath(pObject As Newtonsoft.Json.Linq.JObject, pJsonPath As String) As Boolean
|
||||
Dim oSplitPath As List(Of String) = pJsonPath.Split(".").ToList()
|
||||
Dim oCurrentPath As String = String.Empty
|
||||
|
||||
For Each oPart In oSplitPath
|
||||
If oCurrentPath = String.Empty Then
|
||||
oCurrentPath = oPart
|
||||
Else
|
||||
oCurrentPath &= "." & oPart
|
||||
End If
|
||||
|
||||
Logger.Debug("Selecting Path Fragment [{0}]", oCurrentPath)
|
||||
|
||||
Try
|
||||
pObject.SelectToken(oCurrentPath, errorWhenNoMatch:=True)
|
||||
Catch ex As Exception
|
||||
Logger.Warn("Path Fragment [{0}] did not return a valid token", oCurrentPath)
|
||||
Return False
|
||||
End Try
|
||||
Next
|
||||
|
||||
Return True
|
||||
End Function
|
||||
End Class
|
||||
Reference in New Issue
Block a user