From 312c1db5fb0e20792b8d5360bba34b78df59cdfc Mon Sep 17 00:00:00 2001 From: Kallyn Gowdy Date: Wed, 15 Jul 2020 15:51:25 -0400 Subject: [PATCH] feat: Improve to be able to send messages back and forth - Doesn't work because Deno doesn't support MessageChannel yet (https://github.com/denoland/deno/issues/6691) - which is something Comlink wants. --- src/aux-vm-deno/typings/deno.d.ts | 9 +++- src/aux-vm-deno/vm/DenoEntry.ts | 84 ++++++++++++++++++++----------- src/aux-vm-deno/vm/DenoVM.ts | 61 +++++++++++++++++----- 3 files changed, 110 insertions(+), 44 deletions(-) diff --git a/src/aux-vm-deno/typings/deno.d.ts b/src/aux-vm-deno/typings/deno.d.ts index 8541b9c9f7..e94172e14e 100644 --- a/src/aux-vm-deno/typings/deno.d.ts +++ b/src/aux-vm-deno/typings/deno.d.ts @@ -16,20 +16,25 @@ declare var Deno: { interface Reader { read(p: Uint8Array): Promise; + readSync(p: Uint8Array): number | null; } interface Writer { write(p: Uint8Array): Promise; + writeSync(p: Uint8Array): number; } interface Connection extends Reader, Writer {} -declare class DenoBuffer { +declare class DenoBuffer implements Reader, Writer { + write(p: Uint8Array): Promise; + writeSync(p: Uint8Array): number; + read(p: Uint8Array): Promise; + readSync(p: Uint8Array): number; readonly length: number; readonly capacity: number; bytes(options?: { copy: boolean }): Uint8Array; truncate(num: number): void; grow(num: number): void; empty(): boolean; - readSync(p: Uint8Array): number | null; } diff --git a/src/aux-vm-deno/vm/DenoEntry.ts b/src/aux-vm-deno/vm/DenoEntry.ts index 0091b5286a..dd18350b6d 100644 --- a/src/aux-vm-deno/vm/DenoEntry.ts +++ b/src/aux-vm-deno/vm/DenoEntry.ts @@ -5,6 +5,8 @@ import { MessageChannelImpl } from './MessageChannel'; const port = parseInt(Deno.args[0]); +console.log('[DenoEntry] Listening on port', port); + init(); async function init() { @@ -12,6 +14,10 @@ async function init() { console.log('[DenoEntry] Listening for messages...'); expose(DenoAuxChannel, channel.port2); + + channel.port2.postMessage({ + type: 'init', + }); } async function tcpMessageChannel() { @@ -24,6 +30,7 @@ async function tcpMessageChannel() { // @ts-ignore channel.port1.addEventListener('message', e => { + console.log('[DenoEntry] Sending Message'); const json = JSON.stringify(e.data); // Messages to stdout all follow the same format: @@ -32,14 +39,15 @@ async function tcpMessageChannel() { // - According to MDN UTF-8 never has more than string.length * 3 bytes (https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder/encodeInto) // - Using a 32-bit number means we can't have messages larger than ~4GiB const byteBuffer = new Uint8Array(4 + json.length * 3); - const view = new DataView(byteBuffer.buffer); + const view = new DataView(byteBuffer.buffer, byteBuffer.byteOffset); // Encode the JSON as UTF-8 // Skip the first 4 bytes const result = encoder.encodeInto(json, byteBuffer.subarray(4)); - view.setUint32(0, result.written); + view.setUint32(0, result.written, true); - conn.write(byteBuffer.subarray(0, result.written)); + console.log(`[DenoEntry] Writing ${result.written} bytes`); + conn.write(byteBuffer.subarray(0, result.written + 4)); }); readMessages(); @@ -47,35 +55,51 @@ async function tcpMessageChannel() { return channel; async function readMessages() { - const iter = Deno.iter(conn, { - bufSize: 512 * 512, - }); - let messageBuffer = new Deno.Buffer(); - let messageSize = -1; - for await (const chunk of iter) { - messageBuffer.readSync(chunk); - if (messageSize < 0 && messageBuffer.length >= 4) { - const view = new DataView( - messageBuffer.bytes({ copy: false }), - 0, - 4 + try { + const iter = Deno.iter(conn, { + bufSize: 512 * 512, + }); + let messageBuffer = new Deno.Buffer(); + let messageSize = -1; + console.log('[DenoEntry] Reading messages...'); + for await (const chunk of iter) { + console.log( + '[DenoEntry] Got Data', + chunk.byteLength, + chunk.length ); - messageSize = view.getUint32(0); - messageBuffer.truncate(4); - messageBuffer.grow( - Math.max(0, messageSize - messageBuffer.capacity) - ); - } - if (messageSize >= 0 && messageBuffer.length >= messageSize) { - const messageBytes = messageBuffer - .bytes({ copy: false }) - .subarray(0, messageSize); - messageBuffer.truncate(messageSize); - messageSize = -1; - const json = decoder.decode(messageBytes); - const message = JSON.parse(json); - channel.port1.postMessage(message); + messageBuffer.writeSync(chunk); + console.log('[DenoEntry] Read data', messageBuffer.length); + if (messageSize < 0 && messageBuffer.length >= 4) { + const bytes = new Uint8Array(4); + messageBuffer.readSync(bytes); + const view = new DataView( + bytes.buffer, + bytes.byteOffset, + 4 + ); + messageSize = view.getUint32(0, true); + console.log('[DenoEntry] Got Length', messageSize); + messageBuffer.grow( + Math.max(0, messageSize - messageBuffer.capacity) + ); + console.log( + '[DenoEntry] Buffer length', + messageBuffer.length + ); + } + if (messageSize >= 0 && messageBuffer.length >= messageSize) { + const messageBytes = new Uint8Array(messageSize); + messageBuffer.readSync(messageBytes); + messageSize = -1; + const json = decoder.decode(messageBytes); + const message = JSON.parse(json); + console.log('[DenoEntry] Got Message'); + channel.port1.postMessage(message); + } } + } catch (err) { + console.error('[DenoEntry]', err); } } } diff --git a/src/aux-vm-deno/vm/DenoVM.ts b/src/aux-vm-deno/vm/DenoVM.ts index 743b0f0333..3fd05133c4 100644 --- a/src/aux-vm-deno/vm/DenoVM.ts +++ b/src/aux-vm-deno/vm/DenoVM.ts @@ -27,7 +27,7 @@ import { import childProcess, { ChildProcess } from 'child_process'; import { Server, AddressInfo } from 'net'; import { MessageChannel } from 'worker_threads'; -import { MessageChannelImpl } from './MessageChannel'; +import { MessageChannelImpl, MessageEvent } from './MessageChannel'; /** * Defines an interface for an AUX that is run inside a virtual machine. @@ -101,6 +101,7 @@ export class DenoVM implements AuxVM { this._server = new Server(conn => { this._channel.port2.addEventListener('message', e => { try { + console.log('[DenoVM] Sending message'); const json = JSON.stringify(e.data); // Messages to stdout all follow the same format: // 4 bytes (32-bit number) for the length of the message @@ -108,7 +109,10 @@ export class DenoVM implements AuxVM { // - According to MDN UTF-8 never has more than string.length * 3 bytes (https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder/encodeInto) // - Using a 32-bit number means we can't have messages larger than ~4GiB const byteBuffer = new Uint8Array(4 + json.length * 3); - const view = new DataView(byteBuffer.buffer); + const view = new DataView( + byteBuffer.buffer, + byteBuffer.byteOffset + ); // Encode the JSON as UTF-8 // Skip the first 4 bytes @@ -116,29 +120,62 @@ export class DenoVM implements AuxVM { json, byteBuffer.subarray(4) ); - view.setUint32(0, result.written); - conn.write(byteBuffer.subarray(0, result.written)); + view.setUint32(0, result.written, true); + console.log( + '[DenoVM] Writing ' + result.written + ' bytes' + ); + conn.write(byteBuffer.subarray(0, result.written + 4)); } catch (err) { console.error('[DenoVM]', err); } }); + let messageBuffer = Buffer.alloc(0); + let messageSize = -1; conn.on('data', (data: Buffer) => { try { console.log('[DenoVM] Got data'); - // TODO: Fix to properly handle different buffer sizes - const uint32 = new Uint32Array(data); - const numBytes = uint32[0]; - const messageBytes = data.subarray(4, numBytes + 4); - const json = decoder.decode(messageBytes); - const message = JSON.parse(json); - this._channel.port2.postMessage(message); + messageBuffer = Buffer.concat([messageBuffer, data]); + if (messageSize < 0 && messageBuffer.byteLength >= 4) { + const view = new DataView( + messageBuffer.buffer, + messageBuffer.byteOffset, + 4 + ); + messageSize = view.getUint32(0, true); + messageBuffer = messageBuffer.slice(4); + } + if ( + messageSize >= 0 && + messageBuffer.byteLength >= messageSize + ) { + // TODO: Fix to properly handle different buffer sizes + const view = messageBuffer.subarray(0, messageSize); + const json = decoder.decode(view); + const message = JSON.parse(json); + let buf = messageBuffer; + messageBuffer = Buffer.alloc( + buf.byteLength - view.byteLength + ); + buf.copy(messageBuffer); + messageSize = -1; + this._channel.port2.postMessage(message); + } } catch (err) { console.error('[DenoVM]', err); } }); - resolveConnection(); + const listener = (e: MessageEvent) => { + if (e && e.data && e.data.type === 'init') { + this._channel.port1.removeEventListener( + 'message', + listener + ); + resolveConnection(); + } + }; + this._channel.port1.addEventListener('message', listener); }); this._server.listen(() => {