Skip to content

Commit

Permalink
Parallelizes package unplugging (#3955)
Browse files Browse the repository at this point in the history
* Parallelizes package unplugging

* Versions

* Update packages/yarnpkg-core/sources/miscUtils.ts

Co-authored-by: Kristoffer K. <merceyz@users.noreply.github.com>

Co-authored-by: Kristoffer K. <merceyz@users.noreply.github.com>
  • Loading branch information
arcanis and merceyz authored Jan 7, 2022
1 parent 28781c1 commit a248d0b
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 75 deletions.
33 changes: 33 additions & 0 deletions .yarn/versions/d5a60103.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
releases:
"@yarnpkg/cli": patch
"@yarnpkg/core": minor
"@yarnpkg/plugin-pnp": minor
"@yarnpkg/plugin-pnpm": minor

declined:
- "@yarnpkg/plugin-compat"
- "@yarnpkg/plugin-constraints"
- "@yarnpkg/plugin-dlx"
- "@yarnpkg/plugin-essentials"
- "@yarnpkg/plugin-exec"
- "@yarnpkg/plugin-file"
- "@yarnpkg/plugin-git"
- "@yarnpkg/plugin-github"
- "@yarnpkg/plugin-http"
- "@yarnpkg/plugin-init"
- "@yarnpkg/plugin-interactive-tools"
- "@yarnpkg/plugin-link"
- "@yarnpkg/plugin-nm"
- "@yarnpkg/plugin-npm"
- "@yarnpkg/plugin-npm-cli"
- "@yarnpkg/plugin-pack"
- "@yarnpkg/plugin-patch"
- "@yarnpkg/plugin-stage"
- "@yarnpkg/plugin-typescript"
- "@yarnpkg/plugin-version"
- "@yarnpkg/plugin-workspace-tools"
- "@yarnpkg/builder"
- "@yarnpkg/doctor"
- "@yarnpkg/nm"
- "@yarnpkg/pnpify"
- "@yarnpkg/sdks"
36 changes: 21 additions & 15 deletions packages/plugin-pnp/sources/PnpLinker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {miscUtils, structUtils, formatUtils, Descriptor, LocatorHash} from '@yarnpkg/core';
import {miscUtils, structUtils, formatUtils, Descriptor, LocatorHash, InstallPackageExtraApi} from '@yarnpkg/core';
import {FetchResult, Locator, Package} from '@yarnpkg/core';
import {Linker, LinkOptions, MinimalLinkOptions, Manifest, MessageName, DependencyMeta, LinkType, Installer} from '@yarnpkg/core';
import {AliasFS, CwdFS, PortablePath, VirtualFS, npath, ppath, xfs, Filename} from '@yarnpkg/fslib';
Expand Down Expand Up @@ -86,6 +86,8 @@ export class PnpLinker implements Linker {
export class PnpInstaller implements Installer {
protected mode = `strict`;

private readonly asyncActions = new miscUtils.AsyncActions(10);

private readonly packageRegistry: PackageRegistry = new Map();

private readonly virtualTemplates: Map<LocatorHash, {
Expand Down Expand Up @@ -116,7 +118,7 @@ export class PnpInstaller implements Installer {
this.customData = customData;
}

async installPackage(pkg: Package, fetchResult: FetchResult) {
async installPackage(pkg: Package, fetchResult: FetchResult, api: InstallPackageExtraApi) {
const key1 = structUtils.stringifyIdent(pkg);
const key2 = pkg.reference;

Expand Down Expand Up @@ -166,7 +168,7 @@ export class PnpInstaller implements Installer {
: [];

const packageFs = mayNeedToBeUnplugged
? await this.unplugPackageIfNeeded(pkg, customPackageData!, fetchResult, dependencyMeta!)
? await this.unplugPackageIfNeeded(pkg, customPackageData!, fetchResult, dependencyMeta!, api)
: fetchResult.packageFs;

if (ppath.isAbsolute(fetchResult.prefixPath))
Expand Down Expand Up @@ -294,6 +296,8 @@ export class PnpInstaller implements Installer {
shebang,
});

await this.asyncActions.wait();

return {
customData: this.customData,
};
Expand Down Expand Up @@ -409,9 +413,9 @@ export class PnpInstaller implements Installer {

private readonly unpluggedPaths: Set<string> = new Set();

private async unplugPackageIfNeeded(pkg: Package, customPackageData: CustomPackageData, fetchResult: FetchResult, dependencyMeta: DependencyMeta) {
private async unplugPackageIfNeeded(pkg: Package, customPackageData: CustomPackageData, fetchResult: FetchResult, dependencyMeta: DependencyMeta, api: InstallPackageExtraApi) {
if (this.shouldBeUnplugged(pkg, customPackageData, dependencyMeta)) {
return this.unplugPackage(pkg, fetchResult);
return this.unplugPackage(pkg, fetchResult, api);
} else {
return fetchResult.packageFs;
}
Expand All @@ -436,25 +440,27 @@ export class PnpInstaller implements Installer {
return false;
}

private async unplugPackage(locator: Locator, fetchResult: FetchResult) {
private async unplugPackage(locator: Locator, fetchResult: FetchResult, api: InstallPackageExtraApi) {
const unplugPath = pnpUtils.getUnpluggedPath(locator, {configuration: this.opts.project.configuration});
if (this.opts.project.disabledLocators.has(locator.locatorHash))
return new AliasFS(unplugPath, {baseFs: fetchResult.packageFs, pathUtils: ppath});

this.unpluggedPaths.add(unplugPath);

const readyFile = ppath.join(unplugPath, fetchResult.prefixPath, `.ready` as Filename);
if (await xfs.existsPromise(readyFile))
return new CwdFS(unplugPath);
api.holdFetchResult(this.asyncActions.set(locator.locatorHash, async () => {
const readyFile = ppath.join(unplugPath, fetchResult.prefixPath, `.ready` as Filename);
if (await xfs.existsPromise(readyFile))
return;

// Delete any build state for the locator so it can run anew, this allows users
// to remove `.yarn/unplugged` and have the builds run again
this.opts.project.storedBuildState.delete(locator.locatorHash);
// Delete any build state for the locator so it can run anew, this allows users
// to remove `.yarn/unplugged` and have the builds run again
this.opts.project.storedBuildState.delete(locator.locatorHash);

await xfs.mkdirPromise(unplugPath, {recursive: true});
await xfs.copyPromise(unplugPath, PortablePath.dot, {baseFs: fetchResult.packageFs, overwrite: false});
await xfs.mkdirPromise(unplugPath, {recursive: true});
await xfs.copyPromise(unplugPath, PortablePath.dot, {baseFs: fetchResult.packageFs, overwrite: false});

await xfs.writeFilePromise(readyFile, ``);
await xfs.writeFilePromise(readyFile, ``);
}));

return new CwdFS(unplugPath);
}
Expand Down
65 changes: 5 additions & 60 deletions packages/plugin-pnpm/sources/PnpmLinker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import {Descriptor, FetchResult, formatUtils, Installer, InstallPackageExtraApi, Linker, LinkOptions, LinkType, Locator, LocatorHash, Manifest, MessageName, MinimalLinkOptions, Package, Project, structUtils} from '@yarnpkg/core';
import {Dirent, Filename, PortablePath, ppath, xfs} from '@yarnpkg/fslib';
import {jsInstallUtils} from '@yarnpkg/plugin-pnp';
import {UsageError} from 'clipanion';
import pLimit from 'p-limit';
import {Descriptor, FetchResult, formatUtils, Installer, InstallPackageExtraApi, Linker, LinkOptions, LinkType, Locator, LocatorHash, Manifest, MessageName, MinimalLinkOptions, Package, Project, miscUtils, structUtils} from '@yarnpkg/core';
import {Dirent, Filename, PortablePath, ppath, xfs} from '@yarnpkg/fslib';
import {jsInstallUtils} from '@yarnpkg/plugin-pnp';
import {UsageError} from 'clipanion';

export type PnpmCustomData = {
pathByLocator: Map<LocatorHash, PortablePath>;
Expand Down Expand Up @@ -72,7 +71,7 @@ export class PnpmLinker implements Linker {
}

class PnpmInstaller implements Installer {
private asyncActions = new AsyncActions();
private readonly asyncActions = new miscUtils.AsyncActions(10);

constructor(private opts: LinkOptions) {
// Nothing to do
Expand Down Expand Up @@ -391,57 +390,3 @@ async function removeIfEmpty(dir: PortablePath) {
}
}
}

type Deferred = {
promise: Promise<void>;
resolve: () => void;
reject: (err: Error) => void;
};

function makeDeferred(): Deferred {
let resolve: () => void;
let reject: (err: Error) => void;

const promise = new Promise<void>((resolveFn, rejectFn) => {
resolve = resolveFn;
reject = rejectFn;
});

return {promise, resolve: resolve!, reject: reject!};
}

class AsyncActions {
deferred = new Map<string, Deferred>();
promises = new Map<string, Promise<void>>();
limit = pLimit(10);

set(key: string, factory: () => Promise<void>) {
let deferred = this.deferred.get(key);
if (typeof deferred === `undefined`)
this.deferred.set(key, deferred = makeDeferred());

const promise = this.limit(() => factory());
this.promises.set(key, promise);

promise.then(() => {
if (this.promises.get(key) === promise) {
deferred!.resolve();
}
}, err => {
if (this.promises.get(key) === promise) {
deferred!.reject(err);
}
});

return deferred.promise;
}

reduce(key: string, factory: (action: Promise<void>) => Promise<void>) {
const promise = this.promises.get(key) ?? Promise.resolve();
this.set(key, () => factory(promise));
}

async wait() {
await Promise.all(this.promises.values());
}
}
60 changes: 60 additions & 0 deletions packages/yarnpkg-core/sources/miscUtils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {PortablePath, npath, xfs} from '@yarnpkg/fslib';
import {UsageError} from 'clipanion';
import micromatch from 'micromatch';
import pLimit, {Limit} from 'p-limit';
import semver from 'semver';
import {Readable, Transform} from 'stream';

Expand Down Expand Up @@ -225,6 +226,65 @@ export class BufferStream extends Transform {
}
}

type Deferred = {
promise: Promise<void>;
resolve: () => void;
reject: (err: Error) => void;
};

function makeDeferred(): Deferred {
let resolve: () => void;
let reject: (err: Error) => void;

const promise = new Promise<void>((resolveFn, rejectFn) => {
resolve = resolveFn;
reject = rejectFn;
});

return {promise, resolve: resolve!, reject: reject!};
}

export class AsyncActions {
private deferred = new Map<string, Deferred>();
private promises = new Map<string, Promise<void>>();

private limit: Limit;

constructor(limit: number) {
this.limit = pLimit(limit);
}

set(key: string, factory: () => Promise<void>) {
let deferred = this.deferred.get(key);
if (typeof deferred === `undefined`)
this.deferred.set(key, deferred = makeDeferred());

const promise = this.limit(() => factory());
this.promises.set(key, promise);

promise.then(() => {
if (this.promises.get(key) === promise) {
deferred!.resolve();
}
}, err => {
if (this.promises.get(key) === promise) {
deferred!.reject(err);
}
});

return deferred.promise;
}

reduce(key: string, factory: (action: Promise<void>) => Promise<void>) {
const promise = this.promises.get(key) ?? Promise.resolve();
this.set(key, () => factory(promise));
}

async wait() {
await Promise.all(this.promises.values());
}
}

// A stream implementation that prints a message if nothing was output

export class DefaultStream extends Transform {
Expand Down

0 comments on commit a248d0b

Please sign in to comment.