Skip to content

Commit

Permalink
wip: calculate folder size in workers
Browse files Browse the repository at this point in the history
  • Loading branch information
zaldih committed Jun 6, 2024
1 parent f3d0d39 commit 03cf3b9
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 120 deletions.
33 changes: 0 additions & 33 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
"dependencies": {
"ansi-escapes": "^6.2.1",
"colors": "1.4.0",
"get-folder-size": "^4.0.0",
"node-emoji": "^2.1.3",
"open-file-explorer": "^1.0.2",
"rxjs": "^7.8.1"
Expand Down
2 changes: 2 additions & 0 deletions src/constants/workers.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ export enum EVENTS {
exploreConfig = 'exploreConfig',
explore = 'explore',
scanResult = 'scanResult',
getFolderSize = 'getFolderSize',
getFolderSizeResult = 'getFolderSizeResult',
}
4 changes: 2 additions & 2 deletions src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ export class Controller {
this.logger.info(`Calculating stats for ${nodeFolder.path}`);
return this.fileService.getFolderSize(nodeFolder.path).pipe(
tap((size) => {
nodeFolder.size = this.fileService.convertKbToGB(+size);
this.logger.info(`Size of ${nodeFolder.path}: ${size}kb`);
this.logger.info(`Size of ${nodeFolder.path}: ${size}bytes`);
nodeFolder.size = this.fileService.convertBytesToGb(size);
}),
switchMap(async () => {
// Saves resources by not scanning a result that is probably not of interest
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/file-service.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ export interface IFileService {
deleteDir: (path: string) => Promise<boolean>;
fakeDeleteDir: (_path: string) => Promise<boolean>;
isValidRootFolder: (path: string) => boolean;
convertKbToGB: (kb: number) => number;
convertBytesToKB: (bytes: number) => number;
convertBytesToGb: (bytes: number) => number;
convertGBToMB: (gb: number) => number;
getFileContent: (path: string) => string;
isSafeToDelete: (path: string, targetFolder: string) => boolean;
Expand Down
13 changes: 6 additions & 7 deletions src/services/files/files.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import { readdir, stat } from 'fs/promises';
import { Observable } from 'rxjs';

export abstract class FileService implements IFileService {
abstract getFolderSize(path: string): Observable<number>;
abstract listDir(params: IListDirParams): Observable<string>;
abstract deleteDir(path: string): Promise<boolean>;
abstract getFolderSize(path: string): Observable<number>;

/** Used for dry-run or testing. */
async fakeDeleteDir(_path: string): Promise<boolean> {
const randomDelay = Math.floor(Math.random() * 4000 + 200);
await new Promise((r) => setTimeout(r, randomDelay));
await new Promise((resolve) => setTimeout(resolve, randomDelay));
return true;
}

Expand All @@ -40,16 +40,15 @@ export abstract class FileService implements IFileService {
return true;
}

convertKbToGB(kb: number): number {
const factorKBtoGB = 1048576;
return kb / factorKBtoGB;
}

convertBytesToKB(bytes: number): number {
const factorBytestoKB = 1024;
return bytes / factorBytestoKB;
}

convertBytesToGb(bytes: number): number {
return bytes / Math.pow(1024, 3);
}

convertGBToMB(gb: number): number {
const factorGBtoMB = 1024;
return gb * factorGBtoMB;
Expand Down
50 changes: 37 additions & 13 deletions src/services/files/files.worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { MAX_WORKERS, EVENTS } from '../../constants/workers.constants.js';

export type WorkerStatus = 'stopped' | 'scanning' | 'dead' | 'finished';
interface WorkerJob {
job: 'explore'; // | 'getSize';
job: EVENTS;
value: { path: string };
}

Expand All @@ -29,6 +29,9 @@ export class FileWorkerService {
private index = 0;
private workers: Worker[] = [];
private workersPendingJobs: number[] = [];
private getSizePendings: Array<{ path: string; stream$: Subject<number> }> =
[];

private pendingJobs = 0;
private totalJobs = 0;
private tunnels: MessagePort[] = [];
Expand All @@ -44,7 +47,13 @@ export class FileWorkerService {
this.setWorkerConfig(params);

// Manually add the first job.
this.addJob({ job: 'explore', value: { path: params.path } });
this.addJob({ job: EVENTS.explore, value: { path: params.path } });
}

getFolderSize(stream$: Subject<number>, path: string): void {
// this.listenEvents(stream$);
this.getSizePendings = [...this.getSizePendings, { path, stream$ }];
this.addJob({ job: EVENTS.getFolderSize, value: { path } });
}

private listenEvents(stream$: Subject<string>): void {
Expand Down Expand Up @@ -83,7 +92,7 @@ export class FileWorkerService {
stream$.next(path);
} else {
this.addJob({
job: 'explore',
job: EVENTS.explore,
value: { path },
});
}
Expand All @@ -93,28 +102,43 @@ export class FileWorkerService {
this.checkJobComplete(stream$);
}

if (type === EVENTS.getFolderSizeResult) {
const result: { path: string; size: number } = value.results;
const workerId: number = value.workerId;
this.workersPendingJobs[workerId] = value.pending;

this.getSizePendings.forEach((pending, index) => {
if (pending.path === result.path) {
pending.stream$.next(result.size);
this.getSizePendings.splice(index, 1);
}
});

this.pendingJobs = this.getPendingJobs();
this.checkJobComplete(stream$);
}

if (type === EVENTS.alive) {
this.searchStatus.workerStatus = 'scanning';
}
}

/** Jobs are distributed following the round-robin algorithm. */
private addJob(job: WorkerJob): void {
if (job.job === 'explore') {
const tunnel = this.tunnels[this.index];
const message: WorkerMessage = { type: EVENTS.explore, value: job.value };
tunnel.postMessage(message);
this.workersPendingJobs[this.index]++;
this.totalJobs++;
this.pendingJobs++;
this.index = this.index >= this.workers.length - 1 ? 0 : this.index + 1;
}
const tunnel = this.tunnels[this.index];
const message: WorkerMessage = { type: job.job, value: job.value };
tunnel.postMessage(message);
this.workersPendingJobs[this.index]++;
this.totalJobs++;
this.pendingJobs++;
this.index = this.index >= this.workers.length - 1 ? 0 : this.index + 1;
}

private checkJobComplete(stream$: Subject<string>): void {
this.updateStats();
const isCompleted = this.getPendingJobs() === 0;
if (isCompleted) {
// TODO &&false only for development purposes.
if (isCompleted && false) {
this.searchStatus.workerStatus = 'finished';
stream$.complete();
void this.killWorkers();
Expand Down
105 changes: 93 additions & 12 deletions src/services/files/files.worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Dir, Dirent } from 'fs';
import { opendir } from 'fs/promises';
import { lstat, opendir, readdir, stat } from 'fs/promises';

import EventEmitter from 'events';
import { WorkerMessage } from './files.worker.service';
Expand All @@ -10,7 +10,7 @@ import { EVENTS, MAX_PROCS } from '../../constants/workers.constants.js';

enum ETaskOperation {
'explore',
'getSize',
'getFolderSize',
}
interface Task {
operation: ETaskOperation;
Expand Down Expand Up @@ -51,7 +51,15 @@ interface Task {
}

if (message?.type === EVENTS.explore) {
fileWalker.enqueueTask(message.value.path);
fileWalker.enqueueTask(message.value.path, ETaskOperation.explore);
}

if (message?.type === EVENTS.getFolderSize) {
fileWalker.enqueueTask(
message.value.path,
ETaskOperation.getFolderSize,
true,
);
}
});
}
Expand All @@ -63,6 +71,20 @@ interface Task {
value: { results, workerId: id, pending: fileWalker.pendingJobs },
});
});

fileWalker.events.on(
'folderSizeResult',
(result: { path: string; size: number }) => {
tunnel.postMessage({
type: EVENTS.getFolderSizeResult,
value: {
results: result,
workerId: id,
pending: fileWalker.pendingJobs,
},
});
},
);
}
})();

Expand All @@ -82,8 +104,17 @@ class FileWalker {
this.searchConfig = params;
}

enqueueTask(path: string): void {
this.taskQueue.push({ path, operation: ETaskOperation.explore });
enqueueTask(
path: string,
operation: ETaskOperation,
priorize: boolean = false,
): void {
if (priorize) {
this.taskQueue.unshift({ path, operation });
} else {
this.taskQueue.push({ path, operation });
}

this.processQueue();
}

Expand Down Expand Up @@ -115,6 +146,42 @@ class FileWalker {
}
}

private async runGetFolderSize(path: string): Promise<void> {
this.updateProcs(1);

try {
const size = await this.getFolderSize(path).catch(() => 0);
this.events.emit('folderSizeResult', { path, size });
} catch (_) {
this.completeTask();
}
}

async getFolderSize(dir: string): Promise<number> {
async function calculateDirectorySize(directory: string): Promise<number> {
const entries = await readdir(directory, { withFileTypes: true });

const tasks = entries.map(async (entry) => {
const fullPath = join(directory, entry.name);

if (entry.isDirectory()) {
// Ignore errors.
return calculateDirectorySize(fullPath).catch(() => 0);
} else if (entry.isFile()) {
const fileStat = await lstat(fullPath);
return fileStat.size;
}

return 0;
});

const sizes = await Promise.all(tasks);
return sizes.reduce((total, size) => total + size, 0);
}

return calculateDirectorySize(dir);
}

private newDirEntry(path: string, entry: Dirent, results: any[]): void {
const subpath = join(path, entry.name);
const shouldSkip = !entry.isDirectory() || this.isExcluded(subpath);
Expand Down Expand Up @@ -160,17 +227,31 @@ class FileWalker {

private processQueue(): void {
while (this.procs < MAX_PROCS && this.taskQueue.length > 0) {
const path = this.taskQueue.shift()?.path;
const task = this.taskQueue.shift();

if (task === null || task === undefined) {
return;
}

const path = task.path;

if (path === undefined || path === '') {
return;
}

// Ignore as other mechanisms (pending/completed tasks) are used to
// check the progress of this.
this.run(path).then(
() => {},
() => {},
);
if (task.operation === ETaskOperation.explore) {
this.run(path).then(
() => {},
() => {},
);
}

if (task.operation === ETaskOperation.getFolderSize) {
this.runGetFolderSize(path).then(
() => {},
() => {},
);
}
}
}

Expand Down
Loading

0 comments on commit 03cf3b9

Please sign in to comment.