Skip to content

Commit

Permalink
Fixes related to JetStream errors being prematurely processed by the …
Browse files Browse the repository at this point in the history
…core request layer (#288)

- [FIX] the base client request introspected a message, and if a status code was set, it turned that into an error. The logic for this was not entirely correct one requirement is for the message payload of this type of error to be empty and NATS core currently can only expect an error of 503 (no responders). Any interpretation of the status code unless a 503, should be delegated - in this case to JetStream which can then make a better interpretation of the code (which could simply be a marker for no messages or a request timeout on requests that have an expiration.

- [CHANGE] `js.pull(stream,dur)` now also adds an optional `expires` argument - the expires also overrides the request timeout default if the expires is longer.

- [CHANGE] [DEPRECATION] 409's take the description sent by the server - added a `ErrorCodeJetStream409` enum, and deprecated `ErrorCode.JetStream409MaxAckPendingExceeded`
- [UPDATE] ci nats-server version to 2.8.2

FIX #273
  • Loading branch information
aricart authored May 4, 2022
1 parent 2c61473 commit 9f67075
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo "NATS_VERSION=v2.8.1" >> $GITHUB_ENV
run: echo "NATS_VERSION=v2.8.2" >> $GITHUB_ENV

# this here because dns seems to be wedged on gha
- name: Add hosts to /etc/hosts
Expand Down
2 changes: 2 additions & 0 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ export enum ErrorCode {
JetStreamInvalidAck = "JESTREAM_INVALID_ACK",
JetStream404NoMessages = "404",
JetStream408RequestTimeout = "408",
//@deprecated: use JetStream409
JetStream409MaxAckPendingExceeded = "409",
JetStream409 = "409",
JetStreamNotEnabled = "503",

// emitted by the server
Expand Down
22 changes: 17 additions & 5 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,26 @@ export class JetStreamClientImpl extends BaseApiClient
return pa;
}

async pull(stream: string, durable: string): Promise<JsMsg> {
async pull(stream: string, durable: string, expires = 0): Promise<JsMsg> {
validateStreamName(stream);
validateDurableName(durable);

let timeout = this.timeout;
if (expires > timeout) {
timeout = expires;
}

expires = expires < 0 ? 0 : nanos(expires);
const pullOpts: PullOptions = {
batch: 1,
no_wait: expires === 0,
expires,
};

const msg = await this.nc.request(
// FIXME: specify expires
`${this.prefix}.CONSUMER.MSG.NEXT.${stream}.${durable}`,
this.jc.encode({ no_wait: true, batch: 1, expires: 0 }),
{ noMux: true, timeout: this.timeout },
this.jc.encode(pullOpts),
{ noMux: true, timeout },
);
const err = checkJsError(msg);
if (err) {
Expand Down Expand Up @@ -732,7 +744,7 @@ function iterMsgAdapter(
switch (ne.code) {
case ErrorCode.JetStream404NoMessages:
case ErrorCode.JetStream408RequestTimeout:
case ErrorCode.JetStream409MaxAckPendingExceeded:
case ErrorCode.JetStream409:
return [null, null];
default:
return [ne, null];
Expand Down
20 changes: 17 additions & 3 deletions nats-base-client/jsutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ export function millis(ns: Nanos) {
}

export function isFlowControlMsg(msg: Msg): boolean {
if (msg.data.length > 0) {
return false;
}
const h = msg.headers;
if (!h) {
return false;
Expand All @@ -78,6 +81,10 @@ export function isHeartbeatMsg(msg: Msg): boolean {
}

export function checkJsError(msg: Msg): NatsError | null {
// JS error only if no payload - otherwise assume it is application data
if (msg.data.length !== 0) {
return null;
}
const h = msg.headers;
if (!h) {
return null;
Expand All @@ -94,10 +101,17 @@ export function checkJsErrorCode(
}
description = description.toLowerCase();
switch (code) {
case 404:
// 404 for jetstream will provide different messages ensure we
// keep whatever the server returned
return new NatsError(description, ErrorCode.JetStream404NoMessages);
case 408:
return NatsError.errorForCode(
ErrorCode.JetStream408RequestTimeout,
new Error(description),
return new NatsError(description, ErrorCode.JetStream408RequestTimeout);
case 409:
// the description can be exceeded max waiting or max ack pending
return new NatsError(
description,
ErrorCode.JetStream409,
);
case 503:
return NatsError.errorForCode(
Expand Down
11 changes: 3 additions & 8 deletions nats-base-client/msg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@ import { TD } from "./encoders.ts";
import { ErrorCode, NatsError } from "./error.ts";

export function isRequestError(msg: Msg): (NatsError | null) {
if (msg && msg.headers) {
// to consider an error from the server we expect no payload
if (msg && msg.data.length === 0 && msg.headers) {
const headers = msg.headers as MsgHdrsImpl;
if (headers.hasError) {
// only 503s are expected from core NATS (404/408/409s are JetStream)
if (headers.code === 503) {
return NatsError.errorForCode(ErrorCode.NoResponders);
} else {
let desc = headers.description;
if (desc === "") {
desc = ErrorCode.RequestError;
}
desc = desc.toLowerCase();
return new NatsError(desc, headers.status);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export interface JetStreamClient {
data?: Uint8Array,
options?: Partial<JetStreamPublishOptions>,
): Promise<PubAck>;
pull(stream: string, durable: string): Promise<JsMsg>;
pull(stream: string, durable: string, expires?: number): Promise<JsMsg>;
fetch(
stream: string,
durable: string,
Expand Down
97 changes: 95 additions & 2 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ function callbackConsume(debug = false): JsMsgCallback {
if (err) {
switch (err.code) {
case ErrorCode.JetStream408RequestTimeout:
case ErrorCode.JetStream409MaxAckPendingExceeded:
case ErrorCode.JetStream409:
case ErrorCode.JetStream404NoMessages:
return;
default:
Expand Down Expand Up @@ -817,7 +817,7 @@ Deno.test("jetstream - pull sub - attached callback", async () => {
if (err) {
switch (err.code) {
case ErrorCode.JetStream408RequestTimeout:
case ErrorCode.JetStream409MaxAckPendingExceeded:
case ErrorCode.JetStream409:
case ErrorCode.JetStream404NoMessages:
return;
default:
Expand Down Expand Up @@ -3010,3 +3010,96 @@ Deno.test("jetstream - pull next", async () => {

await cleanup(ns, nc);
});

Deno.test("jetstream - pull errors", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();

await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();

async function expectError(
expires: number,
code: ErrorCode,
) {
try {
await js.pull(stream, "me", expires);
} catch (err) {
assertEquals(err.code, code);
}
}

await expectError(0, ErrorCode.JetStream404NoMessages);
await expectError(1000, ErrorCode.JetStream408RequestTimeout);

await js.publish(subj);

// we expect a message
const a = await js.pull(stream, "me", 1000);
assertEquals(a.seq, 1);

await cleanup(ns, nc);
});

Deno.test("jetstream - pull error: max_waiting", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
if (await notCompatible(ns, nc, "2.8.2")) {
return;
}

const { stream } = await initStream(nc);
const jsm = await nc.jetstreamManager();

await jsm.consumers.add(stream, {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
max_waiting: 1,
});
const js = nc.jetstream();

async function expectError(
expires: number,
code: ErrorCode,
): Promise<NatsError> {
const d = deferred<NatsError>();
try {
await js.pull(stream, "me", expires);
} catch (err) {
d.resolve(err);
assertEquals(err.code, code);
}
return d;
}
await Promise.all([
expectError(
3000,
ErrorCode.JetStream408RequestTimeout,
),
expectError(3000, ErrorCode.JetStream409),
]);

await cleanup(ns, nc);
});

Deno.test("jetstream - pull error: js not enabled", async () => {
const { ns, nc } = await setup();
const js = nc.jetstream();
async function expectError(code: ErrorCode, expires: number) {
const noMsgs = deferred<NatsError>();
try {
await js.pull("stream", "me", expires);
} catch (err) {
noMsgs.resolve(err);
}
const ne = await noMsgs;
assertEquals(ne.code, code);
}

await expectError(ErrorCode.JetStreamNotEnabled, 0);
await cleanup(ns, nc);
});

0 comments on commit 9f67075

Please sign in to comment.