Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Keyword based feeds #22

Merged
merged 1 commit into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 4 additions & 33 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ func serveCmd() *cli.Command {
firehoseCtx := context.Context(ctx.Context)
livenessTicker := time.NewTicker(15 * time.Minute)
postChan := make(chan interface{}, 1000)
statisticsChan := make(chan models.StatisticsEvent, 1000)
dbPostChan := make(chan interface{}) // Channel for writing posts to the database
broadcaster := server.NewBroadcaster() // SSE broadcaster
dbPostChan := make(chan interface{}) // Channel for writing posts to the database

dbReader := db.NewReader(database)
seq, err := dbReader.GetSequence()
Expand Down Expand Up @@ -171,10 +169,9 @@ func serveCmd() *cli.Command {

// Create the server
app := server.Server(&server.ServerConfig{
Hostname: hostname,
Reader: dbReader,
Broadcaster: broadcaster,
Feeds: feedMap,
Hostname: hostname,
Reader: dbReader,
Feeds: feedMap,
})

// Some glue code to pass posts from the firehose to the database
Expand All @@ -188,30 +185,14 @@ func serveCmd() *cli.Command {
}()
for post := range postChan {
switch post := post.(type) {
// Don't crash if broadcast fails
case models.CreatePostEvent:
dbPostChan <- post
// Broadcast without blocking
go broadcaster.BroadcastCreatePost(post) // Broadcast new post to SSE clients
default:
dbPostChan <- post
}
}
}()

// Glue code to pass statistics events to the broadcaster
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in statistics broadcast routine: %v", r)
}
}()

for stats := range statisticsChan {
broadcaster.BroadcastStatistics(stats)
}
}()

go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -226,16 +207,6 @@ func serveCmd() *cli.Command {
})
}()

go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic in monitor firehose routine: %v", r)
}
}()
fmt.Println("Starting statistics monitor...")
firehose.MonitorFirehoseStats(ctx.Context, statisticsChan)
}()

go func() {
<-ctx.Done()
log.Info("Context canceled with reason:", ctx.Err())
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type FeedConfig struct {
Description string `toml:"description"`
AvatarPath string `toml:"avatar_path,omitempty"`
Languages []string `toml:"languages"`
Keywords []string `toml:"keywords,omitempty"`
}

type Config struct {
Expand Down
10 changes: 10 additions & 0 deletions db/migrations/20250109194601_add_text_column.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Reverts the add_text_column migration: removes the three FTS sync triggers,
-- then the posts_fts index, and finally the posts.text column.

-- Drop triggers first, since each of them writes to posts_fts
DROP TRIGGER IF EXISTS posts_ai;
DROP TRIGGER IF EXISTS posts_ad;
DROP TRIGGER IF EXISTS posts_au;

-- Drop FTS table
DROP TABLE IF EXISTS posts_fts;

-- Remove text column from posts
-- NOTE(review): ALTER TABLE ... DROP COLUMN requires SQLite 3.35.0 or newer;
-- confirm the SQLite version the application is built against.
ALTER TABLE posts DROP COLUMN text;
25 changes: 25 additions & 0 deletions db/migrations/20250109194601_add_text_column.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Adds the post text to the posts table and a full-text (FTS5) index over it.
ALTER TABLE posts ADD COLUMN text TEXT;

-- Create virtual FTS table
-- This is an external-content table: the text itself is read from posts
-- (content='posts'), matched by posts.id (content_rowid='id'). Keeping the
-- index in sync with posts is the job of the triggers below.
CREATE VIRTUAL TABLE posts_fts USING fts5(
text,
content='posts',
content_rowid='id',
tokenize='unicode61'
);

-- Index the rows that already exist in posts. The table above starts out
-- empty, so without this step the delete/update triggers would issue FTS5
-- 'delete' commands for rows that were never added to the index, leaving the
-- index bookkeeping inconsistent with posts.
-- NOTE(review): this scans every row in posts, so it can take a while on a
-- large database.
INSERT INTO posts_fts(posts_fts) VALUES('rebuild');

-- Trigger to keep FTS table in sync on insert
CREATE TRIGGER posts_ai AFTER INSERT ON posts BEGIN
INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text);
END;

-- Trigger to keep FTS table in sync on delete
-- The 'delete' command must be given the values that were indexed, hence old.text.
CREATE TRIGGER posts_ad AFTER DELETE ON posts BEGIN
INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text);
END;

