Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

112 handle cascade deletes #118

Merged
merged 6 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions couch2pg/src/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import * as db from './db.js';
const SELECT_SEQ_STMT = `SELECT seq FROM ${db.postgresProgressTable} WHERE source = $1`;
const INSERT_SEQ_STMT = `INSERT INTO ${db.postgresProgressTable}(seq, source) VALUES ($1, $2)`;
const UPDATE_SEQ_STMT = `UPDATE ${db.postgresProgressTable} SET seq = $1 WHERE source = $2`;
const INSERT_DOCS_STMT = `INSERT INTO ${db.postgresTable} ("@timestamp", _id, _deleted, doc) VALUES`;
const INSERT_DOCS_STMT = `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, doc) VALUES`;
const ON_CONFLICT_STMT = `
ON CONFLICT (_id) DO UPDATE SET
"@timestamp" = EXCLUDED."@timestamp",
saved_timestamp = EXCLUDED.saved_timestamp,
_deleted = EXCLUDED._deleted,
doc = EXCLUDED.doc
`;
Expand Down
2 changes: 1 addition & 1 deletion couch2pg/src/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as db from './db.js';
const createSchema = `CREATE SCHEMA IF NOT EXISTS ${db.postgresSchema}`;
const createTable = `
CREATE TABLE IF NOT EXISTS ${db.postgresTable} (
"@timestamp" TIMESTAMP,
saved_timestamp TIMESTAMP,
_id VARCHAR PRIMARY KEY,
_deleted BOOLEAN,
doc jsonb
Expand Down
20 changes: 10 additions & 10 deletions couch2pg/tests/unit/importer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ const getSeqMatch = () => `SELECT seq FROM ${db.postgresProgressTable} WHERE sou
const insertSeqMatch = () => `INSERT INTO ${db.postgresProgressTable}(seq, source) VALUES ($1, $2)`;
const updateSeqMatch = () => `UPDATE ${db.postgresProgressTable} SET seq = $1 WHERE source = $2`;

const insertDocsMatch = () => `INSERT INTO ${db.postgresTable} ("@timestamp", _id, _deleted, doc) VALUES`;
const insertDocsMatch = () => `INSERT INTO ${db.postgresTable} (saved_timestamp, _id, _deleted, doc) VALUES`;

