Skip to content
This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

[IPFS Queue] DSNP content builder #13

Merged
merged 5 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions apps/api/src/api.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import { ConfigService } from '../../../libs/common/src/config/config.service';
import { ScannerModule } from '../../../libs/common/src/scanner/scanner.module';
import { BlockchainModule } from '../../../libs/common/src/blockchain/blockchain.module';
import { CrawlerModule } from '../../../libs/common/src/crawler/crawler.module';
import { IPFSProcessorModule } from '../../../libs/common/src/ipfs/ipfs.module';

@Module({
imports: [
ConfigModule,
BlockchainModule,
ScannerModule,
CrawlerModule,
IPFSProcessorModule,
RedisModule.forRootAsync(
{
imports: [ConfigModule],
Expand Down Expand Up @@ -73,9 +75,6 @@ import { CrawlerModule } from '../../../libs/common/src/crawler/crawler.module';
{
name: QueueConstants.TOMBSTONE_QUEUE_NAME,
},
{
name: QueueConstants.UPDATE_QUEUE_NAME,
},
{
name: QueueConstants.PROFILE_QUEUE_NAME,
},
Expand Down Expand Up @@ -110,10 +109,6 @@ import { CrawlerModule } from '../../../libs/common/src/crawler/crawler.module';
name: QueueConstants.TOMBSTONE_QUEUE_NAME,
adapter: BullMQAdapter,
}),
BullBoardModule.forFeature({
name: QueueConstants.UPDATE_QUEUE_NAME,
adapter: BullMQAdapter,
}),
BullBoardModule.forFeature({
name: QueueConstants.PROFILE_QUEUE_NAME,
adapter: BullMQAdapter,
Expand Down
5 changes: 5 additions & 0 deletions libs/common/src/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
/**
 * Forwards `account` to `this.rpc('system', 'accountNextIndex', …)` and returns its result.
 * NOTE(review): presumably this is the next transaction index (nonce) for the account — confirm
 * against the chain's RPC documentation.
 *
 * @param account Raw account bytes passed through to the RPC call unchanged.
 * @returns The numeric value produced by the RPC call.
 */
public async getNonce(account: Uint8Array): Promise<number> {
return this.rpc('system', 'accountNextIndex', account);
}

/**
 * Looks up a schema by id through `this.query('schemas', 'schemas', …)`.
 * NOTE(review): the two string arguments look like a pallet name and a storage item name —
 * confirm against the `query` helper.
 *
 * @param schemaId Numeric id of the schema to fetch.
 * @returns Whatever the query resolves to, typed as `PalletSchemasSchema`.
 */
public async getSchema(schemaId: number): Promise<PalletSchemasSchema> {
  return this.query('schemas', 'schemas', schemaId);
}
}
8 changes: 8 additions & 0 deletions libs/common/src/crawler/crawler.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ import { QueueConstants } from '../utils/queues';
}),
BullModule.registerQueue({
name: QueueConstants.REQUEST_QUEUE_NAME,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
},
removeOnComplete: true,
removeOnFail: false,
},
}),
ScheduleModule.forRoot(),
],
Expand Down
5 changes: 3 additions & 2 deletions libs/common/src/crawler/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export class CrawlerService extends BaseConsumer {
super();
}

