Skip to content

Commit

Permalink
feat: init seeding worker
Browse files Browse the repository at this point in the history
 - manage it with env-var based filters
 - add webtorrent dependency
 - implement method seed to SeedingWorker class
  • Loading branch information
hlolli committed Oct 4, 2024
1 parent 6c29438 commit 2fa5e62
Show file tree
Hide file tree
Showing 6 changed files with 1,010 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/envs.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ This document describes the environment variables that can be used to configure
| WRITE_TRANSACTION_DB_SIGNATURES | Boolean | true | If true, the transactions signatures will be written to the database. |
| ENABLE_DATA_DB_WAL_CLEANUP | Boolean | false | If true, the data database WAL cleanup worker will be enabled |
| MAX_DATA_ITEM_QUEUE_SIZE | Number | 100000 | Sets the maximum number of data items to queue for indexing before skipping indexing new data items |
| SEEDING_WORKER_FILTER | String | undefined | The filter used to control what is seeded via the seeding worker (example via torrent protocol) |
| ARNS_ROOT_HOST | String | undefined | Domain name for ArNS host |
| SANDBOX_PROTOCOL | String | undefined | Protocol setting in process of creating sandbox domain in ArNS (ARNS_ROOT_HOST needs to be set for this env to have any effect) |
| START_WRITERS | Boolean | true | If true, start indexing blocks, tx, ANS104 bundles |
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"swagger-ui-express": "^4.5.0",
"umzug": "^3.2.1",
"wait": "^0.4.2",
"webtorrent": "^2.5.1",
"winston": "^3.7.2",
"yaml": "^2.3.4",
"yesql": "^7.0.0"
Expand All @@ -70,6 +71,7 @@
"@types/stream-json": "^1.7.2",
"@types/supertest": "^2.0.16",
"@types/swagger-ui-express": "^4.1.3",
"@types/webtorrent": "^0.109.8",
"@typescript-eslint/eslint-plugin": "^5.26.0",
"@typescript-eslint/parser": "^5.26.0",
"c8": "^8.0.1",
Expand Down
10 changes: 10 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ export const ON_DEMAND_RETRIEVAL_ORDER = env
)
.split(',');

// seeding-worker (torrent) filter
export const SEEDING_WORKER_FILTER_STRING = env.varOrUndefined(
'SEEDING_WORKER_FILTER',
);

export const SEEDING_WORKER_FILTER =
SEEDING_WORKER_FILTER_STRING === undefined
? undefined
: createFilter(JSON.parse(SEEDING_WORKER_FILTER_STRING));

//
// Indexing
//
Expand Down
13 changes: 13 additions & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import { S3DataSource } from './data/s3-data-source.js';
import { connect } from '@permaweb/aoconnect';
import { DataContentAttributeImporter } from './workers/data-content-attribute-importer.js';
import { SignatureFetcher } from './data/signature-fetcher.js';
import { SeedingWorker } from './workers/seeding-worker.js';
import { SQLiteWalCleanupWorker } from './workers/sqlite-wal-cleanup-worker.js';
import { KvArnsStore } from './store/kv-arns-store.js';
import { parquetExporter } from './routes/ar-io.js';
Expand Down Expand Up @@ -240,6 +241,16 @@ eventEmitter.on(events.TX_INDEXED, async (tx: MatchableItem) => {
eventEmitter.emit(events.ANS104_TX_INDEXED, tx);
eventEmitter.emit(events.ANS104_BUNDLE_INDEXED, tx);
}

const seedingWorkerFilter = config.SEEDING_WORKER_FILTER;

if (
seedingWorkerFilter !== undefined &&
tx.id !== undefined &&
(await seedingWorkerFilter.match(tx))
) {
seedingWorker.seed(tx.id);
}
});

eventEmitter.on(
Expand Down Expand Up @@ -616,6 +627,8 @@ if (dataSqliteWalCleanupWorker !== undefined) {
dataSqliteWalCleanupWorker.start();
}

export const seedingWorker = new SeedingWorker({ log, contiguousDataSource });

let isShuttingDown = false;

export const shutdown = async (express: Server) => {
Expand Down
61 changes: 61 additions & 0 deletions src/workers/seeding-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* AR.IO Gateway
* Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import { Logger } from 'winston';
import WebTorrent from 'webtorrent';
import { ContiguousDataSource } from '../types.js';

export class SeedingWorker {
private log: Logger;
private contiguousDataSource: ContiguousDataSource;

public webTorrentClient: WebTorrent.Instance;

constructor({
log,
contiguousDataSource,
}: {
log: Logger;
contiguousDataSource: ContiguousDataSource;
}) {
this.webTorrentClient = new WebTorrent();
this.contiguousDataSource = contiguousDataSource;
this.log = log.child({ class: 'SeedingWorker' });
}

async seed(txId: string) {
this.log.debug(`Seeding ${txId}`);
const data = await this.contiguousDataSource.getData({ id: txId });
new Promise<void>((resolve) =>
this.webTorrentClient.seed(
data.stream,
{
announce: [
'wss://tracker.btorrent.xyz',
'wss://tracker.openwebtorrent.com',
'wss://tracker.webtorrent.io',
],
},
(torrent: WebTorrent.Torrent) => {
this.log.debug(`Seeding ${txId} started: ${torrent.magnetURI}`);
resolve();
},
),
);
}
}
Loading

0 comments on commit 2fa5e62

Please sign in to comment.