Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request paritytech#57 from subspace/relayer-skip-uknown-pa…
Browse files Browse the repository at this point in the history
…raid

relayer: skip parachains that are not included in config
  • Loading branch information
isSerge authored Oct 6, 2021
2 parents a4a1de0 + 8bce020 commit 85d30f4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
50 changes: 29 additions & 21 deletions relayer/src/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Header, Hash, SignedBlock, Block } from "@polkadot/types/interfaces";
import { AddressOrPair } from "@polkadot/api/submittable/types";
import { BN } from '@polkadot/util';
import { concatMap, take, map, tap, concatAll } from "rxjs/operators";
import { from, merge } from 'rxjs';
import { from, merge, EMPTY } from 'rxjs';
import { Logger } from "pino";

import { ParaHeadAndId, TxData, ChainName } from "./types";
Expand Down Expand Up @@ -77,6 +77,7 @@ class Source {
}

this.logger.info(`Associated parablocks: ${result.length}`);
this.logger.debug(`ParaIds: ${result.map(({ paraId }) => paraId).join(", ")}`);

return result;
}
Expand All @@ -89,26 +90,33 @@ class Source {
.forEach(paraItem => this.logger.debug(`Extracted para head and id: ${JSON.stringify(paraItem)}`))))
// converts Observable<ParaHeadAndId[]> to Observable<ParaHeadAndId>
.pipe(concatAll())
.pipe(concatMap(({ paraId, paraHead }) => {
const parachain = this.parachainsMap.get(paraId);
if (!parachain) throw new Error(`Uknown paraId: ${paraId}`);

const { feedId, chain, signer } = parachain;

return parachain.fetchParaBlock(paraHead)
.pipe(map(({ block }) => {
const blockStr = JSON.stringify(block);
const number = this.api.createType("BlockNumber", block.header.number).toBn();
return this.addBlockTxData({
block: blockStr,
number,
hash: paraHead,
feedId,
chain,
signer
});
}));
}));
.pipe(
concatMap(({ paraId, paraHead }) => {
const parachain = this.parachainsMap.get(paraId);

// skip parachains that are not included in config
if (!parachain) {
this.logger.error(`Uknown paraId: ${paraId}`);
return EMPTY;
}

const { feedId, chain, signer } = parachain;

return parachain.fetchParaBlock(paraHead)
.pipe(map(({ block }) => {
const blockStr = JSON.stringify(block);
const number = this.api.createType("BlockNumber", block.header.number).toBn();
return this.addBlockTxData({
block: blockStr,
number,
hash: paraHead,
feedId,
chain,
signer
});
}));
})
);
}

private addBlockTxData({ block, number, hash, feedId, chain, signer }: TxDataInput): TxData {
Expand Down
3 changes: 2 additions & 1 deletion relayer/src/target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Target {

private async sendBlockTx({ feedId, block, metadata, chain, signer }: TxData) {
this.logger.info(`Sending ${chain} block to feed: ${feedId}`);
this.logger.info(`Signer: ${(signer as KeyringPair).address}`);
this.logger.debug(`Signer: ${(signer as KeyringPair).address}`);
// metadata is stored as Vec<u8>
// to decode: new TextDecoder().decode(new Uint8Array([...]))
const metadataPayload = JSON.stringify(metadata);
Expand All @@ -77,6 +77,7 @@ class Target {
// TODO: think about re-using existing feedIds instead of creating
async sendCreateFeedTx(signer: AddressOrPair): Promise<U64> {
this.logger.info(`Creating feed for signer ${(signer as KeyringPair).address}`);
this.logger.debug(`Signer: ${(signer as KeyringPair).address}`);
return new Promise((resolve) => {
this.api.rx.tx.feeds
.create()
Expand Down

0 comments on commit 85d30f4

Please sign in to comment.