Skip to content

Commit

Permalink
Showing 2 changed files with 30 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/lease-process/lease-process-pool.ts
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import type { LeaseProcess, LeaseProcessOptions } from "./lease-process";
import { Network, NetworkModule } from "../network";
import { LeaseModule } from "./lease.module";
import { AgreementOptions } from "../market/agreement/agreement";
import AsyncLock from "async-lock";

export interface LeaseProcessPoolDependencies {
allocation: Allocation;
@@ -69,6 +70,7 @@ export class LeaseProcessPool {
private readonly maxPoolSize: number;
private readonly leaseProcessOptions?: LeaseProcessOptions;
private readonly agreementOptions?: AgreementOptions;
private readonly lock = new AsyncLock();

constructor(options: LeaseProcessPoolOptions & LeaseProcessPoolDependencies) {
this.allocation = options.allocation;
@@ -180,16 +182,18 @@ export class LeaseProcessPool {
if (this.isDraining) {
throw new Error("The pool is in draining mode");
}
let leaseProcess = await this.takeValidLeaseProcess();
if (!leaseProcess) {
if (!this.canCreateMoreLeaseProcesses()) {
return this.enqueueAcquire();
return this.lock.acquire("lease-process-pool", async () => {
let leaseProcess = await this.takeValidLeaseProcess();
if (!leaseProcess) {
if (!this.canCreateMoreLeaseProcesses()) {
return this.enqueueAcquire();
}
leaseProcess = await this.createNewLeaseProcess();
}
leaseProcess = await this.createNewLeaseProcess();
}
this.borrowed.add(leaseProcess);
this.events.emit("acquired", leaseProcess.agreement);
return leaseProcess;
this.borrowed.add(leaseProcess);
this.events.emit("acquired", leaseProcess.agreement);
return leaseProcess;
});
}

/**
17 changes: 17 additions & 0 deletions tests/e2e/leaseProcessPool.spec.ts
Original file line number Diff line number Diff line change
@@ -158,4 +158,21 @@ describe("LeaseProcessPool", () => {
await pool.drainAndClear();
await glm.network.removeNetwork(network);
});

it("should not lease more process than maximum size", async () => {
const maxPoolSize = 3;
const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: { min: 1, max: maxPoolSize } });
const poolSizesDuringWork: number[] = [];
pool.events.on("acquired", () => poolSizesDuringWork.push(pool.getSize()));
const data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
await Promise.allSettled(
data.map((item) =>
pool.withLease((lease) =>
lease.getExeUnit().then((exe) => exe.run(`echo ${item} from provider ${exe.provider.name}`)),
),
),
);
expect(poolSizesDuringWork.length).toBeGreaterThan(1);
poolSizesDuringWork.forEach((size) => expect(size).toBeLessThanOrEqual(maxPoolSize));
});
});

0 comments on commit 83a356d

Please sign in to comment.