-- Trigger to keep FTS table in sync on update
-- An update is a removal of the old entry followed by insertion of the new one.
CREATE TRIGGER posts_au AFTER UPDATE ON posts BEGIN
INSERT INTO posts_fts(posts_fts, rowid, text) VALUES('delete', old.id, old.text);
INSERT INTO posts_fts(rowid, text) VALUES (new.id, new.text);
END;
26 changes: 21 additions & 5 deletions db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"norsky/models"
"strconv"
"strings"
"time"

sqlbuilder "github.com/huandu/go-sqlbuilder"
Expand All @@ -25,22 +26,37 @@ func NewReader(database string) *Reader {
}
}

func (reader *Reader) GetFeed(langs []string, limit int, postId int64) ([]models.FeedPost, error) {
func (reader *Reader) GetFeed(langs []string, keywords []string, limit int, postId int64) ([]models.FeedPost, error) {
sb := sqlbuilder.NewSelectBuilder()
sb.Select("DISTINCT posts.id", "posts.uri").From("posts")

if postId != 0 {
sb.Where(sb.LessThan("posts.id", postId))
}

// Build language conditions if specified
if len(langs) > 0 {
sb.Join("post_languages", "posts.id = post_languages.post_id")
// Build OR conditions for each language
conditions := make([]string, len(langs))
langConditions := make([]string, len(langs))
for i, lang := range langs {
conditions[i] = sb.Equal("post_languages.language", lang)
langConditions[i] = sb.Equal("post_languages.language", lang)
}
sb.Where(sb.Or(conditions...))
sb.Where(sb.Or(langConditions...))
}

// Use FTS search for keywords if specified
if len(keywords) > 0 {
// Join with FTS table and build search query
sb.Join("posts_fts", "posts.id = posts_fts.rowid")
searchTerms := make([]string, len(keywords))
for i, keyword := range keywords {
// Escape quotes and use * for prefix matching
escaped := strings.ReplaceAll(keyword, "'", "''")
searchTerms[i] = fmt.Sprintf("%s*", escaped)
}
// Combine terms with OR
searchQuery := strings.Join(searchTerms, " OR ")
sb.Where(fmt.Sprintf("posts_fts MATCH '%s'", searchQuery))
}

sb.OrderBy("posts.id").Desc()
Expand Down
58 changes: 35 additions & 23 deletions db/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"fmt"
"norsky/models"
"time"

Expand Down Expand Up @@ -66,45 +67,56 @@ func processSeq(db *sql.DB, evt models.ProcessSeqEvent) error {
return nil
}

func createPost(db *sql.DB, post models.Post) error {
func createPost(db *sql.DB, post models.Post) (int64, error) {
log.WithFields(log.Fields{
"uri": post.Uri,
"languages": post.Languages,
"uri": post.Uri,
"languages": post.Languages,
"created_at": time.Unix(post.CreatedAt, 0).Format(time.RFC3339),
// Record lag from when the post was created to when it was processed
"lagSeconds": time.Since(time.Unix(post.CreatedAt, 0)).Seconds(),
}).Info("Creating post")

// Start a transaction since we need to insert into multiple tables
tx, err := db.Begin()
if err != nil {
return 0, fmt.Errorf("transaction error: %w", err)
}
defer tx.Rollback()

// Post insert query
insertPost := sqlbuilder.NewInsertBuilder()
sql, args := insertPost.InsertInto("posts").Cols("uri", "created_at").Values(post.Uri, post.CreatedAt).Build()
insertPost.InsertInto("posts").Cols("uri", "created_at", "text")
insertPost.Values(post.Uri, post.CreatedAt, post.Text)

// Spread args
res, err := db.Exec(sql, args...)
sql, args := insertPost.Build()

result, err := tx.Exec(sql, args...)
if err != nil {
log.Error("Error inserting post", err)
return err
return 0, fmt.Errorf("insert error: %w", err)
}

// Get inserted id
id, err := res.LastInsertId()
postId, err := result.LastInsertId()
if err != nil {
log.Error("Error getting inserted id", err)
return err
return 0, fmt.Errorf("last insert id error: %w", err)
}

// Post languages insert query
insertLangs := sqlbuilder.NewInsertBuilder()
insertLangs.InsertInto("post_languages").Cols("post_id", "language")
// Insert languages
for _, lang := range post.Languages {
insertLangs.Values(id, lang)
}
sql, args = insertLangs.Build()
insertLang := sqlbuilder.NewInsertBuilder()
insertLang.InsertInto("post_languages").Cols("post_id", "language")
insertLang.Values(postId, lang)

_, err = db.Exec(sql, args...)
sql, args := insertLang.Build()
if _, err := tx.Exec(sql, args...); err != nil {
return 0, fmt.Errorf("language insert error: %w", err)
}
}

if err != nil {
log.Error("Error inserting languages", err)
return err
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("commit error: %w", err)
}

return nil
return postId, nil
}

func deletePost(db *sql.DB, post models.Post) error {
Expand Down
16 changes: 16 additions & 0 deletions feeds.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,19 @@ display_name = "Norsk (Norwegian)"
description = "A feed of Bluesky posts written in Norwegian bokmål, nynorsk and sami"
avatar_path = "./assets/avatar.png"
languages = ["nb", "nn", "no", "se"]

[[feeds]]
id = "tv-shows"
display_name = "Norwegian TV Shows"
description = "A feed of Bluesky posts about Norwegian TV shows"
avatar_path = "./assets/avatar.png"
languages = ["nb", "nn", "no"]
keywords = ["tv-serie", "tvserie", "nrk", "tv2", "netflix"]

[[feeds]]
id = "tech"
display_name = "Norwegian Tech"
description = "A feed of Bluesky posts about technology in Norwegian"
avatar_path = "./assets/avatar.png"
languages = ["nb", "nn", "no"]
keywords = ["teknologi", "programmering", "koding", "kunstig intelligens", "ai"]
12 changes: 7 additions & 5 deletions feeds/feeds.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
type Algorithm func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error)

// Reuse genericAlgo for all algorithms
func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string) (*models.FeedResponse, error) {
func genericAlgo(reader *db.Reader, cursor string, limit int, languages []string, keywords []string) (*models.FeedResponse, error) {
postId := safeParseCursor(cursor)

posts, err := reader.GetFeed(languages, limit+1, postId)
posts, err := reader.GetFeed(languages, keywords, limit+1, postId)
if err != nil {
log.Error("Error getting feed", err)
return nil, err
Expand Down Expand Up @@ -57,6 +57,7 @@ type Feed struct {
Description string
AvatarPath string
Languages []string
Keywords []string
Algorithm Algorithm
}

Expand All @@ -71,16 +72,17 @@ func InitializeFeeds(cfg *config.Config) map[string]Feed {
Description: feedCfg.Description,
AvatarPath: feedCfg.AvatarPath,
Languages: feedCfg.Languages,
Algorithm: createAlgorithm(feedCfg.Languages),
Keywords: feedCfg.Keywords,
Algorithm: createAlgorithm(feedCfg.Languages, feedCfg.Keywords),
}
}

return feeds
}

// Helper function to create an algorithm based on languages and keywords
func createAlgorithm(languages []string) Algorithm {
func createAlgorithm(languages []string, keywords []string) Algorithm {
return func(reader *db.Reader, cursor string, limit int) (*models.FeedResponse, error) {
return genericAlgo(reader, cursor, limit, languages)
return genericAlgo(reader, cursor, limit, languages, keywords)
}
}
Loading
Loading