Skip to content

Commit

Permalink
JSON-RPC stdio server
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Nov 5, 2022
1 parent e93dc33 commit 7da212d
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

### Changes
- allow sender timestamp to be in the future, but not too much
- add JSON-RPC stdio server `deltachat-rpc-server` and use it for JSON-RPC tests #3695
- refactorings #3706

### Fixes
Expand Down
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "macros"]
members = [
"deltachat-ffi",
"deltachat_derive",
"deltachat-jsonrpc"
"deltachat-jsonrpc",
"deltachat-rpc-server"
]

[[example]]
Expand Down
4 changes: 2 additions & 2 deletions deltachat-jsonrpc/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
"prettier:check": "prettier --check **.ts",
"prettier:fix": "prettier --write **.ts",
"test": "run-s test:prepare test:run-coverage test:report-coverage",
"test:prepare": "cargo build --features webserver --bin deltachat-jsonrpc-server",
"test:prepare": "cargo build --package deltachat-rpc-server --bin deltachat-rpc-server",
"test:report-coverage": "node report_api_coverage.mjs",
"test:run": "mocha dist/test",
"test:run-coverage": "COVERAGE=1 NODE_OPTIONS=--enable-source-maps c8 --include 'dist/*' -r text -r html -r json mocha dist/test"
},
"type": "module",
"types": "dist/deltachat.d.ts",
"version": "1.98.0"
}
}
31 changes: 31 additions & 0 deletions deltachat-jsonrpc/typescript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,34 @@ export class DeltaChat extends BaseDeltaChat<WebsocketTransport> {
this.opts = opts;
}
}

export class StdioDeltaChat extends BaseDeltaChat<StdioTransport> {
close() {}
constructor(input: any, output: any) {
const transport = new StdioTransport(input, output);
super(transport);
}
}

export class StdioTransport extends BaseTransport {
constructor(public input: any, public output: any) {
super();

var buffer = "";
this.output.on("data", (data: any) => {
buffer += data.toString();
while (buffer.includes("\n")) {
const n = buffer.indexOf("\n");
const line = buffer.substring(0, n);
const message = JSON.parse(line);
this._onmessage(message);
buffer = buffer.substring(n + 1);
}
});
}

_send(message: RPC.Message): void {
const serialized = JSON.stringify(message);
this.input.write(serialized + "\n");
}
}
6 changes: 2 additions & 4 deletions deltachat-jsonrpc/typescript/test/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { strictEqual } from "assert";
import chai, { assert, expect } from "chai";
import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import { DeltaChat } from "../deltachat.js";
import { StdioDeltaChat as DeltaChat } from "../deltachat.js";

