Skip to content

Commit

Permalink
make yt processed status persistent
Browse files Browse the repository at this point in the history
  • Loading branch information
umputun committed Mar 29, 2022
1 parent ab43ac2 commit 51c16f3
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 55 deletions.
95 changes: 91 additions & 4 deletions app/youtube/mocks/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 13 additions & 7 deletions app/youtube/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type Service struct {
RSSFileStore RSSFileStore
KeepPerChannel int
RootURL string
processed map[string]bool
}

// FeedInfo contains channel or feed ID, readable name and other per-feed info
Expand Down Expand Up @@ -61,6 +60,8 @@ type StoreService interface {
Load(channelID string, max int) ([]ytfeed.Entry, error)
Exist(entry ytfeed.Entry) (bool, error)
RemoveOld(channelID string, keep int) ([]string, error)
SetProcessed(entry ytfeed.Entry) error
CheckProcessed(entry ytfeed.Entry) (found bool, ts time.Time, err error)
}

// Do is a blocking function that downloads audio from youtube channels and updates metadata
Expand All @@ -71,7 +72,6 @@ func (s *Service) Do(ctx context.Context) error {
log.Printf("[INFO] youtube feed %+v", f)
}

s.processed = make(map[string]bool)
tick := time.NewTicker(s.CheckDuration)
defer tick.Stop()

