Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add auto-generated plugins to import data to Splitgraph from all supported data sources (Airbyte, Singer Taps, etc), and add export plugin to export from Splitgraph to Seafowl via Splitgraph GraphQL API #20

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b3f0c64
Copy relevant parts of SplitgraphImportCSVPlugin to base-import-plugi…
milesrichardson May 10, 2023
5ab75f8
Migrate `SplitgraphImportCSVPlugin` to extend base `SplitgraphImportP…
milesrichardson May 11, 2023
2111f28
Return instance of `Object.getPrototypeOf(this)` instead of storing `…
milesrichardson May 11, 2023
5b4ea36
Reduce plugin API surface by marking functions as `protected`
milesrichardson May 11, 2023
368f599
Upgrade `json-schema-to-typescript` to latest
milesrichardson May 12, 2023
3bb19ae
Name generated interfaces of plugin types PluginName + SchemaName
milesrichardson May 12, 2023
74dfe2a
Upgrade graphql-codegen packages to latest
milesrichardson May 15, 2023
57b0720
Make `PluginName` a generic type parameter
milesrichardson May 15, 2023
c525096
Add factory that returns class implementing ImportPlugin interface fo…
milesrichardson May 15, 2023
f52749b
Create `AirbyteGitHubImportPlugin` as proof-of-concept of auto-genera…
milesrichardson May 15, 2023
78af9cd
Add notes to CONTRIBUTING.md about how to use VSCode debugger with tests
milesrichardson May 15, 2023
3258aa1
Add integration test that ingests Seafowl GitHub repo with airbyte-gi…
milesrichardson May 16, 2023
0b99458
Auto-generate a PluginClass for every plugin in a `plugin.ts` file fo…
milesrichardson May 16, 2023
c06ea72
Add generic parameter `PluginName` to `ExportPlugin<PluginName>`
milesrichardson May 17, 2023
7909ebd
Refactor (readability): move public methods to top of `SplitgraphImpo…
milesrichardson May 17, 2023
fcdc15b
Refactor `ExportQueryPlugin` to inherit from new `SplitgraphExportPlu…
milesrichardson May 17, 2023
d1bd3b6
Add `SplitgraphExportToSeafowlPlugin` for exporting from Splitgraph t…
milesrichardson May 17, 2023
c2e2970
Add basic integration test for exporting query from Splitgraph to sel…
milesrichardson May 17, 2023
ae998c6
Refactor plugin names for consistency and accuracy
milesrichardson May 17, 2023
83a089d
Implement pollDeferredTask (WIP)
milesrichardson May 22, 2023
709d546
Make entire return type of pollDeferredTask generic, not just respons…
milesrichardson May 22, 2023
8ed0030
Cleanup/simplify types
milesrichardson May 22, 2023
8674498
Add test for completed deferred export task
milesrichardson May 22, 2023
ef315b2
Implement deferrable import tasks (covers all import plugins)
milesrichardson May 22, 2023
e9dc906
Implement deferrable `export-to-seafowl` plugin
milesrichardson May 22, 2023
4df66e0
Temporarily hardcode the two plugins used for the import/export demo,…
milesrichardson Jun 9, 2023
d16cb38
Fix test that was failing because Splitgraph export job completed too…
milesrichardson Jun 9, 2023
7ce363a
Update `export-to-seafowl` to conform to new API where exported table…
milesrichardson Jun 9, 2023
b19c0cc
Fix bug in `fingerprintQuery` when `window.crypto` is undefined
milesrichardson Jun 29, 2023
2212612
Remove stray debugger statement and fix return value of deferred expo…
milesrichardson Jun 29, 2023
3ec594a
Update `useSql` hook to accept query building function and abort signal
milesrichardson Jun 29, 2023
2ea2ae8
Fix tests (sort of: this accounts for hardcoded plugin initialization…
milesrichardson Jun 29, 2023
9496035
Bump version to `0.0.12` and prep for publish to `canary` tag
milesrichardson Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement deferrable import tasks (covers all import plugins)
Allow starting a task, getting a taskId, and not waiting for it,
so that later can check whether it's completed (point of this is
so a Vercel server function can do the checking and the polling
can be triggered from the client).
  • Loading branch information
