Fix EMFILE too many open files #8

Merged · 6 commits · Nov 15, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
 ## next
 
+- Added `maxConcurrency` option to limit concurrent FS operations, preventing "too many open files" errors (#8)
 - Fixed Node.js warnings such as "Warning: Closing file descriptor # on garbage collection", which is deprecated in Node.js 22 and will result in an error being thrown in the future
 
 ## 0.1.4 (2024-10-30)
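As context for the second entry, a minimal sketch (not part of this PR's diff) of the pattern that avoids the warning: Node.js complains when a `FileHandle` is garbage collected while still open, so handles should be closed explicitly. The file path here is a placeholder.

```ts
import { promises as fsPromises } from 'fs';

// 'some/file' is a placeholder path; the point is the explicit close()
const fileHandle = await fsPromises.open('some/file', 'r');
try {
    const buffer = Buffer.alloc(64);
    await fileHandle.read(buffer, 0, buffer.length, 0);
} finally {
    // close explicitly instead of relying on garbage collection
    await fileHandle.close();
}
```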
1 change: 1 addition & 0 deletions README.md
@@ -27,6 +27,7 @@ await repo.dispose();
 
 - `gitdir`: string - path to the git repo
 - `options` – optional settings:
+  - `maxConcurrency` – limit the number of concurrent file system operations (default: 50)
   - `cruftPacks` – defines how [cruft packs](https://git-scm.com/docs/cruft-packs) are processed:
     - `'include'` or `true` (default) - process all packs
     - `'exclude'` or `false` - exclude cruft packs from processing
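For illustration, a minimal usage sketch of the new option (the import path is an assumption based on the package name, and the repository path is a placeholder; `stat()` and `dispose()` are the reader methods shown in this PR):

```ts
import { createGitReader } from '@discoveryjs/scan-git';

// Cap concurrent file system operations well below the process descriptor limit
const repo = await createGitReader('path/to/repo/.git', { maxConcurrency: 16 });
const stats = await repo.stat();

await repo.dispose();
```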
36 changes: 20 additions & 16 deletions src/index.ts
@@ -6,19 +6,20 @@ import { createPackedObjectIndex } from './packed-object-index.js';
 import { createFilesMethods } from './files-methods.js';
 import { createCommitMethods } from './commits.js';
 import { createStatMethod } from './stat.js';
+import { promiseAllThreaded } from './utils/threads.js';
 import { GitReaderOptions, NormalizedGitReaderOptions, CruftPackMode } from './types';
 
 export * from './types.js';
 export * from './parse-object.js';
 export { isGitDir, resolveGitDir };
 
-export async function createGitReader(gitdir: string, options?: GitReaderOptions) {
+export async function createGitReader(gitdir: string, options?: Partial<GitReaderOptions>) {
     const startInitTime = Date.now();
     const normalizedOptions = normalizeOptions(options);
     const resolvedGitDir = await resolveGitDir(gitdir);
     const [refIndex, looseObjectIndex, packedObjectIndex] = await Promise.all([
-        createRefIndex(resolvedGitDir),
-        createLooseObjectIndex(resolvedGitDir),
+        createRefIndex(resolvedGitDir, normalizedOptions),
+        createLooseObjectIndex(resolvedGitDir, normalizedOptions),
         createPackedObjectIndex(resolvedGitDir, normalizedOptions)
     ]);
     const { readObjectHeaderByHash, readObjectByHash, readObjectHeaderByOid, readObjectByOid } =
@@ -38,27 +39,30 @@ export async function createGitReader(gitdir: string, options?: GitReaderOptions
         async dispose() {
             await Promise.all([looseObjectIndex.dispose(), packedObjectIndex.dispose()]);
         },
-        stat: createStatMethod({
-            gitdir: resolvedGitDir,
-            refIndex,
-            looseObjectIndex,
-            packedObjectIndex
-        }),
+        stat: createStatMethod(
+            resolvedGitDir,
+            { refIndex, looseObjectIndex, packedObjectIndex },
+            normalizedOptions
+        ),
 
         initTime: Date.now() - startInitTime
     };
 }
 
-function normalizeOptions(options?: GitReaderOptions): NormalizedGitReaderOptions {
-    if (!options || options.cruftPacks === undefined) {
-        return { cruftPacks: 'include' };
-    }
+function normalizeOptions(options?: Partial<GitReaderOptions>): NormalizedGitReaderOptions {
+    const { cruftPacks = true, maxConcurrency } = options || {};
+    const maxConcurrencyNormalized = Number.isFinite(maxConcurrency)
+        ? (maxConcurrency as number)
+        : 50;
 
     return {
+        maxConcurrency: maxConcurrencyNormalized,
+        performConcurrent: (queue, action) =>
+            promiseAllThreaded(maxConcurrencyNormalized, queue, action),
         cruftPacks:
-            typeof options.cruftPacks === 'string'
-                ? validateCruftPackMode(options.cruftPacks)
-                : options.cruftPacks // expands true/false aliases
+            typeof cruftPacks === 'string'
+                ? validateCruftPackMode(cruftPacks)
+                : cruftPacks // expands true/false aliases
                   ? 'include'
                   : 'exclude'
     };
24 changes: 13 additions & 11 deletions src/loose-object-index.ts
@@ -5,6 +5,7 @@ import {
     GitObject,
     InternalGitObjectContent,
     InternalGitObjectHeader,
+    NormalizedGitReaderOptions,
     ObjectsTypeStat,
     PackedObjectType
 } from './types.js';
@@ -14,20 +15,21 @@ import { createObjectsTypeStat, objectsStatFromTypes } from './utils/stat.js';
 type LooseObjectMap = Map<string, string>;
 type LooseObjectMapEntry = [oid: string, relpath: string];
 
-async function createLooseObjectMap(gitdir: string): Promise<LooseObjectMap> {
+async function createLooseObjectMap(
+    gitdir: string,
+    { performConcurrent }: NormalizedGitReaderOptions
+): Promise<LooseObjectMap> {
     const objectsPath = pathJoin(gitdir, 'objects');
     const looseDirs = (await fsPromises.readdir(objectsPath)).filter((p) =>
         /^[0-9a-f]{2}$/.test(p)
     );
 
-    const objectDirs = await Promise.all(
-        looseDirs.map((dir) =>
-            fsPromises
-                .readdir(pathJoin(objectsPath, dir))
-                .then((files) =>
-                    files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`])
-                )
-        )
+    const objectDirs = await performConcurrent(looseDirs, (dir) =>
+        fsPromises
+            .readdir(pathJoin(objectsPath, dir))
+            .then((files) =>
+                files.map((file): LooseObjectMapEntry => [dir + file, `objects/${dir}/${file}`])
+            )
     );
 
     return new Map(objectDirs.flat().sort(([a], [b]) => (a < b ? -1 : 1)));
@@ -77,8 +79,8 @@ function parseLooseObject(buffer: Buffer): InternalGitObjectContent {
     };
 }
 
-export async function createLooseObjectIndex(gitdir: string) {
-    const looseObjectMap = await createLooseObjectMap(gitdir);
+export async function createLooseObjectIndex(gitdir: string, options: NormalizedGitReaderOptions) {
+    const looseObjectMap = await createLooseObjectMap(gitdir, options);
     const { fanoutTable, binaryNames, names } = indexObjectNames([...looseObjectMap.keys()]);
 
     const getOidFromHash = (hash: Buffer) => {
8 changes: 3 additions & 5 deletions src/packed-object-index.ts
@@ -19,7 +19,7 @@ const PACKDIR = 'objects/pack';
  */
 export async function createPackedObjectIndex(
     gitdir: string,
-    { cruftPacks }: NormalizedGitReaderOptions
+    { cruftPacks, performConcurrent }: NormalizedGitReaderOptions
 ) {
     function readObjectHeaderByHash(
         hash: Buffer,
@@ -75,10 +75,8 @@ export async function createPackedObjectIndex(
             : !cruftPackFilenames.includes(filename);
     });
 
-    const packFiles = await Promise.all(
-        packFilenames.map((filename) =>
-            readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
-        )
+    const packFiles = await performConcurrent(packFilenames, async (filename) =>
+        readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
     );
 
     return {
14 changes: 9 additions & 5 deletions src/resolve-ref.ts
@@ -1,6 +1,7 @@
 import { promises as fsPromises, existsSync } from 'fs';
 import { join as pathJoin, basename, sep as pathSep } from 'path';
 import { scanFs } from '@discoveryjs/scan-fs';
+import { NormalizedGitReaderOptions } from './types.js';
 
 type Ref = {
     name: string;
@@ -49,7 +50,10 @@ function isOid(value: unknown) {
     return typeof value === 'string' && value.length === 40 && /^[0-9a-f]{40}$/.test(value);
 }
 
-export async function createRefIndex(gitdir: string) {
+export async function createRefIndex(
+    gitdir: string,
+    { performConcurrent }: NormalizedGitReaderOptions
+) {
     const refResolver = await createRefResolver(gitdir);
 
     // expand a ref into a full form
@@ -136,8 +140,8 @@ export async function createRefIndex(gitdir: string) {
         let cachedRefsWithOid = listRefsWithOidCache.get(prefix);
 
         if (cachedRefsWithOid === undefined) {
-            const oids = await Promise.all(
-                cachedRefs.map((name) => refResolver.resolveOid(prefix + name))
+            const oids = await performConcurrent(cachedRefs, (name) =>
+                refResolver.resolveOid(prefix + name)
             );
 
             cachedRefsWithOid = cachedRefs.map((name, index) => ({
@@ -210,8 +214,8 @@ export async function createRefIndex(gitdir: string) {
 
     async stat() {
         const remotes = listRemotes();
-        const branchesByRemote = await Promise.all(
-            remotes.map((remote) => listRemoteBranches(remote))
+        const branchesByRemote = await performConcurrent(remotes, (remote) =>
+            listRemoteBranches(remote)
         );
 
         return {
21 changes: 11 additions & 10 deletions src/stat.ts
@@ -5,18 +5,19 @@ import { sumObjectsStat } from './utils/stat.js';
 import { createRefIndex } from './resolve-ref.js';
 import { createLooseObjectIndex } from './loose-object-index.js';
 import { createPackedObjectIndex } from './packed-object-index.js';
+import { NormalizedGitReaderOptions } from './types.js';
 
-export function createStatMethod({
-    gitdir,
-    refIndex,
-    looseObjectIndex,
-    packedObjectIndex
-}: {
-    gitdir: string;
+type CreateStatMethodInput = {
     refIndex: Awaited<ReturnType<typeof createRefIndex>>;
     looseObjectIndex: Awaited<ReturnType<typeof createLooseObjectIndex>>;
     packedObjectIndex: Awaited<ReturnType<typeof createPackedObjectIndex>>;
-}) {
+};
+
+export function createStatMethod(
+    gitdir: string,
+    { refIndex, looseObjectIndex, packedObjectIndex }: CreateStatMethodInput,
+    { performConcurrent }: NormalizedGitReaderOptions
+) {
     return async function () {
         const [refs, looseObjects, packedObjects, { files }] = await Promise.all([
             refIndex.stat(),
@@ -25,8 +26,8 @@ export function createStatMethod({
             scanFs(gitdir)
         ]);
 
-        const fileStats = await Promise.all(
-            files.map((file) => fsPromises.stat(path.join(gitdir, file.path)))
+        const fileStats = await performConcurrent(files, (file) =>
+            fsPromises.stat(path.join(gitdir, file.path))
         );
 
         const objectsTypes = looseObjects.objects.types.map((entry) => ({ ...entry }));
13 changes: 12 additions & 1 deletion src/types.ts
@@ -131,9 +131,20 @@ export interface GitReaderOptions {
      *
      * @default 'include'
      */
-    cruftPacks?: CruftPackMode | boolean;
+    cruftPacks: CruftPackMode | boolean;
+
+    /**
+     * Maximum number of concurrent file system operations.
+     * @default 50
+     */
+    maxConcurrency: number;
 }
 
 export interface NormalizedGitReaderOptions {
     cruftPacks: CruftPackMode;
+    maxConcurrency: number;
+    performConcurrent: <T, R>(
+        queue: T[],
+        action: (item: T, itemIdx: number) => Promise<R>
+    ) => Promise<R[]>;
 }
47 changes: 47 additions & 0 deletions src/utils/threads.ts
@@ -0,0 +1,47 @@
+/**
+ * Run async tasks from a queue with a limited number of concurrent workers.
+ * Works like Promise.all, but never runs more than `maxThreadCount` tasks at once.
+ * - The order of the results is guaranteed to be the same as the order of the input queue.
+ * - If any task fails, the whole queue is rejected.
+ * - If the queue is empty, the result is an empty array.
+ * - If the queue has only one task, the result is an array with one element.
+ *
+ * @example
+ * // Before
+ * const packFiles = await Promise.all(
+ *     packFilenames.map((filename) =>
+ *         readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
+ *     )
+ * );
+ *
+ * // After
+ * const packFiles = await promiseAllThreaded(50, packFilenames, async (filename) =>
+ *     readPackFile(gitdir, `${PACKDIR}/${filename}`, readObjectHeaderByHash, readObjectByHash)
+ * );
+ */
+export async function promiseAllThreaded<T, R>(
+    maxThreadCount: number,
+    queue: T[],
+    asyncFn: (task: T, taskIdx: number) => Promise<R>
+): Promise<R[]> {
+    const result = Array(queue.length);
+    let taskProcessed = 0;
+    let queueSnapshot = [...queue];
+    const thread = async () => {
+        while (taskProcessed < queueSnapshot.length) {
+            const taskIdx = taskProcessed++;
+            const task = queueSnapshot[taskIdx];
+            result[taskIdx] = await asyncFn(task, taskIdx);
+        }
+    };
+
+    await Promise.all(
+        Array.from({ length: Math.min(maxThreadCount, queueSnapshot.length) }, () => thread())
+    ).catch((err) => {
+        // drop all pending tasks so running workers stop picking up new ones
+        queueSnapshot = [];
+        throw err;
+    });
+
+    return result;
+}
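A hedged aside (not part of the PR) on why the bound matters: each in-flight `fs` call can hold an open file descriptor, so an unbounded `Promise.all` over a large file list can exceed the process limit and fail with EMFILE, while `promiseAllThreaded` keeps at most `maxThreadCount` calls in flight.

```ts
import { promises as fsPromises } from 'fs';
import { promiseAllThreaded } from '../src/utils/threads.js';

// `paths` stands in for a hypothetical large list of files to read
async function readAllBounded(paths: string[]): Promise<Buffer[]> {
    // Unbounded: one descriptor per path, all at once; may throw EMFILE
    // return Promise.all(paths.map((p) => fsPromises.readFile(p)));

    // Bounded: at most 50 reads in flight at any moment
    return promiseAllThreaded(50, paths, (p) => fsPromises.readFile(p));
}
```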
29 changes: 29 additions & 0 deletions test/utils.ts
@@ -1,9 +1,38 @@
 import assert from 'assert';
 import { readEncodedOffset, BufferCursor } from '../src/utils/buffer.js';
+import { promiseAllThreaded } from '../src/utils/threads.js';
 
 it('readEncodedOffset', () => {
     const buffer = Buffer.from([142, 254, 254, 254, 254, 254, 254, 127]);
     const cursor = new BufferCursor(buffer);
 
     assert.strictEqual(readEncodedOffset(cursor), Number.MAX_SAFE_INTEGER);
 });
+
+it('promiseAllThreaded', async () => {
+    const maxThreadCount = 2;
+    const queue = [1, 2, 3, 4, 5];
+    const asyncFn = async (task: number) => task * 2;
+
+    const result = await promiseAllThreaded(maxThreadCount, queue, asyncFn);
+
+    assert.deepStrictEqual(result, [2, 4, 6, 8, 10]);
+});
+
+it('promiseAllThreaded with error', async () => {
+    const maxThreadCount = 2;
+    const queue = [1, 2, 3, 4, 5];
+    const asyncFn = async (task: number) => {
+        if (task === 3) {
+            throw new Error('Task failed');
+        }
+        return task * 2;
+    };
+
+    try {
+        await promiseAllThreaded(maxThreadCount, queue, asyncFn);
+        assert.fail('Expected an error');
+    } catch (err) {
+        assert.strictEqual(err.message, 'Task failed');
+    }
+});