Skip to content

Commit

Permalink
Merge pull request #182 from klayrHQ/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
Theezr authored Nov 28, 2024
2 parents 214b39c + 5ea0fb4 commit 1686376
Show file tree
Hide file tree
Showing 14 changed files with 396 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,37 @@ CREATE TABLE "PosConstants" (
CONSTRAINT "PosConstants_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "NetworkPeer" (
"ip" TEXT NOT NULL,
"state" TEXT NOT NULL,
"chainID" TEXT NOT NULL,
"networkVersion" TEXT NOT NULL,
"nonce" TEXT NOT NULL,
"advertiseAddress" BOOLEAN NOT NULL,
"port" INTEGER NOT NULL,
"peerId" TEXT NOT NULL,
"height" INTEGER NOT NULL,
"maxHeightPrevoted" INTEGER NOT NULL,
"blockVersion" INTEGER NOT NULL,
"lastBlockID" TEXT NOT NULL,
"legacy" JSONB NOT NULL,

CONSTRAINT "NetworkPeer_pkey" PRIMARY KEY ("ip")
);

-- CreateTable
CREATE TABLE "Location" (
"ip" TEXT NOT NULL,
"countryCode" TEXT NOT NULL,
"countryName" TEXT NOT NULL,
"hostName" TEXT NOT NULL,
"latitude" TEXT NOT NULL,
"longitude" TEXT NOT NULL,

CONSTRAINT "Location_pkey" PRIMARY KEY ("ip")
);

