Skip to content

Commit

Permalink
feat: smooth indexation fix #9
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Bouquillon committed Apr 13, 2021
1 parent 192cd25 commit 9294a3e
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 46 deletions.
43 changes: 29 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,39 +1,54 @@
# cdtn-entreprises : recherche d'entreprises

Ces scripts permettent de générer un index Elastic Search qui regroupe toutes les informations utiles pour rechercher un établissement ou une entreprise par établissement, raison sociale, code postal, ville, siret/siren, effectif, convention collective...
Ce projet permet de générer un index Elastic Search qui regroupe toutes les informations utiles pour rechercher un établissement ou une entreprise par établissement, raison sociale, code postal, ville, siret/siren, effectif, convention collective...

Les données sont issues de [plusieurs jeux de données data.gouv.fr](./assembly/scripts/get-data.sh) et de [kali-data](https://github.com/SocialGouv/kali-data) .
Les données sont issues de [plusieurs jeux de données data.gouv.fr](./assembly/scripts/get-data.sh) et de [kali-data](https://github.com/SocialGouv/kali-data).

Le dossier [`api`](./api) présente un exemple d'implémentation d'API NodeJS qui exploite cet index Elastic Search, avec différents exemples de requêtes.
Le dossier [`api`](./api) présente un exemple d'implémentation d'API NodeJS qui exploite cet index Elastic Search avec différentes requêtes.

## Stages :
## Étapes :

[![](https://mermaid.ink/svg/eyJjb2RlIjoiZ3JhcGggTFJcblxuU3RvY2tVbml0ZUxlZ2FsZS5jc3YtLT5QeUFzc2VtYmx5wqBcbmdlb19zaXJldC5jc3YtLT5QeUFzc2VtYmx5wqBcbndlZXouY3N2LS0-UHlBc3NlbWJsecKgXG5QeUFzc2VtYmx5LS0-YXNzZW1ibHkuY3N2LS0-aW5kZXgtLT5FbGFzdGljU2VhcmNoIiwibWVybWFpZCI6e30sInVwZGF0ZUVkaXRvciI6ZmFsc2V9)](https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoiZ3JhcGggTFJcblxuU3RvY2tVbml0ZUxlZ2FsZS5jc3YtLT5QeUFzc2VtYmx5wqBcbmdlb19zaXJldC5jc3YtLT5QeUFzc2VtYmx5wqBcbndlZXouY3N2LS0-UHlBc3NlbWJsecKgXG5QeUFzc2VtYmx5LS0-YXNzZW1ibHkuY3N2LS0-aW5kZXgtLT5FbGFzdGljU2VhcmNoIiwibWVybWFpZCI6e30sInVwZGF0ZUVkaXRvciI6ZmFsc2V9)
![](https://mermaid.ink/svg/eyJjb2RlIjoiZ3JhcGggTFJcblxuU3RvY2tVbml0ZUxlZ2FsZS5jc3YtLT5QeUFzc2VtYmx5wqBcbmdlb19zaXJldC5jc3YtLT5QeUFzc2VtYmx5wqBcbnNpcmV0MmlkY2MuY3N2LS0-UHlBc3NlbWJsecKgXG5QeUFzc2VtYmx5LS0-YXNzZW1ibHkuY3N2LS0-aW5kZXgtLT5FbGFzdGljU2VhcmNoIiwibWVybWFpZCI6e30sInVwZGF0ZUVkaXRvciI6ZmFsc2V9)

### Assembly
## Assemblage

The assembly CSV file is generated in two steps, from the `assembly/` directory :
Le CSV est généré en deux étapes dans le dossier `assembly/` :

- First we download the different datasets (8GB)
- Téléchargement des datasets (8GB)

`DATA_DIR=./data2/ scripts/get-data.sh`

- Then we execute a Python script to assemble the different sources into a unified dataset. It will be availble in the `OUTPUT_DIR`.
Finally, this Python scripts requires several dependencies (numpy & pandas) that might require OS dependencies. Please use the docker version to avoid system specific configuration.
- Assemblage des fichiers avec Python (numpy & pandas)

`pip install -r requirements.txt`

`DATA_DIR=./data2/ OUTPUT_DIR=./ scripts/assemble.sh`

### Index
Au final, le fichier CSV fait environ 600Mo

Now we use the assembled CSV file to populate an Elastic index. Within the `index/` directory :
| Dataset | usage |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------- |
| [geo-sirene](https://www.data.gouv.fr/fr/datasets/base-sirene-des-entreprises-et-de-leurs-etablissements-siren-siret/#resource-community-c6006b4d-0b4b-4504-a762-1efe69c7ed18) | Version géocodée du stock des établiseement |
| [insee-sirene](https://www.data.gouv.fr/fr/datasets/base-sirene-des-entreprises-et-de-leurs-etablissements-siren-siret/) | Base Sirene des entreprises et de leurs établissements |
| [siret2idcc](https://www.data.gouv.fr/fr/datasets/liste-des-conventions-collectives-par-entreprise-siret/#_) | Lien vers la convention collective |
| [kali-data](https://github.com/SocialGouv/kali-data) | Informations sur les conventions collectives |
| [codes-naf](https://github.com/SocialGouv/codes-naf) | Liste des codes NAF (Nomenclature d’activités française) |

## Indexation Elastic Search

Le dossier `index/` contient les scripts qui injectent le fichier `assembly.csv` dans un index `recherche-entreprises` ElasticSearch.

La mise à jour exploite la fonctionnalité [alias](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/indices-aliases.html) d'ElasticSearch pour éviter les downtimes.

Le script `scripts/create-es-keys.sh` permet de créer des token pour lire/écrire sur ces index.

Pour lancer une indexation :

```sh
yarn install
yarn build

ELASTICSEARCH_URL=https://elastic_url:9200 ELASTICSEARCH_API_KEY=key_with_writing_rights ASSEMBLY_FILE=/path_to/assembly.csv node dist/index.js
ELASTICSEARCH_URL=https://elastic_url:9200 ELASTICSEARCH_API_KEY=key_with_writing_rights ASSEMBLY_FILE=/path_to/assembly.csv yarn start
```

The default `ELASTICSEARCH_INDEX_NAME` is `recherche-entreprises`

1 change: 1 addition & 0 deletions index/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"@elastic/elasticsearch": "^7.10.0",
"@socialgouv/kali-data": "^2.55.0",
"fast-csv": "^4.3.6",
"p-all": "^3.0.0",
"ts-node": "^9.1.1"
},
"devDependencies": {
Expand Down
50 changes: 22 additions & 28 deletions index/src/elastic.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { Client, ClientOptions } from "@elastic/elasticsearch";
import { Enterprise, mapEnterprise, mappings } from "./enterprise";
import pAll from "p-all";

const ELASTICSEARCH_URL =
process.env.ELASTICSEARCH_URL || "http://localhost:9200";
const API_KEY = process.env.ELASTICSEARCH_API_KEY;

const indexName = process.env.ELASTICSEARCH_INDEX_NAME || "recherche-entreprises-test";
const INDEX_NAME =
process.env.ELASTICSEARCH_INDEX_NAME || "recherche-entreprises-test";
const indexPattern = `${INDEX_NAME}-*`;

const auth = API_KEY ? { apiKey: API_KEY } : undefined;

Expand Down Expand Up @@ -71,7 +74,7 @@ const index = {
},
};

const deleteOldIndices = async (alias: string, indexToKeep: string) => {
export const deleteOldIndices = async (indexToKeep: string) => {
const allIndices: string[] = await esClient.cat
.indices({ format: "json" })
.then(({ body }: { body: any }) =>
Expand All @@ -80,7 +83,9 @@ const deleteOldIndices = async (alias: string, indexToKeep: string) => {

// list indices to delete
const matchingIndices = allIndices.filter(
(index) => index.startsWith(alias) && index != indexToKeep
(index) =>
index.startsWith(indexPattern.substring(0, indexPattern.length - 2)) &&
index != indexToKeep
);

const deletePromises = matchingIndices.map((index) =>
Expand All @@ -94,44 +99,38 @@ const deleteOldIndices = async (alias: string, indexToKeep: string) => {
);
};

const updateAlias = (indexPattern: string, newIndex: string, alias: string) =>
export const updateAlias = (newIndexName: string) =>
esClient.indices.updateAliases({
body: {
actions: [
{
remove: {
alias,
alias: INDEX_NAME,
index: indexPattern,
},
},
{
add: {
alias: alias,
index: newIndex,
alias: INDEX_NAME,
index: newIndexName,
},
},
],
},
});

export const resetIndex = async () => {
export const createIndex = async () => {
const id = Math.floor(Math.random() * 100001);

const newIndex = `${indexName}-${id}`;
const indexPattern = `${indexName}-*`;

const newIndexName = `${INDEX_NAME}-${id}`;
const body = { mappings, settings: { analysis, index } };

await esClient.indices.create({
index: newIndex,
index: newIndexName,
body,
});

await updateAlias(indexPattern, newIndex, indexName);
await deleteOldIndices(indexPattern, newIndex);
return newIndexName;
};

const bulkInsert = async (enterprises: Enterprise[]) => {
const bulkInsert = async (enterprises: Enterprise[], indexName: string) => {
// async function bulkIndexDocuments({ client, indexName, documents }) {
try {
const resp = await esClient.bulk({
Expand All @@ -149,8 +148,6 @@ const bulkInsert = async (enterprises: Enterprise[]) => {
),
[]
),

// body: enterprises.map(mapEnterprise),
index: indexName,
});
if (resp.body.errors) {
Expand All @@ -165,7 +162,7 @@ const bulkInsert = async (enterprises: Enterprise[]) => {
}
};

export const add = async (enterprises: Enterprise[]) => {
export const add = async (enterprises: Enterprise[], indexName: string) => {
const batches = [];
let i = 0;

Expand All @@ -176,11 +173,8 @@ export const add = async (enterprises: Enterprise[]) => {

console.log(`${batches.length} batches`);

return batches
.map(bulkInsert)
.reduce((prev, cur, i) => prev.then(cur as any), Promise.resolve());
};

export const query = (query: string): Enterprise[] => {
return [];
return pAll(
batches.map((batch) => () => bulkInsert(batch, indexName)),
{ concurrency: 2 }
);
};
13 changes: 9 additions & 4 deletions index/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import * as fs from "fs";
import * as path from "path";
import * as csv from "fast-csv";
import { add, resetIndex } from "./elastic";
import { add, createIndex, updateAlias, deleteOldIndices } from "./elastic";
import { Enterprise } from "./enterprise";

const ASSEMBLY_FILE = process.env.ASSEMBLY_FILE || "../output/assembly.csv";

const parseEnterprises = () => {
const insertEntreprises = (indexName: string) => {
const stream = fs.createReadStream(path.resolve(ASSEMBLY_FILE));

const BUFFER_SIZE = 500;
Expand All @@ -22,7 +22,7 @@ const parseEnterprises = () => {
// create an immutable copy of the array
const batch = enterprisesBuffer.slice();
enterprisesBuffer = [];
await add(batch)
await add(batch, indexName);

// to run experiments
// stream.destroy();
Expand All @@ -36,5 +36,10 @@ const parseEnterprises = () => {
};

if (require.main === module) {
resetIndex().then(() => parseEnterprises());
// use elastic alias feature to prevent downtimes
createIndex().then(async (indexName) =>
insertEntreprises(indexName)
.then(() => updateAlias(indexName))
.then(() => deleteOldIndices(indexName))
);
}
32 changes: 32 additions & 0 deletions index/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@
resolved "https://registry.yarnpkg.com/@vercel/ncc/-/ncc-0.28.2.tgz#cf0c4f7e4c980bc849eaf115851f2440cd62c8a5"
integrity sha512-2ZBPviK9nFHzymu9POKGz50BRGGBIw7a8VcjgH73Xu2H4IvZx3KS0Qk/SS7S/N6iAalAGGxLSwFM1obeBGWXGg==

aggregate-error@^3.0.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/aggregate-error/-/aggregate-error-3.1.0.tgz#92670ff50f5359bdb7a3e0d40d0ec30c5737687a"
integrity sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==
dependencies:
clean-stack "^2.0.0"
indent-string "^4.0.0"

ansi-styles@^3.2.1:
version "3.2.1"
resolved "https://registry.yarnpkg.com/ansi-styles/-/ansi-styles-3.2.1.tgz#41fbb20243e50b12be0f04b8dedbf07520ce841d"
Expand Down Expand Up @@ -136,6 +144,11 @@ chalk@^2.0.0, chalk@^2.3.0:
escape-string-regexp "^1.0.5"
supports-color "^5.3.0"

clean-stack@^2.0.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/clean-stack/-/clean-stack-2.2.0.tgz#ee8472dbb129e727b31e8a10a427dee9dfe4008b"
integrity sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==

color-convert@^1.9.0:
version "1.9.3"
resolved "https://registry.yarnpkg.com/color-convert/-/color-convert-1.9.3.tgz#bb71850690e1f136567de629d2d5471deda4c1e8"
Expand Down Expand Up @@ -290,6 +303,11 @@ hpagent@^0.1.1:
resolved "https://registry.yarnpkg.com/hpagent/-/hpagent-0.1.1.tgz#66f67f16e5c7a8b59a068e40c2658c2c749ad5e2"
integrity sha512-IxJWQiY0vmEjetHdoE9HZjD4Cx+mYTr25tR7JCxXaiI3QxW0YqYyM11KyZbHufoa/piWhMb2+D3FGpMgmA2cFQ==

indent-string@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/indent-string/-/indent-string-4.0.0.tgz#624f8f4497d619b2d9768531d58f4122854d7251"
integrity sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==

inflight@^1.0.4:
version "1.0.6"
resolved "https://registry.yarnpkg.com/inflight/-/inflight-1.0.6.tgz#49bd6331d7d02d0c09bc910a1075ba8165b56df9"
Expand Down Expand Up @@ -414,6 +432,20 @@ once@^1.3.0, once@^1.3.1, once@^1.4.0:
dependencies:
wrappy "1"

p-all@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/p-all/-/p-all-3.0.0.tgz#077c023c37e75e760193badab2bad3ccd5782bfb"
integrity sha512-qUZbvbBFVXm6uJ7U/WDiO0fv6waBMbjlCm4E66oZdRR+egswICarIdHyVSZZHudH8T5SF8x/JG0q0duFzPnlBw==
dependencies:
p-map "^4.0.0"

p-map@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/p-map/-/p-map-4.0.0.tgz#bb2f95a5eda2ec168ec9274e06a747c3e2904d2b"
integrity sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==
dependencies:
aggregate-error "^3.0.0"

path-is-absolute@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/path-is-absolute/-/path-is-absolute-1.0.1.tgz#174b9268735534ffbc7ace6bf53a5a9e1b5c5f5f"
Expand Down

0 comments on commit 9294a3e

Please sign in to comment.