Adam/ct 861 websocket topic for blockheight (#1705)
adamfraser authored Jun 25, 2024
1 parent 0c9637e commit d36863c
Showing 27 changed files with 432 additions and 27 deletions.
1 change: 1 addition & 0 deletions indexer/README.md
@@ -230,4 +230,5 @@ Other example subscription events:
{ "type": "subscribe", "channel": "v4_markets" }
{ "type": "subscribe", "channel": "v4_orderbook", "id": "BTC-USD" }
{ "type": "subscribe", "channel": "v4_subaccounts", "id": "address/0" }
{ "type": "subscribe", "channel": "v4_block_height" }
```
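For anyone wiring a client against the new channel, here is a minimal subscribe sketch. It assumes the `ws` npm package and a placeholder endpoint URL; the exact payload pushed on the channel is not shown in this README diff, so the handler simply logs whatever arrives.

```typescript
// Minimal client sketch (assumptions: `ws` package, placeholder endpoint URL).
import WebSocket from 'ws';

const INDEXER_WS_URL: string = 'ws://localhost:3003/v4/ws'; // placeholder; use your deployment's URL

const socket: WebSocket = new WebSocket(INDEXER_WS_URL);

socket.on('open', () => {
  // Mirrors the README example above; this channel takes no id.
  socket.send(JSON.stringify({ type: 'subscribe', channel: 'v4_block_height' }));
});

socket.on('message', (data: WebSocket.RawData) => {
  // Subscription acks and channel updates arrive as JSON; log them as-is.
  console.log(JSON.parse(data.toString()));
});
```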
3 changes: 2 additions & 1 deletion indexer/docker-compose-local-deployment.yml
@@ -13,7 +13,8 @@ services:
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL_SAME_HOST://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
5 changes: 3 additions & 2 deletions indexer/docker-compose.yml
@@ -7,14 +7,15 @@ services:
environment:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS:
KAFKA_CREATE_TOPICS:
"to-ender:1:1,\
to-vulcan:1:1,\
to-websockets-orderbooks:1:1,\
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
postgres-test:
build:
context: .
1 change: 1 addition & 0 deletions indexer/packages/kafka/src/constants.ts
@@ -5,3 +5,4 @@ export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0';
export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0';
export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
2 changes: 2 additions & 0 deletions indexer/packages/kafka/src/types.ts
@@ -4,6 +4,7 @@ export enum WebsocketTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}

export enum KafkaTopics {
@@ -14,4 +15,5 @@ export enum KafkaTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}
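As a consumer-side illustration of the new topic, a hedged kafkajs sketch is below. The broker address and group id are made-up values; services in this repo wire consumers up through their own kafka helpers rather than directly like this.

```typescript
// Illustrative only: read the new to-websockets-block-height topic with kafkajs.
// Broker address and group id are assumed values, not taken from this diff.
import { Kafka } from 'kafkajs';

const kafka: Kafka = new Kafka({ brokers: ['localhost:29092'] }); // assumed local broker

async function run(): Promise<void> {
  const consumer = kafka.consumer({ groupId: 'example-block-height-consumer' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'to-websockets-block-height', fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // Values on this topic are protobuf-encoded BlockHeightMessage payloads.
      console.log(`received ${message.value?.length ?? 0} bytes`);
    },
  });
}

run().catch((err: Error) => { console.error(err); process.exit(1); });
```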
@@ -265,6 +265,30 @@ export interface CandleMessageSDKType {

version: string;
}
/** Message to be sent through the 'to-websockets-block-height' kafka topic. */

export interface BlockHeightMessage {
/** Block height where the contents occur. */
blockHeight: string;
/** ISO formatted time of the block height. */

time: string;
/** Version of the websocket message. */

version: string;
}
/** Message to be sent through the 'to-websockets-block-height' kafka topic. */

export interface BlockHeightMessageSDKType {
/** Block height where the contents occur. */
block_height: string;
/** ISO formatted time of the block height. */

time: string;
/** Version of the websocket message. */

version: string;
}

function createBaseOrderbookMessage(): OrderbookMessage {
return {
@@ -629,4 +653,69 @@ export const CandleMessage = {
return message;
}

};

function createBaseBlockHeightMessage(): BlockHeightMessage {
return {
blockHeight: "",
time: "",
version: ""
};
}

export const BlockHeightMessage = {
encode(message: BlockHeightMessage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.blockHeight !== "") {
writer.uint32(10).string(message.blockHeight);
}

if (message.time !== "") {
writer.uint32(18).string(message.time);
}

if (message.version !== "") {
writer.uint32(26).string(message.version);
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): BlockHeightMessage {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseBlockHeightMessage();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.blockHeight = reader.string();
break;

case 2:
message.time = reader.string();
break;

case 3:
message.version = reader.string();
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<BlockHeightMessage>): BlockHeightMessage {
const message = createBaseBlockHeightMessage();
message.blockHeight = object.blockHeight ?? "";
message.time = object.time ?? "";
message.version = object.version ?? "";
return message;
}

};
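The generated codec can be exercised on its own; a small round-trip sketch with arbitrary example values (the import path matches how the tests below reference the package):

```typescript
// Round-trip sketch for the generated BlockHeightMessage codec; field values are arbitrary.
import { BlockHeightMessage } from '@dydxprotocol-indexer/v4-protos';

const original: BlockHeightMessage = {
  blockHeight: '1234567',
  time: '2024-06-25T00:00:00.000Z',
  version: '1.0.0',
};

// Encode to protobuf bytes, then decode back into an object.
const bytes: Uint8Array = BlockHeightMessage.encode(original).finish();
const decoded: BlockHeightMessage = BlockHeightMessage.decode(bytes);

console.log(decoded.blockHeight === original.blockHeight); // true
```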
7 changes: 7 additions & 0 deletions indexer/services/auxo/src/constants.ts
@@ -10,6 +10,13 @@ export const BAZOOKA_DB_MIGRATION_PAYLOAD: Uint8Array = new TextEncoder().encode(
}),
);

export const BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD: Uint8Array = new TextEncoder().encode(
JSON.stringify({
migrate: true,
create_kafka_topics: true,
}),
);

export const ECS_SERVICE_NAMES: EcsServiceNames[] = [
EcsServiceNames.COMLINK,
EcsServiceNames.ENDER,
16 changes: 11 additions & 5 deletions indexer/services/auxo/src/index.ts
@@ -30,6 +30,7 @@ import _ from 'lodash';

import config from './config';
import {
BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD,
BAZOOKA_DB_MIGRATION_PAYLOAD,
BAZOOKA_LAMBDA_FUNCTION_NAME,
ECS_SERVICE_NAMES,
Expand All @@ -40,7 +41,7 @@ import { AuxoEventJson, EcsServiceNames, TaskDefinitionArnMap } from './types';
/**
* Upgrades all services and runs migrations
* 1. Upgrade Bazooka
* 2. Run db migration in Bazooka
* 2. Run db migration in Bazooka, and update kafka topics
* 3. Create new ECS Task Definition for ECS Services with new image
* 4. Upgrade all ECS Services (comlink, ender, roundtable, socks, vulcan)
*/
@@ -66,8 +67,9 @@ export async function handler(
// 1. Upgrade Bazooka
await upgradeBazooka(lambda, ecr, event);

// 2. Run db migration in Bazooka
await runDbMigration(lambda);
// 2. Run db migration in Bazooka,
// boolean flag used to determine if new kafka topics should be created
await runDbAndKafkaMigration(event.addNewKafkaTopics, lambda);

// 3. Create new ECS Task Definition for ECS Services with new image
const taskDefinitionArnMap: TaskDefinitionArnMap = await createNewEcsTaskDefinitions(
@@ -192,16 +194,20 @@ async function getImageDetail(

}

async function runDbMigration(
async function runDbAndKafkaMigration(
createNewKafkaTopics: boolean,
lambda: ECRClient,
): Promise<void> {
logger.info({
at: 'index#runDbMigration',
message: 'Running db migration',
});
const payload = createNewKafkaTopics
? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD
: BAZOOKA_DB_MIGRATION_PAYLOAD;
const response: InvokeCommandOutput = await lambda.send(new InvokeCommand({
FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME,
Payload: BAZOOKA_DB_MIGRATION_PAYLOAD,
Payload: payload,
// RequestResponse means that the lambda is synchronously invoked
InvocationType: 'RequestResponse',
}));
1 change: 1 addition & 0 deletions indexer/services/auxo/src/types.ts
@@ -13,6 +13,7 @@ export interface AuxoEventJson {
region: string;
// In our naming we oftentimes use the abbreviated region name
regionAbbrev: string;
addNewKafkaTopics: boolean;
}

// EcsServiceName to task definition arn mapping
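For context, when `addNewKafkaTopics` is true the handler invokes Bazooka with `BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD`, i.e. `{"migrate": true, "create_kafka_topics": true}`. A partial, illustrative event is sketched below; only the fields visible in this diff are included and the region values are made up.

```typescript
// Partial, illustrative Auxo event. Only fields visible in this diff are shown;
// real events carry additional fields, and the region values are made-up examples.
const exampleEvent: { region: string; regionAbbrev: string; addNewKafkaTopics: boolean } = {
  region: 'ap-northeast-1',   // example region
  regionAbbrev: 'apne1',      // example abbreviation
  addNewKafkaTopics: true,    // true => send BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD
};

console.log(JSON.stringify(exampleEvent));
```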
4 changes: 3 additions & 1 deletion indexer/services/bazooka/src/index.ts
@@ -18,6 +18,7 @@ const KAFKA_TOPICS: KafkaTopics[] = [
KafkaTopics.TO_WEBSOCKETS_TRADES,
KafkaTopics.TO_WEBSOCKETS_MARKETS,
KafkaTopics.TO_WEBSOCKETS_CANDLES,
KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT,
];

const DEFAULT_NUM_REPLICAS: number = 3;
@@ -30,6 +31,7 @@ const KAFKA_TOPICS_TO_PARTITIONS: { [key in KafkaTopics]: number } = {
[KafkaTopics.TO_WEBSOCKETS_TRADES]: 1,
[KafkaTopics.TO_WEBSOCKETS_MARKETS]: 1,
[KafkaTopics.TO_WEBSOCKETS_CANDLES]: 1,
[KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT]: 1,
};

export interface BazookaEventJson {
@@ -196,7 +198,7 @@ async function createKafkaTopics(
_.forEach(KAFKA_TOPICS, (kafkaTopic: KafkaTopics) => {
if (_.includes(existingKafkaTopics, kafkaTopic)) {
logger.info({
at: 'index#clearKafkaTopics',
at: 'index#createKafkaTopics',
message: `Cannot create kafka topic that already exists: ${kafkaTopic}`,
});
return;
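For local setups, the same topic can also be created by hand with the kafkajs admin client. This is only a sketch under assumed local settings (single broker on `localhost:29092`, one partition, replication factor 1); it is not the code Bazooka runs.

```typescript
// Hedged sketch: create the new topic locally with the kafkajs admin client.
// Broker address, partition count, and replication factor are assumed local values.
import { Kafka } from 'kafkajs';

async function createBlockHeightTopic(): Promise<void> {
  const admin = new Kafka({ brokers: ['localhost:29092'] }).admin();
  await admin.connect();
  try {
    const created: boolean = await admin.createTopics({
      topics: [{
        topic: 'to-websockets-block-height',
        numPartitions: 1,
        replicationFactor: 1,
      }],
    });
    console.log(created ? 'topic created' : 'topic already existed');
  } finally {
    await admin.disconnect();
  }
}

createBlockHeightTopic().catch((err: Error) => { console.error(err); process.exit(1); });
```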
57 changes: 57 additions & 0 deletions indexer/services/ender/__tests__/lib/block-processor.test.ts
@@ -24,6 +24,7 @@ import { BatchedHandlers } from '../../src/lib/batched-handlers';
import { SyncHandlers } from '../../src/lib/sync-handlers';
import { mock, MockProxy } from 'jest-mock-extended';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import { BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka';

describe('block-processor', () => {
let batchedHandlers: MockProxy<BatchedHandlers>;
@@ -162,4 +163,60 @@
batchedHandlers.process.mock.invocationCallOrder[0],
);
});

it('Adds a block height message to the Kafka publisher', async () => {
const block: IndexerTendermintBlock = createIndexerTendermintBlock(
defaultHeight,
defaultTime,
events,
[
defaultTxHash,
defaultTxHash2,
],
);

const txId: number = await Transaction.start();
const blockProcessor: BlockProcessor = new BlockProcessor(
block,
txId,
defaultDateTime.toString(),
);
const processor = await blockProcessor.process();
await Transaction.commit(txId);
expect(processor.blockHeightMessages).toHaveLength(1);
expect(processor.blockHeightMessages[0].blockHeight).toEqual(String(defaultHeight));
expect(processor.blockHeightMessages[0].version)
.toEqual(BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION);
expect(processor.blockHeightMessages[0].time).toEqual(defaultDateTime.toString());
});

it('createBlockHeightMsg creates a BlockHeightMessage', async () => {
const block: IndexerTendermintBlock = createIndexerTendermintBlock(
defaultHeight,
defaultTime,
events,
[
defaultTxHash,
defaultTxHash2,
],
);

const txId: number = await Transaction.start();
const blockProcessor: BlockProcessor = new BlockProcessor(
block,
txId,
defaultDateTime.toString(),
);
await Transaction.commit(txId);

const msg = blockProcessor.createBlockHeightMsg();
expect(msg).toEqual({
topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT,
message: {
blockHeight: String(defaultHeight),
time: defaultDateTime.toString(),
version: BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION,
},
});
});
});
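For readers skimming these tests, here is a hypothetical sketch of what `createBlockHeightMsg` plausibly returns, inferred from the expectations above rather than copied from the production `BlockProcessor`:

```typescript
// Hypothetical sketch inferred from the test expectations above; not the production code.
import { BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import { BlockHeightMessage } from '@dydxprotocol-indexer/v4-protos';

interface BlockHeightKafkaEvent {
  topic: KafkaTopics;
  message: BlockHeightMessage;
}

function createBlockHeightMsg(blockHeight: string, time: string): BlockHeightKafkaEvent {
  return {
    topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT,
    message: {
      blockHeight,
      time,
      version: BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION,
    },
  };
}

// Example: createBlockHeightMsg('1234567', '2024-06-25T00:00:00.000Z')
```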
30 changes: 29 additions & 1 deletion indexer/services/ender/__tests__/lib/kafka-publisher.test.ts
@@ -17,13 +17,16 @@
TradeType,
TransferFromDatabase,
} from '@dydxprotocol-indexer/postgres';
import { IndexerSubaccountId, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import {
BlockHeightMessage, IndexerSubaccountId, SubaccountMessage, TradeMessage,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types';

import { KafkaPublisher } from '../../src/lib/kafka-publisher';
import {
defaultDateTime,
defaultSubaccountMessage,
defaultTradeContent,
defaultTradeKafkaEvent,
@@ -43,6 +46,7 @@
} from '../../src/helpers/kafka-helper';
import { DateTime } from 'luxon';
import { convertToSubaccountMessage } from '../../src/lib/helper';
import { defaultBlock } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';

describe('kafka-publisher', () => {
let producerSendMock: jest.SpyInstance;
@@ -105,6 +109,30 @@
});
});

it('successfully publishes block height messages', async () => {
const message: BlockHeightMessage = {
blockHeight: String(defaultBlock),
version: '1.0.0',
time: defaultDateTime.toString(),
};
const blockHeightEvent: ConsolidatedKafkaEvent = {
topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT,
message,
};

const publisher: KafkaPublisher = new KafkaPublisher();
publisher.addEvents([blockHeightEvent]);

await publisher.publish();
expect(producerSendMock).toHaveBeenCalledTimes(1);
expect(producerSendMock).toHaveBeenCalledWith({
topic: blockHeightEvent.topic,
messages: [{
value: Buffer.from(BlockHeightMessage.encode(blockHeightEvent.message).finish()),
}],
});
});

describe('sortTradeEvents', () => {
const trade: SingleTradeMessage = contentToSingleTradeMessage(
{} as TradeContent,
2 changes: 1 addition & 1 deletion indexer/services/ender/__tests__/lib/on-message.test.ts
@@ -671,7 +671,7 @@
expectBlock(defaultHeight.toString(), defaultDateTime.toISO()),
]);

expect(producerSendMock).toHaveBeenCalledTimes(2);
expect(producerSendMock).toHaveBeenCalledTimes(3);
// First message batch sent should contain the first message
expect(producerSendMock.mock.calls[0][0].messages).toHaveLength(1);
// Second message batch should contain the second message