Skip to content

Commit

Permalink
feat: support single process mode (#3430)
Browse files Browse the repository at this point in the history
  • Loading branch information
dead-horse authored Feb 3, 2019
1 parent 1336169 commit 20ba463
Show file tree
Hide file tree
Showing 14 changed files with 525 additions and 51 deletions.
6 changes: 6 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
*/
exports.startCluster = require('egg-cluster').startCluster;

/**
* Start egg application with single process mode
* @since 1.0.0
*/
exports.start = require('./lib/start');

/**
* @member {Application} Egg#Application
* @since 1.0.0
Expand Down
10 changes: 10 additions & 0 deletions lib/core/messenger/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
'use strict';

const LocalMessenger = require('./local');
const IPCMessenger = require('./ipc');

exports.create = egg => {
return egg.options.mode === 'single'
? new LocalMessenger(egg)
: new IPCMessenger(egg);
};
2 changes: 1 addition & 1 deletion lib/core/messenger.js → lib/core/messenger/ipc.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const debug = require('debug')('egg:util:messenger');
const debug = require('debug')('egg:util:messenger:ipc');
const is = require('is-type-of');
const sendmessage = require('sendmessage');
const EventEmitter = require('events');
Expand Down
139 changes: 139 additions & 0 deletions lib/core/messenger/local.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
'use strict';

const debug = require('debug')('egg:util:messenger:local');
const is = require('is-type-of');
const EventEmitter = require('events');

/**
* Communication between app worker and agent worker with EventEmitter
*/
class Messenger extends EventEmitter {

constructor(egg) {
super();
this.egg = egg;
}

/**
* Send message to all agent and app
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
broadcast(action, data) {
debug('[%s] broadcast %s with %j', this.pid, action, data);
this.send(action, data, 'both');
return this;
}

/**
* send message to the specified process
* Notice: in single process mode, it only can send to self process,
* and it will send to both agent and app's messengers.
* @param {String} pid - the process id of the receiver
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendTo(pid, action, data) {
debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
if (pid !== process.pid) return;
this.send(action, data, 'both');
return this;
}

/**
* send message to one worker by random
* Notice: in single process mode, we only start one agent worker and one app worker
* - if it's running in agent, it will send to one of app workers
* - if it's running in app, it will send to agent
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendRandom(action, data) {
debug('[%s] send %s with %j to opposite', this.pid, action, data);
this.send(action, data, 'opposite');
return this;
}

/**
* send message to app
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendToApp(action, data) {
debug('[%s] send %s with %j to all app', this.pid, action, data);
this.send(action, data, 'application');
return this;
}

/**
* send message to agent
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendToAgent(action, data) {
debug('[%s] send %s with %j to all agent', this.pid, action, data);
this.send(action, data, 'agent');
return this;
}

/**
* @param {String} action - message key
* @param {Object} data - message value
* @param {String} to - let master know how to send message
* @return {Messenger} this
*/
send(action, data, to) {
const { egg } = this;
let application;
let agent;
let opposite;
if (egg.type === 'application') {
application = egg;
agent = egg.agent;
opposite = agent;
} else {
agent = egg;
application = egg.application;
opposite = application;
}

// use nextTick to keep it async as IPC messenger
process.nextTick(() => {
if (application.messenger && (to === 'application' || to === 'both')) {
application.messenger._onMessage({ action, data });
}
if (agent.messenger && (to === 'agent' || to === 'both')) {
agent.messenger._onMessage({ action, data });
}
if (opposite.messenger && to === 'opposite') {
opposite.messenger._onMessage({ action, data });
}
});

return this;
}

_onMessage(message) {
if (message && is.string(message.action)) {
debug('[%s] got message %s with %j', this.pid, message.action, message.data);
this.emit(message.action, message.data);
}
}

close() {
this.removeAllListeners();
}

/**
* @method Messenger#on
* @param {String} action - message key
* @param {Object} data - message value
*/
}

module.exports = Messenger;
13 changes: 10 additions & 3 deletions lib/egg.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ class EggApplication extends EggCore {
* - {Object} [type] - type of instance, Agent and Application both extend koa, type can determine what it is.
* - {String} [baseDir] - app root dir, default is `process.cwd()`
* - {Object} [plugins] - custom plugin config, use it in unittest
* - {String} [mode] - process mode, can be cluster / single, default is `cluster`
*/
constructor(options) {
constructor(options = {}) {
options.mode = options.mode || 'cluster';
super(options);

// export context base classes, let framework can impl sub class and over context extend easily.
Expand All @@ -55,7 +57,7 @@ class EggApplication extends EggCore {
* @member {Messenger}
* @since 1.0.0
*/
this.messenger = new Messenger();
this.messenger = Messenger.create(this);

// trigger serverDidReady hook when all app workers
// and agent worker is ready
Expand Down Expand Up @@ -115,7 +117,12 @@ class EggApplication extends EggCore {
};

// register close function
this.beforeClose(() => {
this.beforeClose(async () => {
// single process mode will close agent before app close
if (this.type === 'application' && this.options.mode === 'single') {
await this.agent.close();
}

for (const logger of this.loggers.values()) {
logger.close();
}
Expand Down
25 changes: 25 additions & 0 deletions lib/start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'use strict';

const detectPort = require('detect-port');
const Application = require('./application');
const Agent = require('./agent');

module.exports = async (options = {}) => {
console.warn('single process mode is still in experiment, please don\'t use it in production environment');

options.baseDir = options.baseDir || process.cwd();
options.mode = 'single';
// FIXME: cluster-client support single process mode
options.clusterPort = await detectPort();
const agent = new Agent(Object.assign({}, options));
await agent.ready();
const application = new Application(Object.assign({}, options));
application.agent = agent;
agent.application = application;
await application.ready();

// emit egg-ready message in agent and application
application.messenger.broadcast('egg-ready');

return application;
};
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
"cluster-client": "^2.1.1",
"debug": "^4.0.1",
"delegates": "^1.0.0",
"detect-port": "^1.3.0",
"egg-cluster": "^1.21.0",
"egg-cookies": "^2.2.6",
"egg-core": "^4.10.3",
"egg-core": "^4.14.0",
"egg-development": "^2.4.1",
"egg-i18n": "^2.0.0",
"egg-jsonp": "^2.0.0",
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures/apps/demo/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const egg = require('../../../../');

egg.start().then(app => {
app.listen(3000);
console.log('listen 3000');
});
1 change: 1 addition & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe('test/index.test.js', () => {
'Controller',
'Service',
'Subscription',
'start',
'startCluster',
]);
});
Expand Down
Loading

0 comments on commit 20ba463

Please sign in to comment.