Skip to content

Commit

Permalink
fix: wait for all PRs to attempt to merge before throwing error (#4663)
Browse files Browse the repository at this point in the history
* feat: add helper for running work items in parallel

* fix: run cleanDatastore and checkPRMergeability in forAllInAsyncGroups with thrown errors
  • Loading branch information
chingor13 authored Dec 2, 2022
1 parent e1bd3bf commit b236b99
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 58 deletions.
1 change: 1 addition & 0 deletions packages/merge-on-green/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/merge-on-green/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"dependencies": {
"@google-automations/label-utils": "^3.0.0",
"@google-cloud/datastore": "^7.0.0",
"aggregate-error": "^3.1.0",
"gcf-utils": "^14.2.0"
},
"devDependencies": {
Expand Down
111 changes: 53 additions & 58 deletions packages/merge-on-green/src/merge-on-green.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import {
MERGE_ON_GREEN_LABEL_SECURE,
MERGE_ON_GREEN_LABELS,
} from './labels';
import {forAllInAsyncGroups} from './parallel-work';

const TABLE = 'mog-prs';
const datastore = new Datastore();
const MAX_TEST_TIME = 1000 * 60 * 60 * 6; // 6 hr.
const WORKER_SIZE = 4;
const MAX_ENTRIES = 25;

handler.allowlist = [
Expand Down Expand Up @@ -392,29 +392,27 @@ handler.cleanDatastoreTable = async function cleanDatastoreTable(
watchedPRs: DatastorePR[],
logger: GCFLogger
) {
while (watchedPRs.length) {
const work = watchedPRs.splice(0, WORKER_SIZE);
await Promise.all(
work.map(async wp => {
logger.info(`checking ${wp.url}, ${wp.installationId} for cleanup`);
if (!wp.installationId) {
logger.warn(`installationId is not provided for ${wp.url}, skipping`);
return;
}
const octokit = await getAuthenticatedOctokit(wp.installationId);
await handler.checkIfPRIsInvalid(
wp.owner,
wp.repo,
wp.number,
wp.label,
wp.reactionId,
wp.url,
octokit,
logger
);
})
async function cleanDatastoreItem(wp: DatastorePR) {
logger.info(`checking ${wp.url}, ${wp.installationId} for cleanup`);
if (!wp.installationId) {
logger.warn(`installationId is not provided for ${wp.url}, skipping`);
return;
}
const octokit = await getAuthenticatedOctokit(wp.installationId);
await handler.checkIfPRIsInvalid(
wp.owner,
wp.repo,
wp.number,
wp.label,
wp.reactionId,
wp.url,
octokit,
logger
);
}
await forAllInAsyncGroups(watchedPRs, cleanDatastoreItem, {
throwOnError: true,
});
};

/**
Expand Down Expand Up @@ -453,50 +451,47 @@ handler.checkPRMergeability = async function checkPRMergeability(
octokit: Octokit,
logger: GCFLogger
) {
while (watchedPRs.length) {
const work = watchedPRs.splice(0, WORKER_SIZE);
await Promise.all(
work.map(async wp => {
const prLogger = addPullRequestLoggerContext(logger, wp);
prLogger.info(`checking ${wp.url}, ${wp.installationId}`);
async function checkSinglePR(wp: DatastorePR): Promise<void> {
const prLogger = addPullRequestLoggerContext(logger, wp);
prLogger.info(`checking ${wp.url}, ${wp.installationId}`);
try {
const remove = await mergeOnGreen(
wp.owner,
wp.repo,
wp.number,
[MERGE_ON_GREEN_LABEL, MERGE_ON_GREEN_LABEL_SECURE],
wp.state,
wp.branchProtection!,
wp.label,
wp.author,
octokit,
prLogger
);
if (remove || wp.state === 'stop') {
await handler.removePR(wp.url, prLogger);
try {
const remove = await mergeOnGreen(
await handler.cleanUpPullRequest(
wp.owner,
wp.repo,
wp.number,
[MERGE_ON_GREEN_LABEL, MERGE_ON_GREEN_LABEL_SECURE],
wp.state,
wp.branchProtection!,
wp.label,
wp.author,
octokit,
prLogger
wp.reactionId,
octokit
);
} catch (err) {
prLogger.warn(
`Failed to delete reaction and label on ${wp.owner}/${wp.repo}/${wp.number}`
);
if (remove || wp.state === 'stop') {
await handler.removePR(wp.url, prLogger);
try {
await handler.cleanUpPullRequest(
wp.owner,
wp.repo,
wp.number,
wp.label,
wp.reactionId,
octokit
);
} catch (err) {
prLogger.warn(
`Failed to delete reaction and label on ${wp.owner}/${wp.repo}/${wp.number}`
);
}
}
} catch (e) {
const err = e as Error;
err.message = `Error in merge-on-green: \n\n${err.message}`;
prLogger.error(err);
}
})
);
}
} catch (e) {
const err = e as Error;
err.message = `Error in merge-on-green: \n\n${err.message}`;
prLogger.error(err);
throw e;
}
}
await forAllInAsyncGroups(watchedPRs, checkSinglePR, {throwOnError: true});
};

/**
Expand Down
62 changes: 62 additions & 0 deletions packages/merge-on-green/src/parallel-work.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import AggregateError from 'aggregate-error';

const DEFAULT_GROUP_SIZE = 4;

interface AsyncGroupsOptions {
groupSize?: number;
throwOnError?: boolean;
}
type SingleItemHandler<TItem, TResult> = (item: TItem) => Promise<TResult>;
/**
* Helper that executes a single item handler for each item in parallel
* groups.
* @param {TItem[]} items The list of items to handle.
* @param {SingleItemHandler<TItem, TResult>} asyncHandler The single item async
* handler
* @param {number} options.groupSize The number of parallel executions. Defaults to 4.
* @param {boolean} options.throwOnError Whether to throw an AggregateError if any items fail
* @returns A list of settled promise results (either failure or success).
* @throws {AggregateError} if any items fails which contains all the thrown Errors.
*/
export async function forAllInAsyncGroups<TItem, TResult>(
items: TItem[],
asyncHandler: SingleItemHandler<TItem, TResult>,
options: AsyncGroupsOptions = {}
): Promise<PromiseSettledResult<TResult>[]> {
const groupSize = options?.groupSize ?? DEFAULT_GROUP_SIZE;
const throwOnError = options?.throwOnError ?? false;
let results: PromiseSettledResult<TResult>[] = [];
for (let i = 0; i < items.length; i += groupSize) {
const group = items.slice(i, i + groupSize);
const partial = await Promise.allSettled(group.map(pr => asyncHandler(pr)));
results = results.concat(...partial);
}

if (throwOnError) {
const errors: Error[] = [];
for (const result of results) {
if (result.status === 'rejected') {
errors.push(result.reason);
}
}
if (errors.length > 0) {
throw new AggregateError(errors);
}
}

return results;
}
68 changes: 68 additions & 0 deletions packages/merge-on-green/test/parallel-work.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {describe, it} from 'mocha';
import assert from 'assert';
import {forAllInAsyncGroups} from '../src/parallel-work';
import AggregateError from 'aggregate-error';

describe('forAllInAsyncGroups', () => {
const handler = async function (x: number): Promise<number> {
if (x % 2 === 0) {
return x;
} else {
throw new Error(`odd number: ${x}`);
}
};
const input: number[] = [];
for (let i = 0; i < 10; i++) {
input.push(i);
}

it('runs all work items', async () => {
const results = await forAllInAsyncGroups(input, handler);
assert.strictEqual(results.length, 10);
const successes = results.filter(result => result.status === 'fulfilled');
assert.strictEqual(successes.length, 5);
const failures = results.filter(result => result.status === 'rejected');
assert.strictEqual(failures.length, 5);
for (const failure of failures) {
assert.ok(
failure.status === 'rejected' && failure.reason instanceof Error
);
}
});

it('throws if throwOnError is specified', async () => {
await assert.rejects(
async () => {
await forAllInAsyncGroups(input, handler, {throwOnError: true});
},
err => {
return err instanceof AggregateError;
}
);
});

it('succeeds when throwOnError is specified', async () => {
const results = await forAllInAsyncGroups(
input.map(x => x * 2),
handler,
{throwOnError: true}
);
assert.strictEqual(results.length, 10);
const successes = results.filter(result => result.status === 'fulfilled');
assert.strictEqual(successes.length, 10);
});
});

0 comments on commit b236b99

Please sign in to comment.