Skip to content

Commit

Permalink
Add TLS support
Browse files Browse the repository at this point in the history
* Add TLS
* Cache GO build Dockerfile
* Add Node dialer
* Refactor TS security and connection modules
* Add benchmarking
* Improved GO error handling
  • Loading branch information
arjun-1 committed Feb 7, 2022
1 parent a50c392 commit 3fb7af9
Show file tree
Hide file tree
Showing 33 changed files with 569 additions and 155 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
deno-version: "~1.18"

- name: Setup Go
uses: actions/setup-go@v2.1.5
uses: actions/setup-go@v2
with:
go-version: "~1.17"

Expand All @@ -29,7 +29,7 @@ jobs:

- name: Build the WebAssembly binary
run: |
make build
make build-wasm
git diff --quiet --exit-code bin/kafkagosaur.wasm
- name: Start Docker containers
Expand All @@ -41,7 +41,7 @@ jobs:
- name: Write coverage report
run: deno coverage ./coverage --lcov > ./coverage/coverage.lcov

- name: Upload coverage to Codecov
- name: Upload coverage report
uses: codecov/codecov-action@v2
with:
files: ./coverage/coverage.lcov
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.vscode
coverage
.env
13 changes: 10 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
FROM golang:1.17.6-alpine3.15 AS build
# syntax=docker/dockerfile:1.3

COPY src /src
FROM golang:1.17-alpine AS build

COPY src/go.* /src/
WORKDIR /src

RUN GOOS=js GOARCH=wasm go build -o kafkagosaur.wasm
RUN go mod download
COPY src /src/

RUN --mount=type=cache,target=/root/.cache/go-build \
GOOS=js GOARCH=wasm go build -o kafkagosaur.wasm

FROM scratch AS export

COPY --from=build /src/kafkagosaur.wasm /
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
.PHONY: test

all: build
all: build-wasm

build:
docker build -o bin .
build-wasm:
DOCKER_BUILDKIT=1 docker build -o bin .

test:
deno test --allow-read --allow-net --coverage=coverage
deno test --allow-read --allow-net --unstable --coverage=coverage

docker-up:
docker-compose up -d
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ performance characteristics similar to the native code.
- [x] Writer
- [x] Reader
- [x] SASL
- [ ] TLS
- [x] TLS
- [x] TCP
- [ ] UDP
- [ ] Deno streams
Expand Down Expand Up @@ -70,13 +70,13 @@ make docker
To run the writer example

```bash
deno run --allow-read --allow-net examples/writer.ts
deno run --allow-read --allow-net --unstable examples/writer.ts
```

To run the reader example

```bash
deno run --allow-read --allow-net examples/reader.ts
deno run --allow-read --allow-net --unstable examples/reader.ts
```

## Documentation
Expand All @@ -91,7 +91,7 @@ API of kafka-go closely.
To build the WebAssemnbly module, first run

```bash
make build
make build-wasm
```

To run the tests, ensure first you have docker up and running. Then start the
Expand Down
14 changes: 14 additions & 0 deletions bench/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import "./deps.ts";

const getEnv = (key: string): string => {
const value = Deno.env.get(key);
if (value === undefined) throw Error(`Undefined environment variable ${key}`);
else return value;
};

const broker = getEnv("BROKER");
const topic = getEnv("TOPIC");
const username = getEnv("SASL_USERNAME");
const password = getEnv("SASL_PASSWORD");

export { broker, password, topic, username };
6 changes: 6 additions & 0 deletions bench/deps.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export {
bench,
runBenchmarks,
} from "https://deno.land/std@0.125.0/testing/bench.ts";

import "https://deno.land/x/dotenv@v3.2.0/load.ts";
47 changes: 47 additions & 0 deletions bench/reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import KafkaGoSaur from "../mod.ts";
import { SASLMechanism } from "../security/sasl.ts";
import { bench, runBenchmarks } from "./deps.ts";
import { broker, password, topic, username } from "./config.ts";

const nrOfMessages = 100000;

const readerConfig = {
brokers: [broker],
topic,
sasl: {
mechanism: SASLMechanism.PLAIN,
username,
password,
},
tls: {
insecureSkipVerify: true,
},
};

const kafkaGoSaur = new KafkaGoSaur();
const reader = await kafkaGoSaur.reader(readerConfig);

bench({
name: `readMessage#${nrOfMessages}`,
runs: 10,
async func(b): Promise<void> {
b.start();

for (let i = 0; i < nrOfMessages; i++) {
await reader.readMessage();
}

b.stop();
},
});

const benchmarkRunResults = await runBenchmarks();

console.log(benchmarkRunResults);
console.log(
`[kafkagosaur] readMessage msgs/s: ${
nrOfMessages / (benchmarkRunResults.results[0].measuredRunsAvgMs / 1000)
}`,
);

await reader.close();
57 changes: 57 additions & 0 deletions bench/writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import KafkaGoSaur from "../mod.ts";
import { SASLMechanism } from "../security/sasl.ts";
import { bench, runBenchmarks } from "./deps.ts";
import { broker, password, topic, username } from "./config.ts";

const nrOfMessages = 100000;
const msgBatchSize = 10000;
const msgSize = 1024;

