Skip to content

Commit

Permalink
fix: LiveQuery server is not shut down properly when handleShutdown
Browse files Browse the repository at this point in the history
… is called (parse-community#8491)
  • Loading branch information
dblythy authored Jun 8, 2023
1 parent 3ea1ace commit 967700b
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 4 deletions.
35 changes: 35 additions & 0 deletions spec/ParseLiveQuery.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const Auth = require('../lib/Auth');
const UserController = require('../lib/Controllers/UserController').UserController;
const Config = require('../lib/Config');
const ParseServer = require('../lib/index').ParseServer;
const triggers = require('../lib/triggers');
const validatorFail = () => {
throw 'you are not authorized';
Expand Down Expand Up @@ -1214,6 +1215,40 @@ describe('ParseLiveQuery', function () {
await object.save();
});

it('does shutdown liveQuery server', async () => {
await reconfigureServer({ appId: 'test_app_id' });
const config = {
appId: 'hello_test',
masterKey: 'world',
port: 1345,
mountPath: '/1',
serverURL: 'http://localhost:1345/1',
liveQuery: {
classNames: ['Yolo'],
},
startLiveQueryServer: true,
};
if (process.env.PARSE_SERVER_TEST_DB === 'postgres') {
config.databaseAdapter = new databaseAdapter.constructor({
uri: databaseURI,
collectionPrefix: 'test_',
});
config.filesAdapter = defaultConfiguration.filesAdapter;
}
const server = await ParseServer.startApp(config);
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
client.serverURL = 'ws://localhost:1345/1';
const query = await new Parse.Query('Yolo').subscribe();
await Promise.all([
server.handleShutdown(),
new Promise(resolve => query.on('close', resolve)),
]);
await new Promise(resolve => setTimeout(resolve, 100));
expect(server.liveQueryServer.server.address()).toBeNull();
expect(server.liveQueryServer.subscriber.isOpen).toBeFalse();
await new Promise(resolve => server.server.close(resolve));
});

it('prevent afterSave trigger if not exists', async () => {
await reconfigureServer({
liveQuery: {
Expand Down
7 changes: 4 additions & 3 deletions src/Adapters/Storage/Mongo/MongoStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,12 @@ export class MongoStorageAdapter implements StorageAdapter {
throw error;
}

handleShutdown() {
async handleShutdown() {
if (!this.client) {
return Promise.resolve();
return;
}
return this.client.close(false);
await this.client.close(false);
delete this.connectionPromise;
}

_adaptiveCollection(name: string) {
Expand Down
4 changes: 3 additions & 1 deletion src/Adapters/Storage/Postgres/PostgresStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,9 @@ export class PostgresStorageAdapter implements StorageAdapter {
const now = new Date().getTime();
const helpers = this._pgp.helpers;
debug('deleteAllClasses');

if (this._client?.$pool.ended) {
return;
}
await this._client
.task('delete-all-classes', async t => {
try {
Expand Down
15 changes: 15 additions & 0 deletions src/LiveQuery/ParseLiveQueryServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,21 @@ class ParseLiveQueryServer {
}
this._createSubscribers();
}

async shutdown() {
if (this.subscriber.isOpen) {
await Promise.all([
...[...this.clients.values()].map(client => client.parseWebSocket.ws.close()),
this.parseWebSocketServer.close(),
...Array.from(this.subscriber.subscriptions.keys()).map(key =>
this.subscriber.unsubscribe(key)
),
this.subscriber.close?.(),
]);
}
this.subscriber.isOpen = false;
}

_createSubscribers() {
const messageRecieved = (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr);
Expand Down
6 changes: 6 additions & 0 deletions src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ class ParseServer {
if (cacheAdapter && typeof cacheAdapter.handleShutdown === 'function') {
promises.push(cacheAdapter.handleShutdown());
}
if (this.liveQueryServer?.server?.close) {
promises.push(new Promise(resolve => this.liveQueryServer.server.close(resolve)));
}
if (this.liveQueryServer) {
promises.push(this.liveQueryServer.shutdown());
}
return (promises.length > 0 ? Promise.all(promises) : Promise.resolve()).then(() => {
if (this.config.serverCloseComplete) {
this.config.serverCloseComplete();
Expand Down

0 comments on commit 967700b

Please sign in to comment.