From 18c1692447cdceceb40facab45191f00d80ebfa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Snorre=20Magnus=20Dav=C3=B8en?= Date: Thu, 9 Jan 2025 22:01:29 +0100 Subject: [PATCH] feat: Keyword based feeds Allows creating feeds selecting on multiple keywords. To allow creating feeds on new keywords that go back in time before the feed was created we now store text and a full text search index to facilitate search based look up. --- cmd/serve.go | 37 +--- config/config.go | 1 + .../20250109194601_add_text_column.down.sql | 10 + .../20250109194601_add_text_column.up.sql | 25 +++ db/reader.go | 26 ++- db/writer.go | 58 +++-- feeds.example.toml | 16 ++ feeds/feeds.go | 12 +- firehose/firehose.go | 188 +++++++--------- frontend/App.tsx | 87 -------- models/models.go | 5 - server/server.go | 208 ++---------------- 12 files changed, 220 insertions(+), 453 deletions(-) create mode 100644 db/migrations/20250109194601_add_text_column.down.sql create mode 100644 db/migrations/20250109194601_add_text_column.up.sql diff --git a/cmd/serve.go b/cmd/serve.go index 43975a2..ba952b9 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -114,9 +114,7 @@ func serveCmd() *cli.Command { firehoseCtx := context.Context(ctx.Context) livenessTicker := time.NewTicker(15 * time.Minute) postChan := make(chan interface{}, 1000) - statisticsChan := make(chan models.StatisticsEvent, 1000) - dbPostChan := make(chan interface{}) // Channel for writing posts to the database - broadcaster := server.NewBroadcaster() // SSE broadcaster + dbPostChan := make(chan interface{}) // Channel for writing posts to the database dbReader := db.NewReader(database) seq, err := dbReader.GetSequence() @@ -171,10 +169,9 @@ func serveCmd() *cli.Command { // Create the server app := server.Server(&server.ServerConfig{ - Hostname: hostname, - Reader: dbReader, - Broadcaster: broadcaster, - Feeds: feedMap, + Hostname: hostname, + Reader: dbReader, + Feeds: feedMap, }) // Some glue code to pass posts from the firehose to the database and/or broadcaster @@ -188,30 +185,14 @@ func serveCmd() *cli.Command { }() for post := range postChan { switch post := post.(type) { - // Don't crash if broadcast fails case models.CreatePostEvent: dbPostChan <- post - // Broadcast without blocking - go broadcaster.BroadcastCreatePost(post) // Broadcast new post to SSE clients default: dbPostChan <- post } } }() - // Glue code to pass statistics events to the broadcaster - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Recovered from panic in statistics broadcast routine: %v", r) - } - }() - - for stats := range statisticsChan { - broadcaster.BroadcastStatistics(stats) - } - }() - go func() { defer func() { if r := recover(); r != nil { @@ -226,16 +207,6 @@ func serveCmd() *cli.Command { }) }() - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Recovered from panic in monitor firehose routine: %v", r) - } - }() - fmt.Println("Starting statistics monitor...") - firehose.MonitorFirehoseStats(ctx.Context, statisticsChan) - }() - go func() { <-ctx.Done() log.Info("Context canceled with reason:", ctx.Err()) diff --git a/config/config.go b/config/config.go index 8d4c9e2..9ac13bd 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type FeedConfig struct { Description string `toml:"description"` AvatarPath string `toml:"avatar_path,omitempty"` Languages []string `toml:"languages"` + Keywords []string `toml:"keywords,omitempty"` } type Config struct { diff --git a/db/migrations/20250109194601_add_text_column.down.sql b/db/migrations/20250109194601_add_text_column.down.sql new file mode 100644 index 0000000..8bc00a0 --- /dev/null +++ b/db/migrations/20250109194601_add_text_column.down.sql @@ -0,0 +1,10 @@ +-- Drop triggers first +DROP TRIGGER IF EXISTS posts_ai; +DROP TRIGGER IF EXISTS posts_ad; +DROP TRIGGER IF EXISTS posts_au; + +-- Drop FTS table +DROP TABLE IF EXISTS posts_fts; + +-- Remove text column from posts +ALTER TABLE posts DROP COLUMN text; diff --git a/db/migrations/20250109194601_add_text_column.up.sql b/db/migrations/20250109194601_add_text_column.up.sql new file mode 100644 index 0000000..dc9b07a --- /dev/null +++ b/db/migrations/20250109194601_add_text_column.up.sql @@ -0,0 +1,25 @@ +ALTER TABLE posts ADD COLUMN text TEXT; + +-- Create virtual FTS table +CREATE VIRTUAL TABLE posts_fts USING fts5( + text, + content='posts', + content_rowid='id', + tokenize='unicode61' +); + +-- Trigger to keep FTS table in sync on insert +CREATE TRIGGER posts_ai AFTER INSERT ON posts BEGIN + INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text); +END; + +-- Trigger to keep FTS table in sync on delete +CREATE TRIGGER posts_ad AFTER DELETE ON posts BEGIN + INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text); +END; + +-- Trigger to keep FTS table in sync on update +CREATE TRIGGER posts_au AFTER UPDATE ON posts BEGIN + INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text); + INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text); +END; \ No newline at end of file diff --git a/db/reader.go b/db/reader.go index ede6973..6d0afc6 100644 --- a/db/reader.go +++ b/db/reader.go @@ -5,6 +5,7 @@ import ( "fmt" "norsky/models" "strconv" + "strings" "time" sqlbuilder "github.com/huandu/go-sqlbuilder" @@ -25,7 +26,7 @@ func NewReader(database string) *Reader { } } -func (reader *Reader) GetFeed(langs []string, limit int, postId int64) ([]models.FeedPost, error) { +func (reader *Reader) GetFeed(langs []string, keywords []string, limit int, postId int64) ([]models.FeedPost, error) { sb := sqlbuilder.NewSelectBuilder() sb.Select("DISTINCT posts.id", "posts.uri").From("posts") @@ -33,14 +34,29 @@ func (reader *Reader) GetFeed(langs []string, limit int, postId int64) ([]models sb.Where(sb.LessThan("posts.id", postId)) } + // Build language conditions if specified if len(langs) > 0 { sb.Join("post_languages", "posts.id = post_languages.post_id") - // Build OR conditions for each language - conditions := make([]string, len(langs)) + langConditions := make([]string, len(langs)) for i, lang := range langs { - conditions[i] = sb.Equal("post_languages.language", lang) + langConditions[i] = sb.Equal("post_languages.language", lang) } - sb.Where(sb.Or(conditions...)) + sb.Where(sb.Or(langConditions...)) + } + + // Use FTS search for keywords if specified + if len(keywords) > 0 { + // Join with FTS table and build search query + sb.Join("posts_fts", "posts.id = posts_fts.rowid") + searchTerms := make([]string, len(keywords)) + for i, keyword := range keywords { + // Escape quotes and use * for prefix matching + escaped := strings.ReplaceAll(keyword, "'", "''") + searchTerms[i] = fmt.Sprintf("%s*", escaped) + } + // Combine terms with OR + searchQuery := strings.Join(searchTerms, " OR ") + sb.Where(fmt.Sprintf("posts_fts MATCH '%s'", searchQuery)) } sb.OrderBy("posts.id").Desc() diff --git a/db/writer.go b/db/writer.go index 22bc79e..052469d 100644 --- a/db/writer.go +++ b/db/writer.go @@ -2,6 +2,7 @@ package db import ( "context" + "fmt" "norsky/models" "time" @@ -66,45 +67,56 @@ func processSeq(db *sql.DB, evt models.ProcessSeqEvent) error { return nil } -func createPost(db *sql.DB, post models.Post) error { +func createPost(db *sql.DB, post models.Post) (int64, error) { log.WithFields(log.Fields{ - "uri": post.Uri, - "languages": post.Languages, + "uri": post.Uri, + "languages": post.Languages, + "created_at": time.Unix(post.CreatedAt, 0).Format(time.RFC3339), + // Record lag from when the post was created to when it was processed + "lagSeconds": time.Since(time.Unix(post.CreatedAt, 0)).Seconds(), }).Info("Creating post") + + // Start a transaction since we need to insert into multiple tables + tx, err := db.Begin() + if err != nil { + return 0, fmt.Errorf("transaction error: %w", err) + } + defer tx.Rollback() + // Post insert query insertPost := sqlbuilder.NewInsertBuilder() - sql, args := insertPost.InsertInto("posts").Cols("uri", "created_at").Values(post.Uri, post.CreatedAt).Build() + insertPost.InsertInto("posts").Cols("uri", "created_at", "text") + insertPost.Values(post.Uri, post.CreatedAt, post.Text) - // Spread args - res, err := db.Exec(sql, args...) + sql, args := insertPost.Build() + + result, err := tx.Exec(sql, args...) if err != nil { - log.Error("Error inserting post", err) - return err + return 0, fmt.Errorf("insert error: %w", err) } - // Get inserted id - id, err := res.LastInsertId() + postId, err := result.LastInsertId() if err != nil { - log.Error("Error getting inserted id", err) - return err + return 0, fmt.Errorf("last insert id error: %w", err) } - // Post languages insert query - insertLangs := sqlbuilder.NewInsertBuilder() - insertLangs.InsertInto("post_languages").Cols("post_id", "language") + // Insert languages for _, lang := range post.Languages { - insertLangs.Values(id, lang) - } - sql, args = insertLangs.Build() + insertLang := sqlbuilder.NewInsertBuilder() + insertLang.InsertInto("post_languages").Cols("post_id", "language") + insertLang.Values(postId, lang) - _, err = db.Exec(sql, args...) + sql, args := insertLang.Build() + if _, err := tx.Exec(sql, args...); err != nil { + return 0, fmt.Errorf("language insert error: %w", err) + } + } - if err != nil { - log.Error("Error inserting languages", err) - return err + if err := tx.Commit(); err != nil { + return 0, fmt.Errorf("commit error: %w", err) } - return nil + return postId, nil } func deletePost(db *sql.DB, post models.Post) error { diff --git a/feeds.example.toml b/feeds.example.toml index ffd7479..f5fd7ad 100644 --- a/feeds.example.toml +++ b/feeds.example.toml @@ -25,3 +25,19 @@ display_name = "Norsk (Norwegian)" description = "A feed of Bluesky posts written in Norwegian bokmål, nynorsk and sami" avatar_path = "./assets/avatar.png" languages = ["nb", "nn", "no", "se"] + +[[feeds]] +id = "tv-shows" +display_name = "Norwegian TV Shows" +description = "A feed of Bluesky posts about Norwegian TV shows" +avatar_path = "./assets/avatar.png" +languages = ["nb", "nn", "no"] +keywords = ["tv-serie", "tvserie", "nrk", "tv2", "netflix"] + +[[feeds]] +id = "tech" +display_name = "Norwegian Tech" +description = "A feed of Bluesky posts about technology in Norwegian" +avatar_path = "./assets/avatar.png" +languages = ["nb", "nn", "no"] +keywords = ["teknologi", "programmering", "koding", "kunstig intelligens", "ai"] diff --git a/feeds/feeds.go b/feeds/feeds.go index 88efc5f..9755200 100644 --- a/feeds/feeds.go +++ b/feeds/feeds.go @@ -13,10 +13,10 @@ import ( type Algorithm func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error) // Reuse genericAlgo for all algorithms -func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string) (*models.FeedResponse, error) { +func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string, keywords []string) (*models.FeedResponse, error) { postId := safeParseCursor(cursor) - posts, err := reader.GetFeed(languages, limit+1, postId) + posts, err := reader.GetFeed(languages, keywords, limit+1, postId) if err != nil { log.Error("Error getting feed", err) return nil, err @@ -57,6 +57,7 @@ type Feed struct { Description string AvatarPath string Languages []string + Keywords []string Algorithm Algorithm } @@ -71,7 +72,8 @@ func InitializeFeeds(cfg *config.Config) map[string]Feed { Description: feedCfg.Description, AvatarPath: feedCfg.AvatarPath, Languages: feedCfg.Languages, - Algorithm: createAlgorithm(feedCfg.Languages), + Keywords: feedCfg.Keywords, + Algorithm: createAlgorithm(feedCfg.Languages, feedCfg.Keywords), } } @@ -79,8 +81,8 @@ func InitializeFeeds(cfg *config.Config) map[string]Feed { } // Helper function to create an algorithm based on languages -func createAlgorithm(languages []string) Algorithm { +func createAlgorithm(languages []string, keywords []string) Algorithm { return func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error) { - return genericAlgo(reader, cursor, limit, languages) + return genericAlgo(reader, cursor, limit, languages, keywords) } } diff --git a/firehose/firehose.go b/firehose/firehose.go index 51189eb..b9f47b7 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -8,52 +8,70 @@ import ( "io" "net/http" "norsky/models" - "runtime" "strings" "sync" - "sync/atomic" "time" "unicode" "unicode/utf8" "log/slog" + "net" + "github.com/bluesky-social/indigo/api/atproto" appbsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/events" - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" + "github.com/bluesky-social/indigo/events/schedulers/sequential" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" "github.com/bluesky-social/indigo/repomgr" "github.com/cenkalti/backoff/v4" "github.com/gorilla/websocket" lingua "github.com/pemistahl/lingua-go" + "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" log "github.com/sirupsen/logrus" ) // Some constants to optimize the firehose +// Allow these to grow to 10MB const ( - wsReadBufferSize = 1024 * 16 // 16KB - wsWriteBufferSize = 1024 * 16 // 16KB - eventBufferSize = 10000 // Increase from 1000 + wsReadBufferSize = 1024 * 1024 * 2 // 2MB + wsWriteBufferSize = 1024 * 1024 * 2 // 2MB + wsReadTimeout = 90 * time.Second // Increased from 60s + wsWriteTimeout = 15 * time.Second // Increased from 10s + wsPingInterval = 30 * time.Second // Reduced from 60s ) -// We use all languages so as to reliably separate Norwegian from other European languages -var detector lingua.LanguageDetector +// Add these metrics +var ( + wsMessageProcessingTime = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "norsky_ws_message_processing_seconds", + Help: "Time spent processing websocket messages", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), + }) + + wsMessageBacklog = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "norsky_ws_message_backlog", + Help: "Number of messages waiting to be processed", + }) +) -func InitDetector() { - if detector == nil { - detector = lingua.NewLanguageDetectorBuilder().FromLanguages(lingua.AllLanguages()...).WithMinimumRelativeDistance(0.25).Build() - } +func init() { + prometheus.MustRegister(wsMessageProcessingTime) + prometheus.MustRegister(wsMessageBacklog) } -// Keep track of processed event and posts count to show stats in the web interface -var ( - processedEvents int64 - processedPosts int64 -) +func NewLanguageDetector(targetLangs []lingua.Language) lingua.LanguageDetector { + // Always include English plus target languages + languages := lingua.AllLanguages() + + return lingua.NewLanguageDetectorBuilder(). + FromLanguages(languages...). + WithMinimumRelativeDistance(0.25). + Build() +} // Add a pool for the FeedPost struct to reduce GC pressure // Instead of allocating new FeedPost structs for every post, @@ -175,7 +193,6 @@ func ContainsRepetitivePattern(text string) bool { minRepeats = 2 } if repeats >= minRepeats { - log.Debugf("Found repeating pattern '%v' (%d times)", pattern, repeats) return true } } else { @@ -247,19 +264,16 @@ func ContainsSpamContent(text string) bool { // If more than 5 hashtags, consider it spam if hashtagCount > 5 { - log.Infof("Skipping spam post with many hashtags: %s", text) return true } // If more than 5 mentions, consider it spam if mentionCount > 5 { - log.Infof("Skipping spam post with many mentions: %s", text) return true } // Check for repeated hashtags or mentions (common spam pattern) if strings.Count(text, "##") > 0 || strings.Count(text, "@@") > 0 { - log.Infof("Skipping spam post with repeated hashtags/mentions: %s", text) return true } @@ -270,7 +284,6 @@ func ContainsSpamContent(text string) bool { symbolRatio := float64(hashtagCount+mentionCount) / float64(len(words)) // If more than 50% of words are hashtags or mentions combined, consider it spam if symbolRatio > 0.5 { - log.Infof("Skipping spam post with high hashtag/mention ratio: %s", text) return true } } @@ -288,11 +301,10 @@ type FirehoseConfig struct { // Subscribe to the firehose using the Firehose struct as a receiver func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Ticker, seq int64, config FirehoseConfig) { - InitDetector() - address := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" headers := http.Header{} headers.Set("User-Agent", "NorSky: https://github.com/snorremd/norsky") + headers.Set("Accept-Encoding", "gzip") if seq >= 0 { log.Info("Starting from sequence: ", seq) @@ -303,12 +315,16 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick dialer := websocket.Dialer{ ReadBufferSize: wsReadBufferSize, WriteBufferSize: wsWriteBufferSize, - HandshakeTimeout: 30 * time.Second, + HandshakeTimeout: 45 * time.Second, + NetDialContext: (&net.Dialer{ + Timeout: 45 * time.Second, + KeepAlive: 45 * time.Second, + }).DialContext, } backoff := backoff.NewExponentialBackOff() - backoff.InitialInterval = 1 * time.Second - backoff.MaxInterval = 600 * time.Second + backoff.InitialInterval = 100 * time.Millisecond + backoff.MaxInterval = 30 * time.Second backoff.Multiplier = 1.5 backoff.MaxElapsedTime = 0 @@ -330,14 +346,14 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick backoff.Reset() // Set initial deadlines - conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + conn.SetReadDeadline(time.Now().Add(wsReadTimeout)) + conn.SetWriteDeadline(time.Now().Add(wsWriteTimeout)) - // Start ping ticker - pingTicker := time.NewTicker(60 * time.Second) + // Start ping ticker with shorter interval + pingTicker := time.NewTicker(wsPingInterval) defer pingTicker.Stop() - // Start ping goroutine + // Update ping goroutine go func() { for { select { @@ -345,13 +361,13 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick return case <-pingTicker.C: log.Debug("Sending ping to check connection") - if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(wsWriteTimeout)); err != nil { log.Warn("Ping failed, closing connection for restart: ", err) conn.Close() return } // Reset read deadline after successful ping - if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { + if err := conn.SetReadDeadline(time.Now().Add(wsReadTimeout)); err != nil { log.Warn("Failed to set read deadline, closing connection: ", err) conn.Close() return @@ -360,6 +376,12 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick } }() + // Add connection close handler + conn.SetCloseHandler(func(code int, text string) error { + log.Infof("WebSocket connection closed with code %d: %s", code, text) + return nil + }) + // Remove pong handler since server doesn't respond // Keep ping handler for completeness conn.SetPingHandler(func(appData string) error { @@ -367,16 +389,20 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick return conn.SetReadDeadline(time.Now().Add(60 * time.Second)) }) - scheduler := autoscaling.NewScheduler( - autoscaling.AutoscaleSettings{ - MaxConcurrency: runtime.NumCPU(), - Concurrency: 2, - AutoscaleFrequency: 5 * time.Second, - ThroughputBucketDuration: 1 * time.Second, - ThroughputBucketCount: 10, - }, + scheduler := sequential.NewScheduler( + //runtime.NumCPU(), + //100, + // autoscaling.AutoscaleSettings{ + + // MaxConcurrency: runtime.NumCPU() * 2, + // Concurrency: runtime.NumCPU() * 2, + // AutoscaleFrequency: 10 * time.Second, + // ThroughputBucketDuration: 2 * time.Second, + // ThroughputBucketCount: 15, + // }, conn.RemoteAddr().String(), eventProcessor(postChan, ctx, ticker, config).EventHandler) + err = events.HandleRepoStream(ctx, conn, scheduler, slog.Default()) // If error sleep @@ -401,27 +427,6 @@ func Subscribe(ctx context.Context, postChan chan interface{}, ticker *time.Tick } } -func MonitorFirehoseStats(ctx context.Context, statisticsChan chan models.StatisticsEvent) { - ticker := time.NewTicker(5 * time.Second) - for { - select { - case <-ticker.C: - // Send statistics event - statisticsChan <- models.StatisticsEvent{ - // Divide by 5 and round to get average per second - EventsPerSecond: atomic.LoadInt64(&processedEvents) / 5, - PostsPerSecond: atomic.LoadInt64(&processedPosts) / 5, - } - // Reset processed events and posts - atomic.StoreInt64(&processedEvents, 0) - atomic.StoreInt64(&processedPosts, 0) - case <-ctx.Done(): - log.Info("Stopping statistics ticker") - return - } - } -} - // Add new types to help organize the code type PostProcessor struct { postChan chan interface{} @@ -430,51 +435,37 @@ type PostProcessor struct { config FirehoseConfig targetLanguages []lingua.Language supportedLanguages map[lingua.Language]string + languageDetector lingua.LanguageDetector // We should have one detector per worker } // Rename to DetectLanguage since it's no longer Norwegian-specific func (p *PostProcessor) DetectLanguage(text string, currentLangs []string, targetLangs []lingua.Language) (bool, []string) { - detectedLang, exists := detector.DetectLanguageOf(text) - if !exists || detectedLang == lingua.English { - return false, currentLangs - } - - // Check if detected language is one of our target languages - isTargetLang := false - for _, lang := range targetLangs { - if detectedLang == lang { - isTargetLang = true - break - } - } - if !isTargetLang { - return false, currentLangs - } - - // Get confidence scores for target languages var highestConf float64 - var detectedLingua lingua.Language + var detectedLang lingua.Language - for _, lang := range targetLangs { - conf := detector.ComputeLanguageConfidence(text, lang) + // Check confidence for English and all target languages + for _, lang := range append([]lingua.Language{lingua.English}, targetLangs...) { + conf := p.languageDetector.ComputeLanguageConfidence(text, lang) if conf > highestConf { highestConf = conf - detectedLingua = lang + detectedLang = lang } - log.Infof("%s confidence: %.2f (threshold: %.2f)", - lang.String(), conf, p.config.ConfidenceThreshold) } - if highestConf < p.config.ConfidenceThreshold { + // If confidence is too low or detected language is English, skip + if highestConf < p.config.ConfidenceThreshold || detectedLang == lingua.English { return false, currentLangs } + log.Infof("%s confidence: %.2f (threshold: %.2f)", + detectedLang.String(), highestConf, p.config.ConfidenceThreshold) + // Create new slice to avoid modifying the input updatedLangs := make([]string, len(currentLangs)) copy(updatedLangs, currentLangs) // Map lingua language to ISO code - langCode := linguaToISO(detectedLingua, p.supportedLanguages) + langCode := linguaToISO(detectedLang, p.supportedLanguages) if langCode != "" && !lo.Contains(updatedLangs, langCode) { updatedLangs = append(updatedLangs, langCode) } @@ -502,30 +493,23 @@ func isoToLingua(code string, languages map[lingua.Language]string) (lingua.Lang // Handle post processing logic func (p *PostProcessor) processPost(evt *atproto.SyncSubscribeRepos_Commit, op *atproto.SyncSubscribeRepos_RepoOp, record *appbsky.FeedPost) error { + // Get URI uri := fmt.Sprintf("at://%s/%s", evt.Repo, op.Path) - // 1. Check word count first (cheapest operation - just string splitting) words := strings.Fields(record.Text) if len(words) < 4 { - log.Debugf("Skipping short post with only %d words: %s", len(words), uri) return nil } - // 3. Check letter ratio (fast character counting) if !HasEnoughLetters(record.Text) { - log.Debugf("Skipping post with insufficient letter ratio: %s", uri) return nil } - // 4. Check for repetitive patterns (string analysis) if ContainsRepetitivePattern(record.Text) { - log.Debugf("Skipping post with repetitive pattern: %s", uri) return nil } - // 5. Check for spam content (string matching) if ContainsSpamContent(record.Text) { - log.Debugf("Skipping spam post: %s", uri) return nil } @@ -614,12 +598,11 @@ func eventProcessor(postChan chan interface{}, context context.Context, ticker * config: config, targetLanguages: targetLangs, supportedLanguages: supportedLangs, + languageDetector: NewLanguageDetector(targetLangs), } return &events.RepoStreamCallbacks{ RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { - atomic.AddInt64(&processedEvents, 1) - rr, err := repo.ReadRepoFromCar(context, bytes.NewReader(evt.Blocks)) if err != nil { return fmt.Errorf("failed to read repo from car: %w", err) @@ -636,8 +619,6 @@ func eventProcessor(postChan chan interface{}, context context.Context, ticker * continue } - atomic.AddInt64(&processedPosts, 1) - // Get and decode record _, rec, err := rr.GetRecord(context, op.Path) if err != nil { @@ -675,11 +656,6 @@ func eventProcessor(postChan chan interface{}, context context.Context, ticker * } } -// GetDetector returns the package-level detector for testing -func GetDetector() lingua.LanguageDetector { - return detector -} - // Rename and modify the function to just get supported languages func getSupportedLanguages() map[lingua.Language]string { languages := make(map[lingua.Language]string) diff --git a/frontend/App.tsx b/frontend/App.tsx index 919ceae..a669603 100644 --- a/frontend/App.tsx +++ b/frontend/App.tsx @@ -258,73 +258,6 @@ const langToName = (lang: string): string => { } }; -interface PostFirehoseProps { - post: Accessor; - className?: string; -} - -const PostFirehose: Component = ({ post, className }) => { - - // Match regex to get the profile and post id - // URI example: at://did:plc:opkjeuzx2lego6a7gueytryu/app.bsky.feed.post/3kcbxsslpu623 - // profile = did:plc:opkjeuzx2lego6a7gueytryu - // post = 3kcbxsslpu623 - - const bskyLink = (post: Post) => { - const regex = /at:\/\/(did:plc:[a-z0-9]+)\/app.bsky.feed.post\/([a-z0-9]+)/; - const [profile, postId] = regex.exec(post.uri)!.slice(1); - return `https://bsky.app/profile/${profile}/post/${postId}`; - } - - return ( - -

Recent posts

-
- {post() ? ( -
-
-

{formatRelative(new Date(post().createdAt * 1000), new Date()) }

-

- {post().languages.map(langToName).join(", ")} -

-
-

{post().text}

- - {/* Link to post on Bluesky */} - -
- ): null} -
-
- ); -}; - -const StatisticStat = ({ - label, - value, - className, -}: { - label: string; - value: Accessor; - className?: string; -}) => { - return ( - -

{label}

-

{value()}

-

per second

-
- ); -}; - const Header = () => { return (
{ const App: Component = () => { const [key, setKey] = createSignal(); // Used to politely close the event source const [post, setPost] = createSignal(); - const [eventsPerSecond, setEventsPerSecond] = createSignal(0); - const [postsPerSecond, setPostsPerSecond] = createSignal(0); const [eventSource, setEventSource] = createSignal(null); onMount(() => { @@ -370,13 +301,6 @@ const App: Component = () => { setPost(data); }); - es.addEventListener("statistics", (e: MessageEvent) => { - const data = JSON.parse(e.data); - console.log("Received statistics", data); - setEventsPerSecond(data.eventsPerSecond); - setPostsPerSecond(data.postsPerSecond); - }); - // Add error handling es.addEventListener("error", (e) => { console.error("EventSource error:", e); @@ -412,21 +336,10 @@ const App: Component = () => { <>
- - -
); diff --git a/models/models.go b/models/models.go index 8366157..29f8452 100644 --- a/models/models.go +++ b/models/models.go @@ -45,8 +45,3 @@ type PostsAggregatedByTime struct { Time time.Time `json:"time"` Count int64 `json:"count"` } - -type StatisticsEvent struct { - EventsPerSecond int64 `json:"eventsPerSecond"` - PostsPerSecond int64 `json:"postsPerSecond"` -} diff --git a/server/server.go b/server/server.go index 0b75db1..c33bdb9 100644 --- a/server/server.go +++ b/server/server.go @@ -1,30 +1,27 @@ package server import ( - "bufio" "embed" - "encoding/json" "fmt" "net/http" "norsky/db" "norsky/feeds" - "norsky/models" "strconv" "strings" - "sync" "time" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/adaptor" "github.com/gofiber/fiber/v2/middleware/cache" "github.com/gofiber/fiber/v2/middleware/compress" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/filesystem" "github.com/gofiber/fiber/v2/middleware/requestid" - "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" - "github.com/valyala/fasthttp" ) //go:embed dist/* @@ -38,100 +35,29 @@ type ServerConfig struct { // The reader to use for reading posts Reader *db.Reader - // Broadcast channels to pass posts to SSE clients - Broadcaster *Broadcaster - // Add feeds to config Feeds map[string]feeds.Feed } -// Make it sync -type Broadcaster struct { - sync.RWMutex - createPostClients map[string]chan models.CreatePostEvent - statisticsClients map[string]chan models.StatisticsEvent -} - -// Constructor -func NewBroadcaster() *Broadcaster { - return &Broadcaster{ - createPostClients: make(map[string]chan models.CreatePostEvent, 10000), - statisticsClients: make(map[string]chan models.StatisticsEvent, 10000), - } -} - -func (b *Broadcaster) BroadcastCreatePost(post models.CreatePostEvent) { - for id, client := range b.createPostClients { - select { - case client <- post: // Non-blocking send - default: - log.Warnf("Client channel full, skipping stats for client: %v", id) - } - } -} - -func (b *Broadcaster) BroadcastStatistics(stats models.StatisticsEvent) { - b.RLock() // Assuming you have a mutex for client safety - defer b.RUnlock() - - for id, client := range b.statisticsClients { - select { - case client <- stats: // Non-blocking send - default: - log.Warnf("Client channel full, skipping stats for client: %v", id) - } - } -} - -// Function to add a client to the broadcaster -func (b *Broadcaster) AddClient(key string, createPostClient chan models.CreatePostEvent, statisticsClient chan models.StatisticsEvent) { - b.Lock() - defer b.Unlock() - b.createPostClients[key] = createPostClient - b.statisticsClients[key] = statisticsClient - log.WithFields(log.Fields{ - "key": key, - "count": len(b.createPostClients), - }).Info("Adding client to broadcaster") -} - -// Function to remove a client from the broadcaster -func (b *Broadcaster) RemoveClient(key string) { - b.Lock() - defer b.Unlock() - - // Remove from createPostClients - if client, ok := b.createPostClients[key]; ok { // Check if the client exists - close(client) // Safely close the channel - delete(b.createPostClients, key) // Remove from the map - } - - // Remove from statisticsClients - if client, ok := b.statisticsClients[key]; ok { // Check if the client exists - close(client) // Safely close the channel - delete(b.statisticsClients, key) // Remove from the map - } - - log.WithFields(log.Fields{ - "key": key, - "count": len(b.createPostClients), - }).Info("Removed client from broadcaster") -} +var ( + // Define custom metrics + customPostsProcessed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "norsky_posts_processed_total", + Help: "Total number of posts processed by language", + }, + []string{"language"}, + ) +) -func (b *Broadcaster) Shutdown() { - log.Info("Shutting down broadcaster") - b.Lock() - defer b.Unlock() - for _, client := range b.createPostClients { - close(client) - } +func init() { + // Register custom metrics + prometheus.MustRegister(customPostsProcessed) } // Returns a fiber.App instance to be used as an HTTP server for the norsky feed func Server(config *ServerConfig) *fiber.App { - bc := config.Broadcaster - app := fiber.New() // Middleware to track the latency of each request @@ -299,105 +225,6 @@ func Server(config *ServerConfig) *fiber.App { return c.Status(200).JSON(postsPerTime) }) - app.Delete("/dashboard/feed/sse", func(c *fiber.Ctx) error { - // Get the feed query parameters and parse the limit - key := c.Query("key", "") - bc.RemoveClient(key) - return c.Status(200).SendString("OK") - }) - - app.Get("/dashboard/feed/sse", func(c *fiber.Ctx) error { - c.Set("Content-Type", "text/event-stream") - c.Set("Cache-Control", "no-cache") - c.Set("Connection", "keep-alive") - c.Set("Transfer-Encoding", "chunked") - - // Unique client key - key := uuid.New().String() - sseCreatePostChannel := make(chan models.CreatePostEvent, 10) // Buffered channel - sseStatisticsChannel := make(chan models.StatisticsEvent, 10) - aliveChan := time.NewTicker(5 * time.Second) - - defer aliveChan.Stop() - - // Register the client - bc.AddClient(key, sseCreatePostChannel, sseStatisticsChannel) - - // Cleanup function - cleanup := func() { - log.Infof("Cleaning up SSE stream for client: %s", key) - bc.RemoveClient(key) - } - - // Use StreamWriter to manage SSE streaming - c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { - defer cleanup() - - // Send initial event with client key - fmt.Fprintf(w, "event: init\ndata: %s\n\n", key) - if err := w.Flush(); err != nil { - log.Errorf("Failed to send init event: %v", err) - return - } - - // Start streaming loop - for { - select { - case <-aliveChan.C: - // Send keep-alive pings - if _, err := fmt.Fprintf(w, "event: ping\ndata: \n\n"); err != nil { - log.Warnf("Failed to send ping to client %s: %v", key, err) - return - } - if err := w.Flush(); err != nil { - log.Warnf("Failed to flush ping for client %s: %v", key, err) - return - } - - case post, ok := <-sseCreatePostChannel: - if !ok { - log.Warnf("CreatePostChannel closed for client %s", key) - return - } - jsonPost, err := json.Marshal(post.Post) - if err != nil { - log.Errorf("Error marshalling post for client %s: %v", key, err) - continue - } - if _, err := fmt.Fprintf(w, "event: create-post\ndata: %s\n\n", jsonPost); err != nil { - log.Warnf("Failed to send create-post event to client %s: %v", key, err) - return - } - if err := w.Flush(); err != nil { - log.Warnf("Failed to flush create-post event for client %s: %v", key, err) - return - } - - case stats, ok := <-sseStatisticsChannel: - if !ok { - log.Warnf("StatisticsChannel closed for client %s", key) - return - } - jsonStats, err := json.Marshal(stats) - if err != nil { - log.Errorf("Error marshalling stats for client %s: %v", key, err) - continue - } - if _, err := fmt.Fprintf(w, "event: statistics\ndata: %s\n\n", jsonStats); err != nil { - log.Warnf("Failed to send statistics event to client %s: %v", key, err) - return - } - if err := w.Flush(); err != nil { - log.Warnf("Failed to flush statistics event for client %s: %v", key, err) - return - } - } - } - })) - - return nil - }) - // Serve the Solid dashboard app.Use("/", filesystem.New(filesystem.Config{ Browse: false, @@ -406,5 +233,8 @@ func Server(config *ServerConfig) *fiber.App { PathPrefix: "/dist", })) + // Add Prometheus metrics endpoint + app.Get("/metrics", adaptor.HTTPHandler(promhttp.Handler())) + return app }