Skip to content

Commit

Permalink
Merge pull request #27408 from backstage/patch-release-pr-27407
Browse files Browse the repository at this point in the history
Patch release of #27407
  • Loading branch information
Rugvip authored Oct 31, 2024
2 parents ed98ca6 + b7ea044 commit 7219465
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 25 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "root",
"version": "1.32.4",
"version": "1.32.5",
"private": true,
"repository": {
"type": "git",
Expand Down
6 changes: 6 additions & 0 deletions plugins/events-node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @backstage/plugin-events-node

## 0.4.4

### Patch Changes

- f7ca00b: Fixed an issue where the event bus polling would duplicate and increase exponentially over time.

## 0.4.3

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion plugins/events-node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@backstage/plugin-events-node",
"version": "0.4.3",
"version": "0.4.4",
"description": "The plugin-events-node module for @backstage/plugin-events-backend",
"backstage": {
"role": "node-library",
Expand Down
88 changes: 88 additions & 0 deletions plugins/events-node/src/api/DefaultEventsService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,94 @@ describe('DefaultEventsService', () => {
await (service as any).shutdown();
});

it('should wait an poll on timeout', async () => {
const logger = mockServices.logger.mock();
const service = DefaultEventsService.create({ logger }).forPlugin('a', {
auth: mockServices.auth(),
logger,
discovery: mockServices.discovery(),
lifecycle: mockServices.lifecycle.mock(),
});

let callCount = 0;

let blockingController: ReadableStreamDefaultController;
const blockingStream = new ReadableStream({
start(controller) {
blockingController = controller;
},
});

mswServer.use(
rest.put(
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester',
(_req, res, ctx) => res(ctx.status(200)),
),
// The first and third calls result in a blocking 202 that is resolved after 100ms
// The second and fourth calls result in a 200 with an event
// The fifth call blocks until the end of the test
// No more than 5 calls should be made
rest.get(
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester/events',
(_req, res, ctx) => {
callCount += 1;
if (callCount === 1 || callCount === 3) {
return res(
ctx.status(202),
ctx.body(
new ReadableStream({
start(controller) {
setTimeout(() => controller.close(), 100);
},
}),
),
);
} else if (callCount === 2 || callCount === 4) {
return res(
ctx.status(200),
ctx.json({
events: [{ topic: 'test', payload: { callCount } }],
}),
);
} else if (callCount === 5) {
return res(ctx.status(202), ctx.body(blockingStream));
}
throw new Error(`events endpoint called too many times`);
},
),
);

const event = await new Promise(resolve => {
const events = new Array<EventParams>();
service.subscribe({
id: 'tester',
topics: ['test'],
async onEvent(newEvent) {
events.push(newEvent);
if (events.length === 2) {
resolve(events);
}
},
});
});

expect(event).toEqual([
{ topic: 'test', eventPayload: { callCount: 2 } },
{ topic: 'test', eventPayload: { callCount: 4 } },
]);

// Wait to make sure no additional calls happen
await new Promise(resolve => setTimeout(resolve, 100));

expect(callCount).toBe(5);

// Internal call to clean up subscriptions
await (service as any).shutdown();

// Close the stream for the 5th call so that we don't leave the request hanging
blockingController!.close();
});

it('should not read events from bus if disabled', async () => {
const logger = mockServices.logger.mock();
const service = DefaultEventsService.create({
Expand Down
43 changes: 20 additions & 23 deletions plugins/events-node/src/api/DefaultEventsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,17 @@ class PluginEventsService implements EventsService {
{ token },
);

if (!res.ok) {
if (res.status === 404) {
this.logger.info(
`Polling event subscription resulted in a 404, recreating subscription`,
);
hasSubscription = false;
} else {
throw await ResponseError.fromResponse(res);
}
}

// Successful response, reset backoff
backoffMs = POLL_BACKOFF_START_MS;

// 202 means there were no immediately available events, but the
// response will block until either new events are available or the
// request times out. In both cases we should should try to read events
// immediately again
if (res.status === 202) {
// 202 means there were no immediately available events, but the
// response will block until either new events are available or the
// request times out. In both cases we should should try to read events
// immediately again

lock.release();
await res.body?.getReader()?.closed;
process.nextTick(poll);
// We don't actually expect any response body here, but waiting for
// an empty body to be returned has been more reliable that waiting
// for the response body stream to close.
await res.text();
} else if (res.status === 200) {
const data = await res.json();
if (data) {
Expand All @@ -245,10 +234,15 @@ class PluginEventsService implements EventsService {
);
}
}
} else {
this.logger.warn(
`Unexpected response status ${res.status} from events backend for subscription "${subscriptionId}"`,
}
} else {
if (res.status === 404) {
this.logger.info(
`Polling event subscription resulted in a 404, recreating subscription`,
);
hasSubscription = false;
} else {
throw await ResponseError.fromResponse(res);
}
}
}
Expand Down Expand Up @@ -276,6 +270,9 @@ class PluginEventsService implements EventsService {
}
}

// No errors, reset backoff
backoffMs = POLL_BACKOFF_START_MS;

process.nextTick(poll);
} catch (error) {
this.logger.warn(
Expand Down

0 comments on commit 7219465

Please sign in to comment.