const writerConfig = {
address: broker,
topic,
sasl: {
mechanism: SASLMechanism.PLAIN,
username,
password,
},
tls: {
insecureSkipVerify: true,
},
};

const kafkaGoSaur = new KafkaGoSaur();
const writer = await kafkaGoSaur.writer(writerConfig);

const value = new Uint8Array(msgSize);
crypto.getRandomValues(value);

bench({
name: `writeMessages#${nrOfMessages}`,
runs: 10,
async func(b): Promise<void> {
b.start();

for (let i = 0; i < nrOfMessages / msgBatchSize; i++) {
const msgs = [];

for (let j = 0; j < msgBatchSize; j++) {
msgs.push({ value });
}
await writer.writeMessages(msgs);
}

b.stop();
},
});

const benchmarkRunResults = await runBenchmarks();

console.log(benchmarkRunResults);
console.log(
`[kafkagosaur] writeMessages msgs/s: ${
nrOfMessages / (benchmarkRunResults.results[0].measuredRunsAvgMs / 1000)
}`,
);

await writer.close();
Binary file modified bin/kafkagosaur.wasm
Binary file not shown.
63 changes: 0 additions & 63 deletions connection-with-deadline.ts

This file was deleted.

9 changes: 7 additions & 2 deletions deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,10 @@ export {
deadline,
DeadlineError,
deferred,
} from "https://deno.land/std@0.116.0/async/mod.ts";
export { delay } from "https://deno.land/std@0.116.0/async/delay.ts";
} from "https://deno.land/std@0.125.0/async/mod.ts";
export {
connect,
createConnection,
Socket,
} from "https://deno.land/std@0.125.0/node/net.ts";
export { delay } from "https://deno.land/std@0.125.0/async/delay.ts";
4 changes: 3 additions & 1 deletion dialer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { SASLConfig } from "./sasl.ts";
import { SASLConfig } from "./security/sasl.ts";
import { TLSConfig } from "./security/tls.ts";

export type KafkaDialerConfig = {
sasl?: SASLConfig;
tls?: TLSConfig;
};

export interface KafkaConn {
Expand Down
6 changes: 3 additions & 3 deletions examples/reader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.1/mod.ts";
import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.1/sasl.ts";
import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.2/mod.ts";
import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.2/security/sasl.ts";

const broker = "localhost:9093";
const topic = "test-0";
Expand All @@ -21,6 +21,6 @@ const reader = await kafkaGoSaur.reader(config);
const dec = new TextDecoder();
const readMsg = await reader.readMessage();
const readValue = dec.decode(readMsg.value);
console.log(readValue);
console.log(`Read message ${readValue} from topic ${topic} at ${broker}`);

await reader.close();
5 changes: 3 additions & 2 deletions examples/writer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.1/mod.ts";
import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.1/sasl.ts";
import KafkaGoSaur from "https://deno.land/kafkagosaur@0.0.2/mod.ts";
import { SASLMechanism } from "https://deno.land/kafkagosaur@0.0.2/security/sasl.ts";

const broker = "localhost:9093";
const topic = "test-0";
Expand All @@ -22,4 +22,5 @@ const enc = new TextEncoder();
const msgs = [{ value: enc.encode("value0") }, { value: enc.encode("value1") }];

await writer.writeMessages(msgs);
console.log(`Wrote ${msgs.length} messages to topic ${topic} at ${broker}`);
await writer.close();
2 changes: 1 addition & 1 deletion global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ declare namespace global {

importObject: WebAssembly.Imports;

run(instance: WebAssembly.Instance): Promise<unknown>;
run(instance: WebAssembly.Instance): Promise<void>;
}
}
11 changes: 6 additions & 5 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
// @deno-types="./global.d.ts"
import "./lib/wasm_exec.js";
import { deadline, DeadlineError, delay } from "./deps.ts";
import { setOnGlobal as setConnectWithDeadlineOnGlobal } from "./connection-with-deadline.ts";
import { Dial, setDialOnGlobal } from "./net/connection.ts";
import { dial as nodeDial } from "./net/node-connection.ts";
import { KafkaDialer, KafkaDialerConfig } from "./dialer.ts";
import { KafkaReader, KafkaReaderConfig } from "./reader.ts";
import { KafkaWriter, KafkaWriterConfig } from "./writer.ts";

const runGoWasm = async (wasmFilePath: string): Promise<unknown> => {
const runGoWasm = async (wasmFilePath: string): Promise<void> => {
const go = new global.Go();
const wasmBytes = await Deno.readFile(wasmFilePath);
const instiatedSource = await WebAssembly.instantiate(
Expand All @@ -24,7 +25,7 @@ const untilGloballyDefined = (
const maxDelayMs = 1000;

const loop = async (): Promise<unknown> => {
const value = (global as Record<string, unknown>)[key];
const value = (globalThis as Record<string, unknown>)[key];
if (value !== undefined) return Promise.resolve(value);
else {
await delay(backoffMs);
Expand All @@ -40,8 +41,8 @@ const untilGloballyDefined = (
};

class KafkaGoSaur {
constructor() {
setConnectWithDeadlineOnGlobal();
constructor(dial: Dial = nodeDial) {
setDialOnGlobal(dial);
runGoWasm("./bin/kafkagosaur.wasm");
}

Expand Down
Loading

0 comments on commit 3fb7af9

Please sign in to comment.