NatsError code is concat of code + message? #273

Closed
byu opened this issue Mar 16, 2022 · 6 comments · Fixed by #288

@byu

byu commented Mar 16, 2022

Context

I have a pull consumer on which I am calling single js.pull()s. I am attempting to catch the 404, 408, and 409 JetStream errors so that I can "sleep" when no messages come back.

Issue

The value of err.code does not match any value in the ErrorCode enum.
Instead, the code field appears to be a concatenation of the enum code and a text message.

Example:

I'm expecting 404, the nats.ErrorCode.JetStream404NoMessages code value, but I'm getting 404 No Messages ...
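To make the mismatch concrete: assuming the enum member holds the bare string "404" (an assumption about the nats.deno enum, mirrored locally here rather than imported), a strict comparison against the value observed in this issue fails. The `leadingStatus` helper is hypothetical, a stop-gap for illustration only and not part of the library.

```typescript
// Local stand-in for nats.ErrorCode.JetStream404NoMessages; the real
// enum value is assumed, not imported, to keep this sketch self-contained.
const JetStream404NoMessages = "404";

// The kind of value observed in err.code in this issue (trailing text elided).
const observed: string = "404 No Messages";

// Hypothetical stop-gap: keep only the leading three-digit status, if any.
function leadingStatus(code: string): string {
  const match = /^\d{3}/.exec(code);
  return match ? match[0] : code;
}

console.log(observed === JetStream404NoMessages); // false
console.log(leadingStatus(observed) === JetStream404NoMessages); // true
```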

Sample Code

  import { nats } from "./deps.ts";

  ... [omitted class code]...

  async pull(): Promise<nats.JsMsg> {
    while (true) {
      try {
        console.log("awaiting");
        const msg = await this.js.pull(this.streamName, this.consumerName);
        return msg;
      } catch (err) {
        if (
          err.code == nats.ErrorCode.Timeout ||
          err.code == nats.ErrorCode.JetStream404NoMessages ||
          err.code == nats.ErrorCode.JetStream408RequestTimeout ||
          err.code == nats.ErrorCode.JetStream409MaxAckPendingExceeded
        ) {
          // Sleep Pause
          console.log(
            `JSPULL: ${err.code}; Sleep ${this.pollWaitInterval} ms`,
          );
          await new Promise((resolve) =>
            setTimeout(resolve, this.pollWaitInterval)
          );
        } else {
          console.log(`throwing??? ${err.code}`);
          throw err;
        }
      }
    }
  }

Environment

deps.ts

export * as nats from "https://deno.land/x/nats@v1.6.1/src/mod.ts";
export * as oak from "https://deno.land/x/oak@v10.4.0/mod.ts";
export * as prom from "https://deno.land/x/ts_prometheus@v0.3.0/mod.ts";

Dockerfile

FROM denoland/deno:1.19.3

WORKDIR /app

USER deno

ADD . .
RUN deno cache --unstable main.ts

EXPOSE 9091

CMD ["run", "--allow-net", "--allow-env", "--unstable", "main.ts"]

Initialization:


// We init the connection options, and force infinite reconnect attempts
const copts = { servers: NATS_SERVERS } as nats.ConnectionOptions;
copts.inboxPrefix = NATS_INBOX_PREFIX;
copts.debug = NATS_DEBUG == "true" ? true : false;
copts.maxReconnectAttempts = -1;
copts.reconnect = true;

// We use the nkey+jwt credentials data if available
if (NATS_CREDENTIALS) {
  copts.authenticator = nats.credsAuthenticator(
    new TextEncoder().encode(NATS_CREDENTIALS),
  );
}

// Connect to the NATS server. On failure to connect the first time,
// this will throw an error and stop the Deno VM.
const nc = await nats.connect(copts);
const jsopts = {} as nats.JetStreamOptions;
jsopts.timeout = 10000;
const js = nc.jetstream(jsopts);
@aricart
Member

aricart commented May 2, 2022

The issue is that, for the 404, the server sets 404 as a status code, which gets parsed earlier by the core request handling rather than by the JetStream API handling. The JavaScript clients look for any status code and fail the request if one is set.

nats-io/nats-server#3093

aricart added a commit that referenced this issue May 3, 2022
… code was set, it turned that into an error. The logic for this was not quite correct: one requirement is for the message payload of this type of error to be empty, and NATS core currently can only expect an error of 503 (no responders). Any interpretation of the status code other than a 503 should be delegated, in this case to JetStream, which can then make a better interpretation of the code (which could simply be a marker for no messages, or a request timeout on requests that have an expiration).

[CHANGE] js.pull(stream,dur) now also adds an optional expires - the expires also overrides the request timeout.

FIX #273
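The delegation rule in this commit message can be sketched as two small functions. This is a hypothetical illustration of the logic as described, not the nats.deno source: the `StatusReply` shape and both function names are assumptions.

```typescript
// Hypothetical shape of a reply that may carry a status in its headers.
interface StatusReply {
  status?: number; // status code from the reply headers, if any
  description?: string; // status description, e.g. "No Messages"
  payload: Uint8Array;
}

type CoreOutcome =
  | { kind: "noResponders" }
  | { kind: "delegate"; reply: StatusReply };

// Core request layer: only a 503 with an empty payload is treated as an
// error (no responders); every other reply is handed to the caller.
function coreRequestOutcome(reply: StatusReply): CoreOutcome {
  if (reply.status === 503 && reply.payload.length === 0) {
    return { kind: "noResponders" };
  }
  return { kind: "delegate", reply };
}

// JetStream layer (hypothetical): interpret the delegated status and
// report the bare code, keeping the description separate.
function jetStreamCode(reply: StatusReply): string | null {
  switch (reply.status) {
    case 404:
    case 408:
    case 409:
      return String(reply.status);
    default:
      return null; // no status: a regular message
  }
}

const noMessages = { status: 404, description: "No Messages", payload: new Uint8Array(0) };
console.log(coreRequestOutcome(noMessages).kind); // "delegate"
console.log(jetStreamCode(noMessages)); // "404"
```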
@aricart
Member

aricart commented May 3, 2022

Your code is almost correct: ErrorCode.JetStream409MaxAckPendingExceeded will never occur for a pull consumer. To control the maximum number of outstanding pulls, use the consumer option max_waiting instead. Note that there is currently an issue in the server that makes this behave improperly.
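Since 409 drops out for pull consumers, the remaining "no message yet" conditions can be grouped into one predicate. A minimal sketch: the `ErrorCode` object below is a local stand-in whose string values are assumed to mirror the `nats.ErrorCode` members used in this thread, and `isNoMessageYet` is a hypothetical helper. It also counts the client-side `Timeout`, which the maintainer later describes as effectively a 408.

```typescript
// Local stand-in for the nats.ErrorCode members used in this thread;
// the string values are assumptions, not imported from the library.
const ErrorCode = {
  Timeout: "TIMEOUT",
  JetStream404NoMessages: "404",
  JetStream408RequestTimeout: "408",
} as const;

// True when a pull failed only because no message was available in time,
// so the caller can simply pull again. 409 is deliberately absent, since
// it never occurs for a pull consumer.
function isNoMessageYet(code: string | undefined): boolean {
  return (
    code === ErrorCode.JetStream404NoMessages ||
    code === ErrorCode.JetStream408RequestTimeout ||
    code === ErrorCode.Timeout
  );
}

console.log(isNoMessageYet("404")); // true
console.log(isNoMessageYet("404 No Messages")); // false: the value reported in this issue
```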

Also, after looking at your code, I have updated pull(stream, durable, millis) to take an optional millis argument, which lets you wait up to that amount of time for a message. This simplifies your code: you don't need to sleep or set up other timers, because the request remains open for up to the specified time and then resolves to a message or fails. So you can re-request immediately.

@aricart aricart self-assigned this May 3, 2022
@byu
Author

byu commented May 4, 2022

I have also updated pull(stream, durable, millis) to have an optional millis

Would the following be the correct simplification of the poll loop?

And, if I'm reading commit eb836b8 correctly, ErrorCode.JetStream404NoMessages is thrown for no messages when millis is 0, and ErrorCode.JetStream408RequestTimeout is thrown when millis is greater than 0?

  import { nats } from "./deps.ts";

  ... [omitted class code]...

  async pull(): Promise<nats.JsMsg> {
    while (true) {
      try {
        console.log("awaiting");
        const msg = await this.js.pull(
          this.streamName,
          this.consumerName,
          this.waitIntervalMs,
        );
        return msg;
      } catch (err) {
        if (
          err.code == nats.ErrorCode.JetStream404NoMessages ||
          err.code == nats.ErrorCode.JetStream408RequestTimeout
        ) {
          // No-op: no message arrived in time, so loop and pull again
        } else {
          console.log(`throwing??? ${err.code}`);
          throw err;
        }
      }
    }
  }

If the above is correct, it makes my use-case easier...

And I'm assuming that eb836b8 fixes all cases of code matching ErrorCode enum values, in addition to the specific error case this issue initially found?

Thanks

@aricart
Member

aricart commented May 4, 2022

Correct. Previously, pull() didn't specify an expires; this is part of the PullOptions for pull subscriptions:

https://github.com/nats-io/nats.deno/blob/main/nats-base-client/types.ts#L247

Now the pull() API exposes expires: pull(stream: string, durable: string, expires?: number): Promise<JsMsg>;

If you don't specify expires, no_wait is set to true, instructing the server to return a 404 (No Messages) if no messages are available.

If you specify expires, no_wait is set to false, and the request remains on the server for up to that many milliseconds. If a message arrives before the request expires, you'll get the message; otherwise, once that time elapses, a 408 is returned. Note that a Timeout is still possible (this is the client issuing the timeout locally), which is effectively a 408: the client didn't get a response in the allowed time.
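The no_wait/expires choice can be sketched as the request body a single pull might send. This is an assumption-laden illustration: `nextRequest` is hypothetical, treating 0 like "no expires" follows the reading confirmed in this thread, and the conversion to nanoseconds reflects my understanding that the JetStream API takes `expires` in nanoseconds while `pull()` takes milliseconds.

```typescript
// Hypothetical body for a single pull; field names follow the JetStream API.
type NextRequest = {
  batch: number;
  no_wait?: boolean;
  expires?: number; // nanoseconds (assumed unit on the wire)
};

const NANOS_PER_MILLI = 1_000_000;

function nextRequest(expiresMs?: number): NextRequest {
  if (expiresMs === undefined || expiresMs <= 0) {
    // No expires: the server answers right away, with a 404 if nothing is pending.
    return { batch: 1, no_wait: true };
  }
  // With expires: the request waits on the server, ending in a 408 if nothing arrives.
  return { batch: 1, expires: expiresMs * NANOS_PER_MILLI };
}

console.log(JSON.stringify(nextRequest())); // {"batch":1,"no_wait":true}
console.log(JSON.stringify(nextRequest(30_000))); // {"batch":1,"expires":30000000000}
```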

If you find yourself doing this frequently, you may want to move to a pull subscription. Each pull() call carries some setup overhead (it creates a custom subscription), but it is by far the simplest option.

If you want to move to a pull subscription, your code would look something like this:

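// Assumes the `nats` namespace from deps.ts, the `nc` connection from the
// initialization code above, and `ci`, your own object holding the
// consumer's stream, durable, subject, and name.
import { nats } from "./deps.ts";

let inbox = "";
let sub: nats.JetStreamPullSubscription;
// pull one message at a time; each pull stays open on the server for up to 30s
const pullOpts = { batch: 1, expires: 30 * 1000 };
const opts = nats.consumerOpts();
opts.bind(ci.stream, ci.durable); // bind to the existing durable consumer
opts.ackExplicit();
opts.manualAck();
opts.callback((err, m) => {
  if (err) {
    switch (err.code) {
      case nats.ErrorCode.JetStream404NoMessages:
      case nats.ErrorCode.JetStream408RequestTimeout:
        // no message arrived before the pull expired: re-pull here
        sub.pull(pullOpts);
        break;
      default:
        // this is a real error
        console.error(`error from ${ci.name}: ${err.message}\n${err.stack}`);
        // this will make the service stop
        sub.unsubscribe();
    }
    return;
  }
  if (!m) {
    return;
  }
  // do something with your message
  // next() acks the message and requests the next batch
  m.next(inbox, pullOpts);
});

const js = nc.jetstream();
sub = await js.pullSubscribe(
  ci.subject,
  opts,
);
// resolves when the subscription closes (e.g. after unsubscribe())
const done = sub.closed;
// the subscription's inbox, which next() uses to request more messages
inbox = sub.getSubject();
// do the initial pull
sub.pull(pullOpts);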

@aricart
Member

aricart commented May 4, 2022

The above snippet is fairly server-friendly: you have a single subscription set up to handle all the polling for messages via next().

@aricart
Member

aricart commented May 4, 2022

(Note: next() and the new pull options are not yet released, but I'll do a release in the next few days.)

aricart added a commit that referenced this issue May 4, 2022
…core request layer (#288)

- [FIX] the base client request introspected a message, and if a status code was set, it turned that into an error. The logic for this was not entirely correct: one requirement is for the message payload of this type of error to be empty, and NATS core currently can only expect an error of 503 (no responders). Any interpretation of the status code other than a 503 should be delegated, in this case to JetStream, which can then make a better interpretation of the code (which could simply be a marker for no messages or a request timeout on requests that have an expiration).

- [CHANGE] `js.pull(stream,dur)` now also adds an optional `expires` argument - the expires also overrides the request timeout default if the expires is longer.

- [CHANGE] [DEPRECATION] 409s take the description sent by the server: added an `ErrorCodeJetStream409` enum, and deprecated `ErrorCode.JetStream409MaxAckPendingExceeded`
- [UPDATE] ci nats-server version to 2.8.2

FIX #273
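The timeout rule in the [CHANGE] note above can be stated as a one-line comparison. A minimal sketch, with `pullTimeoutMs` a hypothetical helper; whether the client adds any extra margin on top is not stated here, so none is added. With the `jsopts.timeout = 10000` from the issue's initialization code, a 30-second expires would raise the effective timeout to 30 seconds, while a 5-second one would leave it at 10.

```typescript
// Hypothetical helper: the client-side timeout for a single pull, where
// expires overrides the default request timeout only when it is longer.
function pullTimeoutMs(defaultTimeoutMs: number, expiresMs?: number): number {
  if (expiresMs === undefined) {
    return defaultTimeoutMs;
  }
  return Math.max(defaultTimeoutMs, expiresMs);
}

console.log(pullTimeoutMs(10_000)); // 10000
console.log(pullTimeoutMs(10_000, 30_000)); // 30000
console.log(pullTimeoutMs(10_000, 5_000)); // 10000
```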