diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 6e37c87..cf8cd4d 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -20,7 +20,7 @@ jobs: deno-version: "~1.18" - name: Setup Go - uses: actions/setup-go@v2.1.5 + uses: actions/setup-go@v2 with: go-version: "~1.17" @@ -29,7 +29,7 @@ jobs: - name: Build the WebAssembly binary run: | - make build + make build-wasm git diff --quiet --exit-code bin/kafkagosaur.wasm - name: Start Docker containers @@ -41,7 +41,7 @@ jobs: - name: Write coverage report run: deno coverage ./coverage --lcov > ./coverage/coverage.lcov - - name: Upload coverage to Codecov + - name: Upload coverage report uses: codecov/codecov-action@v2 with: files: ./coverage/coverage.lcov diff --git a/.gitignore b/.gitignore index 739752d..d16b97e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .vscode coverage +.env diff --git a/Dockerfile b/Dockerfile index 92f5cd2..a2d0156 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,16 @@ -FROM golang:1.17.6-alpine3.15 AS build +# syntax=docker/dockerfile:1.3 -COPY src /src +FROM golang:1.17-alpine AS build + +COPY src/go.* /src/ WORKDIR /src -RUN GOOS=js GOARCH=wasm go build -o kafkagosaur.wasm +RUN go mod download +COPY src /src/ + +RUN --mount=type=cache,target=/root/.cache/go-build \ +GOOS=js GOARCH=wasm go build -o kafkagosaur.wasm FROM scratch AS export + COPY --from=build /src/kafkagosaur.wasm / diff --git a/Makefile b/Makefile index 2b12bf4..300fc2e 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,12 @@ .PHONY: test -all: build +all: build-wasm -build: - docker build -o bin . +build-wasm: + DOCKER_BUILDKIT=1 docker build -o bin . test: - deno test --allow-read --allow-net --coverage=coverage + deno test --allow-read --allow-net --unstable --coverage=coverage docker-up: docker-compose up -d diff --git a/README.md b/README.md index 3eee4ef..20e94f5 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ performance characteristics similar to the native code. - [x] Writer - [x] Reader - [x] SASL -- [ ] TLS +- [x] TLS - [x] TCP - [ ] UDP - [ ] Deno streams @@ -70,13 +70,13 @@ make docker To run the writer example ```bash -deno run --allow-read --allow-net examples/writer.ts +deno run --allow-read --allow-net --unstable examples/writer.ts ``` To run the reader example ```bash -deno run --allow-read --allow-net examples/reader.ts +deno run --allow-read --allow-net --unstable examples/reader.ts ``` ## Documentation @@ -91,7 +91,7 @@ API of kafka-go closely. To build the WebAssemnbly module, first run ```bash -make build +make build-wasm ``` To run the tests, ensure first you have docker up and running. Then start the diff --git a/bench/config.ts b/bench/config.ts new file mode 100644 index 0000000..9fffcf6 --- /dev/null +++ b/bench/config.ts @@ -0,0 +1,14 @@ +import "./deps.ts"; + +const getEnv = (key: string): string => { + const value = Deno.env.get(key); + if (value === undefined) throw Error(`Undefined environment variable ${key}`); + else return value; +}; + +const broker = getEnv("BROKER"); +const topic = getEnv("TOPIC"); +const username = getEnv("SASL_USERNAME"); +const password = getEnv("SASL_PASSWORD"); + +export { broker, password, topic, username }; diff --git a/bench/deps.ts b/bench/deps.ts new file mode 100644 index 0000000..e476f6e --- /dev/null +++ b/bench/deps.ts @@ -0,0 +1,6 @@ +export { + bench, + runBenchmarks, +} from "https://deno.land/std@0.125.0/testing/bench.ts"; + +import "https://deno.land/x/dotenv@v3.2.0/load.ts"; diff --git a/bench/reader.ts b/bench/reader.ts new file mode 100644 index 0000000..481bc13 --- /dev/null +++ b/bench/reader.ts @@ -0,0 +1,47 @@ +import KafkaGoSaur from "../mod.ts"; +import { SASLMechanism } from "../security/sasl.ts"; +import { bench, runBenchmarks } from "./deps.ts"; +import { broker, password, topic, username } from "./config.ts"; + +const nrOfMessages = 100000; + +const readerConfig = { + brokers: [broker], + topic, + sasl: { + mechanism: SASLMechanism.PLAIN, + username, + password, + }, + tls: { + insecureSkipVerify: true, + }, +}; + +const kafkaGoSaur = new KafkaGoSaur(); +const reader = await kafkaGoSaur.reader(readerConfig); + +bench({ + name: `readMessage#${nrOfMessages}`, + runs: 10, + async func(b): Promise { + b.start(); + + for (let i = 0; i < nrOfMessages; i++) { + await reader.readMessage(); + } + + b.stop(); + }, +}); + +const benchmarkRunResults = await runBenchmarks(); + +console.log(benchmarkRunResults); +console.log( + `[kafkagosaur] readMessage msgs/s: ${ + nrOfMessages / (benchmarkRunResults.results[0].measuredRunsAvgMs / 1000) + }`, +); + +await reader.close(); diff --git a/bench/writer.ts b/bench/writer.ts new file mode 100644 index 0000000..c4e09a9 --- /dev/null +++ b/bench/writer.ts @@ -0,0 +1,57 @@ +import KafkaGoSaur from "../mod.ts"; +import { SASLMechanism } from "../security/sasl.ts"; +import { bench, runBenchmarks } from "./deps.ts"; +import { broker, password, topic, username } from "./config.ts"; + +const nrOfMessages = 100000; +const msgBatchSize = 10000; +const msgSize = 1024; + +const writerConfig = { + address: broker, + topic, + sasl: { + mechanism: SASLMechanism.PLAIN, + username, + password, + }, + tls: { + insecureSkipVerify: true, + }, +}; + +const kafkaGoSaur = new KafkaGoSaur(); +const writer = await kafkaGoSaur.writer(writerConfig); + +const value = new Uint8Array(msgSize); +crypto.getRandomValues(value); + +bench({ + name: `writeMessages#${nrOfMessages}`, + runs: 10, + async func(b): Promise { + b.start(); + + for (let i = 0; i < nrOfMessages / msgBatchSize; i++) { + const msgs = []; + + for (let j = 0; j < msgBatchSize; j++) { + msgs.push({ value }); + } + await writer.writeMessages(msgs); + } + + b.stop(); + }, +}); + +const benchmarkRunResults = await runBenchmarks(); + +console.log(benchmarkRunResults); +console.log( + `[kafkagosaur] writeMessages msgs/s: ${ + nrOfMessages / (benchmarkRunResults.results[0].measuredRunsAvgMs / 1000) + }`, +); + +await writer.close(); diff --git a/bin/kafkagosaur.wasm b/bin/kafkagosaur.wasm index 2c2e638..b9e40aa 100755 Binary files a/bin/kafkagosaur.wasm and b/bin/kafkagosaur.wasm differ diff --git a/connection-with-deadline.ts b/connection-with-deadline.ts deleted file mode 100644 index 1df19a0..0000000 --- a/connection-with-deadline.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { deadline } from "./deps.ts"; - -const withAbsoluteDeadline = ( - p: Promise, - deadlineMs?: number, -): Promise => { - if (deadlineMs !== undefined) { - const delayMs = deadlineMs - Date.now(); - return deadline(p, delayMs); - } - - return p; -}; - -class ConnectionWithDeadline implements Deno.Conn { - #conn: Deno.Conn; - #readDeadlineMs?: number; - #writeDeadlineMs?: number; - - readonly localAddr: Deno.Addr; - readonly remoteAddr: Deno.Addr; - readonly rid: number; - - constructor(conn: Deno.Conn) { - this.#conn = conn; - this.localAddr = conn.localAddr; - this.remoteAddr = conn.remoteAddr; - this.rid = conn.rid; - } - - read(p: Uint8Array): Promise { - return withAbsoluteDeadline(this.#conn.read(p), this.#readDeadlineMs); - } - - write(p: Uint8Array): Promise { - return withAbsoluteDeadline(this.#conn.write(p), this.#writeDeadlineMs); - } - - closeWrite(): Promise { - return this.#conn.closeWrite(); - } - - close(): void { - return this.#conn.close(); - } - - setReadDeadline(timeMs: number): void { - this.#readDeadlineMs = timeMs; - } - - setWriteDeadline(timeMs: number): void { - this.#writeDeadlineMs = timeMs; - } -} - -const connect = async (options: Deno.ConnectOptions) => { - const conn = await Deno.connect(options); - return new ConnectionWithDeadline(conn); -}; - -export const setOnGlobal = () => { - (globalThis as Record).connectWithDeadline = connect; -}; diff --git a/deps.ts b/deps.ts index a8f3f39..2e2014c 100644 --- a/deps.ts +++ b/deps.ts @@ -2,5 +2,10 @@ export { deadline, DeadlineError, deferred, -} from "https://deno.land/std@0.116.0/async/mod.ts"; -export { delay } from "https://deno.land/std@0.116.0/async/delay.ts"; +} from "https://deno.land/std@0.125.0/async/mod.ts"; +export { + connect, + createConnection, + Socket, +} from "https://deno.land/std@0.125.0/node/net.ts"; +export { delay } from "https://deno.land/std@0.125.0/async/delay.ts"; diff --git a/dialer.ts b/dialer.ts index fa85f9e..b0f127e 100644 --- a/dialer.ts +++ b/dialer.ts @@ -1,7 +1,9 @@ -import { SASLConfig } from "./sasl.ts"; +import { SASLConfig } from "./security/sasl.ts"; +import { TLSConfig } from "./security/tls.ts"; export type KafkaDialerConfig = { sasl?: SASLConfig; + tls?: TLSConfig; }; export interface KafkaConn { diff --git a/examples/reader.ts b/examples/reader.ts index 4b47a5e..9ad4d18 100644 --- a/examples/reader.ts +++ b/examples/reader.ts @@ -1,5 +1,5 @@ -import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.1/mod.ts"; -import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.1/sasl.ts"; +import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.2/mod.ts"; +import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.2/security/sasl.ts"; const broker = "localhost:9093"; const topic = "test-0"; @@ -21,6 +21,6 @@ const reader = await kafkaGoSaur.reader(config); const dec = new TextDecoder(); const readMsg = await reader.readMessage(); const readValue = dec.decode(readMsg.value); -console.log(readValue); +console.log(`Read message ${readValue} from topic ${topic} at ${broker}`); await reader.close(); diff --git a/examples/writer.ts b/examples/writer.ts index 104d1db..4443f5f 100644 --- a/examples/writer.ts +++ b/examples/writer.ts @@ -1,5 +1,5 @@ -import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.1/mod.ts"; -import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.1/sasl.ts"; +import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.2/mod.ts"; +import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.2/security/sasl.ts"; const broker = "localhost:9093"; const topic = "test-0"; @@ -22,4 +22,5 @@ const enc = new TextEncoder(); const msgs = [{ value: enc.encode("value0") }, { value: enc.encode("value1") }]; await writer.writeMessages(msgs); +console.log(`Wrote ${msgs.length} messages to topic ${topic} at ${broker}`); await writer.close(); diff --git a/global.d.ts b/global.d.ts index 177a1c8..ccd00e4 100644 --- a/global.d.ts +++ b/global.d.ts @@ -4,6 +4,6 @@ declare namespace global { importObject: WebAssembly.Imports; - run(instance: WebAssembly.Instance): Promise; + run(instance: WebAssembly.Instance): Promise; } } diff --git a/mod.ts b/mod.ts index f627886..3b1130a 100644 --- a/mod.ts +++ b/mod.ts @@ -1,12 +1,13 @@ // @deno-types="./global.d.ts" import "./lib/wasm_exec.js"; import { deadline, DeadlineError, delay } from "./deps.ts"; -import { setOnGlobal as setConnectWithDeadlineOnGlobal } from "./connection-with-deadline.ts"; +import { Dial, setDialOnGlobal } from "./net/connection.ts"; +import { dial as nodeDial } from "./net/node-connection.ts"; import { KafkaDialer, KafkaDialerConfig } from "./dialer.ts"; import { KafkaReader, KafkaReaderConfig } from "./reader.ts"; import { KafkaWriter, KafkaWriterConfig } from "./writer.ts"; -const runGoWasm = async (wasmFilePath: string): Promise => { +const runGoWasm = async (wasmFilePath: string): Promise => { const go = new global.Go(); const wasmBytes = await Deno.readFile(wasmFilePath); const instiatedSource = await WebAssembly.instantiate( @@ -24,7 +25,7 @@ const untilGloballyDefined = ( const maxDelayMs = 1000; const loop = async (): Promise => { - const value = (global as Record)[key]; + const value = (globalThis as Record)[key]; if (value !== undefined) return Promise.resolve(value); else { await delay(backoffMs); @@ -40,8 +41,8 @@ const untilGloballyDefined = ( }; class KafkaGoSaur { - constructor() { - setConnectWithDeadlineOnGlobal(); + constructor(dial: Dial = nodeDial) { + setDialOnGlobal(dial); runGoWasm("./bin/kafkagosaur.wasm"); } diff --git a/net/connection.ts b/net/connection.ts new file mode 100644 index 0000000..0287b14 --- /dev/null +++ b/net/connection.ts @@ -0,0 +1,44 @@ +export type Address = { + /** + * Name of the network (for example, "tcp", "udp"). + */ + network: string; + /** + * String form of address (for example, "192.0.2.1:25", "[2001:db8::1]:80"). + */ + string: string; +}; + +/** + * A generic stream-oriented network connection, equivalent to a GO net.Conn. + */ +export interface Connection { + readonly localAddr: Address; + readonly remoteAddr: Address; + + /** + * Read reads data from the connection. + * Read can be made to time out and return an error after a fixed + * time limit; see SetDeadline and SetReadDeadline. + */ + read(bytes: Uint8Array): Promise; + + /** + * Write writes data to the connection. + * Write can be made to time out and return an error after a fixed + * time limit; see SetDeadline and SetWriteDeadline. + */ + write(bytes: Uint8Array): Promise; + + close(): Promise; + + setReadDeadline(timeMs: number): void; + + setWriteDeadline(timeMs: number): void; +} + +export type Dial = (hostname: string, port: number) => Promise; + +export const setDialOnGlobal = (dial: Dial) => { + (globalThis as Record).dial = dial; +}; diff --git a/net/deno-connection.ts b/net/deno-connection.ts new file mode 100644 index 0000000..29fd7b1 --- /dev/null +++ b/net/deno-connection.ts @@ -0,0 +1,57 @@ +import { Address, Connection, Dial } from "./connection.ts"; +import { joinHostPort, withAbsoluteDeadline } from "./util.ts"; + +export class DenoTCPConnection implements Connection { + #conn: Deno.Conn; + #readDeadlineMs?: number; + #writeDeadlineMs?: number; + + readonly localAddr: Address; + readonly remoteAddr: Address; + + constructor(conn: Deno.Conn) { + if ( + conn.localAddr.transport !== "tcp" || conn.remoteAddr.transport !== "tcp" + ) { + throw Error( + `Not supported dial option(s): ${conn.localAddr.transport}, ${conn.remoteAddr.transport}`, + ); + } + + this.#conn = conn; + this.localAddr = { + network: "tcp", + string: joinHostPort(conn.localAddr.hostname, conn.localAddr.port), + }; + this.remoteAddr = { + network: "tcp", + string: joinHostPort(conn.remoteAddr.hostname, conn.remoteAddr.port), + }; + } + + read(bytes: Uint8Array): Promise { + return withAbsoluteDeadline(this.#conn.read(bytes), this.#readDeadlineMs); + } + + write(bytes: Uint8Array): Promise { + return withAbsoluteDeadline(this.#conn.write(bytes), this.#writeDeadlineMs); + } + + close(): Promise { + this.#conn.close(); + return this.#conn.closeWrite(); + } + + setReadDeadline(timeMs: number): void { + this.#readDeadlineMs = timeMs; + } + + setWriteDeadline(timeMs: number): void { + this.#writeDeadlineMs = timeMs; + } +} + +export const dial: Dial = async (hostname: string, port: number) => { + const conn = await Deno.connect({ hostname, port }); + return new DenoTCPConnection(conn); +}; diff --git a/net/node-connection.ts b/net/node-connection.ts new file mode 100644 index 0000000..595a3d2 --- /dev/null +++ b/net/node-connection.ts @@ -0,0 +1,139 @@ +import { Address, Connection, Dial } from "./connection.ts"; +import { joinHostPort, withAbsoluteDeadline } from "./util.ts"; +import { createConnection, deferred, Socket } from "../deps.ts"; + +export class NodeTCPConnection implements Connection { + #socket: Socket; + #readDeadlineMs?: number; + #writeDeadlineMs?: number; + + readonly localAddr: Address; + readonly remoteAddr: Address; + + constructor(socket: Socket) { + if (socket.remoteAddress === undefined || socket.remotePort === undefined) { + throw Error("Socket remoteAddress or remotePort are undefined."); + } + this.#socket = socket; + + this.localAddr = { + network: "tcp", + string: joinHostPort(socket.localAddress, socket.localPort), + }; + this.remoteAddr = { + network: "tcp", + string: joinHostPort(socket.remoteAddress, socket.remotePort), + }; + } + + read(bytes: Uint8Array): Promise { + const p = deferred(); + + const onReadable = () => { + const readSize = Math.min(bytes.length, this.#socket.readableLength); + const chunk = this.#socket.read(readSize); + + if (chunk === null) { + // wait for next 'readable' event + return; + } + + if (chunk !== undefined && typeof chunk !== "string") { + bytes.set(chunk); + removeListeners(); + + return p.resolve(chunk.length); + } + + removeListeners(); + + p.reject(Error("Invalid chunk read")); + }; + const onError = (err: Error) => { + removeListeners(); + p.reject(err); + }; + const onEnd = () => { + removeListeners(); + p.resolve(null); // EOF + }; + const onClose = () => { + removeListeners(); + p.resolve(0); + }; + + this.#socket.on("readable", onReadable); + this.#socket.once("error", onError); + this.#socket.once("end", onEnd); + this.#socket.once("close", onClose); + + const removeListeners = () => { + this.#socket.removeListener("readable", onReadable); + this.#socket.removeListener("error", onError); + this.#socket.removeListener("end", onEnd); + this.#socket.removeListener("close", onClose); + }; + + onReadable(); + + return withAbsoluteDeadline(p, this.#readDeadlineMs); + } + + write(bytes: Uint8Array): Promise { + const p = deferred(); + + this.#socket.write(bytes, undefined, (error: Error | null | undefined) => { + if (error instanceof Error) { + p.reject(error); + } else { + p.resolve(bytes.length); + } + }); + + return withAbsoluteDeadline(p, this.#writeDeadlineMs); + } + + close(): Promise { + const p = deferred(); + const onError = (err: Error) => { + removeListeners(); + p.reject(err); + }; + const onClose = () => { + removeListeners(); + p.resolve(); + }; + + this.#socket.once("error", onError); + this.#socket.once("close", onClose); + + const removeListeners = () => { + this.#socket.removeListener("error", onError); + this.#socket.removeListener("close", onClose); + }; + + this.#socket.destroy(); + + return p; + } + + setReadDeadline(timeMs: number): void { + this.#readDeadlineMs = timeMs; + } + + setWriteDeadline(timeMs: number): void { + this.#writeDeadlineMs = timeMs; + } +} + +export const dial: Dial = async (host: string, port: number) => { + const p = deferred(); + + const socket = createConnection({ port, host }, p.resolve); + + await p; + + const connection = new NodeTCPConnection(socket); + + return connection; +}; diff --git a/net/util.ts b/net/util.ts new file mode 100644 index 0000000..5d75f79 --- /dev/null +++ b/net/util.ts @@ -0,0 +1,16 @@ +import { deadline } from "../deps.ts"; + +export const withAbsoluteDeadline = ( + p: Promise, + deadlineMs?: number, +): Promise => { + if (deadlineMs !== undefined) { + const delayMs = deadlineMs - Date.now(); + return deadline(p, delayMs); + } + + return p; +}; + +export const joinHostPort = (host: string, port: number) => + host.indexOf(":") >= 0 ? `[${host}]:${port}` : `${host}:${port}`; diff --git a/reader.ts b/reader.ts index 987a5fe..abeaac7 100644 --- a/reader.ts +++ b/reader.ts @@ -1,5 +1,6 @@ import { Header } from "./header.ts"; -import { SASLConfig } from "./sasl.ts"; +import { SASLConfig } from "./security/sasl.ts"; +import { TLSConfig } from "./security/tls.ts"; export type KafkaReadMessage = { topic: string; @@ -15,16 +16,16 @@ export type KafkaReadMessage = { export type KafkaReaderConfig = { brokers: string[]; topic: string; - groupId: string; + groupId?: string; sasl?: SASLConfig; + tls?: TLSConfig; }; export interface KafkaReader { - close: () => Promise; - // TODO: is actually null! + close: () => Promise; commitMessages: (msgs: KafkaReadMessage[]) => Promise; fetchMessage: () => Promise; readMessage: () => Promise; - setOffset: (offset: number) => Promise; - setOffsetAt: (timeMs: number) => Promise; + setOffset: (offset: number) => Promise; + setOffsetAt: (timeMs: number) => Promise; } diff --git a/sasl.ts b/security/sasl.ts similarity index 100% rename from sasl.ts rename to security/sasl.ts diff --git a/security/tls.ts b/security/tls.ts new file mode 100644 index 0000000..90a0182 --- /dev/null +++ b/security/tls.ts @@ -0,0 +1,10 @@ +export type X509KeyPair = { + key: string; + cert: string; +}; + +export type TLSConfig = boolean | { + insecureSkipVerify?: boolean; + keyPair?: X509KeyPair; + ca?: string; +}; diff --git a/src/interop/net.go b/src/interop/net.go index ae1934b..67309fc 100644 --- a/src/interop/net.go +++ b/src/interop/net.go @@ -12,21 +12,20 @@ import ( "time" ) -type denoNetAddr struct { +type denoAddr struct { jsAddr js.Value } -func (c *denoNetAddr) Network() string { - return c.jsAddr.Get("transport").String() +func (c *denoAddr) Network() string { + return c.jsAddr.Get("network").String() } -func (c *denoNetAddr) String() string { - // TODO: handle unix addr case - return net.JoinHostPort(c.jsAddr.Get("hostname").String(), c.jsAddr.Get("port").String()) +func (c *denoAddr) String() string { + return c.jsAddr.Get("string").String() } func mapDeadlineError(reason js.Value) error { - if reason.Get("name").String() == "DeadlineError" { + if reason := reason.Get("name"); !reason.IsUndefined() && reason.String() == "DeadlineError" { return os.ErrDeadlineExceeded } @@ -71,7 +70,7 @@ func (c *denoTCPConn) Write(b []byte) (n int, err error) { func (c *denoTCPConn) LocalAddr() net.Addr { jsAddr := c.jsConn.Get("localAddr") - return &denoNetAddr{ + return &denoAddr{ jsAddr: jsAddr, } } @@ -79,7 +78,7 @@ func (c *denoTCPConn) LocalAddr() net.Addr { func (c *denoTCPConn) RemoteAddr() net.Addr { jsAddr := c.jsConn.Get("remoteAddr") - return &denoNetAddr{ + return &denoAddr{ jsAddr: jsAddr, } } @@ -106,8 +105,7 @@ func (c *denoTCPConn) SetWriteDeadline(t time.Time) error { func (c *denoTCPConn) Close() error { c.closeOnce.Do(func() { - c.jsConn.Call("close") - Await(c.jsConn.Call("closeWrite")) + Await(c.jsConn.Call("close")) }) return nil @@ -129,13 +127,7 @@ func NewDenoConn(ctx context.Context, network string, address string) (net.Conn, return nil, err } - connectOptions := map[string]interface{}{ - "transport": network, - "hostname": host, - "port": portInt, - } - - jsTCPConn, err := Await(js.Global().Call("connectWithDeadline", connectOptions)) + jsTCPConn, err := Await(js.Global().Call("dial", host, portInt)) if err != nil { return nil, err diff --git a/src/interop/promise.go b/src/interop/promise.go index 4d16565..88a8a57 100644 --- a/src/interop/promise.go +++ b/src/interop/promise.go @@ -7,12 +7,16 @@ import ( "syscall/js" ) -func NewPromise(executor func(resolve func(interface{}), reject func(error))) js.Value { +func NewPromise(executor func(_ func(interface{}), _ func(error))) js.Value { executorJsFunc := js.FuncOf(func(this js.Value, args []js.Value) interface{} { resolve := func(value interface{}) { args[0].Invoke(value) } - reject := func(reason error) { args[1].Invoke(reason.Error()) } + reject := func(reason error) { + err := js.Global().Call("Error", reason.Error()) + + args[1].Invoke(err) + } go executor(resolve, reject) defer func() { @@ -30,7 +34,14 @@ func NewPromise(executor func(resolve func(interface{}), reject func(error))) js } func defaultError(reason js.Value) error { - return errors.New(js.Global().Get("JSON").Call("stringify", reason).String()) + var text string + if stack := reason.Get("stack"); !stack.IsUndefined() { + text = stack.String() + } else { + text = js.Global().Get("JSON").Call("stringify", reason).String() + } + + return errors.New(text) } func Await(promiseLike js.Value) (js.Value, error) { @@ -38,20 +49,20 @@ func Await(promiseLike js.Value) (js.Value, error) { } func AwaitWithErrorMapping(promiseLike js.Value, errorFn func(js.Value) error) (js.Value, error) { - value := make(chan js.Value) - defer close(value) + fullfilledChannel := make(chan js.Value) + defer close(fullfilledChannel) - reason := make(chan js.Value) - defer close(reason) + rejectedChannel := make(chan js.Value) + defer close(rejectedChannel) onFulfilled := js.FuncOf(func(this js.Value, args []js.Value) interface{} { - value <- args[0] + fullfilledChannel <- args[0] return nil }) defer onFulfilled.Release() onRejected := js.FuncOf(func(this js.Value, args []js.Value) interface{} { - reason <- args[0] + rejectedChannel <- args[0] return nil }) defer onRejected.Release() @@ -59,9 +70,9 @@ func AwaitWithErrorMapping(promiseLike js.Value, errorFn func(js.Value) error) ( promiseLike.Call("then", onFulfilled, onRejected) select { - case v := <-value: + case v := <-fullfilledChannel: return v, nil - case r := <-reason: + case r := <-rejectedChannel: return js.Undefined(), errorFn(r) } diff --git a/src/kafkagosaur/dialer.go b/src/kafkagosaur/dialer.go index 44c0b43..2d00560 100644 --- a/src/kafkagosaur/dialer.go +++ b/src/kafkagosaur/dialer.go @@ -43,18 +43,20 @@ func (d *dialer) toJSObject() map[string]interface{} { func NewKafkaDialer(dialConfigJs js.Value) *kafka.Dialer { - kafkaDialer := &kafka.Dialer{ - DialFunc: interop.NewDenoConn, - } - saslMechanism, err := SASLMechanism(dialConfigJs) + if err != nil { + panic(err) + } + tls, err := TLSConfig(dialConfigJs) if err != nil { panic(err) } - if saslMechanism != nil { - kafkaDialer.SASLMechanism = saslMechanism + kafkaDialer := &kafka.Dialer{ + DialFunc: interop.NewDenoConn, + SASLMechanism: saslMechanism, + TLS: tls, } return kafkaDialer diff --git a/src/kafkagosaur/sasl.go b/src/kafkagosaur/sasl.go index 95330fa..0a1923a 100644 --- a/src/kafkagosaur/sasl.go +++ b/src/kafkagosaur/sasl.go @@ -10,35 +10,32 @@ import ( ) func SASLMechanism(config js.Value) (sasl.Mechanism, error) { - var mechanism sasl.Mechanism = nil - var err error = nil - if saslConfig := config.Get("sasl"); !saslConfig.IsUndefined() { username := saslConfig.Get("username").String() password := saslConfig.Get("password").String() switch saslConfig.Get("mechanism").String() { case "PLAIN": - mechanism = plain.Mechanism{ + return plain.Mechanism{ Username: username, Password: password, - } + }, nil case "SCRAM-SHA-512": - mechanism, err = scram.Mechanism( + return scram.Mechanism( scram.SHA512, username, password, ) case "SCRAM-SHA-256": - mechanism, err = scram.Mechanism( + return scram.Mechanism( scram.SHA256, username, password, ) default: - err = errors.New("unknown SASL mechanism") + return nil, errors.New("unknown SASL mechanism") } } - return mechanism, err + return nil, nil } diff --git a/src/kafkagosaur/tls.go b/src/kafkagosaur/tls.go new file mode 100644 index 0000000..56a2759 --- /dev/null +++ b/src/kafkagosaur/tls.go @@ -0,0 +1,63 @@ +package kafkagosaur + +import ( + "crypto/tls" + "crypto/x509" + + "errors" + "syscall/js" +) + +func TLSConfig(config js.Value) (*tls.Config, error) { + tlsJs := config.Get("tls") + tlsConfig := &tls.Config{} + + handleBoolean := func() (*tls.Config, error) { + if tlsJs.Bool() { + return tlsConfig, nil + } else { + return nil, nil + } + } + + handleObject := func() (*tls.Config, error) { + if keyPairJs := config.Get("keyPair"); !keyPairJs.IsUndefined() { + key := keyPairJs.Get("key").String() + cert := keyPairJs.Get("cert").String() + + keyPair, err := tls.X509KeyPair([]byte(key), []byte(cert)) + if err != nil { + return nil, err + } + tlsConfig.Certificates = []tls.Certificate{keyPair} + } + + if caJs := tlsJs.Get("ca"); !caJs.IsUndefined() { + caCertPool := x509.NewCertPool() + + if !caCertPool.AppendCertsFromPEM([]byte(caJs.String())) { + return nil, errors.New("CA certificate could not be parsed") + } else { + tlsConfig.RootCAs = caCertPool + return tlsConfig, nil + } + } + + if insecureSkipVerifyJs := tlsJs.Get("insecureSkipVerify"); !insecureSkipVerifyJs.IsUndefined() { + tlsConfig.InsecureSkipVerify = insecureSkipVerifyJs.Bool() + } + + return tlsConfig, nil + } + + if !tlsJs.IsUndefined() { + switch tlsJs.Type() { + case js.TypeObject: + return handleObject() + default: + return handleBoolean() + } + } + + return nil, nil +} diff --git a/src/kafkagosaur/writer.go b/src/kafkagosaur/writer.go index fb15b99..99388c6 100644 --- a/src/kafkagosaur/writer.go +++ b/src/kafkagosaur/writer.go @@ -2,7 +2,6 @@ package kafkagosaur import ( "context" - "log" "syscall/js" "time" @@ -64,18 +63,20 @@ func (w *writer) toJSObject() map[string]interface{} { var NewWriterJsFunc = js.FuncOf(func(this js.Value, args []js.Value) interface{} { writerConfig := args[0] - transport := &kafka.Transport{ - Dial: interop.NewDenoConn, - } - saslMechanism, err := SASLMechanism(writerConfig) + if err != nil { + panic(err) + } + tls, err := TLSConfig(writerConfig) if err != nil { panic(err) } - if saslMechanism != nil { - transport.SASL = saslMechanism + transport := &kafka.Transport{ + Dial: interop.NewDenoConn, + SASL: saslMechanism, + TLS: tls, } if jsIdleTimeout := writerConfig.Get("idleTimeout"); !jsIdleTimeout.IsUndefined() { @@ -83,8 +84,8 @@ var NewWriterJsFunc = js.FuncOf(func(this js.Value, args []js.Value) interface{} } kafkaWriter := kafka.Writer{ - Addr: kafka.TCP(writerConfig.Get("address").String()), - Logger: log.Default(), + Addr: kafka.TCP(writerConfig.Get("address").String()), + // Logger: log.Default(), Transport: transport, } diff --git a/test/deps.ts b/test/deps.ts index 4bcd8fc..0766309 100644 --- a/test/deps.ts +++ b/test/deps.ts @@ -1,4 +1,4 @@ export { assert, assertEquals, -} from "https://deno.land/std@0.116.0/testing/asserts.ts"; +} from "https://deno.land/std@0.125.0/testing/asserts.ts"; diff --git a/test/integration/setup.ts b/test/integration/setup.ts index f7c1be9..8c2a020 100644 --- a/test/integration/setup.ts +++ b/test/integration/setup.ts @@ -1,9 +1,10 @@ import { delay } from "../../deps.ts"; import KafkaGoSaur from "../../mod.ts"; import { KafkaWriter, KafkaWriterConfig } from "../../writer.ts"; -import { SASLMechanism } from "../../sasl.ts"; +import { SASLMechanism } from "../../security/sasl.ts"; const kafkaGoSaur = new KafkaGoSaur(); +// Ensure promise to instantiate wasm is awaited, so no async ops are leaked. await delay(50); const broker = "localhost:9092"; diff --git a/writer.ts b/writer.ts index 518794f..9f65c76 100644 --- a/writer.ts +++ b/writer.ts @@ -1,5 +1,6 @@ import { Header } from "./header.ts"; -import { SASLConfig } from "./sasl.ts"; +import { SASLConfig } from "./security/sasl.ts"; +import { TLSConfig } from "./security/tls.ts"; export type KafkaWriteMessage = { topic?: string; offset?: number; @@ -15,9 +16,10 @@ export type KafkaWriterConfig = { address: string; idleTimeout?: number; sasl?: SASLConfig; + tls?: TLSConfig; }; export interface KafkaWriter { - writeMessages: (msgs: KafkaWriteMessage[]) => Promise; - close: () => Promise; + writeMessages: (msgs: KafkaWriteMessage[]) => Promise; + close: () => Promise; }