Skip to content

Commit fcdc15b

Browse files
Refactor ExportQueryPlugin to inherit from new SplitgraphExportPlugin base class
Adopt the same pattern used for import plugin inheritance, so that we can create a plugin for exporting to Seafowl (as opposed to exporting to a parquet/csv file, like the current plugin), while re-using most of the code, particularly regarding waiting for jobs.
1 parent 7909ebd commit fcdc15b

File tree

4 files changed

+214
-162
lines changed

4 files changed

+214
-162
lines changed

packages/core/splitgraph.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ describe("makeSplitgraphHTTPContext", () => {
310310
},
311311
"transformRequestHeaders": [Function],
312312
},
313-
_ExportQueryPlugin {
313+
ExportQueryPlugin {
314314
"__name": "exportQuery",
315315
"graphqlClient": SplitgraphGraphQLClient {
316316
"graphqlClient": GraphQLClient {
@@ -353,7 +353,7 @@ describe("makeSplitgraphHTTPContext", () => {
353353
},
354354
"transformRequestHeaders": [Function],
355355
},
356-
_ExportQueryPlugin {
356+
ExportQueryPlugin {
357357
"__name": "exportQuery",
358358
"graphqlClient": SplitgraphGraphQLClient {
359359
"graphqlClient": GraphQLClient {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import type { ExportPlugin, WithOptionsInterface } from "@madatdata/base-db";
2+
import { SplitgraphGraphQLClient } from "../../gql-client/splitgraph-graphql-client";
3+
import { Retryable, BackOffPolicy } from "typescript-retry-decorator";
4+
5+
import { gql } from "graphql-request";
6+
import type {
7+
ExportJobStatusQuery,
8+
ExportJobStatusQueryVariables,
9+
} from "./base-export-plugin.generated";
10+
11+
// from unified spec
12+
// type _ExportJobStatus = "status" | "finished" | "output" | "started";
13+
14+
/** Construction options accepted by every Splitgraph export plugin. */
interface SplitgraphExportPluginOptions {
  /** URL of the GraphQL endpoint that the plugin's client sends requests to. */
  graphqlEndpoint: string;
  /**
   * Optional hook that receives the outgoing request headers and returns the
   * headers to use instead.
   */
  transformRequestHeaders?: (requestHeaders: HeadersInit) => HeadersInit;
}

/** Options that may be injected after construction via `withOptions`; all are optional. */
type DbInjectedOptions = Partial<SplitgraphExportPluginOptions>;
19+
20+
/** Upper bound on how long one export job is polled: 1 hour, in milliseconds. */
const MAX_POLL_TIMEOUT = 1_000 * 60 * 60;

/** Ceiling, in milliseconds, for the delay between two consecutive polls. */
const MAX_BACKOFF_INTERVAL = 10_000;

/**
 * Number of polling attempts allowed before giving up.
 *
 * 25.5 s is taken off the time budget before dividing by the capped interval.
 * NOTE(review): presumably this is the time spent in the first waits while the
 * delay ramps up from 500 ms (500 + 1_000 + 2_000 + 4_000 + 8_000 + 10_000 ms)
 * — confirm against the retry library's back-off schedule.
 */
const MAX_ATTEMPTS = Math.ceil(
  (MAX_POLL_TIMEOUT - 25.5 * 1_000) / MAX_BACKOFF_INTERVAL
);
27+
/**
 * Base options spread into the `@Retryable` decorator used for job polling:
 * at most `MAX_ATTEMPTS` attempts, exponential back-off policy with a base of
 * 500, a multiplier of 2 and an interval cap of `MAX_BACKOFF_INTERVAL`.
 */
const retryOptions = {
  maxAttempts: MAX_ATTEMPTS,
  backOff: 500,
  backOffPolicy: BackOffPolicy.ExponentialBackOffPolicy,
  exponentialOption: { maxInterval: MAX_BACKOFF_INTERVAL, multiplier: 2 },
};
33+
/**
 * Shared base class for Splitgraph export plugins.
 *
 * It owns the GraphQL client, the `withOptions` builder and the logic for
 * polling an export job until it leaves a standby state. Derived classes
 * supply the plugin name, `exportData` and `startExport`.
 */
export abstract class SplitgraphExportPlugin<
  PluginName extends string,
  /** Concrete type of the derived class, for annotating return value of builder methods like withOptions */
  DerivedSplitgraphExportPlugin extends SplitgraphExportPlugin<
    PluginName,
    DerivedSplitgraphExportPlugin,
    ConcreteExportSourceOptions,
    ConcreteExportDestOptions,
    StartedExportJob,
    CompletedExportJob
  >,
  ConcreteExportSourceOptions extends object,
  ConcreteExportDestOptions extends object,
  StartedExportJob extends object,
  CompletedExportJob extends Awaited<
    ReturnType<ExportPlugin<PluginName>["exportData"]>
  >
> implements
    ExportPlugin<PluginName>,
    WithOptionsInterface<DerivedSplitgraphExportPlugin>
{
  /** Name identifying the concrete plugin; supplied by the derived class. */
  public abstract readonly __name: PluginName;

  /** Options this instance was constructed with; the base for `withOptions` merges. */
  private readonly opts: SplitgraphExportPluginOptions;
  public readonly graphqlEndpoint: SplitgraphExportPluginOptions["graphqlEndpoint"];
  public readonly graphqlClient: SplitgraphGraphQLClient;
  public readonly transformRequestHeaders: Required<SplitgraphExportPluginOptions>["transformRequestHeaders"];

  // TODO: make sense? will be overridden?
  // public static readonly __name: PluginName;

  /**
   * @param opts Endpoint and optional header transformer. When no transformer
   *   is given, headers are passed through unchanged.
   */
  constructor(opts: SplitgraphExportPluginOptions) {
    this.opts = opts;

    this.graphqlEndpoint = opts.graphqlEndpoint;
    this.transformRequestHeaders = opts.transformRequestHeaders ?? IdentityFunc;

    this.graphqlClient = new SplitgraphGraphQLClient({
      graphqlEndpoint: this.graphqlEndpoint,
      transformRequestHeaders: this.transformRequestHeaders,
    });
  }

  // TODO: DRY with other plugins
  /**
   * Builds a new instance of the same concrete class with `injectOpts` layered
   * over the current options. This instance is left untouched.
   *
   * Header transformers are chained rather than replaced: the current
   * transformer runs first, then the injected one runs on its result.
   *
   * @param injectOpts Options to override; omitted or `undefined` values keep
   *   the current setting.
   * @returns A fresh plugin instance of the derived type.
   */
  withOptions(injectOpts: DbInjectedOptions): DerivedSplitgraphExportPlugin {
    const mergedInjectOpts: SplitgraphExportPluginOptions = {
      ...this.opts,
      ...injectOpts,
      // `Partial` allows an explicit `graphqlEndpoint: undefined`, which the
      // spread above would copy over the configured endpoint; fall back to it.
      graphqlEndpoint: injectOpts.graphqlEndpoint ?? this.opts.graphqlEndpoint,
      transformRequestHeaders: (reqHeaders) => {
        const withOriginal = {
          ...reqHeaders,
          ...this.opts.transformRequestHeaders?.(reqHeaders),
        };

        // The injected transformer sees the already-transformed headers, and
        // its output is layered on top of them.
        return {
          ...withOriginal,
          ...injectOpts.transformRequestHeaders?.(withOriginal),
        };
      },
    };

    // Instantiate via the runtime prototype so the derived class is preserved.
    return new (Object.getPrototypeOf(this).constructor)(mergedInjectOpts);
  }

  /** Runs an export from start to completion; implemented by the derived class. */
  public abstract exportData(
    sourceOptions: ConcreteExportSourceOptions,
    destOptions: ConcreteExportDestOptions
  ): Promise<CompletedExportJob>;

  /** Starts an export job without waiting for it; implemented by the derived class. */
  protected abstract startExport(
    sourceOptions: ConcreteExportSourceOptions,
    destOptions: ConcreteExportDestOptions
  ): Promise<StartedExportJob>;

  // TODO: DRY (with at least splitgraph-import-csv-plugin)
  /**
   * Fetches the status of export task `taskId`, signalling the retry
   * decorator to try again while the task is still in a standby state.
   *
   * - If the status request reports an error, resolves with
   *   `{ response: null, error, info: { jobStatus } }` (no retry).
   * - If there is no status yet, or the status is a standby status, throws
   *   `{ type: "retry" }`, the only thrown value `doRetry` accepts.
   * - Otherwise resolves with the job status, error and info as fetched.
   *
   * @param taskId Identifier of the export task to poll.
   */
  @Retryable({
    ...retryOptions,
    doRetry: ({ type }) => type === "retry",
  })
  protected async waitForTask(taskId: string) {
    const {
      response: jobStatusResponse,
      error: jobStatusError,
      info: jobStatusInfo,
    } = await this.fetchExportJobStatus(taskId);

    if (jobStatusError) {
      return {
        response: null,
        error: jobStatusError,
        info: { jobStatus: jobStatusInfo },
      };
    } else if (!jobStatusResponse) {
      throw { type: "retry" };
      // FIXME(codegen): this shouldn't be nullable
    } else if (taskUnresolved(jobStatusResponse.status as ExportTaskStatus)) {
      throw { type: "retry" };
    }

    return {
      response: jobStatusResponse,
      error: jobStatusError,
      info: jobStatusInfo,
    };
  }

  /**
   * Sends the `ExportJobStatus` query for `taskId`.
   *
   * @returns `{ response: null, error, info }` when the client reports an
   *   error or no response; otherwise the `exportJobStatus` payload with
   *   `error: null`.
   */
  private async fetchExportJobStatus(taskId: string) {
    const { response, error, info } = await this.graphqlClient.send<
      ExportJobStatusQuery,
      ExportJobStatusQueryVariables
    >(
      gql`
        query ExportJobStatus($taskId: UUID!) {
          exportJobStatus(taskId: $taskId) {
            status
            started
            finished
            exportFormat
            output
          }
        }
      `,
      {
        taskId,
      }
    );

    if (error || !response) {
      return { response: null, error, info };
    }

    return {
      response: response.exportJobStatus,
      error: null,
      info,
    };
  }
}
176+
177+
/** Returns its argument unchanged. */
const IdentityFunc = <T>(x: T): T => {
  return x;
};
178+
179+
/** Status strings that an export task can be compared against. */
enum ExportTaskStatus {
  // Standard Celery statuses
  Pending = "PENDING",
  Started = "STARTED",
  Success = "SUCCESS",
  Failure = "FAILURE",
  Revoked = "REVOKED",

  // Custom Splitgraph statuses
  Lost = "LOST",
  TimedOut = "TIMED_OUT",

  // Currently unused statuses
  Retry = "RETRY",
  Received = "RECEIVED",
  Rejected = "REJECTED",
  Ignored = "IGNORED",
}
197+
198+
/** Statuses meaning the task has not finished yet and should be polled again. */
const standbyStatuses: ExportTaskStatus[] = [
  ExportTaskStatus.Pending,
  ExportTaskStatus.Started,
];
199+
200+
/** True when `ts` is one of the standby statuses, i.e. the task is still in progress. */
const taskUnresolved = (ts: ExportTaskStatus): boolean =>
  standbyStatuses.some((standby) => standby === ts);

0 commit comments

Comments
 (0)