Skip to content

Commit

Permalink
[fix] publisher connection drain didn't flush pending buffer after dr…
Browse files Browse the repository at this point in the history
…aining connections

[fix] tests were incorrectly flushing masking issue
  • Loading branch information
aricart committed Nov 16, 2020
1 parent 8a4ce89 commit bf67eb7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ jobs:
with:
deno-version: ${{ matrix.deno-version }}

- name: Set NATS Server Version
run: echo '::set-env name=NATS_VERSION::v2.1.7'
- name: Get nats-server
run: |
wget "https://github.com/aricart/wsgnatsd/releases/download/v0.8.4/nats-server-$NATS_VERSION-linux-amd64.zip" -O tmp.zip
wget "https://github.com/nats-io/nats-server/releases/download/v2.1.9/nats-server-v2.1.9-linux-amd64.zip" -O tmp.zip
unzip tmp.zip
mv nats-server-$NATS_VERSION-linux-amd64 nats-server
mv nats-server-v2.1.9-linux-amd64 nats-server
- name: Lint Deno Module
run: deno fmt --check
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
subs.forEach((sub: Subscription) => {
promises.push(sub.drain());
});
promises.push(this.flush(deferred()));
return Promise.all(promises)
.then(async () => {
this.noMorePublishing = true;
Expand Down
41 changes: 25 additions & 16 deletions tests/drain_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,12 @@ Deno.test("drain - connection drain", async () => {

const nc1 = await connect({ servers: u });
let first = true;
const closed = deferred<void>();
await nc1.subscribe(subj, {
callback: () => {
lock.unlock();
if (first) {
first = false;
nc1.drain()
.then(() => {
closed.resolve();
})
.catch((err) => {
fail(err);
});
nc1.drain();
}
},
queue: "q1",
Expand All @@ -80,10 +73,8 @@ Deno.test("drain - connection drain", async () => {
for (let i = 0; i < max; i++) {
nc2.publish(subj);
}
await nc2.flush();
await closed;
await nc2.drain();
await lock;
await nc2.close();
assert(count > 0, "expected second connection to get some messages");
});

Expand Down Expand Up @@ -169,12 +160,8 @@ Deno.test("drain - publisher drain", async () => {

for (let i = 0; i < 10000; i++) {
nc2.publish(subj);
// FIXME: this shouldn't be necessary
if (i % 1000 === 0) {
await nc2.flush();
}
}
await nc2.flush();
await nc2.drain();

await lock;

Expand Down Expand Up @@ -315,3 +302,25 @@ Deno.test("drain - multiple sub drain returns same promise", async () => {
await p1;
await nc.close();
});

Deno.test("drain - simple publisher connection drain", async () => {
const nc = await connect({ servers: u });

const subj = createInbox();
const lock = Lock(500);

nc.subscribe(subj, {
callback: (err, msg) => {
lock.unlock();
},
});
await nc.flush();

const nc1 = await connect({ servers: u });
for (let i = 0; i < 500; i++) {
nc1.publish(subj);
}
await nc1.drain();
await lock;
await nc.close();
});

0 comments on commit bf67eb7

Please sign in to comment.