Skip to content

Commit

Permalink
🪟 🎉 Clear user-defined primary key(s) and cursor field(s) if the fiel…
Browse files Browse the repository at this point in the history
…d is removed (#20203)

* first pass

* remove typecasting, no longer needed

* cleanup existing tests

* cleanup the cleanup

* testing, review response

* more cleanup

* cleanup

* cleanup with edmundo, fix tests

* clean up looping

* fix ze build!

* cleanup return types when getting streamsWithBreakingChanges

* cleanup

* Update airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts

Co-authored-by: Krishna (kc) Glick <krishna@airbyte.io>

* Update airbyte-webapp/src/views/Connection/ConnectionForm/calculateInitialCatalog.ts

Co-authored-by: Krishna (kc) Glick <krishna@airbyte.io>

Co-authored-by: Krishna (kc) Glick <krishna@airbyte.io>
  • Loading branch information
teallarson and krishnaglick authored Dec 12, 2022
1 parent bec8ef2 commit 21e7290
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { SyncSchema, SyncSchemaStream } from "core/domain/catalog";
import { DestinationSyncMode, StreamDescriptor, SyncMode } from "core/request/AirbyteClient";
import {
DestinationSyncMode,
FieldTransformTransformType,
StreamDescriptor,
StreamTransformTransformType,
SyncMode,
} from "core/request/AirbyteClient";

import calculateInitialCatalog from "./calculateInitialCatalog";

Expand Down Expand Up @@ -33,6 +39,7 @@ describe("calculateInitialCatalog", () => {
streams: [restProps],
} as unknown as SyncSchema,
[],
[],
false
);

Expand All @@ -58,6 +65,7 @@ describe("calculateInitialCatalog", () => {
],
} as unknown as SyncSchema,
[],
[],
false
);

Expand Down Expand Up @@ -107,6 +115,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup, DestinationSyncMode.overwrite],
[],
false
);

Expand Down Expand Up @@ -182,6 +191,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
false
);

Expand Down Expand Up @@ -239,6 +249,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.overwrite],
[],
false
);

Expand Down Expand Up @@ -296,6 +307,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append],
[],
false
);

Expand Down Expand Up @@ -353,6 +365,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append],
[],
false
);

Expand Down Expand Up @@ -384,6 +397,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
true
);

Expand Down Expand Up @@ -444,6 +458,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
false
);

Expand Down Expand Up @@ -473,6 +488,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
true
);
// primary keys
Expand Down Expand Up @@ -509,6 +525,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
false
);

Expand Down Expand Up @@ -536,6 +553,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
false
);

Expand Down Expand Up @@ -563,6 +581,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
false
);

Expand Down Expand Up @@ -590,6 +609,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
false
);
// primary keys
Expand Down Expand Up @@ -642,6 +662,7 @@ describe("calculateInitialCatalog", () => {
],
},
[DestinationSyncMode.append_dedup],
[],
true,
newStreamDescriptors
);
Expand All @@ -654,4 +675,196 @@ describe("calculateInitialCatalog", () => {
expect(calculatedStreams[1].config?.syncMode).toEqual(SyncMode.incremental);
expect(calculatedStreams[1].config?.destinationSyncMode).toEqual(DestinationSyncMode.overwrite);
});

