Imports System Imports System.Collections.Generic Imports System.Data Imports System.Data.SqlClient Imports System.Linq Imports DigitalData.Modules.Database Imports DigitalData.Modules.Jobs.GraphQL Imports DigitalData.Modules.Logging Imports Newtonsoft.Json.Linq Public Class GraphQLWriter Private ReadOnly Database As MSSQLServer Private ReadOnly LogConfig As LogConfig Private ReadOnly Logger As Logger Private Const PLACEHOLDER_STATIC = "STATIC:" Public Sub New(pLogConfig As LogConfig, pDatabase As MSSQLServer) Database = pDatabase LogConfig = pLogConfig Logger = pLogConfig.GetLogger() End Sub Public Function WriteNewQueryData(pJsonString As String, pQueryData As Query, pJobName As String) As GraphQL.Query Try Logger.Debug("Parsing JSON...") Dim oObj As JObject = JObject.Parse(pJsonString) Dim oResultList As JToken If ValidateJSONPath(oObj, pQueryData.MappingBasePath) = False Then Logger.Warn("There is an error in the MappingBasePath [{1}] configuration of query [{0}]", pQueryData.Name, pQueryData.MappingBasePath) End If Try oResultList = oObj.SelectToken(pQueryData.MappingBasePath, errorWhenNoMatch:=True) Catch ex As Exception Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name) Logger.Error(ex) Return Nothing End Try If oResultList Is Nothing Then Logger.Warn("Could not find BasePath: [{0}] for query [{1}]", pQueryData.MappingBasePath, pQueryData.Name) Return Nothing End If Logger.Info("Processing Query [{0}] with [{1}] Items", pQueryData.Name, oResultList.Count) Dim oTable As New DataTable Dim oColumnList = pQueryData.MappingFields.Select(Function(f) f.DestinationColumn).ToList() For Each oColumnName In oColumnList oTable.Columns.Add(oColumnName) Next oTable.Columns.Add("ADDED_WHO") oTable.Columns.Add("ADDED_QUERY_ID") oTable.Columns.Add("STATUS") Logger.Debug("Creating DataTable..") For Each oResultItem As JToken In oResultList Dim oRow = FillRowFromJson(pQueryData, oResultItem, pJobName, oTable) If oRow Is Nothing Then Logger.Error("DataRow could not be created!") Continue For End If oTable.Rows.Add(oRow) Next oTable.AcceptChanges() Logger.Debug("Starting Bulk Insert..") 'Bulk insert Dim oBulkResult = BulkInsert(oTable, pQueryData.DestinationTable, oColumnList) If oBulkResult = False Then Logger.Error("Bulk Insert for Query [{0}] failed!", pQueryData.Name) End If Logger.Info("Bulk Insert finished. [{0}] rows inserted.", oTable.Rows.Count) Return pQueryData Catch ex As Exception Logger.Error(ex) Return Nothing End Try End Function Private Function FillRowFromJson(pQueryData As Query, pToken As JToken, pJobName As String, pTable As DataTable) As DataRow Try Dim oValuesNew As New Dictionary(Of String, String) Dim oRow As DataRow = pTable.NewRow() For Each oMapping In pQueryData.MappingFields Dim oValue As String = String.Empty If oMapping.SourcePath.StartsWith(PLACEHOLDER_STATIC) Then oValue = oMapping.SourcePath.Replace(PLACEHOLDER_STATIC, String.Empty) Else Dim oToken = pToken.SelectToken(oMapping.SourcePath) If oToken Is Nothing Then Logger.Warn("WriteNewQueryData: Could not find value at SourcePath: {0}", oMapping.SourcePath) oValue = String.Empty Else oValue = oToken.ToString End If End If oValuesNew.Add(oMapping.DestinationColumn, oValue) Next oValuesNew.Add("ADDED_WHO", pJobName) oValuesNew.Add("ADDED_QUERY_ID", pQueryData.Id) oValuesNew.Add("STATUS", "1") For Each oColumn As DataColumn In pTable.Columns oRow.Item(oColumn.ColumnName) = oValuesNew.Item(oColumn.ColumnName) Next Return oRow Catch ex As Exception Logger.Error(ex) Return Nothing End Try End Function Private Function BulkInsert(pTable As DataTable, pDestinationTable As String, pColumns As List(Of String)) As Boolean Using oConnection = Database.GetConnection() Using oBulkCopy = New SqlBulkCopy(oConnection) oBulkCopy.DestinationTableName = pDestinationTable For Each oColumn In pColumns oBulkCopy.ColumnMappings.Add(New SqlBulkCopyColumnMapping(oColumn, oColumn)) Next Try oBulkCopy.WriteToServer(pTable) Catch ex As Exception Logger.Error(ex) Return False End Try End Using End Using Return True End Function Public Function ValidateJSONPath(pObject As Newtonsoft.Json.Linq.JObject, pJsonPath As String) As Boolean Dim oSplitPath As List(Of String) = pJsonPath.Split(".").ToList() Dim oCurrentPath As String = String.Empty For Each oPart In oSplitPath If oCurrentPath = String.Empty Then oCurrentPath = oPart Else oCurrentPath &= "." & oPart End If Logger.Debug("Selecting Path Fragment [{0}]", oCurrentPath) Try pObject.SelectToken(oCurrentPath, errorWhenNoMatch:=True) Catch ex As Exception Logger.Warn("Path Fragment [{0}] did not return a valid token", oCurrentPath) Return False End Try Next Return True End Function End Class