Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSM consumer update #229

Merged
merged 2 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.6.3" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.6.5" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
- name: Add hosts to /etc/hosts
Expand Down
11 changes: 11 additions & 0 deletions nats-base-client/jsmconsumer_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
ConsumerConfig,
ConsumerInfo,
ConsumerListResponse,
ConsumerUpdateConfig,
CreateConsumerRequest,
JetStreamOptions,
Lister,
Expand Down Expand Up @@ -64,6 +65,16 @@ export class ConsumerAPIImpl extends BaseApiClient implements ConsumerAPI {
return r as ConsumerInfo;
}

async update(
stream: string,
durable: string,
cfg: ConsumerUpdateConfig,
): Promise<ConsumerInfo> {
const ci = await this.info(stream, durable);
const changable = cfg as ConsumerConfig;
return this.add(stream, Object.assign(ci.config, changable));
}

async info(stream: string, name: string): Promise<ConsumerInfo> {
validateStreamName(stream);
validateDurableName(name);
Expand Down
20 changes: 14 additions & 6 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ export interface Lister<T> {
export interface ConsumerAPI {
info(stream: string, consumer: string): Promise<ConsumerInfo>;
add(stream: string, cfg: Partial<ConsumerConfig>): Promise<ConsumerInfo>;
update(
stream: string,
durable: string,
cfg: ConsumerUpdateConfig,
): Promise<ConsumerInfo>;
delete(stream: string, consumer: string): Promise<boolean>;
list(stream: string): Lister<ConsumerInfo>;
}
Expand Down Expand Up @@ -757,25 +762,28 @@ export interface AccountLimits {
"max_consumers": number;
}

export interface ConsumerConfig {
description?: string;
export interface ConsumerConfig extends ConsumerUpdateConfig {
"ack_policy": AckPolicy;
"ack_wait"?: Nanos;
"deliver_policy": DeliverPolicy;
"deliver_subject"?: string;
"deliver_group"?: string;
"durable_name"?: string;
"filter_subject"?: string;
"flow_control"?: boolean; // send message with status of 100 and reply subject
"idle_heartbeat"?: Nanos; // send empty message when idle longer than this
"max_ack_pending"?: number;
"max_deliver"?: number;
"max_waiting"?: number;
"opt_start_seq"?: number;
"opt_start_time"?: string;
"rate_limit_bps"?: number;
"replay_policy": ReplayPolicy;
}

export interface ConsumerUpdateConfig {
description?: string;
"ack_wait"?: Nanos;
"max_deliver"?: number;
"sample_freq"?: string;
"max_ack_pending"?: number;
"max_waiting"?: number;
"headers_only"?: boolean;
}

Expand Down
38 changes: 37 additions & 1 deletion tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
ErrorCode,
headers,
JSONCodec,
nanos,
NatsError,
nuid,
StreamConfig,
Expand All @@ -41,7 +42,7 @@ import {
setup,
} from "./jstest_util.ts";
import { connect } from "../src/mod.ts";
import { assertThrowsAsyncErrorCode, NatsServer } from "./helpers/mod.ts";
import { assertThrowsAsyncErrorCode, notCompatible } from "./helpers/mod.ts";
import { validateName } from "../nats-base-client/jsutil.ts";

const StreamNameRequired = "stream name required";
Expand Down Expand Up @@ -912,3 +913,38 @@ Deno.test("jsm - jetstream error info", async () => {
}
await cleanup(ns, nc);
});

Deno.test("jsm - update consumer", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.6.4")) {
return;
}
const { stream } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: "dur",
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(2000),
max_ack_pending: 500,
headers_only: false,
max_deliver: 100,
});

// update is simply syntatic sugar for add providing a type to
// help the IDE show editable properties - server will still
// reject options it doesn't deem editable
const ci = await jsm.consumers.update(stream, "dur", {
ack_wait: nanos(3000),
max_ack_pending: 5,
headers_only: true,
max_deliver: 2,
});

assertEquals(ci.config.ack_wait, nanos(3000));
assertEquals(ci.config.max_ack_pending, 5);
assertEquals(ci.config.headers_only, true);
assertEquals(ci.config.max_deliver, 2);

await cleanup(ns, nc);
});