Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add /deactivatepool API for deleting listeners #136

Merged
merged 1 commit into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,22 @@ export class EventStreamService {
);
}

async deleteSubscriptionByName(ctx: Context, streamId: string, name: string) {
const existingSubscriptions = await this.getSubscriptions(ctx);
const sub = existingSubscriptions.find(s => s.name === name && s.stream === streamId);
if (!sub) {
this.logger.log(`No subscription found for ${name}`);
return false;
}
await lastValueFrom(
this.http.delete(
new URL(`/subscriptions/${sub.id}`, this.baseUrl).href,
this.requestOptions(ctx),
),
);
return true;
}

connect(
url: string,
topic: string,
Expand Down
2 changes: 1 addition & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { NestApplicationOptions, ShutdownSignal, ValidationPipe } from '@nestjs/common';
import { ShutdownSignal, ValidationPipe } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { WsAdapter } from '@nestjs/platform-ws';
Expand Down
13 changes: 12 additions & 1 deletion src/tokens/tokens.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
TokenMint,
TokenPool,
TokenPoolActivate,
TokenPoolDeactivate,
TokenPoolEvent,
TokenTransfer,
} from './tokens.interfaces';
Expand Down Expand Up @@ -70,13 +71,23 @@ export class TokensController {
@Post('activatepool')
@HttpCode(200)
@ApiOperation({
summary: 'Activate a token pool to begin receiving transfer events',
summary: 'Activate a token pool to begin receiving transfer and approval events',
})
@ApiBody({ type: TokenPoolActivate })
activatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolActivate) {
return this.service.activatePool(ctx, dto);
}

@Post('deactivatepool')
@HttpCode(204)
@ApiOperation({
summary: 'Deactivate a token pool to delete all listeners and stop receiving events',
})
@ApiBody({ type: TokenPoolDeactivate })
async deactivatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolDeactivate) {
await this.service.deactivatePool(ctx, dto);
}

@Post('checkinterface')
@HttpCode(200)
@ApiOperation({
Expand Down
10 changes: 8 additions & 2 deletions src/tokens/tokens.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,15 @@ export class TokenPoolActivate {
@IsOptional()
config?: TokenPoolConfig;

@ApiProperty({ description: requestIdDescription })
@ApiProperty()
@IsOptional()
requestId?: string;
poolData?: string;
}

export class TokenPoolDeactivate {
@ApiProperty()
@IsNotEmpty()
poolLocator: string;

@ApiProperty()
@IsOptional()
Expand Down
81 changes: 63 additions & 18 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
TokenPool,
TokenPoolActivate,
TokenPoolConfig,
TokenPoolDeactivate,
TokenPoolEvent,
TokenTransfer,
TokenType,
Expand Down Expand Up @@ -316,6 +317,24 @@ export class TokensService {
}
}

private getEventAbis(poolLocator: IValidPoolLocator) {
const transferAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Transfer : ERC721Transfer;
if (transferAbi?.name === undefined) {
throw new NotFoundException('Transfer event ABI not found');
}
const approvalAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Approval : ERC721Approval;
if (approvalAbi?.name === undefined) {
throw new NotFoundException('Approval event ABI not found');
}
const approvalForAllAbi =
poolLocator.type === TokenType.FUNGIBLE ? undefined : ERC721ApprovalForAll;
return {
transferAbi,
approvalAbi,
approvalForAllAbi,
};
}

async activatePool(ctx: Context, dto: TokenPoolActivate) {
const poolLocator = unpackPoolLocator(dto.poolLocator);
if (!validatePoolLocator(poolLocator)) {
Expand All @@ -327,49 +346,39 @@ export class TokensService {
abi,
poolLocator.type === TokenType.FUNGIBLE,
);

const eventAbis = this.getEventAbis(poolLocator);
const stream = await this.getStream(ctx);
const transferAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Transfer : ERC721Transfer;
if (transferAbi?.name === undefined) {
throw new NotFoundException('Transfer event ABI not found');
}
const approvalAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Approval : ERC721Approval;
if (approvalAbi?.name === undefined) {
throw new NotFoundException('Approval event ABI not found');
}
const approvalForAllAbi =
poolLocator.type === TokenType.FUNGIBLE ? undefined : ERC721ApprovalForAll;

const promises = [
this.eventstream.getOrCreateSubscription(
ctx,
this.baseUrl,
transferAbi,
eventAbis.transferAbi,
stream.id,
packSubscriptionName(dto.poolLocator, transferAbi.name, dto.poolData),
packSubscriptionName(dto.poolLocator, eventAbis.transferAbi.name, dto.poolData),
poolLocator.address,
possibleMethods,
this.getSubscriptionBlockNumber(dto.config),
),
this.eventstream.getOrCreateSubscription(
ctx,
this.baseUrl,
approvalAbi,
eventAbis.approvalAbi,
stream.id,
packSubscriptionName(dto.poolLocator, approvalAbi.name, dto.poolData),
packSubscriptionName(dto.poolLocator, eventAbis.approvalAbi.name, dto.poolData),
poolLocator.address,
possibleMethods,
this.getSubscriptionBlockNumber(dto.config),
),
];
if (approvalForAllAbi?.name !== undefined) {
if (eventAbis.approvalForAllAbi?.name !== undefined) {
promises.push(
this.eventstream.getOrCreateSubscription(
ctx,
this.baseUrl,
approvalForAllAbi,
eventAbis.approvalForAllAbi,
stream.id,
packSubscriptionName(dto.poolLocator, approvalForAllAbi.name, dto.poolData),
packSubscriptionName(dto.poolLocator, eventAbis.approvalForAllAbi.name, dto.poolData),
poolLocator.address,
possibleMethods,
this.getSubscriptionBlockNumber(dto.config),
Expand Down Expand Up @@ -397,6 +406,42 @@ export class TokensService {
return tokenPoolEvent;
}

async deactivatePool(ctx: Context, dto: TokenPoolDeactivate) {
const poolLocator = unpackPoolLocator(dto.poolLocator);
if (!validatePoolLocator(poolLocator)) {
throw new BadRequestException('Invalid pool locator');
}

const stream = await this.getStream(ctx);
const eventAbis = this.getEventAbis(poolLocator);
const promises = [
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(dto.poolLocator, eventAbis.transferAbi.name, dto.poolData),
),
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(dto.poolLocator, eventAbis.approvalAbi.name, dto.poolData),
),
];
if (eventAbis.approvalForAllAbi?.name !== undefined) {
promises.push(
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(dto.poolLocator, eventAbis.approvalForAllAbi.name, dto.poolData),
),
);
}

const results = await Promise.all(promises);
if (results.every(deleted => !deleted)) {
throw new NotFoundException('No listeners found');
}
}

checkInterface(dto: CheckInterfaceRequest): CheckInterfaceResponse {
const poolLocator = unpackPoolLocator(dto.poolLocator);
if (!validatePoolLocator(poolLocator)) {
Expand Down