it("should remove the entire primary key if any path from it was removed", () => {
const { config, stream: sourceDefinedStream } = mockSyncSchemaStream;
const values = calculateInitialCatalog(
{
streams: [
{
id: "1",
stream: {
...sourceDefinedStream,
name: "test",
sourceDefinedCursor: true,
defaultCursorField: ["id"],
sourceDefinedPrimaryKey: [],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.append_dedup,
syncMode: SyncMode.incremental,
primaryKey: [["id"], ["email"]],
},
},
{
id: "2",
stream: {
...sourceDefinedStream,
name: "test-2",
sourceDefinedCursor: true,
defaultCursorField: ["updated_at"],
sourceDefinedPrimaryKey: [],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.append_dedup,
syncMode: SyncMode.incremental,
primaryKey: [["id"]],
},
},
],
},
[DestinationSyncMode.append_dedup],
[
{
transformType: StreamTransformTransformType.update_stream,
streamDescriptor: { name: "test", namespace: "namespace-test" },
updateStream: [
{
breaking: true,
transformType: FieldTransformTransformType.remove_field,
fieldName: ["id"],
},
],
},
],
true
);
expect(values.streams[0].config?.primaryKey).toEqual([]); // was entirely cleared
expect(values.streams[1].config?.primaryKey).toEqual([["id"]]); // was not affected
});

it("should remove cursor from config if the old cursor field was removed, even if there is a default", () => {
const { config, stream: sourceDefinedStream } = mockSyncSchemaStream;
const values = calculateInitialCatalog(
{
streams: [
{
id: "1",
stream: {
...sourceDefinedStream,
name: "test",
sourceDefinedCursor: false,
defaultCursorField: ["id"],
sourceDefinedPrimaryKey: [],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.append_dedup,
syncMode: SyncMode.incremental,
cursorField: ["updated_at"],
},
},
{
id: "2",
stream: {
...sourceDefinedStream,
name: "test-2",
sourceDefinedCursor: true,
defaultCursorField: ["updated_at"],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.append_dedup,
syncMode: SyncMode.incremental,
cursorField: ["updated_at"],
primaryKey: [["id"]],
},
},
],
},
[DestinationSyncMode.append_dedup],
[
{
transformType: StreamTransformTransformType.update_stream,
streamDescriptor: { name: "test", namespace: "namespace-test" },
updateStream: [
{
breaking: true,
transformType: FieldTransformTransformType.remove_field,
fieldName: ["updated_at"],
},
],
},
],
true
);
expect(values.streams[0].config?.cursorField).toEqual([]); // was entirely cleared and not replaced with default
expect(values.streams[1].config?.cursorField).toEqual(["updated_at"]); // was unaffected
});
it("should clear multiple config fields if multiple fields were removed", () => {
const { config, stream: sourceDefinedStream } = mockSyncSchemaStream;
const values = calculateInitialCatalog(
{
streams: [
{
id: "1",
stream: {
...sourceDefinedStream,
name: "test",
sourceDefinedCursor: false,
defaultCursorField: ["id"],
sourceDefinedPrimaryKey: [],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.append_dedup,
syncMode: SyncMode.incremental,
cursorField: ["updated_at"],
primaryKey: [["primary_key"], ["another_field"]],
},
},
{
id: "2",
stream: {
...sourceDefinedStream,
name: "test-2",
sourceDefinedCursor: true,
defaultCursorField: ["updated_at"],
sourceDefinedPrimaryKey: [],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.append_dedup,
syncMode: SyncMode.incremental,
cursorField: ["updated_at"],
primaryKey: [["id"]],
},
},
],
},
[DestinationSyncMode.append_dedup],
[
{
transformType: StreamTransformTransformType.update_stream,
streamDescriptor: { name: "test", namespace: "namespace-test" },
updateStream: [
{
breaking: true,
transformType: FieldTransformTransformType.remove_field,
fieldName: ["updated_at"],
},
{
breaking: true,
transformType: FieldTransformTransformType.remove_field,
fieldName: ["primary_key"],
},
],
},
],
true
);
expect(values.streams[0].config?.primaryKey).toEqual([]); // was entirely cleared and not replaced with default
expect(values.streams[0].config?.cursorField).toEqual([]); // was entirely cleared and not replaced with default

expect(values.streams[1].config?.primaryKey).toEqual([["id"]]); // was unaffected})
expect(values.streams[1].config?.cursorField).toEqual(["updated_at"]); // was unaffected})
});
});
Loading

0 comments on commit 21e7290

Please sign in to comment.