Skip to content

Commit

Permalink
feat: new grpc call for subscribring alerts such as low balance (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsercano committed Nov 16, 2020
1 parent 74a59dc commit 8814ed5
Show file tree
Hide file tree
Showing 18 changed files with 1,871 additions and 351 deletions.
75 changes: 75 additions & 0 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions lib/cli/commands/subscribealerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { ServiceError, status } from 'grpc';
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { setTimeoutPromise } from '../../utils/utils';
import { loadXudClient } from '../command';
import { AlertType } from '../../constants/enums';

export const command = 'subscribealerts [--pretty]';

export const describe = 'subscribe alerts such as low balance';

export const builder = (argv: Argv) => argv
.option('pretty', {
type: 'boolean',
})
.example('$0 subscribealerts', 'prints alert payload in a JSON structure')
.example('$0 subscribealerts --pretty', 'prints alert message only');

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, subscribing for alerts');
subscribeAlerts(argv);
}
});
};

const subscribeAlerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
if (argv.pretty) {
console.log(`${AlertType[alert.getType()]}: ${alert.getMessage()}`);
} else {
console.log(JSON.stringify(alert, undefined, 2));
}
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', async (err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection(argv);
});
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
35 changes: 33 additions & 2 deletions lib/connextclient/ConnextClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from 'assert';
import http from 'http';
import { SwapClientType, SwapRole, SwapState } from '../constants/enums';
import { ChannelBalanceSide, SwapClientType, SwapRole, SwapState } from '../constants/enums';
import { CurrencyInstance } from '../db/types';
import { XudError } from '../types';
import Logger from '../Logger';
Expand All @@ -14,7 +14,7 @@ import SwapClient, {
PaymentStatus,
WithdrawArguments,
} from '../swaps/SwapClient';
import { SwapDeal, CloseChannelParams, OpenChannelParams, SwapCapacities } from '../swaps/types';
import { SwapDeal, CloseChannelParams, OpenChannelParams, SwapCapacities, ChannelBalanceAlert } from '../swaps/types';
import { UnitConverter } from '../utils/UnitConverter';
import errors, { errorCodes } from './errors';
import {
Expand Down Expand Up @@ -46,13 +46,15 @@ interface ConnextClient {
on(event: 'htlcAccepted', listener: (rHash: string, amount: number, currency: string) => void): this;
on(event: 'connectionVerified', listener: (swapClientInfo: SwapClientInfo) => void): this;
on(event: 'depositConfirmed', listener: (hash: string) => void): this;
on(event: 'lowBalance', listener: (alert: ChannelBalanceAlert) => void): this;
once(event: 'initialized', listener: () => void): this;
emit(event: 'htlcAccepted', rHash: string, amount: number, currency: string): boolean;
emit(event: 'connectionVerified', swapClientInfo: SwapClientInfo): boolean;
emit(event: 'initialized'): boolean;
emit(event: 'preimage', preimageRequest: ProvidePreimageEvent): void;
emit(event: 'transferReceived', transferReceivedRequest: TransferReceivedEvent): void;
emit(event: 'depositConfirmed', hash: string): void;
emit(event: 'lowBalance', alert: ChannelBalanceAlert): boolean;
}

/**
Expand Down Expand Up @@ -332,6 +334,35 @@ class ConnextClient extends SwapClient {
channelBalancePromises.push(this.channelBalance(currency));
}
await Promise.all(channelBalancePromises);

for (const [currency, address] of this.tokenAddresses) {
const remoteBalance = this.inboundAmounts.get(currency) || 0;
const localBalance = this.outboundAmounts.get(currency) || 0;
const totalBalance = remoteBalance + localBalance;
const alertThreshold = totalBalance * 0.1;

if (localBalance < alertThreshold) {
this.emit('lowBalance', {
totalBalance,
currency,
channelPoint: address,
side: ChannelBalanceSide.Local,
sideBalance: localBalance,
bound: 10,
});
}

if (remoteBalance < alertThreshold) {
this.emit('lowBalance', {
totalBalance,
currency,
channelPoint: address,
side: ChannelBalanceSide.Remote,
sideBalance: remoteBalance,
bound: 10,
});
}
}
} catch (e) {
this.logger.error('failed to update total outbound capacity', e);
}
Expand Down
9 changes: 9 additions & 0 deletions lib/constants/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,12 @@ export enum DisconnectionReason {
AuthFailureInvalidSignature = 12,
WireProtocolErr = 13,
}

export enum AlertType {
LowBalance = 0,
}

export enum ChannelBalanceSide {
Remote = 0,
Local = 1,
}
29 changes: 27 additions & 2 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import grpc, { ServerWritableStream, status } from 'grpc';
import { fromEvent } from 'rxjs';
import { take } from 'rxjs/operators';
import { SwapFailureReason } from '../constants/enums';
import { AlertType, SwapFailureReason } from '../constants/enums';
import { LndInfo } from '../lndclient/types';
import { isOwnOrder, Order, OrderPortion, PlaceOrderEventType, PlaceOrderResult } from '../orderbook/types';
import * as xudrpc from '../proto/xudrpc_pb';
import Service from '../service/Service';
import { ServiceOrder, ServicePlaceOrderEvent } from '../service/types';
import { SwapAccepted, SwapFailure, SwapSuccess } from '../swaps/types';
import { ChannelBalanceAlert, SwapAccepted, SwapFailure, SwapSuccess } from '../swaps/types';
import getGrpcError from './getGrpcError';

/**
Expand Down Expand Up @@ -870,6 +870,31 @@ class GrpcService {
}
}

/*
* See [[Service.subscribeAlerts]]
*/
public subscribeAlerts: grpc.handleServerStreamingCall<xudrpc.SubscribeAlertsRequest, xudrpc.Alert> = (call) => {
if (!this.isReady(this.service, call)) {
return;
}

const cancelled$ = getCancelled$(call);
this.service.subscribeAlerts((type: AlertType, message: string, payload: ChannelBalanceAlert) => {
const alert = new xudrpc.Alert();
alert.setType(type as number);
alert.setMessage(message);
const channelBalanceAlert = new xudrpc.ChannelBalanceAlert();
channelBalanceAlert.setBound(payload.bound);
channelBalanceAlert.setChannelPoint(payload.channelPoint);
channelBalanceAlert.setSide(payload.side as number);
channelBalanceAlert.setSideBalance(payload.sideBalance);
channelBalanceAlert.setTotalBalance(payload.totalBalance);
alert.setBalanceAlert(channelBalanceAlert);
call.write(alert);
},
cancelled$);
}

/*
* See [[Service.subscribeOrders]]
*/
Expand Down
Loading

0 comments on commit 8814ed5

Please sign in to comment.