diff --git a/src/Ombi.Api.Plex/IPlexApi.cs b/src/Ombi.Api.Plex/IPlexApi.cs index 746c85b779..8bc548bbc3 100644 --- a/src/Ombi.Api.Plex/IPlexApi.cs +++ b/src/Ombi.Api.Plex/IPlexApi.cs @@ -24,6 +24,7 @@ public interface IPlexApi Task GetUsers(string authToken); Task GetAccount(string authToken); Task GetRecentlyAdded(string authToken, string uri, string sectionId); + Task GetPlayed(string authToken, string uri, string sectionId, int maxNumberOfItems = 0); Task GetPin(int pinId); Task GetOAuthUrl(string code, string applicationUrl); Task AddUser(string emailAddress, string serverId, string authToken, int[] libs); diff --git a/src/Ombi.Api.Plex/Models/PlexMediaFilterType.cs b/src/Ombi.Api.Plex/Models/PlexMediaFilterType.cs new file mode 100644 index 0000000000..6d0bb98eb1 --- /dev/null +++ b/src/Ombi.Api.Plex/Models/PlexMediaFilterType.cs @@ -0,0 +1,9 @@ +namespace Ombi.Api.Plex.Models +{ + public enum PlexMediaFilterType + { + Movie = 1, + Show = 2, + Episode = 4, + } +} \ No newline at end of file diff --git a/src/Ombi.Api.Plex/PlexApi.cs b/src/Ombi.Api.Plex/PlexApi.cs index fae4a78b9b..c08226b18c 100644 --- a/src/Ombi.Api.Plex/PlexApi.cs +++ b/src/Ombi.Api.Plex/PlexApi.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Net; using System.Net.Http; using System.Threading; @@ -212,6 +213,29 @@ public async Task GetRecentlyAdded(string authToken, string uri, s return await Api.Request(request); } + + public async Task GetPlayed(string authToken, string uri, string sectionId, int maxNumberOfItems) + { + var request = new Request($"library/sections/{sectionId}/all", uri, HttpMethod.Get); + await AddHeaders(request, authToken); + request.AddQueryString("unwatched", "0"); + request.AddQueryString("sort", "lastViewedAt:desc"); + + // for some reason, we need to explicitely include episodes for them to be returned by the API (movies are fine) + // also the order seems of importance: "4,1" doesn't work but "1,4" does work + var types = new List { (int) PlexMediaFilterType.Movie, (int) PlexMediaFilterType.Episode }; + var typeFilter = string.Join(",", types); + request.AddQueryString("type", typeFilter); + + if (maxNumberOfItems != 0) + { + AddLimitHeaders(request, 0, maxNumberOfItems); + } + + + return await Api.Request(request); + } + public async Task GetPin(int pinId) { var request = new Request($"api/v2/pins/{pinId}", "https://plex.tv/", HttpMethod.Get); diff --git a/src/Ombi.Core.Tests/Authentication/OmbiUserManagerTests.cs b/src/Ombi.Core.Tests/Authentication/OmbiUserManagerTests.cs index f4e6f59a34..a603d3e72b 100644 --- a/src/Ombi.Core.Tests/Authentication/OmbiUserManagerTests.cs +++ b/src/Ombi.Core.Tests/Authentication/OmbiUserManagerTests.cs @@ -30,7 +30,7 @@ public void Setup() AuthenticationSettings.Setup(x => x.GetSettingsAsync()) .ReturnsAsync(new AuthenticationSettings()); _um = new OmbiUserManager(UserStore.Object, null, null, null, null, null, null, null, null, - PlexApi.Object, null, null, null, null, AuthenticationSettings.Object); + PlexApi.Object, null, null, null, null, AuthenticationSettings.Object, null); } public OmbiUserManager _um { get; set; } diff --git a/src/Ombi.Core/Authentication/OmbiUserManager.cs b/src/Ombi.Core/Authentication/OmbiUserManager.cs index e80469a00d..ecdc2c785d 100644 --- a/src/Ombi.Core/Authentication/OmbiUserManager.cs +++ b/src/Ombi.Core/Authentication/OmbiUserManager.cs @@ -27,6 +27,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Identity; using Microsoft.EntityFrameworkCore; @@ -41,6 +42,7 @@ using Ombi.Helpers; using Ombi.Settings.Settings.Models; using Ombi.Store.Entities; +using Ombi.Store.Repository; namespace Ombi.Core.Authentication { @@ -52,7 +54,7 @@ public OmbiUserManager(IUserStore store, IOptions opt IdentityErrorDescriber errors, IServiceProvider services, ILogger> logger, IPlexApi plexApi, IEmbyApiFactory embyApi, ISettingsService embySettings, IJellyfinApiFactory jellyfinApi, ISettingsService jellyfinSettings, - ISettingsService auth) + ISettingsService auth, IRepository userError) : base(store, optionsAccessor, passwordHasher, userValidators, passwordValidators, keyNormalizer, errors, services, logger) { _plexApi = plexApi; @@ -61,6 +63,8 @@ public OmbiUserManager(IUserStore store, IOptions opt _embySettings = embySettings; _jellyfinSettings = jellyfinSettings; _authSettings = auth; + _userError = userError; + _logger = logger; } private readonly IPlexApi _plexApi; @@ -69,6 +73,8 @@ public OmbiUserManager(IUserStore store, IOptions opt private readonly ISettingsService _embySettings; private readonly ISettingsService _jellyfinSettings; private readonly ISettingsService _authSettings; + private readonly IRepository _userError; + private readonly Microsoft.Extensions.Logging.ILogger _logger; private string _clientIpAddress; public string ClientIpAddress { get => _clientIpAddress; set => _clientIpAddress = value; } @@ -139,6 +145,38 @@ public async Task GetOmbiUserFromPlexToken(string plexToken) } + public async Task> GetPlexUsersWithValidTokens() + { + var plexUsersWithTokens = Users.Where(x => x.UserType == UserType.PlexUser && x.MediaServerToken != null).ToList(); + _logger.LogDebug($"Found {plexUsersWithTokens.Count} users with tokens"); + var result = new List(); + + foreach (var user in plexUsersWithTokens) + { + // Check if the user has errors and the token is the same (not refreshed) + var failedUser = await _userError.GetAll().Where(x => x.UserId == user.Id).FirstOrDefaultAsync(); + if (failedUser != null) + { + if (failedUser.MediaServerToken.Equals(user.MediaServerToken)) + { + _logger.LogWarning($"Skipping user '{user.UserName}' as they failed previously and the token has not yet been refreshed. They need to re-authenticate with Ombi"); + continue; + } + else + { + // remove that guy + await _userError.Delete(failedUser); + failedUser = null; + } + } + if (failedUser == null) + { + result.Add(user); + } + } + return result; + + } /// /// Sign the user into plex and make sure we can get the authentication token. diff --git a/src/Ombi.DependencyInjection/IocExtensions.cs b/src/Ombi.DependencyInjection/IocExtensions.cs index 8a55099633..9298f27676 100644 --- a/src/Ombi.DependencyInjection/IocExtensions.cs +++ b/src/Ombi.DependencyInjection/IocExtensions.cs @@ -244,6 +244,7 @@ public static void RegisterJobs(this IServiceCollection services) services.AddSingleton(); services.AddTransient(); + services.AddTransient(); services.AddTransient(); services.AddTransient(); services.AddTransient(); diff --git a/src/Ombi.Schedule/Jobs/Plex/Interfaces/IPlexPlayedSync.cs b/src/Ombi.Schedule/Jobs/Plex/Interfaces/IPlexPlayedSync.cs new file mode 100644 index 0000000000..a0a663c67c --- /dev/null +++ b/src/Ombi.Schedule/Jobs/Plex/Interfaces/IPlexPlayedSync.cs @@ -0,0 +1,8 @@ +using Quartz; + +namespace Ombi.Schedule.Jobs +{ + public interface IPlexPlayedSync : IJob + { + } +} \ No newline at end of file diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs b/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs index eb31a5c8f0..6ce64a8fb3 100644 --- a/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs +++ b/src/Ombi.Schedule/Jobs/Plex/PlexContentSync.cs @@ -35,82 +35,56 @@ using Ombi.Api.Plex.Models; using Ombi.Api.TheMovieDb; using Ombi.Api.TheMovieDb.Models; +using Ombi.Core.Services; using Ombi.Core.Settings; using Ombi.Core.Settings.Models.External; using Ombi.Helpers; using Ombi.Hubs; using Ombi.Schedule.Jobs.Plex.Interfaces; using Ombi.Schedule.Jobs.Plex.Models; +using Ombi.Settings.Settings.Models; using Ombi.Store.Entities; using Ombi.Store.Repository; using Quartz; namespace Ombi.Schedule.Jobs.Plex { - public class PlexContentSync : IPlexContentSync + public class PlexContentSync : PlexLibrarySync, IPlexContentSync { private readonly IMovieDbApi _movieApi; private readonly IMediaCacheService _mediaCacheService; - - public PlexContentSync(ISettingsService plex, IPlexApi plexApi, ILogger logger, IPlexContentRepository repo, - IPlexEpisodeSync epsiodeSync, INotificationHubService notificationHubService, IMovieDbApi movieDbApi, IMediaCacheService mediaCacheService) + private readonly IFeatureService _feature; + private ProcessedContent _processedContent; + + + public PlexContentSync( + ISettingsService plex, + IPlexApi plexApi, ILogger logger, + IPlexContentRepository repo, + IPlexEpisodeSync epsiodeSync, + INotificationHubService notificationHubService, + IMovieDbApi movieDbApi, + IMediaCacheService mediaCacheService, + IFeatureService feature): + base(plex, plexApi, logger, notificationHubService) { - Plex = plex; - PlexApi = plexApi; - Logger = logger; Repo = repo; EpisodeSync = epsiodeSync; - Notification = notificationHubService; _movieApi = movieDbApi; _mediaCacheService = mediaCacheService; + _feature = feature; Plex.ClearCache(); } - private ISettingsService Plex { get; } - private IPlexApi PlexApi { get; } - private ILogger Logger { get; } private IPlexContentRepository Repo { get; } private IPlexEpisodeSync EpisodeSync { get; } - private INotificationHubService Notification { get; set; } - - public async Task Execute(IJobExecutionContext context) + public async override Task Execute(IJobExecutionContext context) { - JobDataMap dataMap = context.JobDetail.JobDataMap; - var recentlyAddedSearch = dataMap.GetBooleanValueFromString(JobDataKeys.RecentlyAddedSearch); - - var plexSettings = await Plex.GetSettingsAsync(); - if (!plexSettings.Enable) - { - return; - } - await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Started" : "Plex Content Sync Started"); - if (!ValidateSettings(plexSettings)) - { - Logger.LogError("Plex Settings are not valid"); - await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync, Settings Not Valid" : "Plex Content, Settings Not Valid"); - return; - } - var processedContent = new ProcessedContent(); - Logger.LogInformation(recentlyAddedSearch - ? "Starting Plex Content Cacher Recently Added Scan" - : "Starting Plex Content Cacher"); - try - { - if (recentlyAddedSearch) - { - processedContent = await StartTheCache(plexSettings, true); - } - else - { - await StartTheCache(plexSettings, false); - } - } - catch (Exception e) - { - await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Errored" : "Plex Content Sync Errored"); - Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content"); - } + + _processedContent = new ProcessedContent(); + await base.Execute(context); + if (!recentlyAddedSearch) { await NotifyClient("Plex Sync - Starting Episode Sync"); @@ -118,53 +92,32 @@ public async Task Execute(IJobExecutionContext context) await OmbiQuartz.TriggerJob(nameof(IPlexEpisodeSync), "Plex"); } - if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch) + if ((_processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch) { await NotifyClient("Plex Sync - Checking if any requests are now available"); Logger.LogInformation("Kicking off Plex Availability Checker"); await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex"); } - var processedCont = processedContent?.Content?.Count() ?? 0; - var processedEp = processedContent?.Episodes?.Count() ?? 0; + var processedCont = _processedContent?.Content?.Count() ?? 0; + var processedEp = _processedContent?.Episodes?.Count() ?? 0; Logger.LogInformation("Finished Plex Content Cacher, with processed content: {0}, episodes: {1}. Recently Added Scan: {2}", processedCont, processedEp, recentlyAddedSearch); await NotifyClient(recentlyAddedSearch ? $"Plex Recently Added Sync Finished, We processed {processedCont}, and {processedEp} Episodes" : "Plex Content Sync Finished"); - await _mediaCacheService.Purge(); - } - - private async Task StartTheCache(PlexSettings plexSettings, bool recentlyAddedSearch) - { - var processedContent = new ProcessedContent(); - foreach (var servers in plexSettings.Servers ?? new List()) + // Played state + var isPlayedSyncEnabled = await _feature.FeatureEnabled(FeatureNames.PlayedSync); + if(isPlayedSyncEnabled) { - try - { - Logger.LogInformation("Starting to cache the content on server {0}", servers.Name); - - if (recentlyAddedSearch) - { - // If it's recently added search then we want the results to pass to the metadata job - // This way the metadata job is smaller in size to process, it only need to look at newly added shit - return await ProcessServer(servers, true); - } - else - { - await ProcessServer(servers, false); - } - } - catch (Exception e) - { - Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name); - } + await OmbiQuartz.Scheduler.TriggerJob(new JobKey(nameof(IPlexPlayedSync), "Plex"), new JobDataMap(new Dictionary { { JobDataKeys.RecentlyAddedSearch, recentlyAddedSearch.ToString() } })); } - return processedContent; + await _mediaCacheService.Purge(); + } - private async Task ProcessServer(PlexServers servers, bool recentlyAddedSearch) + + protected override async Task ProcessServer(PlexServers servers) { - var retVal = new ProcessedContent(); var contentProcessed = new Dictionary(); var episodesProcessed = new List(); Logger.LogDebug("Getting all content from server {0}", servers.Name); @@ -282,9 +235,8 @@ private async Task ProcessServer(PlexServers servers, bool rec } } - retVal.Content = contentProcessed.Values; - retVal.Episodes = episodesProcessed; - return retVal; + _processedContent.Content = contentProcessed.Values; + _processedContent.Episodes = episodesProcessed; } public async Task MovieLoop(PlexServers servers, Mediacontainer content, HashSet contentToAdd, @@ -693,45 +645,27 @@ private async Task GetProviderIds(PlexMetadata showMetadata, PlexServerContent e /// private async Task> GetAllContent(PlexServers plexSettings, bool recentlyAddedSearch) { - var sections = await PlexApi.GetLibrarySections(plexSettings.PlexAuthToken, plexSettings.FullUri); - var libs = new List(); - if (sections != null) + + var directories = await GetEnabledLibraries(plexSettings); + + foreach (var directory in directories) { - foreach (var dir in sections.MediaContainer.Directory ?? new List()) + if (recentlyAddedSearch) { - if (plexSettings.PlexSelectedLibraries.Any()) + var container = await PlexApi.GetRecentlyAdded(plexSettings.PlexAuthToken, plexSettings.FullUri, + directory.key); + if (container != null) { - if (plexSettings.PlexSelectedLibraries.Any(x => x.Enabled)) - { - // Only get the enabled libs - var keys = plexSettings.PlexSelectedLibraries.Where(x => x.Enabled) - .Select(x => x.Key.ToString()).ToList(); - if (!keys.Contains(dir.key)) - { - Logger.LogDebug("Lib {0} is not monitored, so skipping", dir.key); - // We are not monitoring this lib - continue; - } - } - } - - if (recentlyAddedSearch) - { - var container = await PlexApi.GetRecentlyAdded(plexSettings.PlexAuthToken, plexSettings.FullUri, - dir.key); - if (container != null) - { - libs.Add(container.MediaContainer); - } + libs.Add(container.MediaContainer); } - else + } + else + { + var lib = await PlexApi.GetLibrary(plexSettings.PlexAuthToken, plexSettings.FullUri, directory.key); + if (lib != null) { - var lib = await PlexApi.GetLibrary(plexSettings.PlexAuthToken, plexSettings.FullUri, dir.key); - if (lib != null) - { - libs.Add(lib.MediaContainer); - } + libs.Add(lib.MediaContainer); } } } @@ -739,25 +673,6 @@ private async Task> GetAllContent(PlexServers plexSettings, return libs; } - private async Task NotifyClient(string message) - { - await Notification.SendNotificationToAdmins($"Plex Sync - {message}"); - } - - private static bool ValidateSettings(PlexSettings plex) - { - if (plex.Enable) - { - foreach (var server in plex.Servers ?? new List()) - { - if (string.IsNullOrEmpty(server?.Ip) || string.IsNullOrEmpty(server?.PlexAuthToken)) - { - return false; - } - } - } - return plex.Enable; - } private bool _disposed; diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexLibrarySync.cs b/src/Ombi.Schedule/Jobs/Plex/PlexLibrarySync.cs new file mode 100644 index 0000000000..5df3b3b641 --- /dev/null +++ b/src/Ombi.Schedule/Jobs/Plex/PlexLibrarySync.cs @@ -0,0 +1,163 @@ +#region Copyright +// /************************************************************************ +// Copyright (c) 2017 Jamie Rees +// File: PlexServerContentCacher.cs +// Created By: Jamie Rees +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// ************************************************************************/ +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Ombi.Api.Plex; +using Ombi.Api.Plex.Models; +using Ombi.Core.Settings; +using Ombi.Core.Settings.Models.External; +using Ombi.Helpers; +using Ombi.Hubs; +using Quartz; + +namespace Ombi.Schedule.Jobs.Plex +{ + public abstract class PlexLibrarySync + { + + public PlexLibrarySync( + ISettingsService plex, + IPlexApi plexApi, + ILogger logger, + INotificationHubService notificationHubService) + { + PlexApi = plexApi; + Plex = plex; + Logger = logger; + Notification = notificationHubService; + } + protected ILogger Logger { get; } + protected IPlexApi PlexApi { get; } + protected ISettingsService Plex { get; } + private INotificationHubService Notification { get; set; } + protected bool recentlyAddedSearch; + public virtual async Task Execute(IJobExecutionContext context) + { + JobDataMap dataMap = context.MergedJobDataMap; + recentlyAddedSearch = dataMap.GetBooleanValueFromString(JobDataKeys.RecentlyAddedSearch); + + var plexSettings = await Plex.GetSettingsAsync(); + if (!plexSettings.Enable) + { + return; + } + await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Started" : "Plex Content Sync Started"); + if (!ValidateSettings(plexSettings)) + { + Logger.LogError("Plex Settings are not valid"); + await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync, Settings Not Valid" : "Plex Content, Settings Not Valid"); + return; + } + Logger.LogInformation(recentlyAddedSearch + ? "Starting Plex Content Cacher Recently Added Scan" + : "Starting Plex Content Cacher"); + try + { + await StartTheCache(plexSettings); + } + catch (Exception e) + { + await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Errored" : "Plex Content Sync Errored"); + Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content"); + } + } + + private async Task StartTheCache(PlexSettings plexSettings) + { + foreach (var servers in plexSettings.Servers ?? new List()) + { + try + { + Logger.LogInformation("Starting to cache the content on server {0}", servers.Name); + await ProcessServer(servers); + } + catch (Exception e) + { + Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name); + } + } + } + + protected async Task> GetEnabledLibraries(PlexServers plexSettings) + { + var result = new List(); + var sections = await PlexApi.GetLibrarySections(plexSettings.PlexAuthToken, plexSettings.FullUri); + + if (sections != null) + { + foreach (var dir in sections.MediaContainer.Directory ?? new List()) + { + if (plexSettings.PlexSelectedLibraries.Any()) + { + if (plexSettings.PlexSelectedLibraries.Any(x => x.Enabled)) + { + // Only get the enabled libs + var keys = plexSettings.PlexSelectedLibraries.Where(x => x.Enabled) + .Select(x => x.Key.ToString()).ToList(); + if (!keys.Contains(dir.key)) + { + Logger.LogDebug("Lib {0} is not monitored, so skipping", dir.key); + // We are not monitoring this lib + continue; + } + } + } + result.Add(dir); + + } + } + + return result; + } + + protected abstract Task ProcessServer(PlexServers servers); + + protected async Task NotifyClient(string message) + { + await Notification.SendNotificationToAdmins($"Plex Sync - {message}"); + } + private static bool ValidateSettings(PlexSettings plex) + { + if (plex.Enable) + { + foreach (var server in plex.Servers ?? new List()) + { + if (string.IsNullOrEmpty(server?.Ip) || string.IsNullOrEmpty(server?.PlexAuthToken)) + { + return false; + } + } + } + return plex.Enable; + } + + } +} \ No newline at end of file diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexPlayedSync.cs b/src/Ombi.Schedule/Jobs/Plex/PlexPlayedSync.cs new file mode 100644 index 0000000000..709af5cc16 --- /dev/null +++ b/src/Ombi.Schedule/Jobs/Plex/PlexPlayedSync.cs @@ -0,0 +1,199 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Ombi.Api.Plex; +using Ombi.Api.Plex.Models; +using Ombi.Core.Authentication; +using Ombi.Core.Settings; +using Ombi.Core.Settings.Models.External; +using Ombi.Helpers; +using Ombi.Hubs; +using Ombi.Store.Entities; +using Ombi.Store.Repository; + +namespace Ombi.Schedule.Jobs.Plex +{ + public class PlexPlayedSync : PlexLibrarySync, IPlexPlayedSync + { + public PlexPlayedSync( + ISettingsService plex, + IPlexApi plexApi, + ILogger logger, + IPlexContentRepository contentRepo, + INotificationHubService notificationHubService, + OmbiUserManager user, + IUserPlayedMovieRepository movieRepo, + IUserPlayedEpisodeRepository episodeRepo): + base(plex, plexApi, logger, notificationHubService) + { + _contentRepo = contentRepo; + _userManager = user; + _movieRepo = movieRepo; + _episodeRepo = episodeRepo; + Plex.ClearCache(); + } + + private IPlexContentRepository _contentRepo { get; } + private OmbiUserManager _userManager { get; } + private readonly IUserPlayedMovieRepository _movieRepo; + private readonly IUserPlayedEpisodeRepository _episodeRepo; + + private const int recentlyAddedAmountToTake = 5; + + protected override async Task ProcessServer(PlexServers servers) + { + var allUsers = await _userManager.GetPlexUsersWithValidTokens(); + foreach (var user in allUsers) + { + await ProcessUser(servers, recentlyAddedSearch, user); + } + } + + private async Task ProcessUser(PlexServers servers, bool recentlyAddedSearch, OmbiUser user) + { + + var contentProcessed = new Dictionary(); + var episodesProcessed = new List(); + Logger.LogDebug($"Getting all played content from server {servers.Name} for user {user.Alias}"); + var allContent = await GetAllContent(servers, recentlyAddedSearch, user); + Logger.LogDebug("We found {0} items", allContent.Count); + + + // Let's now process this. + var episodesToAdd = new HashSet(); + var moviesToAdd = new HashSet(); + + + foreach (var content in allContent.OrderByDescending(x => x.viewGroup)) + { + Logger.LogDebug($"Got type '{content.viewGroup}' to process"); + if (content.viewGroup.Equals(PlexMediaType.Show.ToString(), StringComparison.InvariantCultureIgnoreCase)) + { + foreach (var epInfo in content.Metadata ?? new Metadata[] { }) + { + await ProcessEpisode(epInfo, user, episodesToAdd); + } + + } + if (content.viewGroup.Equals(PlexMediaType.Movie.ToString(), StringComparison.InvariantCultureIgnoreCase)) + { + Logger.LogDebug("Processing Movies"); + foreach (var movie in content?.Metadata ?? Array.Empty()) + { + await ProcessMovie(movie, user, moviesToAdd); + } + } + } + + await _movieRepo.AddRange(moviesToAdd); + await _episodeRepo.AddRange(episodesToAdd); + + } + + private async Task ProcessEpisode(Metadata epInfo, OmbiUser user, ICollection content) + { + var episode = await _contentRepo.GetEpisodeByKey(epInfo.ratingKey); + if (episode == null || episode.Series == null) + { + Logger.LogInformation($"The episode {epInfo.title} does not relate to a series, so we cannot save this"); + return; + } + if (episode.Series.TheMovieDbId.IsNullOrEmpty()) + { + Logger.LogWarning($"Episode {epInfo.title} is not linked to a TMDB series. Skipping."); + return; + } + + await AddToContent(content, new UserPlayedEpisode() + { + TheMovieDbId = int.Parse(episode.Series.TheMovieDbId), + SeasonNumber = episode.SeasonNumber, + EpisodeNumber = episode.EpisodeNumber, + UserId = user.Id + }); + + } + + private async Task AddToContent(ICollection content, UserPlayedEpisode episode) + { + + // Check if it exists + var existingEpisode = await _episodeRepo.Get(episode.TheMovieDbId, episode.SeasonNumber, episode.EpisodeNumber, episode.UserId); + var alreadyGoingToAdd = content.Any(x => + x.TheMovieDbId == episode.TheMovieDbId + && x.SeasonNumber == episode.SeasonNumber + && x.EpisodeNumber == episode.EpisodeNumber + && x.UserId == episode.UserId); + if (existingEpisode == null && !alreadyGoingToAdd) + { + content.Add(episode); + } + } + + public async Task ProcessMovie(Metadata movie, OmbiUser user, ICollection content) + { + var cachedMovie = await _contentRepo.GetByKey(movie.ratingKey); + if (cachedMovie == null || cachedMovie.TheMovieDbId.IsNullOrEmpty() ) + { + Logger.LogWarning($"Movie {movie.title} has no relevant metadata. Skipping."); + return; + } + var userPlayedMovie = new UserPlayedMovie() + { + TheMovieDbId = int.Parse(cachedMovie.TheMovieDbId), + UserId = user.Id + }; + // Check if it exists + var existingMovie = await _movieRepo.Get(userPlayedMovie.TheMovieDbId, userPlayedMovie.UserId); + var alreadyGoingToAdd = content.Any(x => x.TheMovieDbId == userPlayedMovie.TheMovieDbId && x.UserId == userPlayedMovie.UserId); + if (existingMovie == null && !alreadyGoingToAdd) + { + content.Add(userPlayedMovie); + } + } + + + private async Task> GetAllContent(PlexServers plexSettings, bool recentlyAddedSearch, OmbiUser user) + { + var libs = new List(); + + var directories = await GetEnabledLibraries(plexSettings); + + foreach (var directory in directories) + { + var maxNumberOfItems = 0; + if (recentlyAddedSearch) + { + maxNumberOfItems = recentlyAddedAmountToTake; + } + var container = await PlexApi.GetPlayed(user.MediaServerToken, plexSettings.FullUri, + directory.key, maxNumberOfItems); + if (container != null) + { + libs.Add(container.MediaContainer); + } + } + + return libs; + } + + + private bool _disposed; + + protected virtual void Dispose(bool disposing) + { + if (_disposed) + return; + + _disposed = true; + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + } +} \ No newline at end of file diff --git a/src/Ombi.Schedule/Jobs/Plex/PlexWatchlistImport.cs b/src/Ombi.Schedule/Jobs/Plex/PlexWatchlistImport.cs index 85a926376a..03f594f0fc 100644 --- a/src/Ombi.Schedule/Jobs/Plex/PlexWatchlistImport.cs +++ b/src/Ombi.Schedule/Jobs/Plex/PlexWatchlistImport.cs @@ -64,30 +64,12 @@ public async Task Execute(IJobExecutionContext context) return; } - var plexUsersWithTokens = _ombiUserManager.Users.Where(x => x.UserType == UserType.PlexUser && x.MediaServerToken != null).ToList(); - _logger.LogInformation($"Found {plexUsersWithTokens.Count} users with tokens"); await NotifyClient("Starting Watchlist Import"); - - foreach (var user in plexUsersWithTokens) + var plexUsers = await _ombiUserManager.GetPlexUsersWithValidTokens(); + foreach (var user in plexUsers) { try { - // Check if the user has errors and the token is the same (not refreshed) - var failedUser = await _userError.GetAll().Where(x => x.UserId == user.Id).FirstOrDefaultAsync(); - if (failedUser != null) - { - if (failedUser.MediaServerToken.Equals(user.MediaServerToken)) - { - _logger.LogInformation($"Skipping Plex Watchlist Import for user '{user.UserName}' as they failed previously and the token has not yet been refreshed"); - continue; - } - else - { - // remove that guy - await _userError.Delete(failedUser); - failedUser = null; - } - } _logger.LogDebug($"Starting Watchlist Import for {user.UserName} with token {user.MediaServerToken}"); var watchlist = await _plexApi.GetWatchlist(user.MediaServerToken, context?.CancellationToken ?? CancellationToken.None); diff --git a/src/Ombi.Schedule/OmbiScheduler.cs b/src/Ombi.Schedule/OmbiScheduler.cs index 3b4f6316f6..10d984930b 100644 --- a/src/Ombi.Schedule/OmbiScheduler.cs +++ b/src/Ombi.Schedule/OmbiScheduler.cs @@ -90,6 +90,7 @@ private static async Task AddPlex(JobSettings s) await OmbiQuartz.Instance.AddJob(nameof(IPlexContentSync) + "RecentlyAdded", "Plex", JobSettingsHelper.PlexRecentlyAdded(s), new Dictionary { { JobDataKeys.RecentlyAddedSearch, "true" } }); await OmbiQuartz.Instance.AddJob(nameof(IPlexUserImporter), "Plex", JobSettingsHelper.UserImporter(s)); await OmbiQuartz.Instance.AddJob(nameof(IPlexEpisodeSync), "Plex", null); + await OmbiQuartz.Instance.AddJob(nameof(IPlexPlayedSync), "Plex", null); await OmbiQuartz.Instance.AddJob(nameof(IPlexAvailabilityChecker), "Plex", null); await OmbiQuartz.Instance.AddJob(nameof(IPlexWatchlistImport), "Plex", JobSettingsHelper.PlexWatchlistImport(s)); } diff --git a/src/Ombi.Store/Repository/PlexContentRepository.cs b/src/Ombi.Store/Repository/PlexContentRepository.cs index 9f34af6a1c..4381773acc 100644 --- a/src/Ombi.Store/Repository/PlexContentRepository.cs +++ b/src/Ombi.Store/Repository/PlexContentRepository.cs @@ -156,7 +156,7 @@ public async Task DeleteEpisode(PlexEpisode content) public async Task GetEpisodeByKey(string key) { - return await Db.PlexEpisode.FirstOrDefaultAsync(x => x.Key == key); + return await Db.PlexEpisode.Include(x => x.Series).FirstOrDefaultAsync(x => x.Key == key); } public override async Task AddRange(IEnumerable content) {