Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make yt processed status persistent #62

Merged
merged 1 commit into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions app/youtube/mocks/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 13 additions & 7 deletions app/youtube/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type Service struct {
RSSFileStore RSSFileStore
KeepPerChannel int
RootURL string
processed map[string]bool
}

// FeedInfo contains channel or feed ID, readable name and other per-feed info
Expand Down Expand Up @@ -61,6 +60,8 @@ type StoreService interface {
Load(channelID string, max int) ([]ytfeed.Entry, error)
Exist(entry ytfeed.Entry) (bool, error)
RemoveOld(channelID string, keep int) ([]string, error)
SetProcessed(entry ytfeed.Entry) error
CheckProcessed(entry ytfeed.Entry) (found bool, ts time.Time, err error)
}

// Do is a blocking function that downloads audio from youtube channels and updates metadata
Expand All @@ -71,7 +72,6 @@ func (s *Service) Do(ctx context.Context) error {
log.Printf("[INFO] youtube feed %+v", f)
}

s.processed = make(map[string]bool)
tick := time.NewTicker(s.CheckDuration)
defer tick.Stop()

Expand Down Expand Up @@ -179,9 +179,14 @@ func (s *Service) procChannels(ctx context.Context) error {

// check if we already processed this entry.
// this is needed to avoid infinite get/remove loop when the original feed is updated in place
if _, ok := s.processed[entry.UID()]; ok {
found, procTS, procErr := s.Store.CheckProcessed(entry)
if procErr != nil {
log.Printf("[WARN] can't get processed status for %s, %+v", entry.VideoID, feedInfo)
}
if procErr == nil && found {
processed++
log.Printf("[INFO] skipping already processed entry %s, %+v", entry.VideoID, feedInfo)
log.Printf("[INFO] skipping already processed entry %s at %s, %+v",
entry.VideoID, procTS.Format(time.RFC3339), feedInfo)
continue
}

Expand All @@ -205,9 +210,10 @@ func (s *Service) procChannels(ctx context.Context) error {
log.Printf("[WARN] attempt to save dup entry %+v", entry)
}
changed = true
s.processed[entry.UID()] = true // track processed entries
log.Printf("[INFO] saved %s (%s) to %s, channel: %+v, total processed: %d",
entry.VideoID, entry.Title, file, feedInfo, len(s.processed))
if procErr = s.Store.SetProcessed(entry); procErr != nil {
log.Printf("[WARN] failed to set processed status for %s: %v", entry.VideoID, procErr)
}
log.Printf("[INFO] saved %s (%s) to %s, channel: %+v", entry.VideoID, entry.Title, file, feedInfo)
}

if changed { // save rss feed to fs if there are new entries
Expand Down
73 changes: 29 additions & 44 deletions app/youtube/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package youtube
import (
"context"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ytfeed "github.com/umputun/feed-master/app/youtube/feed"
"github.com/umputun/feed-master/app/youtube/store"
bolt "go.etcd.io/bbolt"

"github.com/umputun/feed-master/app/youtube/mocks"
)
Expand All @@ -20,9 +22,9 @@ func TestService_Do(t *testing.T) {
chans := &mocks.ChannelServiceMock{
GetFunc: func(ctx context.Context, chanID string, feedType ytfeed.Type) ([]ytfeed.Entry, error) {
return []ytfeed.Entry{
{ChannelID: chanID, VideoID: "vid1", Title: "title1"},
{ChannelID: chanID, VideoID: "vid2", Title: "title2"},
{ChannelID: chanID, VideoID: "vid2", Title: "title2"}, // duplicate
{ChannelID: chanID, VideoID: "vid1", Title: "title1", Published: time.Now()},
{ChannelID: chanID, VideoID: "vid2", Title: "title2", Published: time.Now()},
{ChannelID: chanID, VideoID: "vid2", Title: "title2", Published: time.Now()}, // duplicate
}, nil
},
}
Expand All @@ -31,36 +33,21 @@ func TestService_Do(t *testing.T) {
return "/tmp/" + fname + ".mp3", nil
},
}
store := &mocks.StoreServiceMock{
ExistFunc: func(entry ytfeed.Entry) (bool, error) {
if entry.VideoID == "vid2" {
return true, nil
}
return false, nil
},
SaveFunc: func(entry ytfeed.Entry) (bool, error) {
return true, nil
},

RemoveOldFunc: func(channelID string, keep int) ([]string, error) {
return []string{"/tmp/blah.mp3"}, nil
},
LoadFunc: func(channelID string, max int) ([]ytfeed.Entry, error) {
return []ytfeed.Entry{
{ChannelID: channelID, VideoID: "vid1", Title: "title1"},
{ChannelID: channelID, VideoID: "vid2", Title: "title2"},
}, nil
},
}
tmpfile := filepath.Join(os.TempDir(), "test.db")
defer os.Remove(tmpfile)

db, err := bolt.Open(tmpfile, 0o600, &bolt.Options{Timeout: 1 * time.Second})
require.NoError(t, err)
boltStore := &store.BoltDB{DB: db}
svc := Service{
Feeds: []FeedInfo{
{ID: "channel1", Name: "name1", Type: ytfeed.FTChannel},
{ID: "channel2", Name: "name2", Type: ytfeed.FTPlaylist},
},
Downloader: downloader,
ChannelService: chans,
Store: store,
Store: boltStore,
CheckDuration: time.Millisecond * 500,
KeepPerChannel: 10,
RSSFileStore: RSSFileStore{Enabled: true, Location: "/tmp"},
Expand All @@ -69,7 +56,7 @@ func TestService_Do(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*900)
defer cancel()

err := svc.Do(ctx)
err = svc.Do(ctx)
assert.EqualError(t, err, "context deadline exceeded")

require.Equal(t, 4, len(chans.GetCalls()))
Expand All @@ -80,24 +67,22 @@ func TestService_Do(t *testing.T) {
assert.Equal(t, "channel1", chans.GetCalls()[2].ChanID)
assert.Equal(t, "channel2", chans.GetCalls()[3].ChanID)

require.Equal(t, 12, len(store.ExistCalls()))
require.Equal(t, "channel1", store.ExistCalls()[0].Entry.ChannelID)
require.Equal(t, "channel1", store.ExistCalls()[1].Entry.ChannelID)
require.Equal(t, "channel1", store.ExistCalls()[2].Entry.ChannelID)
require.Equal(t, "channel2", store.ExistCalls()[3].Entry.ChannelID)
require.Equal(t, "channel2", store.ExistCalls()[4].Entry.ChannelID)
res, err := boltStore.Load("channel1", 10)
require.NoError(t, err)
assert.Equal(t, 2, len(res), "two entries for channel1, skipped duplicate")
assert.Equal(t, "vid2", res[0].VideoID)
assert.Equal(t, "vid1", res[1].VideoID)

require.Equal(t, 2, len(downloader.GetCalls()))
res, err = boltStore.Load("channel2", 10)
require.NoError(t, err)
assert.Equal(t, 2, len(res), "two entries for channel1, skipped duplicate")
assert.Equal(t, "vid2", res[0].VideoID)
assert.Equal(t, "vid1", res[1].VideoID)

require.Equal(t, 4, len(downloader.GetCalls()))
require.Equal(t, "vid1", downloader.GetCalls()[0].ID)
require.True(t, downloader.GetCalls()[0].Fname != "")

require.Equal(t, 2, len(store.SaveCalls()))
require.Equal(t, "channel1", store.SaveCalls()[0].Entry.ChannelID)
require.Equal(t, "vid1", store.SaveCalls()[0].Entry.VideoID)
require.Equal(t, "name1: title1", store.SaveCalls()[0].Entry.Title)
require.True(t, strings.HasPrefix(store.SaveCalls()[0].Entry.File, "/tmp/"))
require.True(t, strings.HasSuffix(store.SaveCalls()[0].Entry.File, ".mp3"))

rssData, err := os.ReadFile("/tmp/channel1.xml")
require.NoError(t, err)
t.Logf("%s", string(rssData))
Expand All @@ -113,7 +98,7 @@ func TestService_Do(t *testing.T) {

// nolint:dupl // test is very similar to TestService_RSSFeedPlayList
func TestService_RSSFeed(t *testing.T) {
store := &mocks.StoreServiceMock{
storeSvc := &mocks.StoreServiceMock{
LoadFunc: func(channelID string, max int) ([]ytfeed.Entry, error) {
res := []ytfeed.Entry{
{ChannelID: "channel1", VideoID: "vid1", Title: "title1", File: "/tmp/file1.mp3"},
Expand All @@ -131,7 +116,7 @@ func TestService_RSSFeed(t *testing.T) {
{ID: "channel1", Name: "name1", Type: ytfeed.FTChannel},
{ID: "channel2", Name: "name2", Type: ytfeed.FTPlaylist},
},
Store: store,
Store: storeSvc,
RootURL: "http://localhost:8080/yt",
KeepPerChannel: 10,
}
Expand All @@ -151,7 +136,7 @@ func TestService_RSSFeed(t *testing.T) {

// nolint:dupl // test is very similar to TestService_RSSFeed
func TestService_RSSFeedPlayList(t *testing.T) {
store := &mocks.StoreServiceMock{
storeSvc := &mocks.StoreServiceMock{
LoadFunc: func(channelID string, max int) ([]ytfeed.Entry, error) {
res := []ytfeed.Entry{
{ChannelID: "channel1", VideoID: "vid1", Title: "title1", File: "/tmp/file1.mp3"},
Expand All @@ -169,7 +154,7 @@ func TestService_RSSFeedPlayList(t *testing.T) {
{ID: "channel1", Name: "name1", Type: ytfeed.FTPlaylist},
{ID: "channel2", Name: "name2", Type: ytfeed.FTPlaylist},
},
Store: store,
Store: storeSvc,
RootURL: "http://localhost:8080/yt",
KeepPerChannel: 10,
}
Expand Down
Loading