Expand Down Expand Up @@ -179,9 +179,14 @@ func (s *Service) procChannels(ctx context.Context) error {

// check if we already processed this entry.
// this is needed to avoid infinite get/remove loop when the original feed is updated in place
if _, ok := s.processed[entry.UID()]; ok {
found, procTS, procErr := s.Store.CheckProcessed(entry)
if procErr != nil {
log.Printf("[WARN] can't get processed status for %s, %+v", entry.VideoID, feedInfo)
}
if procErr == nil && found {
processed++
log.Printf("[INFO] skipping already processed entry %s, %+v", entry.VideoID, feedInfo)
log.Printf("[INFO] skipping already processed entry %s at %s, %+v",
entry.VideoID, procTS.Format(time.RFC3339), feedInfo)
continue
}

Expand All @@ -205,9 +210,10 @@ func (s *Service) procChannels(ctx context.Context) error {
log.Printf("[WARN] attempt to save dup entry %+v", entry)
}
changed = true
s.processed[entry.UID()] = true // track processed entries
log.Printf("[INFO] saved %s (%s) to %s, channel: %+v, total processed: %d",
entry.VideoID, entry.Title, file, feedInfo, len(s.processed))
if procErr = s.Store.SetProcessed(entry); procErr != nil {
log.Printf("[WARN] failed to set processed status for %s: %v", entry.VideoID, procErr)
}
log.Printf("[INFO] saved %s (%s) to %s, channel: %+v", entry.VideoID, entry.Title, file, feedInfo)
}

if changed { // save rss feed to fs if there are new entries
Expand Down
73 changes: 29 additions & 44 deletions app/youtube/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package youtube
import (
"context"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ytfeed "github.com/umputun/feed-master/app/youtube/feed"
"github.com/umputun/feed-master/app/youtube/store"
bolt "go.etcd.io/bbolt"

"github.com/umputun/feed-master/app/youtube/mocks"
)
Expand All @@ -20,9 +22,9 @@ func TestService_Do(t *testing.T) {
chans := &mocks.ChannelServiceMock{
GetFunc: func(ctx context.Context, chanID string, feedType ytfeed.Type) ([]ytfeed.Entry, error) {
return []ytfeed.Entry{
{ChannelID: chanID, VideoID: "vid1", Title: "title1"},
{ChannelID: chanID, VideoID: "vid2", Title: "title2"},
{ChannelID: chanID, VideoID: "vid2", Title: "title2"}, // duplicate
{ChannelID: chanID, VideoID: "vid1", Title: "title1", Published: time.Now()},
{ChannelID: chanID, VideoID: "vid2", Title: "title2", Published: time.Now()},
{ChannelID: chanID, VideoID: "vid2", Title: "title2", Published: time.Now()}, // duplicate
}, nil
},
}
Expand All @@ -31,36 +33,21 @@ func TestService_Do(t *testing.T) {
return "/tmp/" + fname + ".mp3", nil
},
}
store := &mocks.StoreServiceMock{
ExistFunc: func(entry ytfeed.Entry) (bool, error) {
if entry.VideoID == "vid2" {
return true, nil
}
return false, nil
},
SaveFunc: func(entry ytfeed.Entry) (bool, error) {
return true, nil
},

RemoveOldFunc: func(channelID string, keep int) ([]string, error) {
return []string{"/tmp/blah.mp3"}, nil
},
LoadFunc: func(channelID string, max int) ([]ytfeed.Entry, error) {
return []ytfeed.Entry{
{ChannelID: channelID, VideoID: "vid1", Title: "title1"},
{ChannelID: channelID, VideoID: "vid2", Title: "title2"},
}, nil
},
}
tmpfile := filepath.Join(os.TempDir(), "test.db")
defer os.Remove(tmpfile)

db, err := bolt.Open(tmpfile, 0o600, &bolt.Options{Timeout: 1 * time.Second})
require.NoError(t, err)
boltStore := &store.BoltDB{DB: db}
svc := Service{
Feeds: []FeedInfo{
{ID: "channel1", Name: "name1", Type: ytfeed.FTChannel},
{ID: "channel2", Name: "name2", Type: ytfeed.FTPlaylist},
},
Downloader: downloader,
ChannelService: chans,
Store: store,
Store: boltStore,
CheckDuration: time.Millisecond * 500,
KeepPerChannel: 10,
RSSFileStore: RSSFileStore{Enabled: true, Location: "/tmp"},
Expand All @@ -69,7 +56,7 @@ func TestService_Do(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*900)
defer cancel()

err := svc.Do(ctx)
err = svc.Do(ctx)
assert.EqualError(t, err, "context deadline exceeded")

require.Equal(t, 4, len(chans.GetCalls()))
Expand All @@ -80,24 +67,22 @@ func TestService_Do(t *testing.T) {
assert.Equal(t, "channel1", chans.GetCalls()[2].ChanID)
assert.Equal(t, "channel2", chans.GetCalls()[3].ChanID)

require.Equal(t, 12, len(store.ExistCalls()))
require.Equal(t, "channel1", store.ExistCalls()[0].Entry.ChannelID)
require.Equal(t, "channel1", store.ExistCalls()[1].Entry.ChannelID)
require.Equal(t, "channel1", store.ExistCalls()[2].Entry.ChannelID)
require.Equal(t, "channel2", store.ExistCalls()[3].Entry.ChannelID)
require.Equal(t, "channel2", store.ExistCalls()[4].Entry.ChannelID)
res, err := boltStore.Load("channel1", 10)
require.NoError(t, err)
assert.Equal(t, 2, len(res), "two entries for channel1, skipped duplicate")
assert.Equal(t, "vid2", res[0].VideoID)
assert.Equal(t, "vid1", res[1].VideoID)

require.Equal(t, 2, len(downloader.GetCalls()))
res, err = boltStore.Load("channel2", 10)
require.NoError(t, err)
assert.Equal(t, 2, len(res), "two entries for channel1, skipped duplicate")
assert.Equal(t, "vid2", res[0].VideoID)
assert.Equal(t, "vid1", res[1].VideoID)

require.Equal(t, 4, len(downloader.GetCalls()))
require.Equal(t, "vid1", downloader.GetCalls()[0].ID)
require.True(t, downloader.GetCalls()[0].Fname != "")

require.Equal(t, 2, len(store.SaveCalls()))
require.Equal(t, "channel1", store.SaveCalls()[0].Entry.ChannelID)
require.Equal(t, "vid1", store.SaveCalls()[0].Entry.VideoID)
require.Equal(t, "name1: title1", store.SaveCalls()[0].Entry.Title)
require.True(t, strings.HasPrefix(store.SaveCalls()[0].Entry.File, "/tmp/"))
require.True(t, strings.HasSuffix(store.SaveCalls()[0].Entry.File, ".mp3"))

rssData, err := os.ReadFile("/tmp/channel1.xml")
require.NoError(t, err)
t.Logf("%s", string(rssData))
Expand All @@ -113,7 +98,7 @@ func TestService_Do(t *testing.T) {

// nolint:dupl // test if very similar to TestService_RSSFeed
func TestService_RSSFeed(t *testing.T) {
store := &mocks.StoreServiceMock{
storeSvc := &mocks.StoreServiceMock{
LoadFunc: func(channelID string, max int) ([]ytfeed.Entry, error) {
res := []ytfeed.Entry{
{ChannelID: "channel1", VideoID: "vid1", Title: "title1", File: "/tmp/file1.mp3"},
Expand All @@ -131,7 +116,7 @@ func TestService_RSSFeed(t *testing.T) {
{ID: "channel1", Name: "name1", Type: ytfeed.FTChannel},
{ID: "channel2", Name: "name2", Type: ytfeed.FTPlaylist},
},
Store: store,
Store: storeSvc,
RootURL: "http://localhost:8080/yt",
KeepPerChannel: 10,
}
Expand All @@ -151,7 +136,7 @@ func TestService_RSSFeed(t *testing.T) {

// nolint:dupl // test if very similar to TestService_RSSFeed
func TestService_RSSFeedPlayList(t *testing.T) {
store := &mocks.StoreServiceMock{
storeSvc := &mocks.StoreServiceMock{
LoadFunc: func(channelID string, max int) ([]ytfeed.Entry, error) {
res := []ytfeed.Entry{
{ChannelID: "channel1", VideoID: "vid1", Title: "title1", File: "/tmp/file1.mp3"},
Expand All @@ -169,7 +154,7 @@ func TestService_RSSFeedPlayList(t *testing.T) {
{ID: "channel1", Name: "name1", Type: ytfeed.FTPlaylist},
{ID: "channel2", Name: "name2", Type: ytfeed.FTPlaylist},
},
Store: store,
Store: storeSvc,
RootURL: "http://localhost:8080/yt",
KeepPerChannel: 10,
}
Expand Down
Loading

0 comments on commit 51c16f3

Please sign in to comment.