@@ -428,6 +428,114 @@ describe("RunsReplicationService", () => {
428
428
}
429
429
) ;
430
430
431
+ containerTest (
432
+ "should insert the payload even if it's very large into ClickHouse when a TaskRun is created" ,
433
+ async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
434
+ await prisma . $executeRawUnsafe ( `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` ) ;
435
+
436
+ const clickhouse = new ClickHouse ( {
437
+ url : clickhouseContainer . getConnectionUrl ( ) ,
438
+ name : "runs-replication-payload" ,
439
+ } ) ;
440
+
441
+ const runsReplicationService = new RunsReplicationService ( {
442
+ clickhouse,
443
+ pgConnectionUrl : postgresContainer . getConnectionUri ( ) ,
444
+ serviceName : "runs-replication-payload" ,
445
+ slotName : "task_runs_to_clickhouse_v1" ,
446
+ publicationName : "task_runs_to_clickhouse_v1_publication" ,
447
+ redisOptions,
448
+ maxFlushConcurrency : 1 ,
449
+ flushIntervalMs : 100 ,
450
+ flushBatchSize : 1 ,
451
+ insertStrategy : "batching" ,
452
+ leaderLockTimeoutMs : 5000 ,
453
+ leaderLockExtendIntervalMs : 1000 ,
454
+ ackIntervalSeconds : 5 ,
455
+ } ) ;
456
+
457
+ await runsReplicationService . start ( "batching" ) ;
458
+
459
+ const organization = await prisma . organization . create ( {
460
+ data : {
461
+ title : "test-payload" ,
462
+ slug : "test-payload" ,
463
+ } ,
464
+ } ) ;
465
+
466
+ const project = await prisma . project . create ( {
467
+ data : {
468
+ name : "test-payload" ,
469
+ slug : "test-payload" ,
470
+ organizationId : organization . id ,
471
+ externalRef : "test-payload" ,
472
+ } ,
473
+ } ) ;
474
+
475
+ const runtimeEnvironment = await prisma . runtimeEnvironment . create ( {
476
+ data : {
477
+ slug : "test-payload" ,
478
+ type : "DEVELOPMENT" ,
479
+ projectId : project . id ,
480
+ organizationId : organization . id ,
481
+ apiKey : "test-payload" ,
482
+ pkApiKey : "test-payload" ,
483
+ shortcode : "test-payload" ,
484
+ } ,
485
+ } ) ;
486
+
487
+ // Insert a row into the table with a unique payload
488
+ const largePayload = {
489
+ foo : Array . from ( { length : 100 } , ( ) => "foo" ) . join ( "" ) ,
490
+ bar : Array . from ( { length : 100 } , ( ) => "bar" ) . join ( "" ) ,
491
+ baz : Array . from ( { length : 100 } , ( ) => "baz" ) . join ( "" ) ,
492
+ } ;
493
+
494
+ const taskRun = await prisma . taskRun . create ( {
495
+ data : {
496
+ friendlyId : `run_payload_${ Date . now ( ) } ` ,
497
+ taskIdentifier : "my-task-payload" ,
498
+ payload : JSON . stringify ( largePayload ) ,
499
+ payloadType : "application/json" ,
500
+ traceId : "payload-1234" ,
501
+ spanId : "payload-1234" ,
502
+ queue : "test-payload" ,
503
+ runtimeEnvironmentId : runtimeEnvironment . id ,
504
+ projectId : project . id ,
505
+ organizationId : organization . id ,
506
+ environmentType : "DEVELOPMENT" ,
507
+ engine : "V2" ,
508
+ } ,
509
+ } ) ;
510
+
511
+ // Wait for replication
512
+ await setTimeout ( 1000 ) ;
513
+
514
+ // Query ClickHouse for the replicated payload
515
+ const queryPayloads = clickhouse . reader . query ( {
516
+ name : "runs-replication-payload" ,
517
+ query : "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}" ,
518
+ schema : z . any ( ) ,
519
+ params : z . object ( { run_id : z . string ( ) } ) ,
520
+ } ) ;
521
+
522
+ const [ queryError , result ] = await queryPayloads ( { run_id : taskRun . id } ) ;
523
+
524
+ expect ( queryError ) . toBeNull ( ) ;
525
+ expect ( result ?. length ) . toBe ( 1 ) ;
526
+ expect ( result ?. [ 0 ] ) . toEqual (
527
+ expect . objectContaining ( {
528
+ run_id : taskRun . id ,
529
+ payload : expect . objectContaining ( {
530
+ data : largePayload ,
531
+ } ) ,
532
+ } )
533
+ ) ;
534
+
535
+ await runsReplicationService . stop ( ) ;
536
+ }
537
+ ) ;
538
+
431
539
containerTest (
432
540
"should replicate updates to an existing TaskRun to ClickHouse" ,
433
541
async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
0 commit comments