Skip to content

Commit

Permalink
fix: Statistics broadcast breaking when clients disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
snorremd committed Nov 23, 2024
1 parent 3d3ed02 commit 2e84ba5
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 144 deletions.
52 changes: 49 additions & 3 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func serveCmd() *cli.Command {
firehoseCtx := context.Context(ctx.Context)
livenessTicker := time.NewTicker(5 * time.Minute)
postChan := make(chan interface{})
statisticsChan := make(chan models.StatisticsEvent)
statisticsChan := make(chan models.StatisticsEvent, 1000)
dbPostChan := make(chan interface{}) // Channel for writing posts to the database
broadcaster := server.NewBroadcaster() // SSE broadcaster

Expand All @@ -105,6 +105,11 @@ func serveCmd() *cli.Command {
// Ideally one might want to do this in a more elegant way
// TODO: Move broadcaster into server package, i.e. make server a receiver and handle broadcaster and fiber together
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in post broadcast routine: %v", r)
}
}()
for post := range postChan {
switch post := post.(type) {
// Don't crash if broadcast fails
Expand All @@ -120,30 +125,66 @@ func serveCmd() *cli.Command {

// Glue code to pass statistics events to the broadcaster
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in statistics broadcast routine: %v", r)
}
}()

for stats := range statisticsChan {
broadcaster.BroadcastStatistics(stats)
}
}()

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in firehose subscribe routine: %v", r)
}
}()
fmt.Println("Subscribing to firehose...")
firehose.Subscribe(firehoseCtx, postChan, statisticsChan, livenessTicker, seq)
firehose.Subscribe(firehoseCtx, postChan, livenessTicker, seq)
}()

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in monitor firehose routine: %v", r)
}
}()
fmt.Println("Starting statistics monitor...")
firehose.MonitorFirehoseStats(ctx.Context, statisticsChan)
}()

go func() {
<-ctx.Done()
log.Info("Context canceled with reason:", ctx.Err())
}()

// Add liveness probe to the server, to check if we are still receiving posts on the web socket
// If not we need to restart the firehose connection

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in firehose liveness ticker: %v", r)
}
}()
for range livenessTicker.C {
// If we get here the firehose stopped sending posts so we need to restart the connection
log.Errorf("Firehose liveness probe failed, restarting connection")
firehoseCtx.Done()
firehoseCtx = context.Context(ctx.Context)
firehose.Subscribe(firehoseCtx, postChan, statisticsChan, livenessTicker, seq)
firehose.Subscribe(firehoseCtx, postChan, livenessTicker, seq)
}
}()

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %v", r)
}
}()
fmt.Println("Starting server...")

if err := app.Listen(fmt.Sprintf("%s:%d", host, port)); err != nil {
Expand All @@ -152,6 +193,11 @@ func serveCmd() *cli.Command {
}()

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %v", r)
}
}()
fmt.Println("Starting database writer...")
db.Subscribe(ctx.Context, database, dbPostChan)
}()
Expand Down
2 changes: 1 addition & 1 deletion cmd/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Prints all other log messages to stderr.`,

go func() {
fmt.Println("Subscribing to firehose...")
firehose.Subscribe(ctx.Context, postChan, nil, ticker, -1)
firehose.Subscribe(ctx.Context, postChan, ticker, -1)
}()

go func() {
Expand Down
49 changes: 22 additions & 27 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
)

// Subscribe to the firehose using the Firehose struct as a receiver
func Subscribe(ctx context.Context, postChan chan interface{}, statisticsChan chan models.StatisticsEvent, ticker *time.Ticker, seq int64) {
func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Ticker, seq int64) {

address := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
headers := http.Header{}
Expand All @@ -61,32 +61,6 @@ func Subscribe(ctx context.Context, postChan chan interface{}, statisticsChan ch
backoff.Multiplier = 2
backoff.MaxElapsedTime = 120 * time.Second

// Setup a ticker chan to send statistics events
// Make a new ticker running every 5 seconds
statisticsTicker := time.NewTicker(1 * time.Second)

go func() {
defer func() {
fmt.Println("Ticker stopped")
statisticsTicker.Stop()
}()
for {
select {
case <-statisticsTicker.C:
// Send statistics event
statisticsChan <- models.StatisticsEvent{
EventsPerSecond: atomic.LoadInt64(&processedEvents),
PostsPerSecond: atomic.LoadInt64(&processedPosts),
}
// Reset processed events and posts
atomic.StoreInt64(&processedEvents, 0)
atomic.StoreInt64(&processedPosts, 0)
case <-ctx.Done():
return
}
}
}()

// Check if context is cancelled, if so exit the connection loop
for {
select {
Expand Down Expand Up @@ -124,6 +98,27 @@ func Subscribe(ctx context.Context, postChan chan interface{}, statisticsChan ch
}
}

func MonitorFirehoseStats(ctx context.Context, statisticsChan chan models.StatisticsEvent) {
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker.C:
// Send statistics event
statisticsChan <- models.StatisticsEvent{
// Divide by 5 and round to get average per second
EventsPerSecond: atomic.LoadInt64(&processedEvents) / 5,
PostsPerSecond: atomic.LoadInt64(&processedPosts) / 5,
}
// Reset processed events and posts
atomic.StoreInt64(&processedEvents, 0)
atomic.StoreInt64(&processedPosts, 0)
case <-ctx.Done():
log.Info("Stopping statistics ticker")
return
}
}
}

func eventProcessor(postChan chan interface{}, context context.Context, ticker *time.Ticker) *events.RepoStreamCallbacks {
streamCallbacks := &events.RepoStreamCallbacks{
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
Expand Down
96 changes: 49 additions & 47 deletions frontend/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -259,58 +259,52 @@ const langToName = (lang: string): string => {
};

interface PostFirehoseProps {
posts: Accessor<Post[]>;
post: Accessor<Post | undefined>;
className?: string;
}

const PostFirehose: Component<PostFirehoseProps> = ({ posts, className }) => {
const PostFirehose: Component<PostFirehoseProps> = ({ post, className }) => {

// Display a pretty list of the posts
// Set a max height and use overflow-y: scroll to make it scrollable
// Height should be whatever the parent is.

const createdAt = post() ? formatRelative(new Date(post().createdAt * 1000), new Date()) : "";
// Match regex to get the profile and post id
// URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623
// profile = did:plc:opkjeuzx2lego6a7gueytryu
// post = 3kcbxsslpu623

const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/;
const [profile, postId] = post() ? regex.exec(post().uri)!.slice(1) : ["", ""];
const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}`;

