Skip to content

Commit

Permalink
adding macro queue to multi
Browse files Browse the repository at this point in the history
  • Loading branch information
mimiMonads committed Jan 28, 2025
1 parent 13c312b commit be22bae
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 63 deletions.
92 changes: 56 additions & 36 deletions experimental/checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,7 @@ export const checker = ({
queue: MultiQueue;
signalBox: MainSignal;
}) => {
// Create a single MessageChannel for scheduling. We’ll use it repeatedly.
const channel = new MessageChannel();

const scheduleCheck = () => {
check();
};

const openChannels = () => {
channel.port1.onmessage = scheduleCheck;
channel.port2.start();
channel.port1.start();
};

const closeChannels = () => {
channel.port1.close();
channel.port1.onmessage = null;
channel.port2.close();
};

// Helper to schedule the next iteration on the macrotask queue
const scheduleNext = channel.port2;

const check = () => {
function check() {
switch (signalBox.updateLastSignal()) {
case 0:
queue.solve();
Expand All @@ -39,53 +17,95 @@ export const checker = ({
} else {
signalBox.readyToRead();
}
queueMicrotask(check);
queueMicrotask(boundCheck);
return;

case 1:
signalBox.readyToRead();
queueMicrotask(check);
queueMicrotask(boundCheck);
return;

case 2:
if (queue.canWrite()) {
queue.sendNextToWorker();
queueMicrotask(check);
queueMicrotask(boundCheck);
} else {
signalBox.hasNoMoreMessages();
closeChannels();
this.channelHandler.close();
}
return;

case 127: {
openChannels();
scheduleNext.postMessage(null);
this.channelHandler.open(boundCheck);
this.channelHandler.channel.port2.postMessage(null);
return;
}
case 192:
case 224:
queueMicrotask(check);
queueMicrotask(boundCheck);
return;

case 254:
console.log("hi");
queue.sendNextToWorker();
queueMicrotask(check);
queueMicrotask(boundCheck);
return;

case 255:
if (queue.canWrite()) {
queue.sendNextToWorker();
queueMicrotask(check);
queueMicrotask(boundCheck);
} else {
console.log("Finish by 255");
}

return;
}

console.log(signalBox.updateLastSignal());
throw new Error("unrechable");
};
throw new Error("unreachable");
}

const boundCheck = check.bind({
channelHandler: new ChannelHandler(),
});

return check;
return boundCheck;
};

class ChannelHandler {
channel: MessageChannel;
isOpen: boolean;

constructor() {
this.channel = new MessageChannel();

this.isOpen = false;
}

scheduleCheck(f: Function) {
f();
}

open(f: Function) {
if (this.isOpen) {
return;
}

//@ts-ignore
this.channel.port1.onmessage = f;
this.channel.port2.start();
this.channel.port1.start();
this.isOpen = true;
}

close() {
if (!this.isOpen) {
return;
}

this.channel.port1.close();
this.channel.port1.onmessage = null;
this.channel.port2.close();
this.isOpen = false;
}
}
4 changes: 3 additions & 1 deletion experimental/signal.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export type SignalArguments = ReturnType<typeof signalsForWorker>;
export type MainSignal = ReturnType<typeof mainSignal>;
export type WorkerSignal = ReturnType<typeof workerSignal>;

type StatusSignalForVoid = 224 | 192;
export type StatusSignal = StatusSignalForVoid;
Expand Down Expand Up @@ -39,9 +40,10 @@ export const mainSignal = ({ status, id }: SignalArguments) => {
};

// Worker thread signal management.
export const workerSignal = ({ status }: SignalArguments) => ({
export const workerSignal = ({ status, id }: SignalArguments) => ({
curretSignal: () => status[0],
messageReady: (): 0 => (status[0] = 0),
messageWasRead: (): 1 => (status[0] = 1),
finishedAllTasks: (): 2 => (status[0] = 2),
getCurrentID: () => id[0],
});
28 changes: 15 additions & 13 deletions experimental/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import { compose } from "./fixpoint.ts";
import { aaa, bbb, ccc } from "./functions.ts";

const { termminate, resolver } = compose({
threads: 1,
threads: 2,
})({
ccc,
aaa,
bbb,
});



const a = await Promise.all([
resolver.aaa(new Uint8Array([1])),
resolver.aaa(new Uint8Array([1])),
Expand All @@ -22,19 +24,19 @@ const a = await Promise.all([
resolver.ccc(new Uint8Array([1])),
]);

const b = await Promise.all([
resolver.aaa(new Uint8Array([1])),
resolver.aaa(new Uint8Array([1])),
resolver.aaa(new Uint8Array([1])),
resolver.bbb(new Uint8Array([1])),
resolver.bbb(new Uint8Array([1])),
resolver.bbb(new Uint8Array([1])),
resolver.ccc(new Uint8Array([1])),
resolver.ccc(new Uint8Array([1])),
resolver.ccc(new Uint8Array([1])),
]);
// const b = await Promise.all([
// resolver.aaa(new Uint8Array([1])),
// resolver.aaa(new Uint8Array([1])),
// resolver.aaa(new Uint8Array([1])),
// resolver.bbb(new Uint8Array([1])),
// resolver.bbb(new Uint8Array([1])),
// resolver.bbb(new Uint8Array([1])),
// resolver.ccc(new Uint8Array([1])),
// resolver.ccc(new Uint8Array([1])),
// resolver.ccc(new Uint8Array([1])),
// ]);

console.log(a);
console.log(b);
//console.log(b);

termminate();
2 changes: 1 addition & 1 deletion experimental/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const mainLoop = async () => {
.then(
(objs) =>
objs.map(
(obj) => obj.f,
(obj) => [obj.f, obj.statusSignal],
),
);

Expand Down
17 changes: 5 additions & 12 deletions experimental/workerQueue.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { PartialQueueList, QueueList } from "./mainQueue.ts";
import type { StatusSignal, WorkerSignal } from "./signal.ts";

type ArgumetnsForMulti = {
jobs: Function[];
jobs: [Function, StatusSignal][];
max?: number;
writer: (job: QueueList) => void;
signal: WorkerSignal;
status: Uint8Array;
};
// Create and manage a working queue.
Expand Down Expand Up @@ -56,15 +58,6 @@ export const multi = ({ jobs, max, writer, status }: ArgumetnsForMulti) => {
write: () => {
const finishedTaskIndex = queue.findIndex((task) => task[2]);
if (finishedTaskIndex !== -1) {
// console.log(" 4 .- worker sends :");
// console.log(
// [
// queue[finishedTaskIndex][3],
// queue[finishedTaskIndex][4],
// queue[finishedTaskIndex][5],
// queue[finishedTaskIndex][7],
// ],
// );
writer(queue[finishedTaskIndex]); // Writes on playload
status[0] = 0; // The main can read it now;
queue[finishedTaskIndex][0] = false; // Reset OnUse
Expand All @@ -81,8 +74,8 @@ export const multi = ({ jobs, max, writer, status }: ArgumetnsForMulti) => {
queue[taskIndex][1] = true; // Lock the task
try {
queue[taskIndex][6] = queue[taskIndex][7] === 224
? await jobs[queue[taskIndex][5]]()
: await jobs[queue[taskIndex][5]](
? await jobs[queue[taskIndex][5]][0]()
: await jobs[queue[taskIndex][5]][0](
queue[taskIndex][4],
);
queue[taskIndex][2] = true; // Mark as solved
Expand Down

0 comments on commit be22bae

Please sign in to comment.