Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Aug 5, 2022
1 parent 99cd689 commit 389e06a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 97 deletions.
89 changes: 31 additions & 58 deletions src/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
import type { Task, TaskId } from './types';
import type KeyManager from '../keys/KeyManager';
import type { POJO } from '../types';
import Logger from '@matrixai/logger';
import { IdInternal } from '@matrixai/id';
import { extractTs } from '@matrixai/id/dist/IdSortable';
import {
CreateDestroyStartStop,
ready,
Expand Down Expand Up @@ -114,73 +116,40 @@ class Queue {


@ready(new queueErrors.ErrorQueueNotRunning())
public async registerTask(taskName: string, taskFunction: () => Promise<void>) {
this.handlers.set(taskName, taskFunction);
public async registerHandler(
name: string,
handler: () => Promise<void>
) {
this.handlers.set(name, handler);
}

// you must await pushTask
// to finish the pushing
// but it returns you 2 thigns

// const [task, taskP] = await queue.pushTask()

// await taskP


// taskP is also cancellable
// which signals for cancellation
// but task is just data
// so maybe you just get both
// it depends


@ready(new queueErrors.ErrorQueueNotRunning())
public async pushTask(
taskPointer: string,
name: string,
parameters: Array<any>,
timeout: number = 0,
tran?: DBTransaction
): Promise<[
Task,
TaskPromise
]> {
): Promise<any> {
if (tran == null) {
return this.db.withTransactionF(
(tran) => this.pushTask.apply(this, [...arguments, tran])
);
}

// we put in insertion time
// or do we also put in an index
// i think we do both
// weh ave to index by the time
// the task Id is auto generated
// we need to add in the index too
// so the same transaction is done in one go

// Buffer
// do we say taskId.toBuffer?
// no we don't sttore the task Id
// but we do neeed to store the task name

// it's the task execution name
// and we return a Task to you?
// thisis the Task we have pushed into it?

const taskId = this.generateTaskId();
const taskDate = new Date();

const taskTimestamp = extractTs(taskId);
const taskData = {
name,
parameters,
timestamp: taskTimestamp + timeout
};
await tran.put(
[...this.queueTasksDbPath, taskId.toBuffer()],
{
pointer: taskPointer,
time: new Date()
}
taskData
);

// Now we need to do lexi int of numeric
// representation of the scheduled time

const taskTime = taskDate.getTime();
const taskTimeEncoded = lexi.pack(taskTime, 'hex');
// Timestamp -> TaskId
const taskTimeEncoded = lexi.pack(taskTimestamp, 'hex');
await tran.put(
[...this.queueTaskTimeDbPath, taskTimeEncoded],
taskId.toBuffer(),
Expand All @@ -190,7 +159,6 @@ class Queue {




// is this a promise that resolves when the task finishes
// or is just a promise of the task spec itself that we have created?
// because technically you already know this task
Expand All @@ -200,9 +168,17 @@ class Queue {
// and a promise to the task...


return {
id: taskId
};
// return {
// id: taskId
// };

// We want to return 2 things:
// a promise
// it returns a promise yes
// but the promise resolves to 2 things
// 1. the task information (this is because the taskdata combined together)
// 2. the task promise - a cancellable promise
// the task promise resolves when the task is executed
}


Expand All @@ -219,12 +195,9 @@ class Queue {
(tran) => this.popTask.apply(this, [...arguments, tran])
);
}


// need to use withF
// to create a transaction
// what other resources do we need here?

}

// So the idea with js-db is that transaction contexts are passed in now
Expand Down
44 changes: 15 additions & 29 deletions src/queue/types.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,38 @@
import type { Id } from '@matrixai/id';
import type { POJO, Opaque } from '../types';

// What is a "task"?
// it is serialisation of parameters
// and a serialised pointer to a something to execute
// it is not just an arbitrary string
// this means code execution can still be done
// it just registers it... to be executed
// so tasks always execute against a callback?
// that string must be the handler

// a task here is not a promise
// how many queues are there meant to be?
// 1 queue to manage all of it
// or multiple queues...?
// we must bring the abort controller too
// we have a time date to be created when you submit
// or the arbitrary delay
// we also know when this is created

// are just strings
// handler ids

type HandlerId = Opaque<'HandlerId', string>;

type TaskId = Opaque<'TaskId', Id>;
type TaskIdString = Opaque<'TaskIdString', string>;
type TaskIdEncoded = Opaque<'TaskIdEncoded', string>;

type Task = {
function: string;
parameters: POJO;
delay: number;
timestamp: Date;
name: string;
parameters: Array<any>;
timestamp: number;
};

type TaskInfo = {
id: TaskId;
handler: string;
parameters: Array<any>;
timestamp: number;
};


// This is a lazy promise
// it's a promise of something that may not yet immediately executed
type TaskPromise<T> = Promise<T>;
// type TaskPromise<T> = Promise<T>;

// Consider these variants... (should standardise what these are to be used)
// Task
// Tasks (usually a record, sometimes an array)
// TaskData
// TaskInfo
// TaskData - lower level data of a task
// TaskInfo - higher level information that is inclusive of data


type TaskData = Record<TaskIdEncoded, Task>;
// type TaskData = Record<TaskIdEncoded, Task>;

export type {
TaskId,
Expand Down
1 change: 0 additions & 1 deletion tests/nodes/NodeGraph.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { IdInternal } from '@matrixai/id';
import NodeGraph from '@/nodes/NodeGraph';
import KeyManager from '@/keys/KeyManager';
import * as keysUtils from '@/keys/utils';

import * as nodesUtils from '@/nodes/utils';
import * as nodesErrors from '@/nodes/errors';
import * as utils from '@/utils';
Expand Down
64 changes: 55 additions & 9 deletions tests/queue/Queue.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,61 @@
import os from 'os';
import path from 'path';
import fs from 'fs';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { DB } from '@matrixai/db';
import KeyManager from '@/keys/KeyManager';
import Queue from '@/queue/Queue';
import * as keysUtils from '@/keys/utils';
import { globalRootKeyPems } from '../fixtures/globalRootKeyPems';

if (require.main == null) {
describe(Queue.name, () => {
const logger = new Logger(`${Queue.name} test`, LogLevel.WARN, [
new StreamHandler(),
]);
test('do it', async () => {
console.log('wee');
describe(Queue.name, () => {
const password = 'password';
const logger = new Logger(`${Queue.name} test`, LogLevel.WARN, [
new StreamHandler(),
]);
let keyManager: KeyManager;
let dbKey: Buffer;
let dbPath: string;
let db: DB;
beforeAll(async () => {
dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
);
const keysPath = `${dataDir}/keys`;
keyManager = await KeyManager.createKeyManager({
password,
keysPath,
logger,
privateKeyPemOverride: globalRootKeyPems[0],
});
dbKey = await keysUtils.generateKey();
dbPath = `${dataDir}/db`;
});
}
beforeEach(async () => {
db = await DB.createDB({
dbPath,
logger,
crypto: {
key: dbKey,
ops: {
encrypt: keysUtils.encryptWithKey,
decrypt: keysUtils.decryptWithKey,
},
},
});
});
afterEach(async () => {
await db.stop();
await db.destroy();
});
test('do it', async () => {
const queue = await Queue.createQueue({
db,
keyManager,
logger,
});

export default 3;
await queue.stop();
await queue.destroy();
});
});

0 comments on commit 389e06a

Please sign in to comment.