From f4be470ac96281af19c18e9dc24016326a74a796 Mon Sep 17 00:00:00 2001
From: iPromKnight
Date: Tue, 2 Apr 2024 16:22:47 +0100
Subject: [PATCH 1/2] Process DMM hashlists entirely locally

A single call to GitHub now downloads the repo archive, removing the
need for a PAT. Update RTN to 0.2.13 and switch to RTN's batch_parse
for title parsing.
---
 deployment/docker/docker-compose.yaml         |  14 +-
 .../docker/src/components/knightcrawler.yaml  |  14 +-
 deployment/docker/stack.env                   |   3 -
 src/debrid-collector/requirements.txt         |   2 +-
 .../Crawlers/Dmm/DMMFileDownloader.cs         |  70 +++++
 .../Features/Crawlers/Dmm/DMMHttpClient.cs    |   6 +
 .../Crawlers/Dmm/DebridMediaManagerCrawler.cs | 242 +++++++++---------
 .../Crawlers/Dmm/GithubConfiguration.cs       |   9 -
 .../Crawlers/Dmm/IDMMFileDownloader.cs        |   6 +
 .../Dmm/ServiceCollectionExtensions.cs        |  16 ++
 .../JobSupport/ServiceCollectionExtensions.cs |  17 +-
 src/producer/src/GlobalUsings.cs              |   4 +-
 src/producer/src/Program.cs                   |   3 +-
 src/producer/src/requirements.txt             |   2 +-
 src/qbit-collector/requirements.txt           |   2 +-
 src/shared/Python/RTN/IRankTorrentName.cs     |   3 +-
 src/shared/Python/RTN/RankTorrentName.cs      | 103 ++++++--
 17 files changed, 331 insertions(+), 185 deletions(-)
 create mode 100644 src/producer/src/Features/Crawlers/Dmm/DMMFileDownloader.cs
 create mode 100644 src/producer/src/Features/Crawlers/Dmm/DMMHttpClient.cs
 delete mode 100644 src/producer/src/Features/Crawlers/Dmm/GithubConfiguration.cs
 create mode 100644 src/producer/src/Features/Crawlers/Dmm/IDMMFileDownloader.cs
 create mode 100644 src/producer/src/Features/Crawlers/Dmm/ServiceCollectionExtensions.cs

diff --git a/deployment/docker/docker-compose.yaml b/deployment/docker/docker-compose.yaml
index 4a724287..26f6942d 100644
--- a/deployment/docker/docker-compose.yaml
+++ b/deployment/docker/docker-compose.yaml
@@ -94,7 +94,7 @@ services:
         condition: service_healthy
     env_file: stack.env
     hostname: knightcrawler-addon
-    image: gabisonfire/knightcrawler-addon:2.0.19
+    image: gabisonfire/knightcrawler-addon:2.0.20
     labels:
       logging: promtail
     networks:
@@ -117,7 +117,7 @@ services:
       redis:
         condition: service_healthy
     env_file: stack.env
-    image: gabisonfire/knightcrawler-consumer:2.0.19
+    image: gabisonfire/knightcrawler-consumer:2.0.20
     labels:
       logging: promtail
     networks:
@@ -138,7 +138,7 @@ services:
      redis:
        condition: service_healthy
     env_file: stack.env
-    image: gabisonfire/knightcrawler-debrid-collector:2.0.19
+    image: gabisonfire/knightcrawler-debrid-collector:2.0.20
     labels:
       logging: promtail
     networks:
@@ -152,7 +152,7 @@ services:
       migrator:
         condition: service_completed_successfully
     env_file: stack.env
-    image: gabisonfire/knightcrawler-metadata:2.0.19
+    image: gabisonfire/knightcrawler-metadata:2.0.20
     networks:
       - knightcrawler-network
     restart: "no"
@@ -163,7 +163,7 @@ services:
       postgres:
         condition: service_healthy
     env_file: stack.env
-    image: gabisonfire/knightcrawler-migrator:2.0.19
+    image: gabisonfire/knightcrawler-migrator:2.0.20
     networks:
       - knightcrawler-network
     restart: "no"
@@ -182,7 +182,7 @@ services:
       redis:
         condition: service_healthy
     env_file: stack.env
-    image: gabisonfire/knightcrawler-producer:2.0.19
+    image: gabisonfire/knightcrawler-producer:2.0.20
     labels:
       logging: promtail
     networks:
@@ -207,7 +207,7 @@ services:
     deploy:
       replicas: ${QBIT_REPLICAS:-0}
     env_file: stack.env
-    image: gabisonfire/knightcrawler-qbit-collector:2.0.19
+    image: gabisonfire/knightcrawler-qbit-collector:2.0.20
     labels:
       logging: promtail
     networks:
diff --git a/deployment/docker/src/components/knightcrawler.yaml b/deployment/docker/src/components/knightcrawler.yaml
index 6e999f5c..e4164b10 100644 --- a/deployment/docker/src/components/knightcrawler.yaml +++ b/deployment/docker/src/components/knightcrawler.yaml @@ -20,7 +20,7 @@ x-depends: &knightcrawler-app-depends services: metadata: - image: gabisonfire/knightcrawler-metadata:2.0.18 + image: gabisonfire/knightcrawler-metadata:2.0.20 env_file: ../../.env networks: - knightcrawler-network @@ -30,7 +30,7 @@ services: condition: service_completed_successfully migrator: - image: gabisonfire/knightcrawler-migrator:2.0.18 + image: gabisonfire/knightcrawler-migrator:2.0.20 env_file: ../../.env networks: - knightcrawler-network @@ -40,7 +40,7 @@ services: condition: service_healthy addon: - image: gabisonfire/knightcrawler-addon:2.0.18 + image: gabisonfire/knightcrawler-addon:2.0.20 <<: [*knightcrawler-app, *knightcrawler-app-depends] restart: unless-stopped hostname: knightcrawler-addon @@ -48,22 +48,22 @@ services: - "7000:7000" consumer: - image: gabisonfire/knightcrawler-consumer:2.0.18 + image: gabisonfire/knightcrawler-consumer:2.0.20 <<: [*knightcrawler-app, *knightcrawler-app-depends] restart: unless-stopped debridcollector: - image: gabisonfire/knightcrawler-debrid-collector:2.0.18 + image: gabisonfire/knightcrawler-debrid-collector:2.0.20 <<: [*knightcrawler-app, *knightcrawler-app-depends] restart: unless-stopped producer: - image: gabisonfire/knightcrawler-producer:2.0.18 + image: gabisonfire/knightcrawler-producer:2.0.20 <<: [*knightcrawler-app, *knightcrawler-app-depends] restart: unless-stopped qbitcollector: - image: gabisonfire/knightcrawler-qbit-collector:2.0.18 + image: gabisonfire/knightcrawler-qbit-collector:2.0.20 <<: [*knightcrawler-app, *knightcrawler-app-depends] restart: unless-stopped depends_on: diff --git a/deployment/docker/stack.env b/deployment/docker/stack.env index a2b6bd0a..49018988 100644 --- a/deployment/docker/stack.env +++ b/deployment/docker/stack.env @@ -38,6 +38,3 @@ QBIT_REPLICAS=0 # Addon DEBUG_MODE=false - -# Producer -GITHUB_PAT= diff --git a/src/debrid-collector/requirements.txt b/src/debrid-collector/requirements.txt index 5a9e8ab2..b292cc44 100644 --- a/src/debrid-collector/requirements.txt +++ b/src/debrid-collector/requirements.txt @@ -1 +1 @@ -rank-torrent-name==0.2.11 \ No newline at end of file +rank-torrent-name==0.2.13 \ No newline at end of file diff --git a/src/producer/src/Features/Crawlers/Dmm/DMMFileDownloader.cs b/src/producer/src/Features/Crawlers/Dmm/DMMFileDownloader.cs new file mode 100644 index 00000000..5af28dd8 --- /dev/null +++ b/src/producer/src/Features/Crawlers/Dmm/DMMFileDownloader.cs @@ -0,0 +1,70 @@ +namespace Producer.Features.Crawlers.Dmm; + +public class DMMFileDownloader(HttpClient client, ILogger logger) : IDMMFileDownloader +{ + private const string Filename = "main.zip"; + private readonly IReadOnlyCollection _filesToIgnore = [ + "index.html", + "404.html", + "dedupe.sh", + "CNAME", + ]; + + public const string ClientName = "DmmFileDownloader"; + + public async Task DownloadFileToTempPath(CancellationToken cancellationToken) + { + logger.LogInformation("Downloading DMM Hashlists"); + + var response = await client.GetAsync(Filename, cancellationToken); + + var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists"); + + EnsureDirectoryIsClean(tempDirectory); + + response.EnsureSuccessStatusCode(); + + await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken); + using var archive = new ZipArchive(stream); + + logger.LogInformation("Extracting DMM Hashlists to {TempDirectory}", tempDirectory); 
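// GitHub zipballs wrap the repository in a single top-level
// "{owner}-{repo}-{sha}/" directory. Extracting with Path.GetFileName(entry.FullName)
// below flattens that wrapper away and discards all directory components in
// entry names, which also guards against zip-slip style path traversal.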
+ + foreach (var entry in archive.Entries) + { + var entryPath = Path.Combine(tempDirectory, Path.GetFileName(entry.FullName)); + if (!entry.FullName.EndsWith('/')) // It's a file + { + entry.ExtractToFile(entryPath, true); + } + } + + foreach (var file in _filesToIgnore) + { + CleanRepoExtras(tempDirectory, file); + } + + logger.LogInformation("Downloaded and extracted Repository to {TempDirectory}", tempDirectory); + + return tempDirectory; + } + + private static void CleanRepoExtras(string tempDirectory, string fileName) + { + var repoIndex = Path.Combine(tempDirectory, fileName); + + if (File.Exists(repoIndex)) + { + File.Delete(repoIndex); + } + } + + private static void EnsureDirectoryIsClean(string tempDirectory) + { + if (Directory.Exists(tempDirectory)) + { + Directory.Delete(tempDirectory, true); + } + + Directory.CreateDirectory(tempDirectory); + } +} \ No newline at end of file diff --git a/src/producer/src/Features/Crawlers/Dmm/DMMHttpClient.cs b/src/producer/src/Features/Crawlers/Dmm/DMMHttpClient.cs new file mode 100644 index 00000000..8ce040d2 --- /dev/null +++ b/src/producer/src/Features/Crawlers/Dmm/DMMHttpClient.cs @@ -0,0 +1,6 @@ +namespace Producer.Features.Crawlers.Dmm; + +public class DMMHttpClient +{ + +} \ No newline at end of file diff --git a/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs b/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs index 1a6dd70e..16639186 100644 --- a/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs +++ b/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs @@ -1,64 +1,87 @@ namespace Producer.Features.Crawlers.Dmm; public partial class DebridMediaManagerCrawler( - IHttpClientFactory httpClientFactory, + IDMMFileDownloader dmmFileDownloader, ILogger logger, IDataStorage storage, - GithubConfiguration githubConfiguration, IRankTorrentName rankTorrentName, IDistributedCache cache) : BaseCrawler(logger, storage) { [GeneratedRegex("""""")] private static partial Regex HashCollectionMatcher(); - - private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main"; + protected override string Url => ""; protected override IReadOnlyDictionary Mappings => new Dictionary(); - protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1"; protected override string Source => "DMM"; public override async Task Execute() { - var client = httpClientFactory.CreateClient("Scraper"); - client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT); - client.DefaultRequestHeaders.UserAgent.ParseAdd("curl"); + var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None); - var jsonBody = await client.GetStringAsync(Url); + var files = Directory.GetFiles(tempDirectory, "*.html", SearchOption.AllDirectories); - var json = JsonDocument.Parse(jsonBody); + logger.LogInformation("Found {Files} files to parse", files.Length); - var entriesArray = json.RootElement.GetProperty("tree"); + foreach (var file in files) + { + var fileName = Path.GetFileName(file); + var torrentDictionary = await ExtractPageContents(file, fileName); - logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength()); + if (torrentDictionary != null) + { + ParseTitlesWithRtn(fileName, torrentDictionary); + var results = await ParseTorrents(torrentDictionary); - foreach (var entry in entriesArray.EnumerateArray()) - { - await ParsePage(entry, 
client); + if (results.Count > 0) + { + await InsertTorrents(results); + await Storage.MarkPageAsIngested(fileName); + } + } } } - private async Task ParsePage(JsonElement entry, HttpClient client) + private void ParseTitlesWithRtn(string fileName, IDictionary page) { - var (pageIngested, name) = await IsAlreadyIngested(entry); + logger.LogInformation("Parsing titles for {Page}", fileName); - if (string.IsNullOrEmpty(name) || pageIngested) - { - return; - } + var batchProcessables = page.Select(value => new RtnBatchProcessable(value.Key, value.Value.Filename)).ToList(); + var parsedResponses = rankTorrentName.BatchParse( + batchProcessables.Select(bp => bp.Filename).ToList(), trashGarbage: false); - var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}"); + // Filter out unsuccessful responses and match RawTitle to requesting title + var successfulResponses = parsedResponses + .Where(response => response != null && response.Success) + .GroupBy(response => response.Response.RawTitle!) + .ToDictionary(group => group.Key, group => group.First()); - await ExtractPageContents(pageSource, name); + foreach (var infoHash in batchProcessables.Select(t => t.InfoHash)) + { + if (page.TryGetValue(infoHash, out var dmmContent) && + successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse)) + { + page[infoHash] = dmmContent with {ParseResponse = parsedResponse}; + } + } } - private async Task ExtractPageContents(string pageSource, string name) + private async Task?> ExtractPageContents(string filePath, string filenameOnly) { + var (pageIngested, name) = await IsAlreadyIngested(filenameOnly); + + if (pageIngested) + { + return []; + } + + var pageSource = await File.ReadAllTextAsync(filePath); + var match = HashCollectionMatcher().Match(pageSource); if (!match.Success) { logger.LogWarning("Failed to match hash collection for {Name}", name); - await Storage.MarkPageAsIngested(name); - return; + await Storage.MarkPageAsIngested(filenameOnly); + return []; } var encodedJson = match.Groups.Values.ElementAtOrDefault(1); @@ -66,90 +89,83 @@ private async Task ExtractPageContents(string pageSource, string name) if (string.IsNullOrEmpty(encodedJson?.Value)) { logger.LogWarning("Failed to extract encoded json for {Name}", name); - return; + return []; } - await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name); - } - - private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name) - { - var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson); + var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value); var json = JsonDocument.Parse(decodedJson); + + var torrents = await json.RootElement.EnumerateArray() + .ToAsyncEnumerable() + .Select(ParsePageContent) + .Where(t => t is not null) + .ToListAsync(); - await InsertTorrentsForPage(json); - - var result = await Storage.MarkPageAsIngested(name); - - if (!result.IsSuccess) + if (torrents.Count == 0) { - logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.Failure.ErrorMessage); - return; + logger.LogWarning("No torrents found in {Name}", name); + await Storage.MarkPageAsIngested(filenameOnly); + return []; } + + var torrentDictionary = torrents + .Where(x => x is not null) + .GroupBy(x => x.InfoHash) + .ToDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null)); - logger.LogInformation("Successfully marked page as ingested"); + logger.LogInformation("Parsed {Torrents} torrents for {Name}", 
torrentDictionary.Count, name); + + return torrentDictionary; } - private async Task ParseTorrent(JsonElement item) + private async Task> ParseTorrents(IDictionary page) { - - if (!item.TryGetProperty("filename", out var filenameElement) || - !item.TryGetProperty("bytes", out var bytesElement) || - !item.TryGetProperty("hash", out var hashElement)) - { - return null; + var ingestedTorrents = new List(); + + foreach (var (infoHash, dmmContent) in page) + { + var parsedTorrent = dmmContent.ParseResponse; + if (parsedTorrent is not {Success: true}) + { + continue; + } + + var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries"; + var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year); + var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey); + + if (cached) + { + logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle); + ingestedTorrents.Add(MapToTorrent(cachedResult, dmmContent.Bytes, infoHash, parsedTorrent)); + continue; + } + + int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null; + var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year); + + if (imdbEntry is null) + { + continue; + } + + await AddToCache(cacheKey, imdbEntry); + logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score); + ingestedTorrents.Add(MapToTorrent(imdbEntry, dmmContent.Bytes, infoHash, parsedTorrent)); } - var torrentTitle = filenameElement.GetString(); - - if (torrentTitle.IsNullOrEmpty()) - { - return null; - } - - var parsedTorrent = rankTorrentName.Parse(torrentTitle); - - if (!parsedTorrent.Success) - { - return null; - } - - var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries"; - - var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year); - - var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey); - - if (cached) - { - logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle); - return MapToTorrent(cachedResult, bytesElement, hashElement, parsedTorrent); - } - - int? year = parsedTorrent.Response.Year != 0 ? 
parsedTorrent.Response.Year : null; - var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year); - - if (imdbEntry is null) - { - return null; - } - - await AddToCache(cacheKey, imdbEntry); - - logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score); - - return MapToTorrent(imdbEntry, bytesElement, hashElement, parsedTorrent); + return ingestedTorrents; } - private IngestedTorrent MapToTorrent(ImdbEntry result, JsonElement bytesElement, JsonElement hashElement, ParseTorrentTitleResponse parsedTorrent) => + private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHash, ParseTorrentTitleResponse parsedTorrent) => new() { Source = Source, Name = result.Title, Imdb = result.ImdbId, - Size = bytesElement.GetInt64().ToString(), - InfoHash = hashElement.ToString(), + Size = size.ToString(), + InfoHash = infoHash, Seeders = 0, Leechers = 0, Category = AssignCategory(result), @@ -179,35 +195,11 @@ private Task AddToCache(string cacheKey, ImdbEntry best) return (false, null); } - private async Task InsertTorrentsForPage(JsonDocument json) - { - var torrents = await json.RootElement.EnumerateArray() - .ToAsyncEnumerable() - .SelectAwait(async x => await ParseTorrent(x)) - .Where(t => t is not null) - .ToListAsync(); - - if (torrents.Count == 0) - { - logger.LogWarning("No torrents found in {Source} response", Source); - return; - } - - await InsertTorrents(torrents!); - } - - private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry) + private async Task<(bool Success, string? Name)> IsAlreadyIngested(string filename) { - var name = entry.GetProperty("path").GetString(); + var pageIngested = await Storage.PageIngested(filename); - if (string.IsNullOrEmpty(name)) - { - return (false, null); - } - - var pageIngested = await Storage.PageIngested(name); - - return (pageIngested, name); + return (pageIngested, filename); } private static string AssignCategory(ImdbEntry entry) => @@ -219,4 +211,20 @@ var category when string.Equals(category, "tvSeries", StringComparison.OrdinalIg }; private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}"; + + private static ExtractedDMMContent? ParsePageContent(JsonElement item) + { + if (!item.TryGetProperty("filename", out var filenameElement) || + !item.TryGetProperty("bytes", out var bytesElement) || + !item.TryGetProperty("hash", out var hashElement)) + { + return null; + } + + return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString()); + } + + private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse); + private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash); + private record RtnBatchProcessable(string InfoHash, string Filename); } diff --git a/src/producer/src/Features/Crawlers/Dmm/GithubConfiguration.cs b/src/producer/src/Features/Crawlers/Dmm/GithubConfiguration.cs deleted file mode 100644 index cb80bd4d..00000000 --- a/src/producer/src/Features/Crawlers/Dmm/GithubConfiguration.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace Producer.Features.Crawlers.Dmm; - -public class GithubConfiguration -{ - private const string Prefix = "GITHUB"; - private const string PatVariable = "PAT"; - - public string? 
PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable); -} diff --git a/src/producer/src/Features/Crawlers/Dmm/IDMMFileDownloader.cs b/src/producer/src/Features/Crawlers/Dmm/IDMMFileDownloader.cs new file mode 100644 index 00000000..093d9dc0 --- /dev/null +++ b/src/producer/src/Features/Crawlers/Dmm/IDMMFileDownloader.cs @@ -0,0 +1,6 @@ +namespace Producer.Features.Crawlers.Dmm; + +public interface IDMMFileDownloader +{ + Task DownloadFileToTempPath(CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/src/producer/src/Features/Crawlers/Dmm/ServiceCollectionExtensions.cs b/src/producer/src/Features/Crawlers/Dmm/ServiceCollectionExtensions.cs new file mode 100644 index 00000000..34caabf1 --- /dev/null +++ b/src/producer/src/Features/Crawlers/Dmm/ServiceCollectionExtensions.cs @@ -0,0 +1,16 @@ +namespace Producer.Features.Crawlers.Dmm; + +public static class ServiceCollectionExtensions +{ + public static IServiceCollection AddDmmSupport(this IServiceCollection services) + { + services.AddHttpClient(DMMFileDownloader.ClientName, client => + { + client.BaseAddress = new("https://github.com/debridmediamanager/hashlists/zipball/main/"); + client.DefaultRequestHeaders.Add("Accept-Encoding", "gzip"); + client.DefaultRequestHeaders.UserAgent.ParseAdd("curl"); + }); + + return services; + } +} \ No newline at end of file diff --git a/src/producer/src/Features/JobSupport/ServiceCollectionExtensions.cs b/src/producer/src/Features/JobSupport/ServiceCollectionExtensions.cs index 4d8eeaab..f967f0f3 100644 --- a/src/producer/src/Features/JobSupport/ServiceCollectionExtensions.cs +++ b/src/producer/src/Features/JobSupport/ServiceCollectionExtensions.cs @@ -5,7 +5,6 @@ internal static class ServiceCollectionExtensions internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration) { var scrapeConfiguration = services.LoadConfigurationFromConfig(configuration, ScrapeConfiguration.SectionName); - var githubConfiguration = services.LoadConfigurationFromEnv(); var rabbitConfiguration = services.LoadConfigurationFromEnv(); var jobTypes = Assembly.GetAssembly(typeof(BaseJob)) @@ -19,18 +18,13 @@ internal static IServiceCollection AddQuartz(this IServiceCollection services, I services.AddTransient(type); } - if (!string.IsNullOrEmpty(githubConfiguration.PAT)) - { - services.AddTransient(); - } - var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance); services.AddQuartz( quartz => { RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration); - RegisterDmmJob(githubConfiguration, quartz, scrapeConfiguration); + RegisterDmmJob(quartz, scrapeConfiguration); RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration); RegisterPublisher(quartz, rabbitConfiguration); }); @@ -64,13 +58,8 @@ private static void RegisterAutomaticRegistrationJobs(List jobTypes, Metho } } - private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) - { - if (!string.IsNullOrEmpty(githubConfiguration.PAT)) - { - AddJobWithTrigger(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration); - } - } + private static void RegisterDmmJob(IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) => + AddJobWithTrigger(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, 
scrapeConfiguration); private static void RegisterTorrentioJob( IServiceCollection services, diff --git a/src/producer/src/GlobalUsings.cs b/src/producer/src/GlobalUsings.cs index 5dec1bf9..acd1ee98 100644 --- a/src/producer/src/GlobalUsings.cs +++ b/src/producer/src/GlobalUsings.cs @@ -1,12 +1,12 @@ // Global using directives +global using System.Collections.Concurrent; +global using System.IO.Compression; global using System.Reflection; global using System.Text; global using System.Text.Json; global using System.Text.RegularExpressions; global using System.Xml.Linq; -global using FuzzySharp; -global using FuzzySharp.Extractor; global using FuzzySharp.PreProcess; global using FuzzySharp.SimilarityRatio.Scorer; global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive; diff --git a/src/producer/src/Program.cs b/src/producer/src/Program.cs index 4e1b65a5..4d398459 100644 --- a/src/producer/src/Program.cs +++ b/src/producer/src/Program.cs @@ -12,7 +12,8 @@ .RegisterMassTransit() .AddDataStorage() .AddCrawlers() + .AddDmmSupport() .AddQuartz(builder.Configuration); var app = builder.Build(); -app.Run(); +app.Run(); \ No newline at end of file diff --git a/src/producer/src/requirements.txt b/src/producer/src/requirements.txt index efc5d160..4e10386e 100644 --- a/src/producer/src/requirements.txt +++ b/src/producer/src/requirements.txt @@ -1 +1 @@ -rank-torrent-name==0.2.11 \ No newline at end of file +rank-torrent-name==0.2.13 \ No newline at end of file diff --git a/src/qbit-collector/requirements.txt b/src/qbit-collector/requirements.txt index 5a9e8ab2..b292cc44 100644 --- a/src/qbit-collector/requirements.txt +++ b/src/qbit-collector/requirements.txt @@ -1 +1 @@ -rank-torrent-name==0.2.11 \ No newline at end of file +rank-torrent-name==0.2.13 \ No newline at end of file diff --git a/src/shared/Python/RTN/IRankTorrentName.cs b/src/shared/Python/RTN/IRankTorrentName.cs index 820bf9bb..4f9af75b 100644 --- a/src/shared/Python/RTN/IRankTorrentName.cs +++ b/src/shared/Python/RTN/IRankTorrentName.cs @@ -2,5 +2,6 @@ namespace SharedContracts.Python.RTN; public interface IRankTorrentName { - ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true); + ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false); + List BatchParse(IReadOnlyCollection titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false); } \ No newline at end of file diff --git a/src/shared/Python/RTN/RankTorrentName.cs b/src/shared/Python/RTN/RankTorrentName.cs index 1e42d09c..113ae8ba 100644 --- a/src/shared/Python/RTN/RankTorrentName.cs +++ b/src/shared/Python/RTN/RankTorrentName.cs @@ -12,41 +12,102 @@ public RankTorrentName(IPythonEngineService pythonEngineService) _pythonEngineService = pythonEngineService; InitModules(); } - - public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true) => - _pythonEngineService.ExecutePythonOperationWithDefault( - () => - { - var result = _rtn?.parse(title, trashGarbage); - return ParseResult(result); - }, new ParseTorrentTitleResponse(false, null), nameof(Parse), throwOnErrors: false, logErrors: false); - private static ParseTorrentTitleResponse ParseResult(dynamic result) + public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false) { - if (result == null) + try + { + using var gil = Py.GIL(); + var result = _rtn?.parse(title, trashGarbage); + return 
ParseResult(result);
+        }
+        catch (Exception ex)
         {
+            if (logErrors)
+            {
+                _pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
+            }
+
+            if (throwOnErrors)
+            {
+                throw;
+            }
+            return new(false, null);
         }
+    }
+
+    public List<ParseTorrentTitleResponse> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
+    {
+        var responses = new List<ParseTorrentTitleResponse>();
-        var json = result.model_dump_json()?.As<string>();
+        try
+        {
+            if (titles.Count == 0)
+            {
+                return responses;
+            }
+
+            using var gil = Py.GIL();
+            var pythonList = new PyList(titles.Select(x => new PyString(x).As<PyObject>()).ToArray());
+            PyList results = _rtn?.batch_parse(pythonList, trashGarbage, chunkSize, workers);
+
+            if (results == null)
+            {
+                return responses;
+            }
-        if (json is null || string.IsNullOrEmpty(json))
+            responses.AddRange(results.Select(ParseResult));
+        }
+        catch (Exception ex)
         {
-            return new(false, null);
+            if (logErrors)
+            {
+                _pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(BatchParse));
+            }
+
+            if (throwOnErrors)
+            {
+                throw;
+            }
         }
-        var mediaType = result.GetAttr("type")?.As<string>();
-
-        if (string.IsNullOrEmpty(mediaType))
+        return responses;
+    }
+
+    private static ParseTorrentTitleResponse? ParseResult(dynamic result)
+    {
+        try
         {
-            return new(false, null);
-        }
+            if (result == null)
+            {
+                return new(false, null);
+            }
+
+            var json = result.model_dump_json()?.As<string>();
+
+            if (json is null || string.IsNullOrEmpty(json))
+            {
+                return new(false, null);
+            }
+
+            var mediaType = result.GetAttr("type")?.As<string>();
+
+            if (string.IsNullOrEmpty(mediaType))
+            {
+                return new(false, null);
+            }
-        var response = JsonSerializer.Deserialize(json);
+            var response = JsonSerializer.Deserialize(json);
-        response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
+            response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
-        return new(true, response);
+            return new(true, response);
+        }
+        catch
+        {
+            return new(false, null);
+        }
     }

     private void InitModules() =>

From c8eb20f32c38a474ee673b6ac9ea35ec8ac8c8c1 Mon Sep 17 00:00:00 2001
From: iPromKnight
Date: Tue, 2 Apr 2024 16:59:40 +0100
Subject: [PATCH 2/2] Introduce a concurrent dictionary and parallelism
---
 src/producer/src/Configuration/scrapers.json  |  2 +-
 .../Crawlers/Dmm/DebridMediaManagerCrawler.cs | 67 ++++++++++++-------
 src/shared/Extensions/DictionaryExtensions.cs | 19 ++++++
 src/shared/GlobalUsings.cs                    |  1 +
 4 files changed, 65 insertions(+), 24 deletions(-)
 create mode 100644 src/shared/Extensions/DictionaryExtensions.cs

diff --git a/src/producer/src/Configuration/scrapers.json b/src/producer/src/Configuration/scrapers.json
index 1859bb8d..da4a2dd0 100644
--- a/src/producer/src/Configuration/scrapers.json
+++ b/src/producer/src/Configuration/scrapers.json
@@ -28,7 +28,7 @@
   },
   {
     "Name": "SyncDmmJob",
-    "IntervalSeconds": 1800,
+    "IntervalSeconds": 10800,
     "Enabled": true
   },
   {
diff --git a/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs b/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs
index 16639186..c690c9ff 100644
--- a/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs
+++ b/src/producer/src/Features/Crawlers/Dmm/DebridMediaManagerCrawler.cs
@@ -12,6 +12,8 @@ public partial class DebridMediaManagerCrawler(
     protected override string Url => "";
     protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
     protected override string Source =>
"DMM"; + + private const int ParallelismCount = 4; public override async Task Execute() { @@ -21,26 +23,32 @@ public override async Task Execute() logger.LogInformation("Found {Files} files to parse", files.Length); - foreach (var file in files) + var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount }; + + await Parallel.ForEachAsync(files, options, async (file, token) => { var fileName = Path.GetFileName(file); var torrentDictionary = await ExtractPageContents(file, fileName); - if (torrentDictionary != null) + if (torrentDictionary == null) { - ParseTitlesWithRtn(fileName, torrentDictionary); - var results = await ParseTorrents(torrentDictionary); + return; + } - if (results.Count > 0) - { - await InsertTorrents(results); - await Storage.MarkPageAsIngested(fileName); - } + await ParseTitlesWithRtn(fileName, torrentDictionary); + var results = await ParseTorrents(torrentDictionary); + + if (results.Count <= 0) + { + return; } - } + + await InsertTorrents(results); + await Storage.MarkPageAsIngested(fileName, token); + }); } - private void ParseTitlesWithRtn(string fileName, IDictionary page) + private async Task ParseTitlesWithRtn(string fileName, IDictionary page) { logger.LogInformation("Parsing titles for {Page}", fileName); @@ -54,17 +62,21 @@ private void ParseTitlesWithRtn(string fileName, IDictionary .GroupBy(response => response.Response.RawTitle!) .ToDictionary(group => group.Key, group => group.First()); - foreach (var infoHash in batchProcessables.Select(t => t.InfoHash)) + var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount }; + + await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options, (infoHash, _) => { if (page.TryGetValue(infoHash, out var dmmContent) && successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse)) { page[infoHash] = dmmContent with {ParseResponse = parsedResponse}; } - } + + return ValueTask.CompletedTask; + }); } - private async Task?> ExtractPageContents(string filePath, string filenameOnly) + private async Task?> ExtractPageContents(string filePath, string filenameOnly) { var (pageIngested, name) = await IsAlreadyIngested(filenameOnly); @@ -112,7 +124,7 @@ private void ParseTitlesWithRtn(string fileName, IDictionary var torrentDictionary = torrents .Where(x => x is not null) .GroupBy(x => x.InfoHash) - .ToDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null)); + .ToConcurrentDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null)); logger.LogInformation("Parsed {Torrents} torrents for {Name}", torrentDictionary.Count, name); @@ -123,12 +135,15 @@ private async Task> ParseTorrents(IDictionary(); - foreach (var (infoHash, dmmContent) in page) + var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount }; + + await Parallel.ForEachAsync(page, options, async (kvp, ct) => { + var (infoHash, dmmContent) = kvp; var parsedTorrent = dmmContent.ParseResponse; if (parsedTorrent is not {Success: true}) { - continue; + return; } var torrentType = parsedTorrent.Response.IsMovie ? 
"movie" : "tvSeries"; @@ -138,22 +153,28 @@ private async Task> ParseTorrents(IDictionary ToConcurrentDictionary( + this IEnumerable source, + Func keySelector, + Func valueSelector) where TKey : notnull + { + var concurrentDictionary = new ConcurrentDictionary(); + + foreach (var element in source) + { + concurrentDictionary.TryAdd(keySelector(element), valueSelector(element)); + } + + return concurrentDictionary; + } +} \ No newline at end of file diff --git a/src/shared/GlobalUsings.cs b/src/shared/GlobalUsings.cs index efaf6746..f250cd92 100644 --- a/src/shared/GlobalUsings.cs +++ b/src/shared/GlobalUsings.cs @@ -1,5 +1,6 @@ // Global using directives +global using System.Collections.Concurrent; global using System.Text.Json; global using System.Text.Json.Serialization; global using System.Text.RegularExpressions;