From 389e06a0cca05858d55415fc2d46598c632b6f3d Mon Sep 17 00:00:00 2001 From: Roger Qiu Date: Fri, 5 Aug 2022 23:43:44 +1000 Subject: [PATCH] WIP --- src/queue/Queue.ts | 89 ++++++++++++----------------------- src/queue/types.ts | 44 ++++++----------- tests/nodes/NodeGraph.test.ts | 1 - tests/queue/Queue.test.ts | 64 +++++++++++++++++++++---- 4 files changed, 101 insertions(+), 97 deletions(-) diff --git a/src/queue/Queue.ts b/src/queue/Queue.ts index ce7a19c960..e9f8e331ab 100644 --- a/src/queue/Queue.ts +++ b/src/queue/Queue.ts @@ -1,8 +1,10 @@ import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; import type { Task, TaskId } from './types'; import type KeyManager from '../keys/KeyManager'; +import type { POJO } from '../types'; import Logger from '@matrixai/logger'; import { IdInternal } from '@matrixai/id'; +import { extractTs } from '@matrixai/id/dist/IdSortable'; import { CreateDestroyStartStop, ready, @@ -114,73 +116,40 @@ class Queue { @ready(new queueErrors.ErrorQueueNotRunning()) - public async registerTask(taskName: string, taskFunction: () => Promise) { - this.handlers.set(taskName, taskFunction); + public async registerHandler( + name: string, + handler: () => Promise + ) { + this.handlers.set(name, handler); } - // you must await pushTask - // to finish the pushing - // but it returns you 2 thigns - - // const [task, taskP] = await queue.pushTask() - - // await taskP - - - // taskP is also cancellable - // which signals for cancellation - // but task is just data - // so maybe you just get both - // it depends - - @ready(new queueErrors.ErrorQueueNotRunning()) public async pushTask( - taskPointer: string, + name: string, + parameters: Array, + timeout: number = 0, tran?: DBTransaction - ): Promise<[ - Task, - TaskPromise - ]> { + ): Promise { if (tran == null) { return this.db.withTransactionF( (tran) => this.pushTask.apply(this, [...arguments, tran]) ); } - // we put in insertion time - // or do we also put in an index - // i think we do both - // weh ave to index by the time - // the task Id is auto generated - // we need to add in the index too - // so the same transaction is done in one go - - // Buffer - // do we say taskId.toBuffer? - // no we don't sttore the task Id - // but we do neeed to store the task name - - // it's the task execution name - // and we return a Task to you? - // thisis the Task we have pushed into it? - const taskId = this.generateTaskId(); - const taskDate = new Date(); - + const taskTimestamp = extractTs(taskId); + const taskData = { + name, + parameters, + timestamp: taskTimestamp + timeout + }; await tran.put( [...this.queueTasksDbPath, taskId.toBuffer()], - { - pointer: taskPointer, - time: new Date() - } + taskData ); - // Now we need to do lexi int of numeric - // representation of the scheduled time - - const taskTime = taskDate.getTime(); - const taskTimeEncoded = lexi.pack(taskTime, 'hex'); + // Timestamp -> TaskId + const taskTimeEncoded = lexi.pack(taskTimestamp, 'hex'); await tran.put( [...this.queueTaskTimeDbPath, taskTimeEncoded], taskId.toBuffer(), @@ -190,7 +159,6 @@ class Queue { - // is this a promise that resolves when the task finishes // or is just a promise of the task spec itself that we have created? // because technically you already know this task @@ -200,9 +168,17 @@ class Queue { // and a promise to the task... - return { - id: taskId - }; + // return { + // id: taskId + // }; + + // We want to return 2 things: + // a promise + // it returns a promise yes + // but the promise resolves to 2 things + // 1. the task information (this is because the taskdata combined together) + // 2. the task promise - a cancellable promise + // the task promise resolves when the task is executed } @@ -219,12 +195,9 @@ class Queue { (tran) => this.popTask.apply(this, [...arguments, tran]) ); } - - // need to use withF // to create a transaction // what other resources do we need here? - } // So the idea with js-db is that transaction contexts are passed in now diff --git a/src/queue/types.ts b/src/queue/types.ts index 46abf634a8..b9a7a66ed7 100644 --- a/src/queue/types.ts +++ b/src/queue/types.ts @@ -1,27 +1,6 @@ import type { Id } from '@matrixai/id'; import type { POJO, Opaque } from '../types'; -// What is a "task"? -// it is serialisation of parameters -// and a serialised pointer to a something to execute -// it is not just an arbitrary string -// this means code execution can still be done -// it just registers it... to be executed -// so tasks always execute against a callback? -// that string must be the handler - -// a task here is not a promise -// how many queues are there meant to be? -// 1 queue to manage all of it -// or multiple queues...? -// we must bring the abort controller too -// we have a time date to be created when you submit -// or the arbitrary delay -// we also know when this is created - -// are just strings -// handler ids - type HandlerId = Opaque<'HandlerId', string>; type TaskId = Opaque<'TaskId', Id>; @@ -29,24 +8,31 @@ type TaskIdString = Opaque<'TaskIdString', string>; type TaskIdEncoded = Opaque<'TaskIdEncoded', string>; type Task = { - function: string; - parameters: POJO; - delay: number; - timestamp: Date; + name: string; + parameters: Array; + timestamp: number; +}; + +type TaskInfo = { + id: TaskId; + handler: string; + parameters: Array; + timestamp: number; }; + // This is a lazy promise // it's a promise of something that may not yet immediately executed -type TaskPromise = Promise; +// type TaskPromise = Promise; // Consider these variants... (should standardise what these are to be used) // Task // Tasks (usually a record, sometimes an array) -// TaskData -// TaskInfo +// TaskData - lower level data of a task +// TaskInfo - higher level information that is inclusive of data -type TaskData = Record; +// type TaskData = Record; export type { TaskId, diff --git a/tests/nodes/NodeGraph.test.ts b/tests/nodes/NodeGraph.test.ts index 7a9a1d85e8..81a4fb1536 100644 --- a/tests/nodes/NodeGraph.test.ts +++ b/tests/nodes/NodeGraph.test.ts @@ -14,7 +14,6 @@ import { IdInternal } from '@matrixai/id'; import NodeGraph from '@/nodes/NodeGraph'; import KeyManager from '@/keys/KeyManager'; import * as keysUtils from '@/keys/utils'; - import * as nodesUtils from '@/nodes/utils'; import * as nodesErrors from '@/nodes/errors'; import * as utils from '@/utils'; diff --git a/tests/queue/Queue.test.ts b/tests/queue/Queue.test.ts index da05d8cfe4..9f03f9ff30 100644 --- a/tests/queue/Queue.test.ts +++ b/tests/queue/Queue.test.ts @@ -1,15 +1,61 @@ +import os from 'os'; +import path from 'path'; +import fs from 'fs'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import { DB } from '@matrixai/db'; +import KeyManager from '@/keys/KeyManager'; import Queue from '@/queue/Queue'; +import * as keysUtils from '@/keys/utils'; +import { globalRootKeyPems } from '../fixtures/globalRootKeyPems'; -if (require.main == null) { - describe(Queue.name, () => { - const logger = new Logger(`${Queue.name} test`, LogLevel.WARN, [ - new StreamHandler(), - ]); - test('do it', async () => { - console.log('wee'); +describe(Queue.name, () => { + const password = 'password'; + const logger = new Logger(`${Queue.name} test`, LogLevel.WARN, [ + new StreamHandler(), + ]); + let keyManager: KeyManager; + let dbKey: Buffer; + let dbPath: string; + let db: DB; + beforeAll(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const keysPath = `${dataDir}/keys`; + keyManager = await KeyManager.createKeyManager({ + password, + keysPath, + logger, + privateKeyPemOverride: globalRootKeyPems[0], }); + dbKey = await keysUtils.generateKey(); + dbPath = `${dataDir}/db`; }); -} + beforeEach(async () => { + db = await DB.createDB({ + dbPath, + logger, + crypto: { + key: dbKey, + ops: { + encrypt: keysUtils.encryptWithKey, + decrypt: keysUtils.decryptWithKey, + }, + }, + }); + }); + afterEach(async () => { + await db.stop(); + await db.destroy(); + }); + test('do it', async () => { + const queue = await Queue.createQueue({ + db, + keyManager, + logger, + }); -export default 3; + await queue.stop(); + await queue.destroy(); + }); +});