Skip to content

Commit beabd26

Browse files
committed
only send spans where there are transaction events
1 parent aaf65cc commit beabd26

File tree

2 files changed

+93
-31
lines changed

2 files changed

+93
-31
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

+23-31
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
type MessageUpdate,
88
type PgoutputMessage,
99
} from "@internal/replication";
10-
import { Span, startSpan, trace, type Tracer } from "@internal/tracing";
10+
import { startSpan, trace, type Tracer } from "@internal/tracing";
1111
import { Logger, LogLevel } from "@trigger.dev/core/logger";
1212
import { tryCatch } from "@trigger.dev/core/utils";
1313
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
@@ -23,6 +23,7 @@ interface TransactionEvent<T = any> {
2323
}
2424

2525
interface Transaction<T = any> {
26+
beginStartTimestamp: number;
2627
commitLsn: string | null;
2728
commitEndLsn: string | null;
2829
xid: number;
@@ -70,7 +71,6 @@ export class RunsReplicationService {
7071
private _isShuttingDown = false;
7172
private _isShutDownComplete = false;
7273
private _tracer: Tracer;
73-
private _currentSpan: Span | null = null;
7474
private _currentParseDurationMs: number | null = null;
7575
private _lastAcknowledgedAt: number | null = null;
7676
private _acknowledgeTimeoutMs: number;
@@ -208,17 +208,12 @@ export class RunsReplicationService {
208208
}
209209

210210
this._currentTransaction = {
211+
beginStartTimestamp: Date.now(),
211212
commitLsn: message.commitLsn,
212213
xid: message.xid,
213214
events: [],
214215
};
215216

216-
this._currentSpan = this._tracer.startSpan("handle_transaction", {
217-
attributes: {
218-
"transaction.xid": message.xid,
219-
},
220-
});
221-
222217
this._currentParseDurationMs = Number(parseDuration) / 1_000_000;
223218

224219
break;
@@ -283,11 +278,6 @@ export class RunsReplicationService {
283278
if (this._currentParseDurationMs) {
284279
this._currentParseDurationMs =
285280
this._currentParseDurationMs + Number(parseDuration) / 1_000_000;
286-
287-
this._currentSpan?.setAttribute(
288-
"transaction.parse_duration_ms",
289-
this._currentParseDurationMs
290-
);
291281
}
292282

293283
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
@@ -303,6 +293,11 @@ export class RunsReplicationService {
303293
this.#handleTransaction(transaction);
304294
break;
305295
}
296+
default: {
297+
this.logger.debug("Unknown message tag", {
298+
pgMessage: message,
299+
});
300+
}
306301
}
307302
}
308303

@@ -315,19 +310,8 @@ export class RunsReplicationService {
315310
});
316311
}
317312

318-
this._currentSpan?.setAttribute("transaction.replication_lag_ms", transaction.replicationLagMs);
319-
this._currentSpan?.setAttribute("transaction.xid", transaction.xid);
320-
321-
if (transaction.commitEndLsn) {
322-
this._currentSpan?.setAttribute("transaction.commit_end_lsn", transaction.commitEndLsn);
323-
}
324-
325-
this._currentSpan?.setAttribute("transaction.events", transaction.events.length);
326-
327313
// If there are no events, do nothing
328314
if (transaction.events.length === 0) {
329-
this._currentSpan?.end();
330-
331315
return;
332316
}
333317

@@ -336,8 +320,6 @@ export class RunsReplicationService {
336320
transaction,
337321
});
338322

339-
this._currentSpan?.end();
340-
341323
return;
342324
}
343325

@@ -350,10 +332,7 @@ export class RunsReplicationService {
350332
// If there are events, we need to handle them
351333
const _version = lsnToUInt64(transaction.commitEndLsn);
352334

353-
this._currentSpan?.setAttribute(
354-
"transaction.lsn_to_uint64_ms",
355-
Number(process.hrtime.bigint() - lsnToUInt64Start) / 1_000_000
356-
);
335+
const lsnToUInt64DurationMs = Number(process.hrtime.bigint() - lsnToUInt64Start) / 1_000_000;
357336

358337
this._concurrentFlushScheduler.addToBatch(
359338
transaction.events.map((event) => ({
@@ -363,7 +342,20 @@ export class RunsReplicationService {
363342
}))
364343
);
365344

366-
this._currentSpan?.end();
345+
const currentSpan = this._tracer.startSpan("handle_transaction", {
346+
attributes: {
347+
"transaction.xid": transaction.xid,
348+
"transaction.replication_lag_ms": transaction.replicationLagMs,
349+
"transaction.events": transaction.events.length,
350+
"transaction.commit_end_lsn": transaction.commitEndLsn,
351+
"transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
352+
"transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
353+
"transaction.version": _version.toString(),
354+
},
355+
startTime: transaction.beginStartTimestamp,
356+
});
357+
358+
currentSpan.end();
367359
}
368360

369361
async #acknowledgeLatestTransaction() {

apps/webapp/test/runsReplicationService.test.ts

+70
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,76 @@ describe("RunsReplicationService", () => {
130130
}
131131
);
132132

133+
containerTest(
134+
"should not produce any handle_transaction spans when no TaskRun events are produced",
135+
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
136+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
137+
138+
const clickhouse = new ClickHouse({
139+
url: clickhouseContainer.getConnectionUrl(),
140+
name: "runs-replication",
141+
});
142+
143+
const { tracer, exporter } = createInMemoryTracing();
144+
145+
const runsReplicationService = new RunsReplicationService({
146+
clickhouse,
147+
pgConnectionUrl: postgresContainer.getConnectionUri(),
148+
serviceName: "runs-replication",
149+
slotName: "task_runs_to_clickhouse_v1",
150+
publicationName: "task_runs_to_clickhouse_v1_publication",
151+
redisOptions,
152+
maxFlushConcurrency: 1,
153+
flushIntervalMs: 100,
154+
flushBatchSize: 1,
155+
leaderLockTimeoutMs: 5000,
156+
leaderLockExtendIntervalMs: 1000,
157+
ackIntervalSeconds: 5,
158+
tracer,
159+
});
160+
161+
await runsReplicationService.start();
162+
163+
const organization = await prisma.organization.create({
164+
data: {
165+
title: "test",
166+
slug: "test",
167+
},
168+
});
169+
170+
const project = await prisma.project.create({
171+
data: {
172+
name: "test",
173+
slug: "test",
174+
organizationId: organization.id,
175+
externalRef: "test",
176+
},
177+
});
178+
179+
await prisma.runtimeEnvironment.create({
180+
data: {
181+
slug: "test",
182+
type: "DEVELOPMENT",
183+
projectId: project.id,
184+
organizationId: organization.id,
185+
apiKey: "test",
186+
pkApiKey: "test",
187+
shortcode: "test",
188+
},
189+
});
190+
191+
await setTimeout(1000);
192+
193+
const spans = exporter.getFinishedSpans();
194+
195+
const handleTransactionSpans = spans.filter((span) => span.name === "handle_transaction");
196+
197+
expect(handleTransactionSpans.length).toBe(0);
198+
199+
await runsReplicationService.stop();
200+
}
201+
);
202+
133203
containerTest(
134204
"should replicate a new TaskRun to ClickHouse using batching insert strategy",
135205
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {

0 commit comments

Comments
 (0)