Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support websockets and redirect python stdout to the socket #56

Closed
wants to merge 9 commits into from
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,24 @@ The JS engine calls out to a long-running python process using
`node-calls-python`. Python modules are pretty free-form but must adhere to a
minimal structure. See the Contribution Guide for details.

## Websockets

TODO these are just devnotes but I do want a mention here

- trap all stdout and pipe through a websocket
- I need some kind of test client

I have something super basic set up. How do I want it to work?

- client connects
- client uploads data
- payload
- data
- server sends back logs -log
- server sends back finish

OK, what if the client sends `start` with a payload?

## Python Setup

This repo uses `poetry` to manage dependencies.
Expand Down
59 changes: 49 additions & 10 deletions platform/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { $ } from "bun";
import { interpreter as py } from "node-calls-python";
import path from "node:path";

import setupLogger from "./util/streaming-logs";

// Use the major.minor python version to find the local poetry venv
// see https://github.com/OpenFn/gen/issues/45
const PYTHON_VERSION = "3.11";
Expand All @@ -15,9 +17,15 @@ if (allowReimport) {
);
}

export const run = async (scriptName: string, args: JSON) => {
// TODO I need to make this blocking so that only one thing runs at once
// OR I drop workerpool onto it
export const run = async (
scriptName: string,
args: JSON,
onLog?: (str: string) => void
) => {
try {
// poetry should be configured to use a vnv in the local filesystem
// poetry should be configured to use a venv in the local filesystem
// This makes it really easy to tell node-calls-python about the right env!
// IMPORTANT: if the python version changes, this path needs to be manually updated!
py.addImportPath(
Expand All @@ -31,16 +39,47 @@ export const run = async (scriptName: string, args: JSON) => {
py.reimport("/services/");
}

// import from a top level entry point
const pymodule = await py.import(
path.resolve(`./services/entry.py`),
allowReimport
);
return new Promise(async (resolve, reject) => {
let result: any;

let logfile = null;
let delimiter = ".";
let destroy = () => {};

const onComplete = () => {
resolve(result);
};

if (onLog) {
({ logfile, delimiter, destroy } = setupLogger(onLog, onComplete));
}

// import from a top level entry point
const pymodule = await py.import(
path.resolve(`./services/entry.py`),
allowReimport
);

try {
result = await py.call(pymodule, "main", [
scriptName,
args,
logfile,
delimiter,
]);

const result = await py.call(pymodule, "main", [scriptName, args]);
return result;
if (!onLog) {
onComplete();
}
} catch (e) {
// Note that the error coming out will be a string with no stack trace :(
console.log(e);
destroy();
reject(e);
}
});
} catch (e) {
// Note that the error coming out will be a string with no stack trace :(
// TODO I don't think we need this outer catch now
console.log(e);
}
};
Expand Down
33 changes: 33 additions & 0 deletions platform/src/middleware/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ export default async (app: Elysia) => {
app.group("/services", (app) => {
modules.forEach(({ name, readme }) => {
console.log(" - mounted /services/" + name);

// simple post
app.post(name, async (ctx) => {
console.log(` >> POST to /services/${name}`);
// Note that elysia handles json parsing for me - neat!
const payload = ctx.body;
const result = await run(name, payload as any);
Expand All @@ -20,6 +23,36 @@ export default async (app: Elysia) => {
return result;
});

// websocket
// TODO in the web socket API, does it make more sense to open a socket at root
// and then pick the service you want? So you'd connect to /ws and send { call: 'echo', payload: {} }
//
// Protocol (server side):
//   client -> { event: "start", data: <payload> }   kicks off the service
//   server -> { event: "log", data: <line> }        one per captured log line
//   server -> { event: "complete", data: <result> } when the service resolves
//   server -> { event: "error", data: <message> }   when the service rejects
app.ws(name, {
  open() {
    console.log(` >> CONNECT at /services/${name}`);
  },
  message(ws, message) {
    try {
      // Only "start" is understood; any other event is ignored
      if (message.event !== "start") {
        return;
      }

      // Forward each log line to the client as it arrives
      const onLog = (log: string) => {
        ws.send({
          event: "log",
          data: log,
        });
      };

      run(name, message.data as any, onLog)
        .then((result) => {
          ws.send({
            event: "complete",
            data: result,
          });
        })
        .catch((e: unknown) => {
          // The surrounding try/catch cannot see async failures, so a rejected
          // run must be handled here. Without this the rejection is unhandled
          // and the client waits forever for a "complete" that never comes.
          console.log(e);
          ws.send({
            event: "error",
            data: String(e),
          });
        });
    } catch (e) {
      // Synchronous failures only (eg a malformed message)
      console.log(e);
    }
  },
});

// TODO: it would be lovely to render the markdown into nice rich html
app.get(`/${name}/README.md`, async (ctx) => readme);
});
Expand Down
60 changes: 60 additions & 0 deletions platform/src/util/streaming-logs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import path from "node:path";
import readline from "node:readline";
import { rm } from "node:fs/promises";
import crypto from "node:crypto";
import { spawn } from "node:child_process";

const END_LOG_DELIM = "***END***";

/**
 * Sets up a logfile and a watcher on it, to capture the stdout from the
 * python process.
 *
 * The caller is expected to hand the returned `logfile` path and `delimiter`
 * to the python side, which writes its log lines to that file. The file is
 * followed with `tail`, and every new line is re-directed to `onLog`.
 * When a line exactly equal to the delimiter is seen, the watcher is torn
 * down and `onComplete` is called.
 *
 * I'm never going to be able to unit test this
 *
 * TODO: I'd like this to just be a promise, it'll be a bit slicker,
 * But I'm still sketching it out really
 *
 * @param onLog called once per log line (the delimiter line is not forwarded)
 * @param onComplete called once, when the delimiter line is read
 * @returns `logfile` (absolute path of the log file), `delimiter` (the
 *   end-of-log marker line) and `destroy` (stops the watcher; safe to call
 *   more than once)
 */
export default (onLog = (_l: string) => {}, onComplete = () => {}) => {
  const id = crypto.randomUUID();

  // Init the log file
  const logfile = path.resolve(`tmp/logs/${id}.log`);
  // Bun.write returns a promise: report a failure rather than leaving
  // an unhandled rejection behind
  Bun.write(logfile, "").catch((e: unknown) => {
    console.error("Failed to init log file at", logfile, e);
  });
  console.log("Initing log file at", logfile);

  // Get a stream onto the log file

  // attempt #1: use bun shell
  // sadly bun shell doesn't support streams so I cant do this:
  // $`tail -f ${logfile}`;

  // attempt #2: use child process to spawn a tail --follow instead
  // Note the -F (not -f): the write above is async, so the file may not exist
  // yet when tail starts. -f would give up immediately and onComplete would
  // never fire; -F keeps retrying until the file appears.
  const child = spawn("tail", ["-F", logfile]);
  const rl = readline.createInterface({
    input: child.stdout,
    crlfDelay: Infinity,
  });

  let destroyed = false;

  // kill the child process and stop reading from it
  // (the log file itself is deliberately left on disk for now)
  const destroy = async () => {
    if (destroyed) {
      return;
    }
    destroyed = true;
    rl.close();
    child.kill();
    // console.log("Removing logfile at ", logfile);
    // await rm(logfile);
  };

  // Without a listener, a failed spawn (eg tail is not installed) emits an
  // unhandled 'error' event, which would take the whole process down
  child.on("error", (e) => {
    console.error("Failed to tail log file at", logfile, e);
    void destroy();
  });

  // listen for new lines
  rl.on("line", (line) => {
    if (destroyed) {
      // buffered lines can still arrive after teardown: ignore them so that
      // onComplete fires at most once and nothing is logged after it
      return;
    }

    if (line === END_LOG_DELIM) {
      void destroy();
      onComplete();
    } else {
      onLog(line);
    }
  });

  return { logfile, delimiter: END_LOG_DELIM, destroy };
};
30 changes: 30 additions & 0 deletions platform/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ describe("Main server", () => {

expect(status).toBe(200);
});

// send messages through a web socket
});

// It won't be appropriate at all to unit test many of these
Expand All @@ -52,5 +54,33 @@ describe("Python Services", () => {
const text = await response.json();
expect(text).toEqual(json);
});

// echo through web socket with result and log
// Sends a "start" event with a payload to the echo service and expects the
// same payload back in the "complete" event.
it("returns through a websocket", async () => {
  const payload = { a: 22 };

  // TODO maybe create a helper to manage client connections
  const socket = new WebSocket(`ws://localhost:${port}/services/echo`);

  try {
    await new Promise<void>((resolve, reject) => {
      // Fail fast if the connection breaks, rather than waiting for a timeout
      socket.addEventListener("error", () => {
        reject(new Error("websocket error"));
      });

      socket.addEventListener("message", ({ data }) => {
        // An exception thrown inside an event listener does not reach the
        // test runner, so route parse and assertion failures through reject
        try {
          const evt = JSON.parse(data);

          if (evt.event === "complete") {
            expect(evt.data).toEqual(payload);
            resolve();
          }
        } catch (e) {
          reject(e);
        }
      });

      socket.addEventListener("open", () => {
        socket.send(
          JSON.stringify({
            event: "start",
            data: payload,
          })
        );
      });
    });
  } finally {
    // Always release the connection, pass or fail
    socket.close();
  }
});
});
});
4 changes: 2 additions & 2 deletions services/adaptor_gen/adaptor_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from signature_generator import signature_generator as sig_gen
from code_generator import code_generator as code_gen
from util import createLogger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = createLogger("adaptor_gen")


