Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Idempotency to Postgres #7750

Merged
merged 31 commits into from
Jan 2, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
62dc681
fix passing idempotencyOptions to database adaptors
cbaker6 Dec 28, 2021
46bbad4
add ability to delete mongo index if needed
cbaker6 Dec 28, 2021
b28ebd1
don't drop mongo ttl index
cbaker6 Dec 28, 2021
89d386e
use unique config for idempotency tests
cbaker6 Dec 28, 2021
12ae0bf
don't drop mongo index for now
cbaker6 Dec 28, 2021
479e82c
working idempotency on postgres
cbaker6 Dec 28, 2021
c95d786
revert mongo changes
cbaker6 Dec 28, 2021
c3f4b3f
set original times for Idempotency tests
cbaker6 Dec 28, 2021
9f65504
remove postgres trigger
cbaker6 Dec 29, 2021
7fb9e54
update postgres adapter
cbaker6 Dec 29, 2021
41d3d8d
make postgres idempotency function callable
cbaker6 Dec 29, 2021
143d502
lint
cbaker6 Dec 29, 2021
046be1f
revert awaits
cbaker6 Dec 29, 2021
b54258b
Update README.md
cbaker6 Dec 29, 2021
ccc04bd
remove error checks
cbaker6 Dec 29, 2021
964fe9d
lint
cbaker6 Dec 29, 2021
ce2a147
Merge branch 'alpha' into idempotency
mtrezza Jan 1, 2022
e8c6c50
change function name
cbaker6 Jan 1, 2022
6ad8108
revert lock file
cbaker6 Jan 1, 2022
9e7a8f5
revert lock file
cbaker6 Jan 1, 2022
164e5a7
nits
cbaker6 Jan 2, 2022
7c78abb
Merge branch 'alpha' into idempotency
mtrezza Jan 2, 2022
12e8945
revert adapter file
cbaker6 Jan 2, 2022
e52f255
Merge branch 'alpha' into idempotency
cbaker6 Jan 2, 2022
295ba26
Merge branch 'alpha' into idempotency
mtrezza Jan 2, 2022
37960bf
Merge branch 'alpha' into idempotency
mtrezza Jan 2, 2022
40cbdc0
rewording postgres idempotency note
mtrezza Jan 2, 2022
ceec24f
dedup
cbaker6 Jan 2, 2022
25b7ab7
lint
cbaker6 Jan 2, 2022
222fa3d
Merge branch 'alpha' into idempotency
cbaker6 Jan 2, 2022
5ef6b1d
Merge branch 'alpha' into idempotency
cbaker6 Jan 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,26 @@ let api = new ParseServer({
| `idempotencyOptions.paths` | yes | `Array<String>` | `[]` | `.*` (all paths, includes the examples below), <br>`functions/.*` (all functions), <br>`jobs/.*` (all jobs), <br>`classes/.*` (all classes), <br>`functions/.*` (all functions), <br>`users` (user creation / update), <br>`installations` (installation creation / update) | PARSE_SERVER_EXPERIMENTAL_IDEMPOTENCY_PATHS | An array of path patterns that have to match the request path for request deduplication to be enabled. The mount path must not be included, for example to match the request path `/parse/functions/myFunction` specify the path pattern `functions/myFunction`. A trailing slash of the request path is ignored, for example the path pattern `functions/myFunction` matches both `/parse/functions/myFunction` and `/parse/functions/myFunction/`. |
| `idempotencyOptions.ttl` | yes | `Integer` | `300` | `60` (60 seconds) | PARSE_SERVER_EXPERIMENTAL_IDEMPOTENCY_TTL | The duration in seconds after which a request record is discarded from the database. Duplicate requests due to network issues can be expected to arrive within milliseconds up to several seconds. This value must be greater than `0`. |

### Notes <!-- omit in toc -->
### Postgres <!-- omit in toc -->

To use this feature in Postgres, you will need to create a cron job using [pgAdmin](https://www.pgadmin.org/docs/pgadmin4/development/pgagent_jobs.html) or similar to call the Postgres function `idempotency_delete_expired_records()` that deletes expired idempotency records. You can find an example script below. Make sure the script has the same privileges to log into Postgres as Parse Server.

```bash
#!/bin/bash

set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
SELECT idempotency_delete_expired_records();
EOSQL

- This feature is currently only available for MongoDB and not for Postgres.
exec "$@"
```

Assuming the script above is named, `parse_idempotency_delete_expired_records.sh`, a cron job that runs the script every 2 minutes may look like:

```bash
2 * * * * /root/parse_idempotency_delete_expired_records.sh >/dev/null 2>&1
```

## Localization

Expand Down
35 changes: 31 additions & 4 deletions spec/Idempotency.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ const rest = require('../lib/rest');
const auth = require('../lib/Auth');
const uuid = require('uuid');

describe_only_db('mongo')('Idempotency', () => {
describe('Idempotency', () => {
// Parameters
/** Enable TTL expiration simulated by removing entry instead of waiting for MongoDB TTL monitor which
runs only every 60s, so it can take up to 119s until entry removal - ain't nobody got time for that */
const SIMULATE_TTL = true;
const ttl = 2;
const maxTimeOut = 4000;

// Helpers
async function deleteRequestEntry(reqId) {
const config = Config.get(Parse.applicationId);
Expand Down Expand Up @@ -38,9 +41,10 @@ describe_only_db('mongo')('Idempotency', () => {
}
await setup({
paths: ['functions/.*', 'jobs/.*', 'classes/.*', 'users', 'installations'],
ttl: 30,
ttl: ttl,
});
});

// Tests
it('should enforce idempotency for cloud code function', async () => {
let counter = 0;
Expand All @@ -56,7 +60,7 @@ describe_only_db('mongo')('Idempotency', () => {
'X-Parse-Request-Id': 'abc-123',
},
};
expect(Config.get(Parse.applicationId).idempotencyOptions.ttl).toBe(30);
expect(Config.get(Parse.applicationId).idempotencyOptions.ttl).toBe(ttl);
await request(params);
await request(params).then(fail, e => {
expect(e.status).toEqual(400);
Expand All @@ -83,12 +87,35 @@ describe_only_db('mongo')('Idempotency', () => {
if (SIMULATE_TTL) {
await deleteRequestEntry('abc-123');
} else {
await new Promise(resolve => setTimeout(resolve, 130000));
await new Promise(resolve => setTimeout(resolve, maxTimeOut));
}
await expectAsync(request(params)).toBeResolved();
expect(counter).toBe(2);
});

it_only_db('postgres')('should delete request entry when postgress ttl function is called', async () => {
const client = Config.get(Parse.applicationId).database.adapter._client;
let counter = 0;
Parse.Cloud.define('myFunction', () => {
counter++;
});
const params = {
method: 'POST',
url: 'http://localhost:8378/1/functions/myFunction',
headers: {
'X-Parse-Application-Id': Parse.applicationId,
'X-Parse-Master-Key': Parse.masterKey,
'X-Parse-Request-Id': 'abc-123',
},
};
await expectAsync(request(params)).toBeResolved();
await expectAsync(request(params)).toBeRejected();
await new Promise(resolve => setTimeout(resolve, maxTimeOut));
await client.one('SELECT idempotency_delete_expired_records()');
await expectAsync(request(params)).toBeResolved();
expect(counter).toBe(2);
});

it('should enforce idempotency for cloud code jobs', async () => {
let counter = 0;
Parse.Cloud.job('myJob', () => {
Expand Down
11 changes: 11 additions & 0 deletions spec/PostgresStorageAdapter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,17 @@ describe_only_db('postgres')('PostgresStorageAdapter', () => {
await new Promise(resolve => setTimeout(resolve, 2000));
expect(adapter._onchange).toHaveBeenCalled();
});

it('Idempotency class should have function', async () => {
await reconfigureServer();
const adapter = Config.get('test').database.adapter;
const client = adapter._client;
const qs = "SELECT format('%I.%I(%s)', ns.nspname, p.proname, oidvectortypes(p.proargtypes)) FROM pg_proc p INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid) WHERE p.proname = 'idempotency_delete_expired_records'";
const foundFunction = await client.one(qs);
expect(foundFunction.format).toBe("public.idempotency_delete_expired_records()");
await adapter.deleteIdempotencyFunction();
await client.none(qs);
});
});

describe_only_db('postgres')('PostgresStorageAdapter shutdown', () => {
Expand Down
66 changes: 48 additions & 18 deletions src/Adapters/Storage/Postgres/PostgresStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -2449,25 +2449,55 @@ export class PostgresStorageAdapter implements StorageAdapter {
? fieldNames.map((fieldName, index) => `lower($${index + 3}:name) varchar_pattern_ops`)
: fieldNames.map((fieldName, index) => `$${index + 3}:name`);
const qs = `CREATE INDEX IF NOT EXISTS $1:name ON $2:name (${constraintPatterns.join()})`;
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames]).catch(error => {
if (
error.code === PostgresDuplicateRelationError &&
error.message.includes(indexNameOptions.name)
) {
// Index already exists. Ignore error.
} else if (
error.code === PostgresUniqueIndexViolationError &&
error.message.includes(indexNameOptions.name)
) {
// Cast the error into the proper parse error
throw new Parse.Error(
Parse.Error.DUPLICATE_VALUE,
'A duplicate value for a field with unique values was provided'
);
} else {
const setIdempotencyFunction = options.setIdempotencyFunction !== undefined ? options.setIdempotencyFunction : false;
if (setIdempotencyFunction) {
await this.ensureIdempotencyFunctionExists(options);
}
await conn.none(qs, [indexNameOptions.name, className, ...fieldNames])
.catch(error => {
if (
error.code === PostgresDuplicateRelationError &&
error.message.includes(indexNameOptions.name)
) {
// Index already exists. Ignore error.
} else if (
error.code === PostgresUniqueIndexViolationError &&
error.message.includes(indexNameOptions.name)
) {
// Cast the error into the proper parse error
throw new Parse.Error(
Parse.Error.DUPLICATE_VALUE,
'A duplicate value for a field with unique values was provided'
);
} else {
throw error;
}
});
}

async deleteIdempotencyFunction(
options?: Object = {}
): Promise<any> {
const conn = options.conn !== undefined ? options.conn : this._client;
const qs = 'DROP FUNCTION IF EXISTS idempotency_delete_expired_records()';
return conn
.none(qs)
.catch(error => {
throw error;
}
});
});
}

async ensureIdempotencyFunctionExists(
options?: Object = {}
): Promise<any> {
const conn = options.conn !== undefined ? options.conn : this._client;
const ttlOptions = options.ttl !== undefined ? `${options.ttl} seconds` : '60 seconds';
const qs = 'CREATE OR REPLACE FUNCTION idempotency_delete_expired_records() RETURNS void LANGUAGE plpgsql AS $$ BEGIN DELETE FROM "_Idempotency" WHERE expire < NOW() - INTERVAL $1; END; $$;';
return conn
.none(qs, [ttlOptions])
.catch(error => {
throw error;
});
}
}

Expand Down
31 changes: 21 additions & 10 deletions src/Controllers/DatabaseController.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import logger from '../logger';
import * as SchemaController from './SchemaController';
import { StorageAdapter } from '../Adapters/Storage/StorageAdapter';
import MongoStorageAdapter from '../Adapters/Storage/Mongo/MongoStorageAdapter';
import PostgresStorageAdapter from '../Adapters/Storage/Postgres/PostgresStorageAdapter';
import SchemaCache from '../Adapters/Cache/SchemaCache';
import type { LoadSchemaOptions } from './types';
import type { QueryOptions, FullQueryOptions } from '../Adapters/Storage/StorageAdapter';
Expand Down Expand Up @@ -394,12 +395,14 @@ const relationSchema = {

class DatabaseController {
adapter: StorageAdapter;
idempotencyOptions: any;
schemaCache: any;
schemaPromise: ?Promise<SchemaController.SchemaController>;
_transactionalSession: ?any;

constructor(adapter: StorageAdapter) {
constructor(adapter: StorageAdapter, idempotencyOptions?: Object = {}) {
this.adapter = adapter;
this.idempotencyOptions = idempotencyOptions;
// We don't want a mutable this.schema, because then you could have
// one request that uses different schemas for different parts of
// it. Instead, use loadSchema to get a schema.
Expand Down Expand Up @@ -1713,9 +1716,7 @@ class DatabaseController {
};
await this.loadSchema().then(schema => schema.enforceClassExists('_User'));
await this.loadSchema().then(schema => schema.enforceClassExists('_Role'));
if (this.adapter instanceof MongoStorageAdapter) {
await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency'));
}
await this.loadSchema().then(schema => schema.enforceClassExists('_Idempotency'));

await this.adapter.ensureUniqueness('_User', requiredUserFields, ['username']).catch(error => {
logger.warn('Unable to ensure uniqueness for usernames: ', error);
Expand Down Expand Up @@ -1751,18 +1752,28 @@ class DatabaseController {
logger.warn('Unable to ensure uniqueness for role name: ', error);
throw error;
});

await this.adapter
.ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId'])
.catch(error => {
logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error);
throw error;
});

if (this.adapter instanceof MongoStorageAdapter) {
await this.adapter
.ensureUniqueness('_Idempotency', requiredIdempotencyFields, ['reqId'])
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, {
ttl: 0,
})
.catch(error => {
logger.warn('Unable to ensure uniqueness for idempotency request ID: ', error);
logger.warn('Unable to create TTL index for idempotency expire date: ', error);
throw error;
});

} else if (this.adapter instanceof PostgresStorageAdapter) {
const options = this.idempotencyOptions;
options.setIdempotencyFunction = true;
mtrezza marked this conversation as resolved.
Show resolved Hide resolved
await this.adapter
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, {
ttl: 0,
})
.ensureIndex('_Idempotency', requiredIdempotencyFields, ['expire'], 'ttl', false, options)
.catch(error => {
logger.warn('Unable to create TTL index for idempotency expire date: ', error);
throw error;
Expand Down
4 changes: 2 additions & 2 deletions src/Controllers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ export function getLiveQueryController(options: ParseServerOptions): LiveQueryCo
}

export function getDatabaseController(options: ParseServerOptions): DatabaseController {
const { databaseURI, collectionPrefix, databaseOptions } = options;
const { databaseURI, collectionPrefix, databaseOptions, idempotencyOptions } = options;
let { databaseAdapter } = options;
if (
(databaseOptions ||
Expand All @@ -157,7 +157,7 @@ export function getDatabaseController(options: ParseServerOptions): DatabaseCont
} else {
databaseAdapter = loadAdapter(databaseAdapter);
}
return new DatabaseController(databaseAdapter);
return new DatabaseController(databaseAdapter, idempotencyOptions);
}

export function getHooksController(
Expand Down
3 changes: 2 additions & 1 deletion src/middlewares.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ClientSDK from './ClientSDK';
import defaultLogger from './logger';
import rest from './rest';
import MongoStorageAdapter from './Adapters/Storage/Mongo/MongoStorageAdapter';
import PostgresStorageAdapter from './Adapters/Storage/Postgres/PostgresStorageAdapter';

export const DEFAULT_ALLOWED_HEADERS =
'X-Parse-Master-Key, X-Parse-REST-API-Key, X-Parse-Javascript-Key, X-Parse-Application-Id, X-Parse-Client-Version, X-Parse-Session-Token, X-Requested-With, X-Parse-Revocable-Session, X-Parse-Request-Id, Content-Type, Pragma, Cache-Control';
Expand Down Expand Up @@ -431,7 +432,7 @@ export function promiseEnforceMasterKeyAccess(request) {
*/
export function promiseEnsureIdempotency(req) {
// Enable feature only for MongoDB
if (!(req.config.database.adapter instanceof MongoStorageAdapter)) {
if (!((req.config.database.adapter instanceof MongoStorageAdapter) || (req.config.database.adapter instanceof PostgresStorageAdapter))) {
return Promise.resolve();
}
// Get parameters
Expand Down