create notification system foundation (#1248)
* Temp Patch: remove the linux OS check from the runtime container crate

* Template migration and stitched resend edge function

* Template procedure to isolate active notifications

* Update the notifications_ext view to evaluate catalog stats accordingly for this notification type

* Create notification preference when a tenant is provisioned

* Remove superfluous logic in resend edge function

* Refactor resend edge function to reduce repetitive logic

* Placeholder: Rename notifications table and view as well as add confirmation message templates to notification_messages

* Consolidate RLS policies on the notification_preferences table

* Rename and alter type of the notification_preferences prefix column

* Use a compound key on the notification_subscriptions table

* Move the notification_messages table into the internal namespace and adjust key

* Update beta onboarding directive and remove subscribed_by column in notification_preferences

* Rename notification_messages table to notification_templates

* Rename notification_subscriptions table to notification_configurations

* Rename notification_preferences table to notification_subscriptions

* Rename and scope notification_configurations table to specific notification type

* Correct data_processing_notifications policy and add foreign key constraint to live_spec_id column

* Reintroduce the live_spec_id column to the data_processing_notifications view

* Testing Required: update resend edge function in response to the migration changes

* Temp Patch Reversal: reintroduce the linux OS check to the runtime container crate

* Update beta onboarding directive-related files after table renaming

* Rename notifications migration file

* Conditionally create pg_cron extension

* Update existing code to be in accord with discussion conclusions

* Get the user email from the auth users table in beta onboarding directive

* Call the resend edge function from a postgres trigger

* Add a return statement to trigger function

* Delete alert_data_processing rows corresponding to deleted tasks

* Add RLS policy to alert_history table

* Add signature to notification email

* Relocate resend credentials to an environment variable

* Adjust the alert-related publication changes

* Update migration in response to review feedback

* Reverse file in previous commit

* Remove redundancies in alert firing view

* Rollback late-stage alert view column changes

* Add test coverage for alerts migration

* Update confirmation email content

* Update alerts migration filename

* Update url and bearer token of alert edge function invocation

* Update alert cron interval and wrap migration in transaction

* Update alert_history table trigger condition

* Add order by clause to an evaluate_alert_events test
kiahna-tucker authored Nov 21, 2023
1 parent 01dbb5b commit a2d18f8
Showing 9 changed files with 653 additions and 2 deletions.
3 changes: 3 additions & 0 deletions crates/agent-sql/src/directives/beta_onboard.rs
@@ -82,6 +82,9 @@ pub async fn provision_tenant(
($2, '{"stores": [{"provider": "GCS", "bucket": "estuary-trial", "prefix": "collection-data/"}]}', $3),
('recovery/' || $2, '{"stores": [{"provider": "GCS", "bucket": "estuary-trial"}]}', $3)
on conflict do nothing
),
create_alert_subscription as (
insert into alert_subscriptions (catalog_prefix, email) values ($2, (select email from auth.users where id = $1 limit 1))
)
insert into tenants (tenant, detail) values ($2, $3);
"#,
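For orientation: the migration that defines alert_subscriptions is among the nine changed files but is not rendered on this page. A minimal sketch of the table shape the insert above assumes, with column names taken from the insert itself and types assumed from the values it writes:

create table if not exists alert_subscriptions (
  catalog_prefix text not null, -- tenant prefix, e.g. 'AcmeTenant/'
  email          text not null  -- resolved from auth.users during provisioning
);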
18 changes: 18 additions & 0 deletions crates/agent-sql/src/publications.rs
@@ -710,3 +710,21 @@ pub async fn prune_unbound_collections(

Ok(res.into_iter().map(|r| r.catalog_name).collect())
}

pub async fn delete_data_processing_alerts(
catalog_name: &str,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
delete from alert_data_processing
where alert_data_processing.catalog_name = $1
returning alert_data_processing.catalog_name;
"#,
catalog_name,
)
.fetch_optional(&mut *txn)
.await?;

Ok(())
}
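For reference, a minimal sketch of the alert_data_processing table this helper deletes from; the column names come from the test fixture below, and the types are assumptions based on its values ('aliceCo/Second/Thing', '2 hours'):

create table if not exists alert_data_processing (
  catalog_name        text     not null, -- task name, e.g. 'aliceCo/Second/Thing'
  evaluation_interval interval not null  -- e.g. '2 hours'
);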
20 changes: 19 additions & 1 deletion crates/agent-sql/tests/publications.rs
@@ -58,7 +58,7 @@ async fn test_publication_data_operations() {

let mut txn = conn.begin().await.unwrap();

// Fixture: insert live_specs, grants, drafts, and draft_specs fixtures.
// Fixture: insert live_specs, grants, drafts, draft_specs, and alert_data_processing fixtures.
sqlx::query(
r#"
with p1 as (
@@ -98,6 +98,10 @@
p7 as (
insert into publications (id, user_id, draft_id) values
('eeeeeeeeeeeeeeee', '11111111-1111-1111-1111-111111111111','dddddddddddddddd')
),
p8 as (
insert into alert_data_processing (catalog_name, evaluation_interval) values
('aliceCo/Second/Thing', '2 hours')
)
select 1;
"#,
@@ -221,6 +225,13 @@
.await
.unwrap();

agent_sql::publications::delete_data_processing_alerts(
&row.catalog_name,
&mut txn,
)
.await
.unwrap();

agent_sql::drafts::delete_spec(row.draft_spec_id, &mut txn)
.await
.unwrap();
@@ -248,6 +259,13 @@
"[].created_at" => "<redacted-timestamp>",
"[].updated_at" => "<redacted-timestamp>",
});

// Expect `alert_data_processing` is now empty.
assert!(sqlx::query("select catalog_name from alert_data_processing")
.fetch_optional(&mut txn)
.await
.unwrap()
.is_none());
}

#[tokio::test]
8 changes: 8 additions & 0 deletions crates/agent/src/directives/beta_onboard.rs
@@ -246,6 +246,10 @@ mod test {
-- Expect a storage mapping was created.
select json_build_object('prefix', m.catalog_prefix, 'storageMapping', m.spec)
from storage_mappings m where m.catalog_prefix like '%AcmeTenant%'
union all
-- Expect an alert subscription was created.
select json_build_object('catalog_prefix', s.catalog_prefix, 'email', s.email)
from alert_subscriptions s where s.catalog_prefix = 'AcmeTenant/'
"#,
)
.fetch_all(&mut txn)
@@ -298,6 +302,10 @@
}
]
}
},
{
"catalog_prefix": "AcmeTenant/",
"email": "new@example.com"
}
]
"###);
9 changes: 9 additions & 0 deletions crates/agent/src/publications/specs.rs
@@ -464,6 +464,15 @@ pub async fn apply_updates_for_row(
.await
.context("insert live_spec_flow edges")?;

if draft_spec.is_none() {
agent_sql::publications::delete_data_processing_alerts(
catalog_name,
txn,
)
.await
.context("delete alert_data_processing rows")?;
}

Ok(())
}

5 changes: 4 additions & 1 deletion supabase/env.local
@@ -1,3 +1,6 @@
CONFIG_ENCRYPTION_URL=http://host.docker.internal:8765/v1/encrypt-config
# Pull this from 1password: "Stripe API Key"."Local Dev Test Key"
# STRIPE_API_KEY=rk_test_...
# STRIPE_API_KEY=rk_test_...

RESEND_API_KEY = ""
RESEND_EMAIL_ADDRESS = "Estuary <onboarding@resend.dev>"
218 changes: 218 additions & 0 deletions supabase/functions/alert-data-processing/index.ts
@@ -0,0 +1,218 @@
import { serve } from "https://deno.land/std@0.184.0/http/server.ts";
import { isEmpty, isFinite } from "https://cdn.skypack.dev/lodash";

import { corsHeaders } from "../_shared/cors.ts";
import { returnPostgresError } from "../_shared/helpers.ts";
import { supabaseClient } from "../_shared/supabaseClient.ts";

interface DataProcessingArguments {
bytes_processed: number;
emails: string[];
evaluation_interval: string;
spec_type: string;
}

interface AlertRecord {
alert_type: string;
catalog_name: string;
fired_at: string;
resolved_at: string | null;
arguments: DataProcessingArguments;
}

interface EmailConfig {
emails: string[];
subject: string;
content: string;
}

export const handleSuccess = <T>(response: any) => {
return response.error
? {
data: null,
error: response.error,
}
: {
data: response.data as T,
};
};

export const handleFailure = (error: any) => {
return {
data: null,
error,
};
};

const dataProcessingAlertType = "data_not_processed_in_interval";

const TABLES = { ALERT_HISTORY: "alert_history" };

const getTaskDetailsPageURL = (catalogName: string, specType: string) =>
`https://dashboard.estuary.dev/${specType}s/details/overview?catalogName=${catalogName}`;

const emailNotifications = async (
pendingNotifications: EmailConfig[],
token: string,
senderAddress: string,
): Promise<void> => {
const notificationPromises = pendingNotifications.map(
({ emails, content, subject }) =>
fetch("https://api.resend.com/emails", {
method: "POST",
headers: {
...corsHeaders,
"Content-Type": "application/json",
"Authorization": `Bearer ${token}`,
},
body: JSON.stringify({
from: senderAddress,
to: emails,
subject,
html: `
<div style="font-family: 'Helvetica Neue', Helvetica, Arial, sans-serif;">
${content}
<p style="margin-bottom: 0;">Thanks,</p>
<p style="margin-top: 0;">Estuary Team</p>
</div>
`,
}),
}),
);

await Promise.all(notificationPromises);
};

serve(async (_request: Request): Promise<Response> => {
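// Compute the start of the lookback window: five minutes before invocation,
// truncated to the whole minute. Note that setUTCMinutes accepts a negative
// value and rolls the hour back accordingly.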
const startTimestamp = new Date();
const minuteOffset = startTimestamp.getUTCMinutes() - 5;

startTimestamp.setUTCMilliseconds(0);
startTimestamp.setUTCSeconds(0);
startTimestamp.setUTCMinutes(minuteOffset);

const { data: alerts, error: alertsError } = await supabaseClient
.from<AlertRecord>(TABLES.ALERT_HISTORY)
.select("*")
.eq("alert_type", dataProcessingAlertType)
.is("resolved_at", null)
.gt("fired_at", startTimestamp.toUTCString());

if (alertsError !== null) {
returnPostgresError(alertsError);
}

const { data: confirmations, error: confirmationsError } = await supabaseClient
.from<AlertRecord>(TABLES.ALERT_HISTORY)
.select("*")
.eq("alert_type", dataProcessingAlertType)
.gt("resolved_at", startTimestamp.toUTCString());

if (confirmationsError !== null) {
returnPostgresError(confirmationsError);
}

if (isEmpty(alerts) && isEmpty(confirmations)) {
// Terminate the function without error if there aren't any active notifications in the system.
return new Response(null, {
headers: { ...corsHeaders, "Content-Type": "application/json" },
status: 200,
});
}

const pendingAlertEmails: EmailConfig[] = alerts
? alerts.map(
({
arguments: { emails, evaluation_interval, spec_type },
catalog_name,
}) => {
let formattedEvaluationInterval = evaluation_interval;

// A PostgreSQL interval in hour increments has the format 'HH:00:00'.
if (evaluation_interval.includes(":")) {
const timeOffset = evaluation_interval.split(":");
const hours = Number(timeOffset[0]);

// An hour-based interval under ten hours should read as a single digit, so the hour
// segment of the evaluation interval (timeOffset[0]) is converted to a number. The
// conditional is a failsafe: if that conversion fails, the raw segment is used as-is,
// which may display a leading zero (e.g., '02 hours' instead of '2 hours').
formattedEvaluationInterval = isFinite(hours) ? `${hours} hours` : `${timeOffset[0]} hours`;
}

const subject = `Estuary Flow: Alert for ${spec_type} ${catalog_name}`;

const detailsPageURL = getTaskDetailsPageURL(catalog_name, spec_type);

const content =
`<p>You are receiving this alert because your task, ${spec_type} ${catalog_name}, hasn't seen new data in ${formattedEvaluationInterval}. You can locate your task <a href="${detailsPageURL}" target="_blank" rel="noopener">here</a> to make changes or update its alerting settings.</p>`;

return {
content,
emails,
subject,
};
},
)
: [];

const pendingConfirmationEmails: EmailConfig[] = confirmations
? confirmations.map(
({ arguments: { emails, spec_type }, catalog_name }) => {
const subject = `Estuary Flow: Alert for ${spec_type} ${catalog_name}`;

const detailsPageURL = getTaskDetailsPageURL(catalog_name, spec_type);

const content =
`<p>You are receiving this notice because a previous alert for your task, ${spec_type} ${catalog_name}, has now resolved. You can locate your task <a href="${detailsPageURL}" target="_blank" rel="noopener">here</a> to make changes or update its alerting settings.</p>`;

return {
content,
emails,
subject,
};
},
)
: [];

const pendingEmails = [...pendingAlertEmails, ...pendingConfirmationEmails];

if (pendingEmails.length === 0) {
return new Response(null, {
headers: { ...corsHeaders, "Content-Type": "application/json" },
status: 200,
});
}

const resendToken = Deno.env.get('RESEND_API_KEY');
const senderAddress = Deno.env.get('RESEND_EMAIL_ADDRESS');

if (!resendToken || !senderAddress) {
return new Response(
JSON.stringify({
error: {
code: 'invalid_resend_credentials',
message: `Unauthorized: access is denied due to invalid credentials.`,
description: `The server could not verify that you are authorized to access the desired resource with the credentials provided.`,
},
}),
{
headers: {
...corsHeaders,
'Content-Type': 'application/json',
},
status: 401,
}
);
}

await emailNotifications(pendingEmails, resendToken, senderAddress);

return new Response(null, {
status: 200,
headers: {
"Content-Type": "application/json",
},
});
});
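The cron wiring referenced by the commit messages above ('Conditionally create pg_cron extension', 'Update alert cron interval', 'Update url and bearer token of alert edge function invocation') lives in the migration, which is not rendered on this page. A plausible sketch of that invocation, assuming pg_cron plus pg_net; the job name, schedule, URL, and token below are placeholders, not values from this commit:

create extension if not exists pg_cron;

select cron.schedule(
  'alert-data-processing',  -- job name (assumed)
  '*/5 * * * *',            -- five-minute cadence, matching the function's lookback window
  $$
  select net.http_post(
    url     := 'https://<project-ref>.supabase.co/functions/v1/alert-data-processing',
    headers := '{"Content-Type": "application/json", "Authorization": "Bearer <token>"}'::jsonb,
    body    := '{}'::jsonb
  )
  $$
);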