Skip to content

Commit

Permalink
Rename Master to Leader (#323)
Browse files Browse the repository at this point in the history
* Builds but test cases fail due to redis connection error?

* Added ECONNREFUSED match case in 'can provide an error if connection failed' test cases

Co-authored-by: Charlie Roth <charlieroth4@icloud.com>
  • Loading branch information
evantahler and charlieroth authored Feb 20, 2020
1 parent c3e90ec commit fd436ef
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 80 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ async function boot() {
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("master", () => {
console.log("scheduler became master");
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", error => {
console.log(`scheduler error >> ${error}`);
Expand Down Expand Up @@ -379,7 +379,8 @@ If you know the name of a worker that should be removed, you can also call `awai

You may want to use node-resque to schedule jobs every minute/hour/day, like a distributed CRON system. There are a number of excellent node packages to help you with this, like [node-schedule](https://github.com/tejasmanohar/node-schedule) and [node-cron](https://github.com/ncb000gt/node-cron). Node-resque makes it possible for you to use the package of your choice to schedule jobs with.

Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduling the jobs. To help you with this, you can inspect which of the scheduler processes is currently acting as master, and flag only the master scheduler process to run the schedule. A full example can be found at [/examples/scheduledJobs.ts](https://github.com/actionhero/node-resque/blob/master/examples/scheduledJobs.ts), but the relevant section is:

Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduling the jobs. To help you with this, you can inspect which of the scheduler processes is currently acting as leader, and flag only the master scheduler process to run the schedule. A full example can be found at [/examples/scheduledJobs.ts](https://github.com/actionhero/node-resque/blob/master/examples/scheduledJobs.ts), but the relevant section is:

```javascript
const NodeResque = require("node-resque");
Expand All @@ -393,7 +394,7 @@ schedule.scheduleJob("10,20,30,40,50 * * * * *", async () => {
// do this job every 10 seconds, CRON style
// we want to ensure that only one instance of this job is scheduled in our environment at once,
// no matter how many schedulers we have running
if (scheduler.master) {
if (scheduler.leader) {
console.log(">>> enqueuing a job");
await queue.enqueue("time", "ticktock", new Date().toString());
}
Expand Down
4 changes: 2 additions & 2 deletions __tests__/core/connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Ioredis from "ioredis";
import specHelper from "../utils/specHelper";
import { Connection } from "../../src/index";
import specHelper from "../utils/specHelper";

describe("connection", () => {
beforeAll(async () => {
Expand Down Expand Up @@ -31,7 +31,7 @@ describe("connection", () => {
connection.connect();

connection.on("error", error => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT/);
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
connection.end();
resolve();
});
Expand Down
4 changes: 2 additions & 2 deletions __tests__/core/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import specHelper from "../utils/specHelper";
import { Queue, Worker } from "../../src/index";
import specHelper from "../utils/specHelper";
let queue;

describe("queue", () => {
Expand Down Expand Up @@ -37,7 +37,7 @@ describe("queue", () => {
queue.connect();

queue.on("error", error => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT/);
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
queue.end();
resolve();
});
Expand Down
18 changes: 9 additions & 9 deletions __tests__/core/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Queue, Scheduler, Worker } from "../../src/index";
import specHelper from "../utils/specHelper";
import { Scheduler, Queue, Worker } from "../../src/index";

let scheduler;
let queue;
Expand Down Expand Up @@ -42,12 +42,12 @@ describe("scheduler", () => {
brokenScheduler.on("poll", () => {
throw new Error("Should not emit poll");
});
brokenScheduler.on("master", () => {
throw new Error("Should not emit master");
brokenScheduler.on("leader", () => {
throw new Error("Should not emit leader");
});

brokenScheduler.on("error", async error => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT/);
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
await brokenScheduler.end();
done();
});
Expand All @@ -65,7 +65,7 @@ describe("scheduler", () => {
await specHelper.cleanup();
});

test("should only have one master, and can failover", async () => {
test("should only have one leader, and can failover", async () => {
const shedulerOne = new Scheduler({
connection: specHelper.connectionDetails,
name: "scheduler_1",
Expand All @@ -85,15 +85,15 @@ describe("scheduler", () => {
await new Promise(resolve => {
setTimeout(resolve, specHelper.timeout * 2);
});
expect(shedulerOne.master).toBe(true);
expect(shedulerTwo.master).toBe(false);
expect(shedulerOne.leader).toBe(true);
expect(shedulerTwo.leader).toBe(false);
await shedulerOne.end();

await new Promise(resolve => {
setTimeout(resolve, specHelper.timeout * 2);
});
expect(shedulerOne.master).toBe(false);
expect(shedulerTwo.master).toBe(true);
expect(shedulerOne.leader).toBe(false);
expect(shedulerTwo.leader).toBe(true);
await shedulerTwo.end();
});
});
Expand Down
4 changes: 2 additions & 2 deletions __tests__/core/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Queue, Worker } from "../../src/index";
import specHelper from "../utils/specHelper";
import { Worker, Queue } from "../../src/index";

const jobs = {
add: {
Expand Down Expand Up @@ -79,7 +79,7 @@ describe("worker", () => {
);

worker.on("error", async error => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT/);
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
await worker.end();
done();
});
Expand Down
4 changes: 2 additions & 2 deletions __tests__/utils/specHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const queue = 'test_queue'
const pkg = 'ioredis'
const NodeResque = require('../../src/index')

const SpecHeloer = {
const SpecHelper = {
pkg: pkg,
namespace: namespace,
queue: queue,
Expand Down Expand Up @@ -89,4 +89,4 @@ const SpecHeloer = {
}
}

export default SpecHeloer
export default SpecHelper
2 changes: 1 addition & 1 deletion examples/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ node-resque-producer_1 | adding jobs @ Mon Feb 17 2020 04:12:44 GMT+0000 (Coord
node-resque-producer_1 | adding jobs @ Mon Feb 17 2020 04:12:45 GMT+0000 (Coordinated Universal Time)
node-resque-producer_1 | adding jobs @ Mon Feb 17 2020 04:12:46 GMT+0000 (Coordinated Universal Time)
node-resque-worker_1 | worker check in @ 1581912766
node-resque-worker_1 | scheduler became master
node-resque-worker_1 | scheduler became leader
node-resque-worker_1 | scheduler polling
node-resque-worker_1 | scheduler working timestamp 1581912700
node-resque-worker_1 | scheduler enqueuing job 1581912700 >> {"class":"subtract","queue":"math","args":[2,1]}
Expand Down
4 changes: 2 additions & 2 deletions examples/docker/worker/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ async function boot() {
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("master", () => {
console.log("scheduler became master");
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", error => {
console.log(`scheduler error >> ${error}`);
Expand Down
6 changes: 3 additions & 3 deletions examples/example.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env ts-node

import { Worker, Scheduler, Queue } from "./../src/index";
import { Queue, Scheduler, Worker } from "./../src/index";
// In your projects: import { Worker, Scheduler, Queue } from "node-resque";

async function boot() {
Expand Down Expand Up @@ -137,8 +137,8 @@ async function boot() {
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("master", () => {
console.log("scheduler became master");
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", error => {
console.log(`scheduler error >> ${error}`);
Expand Down
6 changes: 3 additions & 3 deletions examples/retry.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env ts-node

import { Worker, Scheduler, Queue } from "./../src/index";
import { Queue, Scheduler, Worker } from "./../src/index";
// In your projects: import { Worker, Scheduler, Queue } from "node-resque";

// ////////////////////////
Expand Down Expand Up @@ -109,8 +109,8 @@ async function boot() {
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("master", () => {
console.log("scheduler became master");
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", error => {
console.log(`scheduler error >> ${error}`);
Expand Down
9 changes: 4 additions & 5 deletions examples/scheduledJobs.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#!/usr/bin/env ts-node

import { Scheduler, Queue, Worker } from "./../src/index";
// In your projects: import { Worker, Scheduler, Queue } from "node-resque";

// We'll use https://github.com/tejasmanohar/node-schedule for this example,
// but there are many other excelent node scheduling projects
import * as schedule from "node-schedule";
import { Queue, Scheduler, Worker } from "./../src/index";

// ////////////////////////
// SET UP THE CONNECTION //
Expand Down Expand Up @@ -97,8 +96,8 @@ async function boot() {
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("master", () => {
console.log("scheduler became master");
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", error => {
console.log(`scheduler error >> ${error}`);
Expand All @@ -123,7 +122,7 @@ async function boot() {
// do this job every 10 seconds, cron style
// we want to ensure that only one instance of this job is scheduled in our enviornment at once,
// no matter how many schedulers we have running
if (scheduler.master) {
if (scheduler.leader) {
console.log(">>> enquing a job");
await queue.enqueue("time", "ticktock", [new Date().toString()]);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/stuckWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ async function boot() {
scheduler.on("poll", () => {
console.log("scheduler polling");
});
scheduler.on("master", () => {
console.log("scheduler became master");
scheduler.on("leader", () => {
console.log("scheduler became leader");
});
scheduler.on("error", error => {
console.log(`scheduler error >> ${error}`);
Expand Down
8 changes: 4 additions & 4 deletions src/core/queue.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as os from "os";
import { EventEmitter } from "events";
import * as os from "os";
import { ErrorPayload } from "../types/errorPayload";
import { Jobs } from "../types/jobs";
import { ConnectionOptions } from "../types/options";
import { Connection } from "./connection";
import { RunPlugins } from "./pluginRunner";
import { ConnectionOptions } from "../types/options";
import { Jobs } from "../types/jobs";
import { ErrorPayload } from "../types/errorPayload";

function arrayify(o) {
if (Array.isArray(o)) {
Expand Down
Loading

0 comments on commit fd436ef

Please sign in to comment.