Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): Concurrency not fully using queue potential #11828

Merged
merged 21 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions cli/src/commands/asset.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Action,
AssetBulkUploadCheckItem,
AssetBulkUploadCheckResult,
AssetMediaResponseDto,
AssetMediaStatus,
Expand All @@ -11,7 +12,7 @@ import {
getSupportedMediaTypes,
} from '@immich/sdk';
import byteSize from 'byte-size';
import { Presets, SingleBar } from 'cli-progress';
import { MultiBar, Presets, SingleBar } from 'cli-progress';
import { chunk } from 'lodash-es';
import { Stats, createReadStream } from 'node:fs';
import { stat, unlink } from 'node:fs/promises';
Expand Down Expand Up @@ -90,23 +91,23 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
return { newFiles: files, duplicates: [] };
}

const progressBar = new SingleBar(
{ format: 'Checking files | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
const multiBar = new MultiBar(
{ format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
Presets.shades_classic,
);

progressBar.start(files.length, 0);
const hashProgressBar = multiBar.create(files.length, 0, { message: 'Hashing files ' });
const checkProgressBar = multiBar.create(files.length, 0, { message: 'Checking for duplicates' });

const newFiles: string[] = [];
const duplicates: Asset[] = [];

const queue = new Queue<string[], AssetBulkUploadCheckResults>(
async (filepaths: string[]) => {
const dto = await Promise.all(
filepaths.map(async (filepath) => ({ id: filepath, checksum: await sha1(filepath) })),
);
const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets: dto } });
const checkBulkUploadQueue = new Queue<AssetBulkUploadCheckItem[], void>(
async (assets: AssetBulkUploadCheckItem[]) => {
const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets } });

const results = response.results as AssetBulkUploadCheckResults;

for (const { id: filepath, assetId, action } of results) {
if (action === Action.Accept) {
newFiles.push(filepath);
Expand All @@ -115,19 +116,46 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
duplicates.push({ id: assetId as string, filepath });
}
}
progressBar.increment(filepaths.length);

checkProgressBar.increment(assets.length);
},
{ concurrency, retry: 3 },
);

const results: { id: string; checksum: string }[] = [];
let checkBulkUploadRequests: AssetBulkUploadCheckItem[] = [];

const queue = new Queue<string, AssetBulkUploadCheckItem[]>(
async (filepath: string): Promise<AssetBulkUploadCheckItem[]> => {
const dto = { id: filepath, checksum: await sha1(filepath) };

results.push(dto);
checkBulkUploadRequests.push(dto);
if (checkBulkUploadRequests.length === 5000) {
const batch = checkBulkUploadRequests;
checkBulkUploadRequests = [];
void checkBulkUploadQueue.push(batch);
}

hashProgressBar.increment();
return results;
},
{ concurrency, retry: 3 },
);

for (const items of chunk(files, concurrency)) {
await queue.push(items);
for (const item of files) {
void queue.push(item);
}

await queue.drained();

progressBar.stop();
if (checkBulkUploadRequests.length > 0) {
void checkBulkUploadQueue.push(checkBulkUploadRequests);
}

await checkBulkUploadQueue.drained();

multiBar.stop();

console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);

Expand Down Expand Up @@ -201,8 +229,8 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
{ concurrency, retry: 3 },
);

