From 21e7290eb6cd0a328e4f11b26bb5e29c6168080d Mon Sep 17 00:00:00 2001 From: Teal Larson Date: Mon, 12 Dec 2022 08:39:52 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=9F=20=F0=9F=8E=89=20Clear=20user-defi?= =?UTF-8?q?ned=20primary=20key(s)=20and=20cursor=20field(s)=20if=20the=20f?= =?UTF-8?q?ield=20is=20removed=20(#20203)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * first pass * remove typecasting, no longer needed * cleanup existing tests * cleanup the cleanup * testing, review response * more cleanup * cleanup * cleanup with edmundo, fix tests * clean up looping * fix ze build! * cleanup return types when getting streamsWithBreakingChanges * cleanup * Update airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts Co-authored-by: Krishna (kc) Glick * Update airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts Co-authored-by: Krishna (kc) Glick Co-authored-by: Krishna (kc) Glick --- .../calculateInitialCatalog.test.ts | 215 +++++++++++++++++- .../ConnectionForm/calculateInitialCatalog.ts | 73 +++++- .../Connection/ConnectionForm/formConfig.tsx | 24 +- 3 files changed, 305 insertions(+), 7 deletions(-) diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts index daaacdc0afc2..c60751213823 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.test.ts @@ -1,5 +1,11 @@ import { SyncSchema, SyncSchemaStream } from "core/domain/catalog"; -import { DestinationSyncMode, StreamDescriptor, SyncMode } from "core/request/AirbyteClient"; +import { + DestinationSyncMode, + FieldTransformTransformType, + StreamDescriptor, + StreamTransformTransformType, + SyncMode, +} from "core/request/AirbyteClient"; import calculateInitialCatalog from "./calculateInitialCatalog"; @@ -33,6 +39,7 @@ describe("calculateInitialCatalog", () => { streams: [restProps], } as unknown as SyncSchema, [], + [], false ); @@ -58,6 +65,7 @@ describe("calculateInitialCatalog", () => { ], } as unknown as SyncSchema, [], + [], false ); @@ -107,6 +115,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup, DestinationSyncMode.overwrite], + [], false ); @@ -182,6 +191,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], false ); @@ -239,6 +249,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.overwrite], + [], false ); @@ -296,6 +307,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append], + [], false ); @@ -353,6 +365,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append], + [], false ); @@ -384,6 +397,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], true ); @@ -444,6 +458,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], false ); @@ -473,6 +488,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], true ); // primary keys @@ -509,6 +525,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], false ); @@ -536,6 +553,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], false ); @@ -563,6 +581,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], false ); @@ -590,6 +609,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], false ); // primary keys @@ -642,6 +662,7 @@ describe("calculateInitialCatalog", () => { ], }, [DestinationSyncMode.append_dedup], + [], true, newStreamDescriptors ); @@ -654,4 +675,196 @@ describe("calculateInitialCatalog", () => { expect(calculatedStreams[1].config?.syncMode).toEqual(SyncMode.incremental); expect(calculatedStreams[1].config?.destinationSyncMode).toEqual(DestinationSyncMode.overwrite); }); + + it("should remove the entire primary key if any path from it was removed", () => { + const { config, stream: sourceDefinedStream } = mockSyncSchemaStream; + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...sourceDefinedStream, + name: "test", + sourceDefinedCursor: true, + defaultCursorField: ["id"], + sourceDefinedPrimaryKey: [], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + primaryKey: [["id"], ["email"]], + }, + }, + { + id: "2", + stream: { + ...sourceDefinedStream, + name: "test-2", + sourceDefinedCursor: true, + defaultCursorField: ["updated_at"], + sourceDefinedPrimaryKey: [], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + primaryKey: [["id"]], + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + [ + { + transformType: StreamTransformTransformType.update_stream, + streamDescriptor: { name: "test", namespace: "namespace-test" }, + updateStream: [ + { + breaking: true, + transformType: FieldTransformTransformType.remove_field, + fieldName: ["id"], + }, + ], + }, + ], + true + ); + expect(values.streams[0].config?.primaryKey).toEqual([]); // was entirely cleared + expect(values.streams[1].config?.primaryKey).toEqual([["id"]]); // was not affected + }); + + it("should remove cursor from config if the old cursor field was removed, even if there is a default", () => { + const { config, stream: sourceDefinedStream } = mockSyncSchemaStream; + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...sourceDefinedStream, + name: "test", + sourceDefinedCursor: false, + defaultCursorField: ["id"], + sourceDefinedPrimaryKey: [], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + cursorField: ["updated_at"], + }, + }, + { + id: "2", + stream: { + ...sourceDefinedStream, + name: "test-2", + sourceDefinedCursor: true, + defaultCursorField: ["updated_at"], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + cursorField: ["updated_at"], + primaryKey: [["id"]], + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + [ + { + transformType: StreamTransformTransformType.update_stream, + streamDescriptor: { name: "test", namespace: "namespace-test" }, + updateStream: [ + { + breaking: true, + transformType: FieldTransformTransformType.remove_field, + fieldName: ["updated_at"], + }, + ], + }, + ], + true + ); + expect(values.streams[0].config?.cursorField).toEqual([]); // was entirely cleared and not replaced with default + expect(values.streams[1].config?.cursorField).toEqual(["updated_at"]); // was unaffected + }); + it("should clear multiple config fields if multiple fields were removed", () => { + const { config, stream: sourceDefinedStream } = mockSyncSchemaStream; + const values = calculateInitialCatalog( + { + streams: [ + { + id: "1", + stream: { + ...sourceDefinedStream, + name: "test", + sourceDefinedCursor: false, + defaultCursorField: ["id"], + sourceDefinedPrimaryKey: [], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + cursorField: ["updated_at"], + primaryKey: [["primary_key"], ["another_field"]], + }, + }, + { + id: "2", + stream: { + ...sourceDefinedStream, + name: "test-2", + sourceDefinedCursor: true, + defaultCursorField: ["updated_at"], + sourceDefinedPrimaryKey: [], + supportedSyncModes: [SyncMode.incremental], + }, + config: { + ...config, + destinationSyncMode: DestinationSyncMode.append_dedup, + syncMode: SyncMode.incremental, + cursorField: ["updated_at"], + primaryKey: [["id"]], + }, + }, + ], + }, + [DestinationSyncMode.append_dedup], + [ + { + transformType: StreamTransformTransformType.update_stream, + streamDescriptor: { name: "test", namespace: "namespace-test" }, + updateStream: [ + { + breaking: true, + transformType: FieldTransformTransformType.remove_field, + fieldName: ["updated_at"], + }, + { + breaking: true, + transformType: FieldTransformTransformType.remove_field, + fieldName: ["primary_key"], + }, + ], + }, + ], + true + ); + expect(values.streams[0].config?.primaryKey).toEqual([]); // was entirely cleared and not replaced with default + expect(values.streams[0].config?.cursorField).toEqual([]); // was entirely cleared and not replaced with default + + expect(values.streams[1].config?.primaryKey).toEqual([["id"]]); // was unaffected}) + expect(values.streams[1].config?.cursorField).toEqual(["updated_at"]); // was unaffected}) + }); }); diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts index a9b6dc54b937..f0db54cc38e9 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts @@ -1,9 +1,12 @@ +import isEqual from "lodash/isEqual"; + import { SyncSchema, SyncSchemaStream } from "core/domain/catalog"; import { DestinationSyncMode, SyncMode, AirbyteStreamConfiguration, StreamDescriptor, + StreamTransform, } from "core/request/AirbyteClient"; const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => { @@ -13,6 +16,49 @@ const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => { return streamNode.config?.cursorField || []; }; +const clearBreakingFieldChanges = (nodeStream: SyncSchemaStream, breakingChangesByStream: StreamTransform[]) => { + if (!breakingChangesByStream.length || !nodeStream.config) { + return nodeStream; + } + + let clearPrimaryKey = false; + let clearCursorField = false; + + for (const streamTransformation of breakingChangesByStream) { + // get all of the removed field paths for this transformation + const removedFieldPaths = streamTransformation.updateStream?.map((update) => update.fieldName); + + if (!removedFieldPaths?.length) { + continue; + } + + // if there is a primary key in the config, and any of its field paths were removed, we'll be clearing it + if ( + !!nodeStream.config?.primaryKey?.length && + nodeStream.config?.primaryKey?.some((key) => removedFieldPaths.some((removedPath) => isEqual(key, removedPath))) + ) { + clearPrimaryKey = true; + } + + // if there is a cursor field, and any of its field path was removed, we'll be clearing it + if ( + !!nodeStream.config?.cursorField?.length && + removedFieldPaths.some((removedPath) => isEqual(removedPath, nodeStream?.config?.cursorField)) + ) { + clearCursorField = true; + } + } + + return { + ...nodeStream, + config: { + ...nodeStream.config, + primaryKey: clearPrimaryKey ? [] : nodeStream.config.primaryKey, + cursorField: clearCursorField ? [] : nodeStream.config.cursorField, + }, + }; +}; + const verifySourceDefinedProperties = (streamNode: SyncSchemaStream, isEditMode: boolean) => { if (!streamNode.stream || !streamNode.config || !isEditMode) { return streamNode; @@ -125,6 +171,7 @@ const getOptimalSyncMode = ( const calculateInitialCatalog = ( schema: SyncSchema, supportedDestinationSyncModes: DestinationSyncMode[], + streamsWithBreakingFieldChanges?: StreamTransform[], isNotCreateMode?: boolean, newStreamDescriptors?: StreamDescriptor[] ): SyncSchema => { @@ -133,13 +180,29 @@ const calculateInitialCatalog = ( const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() }; const nodeStream = verifySourceDefinedProperties(verifySupportedSyncModes(nodeWithId), isNotCreateMode || false); - // if the stream is new since a refresh, we want to verify cursor and get optimal sync modes - const matches = newStreamDescriptors?.some( - (streamId) => streamId.name === nodeStream?.stream?.name && streamId.namespace === nodeStream.stream?.namespace + // if the stream is new since a refresh, verify cursor and get optimal sync modes + const isStreamNew = newStreamDescriptors?.some( + (streamIdFromDiff) => + streamIdFromDiff.name === nodeStream.stream?.name && + streamIdFromDiff.namespace === nodeStream.stream?.namespace ); - if (isNotCreateMode && !matches) { - return nodeStream; + + // narrow down the breaking field changes from this connection to only those relevant to this stream + const breakingChangesByStream = + streamsWithBreakingFieldChanges && streamsWithBreakingFieldChanges.length > 0 + ? streamsWithBreakingFieldChanges.filter((streamTransformFromDiff) => { + return ( + streamTransformFromDiff.streamDescriptor.name === nodeStream.stream?.name && + streamTransformFromDiff.streamDescriptor.namespace === nodeStream.stream?.namespace + ); + }) + : []; + + // if we're in edit or readonly mode and the stream is not new, check for breaking changes then return + if (isNotCreateMode && !isStreamNew) { + return clearBreakingFieldChanges(nodeStream, breakingChangesByStream); } + return getOptimalSyncMode(verifyConfigCursorField(nodeStream), supportedDestinationSyncModes); }), }; diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx index b47150dd6e24..cd9b46016fe7 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/formConfig.tsx @@ -23,6 +23,7 @@ import { OperationCreate, OperationRead, OperatorType, + SchemaChange, SyncMode, WebBackendConnectionRead, } from "core/request/AirbyteClient"; @@ -280,19 +281,40 @@ export const useInitialValues = ( const workspace = useCurrentWorkspace(); const { catalogDiff } = connection; + // used to determine if we should calculate optimal sync mode const newStreamDescriptors = catalogDiff?.transforms .filter((transform) => transform.transformType === "add_stream") .map((stream) => stream.streamDescriptor); + // used to determine if we need to clear any primary keys or cursor fields that were removed + const streamTransformsWithBreakingChange = useMemo(() => { + if (connection.schemaChange === SchemaChange.breaking) { + return catalogDiff?.transforms.filter((streamTransform) => { + if (streamTransform.transformType === "update_stream") { + return streamTransform.updateStream?.filter((fieldTransform) => fieldTransform.breaking === true); + } + return false; + }); + } + return undefined; + }, [catalogDiff?.transforms, connection]); + const initialSchema = useMemo( () => calculateInitialCatalog( connection.syncCatalog, destDefinition?.supportedDestinationSyncModes || [], + streamTransformsWithBreakingChange, isNotCreateMode, newStreamDescriptors ), - [connection.syncCatalog, destDefinition?.supportedDestinationSyncModes, isNotCreateMode, newStreamDescriptors] + [ + streamTransformsWithBreakingChange, + connection.syncCatalog, + destDefinition?.supportedDestinationSyncModes, + isNotCreateMode, + newStreamDescriptors, + ] ); return useMemo(() => {