Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Aug 1, 2022
1 parent 6a95dc1 commit 99cd689
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 29 deletions.
159 changes: 132 additions & 27 deletions src/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
import type { Task, TaskId } from './types';
import type KeyManager from '../keys/KeyManager';
import Logger from '@matrixai/logger';
import { IdInternal } from '@matrixai/id';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import lexi from 'lexicographic-integer';
import * as queueUtils from './utils';
import * as queueErrors from './errors';

interface Queue extends CreateDestroyStartStop {}
Expand All @@ -15,33 +20,46 @@ class Queue {

public static async createQueue({
db,
keyManager,
logger = new Logger(this.name),
fresh = false,
}: {
db: DB;
keyManager: KeyManager;
logger?: Logger;
fresh?: boolean;
}): Promise<Queue> {
logger.info(`Creating ${this.name}`);
const queue = new this({ db, logger });
const queue = new this({ db, keyManager, logger });
await queue.start({ fresh });
logger.info(`Created ${this.name}`);
return queue;
}

protected logger: Logger;
protected db: DB;
protected keyManager: KeyManager;

protected queueDbPath: LevelPath = [this.constructor.name];
// TaskId -> Task
protected queueTasksDbPath: LevelPath = [this.constructor.name, 'tasks'];
// Time -> TaskId
protected queueTaskTimeDbPath: LevelPath = [this.constructor.name, 'time'];

// This is a not the task id
// but the executions
// you can say "program"
// or handlers
// HandlerId
protected handlers: Map<string, Function>;
protected generateTaskId: () => TaskId;

public constructor({ db, logger }: { db: DB; logger: Logger }) {
public constructor({
db,
keyManager,
logger
}: {
db: DB;
keyManager: KeyManager;
logger: Logger
}) {
this.logger = logger;
this.keyManager = keyManager;
this.db = db;
}

Expand All @@ -54,6 +72,11 @@ class Queue {
if (fresh) {
await this.db.clear(this.queueDbPath);
}
const lastTaskId = await this.getLastTaskId();
this.generateTaskId = queueUtils.createTaskIdGenerator(
this.keyManager.getNodeId(),
lastTaskId,
);
this.logger.info(`Started ${this.constructor.name}`);
}

Expand All @@ -68,6 +91,10 @@ class Queue {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

// when we insert
// we take the curent time
// then we take a delay as well

// we have something that has to be executed
// if you register something
// addEventListener
Expand All @@ -87,31 +114,95 @@ class Queue {


@ready(new queueErrors.ErrorQueueNotRunning())
public async registerTask(f: () => Promise<void>) {
public async registerTask(taskName: string, taskFunction: () => Promise<void>) {
this.handlers.set(taskName, taskFunction);
}

// the task may return something
// when you register a handler
// the task to be executed
// that's what is important
// schedule taskHandler
// it returns something
// it's important to realise
// this is a Map
// you must await pushTask
// to finish the pushing
// but it returns you 2 thigns

// const [task, taskP] = await queue.pushTask()

// await taskP

}

// register a task
// scheduleTask
// taskP is also cancellable
// which signals for cancellation
// but task is just data
// so maybe you just get both
// it depends


@ready(new queueErrors.ErrorQueueNotRunning())
public async pushTask(task, tran?: DBTransaction): Promise<void> {
public async pushTask(
taskPointer: string,
tran?: DBTransaction
): Promise<[
Task,
TaskPromise
]> {
if (tran == null) {
return this.db.withTransactionF(
(tran) => this.pushTask.apply(this, [...arguments, tran])
);
}
// Do the work

// we put in insertion time
// or do we also put in an index
// i think we do both
// weh ave to index by the time
// the task Id is auto generated
// we need to add in the index too
// so the same transaction is done in one go

// Buffer
// do we say taskId.toBuffer?
// no we don't sttore the task Id
// but we do neeed to store the task name

// it's the task execution name
// and we return a Task to you?
// thisis the Task we have pushed into it?

const taskId = this.generateTaskId();
const taskDate = new Date();

await tran.put(
[...this.queueTasksDbPath, taskId.toBuffer()],
{
pointer: taskPointer,
time: new Date()
}
);

// Now we need to do lexi int of numeric
// representation of the scheduled time

const taskTime = taskDate.getTime();
const taskTimeEncoded = lexi.pack(taskTime, 'hex');
await tran.put(
[...this.queueTaskTimeDbPath, taskTimeEncoded],
taskId.toBuffer(),
true
);





// is this a promise that resolves when the task finishes
// or is just a promise of the task spec itself that we have created?
// because technically you already know this task
// but maybe it's scheduled weirdly
// you can return both
// the Task itself
// and a promise to the task...


return {
id: taskId
};
}


Expand Down Expand Up @@ -140,12 +231,26 @@ class Queue {
// we don't just








@ready(new queueErrors.ErrorQueueNotRunning(), false, ['starting'])
public async getLastTaskId(tran?: DBTransaction): Promise<TaskId | undefined> {
if (tran == null) {
return this.db.withTransactionF(
(tran) => this.getLastTaskId.apply(this, [...arguments, tran])
);
}
let lastTaskId: TaskId | undefined;
for await (const [keyPath] of tran.iterator(
this.queueTasksDbPath,
{
limit: 1,
reverse: true,
values: false ,
}
)) {
lastTaskId = IdInternal.fromBuffer<TaskId>(keyPath[0] as Buffer);
}
return lastTaskId;
}
}

export default Queue;
Expand Down
4 changes: 4 additions & 0 deletions src/queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type Task = {
timestamp: Date;
};

// This is a lazy promise
// it's a promise of something that may not yet immediately executed
type TaskPromise<T> = Promise<T>;

// Consider these variants... (should standardise what these are to be used)
// Task
// Tasks (usually a record, sometimes an array)
Expand Down
8 changes: 6 additions & 2 deletions src/queue/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ import type { TaskId } from './types';
import type { NodeId } from '../nodes/types';
import { IdSortable } from '@matrixai/id';

// Wait our NodeId is literally the node id

/**
* Generates TaskId
* TaskIds are lexicographically sortable 128 bit IDs
* They are strictly monotonic and unique with respect to the `nodeId`
* When the `NodeId` changes, make sure to regenerate this generator
*/
function createTaskIdGenerator(nodeId: NodeId, lastTaskId?: TaskId) {
const generator = new IdSortable<TaskId>({
lastId: lastTaskId,
Expand Down

0 comments on commit 99cd689

Please sign in to comment.