Skip to content

Commit a00ba7a

Browse files
committed
Change PipelineTransform to PipelineTransformationEntrypoint class implementation
1 parent e60522f commit a00ba7a

File tree

5 files changed

+77
-47
lines changed

5 files changed

+77
-47
lines changed

src/cloudflare/internal/pipeline-transform.ts

+44-26
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable @typescript-eslint/no-non-null-assertion */
12
// Copyright (c) 2024 Cloudflare, Inc.
23
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
34
// https://opensource.org/licenses/Apache-2.0
@@ -57,26 +58,39 @@ enum Format {
5758
JSON_STREAM = 'json_stream', // jsonl
5859
}
5960

60-
export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {
61+
type PipelineBatchMetadata = {
62+
pipelineId: string;
63+
pipelineName: string;
64+
};
65+
66+
type PipelineRecord = Record<string, unknown>;
67+
68+
export class PipelineTransformImpl<
69+
I extends PipelineRecord,
70+
O extends PipelineRecord,
71+
> extends entrypoints.WorkerEntrypoint {
6172
#batch?: Batch;
6273
#initalized: boolean = false;
6374

64-
// stub overridden on the sub class
75+
// stub overridden on the subclass
6576
// eslint-disable-next-line @typescript-eslint/require-await
66-
public async transformJson(_data: object[]): Promise<object[]> {
77+
public async run(
78+
_records: I[],
79+
_metadata: PipelineBatchMetadata
80+
): Promise<O[]> {
6781
throw new Error('should be implemented by parent');
6882
}
6983

70-
// called by the dispatcher which then calls the subclass methods
84+
// called by the dispatcher to validate that run is properly implemented by the subclass
7185
// @ts-expect-error thinks ping is never used
7286
private _ping(): Promise<void> {
7387
// making sure the function was overridden by an implementing subclass
74-
if (this.transformJson !== PipelineTransformImpl.prototype.transformJson) {
88+
if (this.run !== PipelineTransformImpl.prototype.run) {
7589
return Promise.resolve();
7690
} else {
7791
return Promise.reject(
7892
new Error(
79-
'the transformJson method must be overridden by the PipelineTransform subclass'
93+
'the run method must be overridden by the PipelineTransformationEntrypoint subclass'
8094
)
8195
);
8296
}
@@ -85,55 +99,59 @@ export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {
8599
// called by the dispatcher which then calls the subclass methods
86100
// the reason this is typescript private and not javascript private is that this must be
87101
// able to be called by the dispatcher but should not be called by the class implementer
88-
// @ts-expect-error _transform is called by rpc
89-
private async _transform(batch: Batch): Promise<JsonStream> {
102+
// @ts-expect-error _run is called by rpc
103+
private async _run(
104+
batch: Batch,
105+
metadata: PipelineBatchMetadata
106+
): Promise<JsonStream> {
90107
if (this.#initalized) {
91108
throw new Error('pipeline entrypoint has already been initialized');
92109
}
93110

94111
this.#batch = batch;
95112
this.#initalized = true;
96113

97-
switch (this.#batch.format) {
98-
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
99-
case Format.JSON_STREAM: {
100-
const data = await this.#readJsonStream();
101-
const transformed = await this.transformJson(data);
102-
return this.#sendJson(transformed);
103-
}
104-
default:
105-
throw new Error('unsupported batch format');
114+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
115+
if (this.#batch.format === Format.JSON_STREAM) {
116+
const records: I[] = await this.#readJsonStream();
117+
const transformed = await this.run(records, metadata);
118+
return this.#sendJson(transformed);
119+
} else {
120+
throw new Error(
121+
'PipelineTransformationEntrypoint run supports only the JSON_STREAM batch format'
122+
);
106123
}
107124
}
108125

109-
async #readJsonStream(): Promise<object[]> {
126+
async #readJsonStream(): Promise<I[]> {
110127
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
111128
if (this.#batch!.format !== Format.JSON_STREAM) {
129+
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
112130
throw new Error(`expected JSON_STREAM not ${this.#batch!.format}`);
113131
}
114132

115133
const batch = this.#batch!.data as ReadableStream<Uint8Array>;
116134
const decoder = batch.pipeThrough(new TextDecoderStream());
117135

118-
const data: object[] = [];
136+
const data: I[] = [];
119137
for await (const line of readLines(decoder)) {
120-
data.push(JSON.parse(line) as object);
138+
data.push(JSON.parse(line) as I);
121139
}
122140

123141
return data;
124142
}
125143

126-
#sendJson(data: object[]): JsonStream {
127-
if (!(data instanceof Array)) {
128-
throw new Error('transformJson must return an array of objects');
144+
#sendJson(records: O[]): JsonStream {
145+
if (!(records instanceof Array)) {
146+
throw new Error('transformations must return an array of PipelineRecord');
129147
}
130148

131149
let written = 0;
132150
const encoder = new TextEncoder();
133151
const readable = new ReadableStream<Uint8Array>({
134152
start(controller): void {
135-
for (const obj of data) {
136-
const encoded = encoder.encode(`${JSON.stringify(obj)}\n`);
153+
for (const record of records) {
154+
const encoded = encoder.encode(`${JSON.stringify(record)}\n`);
137155
written += encoded.length;
138156
controller.enqueue(encoded);
139157
}
@@ -149,7 +167,7 @@ export class PipelineTransformImpl extends entrypoints.WorkerEntrypoint {
149167
format: Format.JSON_STREAM,
150168
size: {
151169
bytes: written,
152-
rows: data.length,
170+
rows: records.length,
153171
},
154172
data: readable,
155173
};

src/cloudflare/internal/test/pipeline-transform/transform-test.js

+13-10
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,22 @@
44
// https://opensource.org/licenses/Apache-2.0
55

66
import assert from 'node:assert';
7-
import { PipelineTransform } from 'cloudflare:pipeline-transform';
7+
import { PipelineTransformationEntrypoint } from 'cloudflare:pipeline-transform';
88

99
// this is how "Pipeline" would be implemented by the user
10-
const customTransform = class MyEntrypoint extends PipelineTransform {
10+
const customTransform = class MyEntrypoint extends PipelineTransformationEntrypoint {
1111
/**
1212
* @param {any} batch
1313
* @override
1414
*/
15-
async transformJson(batch) {
16-
for (const obj of batch) {
17-
obj.dispatcher = 'was here!';
15+
async run(records, _) {
16+
for (const record of records) {
17+
record.dispatcher = 'was here!';
1818
await new Promise((resolve) => setTimeout(resolve, 50));
19-
obj.wait = 'happened!';
19+
record.wait = 'happened!';
2020
}
2121

22-
return batch;
22+
return records;
2323
}
2424
};
2525

@@ -52,11 +52,11 @@ export const tests = {
5252
async test(ctr, env, ctx) {
5353
{
5454
// should fail dispatcher test call when the PipelineTransformationEntrypoint class is not extended
55-
const transformer = new PipelineTransform(ctx, env);
55+
const transformer = new PipelineTransformationEntrypoint(ctx, env);
5656
await assert.rejects(transformer._ping(), (err) => {
5757
assert.strictEqual(
5858
err.message,
59-
'the transformJson method must be overridden by the PipelineTransform subclass'
59+
'the run method must be overridden by the PipelineTransformationEntrypoint subclass'
6060
);
6161
return true;
6262
});
@@ -73,7 +73,10 @@ export const tests = {
7373
const transformer = new customTransform(ctx, env);
7474
const batch = newBatch();
7575

76-
const result = await transformer._transform(batch);
76+
const result = await transformer._run(batch, {
77+
id: 'abc',
78+
name: 'mypipeline',
79+
});
7780
assert.equal(true, result.data instanceof ReadableStream);
7881

7982
const reader = result.data

src/cloudflare/pipeline-transform.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
import { PipelineTransformImpl } from 'cloudflare-internal:pipeline-transform';
22

3-
export const PipelineTransform = PipelineTransformImpl;
3+
export const PipelineTransformationEntrypoint = PipelineTransformImpl;

types/defines/pipeline-transform.d.ts

+13-5
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,20 @@
22
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
33
// https://opensource.org/licenses/Apache-2.0
44

5-
export abstract class PipelineTransform {
5+
export type PipelineRecord = Record<string, unknown>
6+
7+
export type PipelineBatchMetadata = {
8+
pipelineId: string;
9+
pipelineName: string;
10+
};
11+
12+
export abstract class PipelineTransformationEntrypoint<I extends PipelineRecord, O extends PipelineRecord> {
613
/**
7-
* transformJson recieves an array of javascript objects which can be
14+
* run receives an array of PipelineRecord which can be
815
* mutated and returned to the pipeline
9-
* @param data The data to be mutated
10-
* @returns A promise containing the mutated data
16+
* @param records Incoming records from the pipeline to be transformed
17+
* @param metadata Information about the specific pipeline calling the transformation entrypoint
18+
* @returns A promise containing the transformed PipelineRecord array
1119
*/
12-
public transformJson(data: object[]): Promise<object[]>;
20+
public run(records: I[], metadata: PipelineBatchMetadata): Promise<O[]>;
1321
}

types/defines/pipelines.d.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
33
// https://opensource.org/licenses/Apache-2.0
44

5-
export interface Pipeline {
5+
import { PipelineRecord } from "./pipeline-transform";
6+
7+
export interface Pipeline<T extends PipelineRecord> {
68
/**
7-
* send takes an array of javascript objects which are
8-
* then received by the pipeline for processing
9+
* The Pipeline interface represents the type of a binding to a Pipeline
910
*
10-
* @param data The data to be sent
11+
* @param records The records to send to the pipeline
1112
*/
12-
send(data: object[]): Promise<void>
13+
send(records: T[]): Promise<void>
1314
}

0 commit comments

Comments
 (0)