Skip to content

Commit

Permalink
fix: key value stores emitting an error when multiple write promises ran in parallel (#1460)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladfrangu authored Aug 12, 2022
1 parent 99d125e commit f201cca
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions packages/memory-storage/src/memory-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import type * as storage from '@crawlee/types';
import type { Dictionary } from '@crawlee/types';
import { s } from '@sapphire/shapeshift';
import { ensureDir, pathExistsSync } from 'fs-extra';
import { ensureDirSync, pathExistsSync } from 'fs-extra';
import { renameSync } from 'node:fs';
import { rm, rename, readdir } from 'node:fs/promises';
import { resolve } from 'node:path';
import { DatasetClient } from './resource-clients/dataset';
Expand Down Expand Up @@ -114,6 +115,20 @@ export class MemoryStorage implements storage.StorageClient {
* - local directory containing the default request queue.
*/
async purge(): Promise<void> {
// Key-value stores
const keyValueStores = await readdir(this.keyValueStoresDirectory).catch(() => []);
const keyValueStorePromises: Promise<void>[] = [];

for (const keyValueStoreFolder of keyValueStores) {
if (keyValueStoreFolder.startsWith('__CRAWLEE_TEMPORARY') || keyValueStoreFolder.startsWith('__OLD')) {
keyValueStorePromises.push((await this.batchRemoveFiles(resolve(this.keyValueStoresDirectory, keyValueStoreFolder)))());
} else if (keyValueStoreFolder === 'default') {
keyValueStorePromises.push(this.handleDefaultKeyValueStore(resolve(this.keyValueStoresDirectory, keyValueStoreFolder))());
}
}

void Promise.allSettled(keyValueStorePromises);

// Datasets
const datasets = await readdir(this.datasetsDirectory).catch(() => []);
const datasetPromises: Promise<void>[] = [];
Expand All @@ -137,20 +152,6 @@ export class MemoryStorage implements storage.StorageClient {
}

void Promise.allSettled(requestQueuePromises);

// Key-value stores
const keyValueStores = await readdir(this.keyValueStoresDirectory).catch(() => []);
const keyValueStorePromises: Promise<void>[] = [];

for (const keyValueStoreFolder of keyValueStores) {
if (keyValueStoreFolder.startsWith('__CRAWLEE_TEMPORARY')) {
keyValueStorePromises.push((await this.batchRemoveFiles(resolve(this.keyValueStoresDirectory, keyValueStoreFolder)))());
} else if (keyValueStoreFolder === 'default') {
keyValueStorePromises.push((await this.handleDefaultKeyValueStore(resolve(this.keyValueStoresDirectory, keyValueStoreFolder)))());
}
}

void Promise.allSettled(keyValueStorePromises);
}

/**
Expand All @@ -162,7 +163,7 @@ export class MemoryStorage implements storage.StorageClient {
await Promise.all(promises);
}

private async handleDefaultKeyValueStore(folder: string): Promise<() => Promise<void>> {
private handleDefaultKeyValueStore(folder: string): () => Promise<void> {
const storagePathExists = pathExistsSync(folder);
const temporaryPath = resolve(folder, '../__CRAWLEE_MIGRATING_KEY_VALUE_STORE__');

Expand All @@ -176,26 +177,36 @@ export class MemoryStorage implements storage.StorageClient {

if (storagePathExists) {
// Create temporary folder to save important files in
await ensureDir(temporaryPath);
ensureDirSync(temporaryPath);

// Go through each file and save the ones that are important
for (const entity of possibleInputKeys) {
const originalFilePath = resolve(folder, entity);
const tempFilePath = resolve(temporaryPath, entity);

try {
await rename(originalFilePath, tempFilePath);
renameSync(originalFilePath, tempFilePath);
} catch {
// Ignore
}
}

// Remove the original folder and all its content
const tempPathForOldFolder = resolve(folder, '../__OLD_DEFAULT__');
await rename(folder, tempPathForOldFolder);
let counter = 0;
let tempPathForOldFolder = resolve(folder, `../__OLD_DEFAULT_${counter}__`);
let done = false;

while (!done) {
try {
renameSync(folder, tempPathForOldFolder);
done = true;
} catch {
tempPathForOldFolder = resolve(folder, `../__OLD_DEFAULT_${++counter}__`);
}
}

// Replace the temporary folder with the original folder
await rename(temporaryPath, folder);
renameSync(temporaryPath, folder);

// Remove the old folder
return async () => (await this.batchRemoveFiles(tempPathForOldFolder))();
Expand Down

0 comments on commit f201cca

Please sign in to comment.