Skip to content

Commit

Permalink
Update notifications on updates to subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexTugarev committed Sep 15, 2022
1 parent 69f7e72 commit faa927e
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 7 deletions.
21 changes: 14 additions & 7 deletions components/dashboard/src/AppNotifications.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ export function AppNotifications() {
setNotifications(localState);
return;
}
(async () => {
const serverState = await getGitpodService().server.getNotifications();
setNotifications(serverState);
if (serverState.length > 0) {
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 60 /* seconds */);
}
})();
reloadNotifications().catch(console.error);

getGitpodService().registerClient({
onNotificationUpdated: () => reloadNotifications().catch(console.error),
});
}, []);

const reloadNotifications = async () => {
const serverState = await getGitpodService().server.getNotifications();
setNotifications(serverState);
removeLocalStorageObject(KEY_APP_NOTIFICATIONS);
if (serverState.length > 0) {
setLocalStorageObject(KEY_APP_NOTIFICATIONS, serverState, /* expires in */ 300 /* seconds */);
}
};

const topNotification = notifications[0];
if (topNotification === undefined) {
return null;
Expand Down
6 changes: 6 additions & 0 deletions components/gitpod-messagebus/src/messagebus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export interface MessageBusHelper {
* @param topic the topic to parse
*/
getWsInformationFromTopic(topic: string): WorkspaceTopic | undefined;

getSubscriptionUpdateTopic(attributionId?: string): string;
}

export const WorkspaceTopic = Symbol("WorkspaceTopic");
Expand Down Expand Up @@ -89,6 +91,10 @@ export class MessageBusHelperImpl implements MessageBusHelper {
await ch.assertExchange(this.workspaceExchange, "topic", { durable: true });
}

getSubscriptionUpdateTopic(attributionId: string | undefined): string {
return `subscription.${attributionId || "*"}.update`;
}

/**
* Computes the topic name of for listening to a workspace.
*
Expand Down
14 changes: 14 additions & 0 deletions components/gitpod-protocol/src/gitpod-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export interface GitpodClient {

onPrebuildUpdate(update: PrebuildWithStatus): void;

onNotificationUpdated(): void;

onCreditAlert(creditAlert: CreditAlert): void;

//#region propagating reconnection to iframe
Expand Down Expand Up @@ -570,6 +572,18 @@ export class GitpodCompositeClient<Client extends GitpodClient> implements Gitpo
}
}
}

onNotificationUpdated(): void {
for (const client of this.clients) {
if (client.onNotificationUpdated) {
try {
client.onNotificationUpdated();
} catch (error) {
console.error(error);
}
}
}
}
}

export type GitpodService = GitpodServiceImpl<GitpodClient, GitpodServer>;
Expand Down
34 changes: 34 additions & 0 deletions components/server/ee/src/workspace/gitpod-server-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ import { getExperimentsClientForBackend } from "@gitpod/gitpod-protocol/lib/expe
import { BillingService } from "../billing/billing-service";
import Stripe from "stripe";
import { UsageServiceDefinition } from "@gitpod/usage-api/lib/usage/v1/usage.pb";
import { MessageBusIntegration } from "../../../src/workspace/messagebus-integration";

@injectable()
export class GitpodServerEEImpl extends GitpodServerImpl {
Expand Down Expand Up @@ -165,6 +166,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
@inject(BillingModes) protected readonly billingModes: BillingModes;
@inject(BillingService) protected readonly billingService: BillingService;

@inject(MessageBusIntegration) protected readonly messageBus: MessageBusIntegration;

initialize(
client: GitpodClient | undefined,
user: User | undefined,
Expand All @@ -177,6 +180,7 @@ export class GitpodServerEEImpl extends GitpodServerImpl {

this.listenToCreditAlerts();
this.listenForPrebuildUpdates().catch((err) => log.error("error registering for prebuild updates", err));
this.listenForSubscriptionUpdates().catch((err) => log.error("error registering for prebuild updates", err));
}

protected async listenForPrebuildUpdates() {
Expand Down Expand Up @@ -204,6 +208,32 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
// TODO(at) we need to keep the list of accessible project up to date
}

protected async listenForSubscriptionUpdates() {
if (!this.user) {
return;
}
const teamIds = (await this.teamDB.findTeamsByUser(this.user.id)).map(({ id }) =>
AttributionId.render({ kind: "team", teamId: id }),
);
for (const attributionId of [AttributionId.render({ kind: "user", userId: this.user.id }), ...teamIds]) {
this.disposables.push(
this.localMessageBroker.listenForSubscriptionUpdates(
attributionId,
(ctx: TraceContext, attributionId: AttributionId) =>
TraceContext.withSpan(
"forwardSubscriptionUpdateToClient",
(ctx) => {
traceClientMetadata(ctx, this.clientMetadata);
TraceContext.setJsonRPCMetadata(ctx, "onSubscriptionUpdate");
this.client?.onNotificationUpdated();
},
ctx,
),
),
);
}
}

protected async getAccessibleProjects() {
if (!this.user) {
return [];
Expand Down Expand Up @@ -2161,6 +2191,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
billingStrategy: CostCenter_BillingStrategy.BILLING_STRATEGY_STRIPE,
},
});

this.messageBus.notifyOnSubscriptionUpdate(ctx, attrId).catch();
} catch (error) {
log.error(`Failed to subscribe '${attributionId}' to Stripe`, error);
throw new ResponseError(
Expand Down Expand Up @@ -2262,6 +2294,8 @@ export class GitpodServerEEImpl extends GitpodServerImpl {
response?.costCenter?.billingStrategy || CostCenter_BillingStrategy.BILLING_STRATEGY_OTHER,
},
});

this.messageBus.notifyOnSubscriptionUpdate(ctx, attrId).catch();
}

async getUsageLimitForTeam(ctx: TraceContext, teamId: string): Promise<number | undefined> {
Expand Down
29 changes: 29 additions & 0 deletions components/server/src/messaging/local-message-broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
WorkspaceInstance,
} from "@gitpod/gitpod-protocol";
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";
import { log } from "@gitpod/gitpod-protocol/lib/util/logging";
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
import { inject, injectable } from "inversify";
Expand All @@ -20,6 +21,9 @@ import { MessageBusIntegration } from "../workspace/messagebus-integration";
export interface PrebuildUpdateListener {
(ctx: TraceContext, evt: PrebuildWithStatus): void;
}
export interface SubscriptionUpdateListener {
(ctx: TraceContext, attributionId: AttributionId): void;
}
export interface CreditAlertListener {
(ctx: TraceContext, alert: CreditAlert): void;
}
Expand All @@ -38,6 +42,8 @@ export interface LocalMessageBroker {

listenForPrebuildUpdates(projectId: string, listener: PrebuildUpdateListener): Disposable;

listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable;

listenToCreditAlerts(userId: string, listener: CreditAlertListener): Disposable;

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable;
Expand Down Expand Up @@ -69,6 +75,7 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
protected creditAlertsListeners: Map<string, CreditAlertListener[]> = new Map();
protected headlessWorkspaceEventListeners: Map<string, HeadlessWorkspaceEventListener[]> = new Map();
protected workspaceInstanceUpdateListeners: Map<string, WorkspaceInstanceUpdateListener[]> = new Map();
protected subscriptionUpdateListeners: Map<string, SubscriptionUpdateListener[]> = new Map();

protected readonly disposables = new DisposableCollection();

Expand Down Expand Up @@ -151,6 +158,24 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
},
),
);
this.disposables.push(
this.messageBusIntegration.listenToSubscriptionUpdates(
(ctx: TraceContext, attributionId: AttributionId) => {
TraceContext.setOWI(ctx, {});

const listeners = this.subscriptionUpdateListeners.get(AttributionId.render(attributionId)) || [];

for (const l of listeners) {
try {
l(ctx, attributionId);
} catch (err) {
TraceContext.setError(ctx, err);
log.error("listenToSubscriptionUpdates", err, { attributionId });
}
}
},
),
);
}

async stop() {
Expand All @@ -165,6 +190,10 @@ export class LocalRabbitMQBackedMessageBroker implements LocalMessageBroker {
return this.doRegister(userId, listener, this.creditAlertsListeners);
}

listenForSubscriptionUpdates(attributionId: string, listener: SubscriptionUpdateListener): Disposable {
return this.doRegister(attributionId, listener, this.subscriptionUpdateListeners);
}

listenForPrebuildUpdatableEvents(listener: HeadlessWorkspaceEventListener): Disposable {
// we're being cheap here in re-using a map where it just needs to be a plain array.
return this.doRegister(
Expand Down
33 changes: 33 additions & 0 deletions components/server/src/workspace/messagebus-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as opentracing from "opentracing";
import { CancellationTokenSource } from "vscode-ws-jsonrpc";
import { increaseMessagebusTopicReads } from "../prometheus-metrics";
import { CreditAlert } from "@gitpod/gitpod-protocol/lib/accounting-protocol";
import { AttributionId } from "@gitpod/gitpod-protocol/lib/attribution";

interface WorkspaceInstanceUpdateCallback {
(ctx: TraceContext, instance: WorkspaceInstance, ownerId: string | undefined): void;
Expand Down Expand Up @@ -72,6 +73,16 @@ export class CreditAlertListener extends AbstractTopicListener<CreditAlert> {
}
}

export class SubscriptionUpdateListener extends AbstractTopicListener<AttributionId> {
constructor(protected messageBusHelper: MessageBusHelper, listener: TopicListener<AttributionId>) {
super(messageBusHelper.workspaceExchange, listener);
}

topic() {
return this.messageBusHelper.getSubscriptionUpdateTopic();
}
}

export class PrebuildUpdatableQueueListener implements MessagebusListener {
protected channel: Channel | undefined;
protected consumerTag: string | undefined;
Expand Down Expand Up @@ -208,6 +219,28 @@ export class MessageBusIntegration extends AbstractMessageBusIntegration {
return Disposable.create(() => cancellationTokenSource.cancel());
}

async notifyOnSubscriptionUpdate(ctx: TraceContext, attributionId: AttributionId) {
if (!this.channel) {
throw new Error("Not connected to message bus");
}
const topic = this.messageBusHelper.getSubscriptionUpdateTopic(AttributionId.render(attributionId));
const msg = Buffer.from(JSON.stringify(attributionId));
await this.messageBusHelper.assertWorkspaceExchange(this.channel);

await super.publish(MessageBusHelperImpl.WORKSPACE_EXCHANGE_LOCAL, topic, msg, {
trace: ctx,
});
}

listenToSubscriptionUpdates(callback: (ctx: TraceContext, attributionId: AttributionId) => void): Disposable {
const listener = new SubscriptionUpdateListener(this.messageBusHelper, callback);
const cancellationTokenSource = new CancellationTokenSource();
this.listen(listener, cancellationTokenSource.token).catch((err) => {
/** ignore */
});
return Disposable.create(() => cancellationTokenSource.cancel());
}

async notifyOnPrebuildUpdate(prebuildInfo: PrebuildWithStatus) {
if (!this.channel) {
throw new Error("Not connected to message bus");
Expand Down

0 comments on commit faa927e

Please sign in to comment.