for (const filepath of files) {
await queue.push(filepath);
for (const item of files) {
void queue.push(item);
}

await queue.drained();
Expand Down
4 changes: 2 additions & 2 deletions cli/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ export class Queue<T, R> {
* @returns Promise<void> - The returned Promise will be resolved when all tasks in the queue have been processed by a worker.
* This promise could be ignored as it will not lead to a `unhandledRejection`.
*/
async drained(): Promise<void> {
await this.queue.drain();
drained(): Promise<void> {
return this.queue.drained();
}

/**
Expand Down
42 changes: 21 additions & 21 deletions e2e/src/cli/specs/upload.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe(`immich upload`, () => {
describe(`immich upload /path/to/file.jpg`, () => {
it('should upload a single file', async () => {
const { stderr, stdout, exitCode } = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
);
Expand All @@ -126,7 +126,7 @@ describe(`immich upload`, () => {
const expectedCount = Object.entries(files).filter((entry) => entry[1]).length;

const { stderr, stdout, exitCode } = await immichCli(['upload', ...commandLine]);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining(`Successfully uploaded ${expectedCount} new asset`)]),
);
Expand Down Expand Up @@ -154,7 +154,7 @@ describe(`immich upload`, () => {
cpSync(`${testAssetDir}/albums/nature/silver_fir.jpg`, testPaths[1]);

const { stderr, stdout, exitCode } = await immichCli(['upload', ...testPaths]);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Successfully uploaded 2 new assets')]),
);
Expand All @@ -169,7 +169,7 @@ describe(`immich upload`, () => {

it('should skip a duplicate file', async () => {
const first = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
expect(first.stderr).toBe('');
expect(first.stderr).toContain('{message}');
expect(first.stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
);
Expand All @@ -179,7 +179,7 @@ describe(`immich upload`, () => {
expect(assets.total).toBe(1);

const second = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
expect(second.stderr).toBe('');
expect(second.stderr).toContain('{message}');
expect(second.stdout.split('\n')).toEqual(
expect.arrayContaining([
expect.stringContaining('Found 0 new files and 1 duplicate'),
Expand All @@ -205,7 +205,7 @@ describe(`immich upload`, () => {
`${testAssetDir}/albums/nature/silver_fir.jpg`,
'--dry-run',
]);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Would have uploaded 1 asset')]),
);
Expand All @@ -217,7 +217,7 @@ describe(`immich upload`, () => {

it('dry run should handle duplicates', async () => {
const first = await immichCli(['upload', `${testAssetDir}/albums/nature/silver_fir.jpg`]);
expect(first.stderr).toBe('');
expect(first.stderr).toContain('{message}');
expect(first.stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Successfully uploaded 1 new asset')]),
);
Expand All @@ -227,7 +227,7 @@ describe(`immich upload`, () => {
expect(assets.total).toBe(1);

const second = await immichCli(['upload', `${testAssetDir}/albums/nature/`, '--dry-run']);
expect(second.stderr).toBe('');
expect(second.stderr).toContain('{message}');
expect(second.stdout.split('\n')).toEqual(
expect.arrayContaining([
expect.stringContaining('Found 8 new files and 1 duplicate'),
Expand All @@ -241,7 +241,7 @@ describe(`immich upload`, () => {
describe('immich upload --recursive', () => {
it('should upload a folder recursively', async () => {
const { stderr, stdout, exitCode } = await immichCli(['upload', `${testAssetDir}/albums/nature/`, '--recursive']);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Successfully uploaded 9 new assets')]),
);
Expand All @@ -267,7 +267,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Successfully updated 9 assets'),
]),
);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(exitCode).toBe(0);

const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand All @@ -283,7 +283,7 @@ describe(`immich upload`, () => {
expect(response1.stdout.split('\n')).toEqual(
expect.arrayContaining([expect.stringContaining('Successfully uploaded 9 new assets')]),
);
expect(response1.stderr).toBe('');
expect(response1.stderr).toContain('{message}');
expect(response1.exitCode).toBe(0);

const assets1 = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand All @@ -299,7 +299,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Successfully updated 9 assets'),
]),
);
expect(response2.stderr).toBe('');
expect(response2.stderr).toContain('{message}');
expect(response2.exitCode).toBe(0);

const assets2 = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand All @@ -325,7 +325,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Would have updated albums of 9 assets'),
]),
);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(exitCode).toBe(0);

const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand All @@ -351,7 +351,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Successfully updated 9 assets'),
]),
);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(exitCode).toBe(0);

const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand All @@ -377,7 +377,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Would have updated albums of 9 assets'),
]),
);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(exitCode).toBe(0);

const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand Down Expand Up @@ -408,7 +408,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Deleting assets that have been uploaded'),
]),
);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(exitCode).toBe(0);

const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand All @@ -434,7 +434,7 @@ describe(`immich upload`, () => {
expect.stringContaining('Would have deleted 9 local assets'),
]),
);
expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(exitCode).toBe(0);

const assets = await getAssetStatistics({}, { headers: asKeyAuth(key) });
Expand Down Expand Up @@ -493,7 +493,7 @@ describe(`immich upload`, () => {
'2',
]);

expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([
'Found 9 new files and 0 duplicates',
Expand Down Expand Up @@ -534,7 +534,7 @@ describe(`immich upload`, () => {
'silver_fir.jpg',
]);

expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([
'Found 8 new files and 0 duplicates',
Expand All @@ -555,7 +555,7 @@ describe(`immich upload`, () => {
'!(*_*_*).jpg',
]);

expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([
'Found 1 new files and 0 duplicates',
Expand All @@ -577,7 +577,7 @@ describe(`immich upload`, () => {
'--dry-run',
]);

expect(stderr).toBe('');
expect(stderr).toContain('{message}');
expect(stdout.split('\n')).toEqual(
expect.arrayContaining([
'Found 8 new files and 0 duplicates',
Expand Down
Loading