Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
zaknesler committed May 9, 2024
1 parent 510479e commit 0f5d68e
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 21 deletions.
5 changes: 5 additions & 0 deletions crates/blend-web/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ pub enum WebError {

#[error(transparent)]
WorkerJobSendError(#[from] tokio::sync::mpsc::error::SendError<blend_worker::Job>),

#[error(transparent)]
NotificationSendError(
#[from] tokio::sync::broadcast::error::SendError<blend_worker::Notification>,
),
}
21 changes: 15 additions & 6 deletions crates/blend-web/src/router/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,30 @@ async fn refresh_feed(
.await?
.ok_or_else(|| WebError::NotFoundError)?;

let worker = ctx.jobs.lock().await;
worker.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
worker.send(blend_worker::Job::FetchEntries(feed.clone())).await?;
let notifier = ctx.notifs.lock().await;
let dispatcher = ctx.jobs.lock().await;

notifier.send(blend_worker::Notification::StartedFeedRefresh {
feed_uuid: feed.uuid,
})?;
dispatcher.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
dispatcher.send(blend_worker::Job::FetchEntries(feed.clone())).await?;

Ok(Json(json!({ "success": true })))
}

async fn refresh_feeds(State(ctx): State<crate::Context>) -> WebResult<impl IntoResponse> {
let feeds = repo::feed::FeedRepo::new(ctx.db).get_feeds().await?;

let worker = ctx.jobs.lock().await;
let notifier = ctx.notifs.lock().await;
let dispatcher = ctx.jobs.lock().await;

for feed in feeds {
worker.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
worker.send(blend_worker::Job::FetchEntries(feed.clone())).await?;
notifier.send(blend_worker::Notification::StartedFeedRefresh {
feed_uuid: feed.uuid,
})?;
dispatcher.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
dispatcher.send(blend_worker::Job::FetchEntries(feed.clone())).await?;
}

Ok(Json(json!({ "success": true })))
Expand Down
14 changes: 6 additions & 8 deletions crates/blend-worker/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ pub async fn fetch_entries(
db: SqlitePool,
notifs: Arc<Mutex<broadcast::Sender<Notification>>>,
) -> WorkerResult<()> {
// Acquire lock to send initial notification, then release it while the entries are parsed & inserted
{
notifs.lock().await.send(Notification::FetchingEntries {
feed_uuid: feed.uuid,
})?;
}

let mapped = parse_entries(&feed.url_feed)
.await?
.into_iter()
Expand All @@ -36,7 +29,12 @@ pub async fn fetch_entries(

let entry_uuids = EntryRepo::new(db).upsert_entries(&feed.uuid, &mapped).await?;

notifs.lock().await.send(Notification::EntriesFetched {
let notifier = notifs.lock().await;

notifier.send(Notification::FinishedFeedRefresh {
feed_uuid: feed.uuid,
})?;
notifier.send(Notification::EntriesFetched {
feed_uuid: feed.uuid,
entry_uuids,
})?;
Expand Down
19 changes: 14 additions & 5 deletions crates/blend-worker/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use ts_rs::TS;
#[serde(tag = "type")]
#[ts(export, export_to = "../../../ui/src/types/bindings/notification.ts")]
pub enum Notification {
FetchingEntries {
StartedFeedRefresh {
feed_uuid: uuid::Uuid,
},
FinishedFeedRefresh {
feed_uuid: uuid::Uuid,
},
EntriesFetched {
Expand All @@ -18,18 +21,24 @@ pub enum Notification {
impl Display for Notification {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Notification::FetchingEntries { feed_uuid } => write!(
Notification::StartedFeedRefresh { feed_uuid } => write!(
f,
"[notification: fetching entries] (size = {}) feed: {}",
"[notification: started feed refresh] (size = {}) feed: {}",
std::mem::size_of_val(self),
feed_uuid.hyphenated()
feed_uuid.hyphenated(),
),
Notification::FinishedFeedRefresh { feed_uuid } => write!(
f,
"[notification: finished feed refresh] (size = {}) feed: {}",
std::mem::size_of_val(self),
feed_uuid.hyphenated(),
),
Notification::EntriesFetched {
feed_uuid,
entry_uuids,
} => write!(
f,
"[notification: feed refreshed] (size = {}) feed: {}, entries: {}",
"[notification: entries fetched] (size = {}) feed: {}, entries: {}",
std::mem::size_of_val(self),
feed_uuid.hyphenated(),
entry_uuids.len()
Expand Down
2 changes: 1 addition & 1 deletion crates/blend-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn start_queue_worker(

tokio::spawn(async move {
if let Err(err) = handle_job(job.clone(), db, notifs).await {
tracing::error!("!!failed: {} with error: {}", job, err);
tracing::error!("job failed: {} with error: {}", job, err);
}
});
}
Expand Down
19 changes: 19 additions & 0 deletions ui/src/hooks/use-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@ import { Notification } from '~/types/bindings/notification';
import { useQueryClient } from '@tanstack/solid-query';
import { QUERY_KEYS } from '~/constants/query';
import { useFilterParams } from './use-filter-params';
import { createEffect, createSignal } from 'solid-js';

export const useWs = () => {
const filter = useFilterParams();
const queryClient = useQueryClient();

const [feedsRefreshing, setFeedsRefreshing] = createSignal<string[]>([]);

createEffect(() => {
const refreshing = feedsRefreshing();
if (!refreshing.length) return;

console.log(`refreshing ${refreshing.length} feeds`);
});

const socket = new WebSocket(wsUrl('/notifs'), undefined, {
connectionTimeout: 1000,
maxRetries: 10,
Expand All @@ -23,12 +33,21 @@ export const useWs = () => {
console.info('[ws] received message:', notif);

switch (notif.type) {
case 'StartedFeedRefresh':
setFeedsRefreshing(uuids => [...uuids, notif.feed_uuid]);
break;

case 'FinishedFeedRefresh':
setFeedsRefreshing(uuids => uuids.filter(uuid => uuid !== notif.feed_uuid));
break;

case 'EntriesFetched':
queryClient.invalidateQueries({ queryKey: [QUERY_KEYS.FEEDS] });
queryClient.invalidateQueries({ queryKey: [QUERY_KEYS.FEEDS_STATS] });
queryClient.invalidateQueries({ queryKey: [QUERY_KEYS.FEEDS_VIEW, notif.feed_uuid] });
queryClient.invalidateQueries({ queryKey: [QUERY_KEYS.ENTRIES_INDEX] }); // TODO: move this to only run after all feeds have been refreshed
queryClient.invalidateQueries({ queryKey: [QUERY_KEYS.ENTRIES_INDEX, notif.feed_uuid, filter.getView()] });
break;
}
});
};
2 changes: 1 addition & 1 deletion ui/src/types/bindings/notification.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

export type Notification = { "type": "FetchingEntries", feed_uuid: string, } | { "type": "EntriesFetched", feed_uuid: string, entry_uuids: Array<string>, };
export type Notification = { "type": "StartedFeedRefresh", feed_uuid: string, } | { "type": "FinishedFeedRefresh", feed_uuid: string, } | { "type": "EntriesFetched", feed_uuid: string, entry_uuids: Array<string>, };

0 comments on commit 0f5d68e

Please sign in to comment.