Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes related to JetStream errors being prematurely processed by the core request layer #288

Merged
merged 6 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.8.1" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.8.2" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
- name: Add hosts to /etc/hosts
Expand Down
2 changes: 2 additions & 0 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ export enum ErrorCode {
JetStreamInvalidAck = "JESTREAM_INVALID_ACK",
JetStream404NoMessages = "404",
JetStream408RequestTimeout = "408",
//@deprecated: use JetStream409
JetStream409MaxAckPendingExceeded = "409",
JetStream409 = "409",
JetStreamNotEnabled = "503",

// emitted by the server
Expand Down
22 changes: 17 additions & 5 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,26 @@ export class JetStreamClientImpl extends BaseApiClient
return pa;
}

async pull(stream: string, durable: string): Promise<JsMsg> {
async pull(stream: string, durable: string, expires = 0): Promise<JsMsg> {
validateStreamName(stream);
validateDurableName(durable);

let timeout = this.timeout;
if (expires > timeout) {
timeout = expires;
}

expires = expires < 0 ? 0 : nanos(expires);
const pullOpts: PullOptions = {
batch: 1,
no_wait: expires === 0,
expires,
};

const msg = await this.nc.request(
// FIXME: specify expires
`${this.prefix}.CONSUMER.MSG.NEXT.${stream}.${durable}`,
this.jc.encode({ no_wait: true, batch: 1, expires: 0 }),
{ noMux: true, timeout: this.timeout },
this.jc.encode(pullOpts),
{ noMux: true, timeout },
);
const err = checkJsError(msg);
if (err) {
Expand Down Expand Up @@ -732,7 +744,7 @@ function iterMsgAdapter(
switch (ne.code) {
case ErrorCode.JetStream404NoMessages:
case ErrorCode.JetStream408RequestTimeout:
case ErrorCode.JetStream409MaxAckPendingExceeded:
case ErrorCode.JetStream409:
return [null, null];
default:
return [ne, null];
Expand Down
20 changes: 17 additions & 3 deletions nats-base-client/jsutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ export function millis(ns: Nanos) {
}

export function isFlowControlMsg(msg: Msg): boolean {
if (msg.data.length > 0) {
return false;
}
const h = msg.headers;
if (!h) {
return false;
Expand All @@ -78,6 +81,10 @@ export function isHeartbeatMsg(msg: Msg): boolean {
}

export function checkJsError(msg: Msg): NatsError | null {
// JS error only if no payload - otherwise assume it is application data
if (msg.data.length !== 0) {
return null;
}
const h = msg.headers;
if (!h) {
return null;
Expand All @@ -94,10 +101,17 @@ export function checkJsErrorCode(
}
description = description.toLowerCase();
switch (code) {
case 404:
// 404 for jetstream will provide different messages ensure we
// keep whatever the server returned
return new NatsError(description, ErrorCode.JetStream404NoMessages);
case 408:
return NatsError.errorForCode(
ErrorCode.JetStream408RequestTimeout,
new Error(description),
return new NatsError(description, ErrorCode.JetStream408RequestTimeout);
case 409:
// the description can be exceeded max waiting or max ack pending
return new NatsError(
description,
ErrorCode.JetStream409,
);
case 503:
return NatsError.errorForCode(
Expand Down
11 changes: 3 additions & 8 deletions nats-base-client/msg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@ import { TD } from "./encoders.ts";
import { ErrorCode, NatsError } from "./error.ts";

export function isRequestError(msg: Msg): (NatsError | null) {
if (msg && msg.headers) {
// to consider an error from the server we expect no payload
if (msg && msg.data.length === 0 && msg.headers) {
const headers = msg.headers as MsgHdrsImpl;
if (headers.hasError) {
// only 503s are expected from core NATS (404/408/409s are JetStream)
if (headers.code === 503) {
return NatsError.errorForCode(ErrorCode.NoResponders);
} else {
let desc = headers.description;
if (desc === "") {
desc = ErrorCode.RequestError;
}
desc = desc.toLowerCase();
return new NatsError(desc, headers.status);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export interface JetStreamClient {
data?: Uint8Array,
options?: Partial<JetStreamPublishOptions>,
): Promise<PubAck>;
pull(stream: string, durable: string): Promise<JsMsg>;
pull(stream: string, durable: string, expires?: number): Promise<JsMsg>;
fetch(
stream: string,
durable: string,
Expand Down
97 changes: 95 additions & 2 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ function callbackConsume(debug = false): JsMsgCallback {
if (err) {
switch (err.code) {
case ErrorCode.JetStream408RequestTimeout:
case ErrorCode.JetStream409MaxAckPendingExceeded:
case ErrorCode.JetStream409:
case ErrorCode.JetStream404NoMessages:
return;
default:
Expand Down Expand Up @@ -817,7 +817,7 @@ Deno.test("jetstream - pull sub - attached callback", async () => {
if (err) {
switch (err.code) {
case ErrorCode.JetStream408RequestTimeout:
case ErrorCode.JetStream409MaxAckPendingExceeded:
case ErrorCode.JetStream409:
case ErrorCode.JetStream404NoMessages:
return;
default:
Expand Down Expand Up @@ -3010,3 +3010,96 @@ Deno.test("jetstream - pull next", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - pull errors", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();

await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();

async function expectError(
expires: number,
code: ErrorCode,
) {
try {
await js.pull(stream, "me", expires);
} catch (err) {
assertEquals(err.code, code);
}
}

await expectError(0, ErrorCode.JetStream404NoMessages);
await expectError(1000, ErrorCode.JetStream408RequestTimeout);

await js.publish(subj);

// we expect a message
const a = await js.pull(stream, "me", 1000);
assertEquals(a.seq, 1);

await cleanup(ns, nc);
});

Deno.test("jetstream - pull error: max_waiting", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.8.2")) {
return;
}

const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();

await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_waiting: 1,
});
const js = nc.jetstream();

async function expectError(
expires: number,
code: ErrorCode,
): Promise<NatsError> {
const d = deferred<NatsError>();
try {
await js.pull(stream, "me", expires);
} catch (err) {
d.resolve(err);
assertEquals(err.code, code);
}
return d;
}
await Promise.all([
expectError(
3000,
ErrorCode.JetStream408RequestTimeout,
),
expectError(3000, ErrorCode.JetStream409),
]);

await cleanup(ns, nc);
});

Deno.test("jetstream - pull error: js not enabled", async () => {
const { ns, nc } = await setup();
const js = nc.jetstream();
async function expectError(code: ErrorCode, expires: number) {
const noMsgs = deferred<NatsError>();
try {
await js.pull("stream", "me", expires);
} catch (err) {
noMsgs.resolve(err);
}
const ne = await noMsgs;
assertEquals(ne.code, code);
}

await expectError(ErrorCode.JetStreamNotEnabled, 0);
await cleanup(ns, nc);
});