milesrichardson committed May 22, 2023
commit ef315b22f74e541ad263f981075b9e661307b721
10 changes: 8 additions & 2 deletions packages/base-db/base-db.ts
Original file line number Diff line number Diff line change
@@ -29,8 +29,14 @@ export interface ImportPlugin<
__name: PluginName;
importData: (
sourceOptions: ConcreteSourceOptions,
destOptions: ConcreteDestOptions
) => Promise<{ response: any | null; error: any | null; info?: any | null }>;
destOptions: ConcreteDestOptions,
importOptions?: { defer: boolean }
) => Promise<{
taskId?: string | null;
response: any | null;
error: any | null;
info?: any | null;
}>;
}

// interface ImportPluginWithOptions extends ImportPlugin {
79 changes: 79 additions & 0 deletions packages/db-splitgraph/db-splitgraph.test.ts
Original file line number Diff line number Diff line change
@@ -634,6 +634,7 @@ describe("importData for SplitgraphImportCSVPlugin", () => {

// TODO: Make a mocked version of this test
describe.skipIf(shouldSkipIntegrationTests())("real export query", () => {
// NOTE: test assumes that the task hasn't completed by the time we send the first check
it("deferred exports basic postgres query to parquet returns a taskId", async () => {
const db = createRealDb();
const {
@@ -895,6 +896,84 @@ describe.skipIf(shouldSkipIntegrationTests())("real DDN", () => {
expect(info?.jobStatus.status).toEqual("SUCCESS");

expect(info?.jobLog?.url.includes(info.jobStatus.taskId)).toBe(true);

// PIGGYBACK on this test to also test pollDeferredTask (just like with export)
// We wouldn't normally do this since we didn't defer the task and have already
// awaited it, but since we know it's complete we can conveniently check the
// test case of pollDeferredTask returning a completed task
const shouldBeCompletedTask = await db.pollDeferredTask("csv", {
// This is the hacky part, note that this didn't come from the return value
taskId: info.jobStatus.taskId,
namespace,
repository: "dunno",
});

expect(shouldBeCompletedTask.completed).toBe(true);
expect(shouldBeCompletedTask.error).toBeNull();
expect(shouldBeCompletedTask.info?.jobStatus).not.toBeNull();
expect(shouldBeCompletedTask.info?.jobLog).not.toBeNull();

expect(shouldBeCompletedTask.response).not.toBeNull();
expect(typeof shouldBeCompletedTask.response?.jobLog?.url).toEqual(
"string"
);
expect(shouldBeCompletedTask.response?.jobStatus?.status).toEqual(
"SUCCESS"
);
expect(shouldBeCompletedTask.response?.jobStatus?.taskId).toEqual(
info.jobStatus.taskId
);
expect(shouldBeCompletedTask.response?.jobStatus?.isManual).toEqual(true);
expect(typeof shouldBeCompletedTask.response?.jobStatus?.finished).toEqual(
"string"
);
expect(typeof shouldBeCompletedTask.response?.jobStatus?.started).toEqual(
"string"
);
}, 20_000);

// NOTE: test assumes that the task hasn't completed by the time we send the first check
it("upload starts a deferred task", async () => {
const db = createRealDb();
const { username: namespace } = await fetchToken(db);

const { response, info, taskId } = await db.importData(
"csv",
{ data: Buffer.from(`name;candies\r\nBob;5\r\nAlice;10`) },
{
tableName: `irrelevant-${randSuffix()}`,
namespace,
repository: "dunno",
tableParams: {
delimiter: ";",
},
},
{ defer: true }
);

expect(typeof taskId).toBe("string");
expect(taskId?.length).toEqual(36);

expect(taskId).toBeDefined();

expect(response).toBeDefined();
expect(info).toBeDefined();

const startedTask = await db.pollDeferredTask("csv", {
taskId: taskId as string,
namespace,
repository: "dunno",
});

expect(startedTask.completed).toBe(false);
expect(startedTask.error).toBeNull();
expect(startedTask.info).toBeNull();
expect(startedTask.response?.jobStatus?.status).toBe("STARTED");
expect(startedTask.response?.jobStatus?.finished).toBeNull();
expect(startedTask.response?.jobStatus?.taskId).toEqual(taskId);
expect(typeof startedTask.response?.jobStatus?.started).toEqual("string");
expect(startedTask.response?.jobStatus?.finished).toBeNull();
expect(startedTask.response?.jobStatus?.isManual).toEqual(true);
}, 20_000);
});
describe("makeFakeJwt and claimsFromJwt", () => {
38 changes: 20 additions & 18 deletions packages/db-splitgraph/db-splitgraph.ts
Original file line number Diff line number Diff line change
@@ -357,7 +357,7 @@ export class DbSplitgraph<SplitgraphPluginList extends PluginList>
// TODO: type error in ...rest
// this.plugins.callFunction(pluginName, "importData", ...rest);

const [sourceOpts, destOpts] = rest;
const [sourceOpts, destOpts, importOpts] = rest;

const plugin = this.plugins
.selectMatchingPlugins(
@@ -382,31 +382,33 @@ export class DbSplitgraph<SplitgraphPluginList extends PluginList>
throw new Error("plugin does not implement withOptions");
}

return await plugin
.withOptions({
...this.pluginConfig,
...plugin,
transformRequestHeaders: (headers: HeadersInit) =>
(
(plugin as PluginWithTransformRequestHeadersOption<typeof plugin>)
.transformRequestHeaders ?? IdentityFunc
)(this.pluginConfig.transformRequestHeaders(headers)),
})
.importData(sourceOpts, destOpts);
const instantiatedPlugin = plugin.withOptions({
...this.pluginConfig,
...plugin,
transformRequestHeaders: (headers: HeadersInit) =>
(
(plugin as PluginWithTransformRequestHeadersOption<typeof plugin>)
.transformRequestHeaders ?? IdentityFunc
)(this.pluginConfig.transformRequestHeaders(headers)),
});

return importOpts
? await instantiatedPlugin.importData(sourceOpts, destOpts, importOpts)
: await instantiatedPlugin.importData(sourceOpts, destOpts);
}

async pollDeferredTask<
PluginName extends ExtractPlugin<
SplitgraphPluginList,
DeferredTaskPlugin<string, any>
>["__name"],
MatchingPlugin extends ExtractPlugin<
SplitgraphPluginList,
// discriminate on __name to avoid including return type of every plugin with pollDeferredTask
{ __name: PluginName } & DeferredTaskPlugin<string, any>
>,
DeferredTaskResponse extends Awaited<
ReturnType<
ExtractPlugin<
SplitgraphPluginList,
DeferredTaskPlugin<string, any>
>["pollDeferredTask"]
>
ReturnType<MatchingPlugin["pollDeferredTask"]>
>
>(
pluginName: PluginName,
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import type {
StartExternalRepositoryLoadMutation,
StartExternalRepositoryLoadMutationVariables,
} from "./splitgraph-base-import-plugin.generated";
import type { DeferredTaskPlugin } from "@madatdata/base-db/base-db";

export type SplitgraphDestOptions = {
namespace: string;
@@ -49,6 +50,25 @@ type ProvidedExternalLoadMutationVariables = Pick<
Omit<StartExternalRepositoryLoadMutationVariables, "tables" | "pluginName">
>;

// We query for multple nods and then filter client-side for the node matching namespace and repository
type JobStatusNode = Exclude<
RepositoryIngestionJobStatusQuery["repositoryIngestionJobStatus"],
null
>["nodes"][number];

export type DeferredSplitgraphImportTask = {
completed: boolean;
response: {
jobStatus: JobStatusNode | null;
jobLog?: { url: string };
} | null;
error: "no response" | "failed status" | null | any;
info: {
jobStatus: { status: number; headers: any } | null;
jobLog?: { status: number; headers: any } | null;
} | null;
};

export abstract class SplitgraphImportPlugin<
PluginName extends string,
/** The "params" schema for the plugin, i.e. provided by auto-generated type */
@@ -64,13 +84,20 @@ export abstract class SplitgraphImportPlugin<
PluginTableParamsSchema,
PluginCredentialsSchema,
DerivedSplitgraphImportPlugin,
StartedImportJob,
CompletedImportJob,
ConcreteImportDestOptions,
ConcreteImportSourceOptions
>,
StartedImportJob extends object,
CompletedImportJob extends Awaited<
ReturnType<ImportPlugin<PluginName>["importData"]>
>,
ConcreteImportDestOptions extends SplitgraphDestOptions = SplitgraphDestOptions,
ConcreteImportSourceOptions extends object = Record<string, never>
> implements
ImportPlugin<PluginName>,
DeferredTaskPlugin<PluginName, DeferredSplitgraphImportTask>,
WithOptionsInterface<DerivedSplitgraphImportPlugin>
{
public abstract readonly __name: PluginName;
@@ -130,7 +157,8 @@ export abstract class SplitgraphImportPlugin<

public async importData(
rawSourceOptions: ConcreteImportSourceOptions,
rawDestOptions: ConcreteImportDestOptions
rawDestOptions: ConcreteImportDestOptions,
importOptions?: { defer: boolean }
) {
const {
sourceOptions = rawSourceOptions,
@@ -146,6 +174,7 @@ export abstract class SplitgraphImportPlugin<

if (loadError || !loadResponse) {
return {
...(importOptions?.defer ? { taskId: null } : {}),
response: null,
error: loadError,
info: { ...importCtx.info, ...loadInfo },
@@ -154,6 +183,15 @@ export abstract class SplitgraphImportPlugin<

const { taskId } = loadResponse.startExternalRepositoryLoad;

if (importOptions?.defer) {
return {
taskId,
response: loadResponse,
error: loadError ?? null,
info: { ...importCtx.info, ...loadInfo },
};
}

const { response: statusResponse, error: statusError } =
await this.waitForTask(taskId, destOptions);

@@ -396,6 +434,55 @@ export abstract class SplitgraphImportPlugin<
};
}

public async pollDeferredTask({
taskId,
namespace,
repository,
}: {
taskId: string;
namespace: string;
repository: string;
}): Promise<DeferredSplitgraphImportTask> {
try {
const taskStatus = await this.waitForTaskOnce(taskId, {
namespace,
repository,
});

return {
completed: true,
...taskStatus,
};
} catch (err) {
if (
typeof err === "object" &&
err !== null &&
"type" in err &&
(err as { type: "retry"; response: JobStatusNode }).type === "retry"
) {
return {
completed: false,
error: null, // it's just a retry, so we don't include error
response: {
jobStatus:
"response" in err
? (err as { response: JobStatusNode }).response
: null,
},
info: null,
};
} else {
// We got an unknown/unexpected error (basically, caught something that was not retry)
return {
completed: true,
error: err,
response: null,
info: null,
};
}
}
}

@Retryable({
...retryOptions,
doRetry: ({ type }) => type === "retry",
@@ -406,6 +493,16 @@ export abstract class SplitgraphImportPlugin<
namespace,
repository,
}: Pick<ConcreteImportDestOptions, "namespace" | "repository">
) {
return await this.waitForTaskOnce(taskId, { namespace, repository });
}

private async waitForTaskOnce(
taskId: string,
{
namespace,
repository,
}: Pick<ConcreteImportDestOptions, "namespace" | "repository">
) {
const {
response: jobStatusResponse,
@@ -426,7 +523,7 @@ export abstract class SplitgraphImportPlugin<
throw { type: "retry" };
// FIXME(codegen): this shouldn't be nullable
} else if (taskUnresolved(jobStatusResponse.status!)) {
throw { type: "retry" };
throw { type: "retry", response: jobStatusResponse };
}

const {
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import type { ImportPlugin, WithOptionsInterface } from "@madatdata/base-db";

import { SplitgraphImportPlugin } from "./splitgraph-base-import-plugin";
import {
DeferredSplitgraphImportTask,
SplitgraphImportPlugin,
} from "./splitgraph-base-import-plugin";
import type { SplitgraphDestOptions } from "./splitgraph-base-import-plugin";
import type { SplitgraphImportPluginOptions } from "./splitgraph-base-import-plugin";

import type { ExternalTableColumnInput } from "../../gql-client/unified-types";
import type { DeferredTaskPlugin } from "@madatdata/base-db/base-db";

export interface BaseGeneratedImportSourceOptions<
PluginParamsSchema extends object
@@ -74,6 +78,8 @@ export function makeGeneratedImportPlugin<
TableParamsSchema,
CredentialsSchema,
SplitgraphGeneratedImportPlugin,
Record<string, unknown>,
Awaited<ReturnType<ImportPlugin<PluginName>["importData"]>>,
ConcreteImportDestOptions,
ConcreteImportSourceOptions
>
@@ -83,6 +89,7 @@ export function makeGeneratedImportPlugin<
ConcreteImportSourceOptions,
ConcreteImportDestOptions
>,
DeferredTaskPlugin<PluginName, DeferredSplitgraphImportTask>,
WithOptionsInterface<SplitgraphGeneratedImportPlugin>
{
public readonly __name = pluginName;
Loading