Skip to content

Commit

Permalink
feat(pack): Simplify the process and make it testable with DI
Browse files Browse the repository at this point in the history
  • Loading branch information
yamadashy committed Jan 25, 2025
1 parent 64fcd14 commit 2be1e27
Show file tree
Hide file tree
Showing 31 changed files with 539 additions and 686 deletions.
3 changes: 2 additions & 1 deletion src/cli/actions/remoteAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const runRemoteAction = async (
deps = {
isGitInstalled,
execGitShallowClone,
runDefaultAction,
},
): Promise<DefaultActionRunnerResult> => {
if (!(await deps.isGitInstalled())) {
Expand Down Expand Up @@ -46,7 +47,7 @@ export const runRemoteAction = async (
logger.log('');

// Run the default action on the cloned repository
result = await runDefaultAction(tempDirPath, tempDirPath, options);
result = await deps.runDefaultAction(tempDirPath, tempDirPath, options);
await copyOutputToCurrentDirectory(tempDirPath, process.cwd(), result.config.output.filePath);
} catch (error) {
spinner.fail('Error during repository cloning. cleanup...');
Expand Down
108 changes: 52 additions & 56 deletions src/core/file/fileCollect.ts
Original file line number Diff line number Diff line change
@@ -1,71 +1,67 @@
import * as fs from 'node:fs/promises';
import path from 'node:path';
import iconv from 'iconv-lite';
import { isBinary } from 'istextorbinary';
import jschardet from 'jschardet';
import pMap from 'p-map';
import pc from 'picocolors';
import { Piscina } from 'piscina';
import { logger } from '../../shared/logger.js';
import { getProcessConcurrency } from '../../shared/processConcurrency.js';
import { getWorkerThreadCount } from '../../shared/processConcurrency.js';
import type { RepomixProgressCallback } from '../../shared/types.js';
import type { RawFile } from './fileTypes.js';
import type { FileCollectTask } from './workers/fileCollectWorker.js';

// Maximum file size to process (50MB)
// This prevents out-of-memory errors when processing very large files
export const MAX_FILE_SIZE = 50 * 1024 * 1024;
const initTaskRunner = (numOfTasks: number) => {
const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks);
logger.trace(`Initializing worker pool with min=${minThreads}, max=${maxThreads} threads`);

Check warning on line 11 in src/core/file/fileCollect.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileCollect.ts#L10-L11

Added lines #L10 - L11 were not covered by tests

export const collectFiles = async (filePaths: string[], rootDir: string): Promise<RawFile[]> => {
const rawFiles = await pMap(
filePaths,
async (filePath) => {
const fullPath = path.resolve(rootDir, filePath);
const content = await readRawFile(fullPath);
if (content) {
return { path: filePath, content };
}
return null;
},
{
concurrency: getProcessConcurrency(),
},
);
const pool = new Piscina({
filename: new URL('./workers/fileCollectWorker.js', import.meta.url).href,
minThreads,
maxThreads,
idleTimeout: 5000,
});

Check warning on line 18 in src/core/file/fileCollect.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileCollect.ts#L13-L18

Added lines #L13 - L18 were not covered by tests

return rawFiles.filter((file): file is RawFile => file != null);
return (task: FileCollectTask) => pool.run(task);

Check warning on line 20 in src/core/file/fileCollect.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileCollect.ts#L20

Added line #L20 was not covered by tests
};

const readRawFile = async (filePath: string): Promise<string | null> => {
try {
const stats = await fs.stat(filePath);

if (stats.size > MAX_FILE_SIZE) {
const sizeMB = (stats.size / 1024 / 1024).toFixed(1);
logger.log('');
logger.log('⚠️ Large File Warning:');
logger.log('──────────────────────');
logger.log(`File exceeds size limit: ${sizeMB}MB > ${MAX_FILE_SIZE / 1024 / 1024}MB (${filePath})`);
logger.note('Add this file to .repomixignore if you want to exclude it permanently');
logger.log('');
return null;
}

if (isBinary(filePath)) {
logger.debug(`Skipping binary file: ${filePath}`);
return null;
}
export const collectFiles = async (
filePaths: string[],
rootDir: string,
progressCallback: RepomixProgressCallback = () => {},
deps = {
initTaskRunner,
},
): Promise<RawFile[]> => {
const runTask = deps.initTaskRunner(filePaths.length);
const tasks = filePaths.map(
(filePath) =>
({
filePath,
rootDir,
}) satisfies FileCollectTask,
);

logger.trace(`Reading file: ${filePath}`);
try {
const startTime = process.hrtime.bigint();
logger.trace(`Starting file collection for ${filePaths.length} files using worker pool`);

const buffer = await fs.readFile(filePath);
let completedTasks = 0;
const totalTasks = tasks.length;

if (isBinary(null, buffer)) {
logger.debug(`Skipping binary file (content check): ${filePath}`);
return null;
}
const results = await Promise.all(
tasks.map((task) =>
runTask(task).then((result) => {
completedTasks++;
progressCallback(`Collect file... (${completedTasks}/${totalTasks}) ${pc.dim(task.filePath)}`);
logger.trace(`Collect files... (${completedTasks}/${totalTasks}) ${task.filePath}`);
return result;
}),
),
);

const encoding = jschardet.detect(buffer).encoding || 'utf-8';
const content = iconv.decode(buffer, encoding);
const endTime = process.hrtime.bigint();
const duration = Number(endTime - startTime) / 1e6;
logger.trace(`File collection completed in ${duration.toFixed(2)}ms`);

return content;
return results.filter((file): file is RawFile => file !== null);
} catch (error) {
logger.warn(`Failed to read file: ${filePath}`, error);
return null;
logger.error('Error during file collection:', error);
throw error;

Check warning on line 65 in src/core/file/fileCollect.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileCollect.ts#L64-L65

Added lines #L64 - L65 were not covered by tests
}
};
144 changes: 31 additions & 113 deletions src/core/file/fileProcess.ts
Original file line number Diff line number Diff line change
@@ -1,97 +1,62 @@
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import pc from 'picocolors';
import { Piscina } from 'piscina';
import type { RepomixConfigMerged } from '../../config/configSchema.js';
import { logger } from '../../shared/logger.js';
import { getWorkerThreadCount } from '../../shared/processConcurrency.js';
import type { RepomixProgressCallback } from '../../shared/types.js';
import { getFileManipulator } from './fileManipulate.js';
import type { ProcessedFile, RawFile } from './fileTypes.js';
import type { FileProcessTask } from './workers/fileProcessWorker.js';

// Worker pool singleton
let workerPool: Piscina | null = null;
const initTaskRunner = (numOfTasks: number) => {
const { minThreads, maxThreads } = getWorkerThreadCount(numOfTasks);
logger.trace(`Initializing worker pool with min=${minThreads}, max=${maxThreads} threads`);

Check warning on line 12 in src/core/file/fileProcess.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileProcess.ts#L11-L12

Added lines #L11 - L12 were not covered by tests

/**
* Initialize the worker pool
*/
const initializeWorkerPool = (): Piscina => {
if (workerPool) {
return workerPool;
}

const { minThreads, maxThreads } = getWorkerThreadCount();
logger.trace(`Initializing file process worker pool with min=${minThreads}, max=${maxThreads} threads`);

workerPool = new Piscina({
filename: path.resolve(path.dirname(fileURLToPath(import.meta.url)), './workers/fileProcessWorker.js'),
const pool = new Piscina({
filename: new URL('./workers/fileProcessWorker.js', import.meta.url).href,
minThreads,
maxThreads,
idleTimeout: 5000,
});

Check warning on line 19 in src/core/file/fileProcess.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileProcess.ts#L14-L19

Added lines #L14 - L19 were not covered by tests

return workerPool;
return (task: FileProcessTask) => pool.run(task);
};

Check warning on line 22 in src/core/file/fileProcess.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileProcess.ts#L21-L22

Added lines #L21 - L22 were not covered by tests

/**
 * Runs file-processing tasks through the worker pool in fixed-size batches.
 * Batching keeps the progress callback responsive and bounds the number of
 * in-flight promises; a zero-delay timer between batches yields control back
 * to the event loop so other pending work is not starved.
 *
 * @param pool - Piscina worker pool that executes each task.
 * @param tasks - One entry per file to process.
 * @param progressCallback - Invoked after each task completes with a human-readable status line.
 * @param chunkSize - How many tasks to dispatch per batch (default 100).
 * @returns Results in the same order the tasks were given.
 */
async function processFileChunks(
  pool: Piscina,
  tasks: Array<{ rawFile: RawFile; index: number; totalFiles: number; config: RepomixConfigMerged }>,
  progressCallback: RepomixProgressCallback,
  chunkSize = 100,
): Promise<ProcessedFile[]> {
  const collected: ProcessedFile[] = [];
  const taskCount = tasks.length;
  let finished = 0;

  // Dispatch one batch at a time; each batch runs its tasks concurrently.
  let offset = 0;
  while (offset < taskCount) {
    const batch = tasks.slice(offset, offset + chunkSize);
    const batchResults = await Promise.all(
      batch.map(async (task) => {
        const outcome = await pool.run(task);
        finished++;
        progressCallback(`Processing file... (${finished}/${taskCount}) ${pc.dim(task.rawFile.path)}`);
        return outcome;
      }),
    );
    collected.push(...batchResults);

    // Allow event loop to process other tasks between batches.
    await new Promise((resolve) => setTimeout(resolve, 0));
    offset += chunkSize;
  }

  return collected;
}

/**
* Process files using a worker thread pool
*/
export const processFiles = async (
rawFiles: RawFile[],
config: RepomixConfigMerged,
progressCallback: RepomixProgressCallback,
deps = {
initTaskRunner,
},
): Promise<ProcessedFile[]> => {
const pool = initializeWorkerPool();
const tasks = rawFiles.map((rawFile, index) => ({
rawFile,
index,
totalFiles: rawFiles.length,
config,
}));
const runTask = deps.initTaskRunner(rawFiles.length);
const tasks = rawFiles.map(
(rawFile, index) =>
({
rawFile,
config,
}) satisfies FileProcessTask,
);

try {
const startTime = process.hrtime.bigint();
logger.trace(`Starting file processing for ${rawFiles.length} files using worker pool`);

// Process files in chunks
const results = await processFileChunks(pool, tasks, progressCallback);
let completedTasks = 0;
const totalTasks = tasks.length;

const results = await Promise.all(
tasks.map((task) =>
runTask(task).then((result) => {
completedTasks++;
progressCallback(`Processing file... (${completedTasks}/${totalTasks}) ${pc.dim(task.rawFile.path)}`);
return result;
}),
),
);

const endTime = process.hrtime.bigint();
const duration = Number(endTime - startTime) / 1e6; // Convert to milliseconds
const duration = Number(endTime - startTime) / 1e6;
logger.trace(`File processing completed in ${duration.toFixed(2)}ms`);

return results;
Expand All @@ -100,50 +65,3 @@ export const processFiles = async (
throw error;

Check warning on line 65 in src/core/file/fileProcess.ts

View check run for this annotation

Codecov / codecov/patch

src/core/file/fileProcess.ts#L64-L65

Added lines #L64 - L65 were not covered by tests
}
};

/**
 * Destroys the module-level worker pool (if one was created) and clears the
 * singleton so a later call can lazily re-initialize it.
 * A no-op when no pool exists, so it is always safe to call.
 */
export const cleanupWorkerPool = async (): Promise<void> => {
  if (!workerPool) {
    return;
  }
  logger.trace('Cleaning up file process worker pool');
  await workerPool.destroy();
  workerPool = null;
};

/**
 * Applies the configured output transformations to one file's content, in order:
 * optional comment removal and optional empty-line removal (each only when a
 * manipulator exists for the file type), whitespace trimming, and optional
 * line numbering. Trace-logs the elapsed time for the whole transformation.
 *
 * @param content - Raw file text to transform.
 * @param filePath - Path of the file; selects the language-specific manipulator.
 * @param config - Merged configuration; `config.output.*` flags drive each step.
 * @returns The transformed content.
 */
export const processContent = async (
  content: string,
  filePath: string,
  config: RepomixConfigMerged,
): Promise<string> => {
  const manipulator = getFileManipulator(filePath);

  logger.trace(`Processing file: ${filePath}`);

  const startedAt = process.hrtime.bigint();
  let result = content;

  if (manipulator && config.output.removeComments) {
    result = manipulator.removeComments(result);
  }

  if (manipulator && config.output.removeEmptyLines) {
    result = manipulator.removeEmptyLines(result);
  }

  result = result.trim();

  if (config.output.showLineNumbers) {
    const rows = result.split('\n');
    // Right-align numbers to the width of the largest line number.
    const width = String(rows.length).length;
    result = rows.map((row, i) => `${String(i + 1).padStart(width)}: ${row}`).join('\n');
  }

  const endedAt = process.hrtime.bigint();

  logger.trace(`Processed file: ${filePath}. Took: ${(Number(endedAt - startedAt) / 1e6).toFixed(2)}ms`);

  return result;
};
Loading

0 comments on commit 2be1e27

Please sign in to comment.