From 7ecf317338d596d6e1e38c2473d2fac4097c1eb7 Mon Sep 17 00:00:00 2001 From: Teal Larson Date: Thu, 27 Oct 2022 10:11:04 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=9F=20Apply=20default=20sync=20mode=20?= =?UTF-8?q?logic=20to=20new=20streams=20(#18451)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * brute force solution * cleanup * testing and comment * review cleanup --- .../calculateInitialCatalog.test.ts | 59 ++++++++++++++++++- .../ConnectionForm/calculateInitialCatalog.ts | 39 +++++++----- .../Connection/ConnectionForm/formConfig.tsx | 11 +++- 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts index f9323574f5ea..a4d7b5929c89 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts @@ -1,5 +1,5 @@ import { SyncSchema, SyncSchemaStream } from "core/domain/catalog"; -import { DestinationSyncMode, SyncMode } from "core/request/AirbyteClient"; +import { DestinationSyncMode, StreamDescriptor, SyncMode } from "core/request/AirbyteClient"; import calculateInitialCatalog from "./calculateInitialCatalog"; @@ -11,6 +11,7 @@ const mockSyncSchemaStream: SyncSchemaStream = { sourceDefinedPrimaryKey: [["new_primary_key"]], jsonSchema: {}, name: "test", + namespace: "namespace-test", supportedSyncModes: [], }, config: { @@ -535,4 +536,60 @@ describe("calculateInitialCatalog", () => { // cursor field expect(calculatedStreams[0].config?.cursorField).toEqual(config?.cursorField); }); + + it("should calculate optimal sync mode if stream is new", () => { + const { stream: sourceDefinedStream, config } = mockSyncSchemaStream; + + const newStreamDescriptors: StreamDescriptor[] = [{ name: "test", namespace: "namespace-test" }]; + + const { streams: calculatedStreams } = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...sourceDefinedStream, + name: "test", + namespace: "namespace-test", + sourceDefinedCursor: true, + defaultCursorField: ["id"], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + syncMode: SyncMode.incremental, + }, + }, + { + id: "1", + stream: { + ...sourceDefinedStream, + name: "test2", + namespace: "namespace-test", + sourceDefinedCursor: true, + defaultCursorField: ["id"], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.overwrite, + syncMode: SyncMode.incremental, + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + true, + newStreamDescriptors + ); + + // new stream has its sync mode calculated + expect(calculatedStreams[0].config?.syncMode).toEqual(SyncMode.incremental); + expect(calculatedStreams[0].config?.destinationSyncMode).toEqual(DestinationSyncMode.append_dedup); + + // existing stream remains as-is + expect(calculatedStreams[1].config?.syncMode).toEqual(SyncMode.incremental); + expect(calculatedStreams[1].config?.destinationSyncMode).toEqual(DestinationSyncMode.overwrite); + }); }); diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts index dcc2876025d9..983c83eebc22 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts @@ -1,5 +1,10 @@ import { SyncSchema, SyncSchemaStream } from "core/domain/catalog"; -import { DestinationSyncMode, SyncMode, AirbyteStreamConfiguration } from "core/request/AirbyteClient"; +import { + DestinationSyncMode, + SyncMode, + AirbyteStreamConfiguration, + StreamDescriptor, +} from "core/request/AirbyteClient"; const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => { if (streamNode.stream?.defaultCursorField?.length) { @@ -119,18 +124,24 @@ const getOptimalSyncMode = ( const calculateInitialCatalog = ( schema: SyncSchema, supportedDestinationSyncModes: DestinationSyncMode[], - isNotCreateMode?: boolean -): SyncSchema => ({ - streams: schema.streams.map((apiNode, id) => { - const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() }; - const nodeStream = verifySourceDefinedProperties(verifySupportedSyncModes(nodeWithId), isNotCreateMode || false); - - if (isNotCreateMode) { - return nodeStream; - } - - return getOptimalSyncMode(verifyConfigCursorField(nodeStream), supportedDestinationSyncModes); - }), -}); + isNotCreateMode?: boolean, + newStreamDescriptors?: StreamDescriptor[] +): SyncSchema => { + return { + streams: schema.streams.map((apiNode, id) => { + const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() }; + const nodeStream = verifySourceDefinedProperties(verifySupportedSyncModes(nodeWithId), isNotCreateMode || false); + + // if the stream is new since a refresh, we want to verify cursor and get optimal sync modes + const matches = newStreamDescriptors?.some( + (streamId) => streamId.name === nodeStream?.stream?.name && streamId.namespace === nodeStream.stream?.namespace + ); + if (isNotCreateMode && !matches) { + return nodeStream; + } + return getOptimalSyncMode(verifyConfigCursorField(nodeStream), supportedDestinationSyncModes); + }), + }; +}; export default calculateInitialCatalog; diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx index 996988201b41..8c0ef5da8cb5 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx @@ -252,14 +252,21 @@ export const useInitialValues = ( destDefinition: DestinationDefinitionSpecificationRead, isNotCreateMode?: boolean ): FormikConnectionFormValues => { + const { catalogDiff } = connection; + + const newStreamDescriptors = catalogDiff?.transforms + .filter((transform) => transform.transformType === "add_stream") + .map((stream) => stream.streamDescriptor); + const initialSchema = useMemo( () => calculateInitialCatalog( connection.syncCatalog, destDefinition?.supportedDestinationSyncModes || [], - isNotCreateMode + isNotCreateMode, + newStreamDescriptors ), - [connection.syncCatalog, destDefinition, isNotCreateMode] + [connection.syncCatalog, destDefinition?.supportedDestinationSyncModes, isNotCreateMode, newStreamDescriptors] ); return useMemo(() => {