Skip to content

Commit

Permalink
WIP #36
Browse files Browse the repository at this point in the history
  • Loading branch information
claustres committed Oct 22, 2019
1 parent 3d11dd3 commit c0a38fc
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 121 deletions.
243 changes: 125 additions & 118 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,118 @@ import { LocalService, RemoteService } from './service'

const debug = makeDebug('feathers-distributed')

const isInternalService = (app, serviceDescriptor) => {
// Default is to expose all services
if (!app.distributionOptions.services) return false
if (typeof app.distributionOptions.services === 'function') return !app.distributionOptions.services(serviceDescriptor)
else return !app.distributionOptions.services.includes(serviceDescriptor.path)
}
const isDiscoveredService = (app, serviceDescriptor) => {
// Default is to discover all services
if (!app.distributionOptions.remoteServices) return true
if (typeof app.distributionOptions.remoteServices === 'function') return app.distributionOptions.remoteServices(serviceDescriptor)
else return app.distributionOptions.remoteServices.includes(serviceDescriptor.path)
}

function publishApplication (app) {
app.servicePublisher.publish('application', { uuid: app.uuid })
debug('Published local app with uuid ' + app.uuid)
}

function publishService (app, serviceDescriptor) {
const service = app.service(serviceDescriptor.path)
if (!service || (typeof service !== 'object')) return
if (service.remote) {
debug('Ignoring remote service publication on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
return
}
// Skip internal services
if (isInternalService(app, serviceDescriptor)) {
debug('Ignoring local service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
return
}
// Register the responder to handle remote calls to the service
if (!service.responder) service.responder = new LocalService({ app, path: serviceDescriptor.path })
// Publish new local service
app.servicePublisher.publish('service', serviceDescriptor)
debug('Published local service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
}

function registerService (app, serviceDescriptor) {
// Do not register our own services
if (serviceDescriptor.uuid === app.uuid) {
debug('Ignoring local service registration on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
return
}
// Skip already registered services
const service = app.service(serviceDescriptor.path)
if (service) {
if (service instanceof RemoteService) {
debug('Already registered service as remote on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
} else {
debug('Already registered local service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
}
return
}
// Skip services we are not interested into
if (!isDiscoveredService(app, serviceDescriptor)) {
debug('Ignoring remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)
return
}
// Initialize our service by providing any required middleware
let args = [serviceDescriptor.path]
if (app.distributionOptions.middlewares.before) args = args.concat(app.distributionOptions.middlewares.before)
args.push(new RemoteService(serviceDescriptor))
if (app.distributionOptions.middlewares.after) args = args.concat(app.distributionOptions.middlewares.after)
app.use(...args)
debug('Registered remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)

// registering hook object on every remote service
if (app.distributionOptions.hooks) {
app.service(serviceDescriptor.path).hooks(app.distributionOptions.hooks)
}
debug('Registered hooks on remote service on path ' + serviceDescriptor.path + ' for app with uuid ' + app.uuid)

// Dispatch an event internally through node so that async processes can run
app.emit('service', serviceDescriptor)
}

function initializeCote (app) {
debug('Initializing cote with options', app.coteOptions)
// Setup cote with options
app.cote = makeCote(app.coteOptions)

// This subscriber listen to an event each time a remote app service has been registered
app.serviceSubscriber = new app.cote.Subscriber({
name: 'feathers services subscriber',
namespace: 'services',
key: 'services',
subscribesTo: ['application', 'service']
}, app.coteOptions)
debug('Services subscriber ready for app with uuid ' + app.uuid)
// When a remote service is declared create the local proxy interface to it
app.serviceSubscriber.on('service', serviceDescriptor => {
registerService(app, serviceDescriptor)
})
// This publisher publishes an event each time a local app or service is registered
app.servicePublisher = new app.cote.Publisher({
name: 'feathers services publisher',
namespace: 'services',
key: 'services',
broadcasts: ['application', 'service']
}, app.coteOptions)
debug('Services publisher ready for app with uuid ' + app.uuid)
// Also each time a new app pops up so that it does not depend of the initialization order of the apps
app.serviceSubscriber.on('application', applicationDescriptor => {
Object.getOwnPropertyNames(app.services).forEach(path => {
publishService(app, { uuid: app.uuid, path })
})
})
// Tell others apps I'm here
// Add a timeout so that the publisher/subscriber has been initialized on the node
setTimeout(_ => { publishApplication(app) }, app.distributionOptions.publicationDelay)
}

export default function init (options = {}) {
return function () {
const app = this
Expand All @@ -16,139 +128,34 @@ export default function init (options = {}) {
nodeTimeout: 30000,
masterTimeout: 60000,
log: false,
basePort: 10000
basePort: process.env.BASE_PORT || 10000
}, options.cote)
const distributionOptions = Object.assign({
publicationDelay: 10000,
app.distributionOptions = Object.assign({
publicationDelay: process.env.PUBLICATION_DELAY || 10000,
coteDelay: process.env.COTE_DELAY,
middlewares: {}
}, options)
const isInternalService = service => {
// Default is to expose all services
if (!distributionOptions.services) return false
if (typeof distributionOptions.services === 'function') return !distributionOptions.services(service)
else return !distributionOptions.services.includes(service.path)
}
const isDiscoveredService = service => {
// Default is to discover all services
if (!distributionOptions.remoteServices) return true
if (typeof distributionOptions.remoteServices === 'function') return distributionOptions.remoteServices(service)
else return distributionOptions.remoteServices.includes(service.path)
}

debug('Initializing feathers-distributed with options', app.distributionOptions)
// Change default base port for automated port finding
portfinder.basePort = app.coteOptions.basePort
// Setup cote with options
app.cote = makeCote(app.coteOptions)
// We need to uniquely identify the app to avoid infinite loop by registering our own services
app.uuid = uuid()
debug('Initializing feathers-distributed with cote options', app.coteOptions)

// This subscriber listen to an event each time a remote app service has been registered
app.serviceSubscriber = new app.cote.Subscriber({
name: 'feathers services subscriber',
namespace: 'services',
key: 'services',
subscribesTo: ['application', 'service']
}, app.coteOptions)
debug('Services subscriber ready for app with uuid ' + app.uuid)
// When a remote service is declared create the local proxy interface to it
app.serviceSubscriber.on('service', serviceDescriptor => {
// Do not register our own services
if (serviceDescriptor.uuid === app.uuid) {
debug('Ignoring local service registration on path ' + serviceDescriptor.path)
return
}
// Skip already registered services
const service = app.service(serviceDescriptor.path)
if (service) {
if (service instanceof RemoteService) {
debug('Already registered service as remote on path ' + serviceDescriptor.path)
} else {
debug('Already registered local service on path ' + serviceDescriptor.path)
}
return
}
// Skip services we are not interested into
if (!isDiscoveredService(serviceDescriptor)) {
debug('Ignoring remote service on path ' + serviceDescriptor.path)
return
}
// Initialize our service by providing any required middleware
let args = [serviceDescriptor.path]
if (distributionOptions.middlewares.before) args = args.concat(distributionOptions.middlewares.before)
args.push(new RemoteService(serviceDescriptor))
if (distributionOptions.middlewares.after) args = args.concat(distributionOptions.middlewares.after)
app.use(...args)
debug('Registered remote service on path ' + serviceDescriptor.path)

// registering hook object on every remote service
if (distributionOptions.hooks) {
app.service(serviceDescriptor.path).hooks(distributionOptions.hooks)
}
debug('Registered hooks on remote service on path ' + serviceDescriptor.path)

// dispatch an event internally through node so that async processes can run
app.emit('service', serviceDescriptor)
})
// This publisher publishes an event each time a local app or service is registered
app.servicePublisher = new app.cote.Publisher({
name: 'feathers services publisher',
namespace: 'services',
key: 'services',
broadcasts: ['application', 'service']
}, app.coteOptions)
debug('Services publisher ready for app with uuid ' + app.uuid)
// Also each time a new app pops up so that it does not depend of the initialization order of the apps
app.serviceSubscriber.on('application', applicationDescriptor => {
// Not required for our own app
if (applicationDescriptor.uuid === app.uuid) {
debug('Ignoring local services republication for app ' + app.uuid)
return
}
debug('Republishing local services of app ' + app.uuid + ' for remote app ' + applicationDescriptor.uuid)
Object.getOwnPropertyNames(app.services).forEach(path => {
const service = app.services[path]
if (service.remote) return
const serviceDescriptor = { uuid: app.uuid, path }
// Skip internal services
if (isInternalService(serviceDescriptor)) {
debug('Ignoring local service republication on path ' + serviceDescriptor.path)
return
}
app.servicePublisher.publish('service', serviceDescriptor)
debug('Republished local service on path ' + path)
})
})
// Tell others apps I'm here
// Add a timeout so that the publisher/subscriber has been initialized on the node
setTimeout(_ => {
app.servicePublisher.publish('application', { uuid: app.uuid })
debug('Published local app with uuid ' + app.uuid)
}, distributionOptions.publicationDelay)
// Setup cote with options and required delay
if (app.distributionOptions.coteDelay) setTimeout(_ => { initializeCote(app) }, app.distributionOptions.coteDelay)
else initializeCote(app)

// We replace the use method to inject service publisher/responder
const superUse = app.use
app.use = function () {
const path = arguments[0]
// Register the service normally first
superUse.apply(app, arguments)
// With express apps we can directly register middlewares
// Check if cote has already been initialized
if (!app.cote) return
// With express apps we can directly register middlewares: not supported
if (typeof path !== 'string') return
const service = app.service(path)
// Note: middlewares are not supported
// Also avoid infinite loop by registering already registered remote services
if (typeof service === 'object' && !service.remote) {
const serviceDescriptor = { uuid: app.uuid, path: stripSlashes(path) }
// Skip internal services
if (isInternalService(serviceDescriptor)) {
debug('Ignoring local service on path ' + serviceDescriptor.path)
return
}
// Publish new local service
app.servicePublisher.publish('service', serviceDescriptor)
debug('Published local service on path ' + path)
// Register the responder to handle remote calls to the service
service.responder = new LocalService({ app, path: serviceDescriptor.path })
}
publishService(app, { uuid: app.uuid, path: stripSlashes(path) })
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions test/index.test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import utils from 'util'
import authentication from '@feathersjs/authentication'
import auth from '@feathersjs/authentication-client'
import jwt from '@feathersjs/authentication-jwt'
Expand Down Expand Up @@ -113,6 +114,7 @@ describe('feathers-distributed', () => {
function waitForService (apps, services, i) {
return new Promise((resolve, reject) => {
apps[i].on('service', data => {
console.log(data)
if (data.path === 'users') {
services[i] = apps[i].service('users')
expect(services[i]).toExist()
Expand All @@ -136,15 +138,16 @@ describe('feathers-distributed', () => {
// Distribute only the users service
services: (service) => service.path.endsWith('users'),
publicationDelay: 5000,
coteDelay: 5000,
cote: { // Use cote defaults
helloInterval: 2000,
checkInterval: 4000,
nodeTimeout: 5000,
masterTimeout: 6000
}
}))
expect(apps[i].servicePublisher).toExist()
expect(apps[i].serviceSubscriber).toExist()
//expect(apps[i].servicePublisher).toExist()
//expect(apps[i].serviceSubscriber).toExist()
apps[i].configure(channels)
// Only the first app has a local service
if (i === gateway) {
Expand Down Expand Up @@ -183,7 +186,7 @@ describe('feathers-distributed', () => {
await Promise.all(promises)
})
// Let enough time to process
.timeout(10000)
.timeout(20000)

it('initiate the rest clients', () => {
for (let i = 0; i < nbApps; i++) {
Expand Down

0 comments on commit c0a38fc

Please sign in to comment.