import {
RpcServerHandle,
Expand All @@ -15,9 +15,7 @@ describe("basic tests", () => {

before(async () => {
serverHandle = await startServer();
// make sure server is up by the time we continue
await new Promise((res) => setTimeout(res, 100));
dc = new DeltaChat(serverHandle.url)
dc = new DeltaChat(serverHandle.stdin, serverHandle.stdout)
// dc.on("ALL", (event) => {
//console.log("event", event);
// });
Expand Down
4 changes: 2 additions & 2 deletions deltachat-jsonrpc/typescript/test/online.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { assert, expect } from "chai";
import { DeltaChat, DcEvent } from "../deltachat.js";
import { StdioDeltaChat as DeltaChat, DcEvent } from "../deltachat.js";
import { RpcServerHandle, createTempUser, startServer } from "./test_base.js";

const EVENT_TIMEOUT = 20000;
Expand Down Expand Up @@ -27,7 +27,7 @@ describe("online tests", function () {
this.skip();
}
serverHandle = await startServer();
dc = new DeltaChat(serverHandle.url);
dc = new DeltaChat(serverHandle.stdin, serverHandle.stdout);

dc.on("ALL", (contextId, { type }) => {
if (type !== "Info") console.log(contextId, type);
Expand Down
16 changes: 7 additions & 9 deletions deltachat-jsonrpc/typescript/test/test_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import { mkdtemp, rm } from "fs/promises";
import { existsSync } from "fs";
import { spawn, exec } from "child_process";
import fetch from "node-fetch";
import { Readable, Writable } from 'node:stream';

export const RPC_SERVER_PORT = 20808;

export type RpcServerHandle = {
url: string,
stdin: Writable,
stdout: Readable,
close: () => Promise<void>
}

export async function startServer(port: number = RPC_SERVER_PORT): Promise<RpcServerHandle> {
export async function startServer(): Promise<RpcServerHandle> {
const tmpDir = await mkdtemp(join(tmpdir(), "deltachat-jsonrpc-test"));

const pathToServerBinary = resolve(join(await getTargetDir(), "debug/deltachat-jsonrpc-server"));
const pathToServerBinary = resolve(join(await getTargetDir(), "debug/deltachat-rpc-server"));
console.log('using server binary: ' + pathToServerBinary);

if (!existsSync(pathToServerBinary)) {
Expand All @@ -30,7 +31,6 @@ export async function startServer(port: number = RPC_SERVER_PORT): Promise<RpcSe
cwd: tmpDir,
env: {
RUST_LOG: process.env.RUST_LOG || "info",
DC_PORT: '' + port,
RUST_MIN_STACK: "8388608"
},
});
Expand All @@ -44,12 +44,10 @@ export async function startServer(port: number = RPC_SERVER_PORT): Promise<RpcSe
});

server.stderr.pipe(process.stderr);
server.stdout.pipe(process.stdout)

const url = `ws://localhost:${port}/ws`

return {
url,
stdin: server.stdin,
stdout: server.stdout,
close: async () => {
shouldClose = true;
if (!server.kill()) {
Expand Down
26 changes: 26 additions & 0 deletions deltachat-rpc-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "deltachat-rpc-server"
version = "1.98.0"
description = "DeltaChat JSON-RPC server"
authors = ["Delta Chat Developers (ML) <delta@codespeak.net>"]
edition = "2021"
readme = "README.md"
license = "MPL-2.0"

keywords = ["deltachat", "chat", "openpgp", "email", "encryption"]
categories = ["cryptography", "std", "email"]

[[bin]]
name = "deltachat-rpc-server"

[dependencies]
deltachat-jsonrpc = { path = "../deltachat-jsonrpc" }

anyhow = "1"
env_logger = { version = "0.9.1" }
futures-lite = "1.12.0"
log = "0.4"
serde_json = "1.0.85"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.21.2", features = ["io-std"] }
yerpc = { version = "0.3.1", features = ["anyhow_expose"] }
68 changes: 68 additions & 0 deletions deltachat-rpc-server/src/bin/deltachat-rpc-server/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use deltachat_jsonrpc::api::events::event_to_json_rpc_notification;
use deltachat_jsonrpc::api::{Accounts, CommandApi};
use futures_lite::stream::StreamExt;
use tokio::io::{self, AsyncBufReadExt, BufReader};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use yerpc::{RpcClient, RpcSession};

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "./accounts".to_string());
log::info!("Starting with accounts directory `{}`.", path);
let accounts = Arc::new(RwLock::new(
Accounts::new(PathBuf::from(&path)).await.unwrap(),
));

log::info!("Creating JSON-RPC API.");
let state = CommandApi::from_arc(accounts.clone());

let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state);

// Events task converts core events to JSON-RPC notifications.
let events_task: JoinHandle<Result<()>> = tokio::spawn(async move {
let events = accounts.read().await.get_event_emitter();
while let Some(event) = events.recv().await {
let event = event_to_json_rpc_notification(event);
client.send_notification("event", Some(event)).await?;
}
Ok(())
});

// Send task prints JSON responses to stdout.
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
while let Some(message) = out_receiver.next().await {
let message = serde_json::to_string(&message)?;
log::trace!("RPC send {}", message);
println!("{}", message);
}
Ok(())
});

// Receiver task reads JSON requests from stdin.
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let stdin = io::stdin();
let mut lines = BufReader::new(stdin).lines();
while let Some(message) = lines.next_line().await? {
log::trace!("RPC recv {}", message);
session.handle_incoming(&message).await;
}
log::info!("EOF reached on stdin");
Ok(())
});

recv_task.await??;

// TODO: shutdown cleanly
drop(send_task);
drop(events_task);

Ok(())
}
7 changes: 6 additions & 1 deletion scripts/set_core_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ def main():
parser = ArgumentParser(prog="set_core_version")
parser.add_argument("newversion")

toml_list = ["Cargo.toml", "deltachat-ffi/Cargo.toml", "deltachat-jsonrpc/Cargo.toml"]
json_list = ["package.json", "deltachat-jsonrpc/typescript/package.json"]
toml_list = [
"Cargo.toml",
"deltachat-ffi/Cargo.toml",
"deltachat-jsonrpc/Cargo.toml",
"deltachat-rpc-server/Cargo.toml",
]
try:
opts = parser.parse_args()
except SystemExit:
Expand Down

0 comments on commit 7da212d

Please sign in to comment.