diff --git a/components/dashboard/src/AppNotifications.tsx b/components/dashboard/src/AppNotifications.tsx index b0dfe3e914bf9c..74aaa09df06a46 100644 --- a/components/dashboard/src/AppNotifications.tsx +++ b/components/dashboard/src/AppNotifications.tsx @@ -19,15 +19,22 @@ export function AppNotifications() { setNotifications(localState); return; } - (async () => { - const serverState = await getGitpodService().server.getNotifications(); - setNotifications(serverState); - if (serverState.length > 0) { - setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 60 /* seconds */); - } - })(); + reloadNotifications().catch(console.error); + + getGitpodService().registerClient({ + onNotificationUpdated: () => reloadNotifications().catch(console.error), + }); }, []); + const reloadNotifications = async () => { + const serverState = await getGitpodService().server.getNotifications(); + setNotifications(serverState); + removeLocalStorageObject(KEY_APP_NOTIFICATIONS); + if (serverState.length > 0) { + setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 300 /* seconds */); + } + }; + const topNotification = notifications[0]; if (topNotification === undefined) { return null; diff --git a/components/gitpod-messagebus/src/messagebus.ts b/components/gitpod-messagebus/src/messagebus.ts index 4fae7478c2fa75..581a8823990329 100644 --- a/components/gitpod-messagebus/src/messagebus.ts +++ b/components/gitpod-messagebus/src/messagebus.ts @@ -61,6 +61,8 @@ export interface MessageBusHelper { * @param topic the topic to parse */ getWsInformationFromTopic(topic: string): WorkspaceTopic | undefined; + + getSubscriptionUpdateTopic(attributionId?: string): string; } export const WorkspaceTopic = Symbol("WorkspaceTopic"); @@ -89,6 +91,10 @@ export class MessageBusHelperImpl implements MessageBusHelper { await ch.assertExchange(this.workspaceExchange, "topic", { durable: true }); } + getSubscriptionUpdateTopic(attributionId: string | undefined): string { + return `subscription.${attributionId || "*"}.update`; + } + /** * Computes the topic name of for listening to a workspace. * diff --git a/components/gitpod-protocol/src/gitpod-service.ts b/components/gitpod-protocol/src/gitpod-service.ts index 3285daea71710f..42d6c3e663e6d4 100644 --- a/components/gitpod-protocol/src/gitpod-service.ts +++ b/components/gitpod-protocol/src/gitpod-service.ts @@ -70,6 +70,8 @@ export interface GitpodClient { onPrebuildUpdate(update: PrebuildWithStatus): void; + onNotificationUpdated(): void; + onCreditAlert(creditAlert: CreditAlert): void; //#region propagating reconnection to iframe @@ -570,6 +572,18 @@ export class GitpodCompositeClient implements Gitpo } } } + + onNotificationUpdated(): void { + for (const client of this.clients) { + if (client.onNotificationUpdated) { + try { + client.onNotificationUpdated(); + } catch (error) { + console.error(error); + } + } + } + } } export type GitpodService = GitpodServiceImpl; diff --git a/components/server/ee/src/workspace/gitpod-server-impl.ts b/components/server/ee/src/workspace/gitpod-server-impl.ts index fb1170761194a6..3a64510cb7c77d 100644 --- a/components/server/ee/src/workspace/gitpod-server-impl.ts +++ b/components/server/ee/src/workspace/gitpod-server-impl.ts @@ -118,6 +118,7 @@ import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/expe import { BillingService } from "../billing/billing-service"; import Stripe from "stripe"; import { UsageServiceDefinition } from "@gitpod/usage-api/lib/usage/v1/usage.pb"; +import { MessageBusIntegration } from "../../../src/workspace/messagebus-integration"; @injectable() export class GitpodServerEEImpl extends GitpodServerImpl { @@ -165,6 +166,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl { @inject(BillingModes) protected readonly billingModes: BillingModes; @inject(BillingService) protected readonly billingService: BillingService; + @inject(MessageBusIntegration) protected readonly messageBus: MessageBusIntegration; + initialize( client: GitpodClient | undefined, user: User | undefined, @@ -177,6 +180,7 @@ export class GitpodServerEEImpl extends GitpodServerImpl { this.listenToCreditAlerts(); this.listenForPrebuildUpdates().catch((err) => log.error("error registering for prebuild updates", err)); + this.listenForSubscriptionUpdates().catch((err) => log.error("error registering for prebuild updates", err)); } protected async listenForPrebuildUpdates() { @@ -204,6 +208,32 @@ export class GitpodServerEEImpl extends GitpodServerImpl { // TODO(at) we need to keep the list of accessible project up to date } + protected async listenForSubscriptionUpdates() { + if (!this.user) { + return; + } + const teamIds = (await this.teamDB.findTeamsByUser(this.user.id)).map(({ id }) => + AttributionId.render({ kind: "team", teamId: id }), + ); + for (const attributionId of [AttributionId.render({ kind: "user", userId: this.user.id }), ...teamIds]) { + this.disposables.push( + this.localMessageBroker.listenForSubscriptionUpdates( + attributionId, + (ctx: TraceContext, attributionId: AttributionId) => + TraceContext.withSpan( + "forwardSubscriptionUpdateToClient", + (ctx) => { + traceClientMetadata(ctx, this.clientMetadata); + TraceContext.setJsonRPCMetadata(ctx, "onSubscriptionUpdate"); + this.client?.onNotificationUpdated(); + }, + ctx, + ), + ), + ); + } + } + protected async getAccessibleProjects() { if (!this.user) { return []; @@ -2161,6 +2191,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl { billingStrategy: CostCenter_BillingStrategy.BILLING_STRATEGY_STRIPE, }, }); + + this.messageBus.notifyOnSubscriptionUpdate(ctx, attrId).catch(); } catch (error) { log.error(`Failed to subscribe '${attributionId}' to Stripe`, error); throw new ResponseError( @@ -2262,6 +2294,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl { response?.costCenter?.billingStrategy || CostCenter_BillingStrategy.BILLING_STRATEGY_OTHER, }, }); + + this.messageBus.notifyOnSubscriptionUpdate(ctx, attrId).catch(); } async getUsageLimitForTeam(ctx: TraceContext, teamId: string): Promise { diff --git a/components/server/src/messaging/local-message-broker.ts b/components/server/src/messaging/local-message-broker.ts index 03deb567e43dff..9849006173c347 100644 --- a/components/server/src/messaging/local-message-broker.ts +++ b/components/server/src/messaging/local-message-broker.ts @@ -12,6 +12,7 @@ import { WorkspaceInstance, } from "@gitpod/gitpod-protocol"; import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol"; +import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution"; import { log } from "@gitpod/gitpod-protocol/lib/util/logging"; import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing"; import { inject, injectable } from "inversify"; @@ -20,6 +21,9 @@ import { MessageBusIntegration } from "../workspace/messagebus-integration"; export interface PrebuildUpdateListener { (ctx: TraceContext, evt: PrebuildWithStatus): void; } +export interface SubscriptionUpdateListener { + (ctx: TraceContext, attributionId: AttributionId): void; +} export interface CreditAlertListener { (ctx: TraceContext, alert: CreditAlert): void; } @@ -38,6 +42,8 @@ export interface LocalMessageBroker { listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable; + listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable; + listenToCreditAlerts(userId: string, listener: CreditAlertListener): Disposable; listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable; @@ -69,6 +75,7 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker { protected creditAlertsListeners: Map = new Map(); protected headlessWorkspaceEventListeners: Map = new Map(); protected workspaceInstanceUpdateListeners: Map = new Map(); + protected subscriptionUpdateListeners: Map = new Map(); protected readonly disposables = new DisposableCollection(); @@ -151,6 +158,24 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker { }, ), ); + this.disposables.push( + this.messageBusIntegration.listenToSubscriptionUpdates( + (ctx: TraceContext, attributionId: AttributionId) => { + TraceContext.setOWI(ctx, {}); + + const listeners = this.subscriptionUpdateListeners.get(AttributionId.render(attributionId)) || []; + + for (const l of listeners) { + try { + l(ctx, attributionId); + } catch (err) { + TraceContext.setError(ctx, err); + log.error("listenToSubscriptionUpdates", err, { attributionId }); + } + } + }, + ), + ); } async stop() { @@ -165,6 +190,10 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker { return this.doRegister(userId, listener, this.creditAlertsListeners); } + listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable { + return this.doRegister(attributionId, listener, this.subscriptionUpdateListeners); + } + listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable { // we're being cheap here in re-using a map where it just needs to be a plain array. return this.doRegister( diff --git a/components/server/src/workspace/messagebus-integration.ts b/components/server/src/workspace/messagebus-integration.ts index 95f56a55ce48bb..c049fff6f9aa30 100644 --- a/components/server/src/workspace/messagebus-integration.ts +++ b/components/server/src/workspace/messagebus-integration.ts @@ -22,6 +22,7 @@ import * as opentracing from "opentracing"; import { CancellationTokenSource } from "vscode-ws-jsonrpc"; import { increaseMessagebusTopicReads } from "../prometheus-metrics"; import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol"; +import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution"; interface WorkspaceInstanceUpdateCallback { (ctx: TraceContext, instance: WorkspaceInstance, ownerId: string | undefined): void; @@ -72,6 +73,16 @@ export class CreditAlertListener extends AbstractTopicListener { } } +export class SubscriptionUpdateListener extends AbstractTopicListener { + constructor(protected messageBusHelper: MessageBusHelper, listener: TopicListener) { + super(messageBusHelper.workspaceExchange, listener); + } + + topic() { + return this.messageBusHelper.getSubscriptionUpdateTopic(); + } +} + export class PrebuildUpdatableQueueListener implements MessagebusListener { protected channel: Channel | undefined; protected consumerTag: string | undefined; @@ -208,6 +219,28 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration { return Disposable.create(() => cancellationTokenSource.cancel()); } + async notifyOnSubscriptionUpdate(ctx: TraceContext, attributionId: AttributionId) { + if (!this.channel) { + throw new Error("Not connected to message bus"); + } + const topic = this.messageBusHelper.getSubscriptionUpdateTopic(AttributionId.render(attributionId)); + const msg = Buffer.from(JSON.stringify(attributionId)); + await this.messageBusHelper.assertWorkspaceExchange(this.channel); + + await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, msg, { + trace: ctx, + }); + } + + listenToSubscriptionUpdates(callback: (ctx: TraceContext, attributionId: AttributionId) => void): Disposable { + const listener = new SubscriptionUpdateListener(this.messageBusHelper, callback); + const cancellationTokenSource = new CancellationTokenSource(); + this.listen(listener, cancellationTokenSource.token).catch((err) => { + /** ignore */ + }); + return Disposable.create(() => cancellationTokenSource.cancel()); + } + async notifyOnPrebuildUpdate(prebuildInfo: PrebuildWithStatus) { if (!this.channel) { throw new Error("Not connected to message bus");