const ON_CONFLICT_STMT = `
ON CONFLICT (_id) DO UPDATE SET
"@timestamp" = EXCLUDED."@timestamp",
saved_timestamp = EXCLUDED.saved_timestamp,
_deleted = EXCLUDED._deleted,
doc = EXCLUDED.doc
`;
Expand Down Expand Up @@ -134,7 +134,7 @@ describe('importer', () => {
expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
Expand Down Expand Up @@ -212,7 +212,7 @@ describe('importer', () => {

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).callCount).to.equal(3);
expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[0]).to.deep.equal([
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
Expand All @@ -233,7 +233,7 @@ describe('importer', () => {
]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[1]).to.deep.equal([
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
Expand All @@ -254,7 +254,7 @@ describe('importer', () => {
]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args[2]).to.deep.equal([
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
Expand Down Expand Up @@ -293,7 +293,7 @@ describe('importer', () => {
await importer(couchDb);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4) ' + ON_CONFLICT_STMT,
[
new Date().toISOString(),
Expand Down Expand Up @@ -330,7 +330,7 @@ describe('importer', () => {
await importer(couchDb);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4) ' + ON_CONFLICT_STMT,
[
new Date().toISOString(),
Expand Down Expand Up @@ -370,7 +370,7 @@ describe('importer', () => {
expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc2'] }]]);

expect(pgClient.query.withArgs(sinon.match(insertDocsMatch())).args).to.deep.equal([[
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
Expand Down Expand Up @@ -499,7 +499,7 @@ describe('importer', () => {
expect(couchDb.allDocs.args).to.deep.equal([[{ include_docs: true, keys: ['doc1', 'doc2', 'doc3'] }]]);

const queryArgs = [
'INSERT INTO v1.whatever ("@timestamp", _id, _deleted, doc) VALUES ' +
'INSERT INTO v1.whatever (saved_timestamp, _id, _deleted, doc) VALUES ' +
'($1, $2, $3, $4),($5, $6, $7, $8),($9, $10, $11, $12) ' + ON_CONFLICT_STMT,
[
now.toISOString(),
Expand Down
2 changes: 1 addition & 1 deletion couch2pg/tests/unit/setup.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('setup', () => {
expect(pgClient.query.args[0]).to.deep.equal(['CREATE SCHEMA IF NOT EXISTS v1']);
expect(pgClient.query.args[1]).to.deep.equal([ `
CREATE TABLE IF NOT EXISTS v1.whatever (
"@timestamp" TIMESTAMP,
saved_timestamp TIMESTAMP,
_id VARCHAR PRIMARY KEY,
_deleted BOOLEAN,
doc jsonb
Expand Down
23 changes: 15 additions & 8 deletions tests/dbt/package/models/contacts/contacts.sql
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
-- Incremental dbt model that projects contact documents from the raw source table
-- into one column per field read out of the JSON doc.
{{
config(
materialized = 'incremental',
unique_key='_id',
unique_key='uuid',
on_schema_change='append_new_columns',
post_hook='delete from {{this}} where _deleted=true',
indexes=[
{'columns': ['"_id"'], 'type': 'hash'},
{'columns': ['"@timestamp"']},
{'columns': ['uuid'], 'type': 'hash'},
{'columns': ['saved_timestamp']},
{'columns': ['contact_type']},
]
)
}}

SELECT
_id,
"@timestamp",
doc,
_id as uuid,
saved_timestamp,
_deleted,
doc->>'edited' AS edited,
-- reported_date is read as text; an empty string becomes NULL via NULLIF, then the value is cast to bigint.
-- The division by 1000 is integer division (it happens before the cast to double precision),
-- so any remainder is dropped. Presumably epoch milliseconds to seconds; confirm against the data.
to_timestamp((NULLIF(doc ->> 'reported_date'::text, ''::text)::bigint / 1000)::double precision) AS reported,
doc->'parent'->>'_id' AS parent_uuid,
doc->>'name' AS name,
-- Prefer contact_type; fall back to type when contact_type is NULL.
COALESCE(doc->>'contact_type', doc->>'type') as contact_type,
doc->>'phone' AS phone
doc->>'phone' AS phone,
doc->>'alternative_phone' AS phone2,
doc->>'is_active' AS active,
doc->>'notes' AS notes,
doc->>'contact_id' AS contact_id
FROM {{ env_var('ROOT_POSTGRES_SCHEMA') }}.{{ env_var('POSTGRES_TABLE') }}
WHERE
(
doc->>'type' IN ('contact', 'clinic', 'district_hospital', 'health_center', 'person')
-- Rows flagged as deleted pass the filter whatever their doc type is.
or _deleted = true
)
{% if is_incremental() %}
-- Incremental runs only read rows saved at or after the newest timestamp already in this model;
-- 1900-01-01 is the fallback when that max is NULL.
and "@timestamp" >= (select coalesce(max("@timestamp"), '1900-01-01') from {{ this }})
and saved_timestamp >= (select coalesce(max(saved_timestamp), '1900-01-01') from {{ this }})
{% endif %}
49 changes: 47 additions & 2 deletions tests/dbt/package/models/contacts/contacts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,58 @@ version: 2

# Column contracts for the contacts and persons models.
models:
- name: contacts
config:
contract:
enforced: true
columns:
- name: _id
- name: uuid
data_type: string
constraints:
# uuid is declared unique here; the persons foreign key further down references this column.
- type: unique
tests:
- not_null
- name: saved_timestamp
data_type: timestamp
- name: _deleted
data_type: boolean
- name: reported
data_type: timestamp with time zone
- name: parent_uuid
data_type: string
- name: name
data_type: string
- name: contact_type
data_type: string
- name: phone
data_type: string
- name: phone2
data_type: string
- name: active
data_type: string
- name: notes
data_type: string
- name: contact_id
data_type: string
- name: edited
data_type: string

- name: persons
config:
contract:
enforced: true
columns:
- name: _id
- name: uuid
data_type: string
constraints:
# Ties each persons row to the contacts row with the same uuid. ON DELETE CASCADE
# means deleting the contacts row also deletes the matching persons row.
- type: foreign_key
expression: "{{ env_var('POSTGRES_SCHEMA') }}.contacts (uuid) ON DELETE CASCADE"
tests:
- not_null
- name: saved_timestamp
data_type: timestamp
- name: date_of_birth
data_type: string
- name: sex
data_type: string
- name: edited
data_type: string
20 changes: 10 additions & 10 deletions tests/dbt/package/models/contacts/persons.sql
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
-- Incremental dbt model holding person-specific fields for contacts whose contact_type is person.
{{
config(
materialized = 'incremental',
unique_key='_id',
unique_key='uuid',
on_schema_change='append_new_columns',
indexes=[
{'columns': ['_id'], 'type': 'hash'},
{'columns': ['"@timestamp"']},
{'columns': ['uuid'], 'type': 'hash'},
{'columns': ['saved_timestamp']},
]
)
}}

SELECT
contact._id,
contact._deleted,
contact.doc,
contact."@timestamp",
contact.uuid,
contact.saved_timestamp,
couchdb.doc->>'date_of_birth' as date_of_birth,
couchdb.doc->>'sex' as sex
couchdb.doc->>'sex' as sex,
-- Qualified with the couchdb alias like the columns above, so the reference stays
-- unambiguous even if the contacts relation also exposes a doc column.
couchdb.doc->>'edited' AS edited
FROM {{ ref("contacts") }} contact
-- Join back to the raw source table to read fields straight from the JSON doc.
INNER JOIN {{ env_var('ROOT_POSTGRES_SCHEMA') }}.{{ env_var('POSTGRES_TABLE') }} couchdb ON couchdb._id = contact._id
INNER JOIN {{ env_var('ROOT_POSTGRES_SCHEMA') }}.{{ env_var('POSTGRES_TABLE') }} couchdb ON couchdb._id = contact.uuid
WHERE
contact.contact_type = 'person'
{% if is_incremental() %}
-- Incremental runs only read source rows saved at or after the newest timestamp already in this model;
-- 1900-01-01 is the fallback when that max is NULL.
and couchdb."@timestamp" >= (select coalesce(max("@timestamp"), '1900-01-01') from {{ this }})
and couchdb.saved_timestamp >= (select coalesce(max(saved_timestamp), '1900-01-01') from {{ this }})
{% endif %}
17 changes: 9 additions & 8 deletions tests/dbt/package/models/reports/reports.sql
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
{{
config(
materialized = 'incremental',
unique_key='_id',
unique_key='uuid',
post_hook='delete from {{this}} where _deleted=true',
indexes=[
{'columns': ['"_id"'], 'type': 'hash'},
{'columns': ['"@timestamp"']},
{'columns': ['"form"']},
{'columns': ['"patient_id"']},
{'columns': ['uuid'], 'type': 'hash'},
{'columns': ['saved_timestamp']},
{'columns': ['form']},
{'columns': ['patient_id']},
]
)
}}

SELECT
_id,
"@timestamp",
_id as uuid,
saved_timestamp,
doc,
doc->>'form' as form,
_deleted,
Expand All @@ -38,5 +39,5 @@ WHERE (
or _deleted = true
)
{% if is_incremental() %}
and "@timestamp" >= (select coalesce(max("@timestamp"), '1900-01-01') from {{ this }})
and saved_timestamp >= (select coalesce(max(saved_timestamp), '1900-01-01') from {{ this }})
{% endif %}
35 changes: 16 additions & 19 deletions tests/e2e-test.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ describe('Main workflow Test Suite', () => {
const pgTableContact = await client.query(`SELECT * from ${PGTABLE} where _id = $1`, [contact._id]);
expect(pgTableContact.rows[0].doc.edited).to.equal(1);

await delay(6); // wait for DBT
await delay(12); // wait for DBT

const modelReportResult = await client.query(`SELECT * FROM ${pgSchema}.reports where _id = $1`, [report._id]);
const modelReportResult = await client.query(`SELECT * FROM ${pgSchema}.reports where uuid = $1`, [report._id]);
expect(modelReportResult.rows[0].doc.edited).to.equal(1);

const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where _id = $1`, [contact._id]);
expect(modelContactResult.rows[0].doc.edited).to.equal(1);
const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where uuid = $1`, [contact._id]);
expect(modelContactResult.rows[0].edited).to.equal('1');

const modelPersonResult = await client.query(`SELECT * FROM ${pgSchema}.persons where _id = $1`, [contact._id]);
expect(modelPersonResult.rows[0].doc.edited).to.equal(1);
const modelPersonResult = await client.query(`SELECT * FROM ${pgSchema}.persons where uuid = $1`, [contact._id]);
expect(modelPersonResult.rows[0].edited).to.equal('1');

const contactsTableResult = await client.query(`SELECT * FROM ${pgSchema}.contacts`);
expect(contactsTableResult.rows.length).to.equal(contacts().length);
Expand All @@ -104,27 +104,25 @@ describe('Main workflow Test Suite', () => {
const pgTableContact = await client.query(`SELECT * from ${PGTABLE} where _id = $1`, [contact._id]);
expect(pgTableContact.rows[0]._deleted).to.equal(true);
await delay(6); // wait for DBT
const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where _id = $1`, [contact._id]);
expect(modelContactResult.rows.length).to.equal(1);
expect(modelContactResult.rows[0]._deleted).to.equal(true);
const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where uuid = $1`, [contact._id]);
expect(modelContactResult.rows.length).to.equal(0);
});

it('should process person deletes', async () => {
const person = contacts().find(contact => contact.type === 'person' && !contact._deleted);

const preDelete = await client.query(`SELECT * FROM ${pgSchema}.persons where _id = $1`, [person._id]);
const preDelete = await client.query(`SELECT * FROM ${pgSchema}.persons where uuid = $1`, [person._id]);
expect(preDelete.rows.length).to.equal(1);

await deleteDoc(person);
await delay(6); // wait for CHT-Sync
await delay(6); // wait for DBT
await delay(6); // wait for DBT
await delay(12); // wait for DBT

const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where _id = $1`, [person._id]);
expect(modelContactResult.rows[0]._deleted).to.equal(true);
const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where uuid = $1`, [person._id]);
expect(modelContactResult.rows.length).to.equal(0);

const postDelete = await client.query(`SELECT * FROM ${pgSchema}.persons where _id = $1`, [person._id]);
expect(postDelete.rows[0]._deleted).to.equal(true);
const postDelete = await client.query(`SELECT * FROM ${pgSchema}.persons where uuid = $1`, [person._id]);
expect(postDelete.rows.length).to.equal(0);
});

it('should process report deletes', async () => {
Expand All @@ -134,8 +132,7 @@ describe('Main workflow Test Suite', () => {
const pgTableReport = await client.query(`SELECT * from ${PGTABLE} where _id = $1`, [report._id]);
expect(pgTableReport.rows[0]._deleted).to.equal(true);
await delay(6); // wait for DBT
const modelReportResult = await client.query(`SELECT * FROM ${pgSchema}.reports where _id = $1`, [report._id]);
expect(modelReportResult.rows.length).to.equal(1);
expect(modelReportResult.rows[0]._deleted).to.equal(true);
const modelReportResult = await client.query(`SELECT * FROM ${pgSchema}.reports where uuid = $1`, [report._id]);
expect(modelReportResult.rows.length).to.equal(0);
});
});