Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor project #15

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
env:
commonjs: true
es2021: true
node: true

extends: eslint:recommended

parserOptions:
ecmaVersion: latest

rules:
linebreak-style:
- error
- unix
quotes:
- error
- single
semi:
- error
- always
18 changes: 18 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Test

on: push

jobs:
unit:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v3
with:
node-version: 20.x
- run: corepack enable
- run: yarn install --frozen-lockfile
- run: yarn lint
- run: docker compose up -d nsqd nsqlookupd
- run: yarn test
14 changes: 10 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
.PHONY: lint
lint:
@npx eslint \
--fix \
index.js \
examples \
lib \
test

.PHONY: test
test:
@./node_modules/.bin/mocha \
--require should \
@npx mocha \
--bail \
--timeout 20s \
test/unit/*.js \
test/acceptance/*.js

.PHONY: test
105 changes: 52 additions & 53 deletions Readme.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# @uphold/nsq.js

JavaScript NSQ client WIP.
JavaScript NSQ client WIP.

## Features

- actually written in js :p
- easier debugging via [debug()](https://github.com/visionmedia/debug) instrumentation
- native json message support
- does not arbitrarily apply backoff on requeues
- disabling of auto-RDY support for manual control (high throughput etc)
- reconnection to dead nsqd nodes
- graceful close support
- actually written in js :p
- easier debugging via [debug()](https://github.com/visionmedia/debug) instrumentation
- native json message support
- does not arbitrarily apply backoff on requeues
- disabling of auto-RDY support for manual control (high throughput etc)
- reconnection to dead nsqd nodes
- graceful close support

## Installation

Expand All @@ -22,9 +22,9 @@ $ npm install @uphold/nsq.js

### Debugging

The __DEBUG__ environment variable can be used to enable
traces within the module, for example all nsq debug() calls
except fo the framer:
The __DEBUG__ environment variable can be used to enable
traces within the module, for example all nsq debug() calls
except fo the framer:

```
$ DEBUG=nsq*,-nsq:framer node test
Expand All @@ -46,46 +46,44 @@ nsq:connection response OK +0ms

### Requeue backoff

The NSQD documentation recommends applying
backoff when requeueing implying that the
consumer is faulty, IMO this is a weird default,
and the opposite of what we need so it's not applied in
this client.
The NSQD documentation recommends applying
backoff when requeueing implying that the
consumer is faulty, IMO this is a weird default,
and the opposite of what we need so it's not applied in
this client.

## Example

```js
var nsq = require('@uphold/nsq.js');
const nsq = require('@uphold/nsq.js');

// subscribe

var reader = nsq.reader({
// Subscribe.
const reader = nsq.reader({
nsqd: [':4150'],
maxInFlight: 1,
maxAttempts: 5,
topic: 'events',
channel: 'ingestion'
});

reader.on('error', function(err){
reader.on('error', err => {
console.log(err.stack);
});

reader.on('message', function(msg){
var body = msg.body.toString();
reader.on('message', msg => {
const body = msg.body.toString();
console.log('%s attempts=%s', body, msg.attempts);
msg.requeue(2000);
});

reader.on('discard', function(msg){
var body = msg.body.toString();
reader.on('discard', msg => {
const body = msg.body.toString();
console.log('giving up on %s', body);
msg.finish();
});

// publish

var writer = nsq.writer(':4150');
// Publish.
const writer = nsq.writer();

writer.publish('events', 'foo');
writer.publish('events', 'bar');
Expand All @@ -96,7 +94,7 @@ writer.publish('events', 'baz');

### nsq.reader(options)

Create a reader:
Create a reader:

- `id` connection identifier *(see `client_id` in the [spec](http://nsq.io/clients/tcp_protocol_spec.html#identify))*
- `topic` topic name
Expand All @@ -117,63 +115,66 @@ Events:
- `discard` (msg) discarded message
- `error response` (err) response from nsq
- `error` (err)
- `subscribed` (topic) name of the subscribed topic

### reader#close([fn])

Gracefully close the reader's connection(s) and fire the optional [fn] when completed.

### reader#end([fn])

Close the reader's connection(s) and fire the optional [fn] when completed.

### nsq.writer([options|address])

Create a writer. By default a connection attempt to 0.0.0.0:4150 will be made unless one of the following options are provided:
Create a writer. By default a connection attempt to 0.0.0.0:4150 will be made unless
an address string is passed, or an object with the nsqd option:

- `port` number
- `host` name
- `nsqd` array of nsqd addresses
- `nsqlookupd` array of nsqlookupd addresses
- `nsqd` array of nsqd addresses
- `maxConnectionAttempts` max reconnection attempts [Infinity]

Events:

- `error response` (err) response from nsq
- `error` (err)
- `error response` (err) response from nsq
- `error` (err)

### writer#publish(topic, message, [fn])

Publish the given `message` to `topic` where `message`
may be a string, buffer, or object. An array of messages
may be passed, in which case a MPUT is performed. It will
wait for a connection to be established.
Publish the given `message` to `topic` where `message`
may be a string, buffer, or object. An array of messages
may be passed, in which case a MPUT is performed. It will
wait for a connection to be established.

### writer#close([fn])

Close the writer's connection(s) and fire the optional [fn] when completed.

## Message

A single message.
A single message.

### Message#finish()

Mark message as complete.
Mark message as complete.

### Message#requeue([delay])

Re-queue the message immediately, or with the
given `delay` in milliseconds, or a string such
as "5s", "10m" etc.
Re-queue the message immediately, or with the
given `delay` in milliseconds, or a string such
as "5s", "10m" etc.

### Message#touch()

Reset the message's timeout, increasing the length
of time before NSQD considers it timed out.
Reset the message's timeout, increasing the length
of time before NSQD considers it timed out.

### Message#json()

Return parsed JSON object.
Return parsed JSON object.

## Tracing

The following [jstrace](https://github.com/jstrace/jstrace) probes are available:
The following [jstrace](https://github.com/jstrace/jstrace) probes are available:

- `connection:ready` ready count sent
- `connection:message` message received
Expand All @@ -184,12 +185,10 @@ Close the writer's connection(s) and fire the optional [fn] when completed.
## Running tests

```
nsqd --lookupd-tcp-address=0.0.0.0:4160 &
nsqadmin --lookupd-http-address=0.0.0.0:4161 &
nsqlookupd &
docker-compose up -d nsqlookupd nsqd
make test
```

# License

MIT
MIT
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: '3.7'

services:
nsqd:
image: nsqio/nsq:v1.3.0
command: /nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- 4150:4150
- 4151:4151

nsqlookupd:
image: nsqio/nsq:v1.3.0
command: /nsqlookupd
ports:
- 4160:4160
- 4161:4161
34 changes: 18 additions & 16 deletions examples/bench.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
'use strict';

/**
* Module dependencies.
*/

var nsq = require('..');
const nsq = require('..');

// subscribe

var reader = nsq.reader({
// Subscribe.
const reader = nsq.reader({
nsqd: ['0.0.0.0:4150'],
topic: 'events',
channel: 'ingestion',
maxInFlight: 1000
});

var n = 0;
reader.on('message', function(msg){
process.stdout.write('\r ' + n++);
let n = 0;
const runtime = parseInt(process.argv[2], 10) || 10000;

reader.on('message', msg => {
process.stdout.write(`\r ${n++}`);
msg.finish();
});

// publish

var writer = nsq.writer({ port: 4150 });
// Publish.
const writer = nsq.writer();

function next() {
setImmediate(function(){
writer.publish('events', ['foo', 'bar', 'baz'], next);
});
writer.publish('events', ['foo', 'bar', 'baz'], next);
}

function finish() {
process.stdout.write(`\rTotal messages sent: ${n} (${n / runtime} msg/ms)`);
process.exit(0);
}

next();

setTimeout(function(){
process.exit(0);
}, 10000);
setTimeout(() => finish(), runtime);
Loading
Loading