Skip to content

Commit

Permalink
pass initial kafka timestamp from Ender -> Vulcan for better e2e orde…
Browse files Browse the repository at this point in the history
…r latency tracking (#1211)
  • Loading branch information
dydxwill authored Mar 20, 2024
1 parent 42cbee6 commit 293bb47
Show file tree
Hide file tree
Showing 30 changed files with 70 additions and 9 deletions.
12 changes: 8 additions & 4 deletions indexer/packages/kafka/__tests__/batch-kafka-producer.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { KafkaTopics } from '../src';
import { BatchKafkaProducer, ProducerMessage } from '../src/batch-kafka-producer';
import { producer } from '../src/producer';
import { IHeaders } from 'kafkajs';
import _ from 'lodash';

interface TestMessage {
key?: string,
value: string,
headers?: IHeaders,
}

function testMessage2ProducerMessage(data: TestMessage): ProducerMessage {
const key: Buffer | undefined = data.key === undefined ? undefined : Buffer.from(data.key);
return { key, value: Buffer.from(data.value) };
return { key, value: Buffer.from(data.value), headers: data.headers };
}

function testMessage2ProducerMessages(data: TestMessage[]): ProducerMessage[] {
Expand All @@ -35,9 +37,9 @@ describe('batch-kafka-producer', () => {
[
'will send key if key is not undefined',
5,
[{ key: '1', value: 'a' }, { key: '2', value: 'b' }, { key: '3', value: 'c' }],
[{ key: '1', value: 'a' }, { key: '2', value: 'b' }, { key: '3', value: 'c', headers: { timestamp: 'value' } }],
[[{ key: '1', value: 'a' }, { key: '2', value: 'b' }]],
[{ key: '3', value: 'c' }],
[{ key: '3', value: 'c', headers: { timestamp: 'value' } }],
],
[
'will not send message until the batch size is reached',
Expand Down Expand Up @@ -104,7 +106,9 @@ describe('batch-kafka-producer', () => {

for (const msg of messages) {
const key: Buffer | undefined = msg.key === undefined ? undefined : Buffer.from(msg.key);
batchProducer.addMessageAndMaybeFlush({ value: Buffer.from(msg.value), key });
batchProducer.addMessageAndMaybeFlush(
{ value: Buffer.from(msg.value), key, headers: msg.headers },
);
}

expect(producerSendMock.mock.calls).toHaveLength(expectedMessagesPerCall.length);
Expand Down
5 changes: 3 additions & 2 deletions indexer/packages/kafka/src/batch-kafka-producer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { logger } from '@dydxprotocol-indexer/base';
import { Producer, RecordMetadata } from 'kafkajs';
import { IHeaders, Producer, RecordMetadata } from 'kafkajs';
import _ from 'lodash';

import { KafkaTopics } from './types';
Expand All @@ -10,6 +10,7 @@ import { KafkaTopics } from './types';
export type ProducerMessage = {
key?: Buffer,
value: Buffer,
headers?: IHeaders,
};

/**
Expand Down Expand Up @@ -52,7 +53,7 @@ export class BatchKafkaProducer {
if (this.currentSize + msgBuffer.byteLength + keyByteLength > this.maxBatchSizeBytes) {
this.sendBatch();
}
this.producerMessages.push({ key: message.key, value: msgBuffer });
this.producerMessages.push({ key: message.key, value: msgBuffer, headers: message.headers });
this.currentSize += msgBuffer.byteLength;
this.currentSize += keyByteLength;
}
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/bazooka/src/vulcan-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import { Long } from '@dydxprotocol-indexer/v4-protos/build/codegen/helpers';
import Big from 'big.js';
import { IHeaders } from 'kafkajs';
import _ from 'lodash';

import config from './config';
Expand All @@ -30,6 +31,7 @@ import { ZERO } from './constants';
interface VulcanMessage {
key: Buffer,
value: OffChainUpdateV1,
headers?: IHeaders,
}

type IndexerOrderIdMap = { [orderUuid: string]: IndexerOrderId };
Expand Down Expand Up @@ -129,6 +131,7 @@ export async function sendStatefulOrderMessages() {
return {
key: message.key,
value: Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(message.value).finish())),
headers: message.headers,
};
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ describe('conditionalOrderTriggeredHandler', () => {
producerSendMock,
orderId: conditionalOrderId,
offchainUpdate: expectedOffchainUpdate,
headers: { message_received_timestamp: kafkaMessage.timestamp, event_type: 'ConditionalOrderTriggered' },
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ describe('statefulOrderPlacementHandler', () => {
producerSendMock,
orderId: defaultOrder.orderId!,
offchainUpdate: expectedOffchainUpdate,
headers: { message_received_timestamp: kafkaMessage.timestamp, event_type: 'StatefulOrderPlacement' },
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import { DydxIndexerSubtypes } from '../../../src/lib/types';
import {
defaultDateTime,
defaultHeight,
defaultOrderId, defaultPreviousHeight, defaultTime, defaultTxHash,
defaultOrderId,
defaultPreviousHeight,
defaultTime,
defaultTxHash,
} from '../../helpers/constants';
import { createKafkaMessageFromStatefulOrderEvent } from '../../helpers/kafka-helpers';
import { updateBlockCache } from '../../../src/caches/block-cache';
Expand Down Expand Up @@ -130,6 +133,7 @@ describe('statefulOrderRemovalHandler', () => {
producerSendMock,
orderId: defaultOrderId,
offchainUpdate: expectedOffchainUpdate,
headers: { message_received_timestamp: kafkaMessage.timestamp, event_type: 'StatefulOrderRemoval' },
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import {
PerpetualMarketCreateEventV1,
DeleveragingEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import { Message, ProducerRecord } from 'kafkajs';
import { IHeaders, Message, ProducerRecord } from 'kafkajs';
import _ from 'lodash';

import {
Expand Down Expand Up @@ -318,10 +318,12 @@ export function expectVulcanKafkaMessage({
producerSendMock,
orderId,
offchainUpdate,
headers,
}: {
producerSendMock: jest.SpyInstance,
orderId: IndexerOrderId,
offchainUpdate: OffChainUpdateV1,
headers?: IHeaders,
}): void {
expect(producerSendMock.mock.calls.length).toBeGreaterThanOrEqual(1);
expect(producerSendMock.mock.calls[0].length).toBeGreaterThanOrEqual(1);
Expand All @@ -339,7 +341,6 @@ export function expectVulcanKafkaMessage({
vulcanProducerRecord.messages,
(message: Message): VulcanMessage => {
expect(Buffer.isBuffer(message.value));

const messageValueBinary: Uint8Array = new Uint8Array(
// Can assume Buffer, since we check above that it is a buffer
message.value as Buffer,
Expand All @@ -348,13 +349,15 @@ export function expectVulcanKafkaMessage({
return {
key: message.key as Buffer,
value: OffChainUpdateV1.decode(messageValueBinary),
headers: message.headers,
};
},
);

expect(vulcanMessages).toContainEqual({
key: getOrderIdHash(orderId),
value: offchainUpdate,
headers,
});
}

Expand Down
2 changes: 2 additions & 0 deletions indexer/services/ender/__tests__/lib/block-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ describe('block-processor', () => {
const blockProcessor: BlockProcessor = new BlockProcessor(
block,
txId,
defaultDateTime.toString(),
);
blockProcessor.batchedHandlers = batchedHandlers;
blockProcessor.syncHandlers = syncHandlers;
Expand Down Expand Up @@ -149,6 +150,7 @@ describe('block-processor', () => {
const blockProcessor: BlockProcessor = new BlockProcessor(
block,
txId,
defaultDateTime.toString(),
);
blockProcessor.batchedHandlers = batchedHandlers;
blockProcessor.syncHandlers = syncHandlers;
Expand Down
7 changes: 7 additions & 0 deletions indexer/services/ender/src/handlers/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
OffChainUpdateV1,
SubaccountId,
} from '@dydxprotocol-indexer/v4-protos';
import { IHeaders } from 'kafkajs';
import { DateTime } from 'luxon';
import * as pg from 'pg';

Expand All @@ -33,6 +34,7 @@ export type HandlerInitializer = new (
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
event: EventMessage,
messageReceivedTimestamp?: string,
) => Handler<EventMessage>;

/**
Expand All @@ -49,20 +51,23 @@ export abstract class Handler<T> {
blockEventIndex: number;
event: T;
abstract eventType: string;
messageReceivedTimestamp?: string;

constructor(
block: IndexerTendermintBlock,
blockEventIndex: number,
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
event: T,
messageReceivedTimestamp?: string,
) {
this.block = block;
this.blockEventIndex = blockEventIndex;
this.indexerTendermintEvent = indexerTendermintEvent;
this.timestamp = DateTime.fromJSDate(block.time!);
this.txId = txId;
this.event = event;
this.messageReceivedTimestamp = messageReceivedTimestamp;
}

/**
Expand Down Expand Up @@ -178,6 +183,7 @@ export abstract class Handler<T> {
protected generateConsolidatedVulcanKafkaEvent(
key: Buffer,
offChainUpdate: OffChainUpdateV1,
headers?: IHeaders,
): ConsolidatedKafkaEvent {
stats.increment(`${config.SERVICE_NAME}.create_vulcan_kafka_event`, 1);

Expand All @@ -186,6 +192,7 @@ export abstract class Handler<T> {
message: {
key,
value: offChainUpdate,
headers,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ export class ConditionalOrderTriggeredHandler extends
this.generateConsolidatedVulcanKafkaEvent(
getOrderIdHash(order.orderId!),
offChainUpdate,
{
message_received_timestamp: this.messageReceivedTimestamp,
event_type: 'ConditionalOrderTriggered',
},
),
];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ export class StatefulOrderPlacementHandler extends
kafakEvents.push(this.generateConsolidatedVulcanKafkaEvent(
getOrderIdHash(order.orderId!),
offChainUpdate,
{
message_received_timestamp: this.messageReceivedTimestamp,
event_type: 'StatefulOrderPlacement',
},
));

return kafakEvents;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export class StatefulOrderRemovalHandler extends
this.generateConsolidatedVulcanKafkaEvent(
getOrderIdHash(orderIdProto),
offChainUpdate,
{
message_received_timestamp: this.messageReceivedTimestamp,
event_type: 'StatefulOrderRemoval',
},
),
];
}
Expand Down
4 changes: 4 additions & 0 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,16 @@ export class BlockProcessor {
txId: number;
batchedHandlers: BatchedHandlers;
syncHandlers: SyncHandlers;
messageReceivedTimestamp: string;

constructor(
block: IndexerTendermintBlock,
txId: number,
messageReceivedTimestamp: string,
) {
this.block = block;
this.txId = txId;
this.messageReceivedTimestamp = messageReceivedTimestamp;
this.sqlBlock = {
...this.block,
events: new Array(this.block.events.length),
Expand Down Expand Up @@ -205,6 +208,7 @@ export class BlockProcessor {
const handlers: Handler<EventMessage>[] = validator.createHandlers(
eventProto.indexerTendermintEvent,
this.txId,
this.messageReceivedTimestamp,
);

_.map(handlers, (handler: Handler<EventMessage>) => {
Expand Down
1 change: 1 addition & 0 deletions indexer/services/ender/src/lib/kafka-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ export class KafkaPublisher {
return {
key: message.key,
value: Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(message.value).finish())),
headers: message.headers,
};
}),
});
Expand Down
1 change: 1 addition & 0 deletions indexer/services/ender/src/lib/on-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
const blockProcessor: BlockProcessor = new BlockProcessor(
indexerTendermintBlock,
txId,
message.timestamp,
);
const kafkaPublisher: KafkaPublisher = await blockProcessor.process();

Expand Down
2 changes: 2 additions & 0 deletions indexer/services/ender/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
DeleveragingEventV1,
TradingRewardsEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import { IHeaders } from 'kafkajs';
import Long from 'long';

// Type sourced from protocol:
Expand Down Expand Up @@ -217,6 +218,7 @@ export interface AnnotatedSubaccountMessage extends SubaccountMessage {
export interface VulcanMessage {
key: Buffer,
value: OffChainUpdateV1,
headers?: IHeaders,
}

export type ConsolidatedKafkaEvent = {
Expand Down
1 change: 1 addition & 0 deletions indexer/services/ender/src/validators/asset-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export class AssetValidator extends Validator<AssetCreateEventV1> {
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
_: string,
): Handler<AssetCreateEventV1>[] {
const handler: Handler<AssetCreateEventV1> = new AssetCreationHandler(
this.block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export class DeleveragingValidator extends Validator<DeleveragingEventV1> {
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
_: string,
): Handler<DeleveragingEventV1>[] {
return [
new DeleveragingHandler(
Expand Down
1 change: 1 addition & 0 deletions indexer/services/ender/src/validators/funding-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export class FundingValidator extends Validator<FundingEventV1> {
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
_: string,
): Handler<FundingEventMessage>[] {
const handler: Handler<FundingEventMessage> = new FundingHandler(
this.block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export class LiquidityTierValidator extends Validator<LiquidityTierUpsertEventV1
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
_: string,
): Handler<LiquidityTierUpsertEventV1>[] {
const handler: Handler<LiquidityTierUpsertEventV1> = new LiquidityTierHandler(
this.block,
Expand Down
1 change: 1 addition & 0 deletions indexer/services/ender/src/validators/market-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export class MarketValidator extends Validator<MarketEventV1> {
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
__: string,
): Handler<MarketEventV1>[] {
const Initializer:
HandlerInitializer | undefined = this.getHandlerInitializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export class OrderFillValidator extends Validator<OrderFillEventV1> {
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
__: string,
): Handler<OrderFillWithLiquidity>[] {
const orderFillEventsWithLiquidity: OrderFillEventWithLiquidity[] = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export class PerpetualMarketValidator extends Validator<PerpetualMarketCreateEve
public createHandlers(
indexerTendermintEvent: IndexerTendermintEvent,
txId: number,
_: string,
): Handler<PerpetualMarketCreateEventV1>[] {
const handler: Handler<PerpetualMarketCreateEventV1> = new PerpetualMarketCreationHandler(
this.block,
Expand Down
Loading

0 comments on commit 293bb47

Please sign in to comment.