Skip to content

Commit

Permalink
🪟 Apply default sync mode logic to new streams (airbytehq#18451)
Browse files Browse the repository at this point in the history
* brute force solution

* cleanup

* testing and comment

* review cleanup
  • Loading branch information
teallarson authored and jhammarstedt committed Oct 31, 2022
1 parent 87d22f6 commit 7ecf317
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SyncSchema, SyncSchemaStream } from "core/domain/catalog";
import { DestinationSyncMode, SyncMode } from "core/request/AirbyteClient";
import { DestinationSyncMode, StreamDescriptor, SyncMode } from "core/request/AirbyteClient";

import calculateInitialCatalog from "./calculateInitialCatalog";

Expand All @@ -11,6 +11,7 @@ const mockSyncSchemaStream: SyncSchemaStream = {
sourceDefinedPrimaryKey: [["new_primary_key"]],
jsonSchema: {},
name: "test",
namespace: "namespace-test",
supportedSyncModes: [],
},
config: {
Expand Down Expand Up @@ -535,4 +536,60 @@ describe("calculateInitialCatalog", () => {
// cursor field
expect(calculatedStreams[0].config?.cursorField).toEqual(config?.cursorField);
});

it("should calculate optimal sync mode if stream is new", () => {
const { stream: sourceDefinedStream, config } = mockSyncSchemaStream;

const newStreamDescriptors: StreamDescriptor[] = [{ name: "test", namespace: "namespace-test" }];

const { streams: calculatedStreams } = calculateInitialCatalog(
{
streams: [
{
id: "1",
stream: {
...sourceDefinedStream,
name: "test",
namespace: "namespace-test",
sourceDefinedCursor: true,
defaultCursorField: ["id"],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.overwrite,
syncMode: SyncMode.incremental,
},
},
{
id: "1",
stream: {
...sourceDefinedStream,
name: "test2",
namespace: "namespace-test",
sourceDefinedCursor: true,
defaultCursorField: ["id"],
supportedSyncModes: [SyncMode.incremental],
},
config: {
...config,
destinationSyncMode: DestinationSyncMode.overwrite,
syncMode: SyncMode.incremental,
},
},
],
},
[DestinationSyncMode.append_dedup],
true,
newStreamDescriptors
);

// new stream has its sync mode calculated
expect(calculatedStreams[0].config?.syncMode).toEqual(SyncMode.incremental);
expect(calculatedStreams[0].config?.destinationSyncMode).toEqual(DestinationSyncMode.append_dedup);

// existing stream remains as-is
expect(calculatedStreams[1].config?.syncMode).toEqual(SyncMode.incremental);
expect(calculatedStreams[1].config?.destinationSyncMode).toEqual(DestinationSyncMode.overwrite);
});
});
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { SyncSchema, SyncSchemaStream } from "core/domain/catalog";
import { DestinationSyncMode, SyncMode, AirbyteStreamConfiguration } from "core/request/AirbyteClient";
import {
DestinationSyncMode,
SyncMode,
AirbyteStreamConfiguration,
StreamDescriptor,
} from "core/request/AirbyteClient";

const getDefaultCursorField = (streamNode: SyncSchemaStream): string[] => {
if (streamNode.stream?.defaultCursorField?.length) {
Expand Down Expand Up @@ -119,18 +124,24 @@ const getOptimalSyncMode = (
const calculateInitialCatalog = (
schema: SyncSchema,
supportedDestinationSyncModes: DestinationSyncMode[],
isNotCreateMode?: boolean
): SyncSchema => ({
streams: schema.streams.map<SyncSchemaStream>((apiNode, id) => {
const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() };
const nodeStream = verifySourceDefinedProperties(verifySupportedSyncModes(nodeWithId), isNotCreateMode || false);

if (isNotCreateMode) {
return nodeStream;
}

return getOptimalSyncMode(verifyConfigCursorField(nodeStream), supportedDestinationSyncModes);
}),
});
isNotCreateMode?: boolean,
newStreamDescriptors?: StreamDescriptor[]
): SyncSchema => {
return {
streams: schema.streams.map<SyncSchemaStream>((apiNode, id) => {
const nodeWithId: SyncSchemaStream = { ...apiNode, id: id.toString() };
const nodeStream = verifySourceDefinedProperties(verifySupportedSyncModes(nodeWithId), isNotCreateMode || false);

// if the stream is new since a refresh, we want to verify cursor and get optimal sync modes
const matches = newStreamDescriptors?.some(
(streamId) => streamId.name === nodeStream?.stream?.name && streamId.namespace === nodeStream.stream?.namespace
);
if (isNotCreateMode && !matches) {
return nodeStream;
}
return getOptimalSyncMode(verifyConfigCursorField(nodeStream), supportedDestinationSyncModes);
}),
};
};

export default calculateInitialCatalog;
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,21 @@ export const useInitialValues = (
destDefinition: DestinationDefinitionSpecificationRead,
isNotCreateMode?: boolean
): FormikConnectionFormValues => {
const { catalogDiff } = connection;

const newStreamDescriptors = catalogDiff?.transforms
.filter((transform) => transform.transformType === "add_stream")
.map((stream) => stream.streamDescriptor);

const initialSchema = useMemo(
() =>
calculateInitialCatalog(
connection.syncCatalog,
destDefinition?.supportedDestinationSyncModes || [],
isNotCreateMode
isNotCreateMode,
newStreamDescriptors
),
[connection.syncCatalog, destDefinition, isNotCreateMode]
[connection.syncCatalog, destDefinition?.supportedDestinationSyncModes, isNotCreateMode, newStreamDescriptors]
);

return useMemo(() => {
Expand Down

0 comments on commit 7ecf317

Please sign in to comment.