From b236b99e64cc8d571cfe9be6f3ffec37792f5151 Mon Sep 17 00:00:00 2001 From: Jeff Ching Date: Fri, 2 Dec 2022 11:21:16 -0800 Subject: [PATCH] fix: wait for all PRs to attempt to merge before throwing error (#4663) * feat: add helper for running work items in parallel * fix: run cleanDatastore and checkPRMergeability in forAllInAsyncGroups with thrown errors --- packages/merge-on-green/package-lock.json | 1 + packages/merge-on-green/package.json | 1 + packages/merge-on-green/src/merge-on-green.ts | 111 +++++++++--------- packages/merge-on-green/src/parallel-work.ts | 62 ++++++++++ packages/merge-on-green/test/parallel-work.ts | 68 +++++++++++ 5 files changed, 185 insertions(+), 58 deletions(-) create mode 100644 packages/merge-on-green/src/parallel-work.ts create mode 100644 packages/merge-on-green/test/parallel-work.ts diff --git a/packages/merge-on-green/package-lock.json b/packages/merge-on-green/package-lock.json index 9314457ff34..ea9236e278d 100644 --- a/packages/merge-on-green/package-lock.json +++ b/packages/merge-on-green/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "@google-automations/label-utils": "^3.0.0", "@google-cloud/datastore": "^7.0.0", + "aggregate-error": "^3.1.0", "gcf-utils": "^14.2.0" }, "devDependencies": { diff --git a/packages/merge-on-green/package.json b/packages/merge-on-green/package.json index 1a18a396cfd..29a88109681 100644 --- a/packages/merge-on-green/package.json +++ b/packages/merge-on-green/package.json @@ -29,6 +29,7 @@ "dependencies": { "@google-automations/label-utils": "^3.0.0", "@google-cloud/datastore": "^7.0.0", + "aggregate-error": "^3.1.0", "gcf-utils": "^14.2.0" }, "devDependencies": { diff --git a/packages/merge-on-green/src/merge-on-green.ts b/packages/merge-on-green/src/merge-on-green.ts index 7dd1dcbda16..406b3bc6beb 100644 --- a/packages/merge-on-green/src/merge-on-green.ts +++ b/packages/merge-on-green/src/merge-on-green.ts @@ -27,11 +27,11 @@ import { MERGE_ON_GREEN_LABEL_SECURE, MERGE_ON_GREEN_LABELS, } from './labels'; +import {forAllInAsyncGroups} from './parallel-work'; const TABLE = 'mog-prs'; const datastore = new Datastore(); const MAX_TEST_TIME = 1000 * 60 * 60 * 6; // 6 hr. -const WORKER_SIZE = 4; const MAX_ENTRIES = 25; handler.allowlist = [ @@ -392,29 +392,27 @@ handler.cleanDatastoreTable = async function cleanDatastoreTable( watchedPRs: DatastorePR[], logger: GCFLogger ) { - while (watchedPRs.length) { - const work = watchedPRs.splice(0, WORKER_SIZE); - await Promise.all( - work.map(async wp => { - logger.info(`checking ${wp.url}, ${wp.installationId} for cleanup`); - if (!wp.installationId) { - logger.warn(`installationId is not provided for ${wp.url}, skipping`); - return; - } - const octokit = await getAuthenticatedOctokit(wp.installationId); - await handler.checkIfPRIsInvalid( - wp.owner, - wp.repo, - wp.number, - wp.label, - wp.reactionId, - wp.url, - octokit, - logger - ); - }) + async function cleanDatastoreItem(wp: DatastorePR) { + logger.info(`checking ${wp.url}, ${wp.installationId} for cleanup`); + if (!wp.installationId) { + logger.warn(`installationId is not provided for ${wp.url}, skipping`); + return; + } + const octokit = await getAuthenticatedOctokit(wp.installationId); + await handler.checkIfPRIsInvalid( + wp.owner, + wp.repo, + wp.number, + wp.label, + wp.reactionId, + wp.url, + octokit, + logger ); } + await forAllInAsyncGroups(watchedPRs, cleanDatastoreItem, { + throwOnError: true, + }); }; /** @@ -453,50 +451,47 @@ handler.checkPRMergeability = async function checkPRMergeability( octokit: Octokit, logger: GCFLogger ) { - while (watchedPRs.length) { - const work = watchedPRs.splice(0, WORKER_SIZE); - await Promise.all( - work.map(async wp => { - const prLogger = addPullRequestLoggerContext(logger, wp); - prLogger.info(`checking ${wp.url}, ${wp.installationId}`); + async function checkSinglePR(wp: DatastorePR): Promise { + const prLogger = addPullRequestLoggerContext(logger, wp); + prLogger.info(`checking ${wp.url}, ${wp.installationId}`); + try { + const remove = await mergeOnGreen( + wp.owner, + wp.repo, + wp.number, + [MERGE_ON_GREEN_LABEL, MERGE_ON_GREEN_LABEL_SECURE], + wp.state, + wp.branchProtection!, + wp.label, + wp.author, + octokit, + prLogger + ); + if (remove || wp.state === 'stop') { + await handler.removePR(wp.url, prLogger); try { - const remove = await mergeOnGreen( + await handler.cleanUpPullRequest( wp.owner, wp.repo, wp.number, - [MERGE_ON_GREEN_LABEL, MERGE_ON_GREEN_LABEL_SECURE], - wp.state, - wp.branchProtection!, wp.label, - wp.author, - octokit, - prLogger + wp.reactionId, + octokit + ); + } catch (err) { + prLogger.warn( + `Failed to delete reaction and label on ${wp.owner}/${wp.repo}/${wp.number}` ); - if (remove || wp.state === 'stop') { - await handler.removePR(wp.url, prLogger); - try { - await handler.cleanUpPullRequest( - wp.owner, - wp.repo, - wp.number, - wp.label, - wp.reactionId, - octokit - ); - } catch (err) { - prLogger.warn( - `Failed to delete reaction and label on ${wp.owner}/${wp.repo}/${wp.number}` - ); - } - } - } catch (e) { - const err = e as Error; - err.message = `Error in merge-on-green: \n\n${err.message}`; - prLogger.error(err); } - }) - ); + } + } catch (e) { + const err = e as Error; + err.message = `Error in merge-on-green: \n\n${err.message}`; + prLogger.error(err); + throw e; + } } + await forAllInAsyncGroups(watchedPRs, checkSinglePR, {throwOnError: true}); }; /** diff --git a/packages/merge-on-green/src/parallel-work.ts b/packages/merge-on-green/src/parallel-work.ts new file mode 100644 index 00000000000..f3868b61a19 --- /dev/null +++ b/packages/merge-on-green/src/parallel-work.ts @@ -0,0 +1,62 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import AggregateError from 'aggregate-error'; + +const DEFAULT_GROUP_SIZE = 4; + +interface AsyncGroupsOptions { + groupSize?: number; + throwOnError?: boolean; +} +type SingleItemHandler = (item: TItem) => Promise; +/** + * Helper that executes a single item handler for each item in parallel + * groups. + * @param {TItem[]} items The list of items to handle. + * @param {SingleItemHandler} asyncHandler The single item async + * handler + * @param {number} options.groupSize The number of parallel executions. Defaults to 4. + * @param {boolean} options.throwOnError Whether to throw an AggregateError if any items fail + * @returns A list of settled promise results (either failure or success). + * @throws {AggregateError} if any items fails which contains all the thrown Errors. + */ +export async function forAllInAsyncGroups( + items: TItem[], + asyncHandler: SingleItemHandler, + options: AsyncGroupsOptions = {} +): Promise[]> { + const groupSize = options?.groupSize ?? DEFAULT_GROUP_SIZE; + const throwOnError = options?.throwOnError ?? false; + let results: PromiseSettledResult[] = []; + for (let i = 0; i < items.length; i += groupSize) { + const group = items.slice(i, i + groupSize); + const partial = await Promise.allSettled(group.map(pr => asyncHandler(pr))); + results = results.concat(...partial); + } + + if (throwOnError) { + const errors: Error[] = []; + for (const result of results) { + if (result.status === 'rejected') { + errors.push(result.reason); + } + } + if (errors.length > 0) { + throw new AggregateError(errors); + } + } + + return results; +} diff --git a/packages/merge-on-green/test/parallel-work.ts b/packages/merge-on-green/test/parallel-work.ts new file mode 100644 index 00000000000..cc2dff3aa28 --- /dev/null +++ b/packages/merge-on-green/test/parallel-work.ts @@ -0,0 +1,68 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it} from 'mocha'; +import assert from 'assert'; +import {forAllInAsyncGroups} from '../src/parallel-work'; +import AggregateError from 'aggregate-error'; + +describe('forAllInAsyncGroups', () => { + const handler = async function (x: number): Promise { + if (x % 2 === 0) { + return x; + } else { + throw new Error(`odd number: ${x}`); + } + }; + const input: number[] = []; + for (let i = 0; i < 10; i++) { + input.push(i); + } + + it('runs all work items', async () => { + const results = await forAllInAsyncGroups(input, handler); + assert.strictEqual(results.length, 10); + const successes = results.filter(result => result.status === 'fulfilled'); + assert.strictEqual(successes.length, 5); + const failures = results.filter(result => result.status === 'rejected'); + assert.strictEqual(failures.length, 5); + for (const failure of failures) { + assert.ok( + failure.status === 'rejected' && failure.reason instanceof Error + ); + } + }); + + it('throws if throwOnError is specified', async () => { + await assert.rejects( + async () => { + await forAllInAsyncGroups(input, handler, {throwOnError: true}); + }, + err => { + return err instanceof AggregateError; + } + ); + }); + + it('succeeds when throwOnError is specified', async () => { + const results = await forAllInAsyncGroups( + input.map(x => x * 2), + handler, + {throwOnError: true} + ); + assert.strictEqual(results.length, 10); + const successes = results.filter(result => result.status === 'fulfilled'); + assert.strictEqual(successes.length, 10); + }); +});