diff --git a/spec/Parse.Push.spec.js b/spec/Parse.Push.spec.js index 504a8b32c2..03deacf3cd 100644 --- a/spec/Parse.Push.spec.js +++ b/spec/Parse.Push.spec.js @@ -2,6 +2,12 @@ const request = require('request'); +const delayPromise = (delay) => { + return new Promise((resolve) => { + setTimeout(resolve, delay); + }); +} + describe('Parse.Push', () => { var setup = function() { var pushAdapter = { @@ -16,8 +22,8 @@ describe('Parse.Push', () => { } return Promise.resolve({ err: null, - deviceType: installation.deviceType, - result: true + device: installation, + transmitted: true }) }); return Promise.all(promises); @@ -63,6 +69,8 @@ describe('Parse.Push', () => { alert: 'Hello world!' } }, {useMasterKey: true}) + }).then(() => { + return delayPromise(500); }) .then(() => { done(); @@ -83,6 +91,8 @@ describe('Parse.Push', () => { alert: 'Hello world!' } }, {useMasterKey: true}) + }).then(() => { + return delayPromise(500); }).then(() => { done(); }).catch((err) => { diff --git a/spec/PushController.spec.js b/spec/PushController.spec.js index dd7b01316e..d31e82fbf8 100644 --- a/spec/PushController.spec.js +++ b/spec/PushController.spec.js @@ -2,6 +2,7 @@ var PushController = require('../src/Controllers/PushController').PushController; var StatusHandler = require('../src/StatusHandler'); var Config = require('../src/Config'); +var validatePushType = require('../src/Push/utils').validatePushType; const successfulTransmissions = function(body, installations) { @@ -35,7 +36,7 @@ describe('PushController', () => { var validPushTypes = ['ios', 'android']; expect(function(){ - PushController.validatePushType(where, validPushTypes); + validatePushType(where, validPushTypes); }).not.toThrow(); done(); }); @@ -48,7 +49,7 @@ describe('PushController', () => { var validPushTypes = ['ios', 'android']; expect(function(){ - PushController.validatePushType(where, validPushTypes); + validatePushType(where, validPushTypes); }).not.toThrow(); done(); }); @@ -63,7 +64,7 @@ describe('PushController', () => { var validPushTypes = ['ios', 'android']; expect(function(){ - PushController.validatePushType(where, validPushTypes); + validatePushType(where, validPushTypes); }).not.toThrow(); done(); }); @@ -76,7 +77,7 @@ describe('PushController', () => { var validPushTypes = ['ios', 'android']; expect(function(){ - PushController.validatePushType(where, validPushTypes); + validatePushType(where, validPushTypes); }).toThrow(); done(); }); @@ -89,7 +90,7 @@ describe('PushController', () => { var validPushTypes = ['ios', 'android']; expect(function(){ - PushController.validatePushType(where, validPushTypes); + validatePushType(where, validPushTypes); }).toThrow(); done(); }); @@ -131,7 +132,23 @@ describe('PushController', () => { }); it('properly increment badges', (done) => { - + var pushAdapter = { + send: function(body, installations) { + var badge = body.data.badge; + installations.forEach((installation) => { + if (installation.deviceType == "ios") { + expect(installation.badge).toEqual(badge); + expect(installation.originalBadge + 1).toEqual(installation.badge); + } else { + expect(installation.badge).toBeUndefined(); + } + }) + return successfulTransmissions(body, installations); + }, + getValidPushTypes: function() { + return ["ios", "android"]; + } + } var payload = {data:{ alert: "Hello World!", badge: "Increment", @@ -154,32 +171,17 @@ describe('PushController', () => { installation.set("deviceType", "android"); installations.push(installation); } - - var pushAdapter = { - send: function(body, installations) { - var badge = body.data.badge; - installations.forEach((installation) => { - if (installation.deviceType == "ios") { - expect(installation.badge).toEqual(badge); - expect(installation.originalBadge + 1).toEqual(installation.badge); - } else { - expect(installation.badge).toBeUndefined(); - } - }) - return successfulTransmissions(body, installations); - }, - getValidPushTypes: function() { - return ["ios", "android"]; - } - } - var config = new Config(Parse.applicationId); var auth = { isMaster: true } - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - Parse.Object.saveAll(installations).then(() => { + var pushController = new PushController(); + reconfigureServer({ + push: { adapter: pushAdapter } + }).then(() => { + return Parse.Object.saveAll(installations) + }).then(() => { return pushController.sendPush(payload, {}, config, auth); }).then(() => { done(); @@ -187,11 +189,24 @@ describe('PushController', () => { jfail(err); done(); }); - }); it('properly set badges to 1', (done) => { + var pushAdapter = { + send: function(body, installations) { + var badge = body.data.badge; + installations.forEach((installation) => { + expect(installation.badge).toEqual(badge); + expect(1).toEqual(installation.badge); + }) + return successfulTransmissions(body, installations); + }, + getValidPushTypes: function() { + return ["ios"]; + } + } + var payload = {data: { alert: "Hello World!", badge: 1, @@ -207,27 +222,17 @@ describe('PushController', () => { installations.push(installation); } - var pushAdapter = { - send: function(body, installations) { - var badge = body.data.badge; - installations.forEach((installation) => { - expect(installation.badge).toEqual(badge); - expect(1).toEqual(installation.badge); - }) - return successfulTransmissions(body, installations); - }, - getValidPushTypes: function() { - return ["ios"]; - } - } - var config = new Config(Parse.applicationId); var auth = { isMaster: true } - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - Parse.Object.saveAll(installations).then(() => { + var pushController = new PushController(); + reconfigureServer({ + push: { adapter: pushAdapter } + }).then(() => { + return Parse.Object.saveAll(installations) + }).then(() => { return pushController.sendPush(payload, {}, config, auth); }).then(() => { done(); @@ -235,7 +240,6 @@ describe('PushController', () => { fail("should not fail"); done(); }); - }); it('properly set badges to 1 with complex query #2903 #3022', (done) => { @@ -276,9 +280,14 @@ describe('PushController', () => { var auth = { isMaster: true } - - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - Parse.Object.saveAll(installations).then((installations) => { + var pushController = new PushController(); + reconfigureServer({ + push: { + adapter: pushAdapter + } + }).then(() => { + return Parse.Object.saveAll(installations) + }).then((installations) => { const objectIds = installations.map(installation => { return installation.id; }) @@ -286,6 +295,10 @@ describe('PushController', () => { objectId: {'$in': objectIds.slice(0, 5)} } return pushController.sendPush(payload, where, config, auth); + }).then(() => { + return new Promise((res) => { + setTimeout(res, 300); + }); }).then(() => { expect(matchedInstallationsCount).toBe(5); const query = new Parse.Query(Parse.Installation); @@ -338,46 +351,50 @@ describe('PushController', () => { var auth = { isMaster: true } - - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - Parse.Object.saveAll(installations).then(() => { - return pushController.sendPush(payload, {}, config, auth); + var pushController = new PushController(); + reconfigureServer({ + push: { adapter: pushAdapter } }).then(() => { - return new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, 1000); - }); - }).then(() => { - const query = new Parse.Query('_PushStatus'); - return query.find({useMasterKey: true}); - }).then((results) => { - expect(results.length).toBe(1); - const result = results[0]; - expect(result.createdAt instanceof Date).toBe(true); - expect(result.updatedAt instanceof Date).toBe(true); - expect(result.id.length).toBe(10); - expect(result.get('source')).toEqual('rest'); - expect(result.get('query')).toEqual(JSON.stringify({})); - expect(typeof result.get('payload')).toEqual("string"); - expect(JSON.parse(result.get('payload'))).toEqual(payload.data); - expect(result.get('status')).toEqual('succeeded'); - expect(result.get('numSent')).toEqual(10); - expect(result.get('sentPerType')).toEqual({ - 'ios': 10 // 10 ios - }); - expect(result.get('numFailed')).toEqual(5); - expect(result.get('failedPerType')).toEqual({ - 'android': 5 // android - }); + return Parse.Object.saveAll(installations) + }) + .then(() => { + return pushController.sendPush(payload, {}, config, auth); + }).then(() => { + // it is enqueued so it can take time + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + const query = new Parse.Query('_PushStatus'); + return query.find({useMasterKey: true}); + }).then((results) => { + expect(results.length).toBe(1); + const result = results[0]; + expect(result.createdAt instanceof Date).toBe(true); + expect(result.updatedAt instanceof Date).toBe(true); + expect(result.id.length).toBe(10); + expect(result.get('source')).toEqual('rest'); + expect(result.get('query')).toEqual(JSON.stringify({})); + expect(typeof result.get('payload')).toEqual("string"); + expect(JSON.parse(result.get('payload'))).toEqual(payload.data); + expect(result.get('status')).toEqual('succeeded'); + expect(result.get('numSent')).toEqual(10); + expect(result.get('sentPerType')).toEqual({ + 'ios': 10 // 10 ios + }); + expect(result.get('numFailed')).toEqual(5); + expect(result.get('failedPerType')).toEqual({ + 'android': 5 // android + }); // Try to get it without masterKey - const query = new Parse.Query('_PushStatus'); - return query.find(); - }).then((results) => { - expect(results.length).toBe(0); - done(); - }); - + const query = new Parse.Query('_PushStatus'); + return query.find(); + }).then((results) => { + expect(results.length).toBe(0); + done(); + }); }); it('should properly report failures in _PushStatus', (done) => { @@ -404,8 +421,12 @@ describe('PushController', () => { var auth = { isMaster: true } - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - pushController.sendPush(payload, where, config, auth).then(() => { + var pushController = new PushController(); + reconfigureServer({ + push: { adapter: pushAdapter } + }).then(() => { + return pushController.sendPush(payload, where, config, auth) + }).then(() => { fail('should not succeed'); done(); }).catch(() => { @@ -416,7 +437,7 @@ describe('PushController', () => { expect(pushStatus.get('status')).toBe('failed'); done(); }); - }) + }); }); it('should support full RESTQuery for increment', (done) => { @@ -433,7 +454,6 @@ describe('PushController', () => { return ["ios"]; } } - var config = new Config(Parse.applicationId); var auth = { isMaster: true @@ -450,8 +470,12 @@ describe('PushController', () => { } } - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - pushController.sendPush(payload, where, config, auth).then(() => { + var pushController = new PushController(); + reconfigureServer({ + push: { adapter: pushAdapter } + }).then(() => { + return pushController.sendPush(payload, where, config, auth); + }).then(() => { done(); }).catch((err) => { jfail(err); @@ -491,8 +515,12 @@ describe('PushController', () => { } } - var pushController = new PushController(pushAdapter, Parse.applicationId, defaultConfiguration.push); - pushController.sendPush(payload, where, config, auth).then(() => { + var pushController = new PushController(); + reconfigureServer({ + push: { adapter: pushAdapter } + }).then(() => { + pushController.sendPush(payload, where, config, auth) + }).then(() => { done(); }).catch(() => { fail('should not fail'); diff --git a/spec/PushWorker.spec.js b/spec/PushWorker.spec.js new file mode 100644 index 0000000000..8b392d12e2 --- /dev/null +++ b/spec/PushWorker.spec.js @@ -0,0 +1,57 @@ +var PushWorker = require('../src').PushWorker; +var Config = require('../src/Config'); + +describe('PushWorker', () => { + it('should run with small batch', (done) => { + const batchSize = 3; + var sendCount = 0; + reconfigureServer({ + push: { + queueOptions: { + disablePushWorker: true, + batchSize + } + } + }).then(() => { + expect(new Config('test').pushWorker).toBeUndefined(); + new PushWorker({ + send: (body, installations) => { + expect(installations.length <= batchSize).toBe(true); + sendCount += installations.length; + return Promise.resolve(); + }, + getValidPushTypes: function() { + return ['ios', 'android'] + } + }); + var installations = []; + while(installations.length != 10) { + var installation = new Parse.Object("_Installation"); + installation.set("installationId", "installation_" + installations.length); + installation.set("deviceToken","device_token_" + installations.length) + installation.set("badge", 1); + installation.set("deviceType", "ios"); + installations.push(installation); + } + return Parse.Object.saveAll(installations); + }).then(() => { + return Parse.Push.send({ + where: { + deviceType: 'ios' + }, + data: { + alert: 'Hello world!' + } + }, {useMasterKey: true}) + }).then(() => { + return new Promise((resolve) => { + setTimeout(resolve, 500); + }); + }).then(() => { + expect(sendCount).toBe(10); + done(); + }).catch(err => { + jfail(err); + }) + }); +}); diff --git a/src/Adapters/MessageQueue/EventEmitterMQ.js b/src/Adapters/MessageQueue/EventEmitterMQ.js new file mode 100644 index 0000000000..e23bc83d59 --- /dev/null +++ b/src/Adapters/MessageQueue/EventEmitterMQ.js @@ -0,0 +1,65 @@ +import events from 'events'; + +const emitter = new events.EventEmitter(); +const subscriptions = new Map(); + +function unsubscribe(channel: string) { + if (!subscriptions.has(channel)) { + //console.log('No channel to unsub from'); + return; + } + //console.log('unsub ', channel); + emitter.removeListener(channel, subscriptions.get(channel)); + subscriptions.delete(channel); +} + +class Publisher { + emitter: any; + + constructor(emitter: any) { + this.emitter = emitter; + } + + publish(channel: string, message: string): void { + this.emitter.emit(channel, message); + } +} + +class Consumer extends events.EventEmitter { + emitter: any; + + constructor(emitter: any) { + super(); + this.emitter = emitter; + } + + subscribe(channel: string): void { + unsubscribe(channel); + const handler = (message) => { + this.emit('message', channel, message); + } + subscriptions.set(channel, handler); + this.emitter.on(channel, handler); + } + + unsubscribe(channel: string): void { + unsubscribe(channel); + } +} + +function createPublisher(): any { + return new Publisher(emitter); +} + +function createSubscriber(): any { + return new Consumer(emitter); +} + +const EventEmitterMQ = { + createPublisher, + createSubscriber +} + +export { + EventEmitterMQ +} diff --git a/src/Adapters/Push/PushAdapter.js b/src/Adapters/Push/PushAdapter.js index 597304caee..58e50df1e6 100644 --- a/src/Adapters/Push/PushAdapter.js +++ b/src/Adapters/Push/PushAdapter.js @@ -1,3 +1,4 @@ +// @flow /*eslint no-unused-vars: "off"*/ // Push Adapter // @@ -11,13 +12,15 @@ // android push and APNS for ios push. export class PushAdapter { - send(devices, installations, pushStatus) { } + send(body: any, installations: any[], pushStatus: any): ?Promise<*> {} /** * Get an array of valid push types. * @returns {Array} An array of valid push types */ - getValidPushTypes() {} + getValidPushTypes(): string[] { + return [] + } } export default PushAdapter; diff --git a/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js b/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js index 76473c0c33..3ea4705301 100644 --- a/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js +++ b/src/Adapters/Storage/Postgres/PostgresStorageAdapter.js @@ -926,16 +926,34 @@ export class PostgresStorageAdapter { } else if (typeof fieldValue === 'object' && schema.fields[fieldName] && schema.fields[fieldName].type === 'Object') { + // Gather keys to increment + const keysToIncrement = Object.keys(originalUpdate).filter(k => { + // choose top level fields that have a delete operation set + return originalUpdate[k].__op === 'Increment' && k.split('.').length === 2 && k.split(".")[0] === fieldName; + }).map(k => k.split('.')[1]); + + let incrementPatterns = ''; + if (keysToIncrement.length > 0) { + incrementPatterns = ' || ' + keysToIncrement.map((c) => { + const amount = fieldValue[c].amount; + return `CONCAT('{"${c}":', COALESCE($${index}:name->>'${c}','0')::int + ${amount}, '}')::jsonb`; + }).join(' || '); + // Strip the keys + keysToIncrement.forEach((key) => { + delete fieldValue[key]; + }); + } + const keysToDelete = Object.keys(originalUpdate).filter(k => { // choose top level fields that have a delete operation set - return originalUpdate[k].__op === 'Delete' && k.split('.').length === 2 + return originalUpdate[k].__op === 'Delete' && k.split('.').length === 2 && k.split(".")[0] === fieldName; }).map(k => k.split('.')[1]); const deletePatterns = keysToDelete.reduce((p, c, i) => { return p + ` - '$${index + 1 + i}:value'`; }, ''); - updatePatterns.push(`$${index}:name = ( COALESCE($${index}:name, '{}'::jsonb) ${deletePatterns} || $${index + 1 + keysToDelete.length}::jsonb )`); + updatePatterns.push(`$${index}:name = ( COALESCE($${index}:name, '{}'::jsonb) ${deletePatterns} ${incrementPatterns} || $${index + 1 + keysToDelete.length}::jsonb )`); values.push(fieldName, ...keysToDelete, JSON.stringify(fieldValue)); index += 2 + keysToDelete.length; diff --git a/src/Config.js b/src/Config.js index c0af7ab07a..a1a51abc08 100644 --- a/src/Config.js +++ b/src/Config.js @@ -58,6 +58,9 @@ export class Config { this.hooksController = cacheInfo.hooksController; this.filesController = cacheInfo.filesController; this.pushController = cacheInfo.pushController; + this.pushControllerQueue = cacheInfo.pushControllerQueue; + this.pushWorker = cacheInfo.pushWorker; + this.hasPushSupport = cacheInfo.hasPushSupport; this.loggerController = cacheInfo.loggerController; this.userController = cacheInfo.userController; this.authDataManager = cacheInfo.authDataManager; diff --git a/src/Controllers/AdaptableController.js b/src/Controllers/AdaptableController.js index f747a5a339..6d0a0190fe 100644 --- a/src/Controllers/AdaptableController.js +++ b/src/Controllers/AdaptableController.js @@ -38,11 +38,15 @@ export class AdaptableController { } validateAdapter(adapter) { + AdaptableController.validateAdapter(adapter, this); + } + + static validateAdapter(adapter, self, ExpectedType) { if (!adapter) { throw new Error(this.constructor.name + " requires an adapter"); } - const Type = this.expectedAdapterType(); + const Type = ExpectedType || self.expectedAdapterType(); // Allow skipping for testing if (!Type) { return; diff --git a/src/Controllers/HooksController.js b/src/Controllers/HooksController.js index 2cd22efbb8..28062696e0 100644 --- a/src/Controllers/HooksController.js +++ b/src/Controllers/HooksController.js @@ -9,6 +9,8 @@ const DefaultHooksCollectionName = "_Hooks"; export class HooksController { _applicationId:string; + _webhookKey:string; + database: any; constructor(applicationId:string, databaseController, webhookKey) { this._applicationId = applicationId; diff --git a/src/Controllers/PushController.js b/src/Controllers/PushController.js index e9f0790781..a390d31ec5 100644 --- a/src/Controllers/PushController.js +++ b/src/Controllers/PushController.js @@ -1,54 +1,17 @@ import { Parse } from 'parse/node'; -import rest from '../rest'; -import AdaptableController from './AdaptableController'; -import { PushAdapter } from '../Adapters/Push/PushAdapter'; import deepcopy from 'deepcopy'; import RestQuery from '../RestQuery'; import RestWrite from '../RestWrite'; import { master } from '../Auth'; import { pushStatusHandler } from '../StatusHandler'; -const UNSUPPORTED_BADGE_KEY = "unsupported"; - -export class PushController extends AdaptableController { - - /** - * Check whether the deviceType parameter in qury condition is valid or not. - * @param {Object} where A query condition - * @param {Array} validPushTypes An array of valid push types(string) - */ - static validatePushType(where = {}, validPushTypes = []) { - var deviceTypeField = where.deviceType || {}; - var deviceTypes = []; - if (typeof deviceTypeField === 'string') { - deviceTypes.push(deviceTypeField); - } else if (Array.isArray(deviceTypeField['$in'])) { - deviceTypes.concat(deviceTypeField['$in']); - } - for (var i = 0; i < deviceTypes.length; i++) { - var deviceType = deviceTypes[i]; - if (validPushTypes.indexOf(deviceType) < 0) { - throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED, - deviceType + ' is not supported push type.'); - } - } - } - - get pushIsAvailable() { - return !!this.adapter; - } +export class PushController { sendPush(body = {}, where = {}, config, auth, onPushStatusSaved = () => {}) { - var pushAdapter = this.adapter; - if (!this.pushIsAvailable) { - throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED, - 'Push adapter is not available'); - } - if (!this.options) { + if (!config.hasPushSupport) { throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED, 'Missing push configuration'); } - PushController.validatePushType(where, pushAdapter.getValidPushTypes()); // Replace the expiration_time with a valid Unix epoch milliseconds time body['expiration_time'] = PushController.getExpirationTime(body); // TODO: If the req can pass the checking, we return immediately instead of waiting @@ -86,15 +49,7 @@ export class PushController extends AdaptableController { onPushStatusSaved(pushStatus.objectId); return badgeUpdate(); }).then(() => { - return rest.find(config, auth, '_Installation', where); - }).then((response) => { - if (!response.results) { - return Promise.reject({error: 'PushController: no results in query'}) - } - pushStatus.setRunning(response.results); - return this.sendToAdapter(body, response.results, pushStatus, config); - }).then((results) => { - return pushStatus.complete(results); + return config.pushControllerQueue.enqueue(body, where, config, auth, pushStatus); }).catch((err) => { return pushStatus.fail(err).then(() => { throw err; @@ -102,34 +57,6 @@ export class PushController extends AdaptableController { }); } - sendToAdapter(body, installations, pushStatus) { - if (body.data && body.data.badge && typeof body.data.badge == 'string' && body.data.badge.toLowerCase() == "increment") { - // Collect the badges to reduce the # of calls - const badgeInstallationsMap = installations.reduce((map, installation) => { - let badge = installation.badge; - if (installation.deviceType != "ios") { - badge = UNSUPPORTED_BADGE_KEY; - } - map[badge + ''] = map[badge + ''] || []; - map[badge + ''].push(installation); - return map; - }, {}); - - // Map the on the badges count and return the send result - const promises = Object.keys(badgeInstallationsMap).map((badge) => { - const payload = deepcopy(body); - if (badge == UNSUPPORTED_BADGE_KEY) { - delete payload.data.badge; - } else { - payload.data.badge = parseInt(badge); - } - return this.adapter.send(payload, badgeInstallationsMap[badge], pushStatus.objectId); - }); - return Promise.all(promises); - } - return this.adapter.send(body, installations, pushStatus.objectId); - } - /** * Get expiration time from the request body. * @param {Object} request A request object @@ -157,10 +84,6 @@ export class PushController extends AdaptableController { } return expirationTime.valueOf(); } - - expectedAdapterType() { - return PushAdapter; - } } export default PushController; diff --git a/src/Controllers/SchemaController.js b/src/Controllers/SchemaController.js index f478f8727c..22c5fe2a9a 100644 --- a/src/Controllers/SchemaController.js +++ b/src/Controllers/SchemaController.js @@ -86,6 +86,7 @@ const defaultColumns = Object.freeze({ "errorMessage": {type:'Object'}, "sentPerType": {type:'Object'}, "failedPerType":{type:'Object'}, + "count": {type:'Number'} }, _JobStatus: { "jobName": {type: 'String'}, diff --git a/src/ParseMessageQueue.js b/src/ParseMessageQueue.js new file mode 100644 index 0000000000..7195642400 --- /dev/null +++ b/src/ParseMessageQueue.js @@ -0,0 +1,26 @@ +import { loadAdapter } from './Adapters/AdapterLoader'; +import { + EventEmitterMQ +} from './Adapters/MessageQueue/EventEmitterMQ'; + +const ParseMessageQueue = {}; + +ParseMessageQueue.createPublisher = function(config: any): any { + const adapter = loadAdapter(config.messageQueueAdapter, EventEmitterMQ, config); + if (typeof adapter.createPublisher !== 'function') { + throw 'pubSubAdapter should have createPublisher()'; + } + return adapter.createPublisher(config); +} + +ParseMessageQueue.createSubscriber = function(config: any): void { + const adapter = loadAdapter(config.messageQueueAdapter, EventEmitterMQ, config) + if (typeof adapter.createSubscriber !== 'function') { + throw 'messageQueueAdapter should have createSubscriber()'; + } + return adapter.createSubscriber(config); +} + +export { + ParseMessageQueue +} diff --git a/src/ParseServer.js b/src/ParseServer.js index cb3294ab6b..11cb38f56e 100644 --- a/src/ParseServer.js +++ b/src/ParseServer.js @@ -39,6 +39,8 @@ import { LogsRouter } from './Routers/LogsRouter'; import { ParseLiveQueryServer } from './LiveQuery/ParseLiveQueryServer'; import { PublicAPIRouter } from './Routers/PublicAPIRouter'; import { PushController } from './Controllers/PushController'; +import { PushQueue } from './Push/PushQueue'; +import { PushWorker } from './Push/PushWorker'; import { PushRouter } from './Routers/PushRouter'; import { CloudCodeRouter } from './Routers/CloudCodeRouter'; import { RolesRouter } from './Routers/RolesRouter'; @@ -168,11 +170,28 @@ class ParseServer { }); const filesController = new FilesController(filesControllerAdapter, appId); + const pushOptions = Object.assign({}, push); + const pushQueueOptions = pushOptions.queueOptions || {}; + if (pushOptions.queueOptions) { + delete pushOptions.queueOptions; + } // Pass the push options too as it works with the default - const pushControllerAdapter = loadAdapter(push && push.adapter, ParsePushAdapter, push || {}); - // We pass the options and the base class for the adapter, + const pushAdapter = loadAdapter(pushOptions && pushOptions.adapter, ParsePushAdapter, pushOptions); + // We pass the options and the base class for the adatper, // Note that passing an instance would work too - const pushController = new PushController(pushControllerAdapter, appId, push); + const pushController = new PushController(); + + const hasPushSupport = pushAdapter && push; + + const { + disablePushWorker + } = pushQueueOptions; + + const pushControllerQueue = new PushQueue(pushQueueOptions); + let pushWorker; + if (!disablePushWorker) { + pushWorker = new PushWorker(pushAdapter, pushQueueOptions); + } const emailControllerAdapter = loadAdapter(emailAdapter); const userController = new UserController(emailControllerAdapter, appId, { verifyUserEmails }); @@ -237,7 +256,10 @@ class ParseServer { databaseController, schemaCacheTTL, enableSingleSchemaCache, - userSensitiveFields + userSensitiveFields, + pushWorker, + pushControllerQueue, + hasPushSupport }); Config.validate(AppCache.get(appId)); diff --git a/src/Push/PushQueue.js b/src/Push/PushQueue.js new file mode 100644 index 0000000000..80b6c2f316 --- /dev/null +++ b/src/Push/PushQueue.js @@ -0,0 +1,60 @@ +import { ParseMessageQueue } from '../ParseMessageQueue'; +import rest from '../rest'; +import { isPushIncrementing } from './utils'; + +const PUSH_CHANNEL = 'parse-server-push'; +const DEFAULT_BATCH_SIZE = 100; + +export class PushQueue { + parsePublisher: Object; + channel: String; + batchSize: Number; + + // config object of the publisher, right now it only contains the redisURL, + // but we may extend it later. + constructor(config: any = {}) { + this.channel = config.channel || PUSH_CHANNEL; + this.batchSize = config.batchSize || DEFAULT_BATCH_SIZE; + this.parsePublisher = ParseMessageQueue.createPublisher(config); + } + + static defaultPushChannel() { + return PUSH_CHANNEL; + } + + enqueue(body, where, config, auth, pushStatus) { + const limit = this.batchSize; + // Order by badge (because the payload is badge dependant) + // and createdAt to fix the order + const order = isPushIncrementing(body) ? 'badge,createdAt' : 'createdAt'; + + return Promise.resolve().then(() => { + return rest.find(config, + auth, + '_Installation', + where, + {limit: 0, count: true}); + }).then(({results, count}) => { + if (!results) { + return Promise.reject({error: 'PushController: no results in query'}) + } + pushStatus.setRunning(count); + let skip = 0; + while (skip < count) { + const query = { where, + limit, + skip, + order }; + + const pushWorkItem = { + body, + query, + pushStatus: { objectId: pushStatus.objectId }, + applicationId: config.applicationId + } + this.parsePublisher.publish(this.channel, JSON.stringify(pushWorkItem)); + skip += limit; + } + }); + } +} diff --git a/src/Push/PushWorker.js b/src/Push/PushWorker.js new file mode 100644 index 0000000000..8639b7f0d7 --- /dev/null +++ b/src/Push/PushWorker.js @@ -0,0 +1,95 @@ +// @flow +import deepcopy from 'deepcopy'; +import AdaptableController from '../Controllers/AdaptableController'; +import { master } from '../Auth'; +import Config from '../Config'; +import { PushAdapter } from '../Adapters/Push/PushAdapter'; +import rest from '../rest'; +import { pushStatusHandler } from '../StatusHandler'; +import { isPushIncrementing } from './utils'; +import { ParseMessageQueue } from '../ParseMessageQueue'; +import { PushQueue } from './PushQueue'; + +const UNSUPPORTED_BADGE_KEY = "unsupported"; + +function groupByBadge(installations) { + return installations.reduce((map, installation) => { + let badge = installation.badge + ''; + if (installation.deviceType != "ios") { + badge = UNSUPPORTED_BADGE_KEY; + } + map[badge] = map[badge] || []; + map[badge].push(installation); + return map; + }, {}); +} + +export class PushWorker { + subscriber: ?any; + adapter: any; + channel: string; + + constructor(pushAdapter: PushAdapter, subscriberConfig: any = {}) { + AdaptableController.validateAdapter(pushAdapter, this, PushAdapter); + this.adapter = pushAdapter; + + this.channel = subscriberConfig.channel || PushQueue.defaultPushChannel(); + this.subscriber = ParseMessageQueue.createSubscriber(subscriberConfig); + if (this.subscriber) { + const subscriber = this.subscriber; + subscriber.subscribe(this.channel); + subscriber.on('message', (channel, messageStr) => { + const workItem = JSON.parse(messageStr); + this.run(workItem); + }); + } + } + + unsubscribe(): void { + if (this.subscriber) { + this.subscriber.unsubscribe(this.channel); + } + } + + run({ body, query, pushStatus, applicationId }: any): Promise<*> { + const config = new Config(applicationId); + const auth = master(config); + const where = query.where; + delete query.where; + return rest.find(config, auth, '_Installation', where, query).then(({results}) => { + if (results.length == 0) { + return; + } + return this.sendToAdapter(body, results, pushStatus, config); + }, err => { + throw err; + }); + } + + sendToAdapter(body: any, installations: any[], pushStatus: any, config: Config): Promise<*> { + pushStatus = pushStatusHandler(config, pushStatus.objectId); + if (!isPushIncrementing(body)) { + return this.adapter.send(body, installations, pushStatus.objectId).then((results) => { + return pushStatus.trackSent(results); + }); + } + + // Collect the badges to reduce the # of calls + const badgeInstallationsMap = groupByBadge(installations); + + // Map the on the badges count and return the send result + const promises = Object.keys(badgeInstallationsMap).map((badge) => { + const payload = deepcopy(body); + if (badge == UNSUPPORTED_BADGE_KEY) { + delete payload.data.badge; + } else { + payload.data.badge = parseInt(badge); + } + const installations = badgeInstallationsMap[badge]; + return this.sendToAdapter(payload, installations, pushStatus, config); + }); + return Promise.all(promises); + } +} + +export default PushWorker; diff --git a/src/Push/utils.js b/src/Push/utils.js new file mode 100644 index 0000000000..44a4149401 --- /dev/null +++ b/src/Push/utils.js @@ -0,0 +1,30 @@ +import Parse from 'parse/node'; + +export function isPushIncrementing(body) { + return body.data && + body.data.badge && + typeof body.data.badge == 'string' && + body.data.badge.toLowerCase() == "increment" +} + +/** + * Check whether the deviceType parameter in qury condition is valid or not. + * @param {Object} where A query condition + * @param {Array} validPushTypes An array of valid push types(string) + */ +export function validatePushType(where = {}, validPushTypes = []) { + var deviceTypeField = where.deviceType || {}; + var deviceTypes = []; + if (typeof deviceTypeField === 'string') { + deviceTypes.push(deviceTypeField); + } else if (Array.isArray(deviceTypeField['$in'])) { + deviceTypes.concat(deviceTypeField['$in']); + } + for (var i = 0; i < deviceTypes.length; i++) { + var deviceType = deviceTypes[i]; + if (validPushTypes.indexOf(deviceType) < 0) { + throw new Parse.Error(Parse.Error.PUSH_MISCONFIGURED, + deviceType + ' is not supported push type.'); + } + } +} diff --git a/src/RestQuery.js b/src/RestQuery.js index 592dcd25a8..974e3e8912 100644 --- a/src/RestQuery.js +++ b/src/RestQuery.js @@ -92,15 +92,15 @@ function RestQuery(config, auth, className, restWhere = {}, restOptions = {}, cl break; case 'order': var fields = restOptions.order.split(','); - var sortMap = {}; - for (var field of fields) { + this.findOptions.sort = fields.reduce((sortMap, field) => { + field = field.trim(); if (field[0] == '-') { sortMap[field.slice(1)] = -1; } else { sortMap[field] = 1; } - } - this.findOptions.sort = sortMap; + return sortMap; + }, {}); break; case 'include': { const paths = restOptions.include.split(','); diff --git a/src/Routers/FeaturesRouter.js b/src/Routers/FeaturesRouter.js index 98e06af079..74e45eb141 100644 --- a/src/Routers/FeaturesRouter.js +++ b/src/Routers/FeaturesRouter.js @@ -29,9 +29,9 @@ export class FeaturesRouter extends PromiseRouter { from: true, }, push: { - immediatePush: req.config.pushController.pushIsAvailable, + immediatePush: req.config.hasPushSupport, scheduledPush: false, - storedPushData: req.config.pushController.pushIsAvailable, + storedPushData: req.config.hasPushSupport, pushAudiences: false, }, schemas: { diff --git a/src/StatusHandler.js b/src/StatusHandler.js index e2c0cff405..c92933d992 100644 --- a/src/StatusHandler.js +++ b/src/StatusHandler.js @@ -4,6 +4,15 @@ import { logger } from './logger'; const PUSH_STATUS_COLLECTION = '_PushStatus'; const JOB_STATUS_COLLECTION = '_JobStatus'; +const incrementOp = function(object = {}, key, amount = 1) { + if (!object[key]) { + object[key] = {__op: 'Increment', amount: amount} + } else { + object[key].amount += amount; + } + return object[key]; +} + export function flatten(array) { var flattened = []; for(var i = 0; i < array.length; i++) { @@ -94,10 +103,9 @@ export function jobStatusHandler(config) { }); } -export function pushStatusHandler(config) { +export function pushStatusHandler(config, objectId = newObjectId()) { let pushStatus; - const objectId = newObjectId(); const database = config.database; const handler = statusHandler(PUSH_STATUS_COLLECTION, database); const setInitial = function(body = {}, where, options = {source: 'rest'}) { @@ -136,18 +144,17 @@ export function pushStatusHandler(config) { }); } - const setRunning = function(installations) { - logger.verbose('sending push to %d installations', installations.length); + const setRunning = function(count) { + logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count); return handler.update({status:"pending", objectId: objectId}, - {status: "running", updatedAt: new Date() }); + {status: "running", updatedAt: new Date(), count }); } - const complete = function(results) { + const trackSent = function(results) { const update = { - status: 'succeeded', updatedAt: new Date(), numSent: 0, - numFailed: 0, + numFailed: 0 }; if (Array.isArray(results)) { results = flatten(results); @@ -157,23 +164,44 @@ export function pushStatusHandler(config) { return memo; } const deviceType = result.device.deviceType; - if (result.transmitted) - { + const key = result.transmitted ? `sentPerType.${deviceType}` : `failedPerType.${deviceType}`; + memo[key] = incrementOp(memo, key); + if (result.transmitted) { memo.numSent++; - memo.sentPerType = memo.sentPerType || {}; - memo.sentPerType[deviceType] = memo.sentPerType[deviceType] || 0; - memo.sentPerType[deviceType]++; } else { memo.numFailed++; - memo.failedPerType = memo.failedPerType || {}; - memo.failedPerType[deviceType] = memo.failedPerType[deviceType] || 0; - memo.failedPerType[deviceType]++; } return memo; }, update); + incrementOp(update, 'count', -results.length); } - logger.verbose('sent push! %d success, %d failures', update.numSent, update.numFailed); - return handler.update({status:"running", objectId }, update); + + logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed); + + ['numSent', 'numFailed'].forEach((key) => { + if (update[key] > 0) { + update[key] = { + __op: 'Increment', + amount: update[key] + }; + } else { + delete update[key]; + } + }); + + return handler.update({ objectId }, update).then((res) => { + if (res && res.count === 0) { + return this.complete(); + } + }) + } + + const complete = function() { + return handler.update({ objectId }, { + status: 'succeeded', + count: {__op: 'Delete'}, + updatedAt: new Date() + }); } const fail = function(err) { @@ -182,7 +210,7 @@ export function pushStatusHandler(config) { status: 'failed', updatedAt: new Date() } - logger.info('warning: error while sending push', err); + logger.warn(`_PushStatus ${objectId}: error while sending push`, err); return handler.update({ objectId }, update); } @@ -190,6 +218,7 @@ export function pushStatusHandler(config) { objectId, setInitial, setRunning, + trackSent, complete, fail }) diff --git a/src/index.js b/src/index.js index 5878d28ecc..a8e13dc32f 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,7 @@ import RedisCacheAdapter from './Adapters/Cache/RedisCacheAdapter' import * as TestUtils from './TestUtils'; import { useExternal } from './deprecated'; import { getLogger } from './logger'; +import { PushWorker } from './Push/PushWorker'; // Factory function const _ParseServer = function(options) { @@ -23,4 +24,4 @@ Object.defineProperty(module.exports, 'logger', { }); export default ParseServer; -export { S3Adapter, GCSAdapter, FileSystemAdapter, InMemoryCacheAdapter, NullCacheAdapter, RedisCacheAdapter, TestUtils, _ParseServer as ParseServer }; +export { S3Adapter, GCSAdapter, FileSystemAdapter, InMemoryCacheAdapter, NullCacheAdapter, RedisCacheAdapter, TestUtils, PushWorker, _ParseServer as ParseServer };