async process(job: Job<ContentSearchRequestDto, any, string>, token?: string | undefined): Promise<any> {
this.logger.log(`Processing job ${job.id}`);
async process(job: Job<ContentSearchRequestDto, any, string>): Promise<any> {
this.logger.log(`Processing crawler job ${job.id}`);
const blockList: bigint[] = [];
const blockStart = BigInt(job.data.startBlock);
const blockEnd = BigInt(job.data.endBlock);
Expand Down Expand Up @@ -123,6 +123,7 @@ export class CrawlerService extends BaseConsumer {
}

const ipfsQueueJob = createIPFSQueueJob(
schemaId.toNumber(),
messageResponse.msa_id.unwrap().toString(),
messageResponse.provider_msa_id.unwrap().toString(),
blockNumber.toBigInt(),
Expand Down
7 changes: 7 additions & 0 deletions libs/common/src/interfaces/announcement_response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Announcement } from './dsnp';

/**
 * Envelope that pairs one DSNP announcement with the id of the schema it was published under.
 */
export interface AnnouncementResponse {
  /** Id of the schema associated with the announcement. */
  schemaId: number;
  /** The announcement record itself. */
  announcement: Announcement;
  /** Optional identifier of the request this announcement is being returned for. */
  requestId?: string;
}
15 changes: 13 additions & 2 deletions libs/common/src/interfaces/ipfs.job.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,31 @@ export interface IIPFSJob {
msaId: string;
providerId: string;
cid: string;
schemaId: number;
blockNumber: bigint;
index: number;
requestId?: string;
}

/**
 * Builds the queue entry (key + payload) for an IPFS content job.
 *
 * The key is `msaId:providerId:blockNumber:index:schemaId`, so two messages that differ only by
 * schema id produce distinct keys.
 *
 * NOTE(review): `blockNumber` is carried as a `bigint`, which `JSON.stringify` cannot serialize —
 * verify how the queue serializes job data before relying on this field.
 *
 * @param schemaId Id of the schema the message was published under.
 * @param msaId Id of the message's MSA, as a string.
 * @param providerId Id of the provider MSA, as a string.
 * @param blockNumber Block number the message was found in.
 * @param cid Content identifier of the payload to fetch.
 * @param index Index of the message (forms part of the key).
 * @param requestId Identifier of the originating request.
 * @returns The job key together with the `IIPFSJob` payload.
 */
export function createIPFSQueueJob(
  schemaId: number,
  msaId: string,
  providerId: string,
  blockNumber: bigint,
  cid: string,
  index: number,
  requestId: string,
): { key: string; data: IIPFSJob } {
  // Declared with its type (rather than asserted with `as`) so a missing or misspelled field
  // is reported by the compiler.
  const data: IIPFSJob = {
    msaId,
    providerId,
    cid,
    blockNumber,
    index,
    requestId,
    schemaId,
  };

  return {
    key: `${msaId}:${providerId}:${blockNumber}:${index}:${schemaId}`,
    data,
  };
}
156 changes: 156 additions & 0 deletions libs/common/src/ipfs/ipfs.dsnp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { Injectable, Logger } from '@nestjs/common';
import { Job, Queue } from 'bullmq';
import Redis from 'ioredis';
import { InjectQueue, Processor } from '@nestjs/bullmq';
import { SchedulerRegistry } from '@nestjs/schedule';
import { CID } from 'multiformats';
import { hexToString } from '@polkadot/util';
import { PalletSchemasSchema } from '@polkadot/types/lookup';
import { fromFrequencySchema } from '@dsnp/frequency-schemas/parquet';
import parquet from '@dsnp/parquetjs';
import { ConfigService } from '../config/config.service';
import { QueueConstants } from '..';
import { IIPFSJob } from '../interfaces/ipfs.job.interface';
import { BaseConsumer } from '../utils/base-consumer';
import { IpfsService } from '../utils/ipfs.client';
import { BlockchainService } from '../blockchain/blockchain.service';
import { RedisUtils } from '../utils/redis';
import { Announcement, AnnouncementType, BroadcastAnnouncement, ProfileAnnouncement, ReactionAnnouncement, ReplyAnnouncement, TombstoneAnnouncement } from '../interfaces/dsnp';
import { AnnouncementResponse } from '../interfaces/announcement_response';

/**
 * Consumer for `QueueConstants.IPFS_QUEUE`.
 *
 * For each job it fetches the pinned content for the job's CID, resolves the job's schema
 * (cached in Redis), reads every announcement record out of the Parquet content, and adds each
 * record to the queue that matches its announcement type.
 */
@Injectable()
@Processor(QueueConstants.IPFS_QUEUE, {
  concurrency: 2,
})
export class IPFSContentProcessor extends BaseConsumer {
  public logger: Logger;

  constructor(
    @InjectRedis() private redis: Redis,
    @InjectQueue(QueueConstants.BROADCAST_QUEUE_NAME) private broadcastQueue: Queue,
    @InjectQueue(QueueConstants.TOMBSTONE_QUEUE_NAME) private tombstoneQueue: Queue,
    @InjectQueue(QueueConstants.REACTION_QUEUE_NAME) private reactionQueue: Queue,
    @InjectQueue(QueueConstants.REPLY_QUEUE_NAME) private replyQueue: Queue,
    @InjectQueue(QueueConstants.PROFILE_QUEUE_NAME) private profileQueue: Queue,
    private schedulerRegistry: SchedulerRegistry,
    private configService: ConfigService,
    private ipfsService: IpfsService,
    private blockchainService: BlockchainService,
  ) {
    super();
  }

  /**
   * Processes one IPFS job end to end.
   *
   * Any failure is logged and rethrown so the queue can apply its retry policy.
   *
   * @param job Job whose data carries the CID and schema id to process.
   * @throws If the CID cannot be parsed, the schema cannot be parsed, the content cannot be
   *   read, or a record has an unknown announcement type.
   */
  async process(job: Job<IIPFSJob, any, string>): Promise<any> {
    try {
      this.logger.log(`IPFS Processing job ${job.id}`);
      this.logger.debug(`IPFS CID: ${job.data.cid} for schemaId: ${job.data.schemaId}`);
      const cid = CID.parse(job.data.cid);

      const ipfsHash = cid.toV0().toString();
      this.logger.debug(`IPFS Hash: ${ipfsHash}`);

      const contentBuffer = await this.ipfsService.getPinned(ipfsHash);
      const schemaCacheKey = `schema:${job.data.schemaId}`;
      let cachedSchema: string | null = await this.redis.get(schemaCacheKey);
      if (!cachedSchema) {
        // Cache miss: fetch the schema from the chain and keep its JSON form in Redis.
        const schemaResponse = await this.blockchainService.getSchema(job.data.schemaId);
        cachedSchema = JSON.stringify(schemaResponse);
        await this.redis.setex(schemaCacheKey, RedisUtils.STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, cachedSchema);
      }

      // make sure schemaId is a valid one to prevent DoS
      const frequencySchema: PalletSchemasSchema = JSON.parse(cachedSchema);
      const hexString: string = Buffer.from(frequencySchema.model).toString('utf8');
      const schema = JSON.parse(hexToString(hexString));
      if (!schema) {
        throw new Error(`Unable to parse schema for schemaId ${job.data.schemaId}`);
      }

      const reader = await parquet.ParquetReader.openBuffer(contentBuffer);
      const records = new Map<string, Announcement[]>();
      try {
        const cursor = reader.getCursor();
        let record = await cursor.next();
        while (record) {
          const announcement = record as Announcement;
          const typeKey = announcement.announcementType.toString();
          // Use the Map API: bracket indexing sets plain object properties that
          // has()/forEach() never see.
          const bucket = records.get(typeKey);
          if (bucket) {
            bucket.push(announcement);
          } else {
            records.set(typeKey, [announcement]);
          }
          // Advance the cursor; without this the loop never ends.
          record = await cursor.next();
        }
      } finally {
        // Release the reader whether or not reading succeeded.
        await reader.close();
      }

      await this.buildAndQueueDSNPAnnouncements(records, schema, job.data);

      this.logger.log(`IPFS Job ${job.id} completed`);
    } catch (e) {
      this.logger.error(`IPFS Job ${job.id} failed with error: ${e}`);
      throw e;
    }
  }

  /**
   * Adds every collected announcement to the queue for its type.
   *
   * Queue writes are awaited one after another so that a failure rejects this method (and so
   * fails the job) instead of surfacing as an unhandled rejection.
   *
   * @param records Announcements grouped by the string form of their announcement type.
   * @param schema Parsed schema for the job; currently not consulted here.
   * @param jobData Data of the IPFS job; supplies the schema id and optional request id.
   * @throws If a record carries an announcement type with no matching queue.
   */
  private async buildAndQueueDSNPAnnouncements(records: Map<string, Announcement[]>, schema: any, jobData: IIPFSJob): Promise<void> {
    let jobRequestId = jobData.requestId;
    if (!jobRequestId) {
      // Fallback label for jobs that carry no request id.
      jobRequestId = '🖨️ from Frequency';
    }

    for (const announcements of records.values()) {
      for (const announcement of announcements) {
        await this.queueAnnouncement(announcement, jobData.schemaId, jobRequestId);
      }
    }
  }

  /** Wraps one announcement in an `AnnouncementResponse` and adds it to its type's queue. */
  private async queueAnnouncement(announcement: Announcement, schemaId: number, requestId: string): Promise<void> {
    const { announcementType } = announcement;
    switch (announcementType) {
      case AnnouncementType.Broadcast: {
        const broadCastResponse: AnnouncementResponse = {
          schemaId,
          announcement: announcement as BroadcastAnnouncement,
          requestId,
        };
        await this.broadcastQueue.add('Broadcast', broadCastResponse);
        break;
      }
      case AnnouncementType.Tombstone: {
        const tombstoneResponse: AnnouncementResponse = {
          schemaId,
          announcement: announcement as TombstoneAnnouncement,
          requestId,
        };
        await this.tombstoneQueue.add('Tombstone', tombstoneResponse);
        break;
      }
      case AnnouncementType.Reaction: {
        const reactionResponse: AnnouncementResponse = {
          schemaId,
          announcement: announcement as ReactionAnnouncement,
          requestId,
        };
        await this.reactionQueue.add('Reaction', reactionResponse);
        break;
      }
      case AnnouncementType.Reply: {
        const replyResponse: AnnouncementResponse = {
          schemaId,
          announcement: announcement as ReplyAnnouncement,
          requestId,
        };
        await this.replyQueue.add('Reply', replyResponse);
        break;
      }
      case AnnouncementType.Profile: {
        const profileResponse: AnnouncementResponse = {
          schemaId,
          announcement: announcement as ProfileAnnouncement,
          requestId,
        };
        // The response was previously built but never enqueued.
        await this.profileQueue.add('Profile', profileResponse);
        break;
      }
      default:
        // Report the type value; interpolating the whole record prints "[object Object]".
        throw new Error(`Unknown announcement type ${announcementType}`);
    }
  }

  /**
   * Reports whether a queue's waiting plus active job count has reached the configured
   * high-water mark.
   */
  private async isQueueFull(queue: Queue): Promise<boolean> {
    const highWater = this.configService.getQueueHighWater();
    const queueStats = await queue.getJobCounts();
    return queueStats.waiting + queueStats.active >= highWater;
  }
}
87 changes: 87 additions & 0 deletions libs/common/src/ipfs/ipfs.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
https://docs.nestjs.com/modules
*/

import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { RedisModule } from '@liaoliaots/nestjs-redis';
import { ConfigModule } from '../config/config.module';
import { ConfigService } from '../config/config.service';
import { BlockchainModule } from '../blockchain/blockchain.module';
import { QueueConstants } from '..';
import { IPFSContentProcessor } from './ipfs.dsnp';
import { IpfsService } from '../utils/ipfs.client';

/** Produces the Redis module options from the configured Redis URL. */
function redisOptionsFromConfig(configService: ConfigService) {
  return {
    config: [{ url: configService.redisUrl.toString() }],
  };
}

/**
 * Produces the BullMQ root options from the configured Redis URL.
 *
 * Per the original author's note, BullMQ does not honor a URL for its Redis connection and the
 * JS URL parser does not treat 'redis://' as a valid protocol. The scheme is therefore swapped
 * for 'http' purely so host, port, credentials and database index can be parsed out of it,
 * which avoids adding separate REDIS_HOST / REDIS_PORT style environment variables.
 */
function bullOptionsFromConfig(configService: ConfigService) {
  const parsableUrl = configService.redisUrl.toString().replace(/^redis[s]*/, 'http');
  const { hostname, port, username, password, pathname } = new URL(parsableUrl);
  return {
    connection: {
      host: hostname || undefined,
      port: port ? Number(port) : undefined,
      username: username || undefined,
      password: password || undefined,
      // A path such as "/2" selects database 2; an empty path or "/" leaves it unset.
      db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
    },
  };
}

/**
 * Module that wires up the IPFS content processor: the Redis and BullMQ connections, the IPFS
 * queue (with retry settings), and the announcement queues the processor writes to.
 */
@Module({
  imports: [
    BlockchainModule,
    ConfigModule,
    EventEmitterModule,
    RedisModule.forRootAsync(
      {
        imports: [ConfigModule],
        useFactory: redisOptionsFromConfig,
        inject: [ConfigService],
      },
      true, // isGlobal
    ),
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: bullOptionsFromConfig,
      inject: [ConfigService],
    }),
    BullModule.registerQueue(
      {
        name: QueueConstants.IPFS_QUEUE,
        defaultJobOptions: {
          attempts: 3,
          backoff: {
            type: 'exponential',
          },
          removeOnComplete: true,
          removeOnFail: false,
        },
      },
      // Announcement queues are registered by name only, with default options.
      ...[
        QueueConstants.BROADCAST_QUEUE_NAME,
        QueueConstants.REACTION_QUEUE_NAME,
        QueueConstants.REPLY_QUEUE_NAME,
        QueueConstants.PROFILE_QUEUE_NAME,
        QueueConstants.TOMBSTONE_QUEUE_NAME,
      ].map((name) => ({ name })),
    ),
  ],
  controllers: [],
  providers: [IPFSContentProcessor, IpfsService],
  exports: [BullModule, IPFSContentProcessor, IpfsService],
})
export class IPFSProcessorModule {}
3 changes: 2 additions & 1 deletion libs/common/src/scanner/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class ScannerService implements OnApplicationBootstrap {
}

async onApplicationBootstrap() {
const startingBlock = BigInt(this.configService.startingBlock)-1n;
const startingBlock = BigInt(this.configService.startingBlock) - 1n;
this.setLastSeenBlockNumber(startingBlock);
this.scheduleInitialScan();
this.scheduleBlockchainScan();
Expand Down Expand Up @@ -181,6 +181,7 @@ export class ScannerService implements OnApplicationBootstrap {
}

const ipfsQueueJob = createIPFSQueueJob(
schemaId.toNumber(),
messageResponse.msa_id.unwrap().toString(),
messageResponse.provider_msa_id.unwrap().toString(),
blockNumber.toBigInt(),
Expand Down
Loading