diff --git a/packages/utils/package.json b/packages/utils/package.json index 26bdd8e0d9..97328c61f2 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -68,6 +68,10 @@ "types": "./dist/src/close-source.d.ts", "import": "./dist/src/close-source.js" }, + "./debounce": { + "types": "./dist/src/debounce.d.ts", + "import": "./dist/src/debounce.js" + }, "./filters": { "types": "./dist/src/filters/index.d.ts", "import": "./dist/src/filters/index.js" @@ -112,6 +116,10 @@ "types": "./dist/src/rate-limiter.d.ts", "import": "./dist/src/rate-limiter.js" }, + "./repeating-task": { + "types": "./dist/src/repeating-task.d.ts", + "import": "./dist/src/repeating-task.js" + }, "./stream-to-ma-conn": { "types": "./dist/src/stream-to-ma-conn.d.ts", "import": "./dist/src/stream-to-ma-conn.js" diff --git a/packages/utils/src/debounce.ts b/packages/utils/src/debounce.ts new file mode 100644 index 0000000000..2cd04d8d72 --- /dev/null +++ b/packages/utils/src/debounce.ts @@ -0,0 +1,30 @@ +import type { Startable } from '@libp2p/interface' + +export interface DebouncedFunction extends Startable { + (): void +} + +/** + * Returns a function wrapper that will only call the passed function once + * + * Important - the passed function should not throw or reject + */ +export function debounce (func: () => void | Promise, wait: number): DebouncedFunction { + let timeout: ReturnType | undefined + + const output = function (): void { + const later = function (): void { + timeout = undefined + void func() + } + + clearTimeout(timeout) + timeout = setTimeout(later, wait) + } + output.start = () => {} + output.stop = () => { + clearTimeout(timeout) + } + + return output +} diff --git a/packages/utils/src/repeating-task.ts b/packages/utils/src/repeating-task.ts new file mode 100644 index 0000000000..a395e0c76c --- /dev/null +++ b/packages/utils/src/repeating-task.ts @@ -0,0 +1,82 @@ +import { setMaxListeners } from '@libp2p/interface' +import { anySignal } from 'any-signal' +import type { AbortOptions } from '@libp2p/interface' + +export interface RepeatingTask { + start(): void + stop(): void +} + +export interface RepeatingTaskOptions { + /** + * How long the task is allowed to run before the passed AbortSignal fires an + * abort event + */ + timeout?: number + + /** + * Whether to schedule the task to run immediately + */ + runImmediately?: boolean +} + +export function repeatingTask (fn: (options?: AbortOptions) => void | Promise, interval: number, options?: RepeatingTaskOptions): RepeatingTask { + let timeout: ReturnType + let shutdownController: AbortController + + function runTask (): void { + const opts: AbortOptions = { + signal: shutdownController.signal + } + + if (options?.timeout != null) { + const signal = anySignal([shutdownController.signal, AbortSignal.timeout(options.timeout)]) + setMaxListeners(Infinity, signal) + + opts.signal = signal + } + + Promise.resolve().then(async () => { + await fn(opts) + }) + .catch(() => {}) + .finally(() => { + if (shutdownController.signal.aborted) { + // task has been cancelled, bail + return + } + + // reschedule + timeout = setTimeout(runTask, interval) + }) + } + + let started = false + + return { + start: () => { + if (started) { + return + } + + started = true + shutdownController = new AbortController() + setMaxListeners(Infinity, shutdownController.signal) + + // run now + if (options?.runImmediately === true) { + queueMicrotask(() => { + runTask() + }) + } else { + // run later + timeout = setTimeout(runTask, interval) + } + }, + stop: () => { + clearTimeout(timeout) + shutdownController?.abort() + started = false + } + } +} diff --git a/packages/utils/test/debounce.spec.ts b/packages/utils/test/debounce.spec.ts new file mode 100644 index 0000000000..9423836123 --- /dev/null +++ b/packages/utils/test/debounce.spec.ts @@ -0,0 +1,46 @@ +import { stop } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import delay from 'delay' +import { debounce } from '../src/debounce.js' + +describe('debounce', () => { + it('should debounce function', async () => { + let invocations = 0 + const fn = (): void => { + invocations++ + } + + const debounced = debounce(fn, 10) + + debounced() + debounced() + debounced() + debounced() + debounced() + + await delay(500) + + expect(invocations).to.equal(1) + }) + + it('should cancel debounced function', async () => { + let invocations = 0 + const fn = (): void => { + invocations++ + } + + const debounced = debounce(fn, 10000) + + debounced() + debounced() + debounced() + debounced() + debounced() + + await stop(debounced) + + await delay(500) + + expect(invocations).to.equal(0) + }) +}) diff --git a/packages/utils/test/repeating-task.spec.ts b/packages/utils/test/repeating-task.spec.ts new file mode 100644 index 0000000000..5f6c87c178 --- /dev/null +++ b/packages/utils/test/repeating-task.spec.ts @@ -0,0 +1,70 @@ +import { expect } from 'aegir/chai' +import delay from 'delay' +import pDefer from 'p-defer' +import { repeatingTask } from '../src/repeating-task.js' + +describe('repeating-task', () => { + it('should repeat a task', async () => { + let count = 0 + + const task = repeatingTask(() => { + count++ + }, 100) + task.start() + + await delay(1000) + + task.stop() + + expect(count).to.be.greaterThan(1) + }) + + it('should run a task immediately', async () => { + let count = 0 + + const task = repeatingTask(() => { + count++ + }, 60000, { + runImmediately: true + }) + task.start() + + await delay(10) + + task.stop() + + expect(count).to.equal(1) + }) + + it('should time out a task', async () => { + const deferred = pDefer() + + const task = repeatingTask((opts) => { + opts?.signal?.addEventListener('abort', () => { + deferred.resolve() + }) + }, 100, { + timeout: 10 + }) + task.start() + + await deferred.promise + task.stop() + }) + + it('should repeat a task that throws', async () => { + let count = 0 + + const task = repeatingTask(() => { + count++ + throw new Error('Urk!') + }, 100) + task.start() + + await delay(1000) + + task.stop() + + expect(count).to.be.greaterThan(1) + }) +}) diff --git a/packages/utils/typedoc.json b/packages/utils/typedoc.json index f0c3288d84..7974d2e565 100644 --- a/packages/utils/typedoc.json +++ b/packages/utils/typedoc.json @@ -7,6 +7,7 @@ "./src/array-equals.ts", "./src/close.ts", "./src/close-source.ts", + "./src/debounce.ts", "./src/filters/index.ts", "./src/ip-port-to-multiaddr.ts", "./src/is-promise.ts", @@ -18,6 +19,7 @@ "./src/private-ip.ts", "./src/queue/index.ts", "./src/rate-limiter.ts", + "./src/repeating-task.ts", "./src/stream-to-ma-conn.ts", "./src/tracked-list.ts", "./src/tracked-map.ts"