Skip to content

Commit

Permalink
use sync-actions
Browse files Browse the repository at this point in the history
  • Loading branch information
koyopro committed Dec 13, 2024
1 parent 00eec4b commit 97d801e
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 19 deletions.
27 changes: 15 additions & 12 deletions packages/accel-record-core/src/database.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import Knex from "knex";
import path from "path";
import { fileURLToPath } from "url";
import { buildSyncClient, SyncClient } from "./database/sync.js";
import { loadDmmf } from "./fields.js";
import { Model } from "./index.js";
import { loadI18n } from "./model/naming.js";
// @ts-ignore
import SyncRpc, { stop } from "./sync-rpc/index.js";

const log = (logLevel: LogLevel, ...args: any[]) => {
if (LogLevel.indexOf(logLevel) >= LogLevel.indexOf(_config.logLevel ?? "WARN")) {
Expand Down Expand Up @@ -44,8 +41,6 @@ const setupKnex = (config: Config) => {
throw new Error("No config for knex. Please call initAccelRecord(config) first.");
};

const __dirname = path.dirname(fileURLToPath(import.meta.url));

export const LogLevel = ["TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"] as const;
export type LogLevel = (typeof LogLevel)[number];

Expand Down Expand Up @@ -99,18 +94,26 @@ export interface Config {
* A function to transform the SQL before executing.
*/
sqlTransformer?: (sql: string) => string;

sync?: "thread" | "process";
}
let _config: Config = { type: "sqlite" };
let _rpcClient: any;

let _rpcClient: SyncClient | undefined;

let _queryCount: number = 0;
export const initAccelRecord = async (config: Config) => {
if (_rpcClient) {
log("WARN", "initAccelRecord() has already been called.");
return;
}
_config = Object.assign({}, config);
_config.logLevel ??= "WARN";
_config.sync ??= "thread";
if (_config.type == "postgresql") _config.type = "pg";

_rpcClient = SyncRpc(path.resolve(__dirname, "./worker.cjs"), {
knexConfig: getKnexConfig(config),
});
_rpcClient = buildSyncClient(_config.sync);
_rpcClient.launchWorker({ knexConfig: getKnexConfig(config) });
await loadDmmf();
await loadI18n();

Expand Down Expand Up @@ -147,7 +150,7 @@ export const execSQL = (params: {
if (!_rpcClient || !_config) {
throw new Error("Please call initAccelRecord(config) first.");
}
const ret = _rpcClient(params);
const ret = _rpcClient.execSQL(params);
const time = Date.now() - startTime;
const color = /begin|commit|rollback/i.test(sql) ? "\x1b[36m" : "\x1b[32m";
log(params.logLevel ?? "DEBUG", ` \x1b[36mSQL(${time}ms) ${color}${sql}\x1b[39m`, bindings);
Expand All @@ -167,5 +170,5 @@ const formatByEngine = (ret: any) => {
};

export const stopRpcClient = () => {
stop();
_rpcClient?.stopWorker();
};
44 changes: 44 additions & 0 deletions packages/accel-record-core/src/database/sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { getKnexConfig, LogLevel } from "../database.js";
import { sync } from "../synclib/worker.js";
// @ts-ignore
import SyncRpc, { stop } from "../sync-rpc/index.js";

export const buildSyncClient = (type: "thread" | "process"): SyncClient => {
return type == "process" ? new ProcessSyncClient() : new ThreadSyncClient();
};

export interface SyncClient {
launchWorker(params: { knexConfig: ReturnType<typeof getKnexConfig> }): void;
execSQL(params: { sql: string; bindings: readonly any[]; logLevel?: LogLevel }): any;
stopWorker(): void;
}

export class ProcessSyncClient implements SyncClient {
private _rpc: any;
launchWorker(params: { knexConfig: ReturnType<typeof getKnexConfig> }): void {
if (this._rpc) return;
this._rpc = SyncRpc(new URL("../worker.cjs", import.meta.url).pathname, params);
}
execSQL(params: { sql: string; bindings: readonly any[]; logLevel?: LogLevel }): any {
return this._rpc(params);
}
stopWorker() {
stop();
}
}

export class ThreadSyncClient implements SyncClient {
private _rpc: ReturnType<typeof sync.launch> | undefined;

launchWorker(params: { knexConfig: ReturnType<typeof getKnexConfig> }): void {
if (this._rpc) return;
this._rpc = sync.launch();
this._rpc.actions.init(params);
}
execSQL(params: { sql: string; bindings: readonly any[]; logLevel?: LogLevel }): any {
return this._rpc?.actions?.execSQL(params);
}
stopWorker() {
this._rpc?.worker.terminate();
}
}
39 changes: 39 additions & 0 deletions packages/accel-record-core/src/synclib/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* eslint-disable */
import fs from "fs";
import Knex from "knex";
import { defineSyncWorker } from "sync-actions";

const log = (data: object) => {
fs.appendFile("query.log", JSON.stringify(data, null, 2) + "\n", (err) => {});
};

let knex: Knex.Knex;
let knexConfig: any;

function init(connection: { knexConfig: any }) {
knexConfig = connection.knexConfig;
knex = Knex(knexConfig);
}

function execSQL(query: { sql: string; bindings: readonly any[] }) {
const { sql, bindings } = query;
return knexConfig.client === "pg"
? postgresPromise(knex, sql, bindings)
: knex.raw(sql, bindings);
}

function postgresPromise(knex: Knex.Knex, sql: string, bindings: readonly any[]) {
return new Promise(async (resolve, error) => {
try {
const { command, rowCount, rows } = await knex.raw(sql, bindings);
return resolve({ command, rows, rowCount });
} catch (err) {
return error(err);
}
});
}

export const sync = defineSyncWorker(import.meta.filename, {
init,
execSQL,
});
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const loadModels = async (options: GeneratorOptions) => {
if (!fs.existsSync(filePath)) return undefined;
try {
buildFile(filePath, outfile);
process.env.DISABLE_SYNC_ACTIONS = "1";
return await eval(`import('${outfile}')`);
} catch {
console.log(
Expand Down
7 changes: 1 addition & 6 deletions tests/models/init.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import { Model, initAccelRecord } from "accel-record";
import { initAccelRecord } from "accel-record";
import { dbConfig } from "../vitest.setup";

describe("initAccelRecord", () => {
test("should not throw error even if called multiple times", async () => {
const subject = () => initAccelRecord(dbConfig());
// for afterEach
const restartTx = () => Model.startTransaction();

expect(async () => await subject()).not.toThrow();
restartTx();
expect(async () => await subject()).not.toThrow();
restartTx();
});
});
4 changes: 3 additions & 1 deletion tests/vitest.setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import { getDatabaseConfig } from "./models/index.js";
export const __dirname = path.dirname(fileURLToPath(import.meta.url));

export const dbConfig = () => {
const config = getDatabaseConfig();
const nodeVersion = Number(process.versions?.node?.split(".")?.[0]);
const sync = isFinite(nodeVersion) && nodeVersion >= 22 ? "thread" : "process";
const config = { ...getDatabaseConfig(), sync } as const;
if (process.env.DB_ENGINE == "mysql") {
return {
...config,
Expand Down

0 comments on commit 97d801e

Please sign in to comment.