class Payload(DictObj):
Expand Down
10 changes: 4 additions & 6 deletions services/code_generator/code_generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
from util import DictObj
from util import DictObj, createLogger

from .utils import (
generate_code_prompt,
Expand All @@ -8,8 +7,7 @@
from inference import inference


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = createLogger("code_generator")


class Payload(DictObj):
Expand All @@ -21,9 +19,9 @@ class Payload(DictObj):
# generate adaptor code based on a model and signature
def main(dataDict) -> str:
data = Payload(dataDict)

logger.generate("Running code generator with model {}".format(data.model))
result = generate(data.model, data.signature, data.get("api_key"))

logger.generate("Code generation complete!")
return result


Expand Down
5 changes: 2 additions & 3 deletions services/code_generator/prompts.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import copy
import logging
from util import createLogger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger = createLogger("code_generator.prompts")


prompts = {
Expand Down
5 changes: 4 additions & 1 deletion services/code_generator/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import os
from .prompts import generate_prompt

from util import createLogger

logger = createLogger("code_generator.utils")

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down
3 changes: 3 additions & 0 deletions services/echo/echo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# relative imports work well!
from .log import log

from util import createLogger


# Simple python service to echo requests back to the caller
# Used in test
def main(x):
logger = createLogger("echo")
log(x)
return x
10 changes: 8 additions & 2 deletions services/echo/log.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from util import createLogger

# TODO - in long-running python, this needs to be re-initialised every time
logger = createLogger("echo2")


def log(x):
print("[echo]: Echoing request")
print("[echo]: " + str(x))
logger.info("Echoing request")
logger.info(x)
Loading