return (
<StatWrapper className={`row-span-2 ${className}`}>
<h1 class="text-2xl text-zinc-300 text-center pb-4">Recent posts</h1>
<div class="overflow-y-scroll scroll min-h-full gap-4 flex flex-col no-scrollbar">
<For each={posts()}>
{(post) => {
const createdAt = formatRelative(
new Date(post.createdAt * 1000),
new Date()
);
// Match regex to get the profile and post id
// URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623
// profile = did:plc:opkjeuzx2lego6a7gueytryu
// post = 3kcbxsslpu623

const regex =
/at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/;
const [profile, postId] = regex.exec(post.uri)!.slice(1);
const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}`;
return (
<div class="flex flex-col gap-4 p-4 bg-zinc-900 rounded-md">
<div class="flex flex-row justify-between">
<p class="text-zinc-400">{createdAt}</p>
<p class="text-zinc-400">
{post.languages.map(langToName).join(", ")}
</p>
</div>
<p class="text-zinc-300 w-full max-w-[80ch]">{post.text}</p>

{/* Link to post on Bluesky */}
<div class="flex flex-row justify-end">
<a
class="text-sky-300 hover:text-sky-200 underline"
href={bskyLink}
target="_blank"
>
View on Bsky
</a>
</div>
</div>
);
}}
</For>
<div class="max-h-full gap-4 flex flex-col ">
{post() ? (
<div class="flex flex-col gap-4 p-4 bg-zinc-900 rounded-md">
<div class="flex flex-row justify-between">
<p class="text-zinc-400">{createdAt}</p>
<p class="text-zinc-400">
{post().languages.map(langToName).join(", ")}
</p>
</div>
<p class="text-zinc-300 w-full max-w-[80ch]">{post().text}</p>

{/* Link to post on Bluesky */}
<div class="flex flex-row justify-end">
<a
class="text-sky-300 hover:text-sky-200 underline"
href={bskyLink}
target="_blank"
>
View on Bsky
</a>
</div>
</div>
): null}
</div>
</StatWrapper>
);
Expand Down Expand Up @@ -357,7 +351,7 @@ const Header = () => {

const App: Component = () => {
const [key, setKey] = createSignal<string>(); // Used to politely close the event source
const [posts, setPosts] = createSignal<Post[]>([]);
const [post, setPost] = createSignal<Post>();
const [eventsPerSecond, setEventsPerSecond] = createSignal<number>(0);
const [postsPerSecond, setPostsPerSecond] = createSignal<number>(0);
const [eventSource, setEventSource] = createSignal<EventSource | null>(null);
Expand All @@ -375,7 +369,7 @@ const App: Component = () => {
es.addEventListener("create-post", (e: MessageEvent) => {
const data = JSON.parse(e.data);
console.log("Received post", data);
setPosts((posts) => [data, ...posts.slice(0, 499)]);
setPost(data);
});

es.addEventListener("statistics", (e: MessageEvent) => {
Expand Down Expand Up @@ -404,13 +398,21 @@ const App: Component = () => {
<>
<Header />
<div class="p-8 min-h-full grid grid-cols-1 md:grid-cols-2 2xl:grid-cols-3 gap-8 w-full">
<StatisticStat className="order-1" label="Records" value={eventsPerSecond} />
<StatisticStat className="order-2 2xl:order-5" label="Posts" value={postsPerSecond} />
<StatisticStat
className="order-1"
label="Records"
value={eventsPerSecond}
/>
<StatisticStat
className="order-2 2xl:order-5"
label="Posts"
value={postsPerSecond}
/>
<PostPerTime className="order-3" lang="" label="All languages" />
<PostPerTime className="order-4" lang="nb" label="Norwegian bokmål" />
<PostPerTime className="order-5" lang="nn" label="Norwegian nynorsk" />
<PostPerTime className="order-6" lang="se" label="Northern Sami" />
<PostFirehose className="order-7" posts={posts} />
<PostFirehose className="order-7" post={post} />
</div>
</>
);
Expand Down
Loading

0 comments on commit 2e84ba5

Please sign in to comment.