From b1a588b03c624039d166bff3b73a32614dbc4f29 Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 12:03:50 +0100 Subject: [PATCH 01/10] First draft --- businessCentral/app/src/ADLSE.Codeunit.al | 21 ++++++++ .../app/src/ADLSIntegrations.Interface.al | 12 +++++ .../AzureIntegration.Codeunit.al | 28 +++++++++++ .../app/src/Communication.Codeunit.al | 49 +++---------------- .../FabricLakehouseIntegration.Codeunit.al | 32 ++++++++++++ businessCentral/app/src/StorageType.Enum.al | 4 +- businessCentral/app/src/Table.Table.al | 11 +++-- 7 files changed, 110 insertions(+), 47 deletions(-) create mode 100644 businessCentral/app/src/ADLSIntegrations.Interface.al create mode 100644 businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al create mode 100644 businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al diff --git a/businessCentral/app/src/ADLSE.Codeunit.al b/businessCentral/app/src/ADLSE.Codeunit.al index 3288114..c9ac0a7 100644 --- a/businessCentral/app/src/ADLSE.Codeunit.al +++ b/businessCentral/app/src/ADLSE.Codeunit.al @@ -22,4 +22,25 @@ codeunit 82567 ADLSE internal procedure OnTableExported(TableID: Integer; LastTimeStampExported: BigInteger) begin end; + + internal procedure selectbc2adlsIntegrations(var AdlsIntegrations: Interface "ADLS Integrations") + var + ADLSESetup: Record "ADLSE Setup"; + AzureIntegration: Codeunit "Azure Integration"; + FabricLakehouseIntegration: Codeunit "Fabric Lakehouse Integration"; + begin + //TODO: Make it extendible + ADLSESetup.GetSingleton(); + case + ADLSESetup."Storage Type" of + ADLSESetup."Storage Type"::"Azure Data Lake": + AdlsIntegrations := AzureIntegration; + ADLSESetup."Storage Type"::"Microsoft Fabric": + AdlsIntegrations := FabricLakehouseIntegration; + else + Error('The storage type is not supported.'); + end; + end; + + } \ No newline at end of file diff --git a/businessCentral/app/src/ADLSIntegrations.Interface.al b/businessCentral/app/src/ADLSIntegrations.Interface.al new file mode 100644 index 0000000..517301e --- /dev/null +++ b/businessCentral/app/src/ADLSIntegrations.Interface.al @@ -0,0 +1,12 @@ +interface "ADLS Integrations" +{ + /// + /// Get the base url of the integration + /// + procedure GetBaseUrl(): Text + + /// + /// Resets the table inside the external system + /// + procedure ResetTableExport(ltableId: Integer); +} \ No newline at end of file diff --git a/businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al new file mode 100644 index 0000000..e3b5dfa --- /dev/null +++ b/businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al @@ -0,0 +1,28 @@ +codeunit 82577 "Azure Integration" implements "ADLS Integrations" +{ + procedure GetBaseUrl(): Text + var + ADLSESetup: Record "ADLSE Setup"; + DefaultContainerName: Text; + ContainerUrlTxt: Label 'https://%1.blob.core.windows.net/%2', Comment = '%1: Account name, %2: Container Name'; + begin + ADLSESetup.GetSingleton(); + + if DefaultContainerName = '' then + DefaultContainerName := ADLSESetup.Container; + + exit(StrSubstNo(ContainerUrlTxt, ADLSESetup."Account Name", DefaultContainerName)); + end; + + procedure ResetTableExport(ltableId: Integer) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSECredentials: Codeunit "ADLSE Credentials"; + begin + ADLSESetup.GetSingleton(); + ADLSECredentials.Init(); + ADLSEGen2Util.RemoveDeltasFromDataLake(ADLSEUtil.GetDataLakeCompliantTableName(ltableId), ADLSECredentials); + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/Communication.Codeunit.al b/businessCentral/app/src/Communication.Codeunit.al index 8a3f527..0fd8944 100644 --- a/businessCentral/app/src/Communication.Codeunit.al +++ b/businessCentral/app/src/Communication.Codeunit.al @@ -5,7 +5,9 @@ codeunit 82562 "ADLSE Communication" Access = Internal; var + ADLSE: Codeunit "ADLSE"; ADLSECredentials: Codeunit "ADLSE Credentials"; + AdlsIntegrations: Interface "ADLS Integrations"; TableID: Integer; FieldIdList: List of [Integer]; DataBlobPath: Text; @@ -17,13 +19,11 @@ codeunit 82562 "ADLSE Communication" NumberOfFlushes: Integer; EntityName: Text; EntityJson: JsonObject; - DefaultContainerName: Text; MaxSizeOfPayloadMiB: Integer; EmitTelemetry: Boolean; DeltaCdmManifestNameTxt: Label 'deltas.manifest.cdm.json', Locked = true; DataCdmManifestNameTxt: Label 'data.manifest.cdm.json', Locked = true; EntityManifestNameTemplateTxt: Label '%1.cdm.json', Locked = true, Comment = '%1 = Entity name'; - ContainerUrlTxt: Label 'https://%1.blob.core.windows.net/%2', Comment = '%1: Account name, %2: Container Name'; CorpusJsonPathTxt: Label '/%1', Comment = '%1 = name of the blob', Locked = true; CannotAddedMoreBlocksErr: Label 'The number of blocks that can be added to the blob has reached its maximum limit.'; SingleRecordTooLargeErr: Label 'A single record payload exceeded the max payload size. Please adjust the payload size or reduce the fields to be exported for the record.'; @@ -31,9 +31,6 @@ codeunit 82562 "ADLSE Communication" ExportOfSchemaNotPerformendTxt: Label 'Please export the schema first before trying to export the data.'; EntitySchemaChangedErr: Label 'The schema of the table %1 has changed. %2', Comment = '%1 = Entity name, %2 = NotAllowedOnSimultaneousExportTxt'; CdmSchemaChangedErr: Label 'There may have been a change in the tables to export. %1', Comment = '%1 = NotAllowedOnSimultaneousExportTxt'; - MSFabricUrlTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2.Lakehouse/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; - MSFabricUrlGuidTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; - ResetTableExportTxt: Label '/reset/%1.txt', Locked = true, Comment = '%1 = Table name'; procedure SetupBlobStorage() var @@ -46,25 +43,8 @@ codeunit 82562 "ADLSE Communication" end; local procedure GetBaseUrl(): Text - var - ADLSESetup: Record "ADLSE Setup"; - ValidGuid: Guid; begin - ADLSESetup.GetSingleton(); - case ADLSESetup.GetStorageType() of - ADLSESetup."Storage Type"::"Azure Data Lake": - begin - if DefaultContainerName = '' then - DefaultContainerName := ADLSESetup.Container; - - exit(StrSubstNo(ContainerUrlTxt, ADLSESetup."Account Name", DefaultContainerName)); - end; - ADLSESetup."Storage Type"::"Microsoft Fabric": - if not Evaluate(ValidGuid, ADLSESetup.Lakehouse) then - exit(StrSubstNo(MSFabricUrlTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)) - else - exit(StrSubstNo(MSFabricUrlGuidTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)); - end; + exit(AdlsIntegrations.GetBaseUrl()); end; procedure Init(TableIDValue: Integer; FieldIdListValue: List of [Integer]; LastFlushedTimeStampValue: BigInteger; EmitTelemetryValue: Boolean) @@ -74,6 +54,8 @@ codeunit 82562 "ADLSE Communication" ADLSEExecution: Codeunit "ADLSE Execution"; CustomDimensions: Dictionary of [Text, Text]; begin + ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + TableID := TableIDValue; FieldIdList := FieldIdListValue; @@ -82,6 +64,7 @@ codeunit 82562 "ADLSE Communication" LastFlushedTimeStamp := LastFlushedTimeStampValue; ADLSESetup.GetSingleton(); + MaxSizeOfPayloadMiB := ADLSESetup.MaxPayloadSizeMiB; EmitTelemetry := EmitTelemetryValue; if EmitTelemetry then begin @@ -258,7 +241,6 @@ codeunit 82562 "ADLSE Communication" ADLSESetup: Record "ADLSE Setup"; ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; ADLSEExecution: Codeunit "ADLSE Execution"; - ADLSE: Codeunit ADLSE; CustomDimensions: Dictionary of [Text, Text]; BlockID: Text; begin @@ -358,23 +340,4 @@ codeunit 82562 "ADLSE Communication" if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then ADLSEGen2Util.ReleaseBlob(BlobPath, ADLSECredentials, LeaseID); end; - - procedure ResetTableExport(ltableId: Integer) - var - ADLSESetup: Record "ADLSE Setup"; - ADLSEUtil: Codeunit "ADLSE Util"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - Body: JsonObject; - begin - ADLSESetup.GetSingleton(); - ADLSECredentials.Init(); - case ADLSESetup."Storage Type" of - "ADLSE Storage Type"::"Microsoft Fabric": - ADLSEGen2Util.CreateOrUpdateJsonBlob(GetBaseUrl() + StrSubstNo(ResetTableExportTxt, ADLSEUtil.GetDataLakeCompliantTableName(ltableId)), ADLSECredentials, '', Body); - "ADLSE Storage Type"::"Azure Data Lake": - ADLSEGen2Util.RemoveDeltasFromDataLake(ADLSEUtil.GetDataLakeCompliantTableName(ltableId), ADLSECredentials); - end; - end; - - } diff --git a/businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al new file mode 100644 index 0000000..a847e52 --- /dev/null +++ b/businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al @@ -0,0 +1,32 @@ +codeunit 82578 "Fabric Lakehouse Integration" implements "ADLS Integrations" +{ + procedure GetBaseUrl(): Text + var + ADLSESetup: Record "ADLSE Setup"; + ValidGuid: Guid; + MSFabricUrlTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2.Lakehouse/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; + MSFabricUrlGuidTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; + begin + ADLSESetup.GetSingleton(); + + if not Evaluate(ValidGuid, ADLSESetup.Lakehouse) then + exit(StrSubstNo(MSFabricUrlTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)) + else + exit(StrSubstNo(MSFabricUrlGuidTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)); + end; + + procedure ResetTableExport(ltableId: Integer) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSECredentials: Codeunit "ADLSE Credentials"; + Body: JsonObject; + ResetTableExportTxt: Label '/reset/%1.txt', Locked = true, Comment = '%1 = Table name'; + begin + ADLSESetup.GetSingleton(); + ADLSECredentials.Init(); + + ADLSEGen2Util.CreateOrUpdateJsonBlob(GetBaseUrl() + StrSubstNo(ResetTableExportTxt, ADLSEUtil.GetDataLakeCompliantTableName(ltableId)), ADLSECredentials, '', Body); + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/StorageType.Enum.al b/businessCentral/app/src/StorageType.Enum.al index c6066b7..8c74ec8 100644 --- a/businessCentral/app/src/StorageType.Enum.al +++ b/businessCentral/app/src/StorageType.Enum.al @@ -1,4 +1,4 @@ -enum 82563 "ADLSE Storage Type" +enum 82563 "ADLSE Storage Type" implements "ADLS Integrations" { Access = Internal; Extensible = false; @@ -7,10 +7,12 @@ enum 82563 "ADLSE Storage Type" value(0; "Azure Data Lake") { Caption = 'Azure Data Lake'; + Implementation = "ADLS Integrations" = "Azure Integration"; } #pragma warning restore LC0045 value(1; "Microsoft Fabric") { Caption = 'Microsoft Fabric'; + Implementation = "ADLS Integrations" = "Fabric Lakehouse Integration"; } } \ No newline at end of file diff --git a/businessCentral/app/src/Table.Table.al b/businessCentral/app/src/Table.Table.al index f99be78..670f419 100644 --- a/businessCentral/app/src/Table.Table.al +++ b/businessCentral/app/src/Table.Table.al @@ -187,7 +187,8 @@ table 82561 "ADLSE Table" ADLSEDeletedRecord: Record "ADLSE Deleted Record"; ADLSETableLastTimestamp: Record "ADLSE Table Last Timestamp"; ADLSESetup: Record "ADLSE Setup"; - ADLSECommunication: Codeunit "ADLSE Communication"; + ADLSE: Codeunit "ADLSE"; + ADLSIntegrations: Interface "ADLS Integrations"; Counter: Integer; begin if Rec.FindSet(true) then @@ -204,8 +205,11 @@ table 82561 "ADLSE Table" ADLSEDeletedRecord.DeleteAll(false); ADLSESetup.GetSingleton(); - if (ADLSESetup."Delete Table") then - ADLSECommunication.ResetTableExport(Rec."Table ID"); + if (ADLSESetup."Delete Table") then begin + ADLSE.selectbc2adlsIntegrations(ADLSIntegrations); + ADLSIntegrations.ResetTableExport(Rec."Table ID"); + + end; OnAfterResetSelected(Rec); @@ -293,6 +297,7 @@ table 82561 "ADLSE Table" end; until Field.Next() = 0; end; + [IntegrationEvent(false, false)] local procedure OnAfterResetSelected(ADLSETable: Record "ADLSE Table") begin From cf55ce079b472b8b1d69b67247dbe8216dbe023a Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 12:29:22 +0100 Subject: [PATCH 02/10] Added some events --- .../AzureDataLake/AzureSubscribers.Codeunit.al | 17 +++++++++++++++++ .../app/src/Communication.Codeunit.al | 10 ---------- businessCentral/app/src/Execution.Codeunit.al | 10 ++++++++-- 3 files changed, 25 insertions(+), 12 deletions(-) create mode 100644 businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al diff --git a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al new file mode 100644 index 0000000..b585ccf --- /dev/null +++ b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al @@ -0,0 +1,17 @@ +codeunit 82579 "Azure Subscribers" +{ + [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Execution", 'OnStartExportOnAfterCheckSetup', '', true, true)] + local procedure OnStartExportOnAfterCheckSetup() + var + ADLSE: Codeunit ADLSE; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSECredentials: Codeunit "ADLSE Credentials"; + ADLSIntegrations: Interface "ADLS Integrations"; + begin + ADLSE.selectbc2adlsIntegrations(ADLSIntegrations); + ADLSECredentials.Init(); + + if not ADLSEGen2Util.ContainerExists(ADLSIntegrations.GetBaseUrl(), ADLSECredentials) then + ADLSEGen2Util.CreateContainer(ADLSIntegrations.GetBaseUrl(), ADLSECredentials); + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/Communication.Codeunit.al b/businessCentral/app/src/Communication.Codeunit.al index 0fd8944..43935e1 100644 --- a/businessCentral/app/src/Communication.Codeunit.al +++ b/businessCentral/app/src/Communication.Codeunit.al @@ -32,16 +32,6 @@ codeunit 82562 "ADLSE Communication" EntitySchemaChangedErr: Label 'The schema of the table %1 has changed. %2', Comment = '%1 = Entity name, %2 = NotAllowedOnSimultaneousExportTxt'; CdmSchemaChangedErr: Label 'There may have been a change in the tables to export. %1', Comment = '%1 = NotAllowedOnSimultaneousExportTxt'; - procedure SetupBlobStorage() - var - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - begin - ADLSECredentials.Init(); - - if not ADLSEGen2Util.ContainerExists(GetBaseUrl(), ADLSECredentials) then - ADLSEGen2Util.CreateContainer(GetBaseUrl(), ADLSECredentials); - end; - local procedure GetBaseUrl(): Text begin exit(AdlsIntegrations.GetBaseUrl()); diff --git a/businessCentral/app/src/Execution.Codeunit.al b/businessCentral/app/src/Execution.Codeunit.al index fc71da2..5a574bc 100644 --- a/businessCentral/app/src/Execution.Codeunit.al +++ b/businessCentral/app/src/Execution.Codeunit.al @@ -33,10 +33,12 @@ codeunit 82569 "ADLSE Execution" Started: Integer; begin ADLSESetup.CheckSetup(ADLSESetupRec); + + OnStartExportOnAfterCheckSetup(); + EmitTelemetry := ADLSESetupRec."Emit telemetry"; ADLSECurrentSession.CleanupSessions(); - if ADLSESetupRec.GetStorageType() = ADLSESetupRec."Storage Type"::"Azure Data Lake" then //Because Fabric doesn't have do create a container - ADLSECommunication.SetupBlobStorage(); + ADLSESessionManager.Init(); ADLSEExternalEvents.OnExport(ADLSESetupRec); @@ -220,6 +222,10 @@ codeunit 82569 "ADLSE Execution" [IntegrationEvent(false, false)] local procedure OnBeforeScheduleExport(var Handled: Boolean) begin + end; + [IntegrationEvent(false, false)] + local procedure OnStartExportOnAfterCheckSetup() + begin end; } \ No newline at end of file From d1c0db9c0a436ee8f17e5643270c8a77c21d060b Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 12:50:36 +0100 Subject: [PATCH 03/10] Move to different objects --- businessCentral/app/src/ADLSE.Codeunit.al | 2 +- .../AzureDataLake/AzureSetupExt.PageExt.al | 29 +++++++ .../AzureDataLake/AzureSetupExt.TableExt.al | 53 ++++++++++++ ....Codeunit.al => FLIntegration.Codeunit.al} | 2 +- .../src/FabricLakehouse/FLSetupExt.PageExt.al | 29 +++++++ .../FabricLakehouse/FLSetupExt.TableExt.al | 52 ++++++++++++ businessCentral/app/src/Setup.Page.al | 18 ----- businessCentral/app/src/Setup.Table.al | 80 ------------------- businessCentral/app/src/StorageType.Enum.al | 2 +- 9 files changed, 166 insertions(+), 101 deletions(-) create mode 100644 businessCentral/app/src/AzureDataLake/AzureSetupExt.PageExt.al create mode 100644 businessCentral/app/src/AzureDataLake/AzureSetupExt.TableExt.al rename businessCentral/app/src/FabricLakehouse/{FabricLakehouseIntegration.Codeunit.al => FLIntegration.Codeunit.al} (94%) create mode 100644 businessCentral/app/src/FabricLakehouse/FLSetupExt.PageExt.al create mode 100644 businessCentral/app/src/FabricLakehouse/FLSetupExt.TableExt.al diff --git a/businessCentral/app/src/ADLSE.Codeunit.al b/businessCentral/app/src/ADLSE.Codeunit.al index c9ac0a7..70b3b50 100644 --- a/businessCentral/app/src/ADLSE.Codeunit.al +++ b/businessCentral/app/src/ADLSE.Codeunit.al @@ -27,7 +27,7 @@ codeunit 82567 ADLSE var ADLSESetup: Record "ADLSE Setup"; AzureIntegration: Codeunit "Azure Integration"; - FabricLakehouseIntegration: Codeunit "Fabric Lakehouse Integration"; + FabricLakehouseIntegration: Codeunit "FL Integration"; begin //TODO: Make it extendible ADLSESetup.GetSingleton(); diff --git a/businessCentral/app/src/AzureDataLake/AzureSetupExt.PageExt.al b/businessCentral/app/src/AzureDataLake/AzureSetupExt.PageExt.al new file mode 100644 index 0000000..7751646 --- /dev/null +++ b/businessCentral/app/src/AzureDataLake/AzureSetupExt.PageExt.al @@ -0,0 +1,29 @@ +pageextension 82560 "Azure Setup Ext" extends "ADLSE Setup" +{ + layout + { + addafter("Tenant ID") + { + group(Account) + { + Caption = 'Azure Data Lake'; + Editable = AzureDataLake; + field(Container; Rec.Container) + { + ApplicationArea = All; + } + field(AccountName; Rec."Account Name") + { + ApplicationArea = All; + } + } + } + } + var + AzureDataLake: Boolean; + + trigger OnAfterGetRecord() + begin + AzureDataLake := Rec."Storage Type" = Rec."Storage Type"::"Azure Data Lake"; + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/AzureDataLake/AzureSetupExt.TableExt.al b/businessCentral/app/src/AzureDataLake/AzureSetupExt.TableExt.al new file mode 100644 index 0000000..3c13d41 --- /dev/null +++ b/businessCentral/app/src/AzureDataLake/AzureSetupExt.TableExt.al @@ -0,0 +1,53 @@ +tableextension 82561 "Azure Setup Ext" extends "ADLSE Setup" +{ + fields + { + field(82570; "Account Name"; Text[24]) + { + Caption = 'Account Name'; + ToolTip = 'Specifies the name of the storage account.'; + + trigger OnValidate() + begin + // Name constraints based on https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-name + if (StrLen(Rec."Account Name") < 3) or (StrLen(Rec."Account Name") > 24) // between 3 and 24 characters long + or TextCharactersOtherThan(Rec."Account Name", 'abcdefghijklmnopqrstuvwxyz1234567890') // only made of lower case letters and numerals + then + Error(AccountNameIncorrectFormatErr); + end; + } + + field(82571; Container; Text[63]) + { + Caption = 'Container'; + ToolTip = 'Specifies the name of the container where the data is going to be uploaded. Please refer to constraints on container names at https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata.'; + + trigger OnValidate() + begin + // Name constraints based on https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata + if (StrLen(Container) < 3) or (StrLen(Container) > 63) // between 6 and 63 characters long + or TextCharactersOtherThan(Container, 'abcdefghijklmnopqrstuvwxyz1234567890-') // only made of lower case letters, numerals and dashes + or (StrPos(Container, '--') <> 0) // no occurence of multiple dashes together + then + Error(ContainerNameIncorrectFormatErr); + end; + } + + } + + var + ContainerNameIncorrectFormatErr: Label 'The container name is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_'; + AccountNameIncorrectFormatErr: Label 'The account name is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890'; + + local procedure TextCharactersOtherThan(String: Text; CharString: Text): Boolean + var + Index: Integer; + Letter: Text; + begin + for Index := 1 to StrLen(String) do begin + Letter := CopyStr(String, Index, 1); + if StrPos(CharString, Letter) = 0 then + exit(true); + end; + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al similarity index 94% rename from businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al rename to businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al index a847e52..a265ed9 100644 --- a/businessCentral/app/src/FabricLakehouse/FabricLakehouseIntegration.Codeunit.al +++ b/businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al @@ -1,4 +1,4 @@ -codeunit 82578 "Fabric Lakehouse Integration" implements "ADLS Integrations" +codeunit 82578 "FL Integration" implements "ADLS Integrations" { procedure GetBaseUrl(): Text var diff --git a/businessCentral/app/src/FabricLakehouse/FLSetupExt.PageExt.al b/businessCentral/app/src/FabricLakehouse/FLSetupExt.PageExt.al new file mode 100644 index 0000000..23da5d1 --- /dev/null +++ b/businessCentral/app/src/FabricLakehouse/FLSetupExt.PageExt.al @@ -0,0 +1,29 @@ +pageextension 82561 "FL Setup Ext" extends "ADLSE Setup" +{ + layout + { + addafter("Tenant ID") + { + group(MSFabric) + { + Caption = 'Microsoft Fabric'; + Editable = MSFabric; + field(Workspace; Rec.Workspace) + { + ApplicationArea = All; + } + field(Lakehouse; Rec.Lakehouse) + { + ApplicationArea = All; + } + } + } + } + var + MSFabric: Boolean; + + trigger OnAfterGetRecord() + begin + MSFabric := Rec."Storage Type" = Rec."Storage Type"::"Microsoft Fabric"; + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/FabricLakehouse/FLSetupExt.TableExt.al b/businessCentral/app/src/FabricLakehouse/FLSetupExt.TableExt.al new file mode 100644 index 0000000..a9acd40 --- /dev/null +++ b/businessCentral/app/src/FabricLakehouse/FLSetupExt.TableExt.al @@ -0,0 +1,52 @@ +tableextension 82560 "FL Setup Ext" extends "ADLSE Setup" +{ + fields + { + field(82560; Workspace; Text[100]) + { + Caption = 'Workspace'; + ToolTip = 'Specifies the name of the Workspace where the data is going to be uploaded. This can be a name or a GUID.'; + trigger OnValidate() + var + ValidGuid: Guid; + begin + if not Evaluate(ValidGuid, Rec.Workspace) then + if (StrLen(Rec.Workspace) < 3) or (StrLen(Rec.Workspace) > 24) + or TextCharactersOtherThan(Rec.Workspace, 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_') + then + Error(WorkspaceIncorrectFormatErr); + end; + } + field(82561; Lakehouse; Text[100]) + { + Caption = 'Lakehouse'; + ToolTip = 'Specifies the name of the Lakehouse where the data is going to be uploaded. This can be a name or a GUID.'; + trigger OnValidate() + var + ValidGuid: Guid; + begin + if not Evaluate(ValidGuid, Rec.Lakehouse) then + if (StrLen(Rec.Lakehouse) < 3) or (StrLen(Rec.Lakehouse) > 24) + or TextCharactersOtherThan(Rec.Lakehouse, 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_') + then + Error(LakehouseIncorrectFormatErr); + end; + } + } + + var + WorkspaceIncorrectFormatErr: Label 'The workspace is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_ or a valid GUID'; + LakehouseIncorrectFormatErr: Label 'The lakehouse is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_ or a valid GUID'; + + local procedure TextCharactersOtherThan(String: Text; CharString: Text): Boolean + var + Index: Integer; + Letter: Text; + begin + for Index := 1 to StrLen(String) do begin + Letter := CopyStr(String, Index, 1); + if StrPos(CharString, Letter) = 0 then + exit(true); + end; + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/Setup.Page.al b/businessCentral/app/src/Setup.Page.al index cc6ac8e..706775f 100644 --- a/businessCentral/app/src/Setup.Page.al +++ b/businessCentral/app/src/Setup.Page.al @@ -37,21 +37,6 @@ page 82560 "ADLSE Setup" end; } } - - group(Account) - { - Caption = 'Azure Data Lake'; - Editable = AzureDataLake; - field(Container; Rec.Container) { } - field(AccountName; Rec."Account Name") { } - } - group(MSFabric) - { - Caption = 'Microsoft Fabric'; - Editable = not AzureDataLake; - field(Workspace; Rec.Workspace) { } - field(Lakehouse; Rec.Lakehouse) { } - } group(Access) { Caption = 'App registration'; @@ -83,7 +68,6 @@ page 82560 "ADLSE Setup" Caption = 'Execution'; field(MaxPayloadSize; Rec.MaxPayloadSizeMiB) { - Editable = not AzureDataLake; } @@ -309,7 +293,6 @@ page 82560 "ADLSE Setup" } var - AzureDataLake: Boolean; ClientSecretLbl: Label 'Secret not shown'; ClientIdLbl: Label 'ID not shown'; @@ -334,7 +317,6 @@ page 82560 "ADLSE Setup" TrackedDeletedRecordsExist := not ADLSEDeletedRecord.IsEmpty(); OldLogsExist := ADLSERun.OldRunsExist(); UpdateNotificationIfAnyTableExportFailed(); - AzureDataLake := Rec."Storage Type" = Rec."Storage Type"::"Azure Data Lake"; end; var diff --git a/businessCentral/app/src/Setup.Table.al b/businessCentral/app/src/Setup.Table.al index f70f657..04e96dc 100644 --- a/businessCentral/app/src/Setup.Table.al +++ b/businessCentral/app/src/Setup.Table.al @@ -8,7 +8,6 @@ table 82560 "ADLSE Setup" Caption = 'ADLSE Setup'; DataClassification = CustomerContent; DataPerCompany = false; - DataCaptionFields = Container; fields { @@ -18,38 +17,6 @@ table 82560 "ADLSE Setup" Caption = 'Primary Key'; Editable = false; } - - field(5; "Account Name"; Text[24]) - { - Caption = 'Account Name'; - ToolTip = 'Specifies the name of the storage account.'; - - trigger OnValidate() - begin - // Name constraints based on https://learn.microsoft.com/en-us/azure/storage/common/storage-account-overview#storage-account-name - if (StrLen(Rec."Account Name") < 3) or (StrLen(Rec."Account Name") > 24) // between 3 and 24 characters long - or TextCharactersOtherThan(Rec."Account Name", 'abcdefghijklmnopqrstuvwxyz1234567890') // only made of lower case letters and numerals - then - Error(AccountNameIncorrectFormatErr); - end; - } - - field(2; Container; Text[63]) - { - Caption = 'Container'; - ToolTip = 'Specifies the name of the container where the data is going to be uploaded. Please refer to constraints on container names at https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata.'; - - trigger OnValidate() - begin - // Name constraints based on https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata - if (StrLen(Container) < 3) or (StrLen(Container) > 63) // between 6 and 63 characters long - or TextCharactersOtherThan(Container, 'abcdefghijklmnopqrstuvwxyz1234567890-') // only made of lower case letters, numerals and dashes - or (StrPos(Container, '--') <> 0) // no occurence of multiple dashes together - then - Error(ContainerNameIncorrectFormatErr); - end; - } - field(3; MaxPayloadSizeMiB; Integer) { Caption = 'Max payload size (MiBs)'; @@ -114,37 +81,6 @@ table 82560 "ADLSE Setup" Caption = 'Storage type'; ToolTip = 'Specifies the type of storage type to use.'; } - - field(30; Workspace; Text[100]) - { - Caption = 'Workspace'; - ToolTip = 'Specifies the name of the Workspace where the data is going to be uploaded. This can be a name or a GUID.'; - trigger OnValidate() - var - ValidGuid: Guid; - begin - if not Evaluate(ValidGuid, Rec.Workspace) then - if (StrLen(Rec.Workspace) < 3) or (StrLen(Rec.Workspace) > 24) - or TextCharactersOtherThan(Rec.Workspace, 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_') - then - Error(WorkspaceIncorrectFormatErr); - end; - } - field(31; Lakehouse; Text[100]) - { - Caption = 'Lakehouse'; - ToolTip = 'Specifies the name of the Lakehouse where the data is going to be uploaded. This can be a name or a GUID.'; - trigger OnValidate() - var - ValidGuid: Guid; - begin - if not Evaluate(ValidGuid, Rec.Lakehouse) then - if (StrLen(Rec.Lakehouse) < 3) or (StrLen(Rec.Lakehouse) > 24) - or TextCharactersOtherThan(Rec.Lakehouse, 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_') - then - Error(LakehouseIncorrectFormatErr); - end; - } field(35; "Schema Exported On"; DateTime) { AllowInCustomizations = Always; @@ -214,28 +150,12 @@ table 82560 "ADLSE Setup" var MaxReqErrorInfo: ErrorInfo; - ContainerNameIncorrectFormatErr: Label 'The container name is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_'; - AccountNameIncorrectFormatErr: Label 'The account name is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890'; - WorkspaceIncorrectFormatErr: Label 'The workspace is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_ or a valid GUID'; - LakehouseIncorrectFormatErr: Label 'The lakehouse is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_ or a valid GUID'; RecordDoesNotExistErr: Label 'No record on this table exists.'; PrimaryKeyValueLbl: Label '0', Locked = true; SchemaAlreadyExportedErr: Label 'Schema already exported. Please perform the action "clear schema export date" before changing the schema.'; MaximumRetriesErr: Label 'Please enter a value that is equal or smaller than 10 for the maximum retries.'; NoSchemaExportedErr: Label 'No schema has been exported yet. Please export schema first before exporting the data.'; - local procedure TextCharactersOtherThan(String: Text; CharString: Text): Boolean - var - Index: Integer; - Letter: Text; - begin - for Index := 1 to StrLen(String) do begin - Letter := CopyStr(String, Index, 1); - if StrPos(CharString, Letter) = 0 then - exit(true); - end; - end; - procedure GetSingleton() begin if not Exists() then diff --git a/businessCentral/app/src/StorageType.Enum.al b/businessCentral/app/src/StorageType.Enum.al index 8c74ec8..87d17ce 100644 --- a/businessCentral/app/src/StorageType.Enum.al +++ b/businessCentral/app/src/StorageType.Enum.al @@ -13,6 +13,6 @@ enum 82563 "ADLSE Storage Type" implements "ADLS Integrations" value(1; "Microsoft Fabric") { Caption = 'Microsoft Fabric'; - Implementation = "ADLS Integrations" = "Fabric Lakehouse Integration"; + Implementation = "ADLS Integrations" = "FL Integration"; } } \ No newline at end of file From 1553c254628efdcf68a7b56ce27e41200bc9e62c Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 14:18:05 +0100 Subject: [PATCH 04/10] Moved communication to interface --- businessCentral/app/src/ADLSE.Codeunit.al | 6 +- .../app/src/ADLSIntegrations.Interface.al | 21 +- .../AzureCommunication.Codeunit.al | 389 ++++++++++++++++++ .../AzureIntegration.Codeunit.al | 28 -- .../AzureSubscribers.Codeunit.al | 5 + .../app/src/Communication.Codeunit.al | 306 +------------- businessCentral/app/src/Execution.Codeunit.al | 8 +- .../FabricLakehouse/FLIntegration.Codeunit.al | 32 -- .../FabricCommunication.Codeunit.al | 383 +++++++++++++++++ businessCentral/app/src/Gen2Util.Codeunit.al | 95 +---- businessCentral/app/src/Setup.Codeunit.al | 19 - businessCentral/app/src/StorageType.Enum.al | 4 +- 12 files changed, 823 insertions(+), 473 deletions(-) create mode 100644 businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al delete mode 100644 businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al delete mode 100644 businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al create mode 100644 businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al diff --git a/businessCentral/app/src/ADLSE.Codeunit.al b/businessCentral/app/src/ADLSE.Codeunit.al index 70b3b50..b51d633 100644 --- a/businessCentral/app/src/ADLSE.Codeunit.al +++ b/businessCentral/app/src/ADLSE.Codeunit.al @@ -26,8 +26,8 @@ codeunit 82567 ADLSE internal procedure selectbc2adlsIntegrations(var AdlsIntegrations: Interface "ADLS Integrations") var ADLSESetup: Record "ADLSE Setup"; - AzureIntegration: Codeunit "Azure Integration"; - FabricLakehouseIntegration: Codeunit "FL Integration"; + AzureIntegration: Codeunit "Azure Communication"; + FabricLakehouseIntegration: Codeunit "Fabric Communication"; begin //TODO: Make it extendible ADLSESetup.GetSingleton(); @@ -41,6 +41,4 @@ codeunit 82567 ADLSE Error('The storage type is not supported.'); end; end; - - } \ No newline at end of file diff --git a/businessCentral/app/src/ADLSIntegrations.Interface.al b/businessCentral/app/src/ADLSIntegrations.Interface.al index 517301e..8b49fec 100644 --- a/businessCentral/app/src/ADLSIntegrations.Interface.al +++ b/businessCentral/app/src/ADLSIntegrations.Interface.al @@ -1,12 +1,31 @@ interface "ADLS Integrations" { /// - /// Get the base url of the integration + /// Get the base url of the external system /// procedure GetBaseUrl(): Text + /// + /// Checks if the setup of bc2adls is all correct + /// + procedure CheckSetup(); + + /// + /// Create a block blob in the external system + /// + procedure CreateBlockBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials"; LeaseID: Text; Body: Text; IsJson: Boolean) + /// /// Resets the table inside the external system /// procedure ResetTableExport(ltableId: Integer); + + procedure Init(TableIDValue: Integer; FieldIdListValue: List of [Integer]; LastFlushedTimeStampValue: BigInteger; EmitTelemetryValue: Boolean) + + procedure CheckEntity(CdmDataFormat: Enum "ADLSE CDM Format"; var EntityJsonNeedsUpdate: Boolean; var ManifestJsonsNeedsUpdate: Boolean; SchemaUpdate: Boolean) + + procedure CreateEntityContent() + procedure TryCollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; var LastTimestampExported: BigInteger) + procedure TryFinish(var LastTimestampExported: BigInteger) + procedure UpdateCdmJsons(EntityJsonNeedsUpdate: Boolean; ManifestJsonsNeedsUpdate: Boolean) } \ No newline at end of file diff --git a/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al new file mode 100644 index 0000000..fbfb8e6 --- /dev/null +++ b/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al @@ -0,0 +1,389 @@ +codeunit 82577 "Azure Communication" implements "ADLS Integrations" +{ + var + ADLSE: Codeunit "ADLSE"; + ADLSECredentials: Codeunit "ADLSE Credentials"; + AdlsIntegrations: Interface "ADLS Integrations"; + TableID: Integer; + FieldIdList: List of [Integer]; + DataBlobPath: Text; + DataBlobBlockIDs: List of [Text]; + BlobContentLength: Integer; + LastRecordOnPayloadTimeStamp: BigInteger; + Payload: TextBuilder; + LastFlushedTimeStamp: BigInteger; + NumberOfFlushes: Integer; + EntityName: Text; + EntityJson: JsonObject; + MaxSizeOfPayloadMiB: Integer; + EmitTelemetry: Boolean; + DeltaCdmManifestNameTxt: Label 'deltas.manifest.cdm.json', Locked = true; + DataCdmManifestNameTxt: Label 'data.manifest.cdm.json', Locked = true; + EntityManifestNameTemplateTxt: Label '%1.cdm.json', Locked = true, Comment = '%1 = Entity name'; + CorpusJsonPathTxt: Label '/%1', Comment = '%1 = name of the blob', Locked = true; + CannotAddedMoreBlocksErr: Label 'The number of blocks that can be added to the blob has reached its maximum limit.'; + SingleRecordTooLargeErr: Label 'A single record payload exceeded the max payload size. Please adjust the payload size or reduce the fields to be exported for the record.'; + DeltasFileCsvTok: Label '/deltas/%1/%2.csv', Comment = '%1: Entity, %2: File identifier guid', Locked = true; + ExportOfSchemaNotPerformendTxt: Label 'Please export the schema first before trying to export the data.'; + EntitySchemaChangedErr: Label 'The schema of the table %1 has changed. %2', Comment = '%1 = Entity name, %2 = NotAllowedOnSimultaneousExportTxt'; + CdmSchemaChangedErr: Label 'There may have been a change in the tables to export. %1', Comment = '%1 = NotAllowedOnSimultaneousExportTxt'; + + procedure GetBaseUrl(): Text + var + ADLSESetup: Record "ADLSE Setup"; + DefaultContainerName: Text; + ContainerUrlTxt: Label 'https://%1.blob.core.windows.net/%2', Comment = '%1: Account name, %2: Container Name'; + begin + ADLSESetup.GetSingleton(); + + if DefaultContainerName = '' then + DefaultContainerName := ADLSESetup.Container; + + exit(StrSubstNo(ContainerUrlTxt, ADLSESetup."Account Name", DefaultContainerName)); + end; + + procedure CheckSetup() + var + ADLSESetup: Record "ADLSE Setup"; + ADLSECurrentSession: Record "ADLSE Current Session"; + begin + ADLSESetup.GetSingleton(); + ADLSESetup.TestField(Container); + + ADLSESetup.CheckSchemaExported(); + + if ADLSECurrentSession.AreAnySessionsActive() then + ADLSECurrentSession.CheckForNoActiveSessions(); + + ADLSECredentials.Check(); + end; + + procedure CreateBlockBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials"; LeaseID: Text; Body: Text; IsJson: Boolean) + var + ADLSEHttp: Codeunit "ADLSE Http"; + Response: Text; + CouldNotCreateBlobErr: Label 'Could not create blob %1. %2', Comment = '%1: blob path, %2: error text'; + begin + ADLSEHttp.SetMethod("ADLSE Http Method"::Put); + + ADLSEHttp.SetUrl(BlobPath); + + ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); + ADLSEHttp.AddHeader('x-ms-blob-type', 'BlockBlob'); + if IsJson then begin + ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeJson()); + ADLSEHttp.SetContentIsJson(); + end else + ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeTextCsv()); + ADLSEHttp.SetBody(Body); + if LeaseID <> '' then + ADLSEHttp.AddHeader('x-ms-lease-id', LeaseID); + if not ADLSEHttp.InvokeRestApi(Response) then + Error(CouldNotCreateBlobErr, BlobPath, Response); + end; + + procedure ResetTableExport(ltableId: Integer) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + begin + ADLSESetup.GetSingleton(); + ADLSECredentials.Init(); + ADLSEGen2Util.RemoveDeltasFromDataLake(ADLSEUtil.GetDataLakeCompliantTableName(ltableId), ADLSECredentials); + end; + + local procedure AddBlockToDataBlob(BlobPath: Text; Body: Text; ADLSECredentials: Codeunit "ADLSE Credentials") BlockID: Text + var + Base64Convert: Codeunit "Base64 Convert"; + ADLSEHttp: Codeunit "ADLSE Http"; + Response: Text; + CouldNotAppendDataToBlobErr: Label 'Could not append data to %1. %2', Comment = '%1: blob path, %2: Http response.'; + PutBlockSuffixTxt: Label '?comp=block&blockid=%1', Locked = true, Comment = '%1 = the block id being added'; + begin + ADLSEHttp.SetMethod("ADLSE Http Method"::Put); + BlockID := Base64Convert.ToBase64(CreateGuid()); + ADLSEHttp.SetUrl(BlobPath + StrSubstNo(PutBlockSuffixTxt, BlockID)); + ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); + ADLSEHttp.SetBody(Body); + if not ADLSEHttp.InvokeRestApi(Response) then + Error(CouldNotAppendDataToBlobErr, BlobPath, Response); + end; + + procedure Init(TableIDValue: Integer; FieldIdListValue: List of [Integer]; LastFlushedTimeStampValue: BigInteger; EmitTelemetryValue: Boolean) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimensions: Dictionary of [Text, Text]; + begin + ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + + TableID := TableIDValue; + FieldIdList := FieldIdListValue; + + ADLSECredentials.Init(); + EntityName := ADLSEUtil.GetDataLakeCompliantTableName(TableID); + + LastFlushedTimeStamp := LastFlushedTimeStampValue; + ADLSESetup.GetSingleton(); + + MaxSizeOfPayloadMiB := ADLSESetup.MaxPayloadSizeMiB; + EmitTelemetry := EmitTelemetryValue; + if EmitTelemetry then begin + CustomDimensions.Add('Entity', EntityName); + CustomDimensions.Add('Last flushed time stamp', Format(LastFlushedTimeStampValue)); + ADLSEExecution.Log('ADLSE-041', 'Initialized ADLSE Communication to write to the lake.', Verbosity::Verbose); + end; + end; + + procedure CheckEntity(CdmDataFormat: Enum "ADLSE CDM Format"; var EntityJsonNeedsUpdate: Boolean; var ManifestJsonsNeedsUpdate: Boolean; SchemaUpdate: Boolean) + var + ADLSECdmUtil: Codeunit "ADLSE CDM Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + OldJson: JsonObject; + NewJson: JsonObject; + BlobExists: Boolean; + BlobEntityPath: Text; + begin + // check entity + EntityJson := ADLSECdmUtil.CreateEntityContent(TableID, FieldIdList); + BlobEntityPath := StrSubstNo(CorpusJsonPathTxt, StrSubstNo(EntityManifestNameTemplateTxt, EntityName)); + OldJson := ADLSEGen2Util.GetBlobContent(GetBaseUrl() + BlobEntityPath, ADLSECredentials, BlobExists); + if BlobExists and not SchemaUpdate then + ADLSECdmUtil.CheckChangeInEntities(OldJson, EntityJson, EntityName); + if not ADLSECdmUtil.CompareEntityJsons(OldJson, EntityJson) then begin + if EmitTelemetry then + ADLSEExecution.Log('ADLSE-028', GetLastErrorText() + GetLastErrorCallStack(), Verbosity::Warning); + ClearLastError(); + + EntityJsonNeedsUpdate := true; + JsonsDifferent(OldJson, EntityJson); // to log the difference + end; + + // check manifest. Assume that if the data manifest needs change, the delta manifest will also need be updated + OldJson := ADLSEGen2Util.GetBlobContent(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DataCdmManifestNameTxt), ADLSECredentials, BlobExists); + NewJson := ADLSECdmUtil.UpdateDefaultManifestContent(OldJson, TableID, 'data', CdmDataFormat); + ManifestJsonsNeedsUpdate := JsonsDifferent(OldJson, NewJson); + + if not SchemaUpdate then begin + if EntityJsonNeedsUpdate then + Error(EntitySchemaChangedErr, EntityName, ExportOfSchemaNotPerformendTxt); + if ManifestJsonsNeedsUpdate then + Error(CdmSchemaChangedErr, ExportOfSchemaNotPerformendTxt); + end; + end; + + procedure CreateEntityContent() + var + ADLSECdmUtil: Codeunit "ADLSE CDM Util"; + begin + EntityJson := ADLSECdmUtil.CreateEntityContent(TableID, FieldIdList); + end; + + local procedure JsonsDifferent(Json1: JsonObject; Json2: JsonObject) Result: Boolean + var + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimensions: Dictionary of [Text, Text]; + Content1: Text; + Content2: Text; + begin + Json1.WriteTo(Content1); + Json2.WriteTo(Content2); + Result := Content1 <> Content2; + if Result and EmitTelemetry then begin + CustomDimensions.Add('Content1', Content1); + CustomDimensions.Add('Content2', Content2); + ADLSEExecution.Log('ADLSE-023', 'Jsons were found to be different.', Verbosity::Warning, CustomDimensions); + end; + end; + + local procedure CreateDataBlob() Created: Boolean + var + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimension: Dictionary of [Text, Text]; + FileIdentifer: Guid; + begin + if DataBlobPath <> '' then + // Microsoft Fabric has a limit on the blob size. Create a new blob before reaching this limit + if not ADLSEGen2Util.IsMaxBlobFileSize(DataBlobPath, BlobContentLength, Payload.Length()) then + exit // no need to create a new blob + else begin + if EmitTelemetry then begin + Clear(CustomDimension); + CustomDimension.Add('Entity', EntityName); + CustomDimension.Add('DataBlobPath', DataBlobPath); + CustomDimension.Add('BlobContentLength', Format(BlobContentLength)); + CustomDimension.Add('PayloadContentLength', Format(Payload.Length())); + ADLSEExecution.Log('ADLSE-030', 'Maximum blob size reached.', Verbosity::Normal, CustomDimension); + end; + Created := true; + BlobContentLength := 0; + end; + + FileIdentifer := CreateGuid(); + + DataBlobPath := StrSubstNo(DeltasFileCsvTok, EntityName, ADLSEUtil.ToText(FileIdentifer)); + ADLSEGen2Util.CreateDataBlob(GetBaseUrl() + DataBlobPath, ADLSECredentials); + Created := true; + if EmitTelemetry then begin + Clear(CustomDimension); + CustomDimension.Add('Entity', EntityName); + CustomDimension.Add('DataBlobPath', DataBlobPath); + ADLSEExecution.Log('ADLSE-012', 'Created new blob to hold the data to be exported', Verbosity::Normal, CustomDimension); + end; + end; + + procedure TryCollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; var LastTimestampExported: BigInteger) + var + DataBlobCreated: Boolean; + begin + ClearLastError(); + DataBlobCreated := CreateDataBlob(); + LastTimestampExported := CollectAndSendRecord(RecordRef, RecordTimeStamp, DataBlobCreated); + end; + + local procedure CollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; DataBlobCreated: Boolean) LastTimestampExported: BigInteger + var + ADLSEUtil: Codeunit "ADLSE Util"; + RecordPayLoad: Text; + begin + if NumberOfFlushes = 50000 then // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks + Error(CannotAddedMoreBlocksErr); + + // Add headers into the existing Payload + if (DataBlobCreated) and (Payload.Length() <> 0) then + Payload.Insert(1, ADLSEUtil.CreateCsvHeader(RecordRef, FieldIdList)); + + RecordPayLoad := ADLSEUtil.CreateCsvPayload(RecordRef, FieldIdList, Payload.Length() = 0); + // check if payload exceeds the limit + if Payload.Length() + StrLen(RecordPayLoad) + 2 > MaxPayloadSize() then begin // the 2 is to account for new line characters + if Payload.Length() = 0 then + // the record alone exceeds the max payload size + Error(SingleRecordTooLargeErr); + FlushPayload(); + end; + LastTimestampExported := LastFlushedTimeStamp; + + Payload.Append(RecordPayLoad); + LastRecordOnPayloadTimeStamp := RecordTimeStamp; + end; + + procedure TryFinish(var LastTimestampExported: BigInteger) + begin + ClearLastError(); + LastTimestampExported := Finish(); + end; + + local procedure Finish() LastTimestampExported: BigInteger + begin + FlushPayload(); + + LastTimestampExported := LastFlushedTimeStamp; + end; + + local procedure MaxPayloadSize(): Integer + var + MaxLimitForPutBlockCalls: Integer; + MaxCapacityOfTextBuilder: Integer; + begin + MaxLimitForPutBlockCalls := MaxSizeOfPayloadMiB * 1024 * 1024; + MaxCapacityOfTextBuilder := Payload.MaxCapacity(); + if MaxLimitForPutBlockCalls < MaxCapacityOfTextBuilder then + exit(MaxLimitForPutBlockCalls); + exit(MaxCapacityOfTextBuilder); + end; + + local procedure FlushPayload() + var + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimensions: Dictionary of [Text, Text]; + BlockID: Text; + begin + if Payload.Length() = 0 then + exit; + + if EmitTelemetry then begin + CustomDimensions.Add('Length of payload', Format(Payload.Length())); + ADLSEExecution.Log('ADLSE-013', 'Flushing the payload', Verbosity::Normal, CustomDimensions); + end; + + BlockID := AddBlockToDataBlob(GetBaseUrl() + DataBlobPath, Payload.ToText(), ADLSECredentials); + if EmitTelemetry then begin + Clear(CustomDimensions); + CustomDimensions.Add('Block ID', BlockID); + ADLSEExecution.Log('ADLSE-014', 'Block added to blob', Verbosity::Normal, CustomDimensions); + end; + DataBlobBlockIDs.Add(BlockID); + ADLSEGen2Util.CommitAllBlocksOnDataBlob(GetBaseUrl() + DataBlobPath, ADLSECredentials, DataBlobBlockIDs); + + if EmitTelemetry then + ADLSEExecution.Log('ADLSE-015', 'Block committed', Verbosity::Normal); + + + LastFlushedTimeStamp := LastRecordOnPayloadTimeStamp; + Payload.Clear(); + LastRecordOnPayloadTimeStamp := 0; + NumberOfFlushes += 1; + + ADLSE.OnTableExported(TableID, LastFlushedTimeStamp); + if EmitTelemetry then begin + Clear(CustomDimensions); + CustomDimensions.Add('Flushed count', Format(NumberOfFlushes)); + ADLSEExecution.Log('ADLSE-016', 'Flushed the payload', Verbosity::Normal, CustomDimensions); + end; + end; + + procedure UpdateCdmJsons(EntityJsonNeedsUpdate: Boolean; ManifestJsonsNeedsUpdate: Boolean) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + LeaseID: Text; + BlobPath: Text; + BlobExists: Boolean; + begin + // update entity json + if EntityJsonNeedsUpdate then begin + BlobPath := GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, StrSubstNo(EntityManifestNameTemplateTxt, EntityName)); + LeaseID := ADLSEGen2Util.AcquireLease(BlobPath, ADLSECredentials, BlobExists); + ADLSEGen2Util.CreateOrUpdateJsonBlob(BlobPath, ADLSECredentials, LeaseID, EntityJson); + + ADLSEGen2Util.ReleaseBlob(BlobPath, ADLSECredentials, LeaseID); + end; + + // update manifest + if ManifestJsonsNeedsUpdate then begin + // Expected that multiple sessions that export data from different tables will be competing for writing to + // manifest. Semaphore applied. + ADLSESetup.ReadIsolation := IsolationLevel::UpdLock; + ADLSESetup.GetSingleton(); + + UpdateManifest(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DataCdmManifestNameTxt), 'data', ADLSESetup.DataFormat); + + UpdateManifest(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DeltaCdmManifestNameTxt), 'deltas', "ADLSE CDM Format"::Csv); + Commit(); // to release the lock above + end; + end; + + local procedure UpdateManifest(BlobPath: Text; Folder: Text; ADLSECdmFormat: Enum "ADLSE CDM Format") + var + ADLSECdmUtil: Codeunit "ADLSE CDM Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ManifestJson: JsonObject; + LeaseID: Text; + BlobExists: Boolean; + begin + LeaseID := ADLSEGen2Util.AcquireLease(BlobPath, ADLSECredentials, BlobExists); + if BlobExists then + ManifestJson := ADLSEGen2Util.GetBlobContent(BlobPath, ADLSECredentials, BlobExists); + + ManifestJson := ADLSECdmUtil.UpdateDefaultManifestContent(ManifestJson, TableID, Folder, ADLSECdmFormat); + ADLSEGen2Util.CreateOrUpdateJsonBlob(BlobPath, ADLSECredentials, LeaseID, ManifestJson); + + ADLSEGen2Util.ReleaseBlob(BlobPath, ADLSECredentials, LeaseID); + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al deleted file mode 100644 index e3b5dfa..0000000 --- a/businessCentral/app/src/AzureDataLake/AzureIntegration.Codeunit.al +++ /dev/null @@ -1,28 +0,0 @@ -codeunit 82577 "Azure Integration" implements "ADLS Integrations" -{ - procedure GetBaseUrl(): Text - var - ADLSESetup: Record "ADLSE Setup"; - DefaultContainerName: Text; - ContainerUrlTxt: Label 'https://%1.blob.core.windows.net/%2', Comment = '%1: Account name, %2: Container Name'; - begin - ADLSESetup.GetSingleton(); - - if DefaultContainerName = '' then - DefaultContainerName := ADLSESetup.Container; - - exit(StrSubstNo(ContainerUrlTxt, ADLSESetup."Account Name", DefaultContainerName)); - end; - - procedure ResetTableExport(ltableId: Integer) - var - ADLSESetup: Record "ADLSE Setup"; - ADLSEUtil: Codeunit "ADLSE Util"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ADLSECredentials: Codeunit "ADLSE Credentials"; - begin - ADLSESetup.GetSingleton(); - ADLSECredentials.Init(); - ADLSEGen2Util.RemoveDeltasFromDataLake(ADLSEUtil.GetDataLakeCompliantTableName(ltableId), ADLSECredentials); - end; -} \ No newline at end of file diff --git a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al index b585ccf..e9b4a8d 100644 --- a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al +++ b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al @@ -3,11 +3,16 @@ codeunit 82579 "Azure Subscribers" [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Execution", 'OnStartExportOnAfterCheckSetup', '', true, true)] local procedure OnStartExportOnAfterCheckSetup() var + ADLSESetup: Record "ADLSE Setup"; ADLSE: Codeunit ADLSE; ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; ADLSECredentials: Codeunit "ADLSE Credentials"; ADLSIntegrations: Interface "ADLS Integrations"; begin + ADLSESetup.GetSingleton(); + if ADLSESetup."Storage Type" <> ADLSESetup."Storage Type"::"Azure Data Lake" then + exit; + ADLSE.selectbc2adlsIntegrations(ADLSIntegrations); ADLSECredentials.Init(); diff --git a/businessCentral/app/src/Communication.Codeunit.al b/businessCentral/app/src/Communication.Codeunit.al index 43935e1..a894a9f 100644 --- a/businessCentral/app/src/Communication.Codeunit.al +++ b/businessCentral/app/src/Communication.Codeunit.al @@ -6,328 +6,44 @@ codeunit 82562 "ADLSE Communication" var ADLSE: Codeunit "ADLSE"; - ADLSECredentials: Codeunit "ADLSE Credentials"; AdlsIntegrations: Interface "ADLS Integrations"; - TableID: Integer; - FieldIdList: List of [Integer]; - DataBlobPath: Text; - DataBlobBlockIDs: List of [Text]; - BlobContentLength: Integer; - LastRecordOnPayloadTimeStamp: BigInteger; - Payload: TextBuilder; - LastFlushedTimeStamp: BigInteger; - NumberOfFlushes: Integer; - EntityName: Text; - EntityJson: JsonObject; - MaxSizeOfPayloadMiB: Integer; - EmitTelemetry: Boolean; - DeltaCdmManifestNameTxt: Label 'deltas.manifest.cdm.json', Locked = true; - DataCdmManifestNameTxt: Label 'data.manifest.cdm.json', Locked = true; - EntityManifestNameTemplateTxt: Label '%1.cdm.json', Locked = true, Comment = '%1 = Entity name'; - CorpusJsonPathTxt: Label '/%1', Comment = '%1 = name of the blob', Locked = true; - CannotAddedMoreBlocksErr: Label 'The number of blocks that can be added to the blob has reached its maximum limit.'; - SingleRecordTooLargeErr: Label 'A single record payload exceeded the max payload size. Please adjust the payload size or reduce the fields to be exported for the record.'; - DeltasFileCsvTok: Label '/deltas/%1/%2.csv', Comment = '%1: Entity, %2: File identifier guid', Locked = true; - ExportOfSchemaNotPerformendTxt: Label 'Please export the schema first before trying to export the data.'; - EntitySchemaChangedErr: Label 'The schema of the table %1 has changed. %2', Comment = '%1 = Entity name, %2 = NotAllowedOnSimultaneousExportTxt'; - CdmSchemaChangedErr: Label 'There may have been a change in the tables to export. %1', Comment = '%1 = NotAllowedOnSimultaneousExportTxt'; - - local procedure GetBaseUrl(): Text - begin - exit(AdlsIntegrations.GetBaseUrl()); - end; procedure Init(TableIDValue: Integer; FieldIdListValue: List of [Integer]; LastFlushedTimeStampValue: BigInteger; EmitTelemetryValue: Boolean) var - ADLSESetup: Record "ADLSE Setup"; - ADLSEUtil: Codeunit "ADLSE Util"; - ADLSEExecution: Codeunit "ADLSE Execution"; - CustomDimensions: Dictionary of [Text, Text]; begin ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); - - TableID := TableIDValue; - FieldIdList := FieldIdListValue; - - ADLSECredentials.Init(); - EntityName := ADLSEUtil.GetDataLakeCompliantTableName(TableID); - - LastFlushedTimeStamp := LastFlushedTimeStampValue; - ADLSESetup.GetSingleton(); - - MaxSizeOfPayloadMiB := ADLSESetup.MaxPayloadSizeMiB; - EmitTelemetry := EmitTelemetryValue; - if EmitTelemetry then begin - CustomDimensions.Add('Entity', EntityName); - CustomDimensions.Add('Last flushed time stamp', Format(LastFlushedTimeStampValue)); - ADLSEExecution.Log('ADLSE-041', 'Initialized ADLSE Communication to write to the lake.', Verbosity::Verbose); - end; + AdlsIntegrations.Init(TableIDValue, FieldIdListValue, LastFlushedTimeStampValue, EmitTelemetryValue); end; procedure CheckEntity(CdmDataFormat: Enum "ADLSE CDM Format"; var EntityJsonNeedsUpdate: Boolean; var ManifestJsonsNeedsUpdate: Boolean; SchemaUpdate: Boolean) - var - ADLSECdmUtil: Codeunit "ADLSE CDM Util"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ADLSEExecution: Codeunit "ADLSE Execution"; - OldJson: JsonObject; - NewJson: JsonObject; - BlobExists: Boolean; - BlobEntityPath: Text; begin - // check entity - EntityJson := ADLSECdmUtil.CreateEntityContent(TableID, FieldIdList); - BlobEntityPath := StrSubstNo(CorpusJsonPathTxt, StrSubstNo(EntityManifestNameTemplateTxt, EntityName)); - OldJson := ADLSEGen2Util.GetBlobContent(GetBaseUrl() + BlobEntityPath, ADLSECredentials, BlobExists); - if BlobExists and not SchemaUpdate then - ADLSECdmUtil.CheckChangeInEntities(OldJson, EntityJson, EntityName); - if not ADLSECdmUtil.CompareEntityJsons(OldJson, EntityJson) then begin - if EmitTelemetry then - ADLSEExecution.Log('ADLSE-028', GetLastErrorText() + GetLastErrorCallStack(), Verbosity::Warning); - ClearLastError(); - - EntityJsonNeedsUpdate := true; - JsonsDifferent(OldJson, EntityJson); // to log the difference - end; - - // check manifest. Assume that if the data manifest needs change, the delta manifest will also need be updated - OldJson := ADLSEGen2Util.GetBlobContent(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DataCdmManifestNameTxt), ADLSECredentials, BlobExists); - NewJson := ADLSECdmUtil.UpdateDefaultManifestContent(OldJson, TableID, 'data', CdmDataFormat); - ManifestJsonsNeedsUpdate := JsonsDifferent(OldJson, NewJson); - - if not SchemaUpdate then begin - if EntityJsonNeedsUpdate then - Error(EntitySchemaChangedErr, EntityName, ExportOfSchemaNotPerformendTxt); - if ManifestJsonsNeedsUpdate then - Error(CdmSchemaChangedErr, ExportOfSchemaNotPerformendTxt); - end; + //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + AdlsIntegrations.CheckEntity(CdmDataFormat, EntityJsonNeedsUpdate, ManifestJsonsNeedsUpdate, SchemaUpdate); end; procedure CreateEntityContent() - var - ADLSECdmUtil: Codeunit "ADLSE CDM Util"; - begin - EntityJson := ADLSECdmUtil.CreateEntityContent(TableID, FieldIdList); - end; - - local procedure JsonsDifferent(Json1: JsonObject; Json2: JsonObject) Result: Boolean - var - ADLSEExecution: Codeunit "ADLSE Execution"; - CustomDimensions: Dictionary of [Text, Text]; - Content1: Text; - Content2: Text; begin - Json1.WriteTo(Content1); - Json2.WriteTo(Content2); - Result := Content1 <> Content2; - if Result and EmitTelemetry then begin - CustomDimensions.Add('Content1', Content1); - CustomDimensions.Add('Content2', Content2); - ADLSEExecution.Log('ADLSE-023', 'Jsons were found to be different.', Verbosity::Warning, CustomDimensions); - end; - end; - - local procedure CreateDataBlob() Created: Boolean - var - ADLSEUtil: Codeunit "ADLSE Util"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ADLSEExecution: Codeunit "ADLSE Execution"; - CustomDimension: Dictionary of [Text, Text]; - FileIdentifer: Guid; - begin - if DataBlobPath <> '' then - // Microsoft Fabric has a limit on the blob size. Create a new blob before reaching this limit - if not ADLSEGen2Util.IsMaxBlobFileSize(DataBlobPath, BlobContentLength, Payload.Length()) then - exit // no need to create a new blob - else begin - if EmitTelemetry then begin - Clear(CustomDimension); - CustomDimension.Add('Entity', EntityName); - CustomDimension.Add('DataBlobPath', DataBlobPath); - CustomDimension.Add('BlobContentLength', Format(BlobContentLength)); - CustomDimension.Add('PayloadContentLength', Format(Payload.Length())); - ADLSEExecution.Log('ADLSE-030', 'Maximum blob size reached.', Verbosity::Normal, CustomDimension); - end; - Created := true; - BlobContentLength := 0; - end; - - FileIdentifer := CreateGuid(); - - DataBlobPath := StrSubstNo(DeltasFileCsvTok, EntityName, ADLSEUtil.ToText(FileIdentifer)); - ADLSEGen2Util.CreateDataBlob(GetBaseUrl() + DataBlobPath, ADLSECredentials); - Created := true; - if EmitTelemetry then begin - Clear(CustomDimension); - CustomDimension.Add('Entity', EntityName); - CustomDimension.Add('DataBlobPath', DataBlobPath); - ADLSEExecution.Log('ADLSE-012', 'Created new blob to hold the data to be exported', Verbosity::Normal, CustomDimension); - end; + //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + AdlsIntegrations.CreateEntityContent(); end; [TryFunction] procedure TryCollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; var LastTimestampExported: BigInteger) - var - DataBlobCreated: Boolean; begin - ClearLastError(); - DataBlobCreated := CreateDataBlob(); - LastTimestampExported := CollectAndSendRecord(RecordRef, RecordTimeStamp, DataBlobCreated); - end; - - local procedure CollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; DataBlobCreated: Boolean) LastTimestampExported: BigInteger - var - ADLSEUtil: Codeunit "ADLSE Util"; - RecordPayLoad: Text; - begin - if NumberOfFlushes = 50000 then // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks - Error(CannotAddedMoreBlocksErr); - - // Add headers into the existing Payload - if (DataBlobCreated) and (Payload.Length() <> 0) then - Payload.Insert(1, ADLSEUtil.CreateCsvHeader(RecordRef, FieldIdList)); - - RecordPayLoad := ADLSEUtil.CreateCsvPayload(RecordRef, FieldIdList, Payload.Length() = 0); - // check if payload exceeds the limit - if Payload.Length() + StrLen(RecordPayLoad) + 2 > MaxPayloadSize() then begin // the 2 is to account for new line characters - if Payload.Length() = 0 then - // the record alone exceeds the max payload size - Error(SingleRecordTooLargeErr); - FlushPayload(); - end; - LastTimestampExported := LastFlushedTimeStamp; - - Payload.Append(RecordPayLoad); - LastRecordOnPayloadTimeStamp := RecordTimeStamp; + //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + AdlsIntegrations.TryCollectAndSendRecord(RecordRef, RecordTimeStamp, LastTimestampExported); end; [TryFunction] procedure TryFinish(var LastTimestampExported: BigInteger) begin - ClearLastError(); - LastTimestampExported := Finish(); - end; - - local procedure Finish() LastTimestampExported: BigInteger - begin - FlushPayload(); - - LastTimestampExported := LastFlushedTimeStamp; - end; - - local procedure MaxPayloadSize(): Integer - var - MaxLimitForPutBlockCalls: Integer; - MaxCapacityOfTextBuilder: Integer; - begin - MaxLimitForPutBlockCalls := MaxSizeOfPayloadMiB * 1024 * 1024; - MaxCapacityOfTextBuilder := Payload.MaxCapacity(); - if MaxLimitForPutBlockCalls < MaxCapacityOfTextBuilder then - exit(MaxLimitForPutBlockCalls); - exit(MaxCapacityOfTextBuilder); - end; - - local procedure FlushPayload() - var - ADLSESetup: Record "ADLSE Setup"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ADLSEExecution: Codeunit "ADLSE Execution"; - CustomDimensions: Dictionary of [Text, Text]; - BlockID: Text; - begin - if Payload.Length() = 0 then - exit; - - if EmitTelemetry then begin - CustomDimensions.Add('Length of payload', Format(Payload.Length())); - ADLSEExecution.Log('ADLSE-013', 'Flushing the payload', Verbosity::Normal, CustomDimensions); - end; - - case ADLSESetup.GetStorageType() of - ADLSESetup."Storage Type"::"Azure Data Lake": - begin - BlockID := ADLSEGen2Util.AddBlockToDataBlob(GetBaseUrl() + DataBlobPath, Payload.ToText(), ADLSECredentials); - if EmitTelemetry then begin - Clear(CustomDimensions); - CustomDimensions.Add('Block ID', BlockID); - ADLSEExecution.Log('ADLSE-014', 'Block added to blob', Verbosity::Normal, CustomDimensions); - end; - DataBlobBlockIDs.Add(BlockID); - ADLSEGen2Util.CommitAllBlocksOnDataBlob(GetBaseUrl() + DataBlobPath, ADLSECredentials, DataBlobBlockIDs); - - if EmitTelemetry then - ADLSEExecution.Log('ADLSE-015', 'Block committed', Verbosity::Normal); - end; - ADLSESetup."Storage Type"::"Microsoft Fabric": - begin - ADLSEGen2Util.AddBlockToDataBlob(GetBaseUrl() + DataBlobPath, Payload.ToText(), BlobContentLength, ADLSECredentials); - BlobContentLength := ADLSEGen2Util.GetBlobContentLength(GetBaseUrl() + DataBlobPath, ADLSECredentials); - end; - end; - - LastFlushedTimeStamp := LastRecordOnPayloadTimeStamp; - Payload.Clear(); - LastRecordOnPayloadTimeStamp := 0; - NumberOfFlushes += 1; - - ADLSE.OnTableExported(TableID, LastFlushedTimeStamp); - if EmitTelemetry then begin - Clear(CustomDimensions); - CustomDimensions.Add('Flushed count', Format(NumberOfFlushes)); - ADLSEExecution.Log('ADLSE-016', 'Flushed the payload', Verbosity::Normal, CustomDimensions); - end; + //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + AdlsIntegrations.TryFinish(LastTimestampExported); end; procedure UpdateCdmJsons(EntityJsonNeedsUpdate: Boolean; ManifestJsonsNeedsUpdate: Boolean) - var - ADLSESetup: Record "ADLSE Setup"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - LeaseID: Text; - BlobPath: Text; - BlobExists: Boolean; - begin - // update entity json - if EntityJsonNeedsUpdate then begin - BlobPath := GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, StrSubstNo(EntityManifestNameTemplateTxt, EntityName)); - if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then - LeaseID := ADLSEGen2Util.AcquireLease(BlobPath, ADLSECredentials, BlobExists); - ADLSEGen2Util.CreateOrUpdateJsonBlob(BlobPath, ADLSECredentials, LeaseID, EntityJson); - if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then - ADLSEGen2Util.ReleaseBlob(BlobPath, ADLSECredentials, LeaseID); - end; - - // update manifest - if ManifestJsonsNeedsUpdate then begin - // Expected that multiple sessions that export data from different tables will be competing for writing to - // manifest. Semaphore applied. - ADLSESetup.ReadIsolation := IsolationLevel::UpdLock; - ADLSESetup.GetSingleton(); - - UpdateManifest(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DataCdmManifestNameTxt), 'data', ADLSESetup.DataFormat); - - UpdateManifest(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DeltaCdmManifestNameTxt), 'deltas', "ADLSE CDM Format"::Csv); - Commit(); // to release the lock above - end; - end; - - local procedure UpdateManifest(BlobPath: Text; Folder: Text; ADLSECdmFormat: Enum "ADLSE CDM Format") - var - ADLSESetup: Record "ADLSE Setup"; - ADLSECdmUtil: Codeunit "ADLSE CDM Util"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ManifestJson: JsonObject; - LeaseID: Text; - BlobExists: Boolean; begin - if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then - LeaseID := ADLSEGen2Util.AcquireLease(BlobPath, ADLSECredentials, BlobExists); - if BlobExists and (ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake") then - ManifestJson := ADLSEGen2Util.GetBlobContent(BlobPath, ADLSECredentials, BlobExists) - else - ManifestJson := ADLSEGen2Util.GetBlobContent(BlobPath, ADLSECredentials, BlobExists); - - ManifestJson := ADLSECdmUtil.UpdateDefaultManifestContent(ManifestJson, TableID, Folder, ADLSECdmFormat); - ADLSEGen2Util.CreateOrUpdateJsonBlob(BlobPath, ADLSECredentials, LeaseID, ManifestJson); - if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then - ADLSEGen2Util.ReleaseBlob(BlobPath, ADLSECredentials, LeaseID); + //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + AdlsIntegrations.UpdateCdmJsons(EntityJsonNeedsUpdate, ManifestJsonsNeedsUpdate); end; } diff --git a/businessCentral/app/src/Execution.Codeunit.al b/businessCentral/app/src/Execution.Codeunit.al index 5a574bc..ba92f67 100644 --- a/businessCentral/app/src/Execution.Codeunit.al +++ b/businessCentral/app/src/Execution.Codeunit.al @@ -25,17 +25,19 @@ codeunit 82569 "ADLSE Execution" ADLSETable: Record "ADLSE Table"; ADLSEField: Record "ADLSE Field"; ADLSECurrentSession: Record "ADLSE Current Session"; - ADLSESetup: Codeunit "ADLSE Setup"; - ADLSECommunication: Codeunit "ADLSE Communication"; + ADLSE: Codeunit "ADLSE"; ADLSESessionManager: Codeunit "ADLSE Session Manager"; ADLSEExternalEvents: Codeunit "ADLSE External Events"; + ADLSIntegrations: Interface "ADLS Integrations"; Counter: Integer; Started: Integer; begin - ADLSESetup.CheckSetup(ADLSESetupRec); + ADLSE.selectbc2adlsIntegrations(ADLSIntegrations); + ADLSIntegrations.CheckSetup(); OnStartExportOnAfterCheckSetup(); + ADLSESetupRec.GetSingleton(); EmitTelemetry := ADLSESetupRec."Emit telemetry"; ADLSECurrentSession.CleanupSessions(); diff --git a/businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al deleted file mode 100644 index a265ed9..0000000 --- a/businessCentral/app/src/FabricLakehouse/FLIntegration.Codeunit.al +++ /dev/null @@ -1,32 +0,0 @@ -codeunit 82578 "FL Integration" implements "ADLS Integrations" -{ - procedure GetBaseUrl(): Text - var - ADLSESetup: Record "ADLSE Setup"; - ValidGuid: Guid; - MSFabricUrlTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2.Lakehouse/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; - MSFabricUrlGuidTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; - begin - ADLSESetup.GetSingleton(); - - if not Evaluate(ValidGuid, ADLSESetup.Lakehouse) then - exit(StrSubstNo(MSFabricUrlTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)) - else - exit(StrSubstNo(MSFabricUrlGuidTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)); - end; - - procedure ResetTableExport(ltableId: Integer) - var - ADLSESetup: Record "ADLSE Setup"; - ADLSEUtil: Codeunit "ADLSE Util"; - ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ADLSECredentials: Codeunit "ADLSE Credentials"; - Body: JsonObject; - ResetTableExportTxt: Label '/reset/%1.txt', Locked = true, Comment = '%1 = Table name'; - begin - ADLSESetup.GetSingleton(); - ADLSECredentials.Init(); - - ADLSEGen2Util.CreateOrUpdateJsonBlob(GetBaseUrl() + StrSubstNo(ResetTableExportTxt, ADLSEUtil.GetDataLakeCompliantTableName(ltableId)), ADLSECredentials, '', Body); - end; -} \ No newline at end of file diff --git a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al new file mode 100644 index 0000000..47c470e --- /dev/null +++ b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al @@ -0,0 +1,383 @@ +codeunit 82578 "Fabric Communication" implements "ADLS Integrations" +{ + var + ADLSE: Codeunit "ADLSE"; + ADLSECredentials: Codeunit "ADLSE Credentials"; + AdlsIntegrations: Interface "ADLS Integrations"; + TableID: Integer; + FieldIdList: List of [Integer]; + DataBlobPath: Text; + DataBlobBlockIDs: List of [Text]; + BlobContentLength: Integer; + LastRecordOnPayloadTimeStamp: BigInteger; + Payload: TextBuilder; + LastFlushedTimeStamp: BigInteger; + NumberOfFlushes: Integer; + EntityName: Text; + EntityJson: JsonObject; + MaxSizeOfPayloadMiB: Integer; + EmitTelemetry: Boolean; + DeltaCdmManifestNameTxt: Label 'deltas.manifest.cdm.json', Locked = true; + DataCdmManifestNameTxt: Label 'data.manifest.cdm.json', Locked = true; + EntityManifestNameTemplateTxt: Label '%1.cdm.json', Locked = true, Comment = '%1 = Entity name'; + CorpusJsonPathTxt: Label '/%1', Comment = '%1 = name of the blob', Locked = true; + CannotAddedMoreBlocksErr: Label 'The number of blocks that can be added to the blob has reached its maximum limit.'; + SingleRecordTooLargeErr: Label 'A single record payload exceeded the max payload size. Please adjust the payload size or reduce the fields to be exported for the record.'; + DeltasFileCsvTok: Label '/deltas/%1/%2.csv', Comment = '%1: Entity, %2: File identifier guid', Locked = true; + ExportOfSchemaNotPerformendTxt: Label 'Please export the schema first before trying to export the data.'; + EntitySchemaChangedErr: Label 'The schema of the table %1 has changed. %2', Comment = '%1 = Entity name, %2 = NotAllowedOnSimultaneousExportTxt'; + CdmSchemaChangedErr: Label 'There may have been a change in the tables to export. %1', Comment = '%1 = NotAllowedOnSimultaneousExportTxt'; + + procedure GetBaseUrl(): Text + var + ADLSESetup: Record "ADLSE Setup"; + ValidGuid: Guid; + MSFabricUrlTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2.Lakehouse/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; + MSFabricUrlGuidTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name'; + begin + ADLSESetup.GetSingleton(); + + if not Evaluate(ValidGuid, ADLSESetup.Lakehouse) then + exit(StrSubstNo(MSFabricUrlTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)) + else + exit(StrSubstNo(MSFabricUrlGuidTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse)); + end; + + procedure CheckSetup() + var + ADLSESetup: Record "ADLSE Setup"; + ADLSECurrentSession: Record "ADLSE Current Session"; + begin + ADLSESetup.GetSingleton(); + ADLSESetup.TestField(Workspace); + + ADLSESetup.CheckSchemaExported(); + + if ADLSECurrentSession.AreAnySessionsActive() then + ADLSECurrentSession.CheckForNoActiveSessions(); + + ADLSECredentials.Check(); + end; + + procedure CreateBlockBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials"; LeaseID: Text; Body: Text; IsJson: Boolean) + var + ADLSEHttp: Codeunit "ADLSE Http"; + Response: Text; + BlobPathOrg: Text; + CouldNotCreateBlobErr: Label 'Could not create blob %1. %2', Comment = '%1: blob path, %2: error text'; + begin + ADLSEHttp.SetMethod("ADLSE Http Method"::Put); + + BlobPathOrg := BlobPath; + ADLSEHttp.SetUrl(BlobPath + '?resource=file'); + + ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); + ADLSEHttp.AddHeader('x-ms-blob-type', 'BlockBlob'); + if IsJson then begin + ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeJson()); + ADLSEHttp.SetContentIsJson(); + end else + ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeTextCsv()); + ADLSEHttp.SetBody(Body); + if LeaseID <> '' then + ADLSEHttp.AddHeader('x-ms-lease-id', LeaseID); + if not ADLSEHttp.InvokeRestApi(Response) then + Error(CouldNotCreateBlobErr, BlobPath, Response); + + //Upload Json for Microsoft Fabric + if IsJson then + AddBlockToDataBlob(BlobPathOrg, Body, 0, ADLSECredentials); + end; + + procedure ResetTableExport(ltableId: Integer) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSECredentials: Codeunit "ADLSE Credentials"; + Body: JsonObject; + ResetTableExportTxt: Label '/reset/%1.txt', Locked = true, Comment = '%1 = Table name'; + begin + ADLSESetup.GetSingleton(); + ADLSECredentials.Init(); + + ADLSEGen2Util.CreateOrUpdateJsonBlob(GetBaseUrl() + StrSubstNo(ResetTableExportTxt, ADLSEUtil.GetDataLakeCompliantTableName(ltableId)), ADLSECredentials, '', Body); + end; + + local procedure AddBlockToDataBlob(BlobPath: Text; Body: Text; Position: Integer; ADLSECredentials: Codeunit "ADLSE Credentials") + var + ADLSEHttp: Codeunit "ADLSE Http"; + Response: Text; + CouldNotAppendDataToBlobErr: Label 'Could not append data to %1. %2', Comment = '%1: blob path, %2: Http response.'; + begin + ADLSEHttp.SetMethod("ADLSE Http Method"::Patch); + ADLSEHttp.SetUrl(BlobPath + '?position=' + Format(Position) + '&action=append&flush=true'); + ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); + ADLSEHttp.SetBody(Body); + if not ADLSEHttp.InvokeRestApi(Response) then + Error(CouldNotAppendDataToBlobErr, BlobPath, Response); + end; + + procedure Init(TableIDValue: Integer; FieldIdListValue: List of [Integer]; LastFlushedTimeStampValue: BigInteger; EmitTelemetryValue: Boolean) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimensions: Dictionary of [Text, Text]; + begin + ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); + + TableID := TableIDValue; + FieldIdList := FieldIdListValue; + + ADLSECredentials.Init(); + EntityName := ADLSEUtil.GetDataLakeCompliantTableName(TableID); + + LastFlushedTimeStamp := LastFlushedTimeStampValue; + ADLSESetup.GetSingleton(); + + MaxSizeOfPayloadMiB := ADLSESetup.MaxPayloadSizeMiB; + EmitTelemetry := EmitTelemetryValue; + if EmitTelemetry then begin + CustomDimensions.Add('Entity', EntityName); + CustomDimensions.Add('Last flushed time stamp', Format(LastFlushedTimeStampValue)); + ADLSEExecution.Log('ADLSE-041', 'Initialized ADLSE Communication to write to the lake.', Verbosity::Verbose); + end; + end; + + procedure CheckEntity(CdmDataFormat: Enum "ADLSE CDM Format"; var EntityJsonNeedsUpdate: Boolean; var ManifestJsonsNeedsUpdate: Boolean; SchemaUpdate: Boolean) + var + ADLSECdmUtil: Codeunit "ADLSE CDM Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + OldJson: JsonObject; + NewJson: JsonObject; + BlobExists: Boolean; + BlobEntityPath: Text; + begin + // check entity + EntityJson := ADLSECdmUtil.CreateEntityContent(TableID, FieldIdList); + BlobEntityPath := StrSubstNo(CorpusJsonPathTxt, StrSubstNo(EntityManifestNameTemplateTxt, EntityName)); + OldJson := ADLSEGen2Util.GetBlobContent(GetBaseUrl() + BlobEntityPath, ADLSECredentials, BlobExists); + if BlobExists and not SchemaUpdate then + ADLSECdmUtil.CheckChangeInEntities(OldJson, EntityJson, EntityName); + if not ADLSECdmUtil.CompareEntityJsons(OldJson, EntityJson) then begin + if EmitTelemetry then + ADLSEExecution.Log('ADLSE-028', GetLastErrorText() + GetLastErrorCallStack(), Verbosity::Warning); + ClearLastError(); + + EntityJsonNeedsUpdate := true; + JsonsDifferent(OldJson, EntityJson); // to log the difference + end; + + // check manifest. Assume that if the data manifest needs change, the delta manifest will also need be updated + OldJson := ADLSEGen2Util.GetBlobContent(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DataCdmManifestNameTxt), ADLSECredentials, BlobExists); + NewJson := ADLSECdmUtil.UpdateDefaultManifestContent(OldJson, TableID, 'data', CdmDataFormat); + ManifestJsonsNeedsUpdate := JsonsDifferent(OldJson, NewJson); + + if not SchemaUpdate then begin + if EntityJsonNeedsUpdate then + Error(EntitySchemaChangedErr, EntityName, ExportOfSchemaNotPerformendTxt); + if ManifestJsonsNeedsUpdate then + Error(CdmSchemaChangedErr, ExportOfSchemaNotPerformendTxt); + end; + end; + + procedure CreateEntityContent() + var + ADLSECdmUtil: Codeunit "ADLSE CDM Util"; + begin + EntityJson := ADLSECdmUtil.CreateEntityContent(TableID, FieldIdList); + end; + + local procedure JsonsDifferent(Json1: JsonObject; Json2: JsonObject) Result: Boolean + var + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimensions: Dictionary of [Text, Text]; + Content1: Text; + Content2: Text; + begin + Json1.WriteTo(Content1); + Json2.WriteTo(Content2); + Result := Content1 <> Content2; + if Result and EmitTelemetry then begin + CustomDimensions.Add('Content1', Content1); + CustomDimensions.Add('Content2', Content2); + ADLSEExecution.Log('ADLSE-023', 'Jsons were found to be different.', Verbosity::Warning, CustomDimensions); + end; + end; + + local procedure CreateDataBlob() Created: Boolean + var + ADLSEUtil: Codeunit "ADLSE Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimension: Dictionary of [Text, Text]; + FileIdentifer: Guid; + begin + if DataBlobPath <> '' then + // Microsoft Fabric has a limit on the blob size. Create a new blob before reaching this limit + if not ADLSEGen2Util.IsMaxBlobFileSize(DataBlobPath, BlobContentLength, Payload.Length()) then + exit // no need to create a new blob + else begin + if EmitTelemetry then begin + Clear(CustomDimension); + CustomDimension.Add('Entity', EntityName); + CustomDimension.Add('DataBlobPath', DataBlobPath); + CustomDimension.Add('BlobContentLength', Format(BlobContentLength)); + CustomDimension.Add('PayloadContentLength', Format(Payload.Length())); + ADLSEExecution.Log('ADLSE-030', 'Maximum blob size reached.', Verbosity::Normal, CustomDimension); + end; + Created := true; + BlobContentLength := 0; + end; + + FileIdentifer := CreateGuid(); + + DataBlobPath := StrSubstNo(DeltasFileCsvTok, EntityName, ADLSEUtil.ToText(FileIdentifer)); + ADLSEGen2Util.CreateDataBlob(GetBaseUrl() + DataBlobPath, ADLSECredentials); + Created := true; + if EmitTelemetry then begin + Clear(CustomDimension); + CustomDimension.Add('Entity', EntityName); + CustomDimension.Add('DataBlobPath', DataBlobPath); + ADLSEExecution.Log('ADLSE-012', 'Created new blob to hold the data to be exported', Verbosity::Normal, CustomDimension); + end; + end; + + procedure TryCollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; var LastTimestampExported: BigInteger) + var + DataBlobCreated: Boolean; + begin + ClearLastError(); + DataBlobCreated := CreateDataBlob(); + LastTimestampExported := CollectAndSendRecord(RecordRef, RecordTimeStamp, DataBlobCreated); + end; + + local procedure CollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; DataBlobCreated: Boolean) LastTimestampExported: BigInteger + var + ADLSEUtil: Codeunit "ADLSE Util"; + RecordPayLoad: Text; + begin + if NumberOfFlushes = 50000 then // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks + Error(CannotAddedMoreBlocksErr); + + // Add headers into the existing Payload + if (DataBlobCreated) and (Payload.Length() <> 0) then + Payload.Insert(1, ADLSEUtil.CreateCsvHeader(RecordRef, FieldIdList)); + + RecordPayLoad := ADLSEUtil.CreateCsvPayload(RecordRef, FieldIdList, Payload.Length() = 0); + // check if payload exceeds the limit + if Payload.Length() + StrLen(RecordPayLoad) + 2 > MaxPayloadSize() then begin // the 2 is to account for new line characters + if Payload.Length() = 0 then + // the record alone exceeds the max payload size + Error(SingleRecordTooLargeErr); + FlushPayload(); + end; + LastTimestampExported := LastFlushedTimeStamp; + + Payload.Append(RecordPayLoad); + LastRecordOnPayloadTimeStamp := RecordTimeStamp; + end; + + procedure TryFinish(var LastTimestampExported: BigInteger) + begin + ClearLastError(); + LastTimestampExported := Finish(); + end; + + local procedure Finish() LastTimestampExported: BigInteger + begin + FlushPayload(); + + LastTimestampExported := LastFlushedTimeStamp; + end; + + local procedure MaxPayloadSize(): Integer + var + MaxLimitForPutBlockCalls: Integer; + MaxCapacityOfTextBuilder: Integer; + begin + MaxLimitForPutBlockCalls := MaxSizeOfPayloadMiB * 1024 * 1024; + MaxCapacityOfTextBuilder := Payload.MaxCapacity(); + if MaxLimitForPutBlockCalls < MaxCapacityOfTextBuilder then + exit(MaxLimitForPutBlockCalls); + exit(MaxCapacityOfTextBuilder); + end; + + local procedure FlushPayload() + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ADLSEExecution: Codeunit "ADLSE Execution"; + CustomDimensions: Dictionary of [Text, Text]; + BlockID: Text; + begin + if Payload.Length() = 0 then + exit; + + if EmitTelemetry then begin + CustomDimensions.Add('Length of payload', Format(Payload.Length())); + ADLSEExecution.Log('ADLSE-013', 'Flushing the payload', Verbosity::Normal, CustomDimensions); + end; + + AddBlockToDataBlob(GetBaseUrl() + DataBlobPath, Payload.ToText(), BlobContentLength, ADLSECredentials); + BlobContentLength := ADLSEGen2Util.GetBlobContentLength(GetBaseUrl() + DataBlobPath, ADLSECredentials); + + + LastFlushedTimeStamp := LastRecordOnPayloadTimeStamp; + Payload.Clear(); + LastRecordOnPayloadTimeStamp := 0; + NumberOfFlushes += 1; + + ADLSE.OnTableExported(TableID, LastFlushedTimeStamp); + if EmitTelemetry then begin + Clear(CustomDimensions); + CustomDimensions.Add('Flushed count', Format(NumberOfFlushes)); + ADLSEExecution.Log('ADLSE-016', 'Flushed the payload', Verbosity::Normal, CustomDimensions); + end; + end; + + procedure UpdateCdmJsons(EntityJsonNeedsUpdate: Boolean; ManifestJsonsNeedsUpdate: Boolean) + var + ADLSESetup: Record "ADLSE Setup"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + LeaseID: Text; + BlobPath: Text; + begin + // update entity json + if EntityJsonNeedsUpdate then begin + BlobPath := GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, StrSubstNo(EntityManifestNameTemplateTxt, EntityName)); + ADLSEGen2Util.CreateOrUpdateJsonBlob(BlobPath, ADLSECredentials, LeaseID, EntityJson); + end; + + // update manifest + if ManifestJsonsNeedsUpdate then begin + // Expected that multiple sessions that export data from different tables will be competing for writing to + // manifest. Semaphore applied. + ADLSESetup.ReadIsolation := IsolationLevel::UpdLock; + ADLSESetup.GetSingleton(); + + UpdateManifest(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DataCdmManifestNameTxt), 'data', ADLSESetup.DataFormat); + + UpdateManifest(GetBaseUrl() + StrSubstNo(CorpusJsonPathTxt, DeltaCdmManifestNameTxt), 'deltas', "ADLSE CDM Format"::Csv); + Commit(); // to release the lock above + end; + end; + + local procedure UpdateManifest(BlobPath: Text; Folder: Text; ADLSECdmFormat: Enum "ADLSE CDM Format") + var + ADLSESetup: Record "ADLSE Setup"; + ADLSECdmUtil: Codeunit "ADLSE CDM Util"; + ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; + ManifestJson: JsonObject; + LeaseID: Text; + BlobExists: Boolean; + begin + if BlobExists then + ManifestJson := ADLSEGen2Util.GetBlobContent(BlobPath, ADLSECredentials, BlobExists); + + ManifestJson := ADLSECdmUtil.UpdateDefaultManifestContent(ManifestJson, TableID, Folder, ADLSECdmFormat); + ADLSEGen2Util.CreateOrUpdateJsonBlob(BlobPath, ADLSECredentials, LeaseID, ManifestJson); + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/Gen2Util.Codeunit.al b/businessCentral/app/src/Gen2Util.Codeunit.al index 94dab36..3b8e4e3 100644 --- a/businessCentral/app/src/Gen2Util.Codeunit.al +++ b/businessCentral/app/src/Gen2Util.Codeunit.al @@ -6,22 +6,19 @@ codeunit 82568 "ADLSE Gen 2 Util" SingleInstance = true; var + Adlse: codeunit ADLSE; + ADLSIntegrations: Interface "ADLS Integrations"; AcquireLeaseSuffixTxt: Label '?comp=lease', Locked = true; LeaseDurationSecsTxt: Label '60', Locked = true, Comment = 'This is the maximum duration for a lock on the blobs'; AcquireLeaseTimeoutSecondsTxt: Label '180', Locked = true, Comment = 'The number of seconds to continuously try to acquire a lock on the blob. This must be more than the value specified for AcquireLeaseSleepSecondsTxt.'; AcquireLeaseSleepSecondsTxt: Label '10', Locked = true, Comment = 'The number of seconds to sleep for before re-trying to acquire a lock on the blob. This must be less than the value specified for AcquireLeaseTimeoutSecondsTxt.'; TimedOutWaitingForLockOnBlobErr: Label 'Timed out waiting to acquire lease on blob %1 after %2 seconds. %3', Comment = '%1: blob name, %2: total waiting time in seconds, %3: Http Response'; CouldNotReleaseLockOnBlobErr: Label 'Could not release lock on blob %1. %2', Comment = '%1: blob name, %2: Http response.'; - CreateContainerSuffixTxt: Label '?restype=container', Locked = true; CoundNotCreateContainerErr: Label 'Could not create container %1. %2', Comment = '%1: container name; %2: error text'; GetContainerMetadataSuffixTxt: Label '?restype=container&comp=metadata', Locked = true; - - PutBlockSuffixTxt: Label '?comp=block&blockid=%1', Locked = true, Comment = '%1 = the block id being added'; PutLockListSuffixTxt: Label '?comp=blocklist', Locked = true; - CouldNotAppendDataToBlobErr: Label 'Could not append data to %1. %2', Comment = '%1: blob path, %2: Http response.'; CouldNotCommitBlocksToDataBlobErr: Label 'Could not commit blocks to %1. %2', Comment = '%1: Blob path, %2: Http Response'; - CouldNotCreateBlobErr: Label 'Could not create blob %1. %2', Comment = '%1: blob path, %2: error text'; CouldNotReadDataInBlobErr: Label 'Could not read data on %1. %2', Comment = '%1: blob path, %2: Http respomse'; CouldNotReadResponseHeaderErr: Label 'Could not read %1 from %2.', Comment = '%1: content header value , %2: blob path'; LatestBlockTagTok: Label '%1', Comment = '%1: block ID', Locked = true; @@ -110,94 +107,14 @@ codeunit 82568 "ADLSE Gen 2 Util" BodyAsText: Text; begin Body.WriteTo(BodyAsText); - CreateBlockBlob(BlobPath, ADLSECredentials, LeaseID, BodyAsText, true); - end; - - local procedure CreateBlockBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials"; LeaseID: Text; Body: Text; IsJson: Boolean) - var - ADLSESetup: Record "ADLSE Setup"; - ADLSEHttp: Codeunit "ADLSE Http"; - Response: Text; - BlobPathOrg: Text; - IsHandled: Boolean; - begin - OnBeforeCreateBlockBlob(BlobPath, LeaseID, Body, IsJson, IsHandled); - if IsHandled then - exit; - - ADLSEHttp.SetMethod("ADLSE Http Method"::Put); - - case ADLSESetup.GetStorageType() of - ADLSESetup."Storage Type"::"Azure Data Lake": - ADLSEHttp.SetUrl(BlobPath); - ADLSESetup."Storage Type"::"Microsoft Fabric": - begin - BlobPathOrg := BlobPath; - ADLSEHttp.SetUrl(BlobPath + '?resource=file'); - end; - end; - - ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); - ADLSEHttp.AddHeader('x-ms-blob-type', 'BlockBlob'); - if IsJson then begin - ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeJson()); - ADLSEHttp.SetContentIsJson(); - end else - ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeTextCsv()); - ADLSEHttp.SetBody(Body); - if LeaseID <> '' then - ADLSEHttp.AddHeader('x-ms-lease-id', LeaseID); - if not ADLSEHttp.InvokeRestApi(Response) then - Error(CouldNotCreateBlobErr, BlobPath, Response); - - //Upload Json for Microsoft Fabric - if (ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Microsoft Fabric") and (IsJson) then - AddBlockToDataBlob(BlobPathOrg, Body, 0, ADLSECredentials); + Adlse.selectbc2adlsIntegrations(ADLSIntegrations); + ADLSIntegrations.CreateBlockBlob(BlobPath, ADLSECredentials, LeaseID, BodyAsText, true); end; procedure CreateDataBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials") begin - CreateBlockBlob(BlobPath, ADLSECredentials, '', '', false); - end; - - // Storage Type - Azure Data Lake Storage - procedure AddBlockToDataBlob(BlobPath: Text; Body: Text; ADLSECredentials: Codeunit "ADLSE Credentials") BlockID: Text - var - Base64Convert: Codeunit "Base64 Convert"; - ADLSEHttp: Codeunit "ADLSE Http"; - Response: Text; - IsHandled: Boolean; - begin - OnBeforeAddBlockToDataBlob(BlobPath, Body, BlockID, IsHandled); - if IsHandled then - exit; - - ADLSEHttp.SetMethod("ADLSE Http Method"::Put); - BlockID := Base64Convert.ToBase64(CreateGuid()); - ADLSEHttp.SetUrl(BlobPath + StrSubstNo(PutBlockSuffixTxt, BlockID)); - ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); - ADLSEHttp.SetBody(Body); - if not ADLSEHttp.InvokeRestApi(Response) then - Error(CouldNotAppendDataToBlobErr, BlobPath, Response); - end; - - // Storage Type - Microsoft Fabric - procedure AddBlockToDataBlob(BlobPath: Text; Body: Text; Position: Integer; ADLSECredentials: Codeunit "ADLSE Credentials") - var - ADLSEHttp: Codeunit "ADLSE Http"; - Response: Text; - IsHandled: Boolean; - begin - OnBeforeAddBlockToDataBlob(BlobPath, Body, Format(Position), IsHandled); - if IsHandled then - exit; - - ADLSEHttp.SetMethod("ADLSE Http Method"::Patch); - ADLSEHttp.SetUrl(BlobPath + '?position=' + Format(Position) + '&action=append&flush=true'); - ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); - ADLSEHttp.SetBody(Body); - if not ADLSEHttp.InvokeRestApi(Response) then - Error(CouldNotAppendDataToBlobErr, BlobPath, Response); + Adlse.selectbc2adlsIntegrations(ADLSIntegrations); + ADLSIntegrations.CreateBlockBlob(BlobPath, ADLSECredentials, '', '', false); end; procedure CommitAllBlocksOnDataBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials"; BlockIDList: List of [Text]) diff --git a/businessCentral/app/src/Setup.Codeunit.al b/businessCentral/app/src/Setup.Codeunit.al index 2034076..04505b8 100644 --- a/businessCentral/app/src/Setup.Codeunit.al +++ b/businessCentral/app/src/Setup.Codeunit.al @@ -75,25 +75,6 @@ codeunit 82560 "ADLSE Setup" exit(true); end; - procedure CheckSetup(var ADLSESetup: Record "ADLSE Setup") - var - ADLSECurrentSession: Record "ADLSE Current Session"; - ADLSECredentials: Codeunit "ADLSE Credentials"; - begin - ADLSESetup.GetSingleton(); - if ADLSESetup."Storage Type" = ADLSESetup."Storage Type"::"Azure Data Lake" then - ADLSESetup.TestField(Container) - else - ADLSESetup.TestField(Workspace); - - ADLSESetup.CheckSchemaExported(); - - if ADLSECurrentSession.AreAnySessionsActive() then - ADLSECurrentSession.CheckForNoActiveSessions(); - - ADLSECredentials.Check(); - end; - [InherentPermissions(PermissionObjectType::TableData, Database::"ADLSE Field", 'rd')] [InherentPermissions(PermissionObjectType::TableData, Database::"ADLSE Table", 'rd')] [InherentPermissions(PermissionObjectType::TableData, Database::"ADLSE Setup", 'm')] diff --git a/businessCentral/app/src/StorageType.Enum.al b/businessCentral/app/src/StorageType.Enum.al index 87d17ce..33fa5e1 100644 --- a/businessCentral/app/src/StorageType.Enum.al +++ b/businessCentral/app/src/StorageType.Enum.al @@ -7,12 +7,12 @@ enum 82563 "ADLSE Storage Type" implements "ADLS Integrations" value(0; "Azure Data Lake") { Caption = 'Azure Data Lake'; - Implementation = "ADLS Integrations" = "Azure Integration"; + Implementation = "ADLS Integrations" = "Azure Communication"; } #pragma warning restore LC0045 value(1; "Microsoft Fabric") { Caption = 'Microsoft Fabric'; - Implementation = "ADLS Integrations" = "FL Integration"; + Implementation = "ADLS Integrations" = "Fabric Communication"; } } \ No newline at end of file From 4255ad6cc4a970c0a41cf72b1088f700fc716772 Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 14:18:36 +0100 Subject: [PATCH 05/10] removed comments --- businessCentral/app/src/Communication.Codeunit.al | 5 ----- 1 file changed, 5 deletions(-) diff --git a/businessCentral/app/src/Communication.Codeunit.al b/businessCentral/app/src/Communication.Codeunit.al index a894a9f..537f6fa 100644 --- a/businessCentral/app/src/Communication.Codeunit.al +++ b/businessCentral/app/src/Communication.Codeunit.al @@ -17,33 +17,28 @@ codeunit 82562 "ADLSE Communication" procedure CheckEntity(CdmDataFormat: Enum "ADLSE CDM Format"; var EntityJsonNeedsUpdate: Boolean; var ManifestJsonsNeedsUpdate: Boolean; SchemaUpdate: Boolean) begin - //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); AdlsIntegrations.CheckEntity(CdmDataFormat, EntityJsonNeedsUpdate, ManifestJsonsNeedsUpdate, SchemaUpdate); end; procedure CreateEntityContent() begin - //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); AdlsIntegrations.CreateEntityContent(); end; [TryFunction] procedure TryCollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; var LastTimestampExported: BigInteger) begin - //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); AdlsIntegrations.TryCollectAndSendRecord(RecordRef, RecordTimeStamp, LastTimestampExported); end; [TryFunction] procedure TryFinish(var LastTimestampExported: BigInteger) begin - //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); AdlsIntegrations.TryFinish(LastTimestampExported); end; procedure UpdateCdmJsons(EntityJsonNeedsUpdate: Boolean; ManifestJsonsNeedsUpdate: Boolean) begin - //ADLSE.selectbc2adlsIntegrations(AdlsIntegrations); AdlsIntegrations.UpdateCdmJsons(EntityJsonNeedsUpdate, ManifestJsonsNeedsUpdate); end; } From 24b2eeae42736756b8161b64126910b57c1ec245 Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 14:28:33 +0100 Subject: [PATCH 06/10] moved precedure to interface --- .../AzureCommunication.Codeunit.al | 17 ++------------- .../FabricCommunication.Codeunit.al | 21 ++++++++++++++++++- businessCentral/app/src/Gen2Util.Codeunit.al | 19 ----------------- 3 files changed, 22 insertions(+), 35 deletions(-) diff --git a/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al index fbfb8e6..c2f4dde 100644 --- a/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al +++ b/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al @@ -208,21 +208,8 @@ codeunit 82577 "Azure Communication" implements "ADLS Integrations" FileIdentifer: Guid; begin if DataBlobPath <> '' then - // Microsoft Fabric has a limit on the blob size. Create a new blob before reaching this limit - if not ADLSEGen2Util.IsMaxBlobFileSize(DataBlobPath, BlobContentLength, Payload.Length()) then - exit // no need to create a new blob - else begin - if EmitTelemetry then begin - Clear(CustomDimension); - CustomDimension.Add('Entity', EntityName); - CustomDimension.Add('DataBlobPath', DataBlobPath); - CustomDimension.Add('BlobContentLength', Format(BlobContentLength)); - CustomDimension.Add('PayloadContentLength', Format(Payload.Length())); - ADLSEExecution.Log('ADLSE-030', 'Maximum blob size reached.', Verbosity::Normal, CustomDimension); - end; - Created := true; - BlobContentLength := 0; - end; + // already created blob + exit; FileIdentifer := CreateGuid(); diff --git a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al index 47c470e..77f4bec 100644 --- a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al +++ b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al @@ -217,7 +217,7 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" begin if DataBlobPath <> '' then // Microsoft Fabric has a limit on the blob size. Create a new blob before reaching this limit - if not ADLSEGen2Util.IsMaxBlobFileSize(DataBlobPath, BlobContentLength, Payload.Length()) then + if not IsMaxBlobFileSize(BlobContentLength, Payload.Length()) then exit // no need to create a new blob else begin if EmitTelemetry then begin @@ -245,6 +245,25 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" end; end; + local procedure IsMaxBlobFileSize(BlobContentLength: Integer; PayloadLength: Integer): Boolean + var + ADLSESetup: Record "ADLSE Setup"; + BlobTotalContentSize: BigInteger; + begin + if ADLSESetup.GetStorageType() <> ADLSESetup."Storage Type"::"Microsoft Fabric" then + exit(false); + + // To prevent a overflow, use a BigInterger to calculate the total value + BlobTotalContentSize := BlobContentLength; + BlobTotalContentSize += PayloadLength; + + // Microsoft Fabric has a limit of 2 GB (2147483647) for a blob. + if BlobTotalContentSize < 2147483647 then + exit(false); + + exit(true); + end; + procedure TryCollectAndSendRecord(RecordRef: RecordRef; RecordTimeStamp: BigInteger; var LastTimestampExported: BigInteger) var DataBlobCreated: Boolean; diff --git a/businessCentral/app/src/Gen2Util.Codeunit.al b/businessCentral/app/src/Gen2Util.Codeunit.al index 3b8e4e3..fb425bc 100644 --- a/businessCentral/app/src/Gen2Util.Codeunit.al +++ b/businessCentral/app/src/Gen2Util.Codeunit.al @@ -194,25 +194,6 @@ codeunit 82568 "ADLSE Gen 2 Util" Error(CouldNotReleaseLockOnBlobErr, BlobPath, Response); end; - procedure IsMaxBlobFileSize(DataBlobPath: Text; BlobContentLength: Integer; PayloadLength: Integer): Boolean - var - ADLSESetup: Record "ADLSE Setup"; - BlobTotalContentSize: BigInteger; - begin - if ADLSESetup.GetStorageType() <> ADLSESetup."Storage Type"::"Microsoft Fabric" then - exit(false); - - // To prevent a overflow, use a BigInterger to calculate the total value - BlobTotalContentSize := BlobContentLength; - BlobTotalContentSize += PayloadLength; - - // Microsoft Fabric has a limit of 2 GB (2147483647) for a blob. - if BlobTotalContentSize < 2147483647 then - exit(false); - - exit(true); - end; - procedure RemoveDeltasFromDataLake(ADLSEntityName: Text; ADLSECredentials: Codeunit "ADLSE Credentials") var ADLSESetup: Record "ADLSE Setup"; From a5dd5c6d894b193416a6a9e4901fbaeb056ce677 Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 14:43:45 +0100 Subject: [PATCH 07/10] moved addcontent to an event --- .../AzureSubscribers.Codeunit.al | 23 +++++++++++++ .../FabricCommunication.Codeunit.al | 4 --- .../FabricSubscribers.Codeunit.al | 26 +++++++++++++++ businessCentral/app/src/Http.Codeunit.al | 32 ++++++------------- 4 files changed, 58 insertions(+), 27 deletions(-) create mode 100644 businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al diff --git a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al index e9b4a8d..ba8f070 100644 --- a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al +++ b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al @@ -19,4 +19,27 @@ codeunit 82579 "Azure Subscribers" if not ADLSEGen2Util.ContainerExists(ADLSIntegrations.GetBaseUrl(), ADLSECredentials) then ADLSEGen2Util.CreateContainer(ADLSIntegrations.GetBaseUrl(), ADLSECredentials); end; + + + [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Http", 'OnBeforeAddContent', '', true, true)] + local procedure OnBeforeAddContent(var HttpContent: HttpContent; ContentTypeJson: Boolean; body: Text) + var + ADLSESetup: Record "ADLSE Setup"; + Headers: HttpHeaders; + begin + ADLSESetup.GetSingleton(); + if ADLSESetup."Storage Type" <> ADLSESetup."Storage Type"::"Azure Data Lake" then + exit; + + if (ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake") then + HttpContent.WriteFrom(Body); + + HttpContent.GetHeaders(Headers); + + if ContentTypeJson then begin + Headers.Remove('Content-Type'); + Headers.Add('Content-Type', 'application/json'); + Headers.Remove('Content-Length'); + end; + end; } \ No newline at end of file diff --git a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al index 77f4bec..6ba5f65 100644 --- a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al +++ b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al @@ -247,12 +247,8 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" local procedure IsMaxBlobFileSize(BlobContentLength: Integer; PayloadLength: Integer): Boolean var - ADLSESetup: Record "ADLSE Setup"; BlobTotalContentSize: BigInteger; begin - if ADLSESetup.GetStorageType() <> ADLSESetup."Storage Type"::"Microsoft Fabric" then - exit(false); - // To prevent a overflow, use a BigInterger to calculate the total value BlobTotalContentSize := BlobContentLength; BlobTotalContentSize += PayloadLength; diff --git a/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al new file mode 100644 index 0000000..18421b6 --- /dev/null +++ b/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al @@ -0,0 +1,26 @@ +codeunit 82580 "Fabric Subscribers" +{ + [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Http", 'OnBeforeAddContent', '', true, true)] + local procedure OnBeforeAddContent(var HttpContent: HttpContent; ContentTypeJson: Boolean; body: Text) + var + ADLSESetup: Record "ADLSE Setup"; + Headers: HttpHeaders; + begin + ADLSESetup.GetSingleton(); + if ADLSESetup."Storage Type" <> ADLSESetup."Storage Type"::"Microsoft Fabric" then + exit; + + if not ContentTypeJson then + HttpContent.WriteFrom(Body); + + HttpContent.GetHeaders(Headers); + + if ContentTypeJson then begin + Headers.Remove('Content-Type'); + Headers.Add('Content-Type', 'application/json'); + Headers.Remove('Content-Length'); + Headers.Add('Content-Length', '0'); + end else + Headers.Remove('Content-Length'); + end; +} \ No newline at end of file diff --git a/businessCentral/app/src/Http.Codeunit.al b/businessCentral/app/src/Http.Codeunit.al index 9bfe964..656dff6 100644 --- a/businessCentral/app/src/Http.Codeunit.al +++ b/businessCentral/app/src/Http.Codeunit.al @@ -132,7 +132,7 @@ codeunit 82563 "ADLSE Http" begin HttpRequestMessage.Method('PUT'); HttpRequestMessage.SetRequestUri(Url); - AddContent(HttpContent); + AddContent(HttpContent, body); HttpClient.Put(Url, HttpContent, HttpResponseMessage); end; "ADLSE Http Method"::Delete: @@ -141,7 +141,7 @@ codeunit 82563 "ADLSE Http" begin HttpRequestMessage.Method('PATCH'); HttpRequestMessage.SetRequestUri(Url); - AddContent(HttpContent); + AddContent(HttpContent, body); HttpRequestMessage.Content(HttpContent); HttpClient.Send(HttpRequestMessage, HttpResponseMessage); end; @@ -163,28 +163,9 @@ codeunit 82563 "ADLSE Http" StatusCode := HttpResponseMessage.HttpStatusCode(); end; - local procedure AddContent(var HttpContent: HttpContent) - var - ADLSESetup: Record "ADLSE Setup"; - Headers: HttpHeaders; + local procedure AddContent(var HttpContent: HttpContent; body: Text) begin - if (ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake") or - (ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Microsoft Fabric") and (not ContentTypeJson) - then - HttpContent.WriteFrom(Body); - - HttpContent.GetHeaders(Headers); - - if ContentTypeJson then begin - Headers.Remove('Content-Type'); - Headers.Add('Content-Type', 'application/json'); - Headers.Remove('Content-Length'); - if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Microsoft Fabric" then - Headers.Add('Content-Length', '0'); - end; - - if (ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Microsoft Fabric") and (not ContentTypeJson) then - Headers.Remove('Content-Length'); + OnBeforeAddContent(HttpContent, ContentTypeJson, body); end; [NonDebuggable] @@ -263,4 +244,9 @@ codeunit 82563 "ADLSE Http" AccessToken := ADSEUtil.GetTextValueForKeyInJson(Json, 'access_token'); // TODO: Store access token in cache, and use it based on expiry date. end; + + [IntegrationEvent(false, false)] + local procedure OnBeforeAddContent(var HttpContent: HttpContent; ContentTypeJson: Boolean; body: Text) + begin + end; } \ No newline at end of file From 07c7144c12cc0101beea3074cbaa59308aed8cbe Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Fri, 22 Nov 2024 14:51:57 +0100 Subject: [PATCH 08/10] ScopeUrlEncoded to event --- .../src/AzureDataLake/AzureSubscribers.Codeunit.al | 12 ++++++++++++ .../FabricLakehouse/FabricSubscribers.Codeunit.al | 12 ++++++++++++ businessCentral/app/src/Http.Codeunit.al | 12 ++++++------ 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al index ba8f070..fd5a706 100644 --- a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al +++ b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al @@ -42,4 +42,16 @@ codeunit 82579 "Azure Subscribers" Headers.Remove('Content-Length'); end; end; + + [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Http", 'OnBeforeAcquireTokenOAuth2', '', true, true)] + local procedure OnBeforeAcquireTokenOAuth2(var ScopeUrlEncoded: Text) + var + ADLSESetup: Record "ADLSE Setup"; + begin + ADLSESetup.GetSingleton(); + if ADLSESetup."Storage Type" <> ADLSESetup."Storage Type"::"Azure Data Lake" then + exit; + + ScopeUrlEncoded := 'https%3A%2F%2Fstorage.azure.com%2Fuser_impersonation'; // url encoded form of https://storage.azure.com/user_impersonation + end; } \ No newline at end of file diff --git a/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al index 18421b6..29e1030 100644 --- a/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al +++ b/businessCentral/app/src/FabricLakehouse/FabricSubscribers.Codeunit.al @@ -23,4 +23,16 @@ codeunit 82580 "Fabric Subscribers" end else Headers.Remove('Content-Length'); end; + + [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Http", 'OnBeforeAcquireTokenOAuth2', '', true, true)] + local procedure OnBeforeAcquireTokenOAuth2(var ScopeUrlEncoded: Text) + var + ADLSESetup: Record "ADLSE Setup"; + begin + ADLSESetup.GetSingleton(); + if ADLSESetup."Storage Type" <> ADLSESetup."Storage Type"::"Microsoft Fabric" then + exit; + + ScopeUrlEncoded := 'https%3A%2F%2Fstorage.azure.com%2F.default'; // url encoded form of https://storage.azure.com/.default + end; } \ No newline at end of file diff --git a/businessCentral/app/src/Http.Codeunit.al b/businessCentral/app/src/Http.Codeunit.al index 656dff6..c86abc3 100644 --- a/businessCentral/app/src/Http.Codeunit.al +++ b/businessCentral/app/src/Http.Codeunit.al @@ -210,12 +210,7 @@ codeunit 82563 "ADLSE Http" Json: JsonObject; ScopeUrlEncoded: Text; begin - case ADLSESetup.GetStorageType() of - ADLSESetup."Storage Type"::"Azure Data Lake": - ScopeUrlEncoded := 'https%3A%2F%2Fstorage.azure.com%2Fuser_impersonation'; // url encoded form of https://storage.azure.com/user_impersonation - ADLSESetup."Storage Type"::"Microsoft Fabric": - ScopeUrlEncoded := 'https%3A%2F%2Fstorage.azure.com%2F.default'; // url encoded form of https://storage.azure.com/.default - end; + OnBeforeAcquireTokenOAuth2(ScopeUrlEncoded); Uri := StrSubstNo(OAuthTok, Credentials.GetTenantID()); HttpRequestMessage.Method('POST'); @@ -249,4 +244,9 @@ codeunit 82563 "ADLSE Http" local procedure OnBeforeAddContent(var HttpContent: HttpContent; ContentTypeJson: Boolean; body: Text) begin end; + + [IntegrationEvent(false, false)] + local procedure OnBeforeAcquireTokenOAuth2(var ScopeUrlEncoded: Text) + begin + end; } \ No newline at end of file From 2bd29d5ed06173345d11a21e7fcc56466b9671f4 Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Mon, 25 Nov 2024 14:53:02 +0100 Subject: [PATCH 09/10] Add event subscriber for OnBeforeInvokeRestApi to handle additional request headers --- .../AzureSubscribers.Codeunit.al | 21 +++++++++++++++++++ businessCentral/app/src/Http.Codeunit.al | 20 +++++++----------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al index fd5a706..1d65872 100644 --- a/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al +++ b/businessCentral/app/src/AzureDataLake/AzureSubscribers.Codeunit.al @@ -54,4 +54,25 @@ codeunit 82579 "Azure Subscribers" ScopeUrlEncoded := 'https%3A%2F%2Fstorage.azure.com%2Fuser_impersonation'; // url encoded form of https://storage.azure.com/user_impersonation end; + + [EventSubscriber(ObjectType::Codeunit, Codeunit::"ADLSE Http", 'OnBeforeInvokeRestApi', '', true, true)] + local procedure OnBeforeInvokeRestApi(AdditionalRequestHeaders: Dictionary of [Text, Text]; var Headers: HttpHeaders; HttpClient: HttpClient) + var + ADLSESetup: Record "ADLSE Setup"; + HeaderKey: Text; + HeaderValue: Text; + begin + ADLSESetup.GetSingleton(); + if ADLSESetup."Storage Type" <> ADLSESetup."Storage Type"::"Azure Data Lake" then + exit; + + if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then + if AdditionalRequestHeaders.Count() > 0 then begin + Headers := HttpClient.DefaultRequestHeaders(); + foreach HeaderKey in AdditionalRequestHeaders.Keys do begin + AdditionalRequestHeaders.Get(HeaderKey, HeaderValue); + Headers.Add(HeaderKey, HeaderValue); + end; + end; + end; } \ No newline at end of file diff --git a/businessCentral/app/src/Http.Codeunit.al b/businessCentral/app/src/Http.Codeunit.al index c86abc3..6aa2ab1 100644 --- a/businessCentral/app/src/Http.Codeunit.al +++ b/businessCentral/app/src/Http.Codeunit.al @@ -107,24 +107,15 @@ codeunit 82563 "ADLSE Http" HttpRequestMessage: HttpRequestMessage; HttpResponseMessage: HttpResponseMessage; HttpContent: HttpContent; - HeaderKey: Text; - HeaderValue: Text; begin + OnBeforeInvokeRestApi(AdditionalRequestHeaders, Headers, HttpClient); + ADLSESetup.GetSingleton(); HttpClient.SetBaseAddress(Url); if not AddAuthorization(HttpClient, Response) then exit(false); - if ADLSESetup.GetStorageType() = ADLSESetup."Storage Type"::"Azure Data Lake" then - if AdditionalRequestHeaders.Count() > 0 then begin - Headers := HttpClient.DefaultRequestHeaders(); - foreach HeaderKey in AdditionalRequestHeaders.Keys do begin - AdditionalRequestHeaders.Get(HeaderKey, HeaderValue); - Headers.Add(HeaderKey, HeaderValue); - end; - end; - case HttpMethod of "ADLSE Http Method"::Get: HttpClient.Get(Url, HttpResponseMessage); @@ -197,7 +188,6 @@ codeunit 82563 "ADLSE Http" [NonDebuggable] local procedure AcquireTokenOAuth2(var AuthError: Text) AccessToken: Text var - ADLSESetup: Record "ADLSE Setup"; ADSEUtil: Codeunit "ADLSE Util"; HttpClient: HttpClient; HttpRequestMessage: HttpRequestMessage; @@ -249,4 +239,10 @@ codeunit 82563 "ADLSE Http" local procedure OnBeforeAcquireTokenOAuth2(var ScopeUrlEncoded: Text) begin end; + + [IntegrationEvent(false, false)] + local procedure OnBeforeInvokeRestApi(AdditionalRequestHeaders: Dictionary of [Text, Text]; var Headers: HttpHeaders; HttpClient: HttpClient) + begin + end; + } \ No newline at end of file From a8a003e5dca43480d6a166eb46f87356f424943b Mon Sep 17 00:00:00 2001 From: Bert Verbeek Date: Mon, 25 Nov 2024 14:58:12 +0100 Subject: [PATCH 10/10] Refactor Azure and Fabric Communication codeunits to remove unused variables and improve parameter naming --- .../AzureDataLake/AzureCommunication.Codeunit.al | 1 - .../FabricCommunication.Codeunit.al | 15 +++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al b/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al index c2f4dde..67038ed 100644 --- a/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al +++ b/businessCentral/app/src/AzureDataLake/AzureCommunication.Codeunit.al @@ -8,7 +8,6 @@ codeunit 82577 "Azure Communication" implements "ADLS Integrations" FieldIdList: List of [Integer]; DataBlobPath: Text; DataBlobBlockIDs: List of [Text]; - BlobContentLength: Integer; LastRecordOnPayloadTimeStamp: BigInteger; Payload: TextBuilder; LastFlushedTimeStamp: BigInteger; diff --git a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al index 6ba5f65..7b3faf4 100644 --- a/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al +++ b/businessCentral/app/src/FabricLakehouse/FabricCommunication.Codeunit.al @@ -7,7 +7,6 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" TableID: Integer; FieldIdList: List of [Integer]; DataBlobPath: Text; - DataBlobBlockIDs: List of [Text]; BlobContentLength: Integer; LastRecordOnPayloadTimeStamp: BigInteger; Payload: TextBuilder; @@ -59,7 +58,7 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" ADLSECredentials.Check(); end; - procedure CreateBlockBlob(BlobPath: Text; ADLSECredentials: Codeunit "ADLSE Credentials"; LeaseID: Text; Body: Text; IsJson: Boolean) + procedure CreateBlockBlob(BlobPath: Text; lADLSECredentials: Codeunit "ADLSE Credentials"; LeaseID: Text; Body: Text; IsJson: Boolean) var ADLSEHttp: Codeunit "ADLSE Http"; Response: Text; @@ -71,7 +70,7 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" BlobPathOrg := BlobPath; ADLSEHttp.SetUrl(BlobPath + '?resource=file'); - ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); + ADLSEHttp.SetAuthorizationCredentials(lADLSECredentials); ADLSEHttp.AddHeader('x-ms-blob-type', 'BlockBlob'); if IsJson then begin ADLSEHttp.AddHeader('x-ms-blob-content-type', ADLSEHttp.GetContentTypeJson()); @@ -86,7 +85,7 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" //Upload Json for Microsoft Fabric if IsJson then - AddBlockToDataBlob(BlobPathOrg, Body, 0, ADLSECredentials); + AddBlockToDataBlob(BlobPathOrg, Body, 0, lADLSECredentials); end; procedure ResetTableExport(ltableId: Integer) @@ -94,7 +93,6 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" ADLSESetup: Record "ADLSE Setup"; ADLSEUtil: Codeunit "ADLSE Util"; ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; - ADLSECredentials: Codeunit "ADLSE Credentials"; Body: JsonObject; ResetTableExportTxt: Label '/reset/%1.txt', Locked = true, Comment = '%1 = Table name'; begin @@ -104,7 +102,7 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" ADLSEGen2Util.CreateOrUpdateJsonBlob(GetBaseUrl() + StrSubstNo(ResetTableExportTxt, ADLSEUtil.GetDataLakeCompliantTableName(ltableId)), ADLSECredentials, '', Body); end; - local procedure AddBlockToDataBlob(BlobPath: Text; Body: Text; Position: Integer; ADLSECredentials: Codeunit "ADLSE Credentials") + local procedure AddBlockToDataBlob(BlobPath: Text; Body: Text; Position: Integer; lADLSECredentials: Codeunit "ADLSE Credentials") var ADLSEHttp: Codeunit "ADLSE Http"; Response: Text; @@ -112,7 +110,7 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" begin ADLSEHttp.SetMethod("ADLSE Http Method"::Patch); ADLSEHttp.SetUrl(BlobPath + '?position=' + Format(Position) + '&action=append&flush=true'); - ADLSEHttp.SetAuthorizationCredentials(ADLSECredentials); + ADLSEHttp.SetAuthorizationCredentials(lADLSECredentials); ADLSEHttp.SetBody(Body); if not ADLSEHttp.InvokeRestApi(Response) then Error(CouldNotAppendDataToBlobErr, BlobPath, Response); @@ -322,11 +320,9 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" local procedure FlushPayload() var - ADLSESetup: Record "ADLSE Setup"; ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; ADLSEExecution: Codeunit "ADLSE Execution"; CustomDimensions: Dictionary of [Text, Text]; - BlockID: Text; begin if Payload.Length() = 0 then exit; @@ -382,7 +378,6 @@ codeunit 82578 "Fabric Communication" implements "ADLS Integrations" local procedure UpdateManifest(BlobPath: Text; Folder: Text; ADLSECdmFormat: Enum "ADLSE CDM Format") var - ADLSESetup: Record "ADLSE Setup"; ADLSECdmUtil: Codeunit "ADLSE CDM Util"; ADLSEGen2Util: Codeunit "ADLSE Gen 2 Util"; ManifestJson: JsonObject;