Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import unified owners #1053

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ allowed_patterns:
- \[key\]
- key=
- 'key:'
- .keys()
version: "1.0"
18 changes: 18 additions & 0 deletions server/src/infra/database/migrations/20241223105911-owners-dept.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable('owners_dept', (table) => {
table
.uuid('owner_id')
.notNullable()
.references('id')
.inTable('owners')
.onUpdate('CASCADE')
.onDelete('CASCADE');
table.string('owner_idpersonne').notNullable();
});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTable('owners_dept');
}
13 changes: 8 additions & 5 deletions server/src/models/HousingOwnerApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,18 @@ export interface HousingOwnerApi extends OwnerApi {
locprop?: number;
}

type Incorrect = -1;
type Awaiting = -2;
export const AWAITING_RANK = -2 as const;
export const INCORRECT_RANK = -1 as const;
export const POSITIVE_RANKS = [1, 2, 3, 4, 5, 6] as const;

export type IncorrectRank = typeof INCORRECT_RANK;
export type AwaitingRank = typeof AWAITING_RANK;
export type PositiveRank = (typeof POSITIVE_RANKS)[number];
export type Rank = Incorrect | Awaiting | PositiveRank;
export function isIncorrect(rank: Rank): rank is Incorrect {
export type Rank = IncorrectRank | AwaitingRank | PositiveRank;
export function isIncorrect(rank: Rank): rank is IncorrectRank {
return rank === -1;
}
export function isAwaiting(rank: Rank): rank is Awaiting {
export function isAwaiting(rank: Rank): rank is AwaitingRank {
return rank === -2;
}

Expand Down
28 changes: 28 additions & 0 deletions server/src/repositories/departmentalOwnersRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Readable } from 'node:stream';
import { ReadableStream } from 'node:stream/web';

import db from '~/infra/database';

export const departmentalOwnersTable = 'owners_dept';
export const DepartmentalOwners = (transaction = db) =>
transaction<DepartmentalOwnerDBO>(departmentalOwnersTable);

export interface DepartmentalOwnerDBO {
owner_id: string;
owner_idpersonne: string;
}

function stream(): ReadableStream<DepartmentalOwnerDBO> {
const query = DepartmentalOwners()
.select(`${departmentalOwnersTable}.*`)
.orderBy('owner_idpersonne')
.stream();

return Readable.toWeb(query);
}

const departmentalOwnersRepository = {
stream
};

export default departmentalOwnersRepository;
102 changes: 102 additions & 0 deletions server/src/scripts/import-unified-owners/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Import unified owners

## Usage

### Migrating

```shell
DATABASE_URL=... yarn workspace @zerologementvacant/server migrate
```

### Importing to postgres

```sql
INSTALL httpfs;
LOAD httpfs;

CREATE OR REPLACE SECRET (
TYPE S3,
KEY_ID '...',
SECRET '...',
ENDPOINT 'cellar-c2.services.clever-cloud.com',
REGION 'us-east-1'
);

CREATE TABLE IF NOT EXISTS "dev"."main"."unified_owners" AS
SELECT
id,
ff_owner_idpersonne AS idpersonne
FROM
read_csv(
's3://zlv-production/production/dump_20241218/unified_owners.csv',
auto_detect = TRUE,
ignore_errors = TRUE
)
ORDER BY id;

WITH duplicates AS (
SELECT COUNT(*), unified_owners.idpersonne FROM unified_owners
GROUP BY unified_owners.idpersonne
HAVING COUNT(*) > 1
)
SELECT * FROM duplicates
LIMIT 10;

COPY (
SELECT DISTINCT ON (idpersonne) idpersonne, id FROM "dev"."main"."unified_owners"
) TO 'unified-owners.csv' (HEADER, DELIMITER ',');

-- Import to postgres
INSTALL postgres;
LOAD postgres;

CREATE OR REPLACE SECRET (
TYPE POSTGRES,
HOST 'localhost',
PORT 5432,
DATABASE 'dev',
USER 'postgres',
PASSWORD 'postgres'
);

ATTACH IF NOT EXISTS '' AS postgres (TYPE POSTGRES);

TRUNCATE TABLE "postgres"."public"."owners_dept";

INSERT INTO "postgres"."public"."owners_dept"
SELECT id AS owner_id, idpersonne AS owner_idpersonne FROM read_csv(
'unified-owners.csv',
auto_detect = TRUE,
header = TRUE,
ignore_errors = TRUE
);


-- Show some metrics
SELECT COUNT(*) FROM "dev"."main"."unified_owners";
SELECT COUNT(*) FROM "postgres"."public"."owners_dept";

-- Are there housings that have several awaiting owners?
SELECT housing_id, COUNT(*) AS count FROM "postgres"."public"."owners_housing"
WHERE rank = -2
GROUP BY housing_id
HAVING COUNT(*) >= 2;

-- Awaiting national housing owners
SELECT COUNT(*) FROM "postgres"."public"."owners_housing"
JOIN "postgres"."public"."owners" ON owners.id = owners_housing.owner_id
WHERE rank = -2 AND idpersonne IS NULL;

-- Departmental housing owners
SELECT COUNT(*) FROM "postgres"."public"."owners_housing"
JOIN "postgres"."public"."owners" ON owners.id = owners_housing.owner_id
WHERE rank >= 1 AND idpersonne IS NOT NULL;
```

### Running the script

This script will be processing the actual housing owners, in production.
```shell
cd server
DATABASE_URL=... yarn ts-node src/scripts/import-unified-owners/index.ts
```
98 changes: 98 additions & 0 deletions server/src/scripts/import-unified-owners/command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import {
createProcessor,
FindHousingOwnersOptions,
RemoveEventsOptions
} from '~/scripts/import-unified-owners/processor';
import { AWAITING_RANK, HousingOwnerApi } from '~/models/HousingOwnerApi';
import {
HousingOwners,
housingOwnersTable
} from '~/repositories/housingOwnerRepository';
import {
ownerTable,
parseHousingOwnerApi
} from '~/repositories/ownerRepository';
import departmentalOwnersRepository from '~/repositories/departmentalOwnersRepository';
import { createLogger } from '~/infra/logger';
import {
eventsTable,
HousingEvents,
housingEventsTable
} from '~/repositories/eventRepository';
import { usersTable } from '~/repositories/userRepository';

const logger = createLogger('command');

export default function createImportUnifiedOwnersCommand() {
return async (): Promise<void> => {
const processor = createProcessor({
findHousingOwners,
updateHousingOwner,
removeHousingOwner,
removeEvents
});

await departmentalOwnersRepository.stream().pipeTo(processor);
};
}

export async function findHousingOwners(
options: FindHousingOwnersOptions
): Promise<ReadonlyArray<HousingOwnerApi>> {
const query = HousingOwners()
.select(`${housingOwnersTable}.*`)
.join(ownerTable, `${ownerTable}.id`, `${housingOwnersTable}.owner_id`)
.select(`${ownerTable}.*`);

// Split the request to allow Postgres to use the indexes
const [nationalOwners, departmentalOwners] = await Promise.all([
query.clone().where({
owner_id: options.nationalOwner,
rank: AWAITING_RANK
}),
query
.clone()
.where('idpersonne', options.departmentalOwner)
.where('rank', '>=', 1)
]);
const housingOwners = nationalOwners.concat(departmentalOwners);
return housingOwners.map(parseHousingOwnerApi);
}

export async function updateHousingOwner(
housingOwner: HousingOwnerApi
): Promise<void> {
logger.debug('Updating housing owner...', housingOwner);
await HousingOwners().update({ rank: housingOwner.rank }).where({
owner_id: housingOwner.ownerId,
housing_id: housingOwner.housingId,
housing_geo_code: housingOwner.housingGeoCode
});
}

export async function removeHousingOwner(
housingOwner: HousingOwnerApi
): Promise<void> {
logger.debug('Removing housing owner...', housingOwner);
await HousingOwners().delete().where({
owner_id: housingOwner.ownerId,
housing_id: housingOwner.housingId,
housing_geo_code: housingOwner.housingGeoCode
});
}

export async function removeEvents(
options: RemoveEventsOptions
): Promise<void> {
logger.debug('Removing events...', options);
await HousingEvents()
.where({ housing_id: options.housingId })
.join(eventsTable, `${eventsTable}.id`, `${housingEventsTable}.event_id`)
.where({ name: 'Changement de propriétaires' })
.whereRaw(
`${eventsTable}.created_at::date BETWEEN '2024-09-08' AND '2024-09-09'`
)
.join(usersTable, `${usersTable}.id`, `${eventsTable}.created_by`)
.where(`${usersTable}.email`, '=', 'admin@zerologementvacant.beta.gouv.fr')
.delete();
}
12 changes: 12 additions & 0 deletions server/src/scripts/import-unified-owners/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import createImportUnifiedOwnersCommand from '~/scripts/import-unified-owners/command';

const importer = createImportUnifiedOwnersCommand();
importer()
.then(() => {
console.log('Done.');
process.exit(0);
})
.catch((error) => {
console.error(error);
process.exit(1);
});
Loading
Loading