diff --git a/cmd/serve.go b/cmd/serve.go index 44c1387..800d7f9 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -83,7 +83,7 @@ func serveCmd() *cli.Command { firehoseCtx := context.Context(ctx.Context) livenessTicker := time.NewTicker(5 * time.Minute) postChan := make(chan interface{}) - statisticsChan := make(chan models.StatisticsEvent) + statisticsChan := make(chan models.StatisticsEvent, 1000) dbPostChan := make(chan interface{}) // Channel for writing posts to the database broadcaster := server.NewBroadcaster() // SSE broadcaster @@ -105,6 +105,11 @@ func serveCmd() *cli.Command { // Ideally one might want to do this in a more elegant way // TODO: Move broadcaster into server package, i.e. make server a receiver and handle broadcaster and fiber together go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic in post broadcast routine: %v", r) + } + }() for post := range postChan { switch post := post.(type) { // Don't crash if broadcast fails @@ -120,30 +125,66 @@ func serveCmd() *cli.Command { // Glue code to pass statistics events to the broadcaster go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic in statistics broadcast routine: %v", r) + } + }() + for stats := range statisticsChan { broadcaster.BroadcastStatistics(stats) } }() go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic in firehose subscribe routine: %v", r) + } + }() fmt.Println("Subscribing to firehose...") - firehose.Subscribe(firehoseCtx, postChan, statisticsChan, livenessTicker, seq) + firehose.Subscribe(firehoseCtx, postChan, livenessTicker, seq) + }() + + go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic in monitor firehose routine: %v", r) + } + }() + fmt.Println("Starting statistics monitor...") + firehose.MonitorFirehoseStats(ctx.Context, statisticsChan) + }() + + go func() { + <-ctx.Done() + log.Info("Context canceled with reason:", ctx.Err()) }() // Add liveness probe to the server, to check if we are still receiving posts on the web socket // If not we need to restart the firehose connection go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic in firehose liveness ticker: %v", r) + } + }() for range livenessTicker.C { // If we get here the firehose stopped sending posts so we need to restart the connection log.Errorf("Firehose liveness probe failed, restarting connection") firehoseCtx.Done() firehoseCtx = context.Context(ctx.Context) - firehose.Subscribe(firehoseCtx, postChan, statisticsChan, livenessTicker, seq) + firehose.Subscribe(firehoseCtx, postChan, livenessTicker, seq) } }() go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic: %v", r) + } + }() fmt.Println("Starting server...") if err := app.Listen(fmt.Sprintf("%s:%d", host, port)); err != nil { @@ -152,6 +193,11 @@ func serveCmd() *cli.Command { }() go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered from panic: %v", r) + } + }() fmt.Println("Starting database writer...") db.Subscribe(ctx.Context, database, dbPostChan) }() diff --git a/cmd/subscribe.go b/cmd/subscribe.go index a5c0960..b31374d 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -43,7 +43,7 @@ Prints all other log messages to stderr.`, go func() { fmt.Println("Subscribing to firehose...") - firehose.Subscribe(ctx.Context, postChan, nil, ticker, -1) + firehose.Subscribe(ctx.Context, postChan, ticker, -1) }() go func() { diff --git a/firehose/firehose.go b/firehose/firehose.go index 83afcd4..ff4e724 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -42,7 +42,7 @@ var ( ) // Subscribe to the firehose using the Firehose struct as a receiver -func Subscribe(ctx context.Context, postChan chan interface{}, statisticsChan chan models.StatisticsEvent, ticker *time.Ticker, seq int64) { +func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Ticker, seq int64) { address := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" headers := http.Header{} @@ -61,32 +61,6 @@ func Subscribe(ctx context.Context, postChan chan interface{}, statisticsChan ch backoff.Multiplier = 2 backoff.MaxElapsedTime = 120 * time.Second - // Setup a ticker chan to send statistics events - // Make a new ticker running every 5 seconds - statisticsTicker := time.NewTicker(1 * time.Second) - - go func() { - defer func() { - fmt.Println("Ticker stopped") - statisticsTicker.Stop() - }() - for { - select { - case <-statisticsTicker.C: - // Send statistics event - statisticsChan <- models.StatisticsEvent{ - EventsPerSecond: atomic.LoadInt64(&processedEvents), - PostsPerSecond: atomic.LoadInt64(&processedPosts), - } - // Reset processed events and posts - atomic.StoreInt64(&processedEvents, 0) - atomic.StoreInt64(&processedPosts, 0) - case <-ctx.Done(): - return - } - } - }() - // Check if context is cancelled, if so exit the connection loop for { select { @@ -124,6 +98,27 @@ func Subscribe(ctx context.Context, postChan chan interface{}, statisticsChan ch } } +func MonitorFirehoseStats(ctx context.Context, statisticsChan chan models.StatisticsEvent) { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ticker.C: + // Send statistics event + statisticsChan <- models.StatisticsEvent{ + // Divide by 5 and round to get average per second + EventsPerSecond: atomic.LoadInt64(&processedEvents) / 5, + PostsPerSecond: atomic.LoadInt64(&processedPosts) / 5, + } + // Reset processed events and posts + atomic.StoreInt64(&processedEvents, 0) + atomic.StoreInt64(&processedPosts, 0) + case <-ctx.Done(): + log.Info("Stopping statistics ticker") + return + } + } +} + func eventProcessor(postChan chan interface{}, context context.Context, ticker *time.Ticker) *events.RepoStreamCallbacks { streamCallbacks := &events.RepoStreamCallbacks{ RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { diff --git a/frontend/App.tsx b/frontend/App.tsx index 5afef12..b0f7e62 100644 --- a/frontend/App.tsx +++ b/frontend/App.tsx @@ -259,58 +259,52 @@ const langToName = (lang: string): string => { }; interface PostFirehoseProps { - posts: Accessor; + post: Accessor; className?: string; } -const PostFirehose: Component = ({ posts, className }) => { +const PostFirehose: Component = ({ post, className }) => { + // Display a pretty list of the posts // Set a max height and use overflow-y: scroll to make it scrollable // Height should be whatever the parent is. + const createdAt = post() ? formatRelative(new Date(post().createdAt * 1000), new Date()) : ""; + // Match regex to get the profile and post id + // URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623 + // profile = did:plc:opkjeuzx2lego6a7gueytryu + // post = 3kcbxsslpu623 + + const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/; + const [profile, postId] = post() ? regex.exec(post().uri)!.slice(1) : ["", ""]; + const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}`; + return (

Recent posts

-
- - {(post) => { - const createdAt = formatRelative( - new Date(post.createdAt * 1000), - new Date() - ); - // Match regex to get the profile and post id - // URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623 - // profile = did:plc:opkjeuzx2lego6a7gueytryu - // post = 3kcbxsslpu623 - - const regex = - /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/; - const [profile, postId] = regex.exec(post.uri)!.slice(1); - const bskyLink = `https://bsky.app/profile/${profile}/post/${postId}`; - return ( -
-
-

{createdAt}

-

- {post.languages.map(langToName).join(", ")} -

-
-

{post.text}

- - {/* Link to post on Bluesky */} - -
- ); - }} -
+
+ {post() ? ( +
+
+

{createdAt}

+

+ {post().languages.map(langToName).join(", ")} +

+
+

{post().text}

+ + {/* Link to post on Bluesky */} + +
+ ): null}
); @@ -357,7 +351,7 @@ const Header = () => { const App: Component = () => { const [key, setKey] = createSignal(); // Used to politely close the event source - const [posts, setPosts] = createSignal([]); + const [post, setPost] = createSignal(); const [eventsPerSecond, setEventsPerSecond] = createSignal(0); const [postsPerSecond, setPostsPerSecond] = createSignal(0); const [eventSource, setEventSource] = createSignal(null); @@ -375,7 +369,7 @@ const App: Component = () => { es.addEventListener("create-post", (e: MessageEvent) => { const data = JSON.parse(e.data); console.log("Received post", data); - setPosts((posts) => [data, ...posts.slice(0, 499)]); + setPost(data); }); es.addEventListener("statistics", (e: MessageEvent) => { @@ -404,13 +398,21 @@ const App: Component = () => { <>
- - + + - +
); diff --git a/server/server.go b/server/server.go index 6e259ba..1e7f3f3 100644 --- a/server/server.go +++ b/server/server.go @@ -44,7 +44,7 @@ type ServerConfig struct { // Make it sync type Broadcaster struct { - sync.Mutex + sync.RWMutex createPostClients map[string]chan models.CreatePostEvent statisticsClients map[string]chan models.StatisticsEvent } @@ -52,26 +52,31 @@ type Broadcaster struct { // Constructor func NewBroadcaster() *Broadcaster { return &Broadcaster{ - createPostClients: make(map[string]chan models.CreatePostEvent), - statisticsClients: make(map[string]chan models.StatisticsEvent), + createPostClients: make(map[string]chan models.CreatePostEvent, 10000), + statisticsClients: make(map[string]chan models.StatisticsEvent, 10000), } } func (b *Broadcaster) BroadcastCreatePost(post models.CreatePostEvent) { - log.WithFields(log.Fields{ - "clients": len(b.createPostClients), - }).Info("Broadcasting post to SSE clients") - for _, client := range b.createPostClients { - client <- post + for id, client := range b.createPostClients { + select { + case client <- post: // Non-blocking send + default: + log.Warnf("Client channel full, skipping stats for client: %v", id) + } } } func (b *Broadcaster) BroadcastStatistics(stats models.StatisticsEvent) { - log.WithFields(log.Fields{ - "clients": len(b.statisticsClients), - }).Info("Broadcasting statistics to SSE clients") - for _, client := range b.statisticsClients { - client <- stats + b.RLock() // Assuming you have a mutex for client safety + defer b.RUnlock() + + for id, client := range b.statisticsClients { + select { + case client <- stats: // Non-blocking send + default: + log.Warnf("Client channel full, skipping stats for client: %v", id) + } } } @@ -91,21 +96,17 @@ func (b *Broadcaster) AddClient(key string, createPostClient chan models.CreateP func (b *Broadcaster) RemoveClient(key string) { b.Lock() defer b.Unlock() - // Check if client channel exists in map - if _, ok := b.createPostClients[key]; !ok { - // Close the channel unless it's already closed - if b.createPostClients[key] != nil { - close(b.createPostClients[key]) - } - delete(b.createPostClients, key) + + // Remove from createPostClients + if client, ok := b.createPostClients[key]; ok { // Check if the client exists + close(client) // Safely close the channel + delete(b.createPostClients, key) // Remove from the map } - if _, ok := b.statisticsClients[key]; !ok { - // Close the channel unless it's already closed - if b.statisticsClients[key] != nil { - close(b.statisticsClients[key]) - } - delete(b.statisticsClients, key) + // Remove from statisticsClients + if client, ok := b.statisticsClients[key]; ok { // Check if the client exists + close(client) // Safely close the channel + delete(b.statisticsClients, key) // Remove from the map } log.WithFields(log.Fields{ @@ -312,67 +313,87 @@ func Server(config *ServerConfig) *fiber.App { c.Set("Connection", "keep-alive") c.Set("Transfer-Encoding", "chunked") + // Unique client key + key := uuid.New().String() + sseCreatePostChannel := make(chan models.CreatePostEvent, 10) // Buffered channel + sseStatisticsChannel := make(chan models.StatisticsEvent, 10) + aliveChan := time.NewTicker(5 * time.Second) + + defer aliveChan.Stop() + + // Register the client + bc.AddClient(key, sseCreatePostChannel, sseStatisticsChannel) + + // Cleanup function + cleanup := func() { + log.Infof("Cleaning up SSE stream for client: %s", key) + bc.RemoveClient(key) + } + + // Use StreamWriter to manage SSE streaming c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - // Register the channel to write posts to so server can write to it - key := uuid.New().String() - sseCreatePostChannel := make(chan models.CreatePostEvent) - sseStatisticsChannel := make(chan models.StatisticsEvent) - aliveChan := time.NewTicker(5 * time.Second) - bc.AddClient(key, sseCreatePostChannel, sseStatisticsChannel) - - // A function to cleanup - cleanup := func() { - log.Info("Cleaning up SSE stream ", key) - // Remove sseChannel from the list of channels - bc.RemoveClient(key) - aliveChan.Stop() - } + defer cleanup() - // Send the SSE key first so the client can close the connection when it wants to + // Send initial event with client key fmt.Fprintf(w, "event: init\ndata: %s\n\n", key) - w.Flush() + if err := w.Flush(); err != nil { + log.Errorf("Failed to send init event: %v", err) + return + } + // Start streaming loop for { select { case <-aliveChan.C: - err := w.Flush() - if err != nil { - cleanup() + // Send keep-alive pings + if _, err := fmt.Fprintf(w, "event: ping\ndata: \n\n"); err != nil { + log.Warnf("Failed to send ping to client %s: %v", key, err) return } - case post := <-sseCreatePostChannel: - - jsonPost, jsonErr := json.Marshal(post.Post) - - if jsonErr != nil { - log.Error("Error marshalling post", jsonErr) - break // Break out of the select statement as we have no post to write + if err := w.Flush(); err != nil { + log.Warnf("Failed to flush ping for client %s: %v", key, err) + return } - fmt.Fprintf(w, "event: create-post\ndata: %s\n\n", jsonPost) - err := w.Flush() + case post, ok := <-sseCreatePostChannel: + if !ok { + log.Warnf("CreatePostChannel closed for client %s", key) + return + } + jsonPost, err := json.Marshal(post.Post) if err != nil { - cleanup() + log.Errorf("Error marshalling post for client %s: %v", key, err) + continue + } + if _, err := fmt.Fprintf(w, "event: create-post\ndata: %s\n\n", jsonPost); err != nil { + log.Warnf("Failed to send create-post event to client %s: %v", key, err) return } - - case stats := <-sseStatisticsChannel: - jsonStats, jsonErr := json.Marshal(stats) - - if jsonErr != nil { - log.Error("Error marshalling stats", jsonErr) - break // Break out of the select statement as we have no stats to write + if err := w.Flush(); err != nil { + log.Warnf("Failed to flush create-post event for client %s: %v", key, err) + return } - fmt.Fprintf(w, "event: statistics\ndata: %s\n\n", jsonStats) - err := w.Flush() + case stats, ok := <-sseStatisticsChannel: + if !ok { + log.Warnf("StatisticsChannel closed for client %s", key) + return + } + jsonStats, err := json.Marshal(stats) if err != nil { - cleanup() + log.Errorf("Error marshalling stats for client %s: %v", key, err) + continue + } + if _, err := fmt.Fprintf(w, "event: statistics\ndata: %s\n\n", jsonStats); err != nil { + log.Warnf("Failed to send statistics event to client %s: %v", key, err) + return + } + if err := w.Flush(); err != nil { + log.Warnf("Failed to flush statistics event for client %s: %v", key, err) return } } } - })) return nil