Skip to content

Commit

Permalink
feat: nested transactions (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
wschurman authored Sep 13, 2022
1 parent acbad82 commit a77b914
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ export default class PostgresEntityQueryContextProvider extends EntityQueryConte
) => Promise<T> {
return (transactionScope) => this.knexInstance.transaction(transactionScope);
}

protected createNestedTransactionRunner<T>(
outerQueryInterface: any
): (transactionScope: (queryInterface: any) => Promise<T>) => Promise<T> {
return (transactionScope) => (outerQueryInterface as Knex).transaction(transactionScope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ describe('postgres entity integration', () => {
const vc1 = new ViewerContext(createKnexIntegrationTestEntityCompanionProvider(knexInstance));

let preCommitCallCount = 0;
let preCommitInnerCallCount = 0;
let postCommitCallCount = 0;

await vc1.runInTransactionForDatabaseAdaptorFlavorAsync('postgres', async (queryContext) => {
Expand All @@ -645,6 +646,28 @@ describe('postgres entity integration', () => {
queryContext.appendPreCommitCallback(async () => {
preCommitCallCount++;
}, 0);

await queryContext.runInNestedTransactionAsync(async (innerQueryContext) => {
innerQueryContext.appendPostCommitCallback(async () => {
postCommitCallCount++;
});
innerQueryContext.appendPreCommitCallback(async () => {
preCommitInnerCallCount++;
}, 0);
});

// this one throws so its post commit shouldn't execute
try {
await queryContext.runInNestedTransactionAsync(async (innerQueryContext) => {
innerQueryContext.appendPostCommitCallback(async () => {
postCommitCallCount++;
});
innerQueryContext.appendPreCommitCallback(async () => {
preCommitInnerCallCount++;
throw Error('wat');
}, 0);
});
} catch {}
});

await expect(
Expand All @@ -660,7 +683,8 @@ describe('postgres entity integration', () => {
).rejects.toThrowError('wat');

expect(preCommitCallCount).toBe(2);
expect(postCommitCallCount).toBe(1);
expect(preCommitInnerCallCount).toBe(2);
expect(postCommitCallCount).toBe(2);
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { ViewerContext } from '@expo/entity';
import { knex, Knex } from 'knex';
import nullthrows from 'nullthrows';

import PostgresEntityQueryContextProvider from '../PostgresEntityQueryContextProvider';
import PostgresUniqueTestEntity from '../testfixtures/PostgresUniqueTestEntity';
import { createKnexIntegrationTestEntityCompanionProvider } from '../testfixtures/createKnexIntegrationTestEntityCompanionProvider';

describe(PostgresEntityQueryContextProvider, () => {
let knexInstance: Knex;

beforeAll(() => {
knexInstance = knex({
client: 'pg',
connection: {
user: nullthrows(process.env['PGUSER']),
password: nullthrows(process.env['PGPASSWORD']),
host: 'localhost',
port: parseInt(nullthrows(process.env['PGPORT']), 10),
database: nullthrows(process.env['PGDATABASE']),
},
});
});

beforeEach(async () => {
await PostgresUniqueTestEntity.createOrTruncatePostgresTable(knexInstance);
});

afterAll(async () => {
await PostgresUniqueTestEntity.dropPostgresTable(knexInstance);
await knexInstance.destroy();
});

it('supports nested transactions', async () => {
const vc1 = new ViewerContext(createKnexIntegrationTestEntityCompanionProvider(knexInstance));

await PostgresUniqueTestEntity.creator(vc1).setField('name', 'unique').enforceCreateAsync();

const id = (
await PostgresUniqueTestEntity.creator(vc1).setField('name', 'wat').enforceCreateAsync()
).getID();

await vc1.runInTransactionForDatabaseAdaptorFlavorAsync('postgres', async (queryContext) => {
const entity = await PostgresUniqueTestEntity.loader(vc1, queryContext)
.enforcing()
.loadByIDAsync(id);
await PostgresUniqueTestEntity.updater(entity, queryContext)
.setField('name', 'wat2')
.enforceUpdateAsync();

// ensure the outer transaction is not aborted due to postgres error in inner transaction,
// in this case the error triggered is a unique constraint violation
try {
await queryContext.runInNestedTransactionAsync(async (innerQueryContext) => {
const entity = await PostgresUniqueTestEntity.loader(vc1, innerQueryContext)
.enforcing()
.loadByIDAsync(id);
await PostgresUniqueTestEntity.updater(entity, innerQueryContext)
.setField('name', 'unique')
.enforceUpdateAsync();
});
} catch {}

const entity2 = await PostgresUniqueTestEntity.loader(vc1, queryContext)
.enforcing()
.loadByIDAsync(id);
await PostgresUniqueTestEntity.updater(entity2, queryContext)
.setField('name', 'wat3')
.enforceUpdateAsync();
});

const entityLoaded = await PostgresUniqueTestEntity.loader(vc1).enforcing().loadByIDAsync(id);
expect(entityLoaded.getField('name')).toEqual('wat3');
});

it('supports multi-nested transactions', async () => {
const vc1 = new ViewerContext(createKnexIntegrationTestEntityCompanionProvider(knexInstance));

const id = (
await PostgresUniqueTestEntity.creator(vc1).setField('name', 'wat').enforceCreateAsync()
).getID();

await vc1.runInTransactionForDatabaseAdaptorFlavorAsync('postgres', async (queryContext) => {
await queryContext.runInNestedTransactionAsync(async (innerQueryContext) => {
await innerQueryContext.runInNestedTransactionAsync(async (innerQueryContex2) => {
await innerQueryContex2.runInNestedTransactionAsync(async (innerQueryContex3) => {
const entity = await PostgresUniqueTestEntity.loader(vc1, innerQueryContex3)
.enforcing()
.loadByIDAsync(id);
await PostgresUniqueTestEntity.updater(entity, innerQueryContex3)
.setField('name', 'wat3')
.enforceUpdateAsync();
});
});
});
});

const entityLoaded = await PostgresUniqueTestEntity.loader(vc1).enforcing().loadByIDAsync(id);
expect(entityLoaded.getField('name')).toEqual('wat3');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import {
AlwaysAllowPrivacyPolicyRule,
EntityPrivacyPolicy,
ViewerContext,
UUIDField,
StringField,
EntityConfiguration,
EntityCompanionDefinition,
Entity,
} from '@expo/entity';
import { Knex } from 'knex';

type PostgresUniqueTestEntityFields = {
id: string;
name: string | null;
};

export default class PostgresUniqueTestEntity extends Entity<
PostgresUniqueTestEntityFields,
string,
ViewerContext
> {
static getCompanionDefinition(): EntityCompanionDefinition<
PostgresUniqueTestEntityFields,
string,
ViewerContext,
PostgresUniqueTestEntity,
PostgresUniqueTestEntityPrivacyPolicy
> {
return postgresTestEntityCompanionDefinition;
}

public static async createOrTruncatePostgresTable(knex: Knex): Promise<void> {
await knex.raw('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"'); // for uuid_generate_v4()

const tableName = this.getCompanionDefinition().entityConfiguration.tableName;
const hasTable = await knex.schema.hasTable(tableName);
if (!hasTable) {
await knex.schema.createTable(tableName, (table) => {
table.uuid('id').defaultTo(knex.raw('uuid_generate_v4()')).primary();
table.string('name').unique();
});
}
await knex.into(tableName).truncate();
}

public static async dropPostgresTable(knex: Knex): Promise<void> {
const tableName = this.getCompanionDefinition().entityConfiguration.tableName;
const hasTable = await knex.schema.hasTable(tableName);
if (hasTable) {
await knex.schema.dropTable(tableName);
}
}
}

class PostgresUniqueTestEntityPrivacyPolicy extends EntityPrivacyPolicy<
PostgresUniqueTestEntityFields,
string,
ViewerContext,
PostgresUniqueTestEntity
> {
protected override readonly createRules = [
new AlwaysAllowPrivacyPolicyRule<
PostgresUniqueTestEntityFields,
string,
ViewerContext,
PostgresUniqueTestEntity
>(),
];
protected override readonly readRules = [
new AlwaysAllowPrivacyPolicyRule<
PostgresUniqueTestEntityFields,
string,
ViewerContext,
PostgresUniqueTestEntity
>(),
];
protected override readonly updateRules = [
new AlwaysAllowPrivacyPolicyRule<
PostgresUniqueTestEntityFields,
string,
ViewerContext,
PostgresUniqueTestEntity
>(),
];
protected override readonly deleteRules = [
new AlwaysAllowPrivacyPolicyRule<
PostgresUniqueTestEntityFields,
string,
ViewerContext,
PostgresUniqueTestEntity
>(),
];
}

export const postgresTestEntityConfiguration =
new EntityConfiguration<PostgresUniqueTestEntityFields>({
idField: 'id',
tableName: 'postgres_test_entities',
schema: {
id: new UUIDField({
columnName: 'id',
cache: true,
}),
name: new StringField({
columnName: 'name',
}),
},
databaseAdapterFlavor: 'postgres',
cacheAdapterFlavor: 'redis',
});

const postgresTestEntityCompanionDefinition = new EntityCompanionDefinition({
entityClass: PostgresUniqueTestEntity,
entityConfiguration: postgresTestEntityConfiguration,
privacyPolicyClass: PostgresUniqueTestEntityPrivacyPolicy,
});
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ export default class InMemoryQueryContextProvider extends EntityQueryContextProv
) => Promise<T> {
return (transactionScope) => Promise.resolve(transactionScope({}));
}

protected createNestedTransactionRunner<T>(
_outerQueryInterface: any
): (transactionScope: (queryInterface: any) => Promise<T>) => Promise<T> {
return (transactionScope) => Promise.resolve(transactionScope({}));
}
}
60 changes: 60 additions & 0 deletions packages/entity/src/EntityQueryContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ export class EntityTransactionalQueryContext extends EntityQueryContext {

private readonly preCommitCallbacks: { callback: PreCommitCallback; order: number }[] = [];

constructor(
queryInterface: any,
private readonly entityQueryContextProvider: EntityQueryContextProvider
) {
super(queryInterface);
}

/**
* Schedule a pre-commit callback. These will be run within the transaction right before it is
* committed, and will be run in the order specified. Ordering of callbacks scheduled with the
Expand Down Expand Up @@ -129,4 +136,57 @@ export class EntityTransactionalQueryContext extends EntityQueryContext {
): Promise<T> {
return await transactionScope(this);
}

async runInNestedTransactionAsync<T>(
transactionScope: (innerQueryContext: EntityTransactionalQueryContext) => Promise<T>
): Promise<T> {
return await this.entityQueryContextProvider.runInNestedTransactionAsync(
this,
transactionScope
);
}
}

/**
* Entity framework representation of a nested transactional query execution unit. When supplied
* to EntityMutator and EntityLoader methods, those methods and their
* dependent triggers and validators will run within the nested transaction.
*
* This exists to forward post-commit callbacks to the parent query context.
*/
export class EntityNestedTransactionalQueryContext extends EntityTransactionalQueryContext {
private readonly postCommitInvalidationCallbacksToTransfer: PostCommitCallback[] = [];
private readonly postCommitCallbacksToTransfer: PostCommitCallback[] = [];

constructor(
queryInterface: any,
private readonly parentQueryContext: EntityTransactionalQueryContext,
entityQueryContextProvider: EntityQueryContextProvider
) {
super(queryInterface, entityQueryContextProvider);
}

public override appendPostCommitCallback(callback: PostCommitCallback): void {
this.postCommitInvalidationCallbacksToTransfer.push(callback);
}

public override appendPostCommitInvalidationCallback(callback: PostCommitCallback): void {
this.postCommitCallbacksToTransfer.push(callback);
}

public override runPostCommitCallbacksAsync(): Promise<void> {
throw new Error(
'Must not call runPostCommitCallbacksAsync on EntityNestedTransactionalQueryContext'
);
}

public transferPostCommitCallbacksToParent(): void {
for (const callback of this.postCommitInvalidationCallbacksToTransfer) {
this.parentQueryContext.appendPostCommitInvalidationCallback(callback);
}

for (const callback of this.postCommitCallbacksToTransfer) {
this.parentQueryContext.appendPostCommitCallback(callback);
}
}
}
Loading

0 comments on commit a77b914

Please sign in to comment.