-- CreateTable
CREATE TABLE "NextBlockToSync" (
"id" INTEGER NOT NULL,
Expand Down Expand Up @@ -384,6 +415,9 @@ ALTER TABLE "ChainEvents" ADD CONSTRAINT "ChainEvents_height_fkey" FOREIGN KEY (
-- AddForeignKey
ALTER TABLE "ChainEvents" ADD CONSTRAINT "ChainEvents_transactionID_fkey" FOREIGN KEY ("transactionID") REFERENCES "Transaction"("id") ON DELETE SET NULL ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "NetworkPeer" ADD CONSTRAINT "NetworkPeer_ip_fkey" FOREIGN KEY ("ip") REFERENCES "Location"("ip") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "Stake" ADD CONSTRAINT "Stake_staker_fkey" FOREIGN KEY ("staker") REFERENCES "Account"("address") ON DELETE RESTRICT ON UPDATE CASCADE;

Expand Down
27 changes: 27 additions & 0 deletions prisma/schema/networkPeer.prisma
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
model NetworkPeer {
ip String @id
location Location? @relation(fields: [ip], references: [ip])
state String
chainID String
networkVersion String
nonce String
advertiseAddress Boolean
port Int
peerId String
height Int
maxHeightPrevoted Int
blockVersion Int
lastBlockID String
legacy Json
}

model Location {
ip String @id
countryCode String
countryName String
hostName String
latitude String
longitude String
NetworkPeer NetworkPeer[]
}
15 changes: 15 additions & 0 deletions src/config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,18 @@ export enum BLOCKCHAIN_NETWORKS {
DEVNET = 'devnet',
}
export const GITHUB_BASE_URL = 'https://api.github.com/repos/KlayrHQ/app-registry/contents';

////////////////////////////
/// Apps ///
////////////////////////////

export const MAX_APPS_TO_FETCH = 100;

////////////////////////////
/// Network peers ///
////////////////////////////

export const LOCATION_API_URL = 'http://ip-api.com/json';
export const BLOCKS_TO_SAVE_NETWORK_PEERS = 100;
export const DEFAULT_NETWORK_PEERS_TO_FETCH = 10;
export const MAX_NETWORK_PEERS_TO_FETCH = 100;
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { ICommandHandler, CommandHandler } from '@nestjs/cqrs';
import { Prisma } from '@prisma/client';
import { LokiLogger } from 'nestjs-loki-logger';
import { BLOCKS_TO_SAVE_NETWORK_PEERS, LOCATION_API_URL } from 'src/config/constants';
import { NodeApi, NodeApiService } from 'src/modules/node-api/node-api.service';
import { NetworkPeers } from 'src/modules/node-api/types';
import { PrismaService } from 'src/modules/prisma/prisma.service';

export type IpGeolocationResponse = {
query: string;
status: string;
country: string;
countryCode: string;
lat: number;
lon: number;
isp: string;
as: string;
};

export class SaveNetworkPeersCommand {
constructor(public readonly blockHeight: number) {}
}

@CommandHandler(SaveNetworkPeersCommand)
export class SaveNetworkPeersHandler implements ICommandHandler<SaveNetworkPeersCommand> {
private readonly logger = new LokiLogger(SaveNetworkPeersHandler.name);

constructor(
private readonly prisma: PrismaService,
private readonly nodeApi: NodeApiService,
) {}

async execute({ blockHeight }: SaveNetworkPeersCommand) {
if (blockHeight % BLOCKS_TO_SAVE_NETWORK_PEERS !== 0) return;
this.logger.debug('Saving network peers');

const peers = await this.fetchConnectedNetworkPeers();
if (!peers) return;

const networkPeers = this.mapPeers(peers);
const newPeerIpAddresses = networkPeers.map((peer) => peer.ip);

await this.prisma.$transaction(async (prisma) => {
await this.deleteOldPeers(prisma, newPeerIpAddresses);
await this.upsertPeers(prisma, networkPeers);
});

this.logger.debug('Network peers saved successfully');
}

private async fetchConnectedNetworkPeers(): Promise<NetworkPeers[] | null> {
try {
const peers = await this.nodeApi.invokeApi<NetworkPeers[]>(
NodeApi.NETWORK_GET_CONNECTED_PEERS,
{},
);

return peers;
} catch (error) {
this.logger.error('Error: Failed to fetch network peers', error);
return null;
}
}

private mapPeers(peers: NetworkPeers[]): any[] {
return peers.map((peer) => ({
ip: peer.ipAddress,
chainID: peer.chainID,
networkVersion: peer.networkVersion,
nonce: peer.nonce,
advertiseAddress: peer.advertiseAddress,
port: peer.port,
peerId: peer.peerId,
height: peer.options.height,
maxHeightPrevoted: peer.options.maxHeightPrevoted,
blockVersion: peer.options.blockVersion,
lastBlockID: peer.options.lastBlockID,
legacy: peer.options.legacy,
state: 'connected',
}));
}

private async deleteOldPeers(
prisma: Prisma.TransactionClient,
newPeerIpAddresses: string[],
): Promise<void> {
await prisma.networkPeer.deleteMany({
where: {
ip: {
notIn: newPeerIpAddresses,
},
},
});
}

private async upsertPeers(prisma: Prisma.TransactionClient, networkPeers: any[]): Promise<void> {
for (const networkPeer of networkPeers) {
const existingLocation = await prisma.location.findUnique({
where: { ip: networkPeer.ip },
});

if (!existingLocation) {
const locationData = await this.findLocationByIp(networkPeer.ip);
if (!locationData) continue;
await prisma.location.create({
data: {
ip: networkPeer.ip,
countryCode: locationData.countryCode,
countryName: locationData.country,
hostName: locationData.as,
latitude: locationData.lat.toString(),
longitude: locationData.lon.toString(),
},
});
}

await prisma.networkPeer.upsert({
where: { ip: networkPeer.ip },
update: networkPeer,
create: networkPeer,
});
}
}
private async findLocationByIp(ipAddress: string): Promise<IpGeolocationResponse> {
try {
const response = await fetch(`${LOCATION_API_URL}/${ipAddress}`);
const data: IpGeolocationResponse = await response.json();

if (data.status === 'fail') {
this.logger.error(`Failed to fetch country for IP ${ipAddress}: `);
return;
}
return data;
} catch (error) {
this.logger.error(`Error fetching country for IP ${ipAddress}:`, error);
return;
}
}
}
2 changes: 2 additions & 0 deletions src/modules/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { AddStakesHandler } from './event/commands/add-stakes.command';
import { UpdateValidatorRanksHandler } from './event/commands/update-validator-ranks.command';
import { UpdateAccountHandler } from './event/commands/update-account.command';
import { UpdateSidechainHandler } from './event/commands/update-sidechain.command';
import { SaveNetworkPeersHandler } from './block/post-block-commands/save-network-peers.command';

@Module({
imports: [CqrsModule, StateModule, NodeApiModule],
Expand All @@ -39,6 +40,7 @@ import { UpdateSidechainHandler } from './event/commands/update-sidechain.comman
CheckForBlockFinalityHandler,
UpdateBlockGeneratorHandler,
UpdateBlocksFeeHandler,
SaveNetworkPeersHandler,

// chain events commands
ExecutionResultHandler,
Expand Down
14 changes: 11 additions & 3 deletions src/modules/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { UpdateValidatorRanks } from './event/commands/update-validator-ranks.co
import { OnEvent } from '@nestjs/event-emitter';
import * as fs from 'fs';
import * as path from 'path';
import { SaveNetworkPeersCommand } from './block/post-block-commands/save-network-peers.command';

// Sets `genesisHeight` as `nextBlockToSync`
// `SYNCING`: Will send new block events to queue from `nextBlockToSync` to current `nodeHeight`
Expand Down Expand Up @@ -88,9 +89,16 @@ export class IndexerService {
totalBurntPerBlockMap: Map<number, any>,
) {
this.logger.debug('Executing post block commands');
await this.commandBus.execute(new CheckForBlockFinalityCommand());
await this.commandBus.execute(new UpdateBlockGeneratorCommand(blocks, chainEvents));
await this.commandBus.execute(new UpdateBlocksFeeCommand(totalBurntPerBlockMap));

await Promise.all([
this.commandBus.execute(new SaveNetworkPeersCommand(blocks.at(0).header.height)),
this.commandBus.execute(new CheckForBlockFinalityCommand()),
]);

await Promise.all([
this.commandBus.execute(new UpdateBlockGeneratorCommand(blocks, chainEvents)),
this.commandBus.execute(new UpdateBlocksFeeCommand(totalBurntPerBlockMap)),
]);

if (this.state.get(Modules.INDEXER) === IndexerState.SYNCING) {
await this.commandBus.execute(new UpdateValidatorRanks());
Expand Down
4 changes: 3 additions & 1 deletion src/modules/interoperability/interoperability.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { NodeApi, NodeApiService } from '../node-api/node-api.service';
import { AnnualInflation } from '../node-api/types';
import { GetAppsStatsResDto, getAppsStatsRes } from './dto/get-app-stats-res.dto';
import { APP_STATUS } from './types';
import { MAX_APPS_TO_FETCH } from 'src/config/constants';

@ApiTags('Interoperability')
@Controller('blockchain')
Expand All @@ -31,6 +32,7 @@ export class InteroperabilityController {
@ApiResponse(getAppsRes)
async getAuth(@Query() query: GetAppsDto): Promise<GetAppsResDto> {
const { chainID, chainName, status, search, limit, offset } = query;
const take = Math.min(limit, MAX_APPS_TO_FETCH);

const where: Prisma.BlockchainAppWhereInput = {
...(chainName && { chainName: chainName }),
Expand All @@ -44,7 +46,7 @@ export class InteroperabilityController {
const [apps, total] = await Promise.all([
this.prisma.blockchainApp.findMany({
where,
take: limit,
take,
skip: offset,
include: { escrow: true },
}),
Expand Down
32 changes: 18 additions & 14 deletions src/modules/network/dto/get-network-peers-res.dto.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
import { ApiResponseOptions } from '@nestjs/swagger';

export interface PeerOptions {
height: number;
maxHeightPrevoted: number;
blockVersion: number;
lastBlockID: string;
legacy: any[];
class Location {
countryCode: string;
countryName: string;
hostName: string;
ip: string;
latitude: string;
longitude: string;
}

export class GetNetworkPeersData {
chainID: string;
networkVersion: string;
nonce: string;
advertiseAddress: boolean;
options: PeerOptions;
ipAddress: string;
ip: string;
port: number;
peerId: string;
networkVersion: string;
chainID: string;
state: string;
height: number;
location: Location;
}

export class GetNetworkPeersMeta {}
export class GetNetworkPeersMeta {
count: number;
offset: number;
total: number;
}

export class GetNetworkPeersResDto {
data: GetNetworkPeersData[];
Expand Down
Loading

0 comments on commit 1686376

Please sign in to comment.