179 lines
6.2 KiB
VB.net
179 lines
6.2 KiB
VB.net
Imports System
|
|
Imports System.Collections.Generic
|
|
Imports System.Data
|
|
Imports System.Data.SqlClient
|
|
Imports System.Linq
|
|
Imports DigitalData.Modules.Database
|
|
Imports DigitalData.Modules.Jobs.GraphQL
|
|
Imports DigitalData.Modules.Logging
|
|
Imports Newtonsoft.Json.Linq
|
|
|
|
Public Class GraphQLWriter
|
|
Private ReadOnly Database As MSSQLServer
|
|
Private ReadOnly LogConfig As LogConfig
|
|
Private ReadOnly Logger As Logger
|
|
|
|
Private Const PLACEHOLDER_STATIC = "STATIC:"
|
|
|
|
Public Sub New(pLogConfig As LogConfig, pDatabase As MSSQLServer)
|
|
Database = pDatabase
|
|
LogConfig = pLogConfig
|
|
Logger = pLogConfig.GetLogger()
|
|
End Sub
|
|
|
|
Public Function WriteNewQueryData(pJsonString As String, pQueryData As Query, pJobName As String) As GraphQL.Query
|
|
Try
|
|
Logger.Debug("Parsing JSON...")
|
|
|
|
Dim oObj As JObject = JObject.Parse(pJsonString)
|
|
Dim oResultList As JToken
|
|
|
|
If ValidateJSONPath(oObj, pQueryData.MappingBasePath) = False Then
|
|
Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", pQueryData.Name, pQueryData.MappingBasePath)
|
|
End If
|
|
|
|
Try
|
|
oResultList = oObj.SelectToken(pQueryData.MappingBasePath, errorWhenNoMatch:=True)
|
|
Catch ex As Exception
|
|
Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name)
|
|
Logger.Error(ex)
|
|
Return Nothing
|
|
End Try
|
|
|
|
If oResultList Is Nothing Then
|
|
Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name)
|
|
Return Nothing
|
|
End If
|
|
|
|
Logger.Info("Processing Query [{0}] with [{1}] Items", pQueryData.Name, oResultList.Count)
|
|
|
|
Dim oTable As New DataTable
|
|
Dim oColumnList = pQueryData.MappingFields.Select(Function(f) f.DestinationColumn).ToList()
|
|
|
|
For Each oColumnName In oColumnList
|
|
oTable.Columns.Add(oColumnName)
|
|
Next
|
|
|
|
oTable.Columns.Add("ADDED_WHO")
|
|
oTable.Columns.Add("ADDED_QUERY_ID")
|
|
oTable.Columns.Add("STATUS")
|
|
|
|
Logger.Debug("Creating DataTable..")
|
|
|
|
For Each oResultItem As JToken In oResultList
|
|
Dim oRow = FillRowFromJson(pQueryData, oResultItem, pJobName, oTable)
|
|
|
|
If oRow Is Nothing Then
|
|
Logger.Error("DataRow could not be created!")
|
|
Continue For
|
|
End If
|
|
|
|
oTable.Rows.Add(oRow)
|
|
Next
|
|
oTable.AcceptChanges()
|
|
|
|
Logger.Debug("Starting Bulk Insert..")
|
|
|
|
'Bulk insert
|
|
Dim oBulkResult = BulkInsert(oTable, pQueryData.DestinationTable, oColumnList)
|
|
|
|
If oBulkResult = False Then
|
|
Logger.Error("Bulk Insert for Query [{0}] failed!", pQueryData.Name)
|
|
End If
|
|
|
|
Logger.Info("Bulk Insert finished. [{0}] rows inserted.", oTable.Rows.Count)
|
|
|
|
Return pQueryData
|
|
|
|
Catch ex As Exception
|
|
Logger.Error(ex)
|
|
Return Nothing
|
|
|
|
End Try
|
|
End Function
|
|
|
|
Private Function FillRowFromJson(pQueryData As Query, pToken As JToken, pJobName As String, pTable As DataTable) As DataRow
|
|
Try
|
|
Dim oValuesNew As New Dictionary(Of String, String)
|
|
Dim oRow As DataRow = pTable.NewRow()
|
|
|
|
For Each oMapping In pQueryData.MappingFields
|
|
Dim oValue As String = String.Empty
|
|
|
|
If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then
|
|
oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty)
|
|
Else
|
|
Dim oToken = pToken.SelectToken(oMapping.SourcePath)
|
|
|
|
If oToken Is Nothing Then
|
|
Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath)
|
|
oValue = String.Empty
|
|
Else
|
|
oValue = oToken.ToString
|
|
End If
|
|
End If
|
|
|
|
oValuesNew.Add(oMapping.DestinationColumn, oValue)
|
|
Next
|
|
|
|
oValuesNew.Add("ADDED_WHO", pJobName)
|
|
oValuesNew.Add("ADDED_QUERY_ID", pQueryData.Id)
|
|
oValuesNew.Add("STATUS", "1")
|
|
|
|
For Each oColumn As DataColumn In pTable.Columns
|
|
oRow.Item(oColumn.ColumnName) = oValuesNew.Item(oColumn.ColumnName)
|
|
Next
|
|
|
|
Return oRow
|
|
Catch ex As Exception
|
|
Logger.Error(ex)
|
|
Return Nothing
|
|
End Try
|
|
End Function
|
|
|
|
Private Function BulkInsert(pTable As DataTable, pDestinationTable As String, pColumns As List(Of String)) As Boolean
|
|
Using oConnection = Database.GetConnection()
|
|
Using oBulkCopy = New SqlBulkCopy(oConnection)
|
|
|
|
oBulkCopy.DestinationTableName = pDestinationTable
|
|
For Each oColumn In pColumns
|
|
oBulkCopy.ColumnMappings.Add(New SqlBulkCopyColumnMapping(oColumn, oColumn))
|
|
Next
|
|
|
|
Try
|
|
oBulkCopy.WriteToServer(pTable)
|
|
Catch ex As Exception
|
|
Logger.Error(ex)
|
|
Return False
|
|
End Try
|
|
End Using
|
|
End Using
|
|
|
|
Return True
|
|
End Function
|
|
|
|
Public Function ValidateJSONPath(pObject As Newtonsoft.Json.Linq.JObject, pJsonPath As String) As Boolean
|
|
Dim oSplitPath As List(Of String) = pJsonPath.Split(".").ToList()
|
|
Dim oCurrentPath As String = String.Empty
|
|
|
|
For Each oPart In oSplitPath
|
|
If oCurrentPath = String.Empty Then
|
|
oCurrentPath = oPart
|
|
Else
|
|
oCurrentPath &= "." & oPart
|
|
End If
|
|
|
|
Logger.Debug("Selecting Path Fragment [{0}]", oCurrentPath)
|
|
|
|
Try
|
|
pObject.SelectToken(oCurrentPath, errorWhenNoMatch:=True)
|
|
Catch ex As Exception
|
|
Logger.Warn("Path Fragment [{0}] did not return a valid token", oCurrentPath)
|
|
Return False
|
|
End Try
|
|
Next
|
|
|
|
Return True
|
|
End Function
|
|
End Class
|