From df334500a96a2dffe28c112451236ed3e5d952a5 Mon Sep 17 00:00:00 2001 From: Amelia <77553571+Fesaa@users.noreply.github.com> Date: Fri, 13 Jun 2025 00:39:33 +0200 Subject: [PATCH] First bit of refactoring - Introduces regions to split up code, and easier navigations - Add a lot of doc comments - Introduces ScrobbleSyncContext to encapsulate all variables being passed around during sync - Split up sync method is smaller methods for readability --- API/Services/Plus/ScrobblingService.cs | 994 +++++++++++++------------ 1 file changed, 533 insertions(+), 461 deletions(-) diff --git a/API/Services/Plus/ScrobblingService.cs b/API/Services/Plus/ScrobblingService.cs index c89097221..12bd661d7 100644 --- a/API/Services/Plus/ScrobblingService.cs +++ b/API/Services/Plus/ScrobblingService.cs @@ -46,23 +46,72 @@ public enum ScrobbleProvider public interface IScrobblingService { Task CheckExternalAccessTokens(); + + /// + /// Checks if the token has expired with , if it has double checks with K+, + /// otherwise return false. + /// + /// + /// + /// + /// Returns true if there is no license present Task HasTokenExpired(int userId, ScrobbleProvider provider); Task ScrobbleRatingUpdate(int userId, int seriesId, float rating); Task ScrobbleReviewUpdate(int userId, int seriesId, string? reviewTitle, string reviewBody); Task ScrobbleReadingUpdate(int userId, int seriesId); Task ScrobbleWantToReadUpdate(int userId, int seriesId, bool onWantToRead); + /// + /// Removed all processed events that are at least 7 days old + /// + /// [DisableConcurrentExecution(60 * 60 * 60)] [AutomaticRetry(Attempts = 3, OnAttemptsExceeded = AttemptsExceededAction.Delete)] public Task ClearProcessedEvents(); + + /// + /// + /// + /// [DisableConcurrentExecution(60 * 60 * 60)] [AutomaticRetry(Attempts = 3, OnAttemptsExceeded = AttemptsExceededAction.Delete)] Task ProcessUpdatesSinceLastSync(); + Task CreateEventsFromExistingHistory(int userId = 0); Task CreateEventsFromExistingHistoryForSeries(int seriesId); Task ClearEventsForSeries(int userId, int seriesId); } +/// +/// Context used when syncing scrobble events. Do NOT reuse between syncs +/// +public class ScrobbleSyncContext +{ + public required List ReadEvents {get; init;} + public required List RatingEvents {get; init;} + /// Do not use this as events to send to K+, use + public required List AddToWantToRead {get; init;} + /// Do not use this as events to send to K+, use + public required List RemoveWantToRead {get; init;} + /// + /// Final events list if all AddTo- and RemoveWantToRead would be processed sequentially + /// + public required List Decisions {get; init;} + public required string License { get; init; } + public required Dictionary RateLimits { get; init; } + + public List Users { get; set; } + /// + /// Amount fo already processed events + /// + public int ProgressCounter { get; set; } + + /// + /// Sum of all events to process + /// + public int TotalCount => ReadEvents.Count + RatingEvents.Count + AddToWantToRead.Count + RemoveWantToRead.Count; +} + public class ScrobblingService : IScrobblingService { private readonly IUnitOfWork _unitOfWork; @@ -122,6 +171,7 @@ public class ScrobblingService : IScrobblingService FlurlConfiguration.ConfigureClientForUrl(Configuration.KavitaPlusApiUrl); } + #region Access token checks /// /// An automated job that will run against all user's tokens and validate if they are still active @@ -199,7 +249,6 @@ public class ScrobblingService : IScrobblingService } - public async Task HasTokenExpired(int userId, ScrobbleProvider provider) { var token = await GetTokenForProvider(userId, provider); @@ -217,8 +266,7 @@ public class ScrobblingService : IScrobblingService private async Task HasTokenExpired(string token, ScrobbleProvider provider) { - if (string.IsNullOrEmpty(token) || - !TokenService.HasTokenExpired(token)) return false; + if (string.IsNullOrEmpty(token) || !TokenService.HasTokenExpired(token)) return false; var license = await _unitOfWork.SettingsRepository.GetSettingAsync(ServerSettingKey.LicenseKey); if (string.IsNullOrEmpty(license.Value)) return true; @@ -251,6 +299,10 @@ public class ScrobblingService : IScrobblingService } ?? string.Empty; } + #endregion + + #region Scrobble ingest + public async Task ScrobbleReviewUpdate(int userId, int seriesId, string? reviewTitle, string reviewBody) { // Currently disabled until at least hardcover is implemented @@ -299,13 +351,6 @@ public class ScrobblingService : IScrobblingService _logger.LogDebug("Added Scrobbling Review update on {SeriesName} with Userid {AppUserId} ", series.Name, userId); } - private static bool IsAniListReviewValid(string reviewTitle, string reviewBody) - { - return string.IsNullOrEmpty(reviewTitle) || string.IsNullOrEmpty(reviewBody) || (reviewTitle.Length < 2200 || - reviewTitle.Length > 120 || - reviewTitle.Length < 20); - } - public async Task ScrobbleRatingUpdate(int userId, int seriesId, float rating) { if (!await _licenseService.HasActiveLicense()) return; @@ -348,18 +393,6 @@ public class ScrobblingService : IScrobblingService _logger.LogDebug("Added Scrobbling Rating update on {SeriesName} with Userid {AppUserId}", series.Name, userId); } - public static long? GetMalId(Series series) - { - var malId = ExtractId(series.Metadata.WebLinks, MalWeblinkWebsite); - return malId ?? series.ExternalSeriesMetadata?.MalId; - } - - public static int? GetAniListId(Series seriesWithExternalMetadata) - { - var aniListId = ExtractId(seriesWithExternalMetadata.Metadata.WebLinks, AniListWeblinkWebsite); - return aniListId ?? seriesWithExternalMetadata.ExternalSeriesMetadata?.AniListId; - } - public async Task ScrobbleReadingUpdate(int userId, int seriesId) { if (!await _licenseService.HasActiveLicense()) return; @@ -468,13 +501,109 @@ public class ScrobblingService : IScrobblingService _logger.LogDebug("Added Scrobbling WantToRead update on {SeriesName} with Userid {AppUserId} ", series.Name, userId); } + #endregion + + #region Scrobble provider methods + + private static bool IsAniListReviewValid(string reviewTitle, string reviewBody) + { + return string.IsNullOrEmpty(reviewTitle) || string.IsNullOrEmpty(reviewBody) || (reviewTitle.Length < 2200 || + reviewTitle.Length > 120 || + reviewTitle.Length < 20); + } + + public static long? GetMalId(Series series) + { + var malId = ExtractId(series.Metadata.WebLinks, MalWeblinkWebsite); + return malId ?? series.ExternalSeriesMetadata?.MalId; + } + + public static int? GetAniListId(Series seriesWithExternalMetadata) + { + var aniListId = ExtractId(seriesWithExternalMetadata.Metadata.WebLinks, AniListWeblinkWebsite); + return aniListId ?? seriesWithExternalMetadata.ExternalSeriesMetadata?.AniListId; + } + + /// + /// Extract an Id from a given weblink + /// + /// + /// + /// + public static T? ExtractId(string webLinks, string website) + { + var index = WeblinkExtractionMap[website]; + foreach (var webLink in webLinks.Split(',')) + { + if (!webLink.StartsWith(website)) continue; + var tokens = webLink.Split(website)[1].Split('/'); + var value = tokens[index]; + if (typeof(T) == typeof(int?)) + { + if (int.TryParse(value, CultureInfo.InvariantCulture, out var intValue)) + return (T)(object)intValue; + } + else if (typeof(T) == typeof(int)) + { + if (int.TryParse(value, CultureInfo.InvariantCulture, out var intValue)) + return (T)(object)intValue; + return default; + } + else if (typeof(T) == typeof(long?)) + { + if (long.TryParse(value, CultureInfo.InvariantCulture, out var longValue)) + return (T)(object)longValue; + } + else if (typeof(T) == typeof(string)) + { + return (T)(object)value; + } + } + + return default; + } + + /// + /// Generate a URL from a given ID and website + /// + /// Type of the ID (e.g., int, long, string) + /// The ID to embed in the URL + /// The base website URL + /// The generated URL or null if the website is not supported + public static string? GenerateUrl(T id, string website) + { + if (!WeblinkExtractionMap.ContainsKey(website)) + { + return null; // Unsupported website + } + + if (Equals(id, default(T))) + { + throw new ArgumentNullException(nameof(id), "ID cannot be null."); + } + + // Ensure the type of the ID matches supported types + if (typeof(T) == typeof(int) || typeof(T) == typeof(long) || typeof(T) == typeof(string)) + { + return $"{website}{id}"; + } + + throw new ArgumentException("Unsupported ID type. Supported types are int, long, and string.", nameof(id)); + } + + public static string CreateUrl(string url, long? id) + { + return id is null or 0 ? string.Empty : $"{url}{id}/"; + } + + #endregion + private async Task CheckIfCannotScrobble(int userId, int seriesId, Series series) { if (series.DontMatch) return true; if (await _unitOfWork.UserRepository.HasHoldOnSeries(userId, seriesId)) { - _logger.LogInformation("Series {SeriesName} is on AppUserId {AppUserId}'s hold list. Not scrobbling", series.Name, - userId); + _logger.LogInformation("Series {SeriesName} is on AppUserId {AppUserId}'s hold list. Not scrobbling", series.Name, userId); return true; } @@ -488,6 +617,7 @@ public class ScrobblingService : IScrobblingService private async Task GetRateLimit(string license, string aniListToken) { if (string.IsNullOrWhiteSpace(aniListToken)) return 0; + try { return await _kavitaPlusApiService.GetRateLimit(license, aniListToken); @@ -500,6 +630,383 @@ public class ScrobblingService : IScrobblingService return 0; } + #region Scrobble process (Requests to K+) + + /// + /// Retrieve all events for which the series has not errored, then delete all current errors + /// + private async Task PrepareScrobbleContext() + { + var librariesWithScrobbling = (await _unitOfWork.LibraryRepository.GetLibrariesAsync()) + .AsEnumerable() + .Where(l => l.AllowScrobbling) + .Select(l => l.Id) + .ToImmutableHashSet(); + + var errors = (await _unitOfWork.ScrobbleRepository.GetScrobbleErrors()) + .Where(e => e.Comment == "Unknown Series" || e.Comment == UnknownSeriesErrorMessage || e.Comment == AccessTokenErrorMessage) + .Select(e => e.SeriesId) + .ToList(); + + var readEvents = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.ChapterRead)) + .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) + .Where(e => !errors.Contains(e.SeriesId)) + .ToList(); + var addToWantToRead = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.AddWantToRead)) + .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) + .Where(e => !errors.Contains(e.SeriesId)) + .ToList(); + var removeWantToRead = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.RemoveWantToRead)) + .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) + .Where(e => !errors.Contains(e.SeriesId)) + .ToList(); + var ratingEvents = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.ScoreUpdated)) + .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) + .Where(e => !errors.Contains(e.SeriesId)) + .ToList(); + + await ClearErrors(errors); + + return new ScrobbleSyncContext + { + ReadEvents = readEvents, + RatingEvents = ratingEvents, + AddToWantToRead = addToWantToRead, + RemoveWantToRead = removeWantToRead, + Decisions = CalculateNetWantToReadDecisions(addToWantToRead, removeWantToRead), + RateLimits = [], + License = (await _unitOfWork.SettingsRepository.GetSettingAsync(ServerSettingKey.LicenseKey)).Value, + }; + } + + /// + /// Remove all events for which its series has errored + /// + /// + private async Task ClearErrors(List erroredSeries) + { + // Clear any events that are already on error table + var erroredEvents = await _unitOfWork.ScrobbleRepository.GetAllEventsWithSeriesIds(erroredSeries); + if (erroredEvents.Count > 0) + { + _unitOfWork.ScrobbleRepository.Remove(erroredEvents); + await _unitOfWork.CommitAsync(); + } + } + + /// + /// Filters users who can scrobble, sets their rate limit and updates the + /// + /// + /// + private async Task PrepareUsersToScrobble(ScrobbleSyncContext ctx) + { + // For all userIds, ensure that we can connect and have access + var usersToScrobble = ctx.ReadEvents.Select(r => r.AppUser) + .Concat(ctx.AddToWantToRead.Select(r => r.AppUser)) + .Concat(ctx.RemoveWantToRead.Select(r => r.AppUser)) + .Concat(ctx.RatingEvents.Select(r => r.AppUser)) + .Where(user => !string.IsNullOrEmpty(user.AniListAccessToken)) + .Where(user => user.UserPreferences.AniListScrobblingEnabled) + .DistinctBy(u => u.Id) + .ToList(); + + foreach (var user in usersToScrobble) + { + await SetAndCheckRateLimit(ctx.RateLimits, user, ctx.License); + } + + ctx.Users = usersToScrobble; + } + + /// + /// Cleans up any events that are due to bugs or legacy + /// + private async Task CleanupOldOrBuggedEvents() + { + try + { + var eventsWithoutAnilistToken = (await _unitOfWork.ScrobbleRepository.GetEvents()) + .Where(e => !e.IsProcessed && !e.IsErrored) + .Where(e => string.IsNullOrEmpty(e.AppUser.AniListAccessToken)); + + _unitOfWork.ScrobbleRepository.Remove(eventsWithoutAnilistToken); + await _unitOfWork.CommitAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "There was an exception when trying to delete old scrobble events when the user has no active token"); + } + } + + /// + /// This is a task that is run on a fixed schedule (every few hours or every day) that clears out the scrobble event table + /// and offloads the data to the API server which performs the syncing to the providers. + /// + [DisableConcurrentExecution(60 * 60 * 60)] + [AutomaticRetry(Attempts = 3, OnAttemptsExceeded = AttemptsExceededAction.Delete)] + public async Task ProcessUpdatesSinceLastSync() + { + var ctx = await PrepareScrobbleContext(); + if (ctx.TotalCount == 0) return; + + // Get all the applicable users to scrobble and set their rate limits + await PrepareUsersToScrobble(ctx); + + _logger.LogInformation("Scrobble Processing Details:" + + "\n Read Events: {ReadEventsCount}" + + "\n Want to Read Events: {WantToReadEventsCount}" + + "\n Rating Events: {RatingEventsCount}" + + "\n Users to Scrobble: {UsersToScrobbleCount}" + + "\n Total Events to Process: {TotalEvents}", + ctx.ReadEvents.Count, + ctx.Decisions.Count, + ctx.RatingEvents.Count, + ctx.Users.Count, + ctx.TotalCount); + + try + { + await ProcessReadEvents(ctx); + await ProcessRatingEvents(ctx); + await ProcessWantToReadRatingEvents(ctx); + } + catch (FlurlHttpException ex) + { + _logger.LogError(ex, "Kavita+ API or a Scrobble service may be experiencing an outage. Stopping sending data"); + return; + } + + + await SaveToDb(ctx.ProgressCounter, true); + _logger.LogInformation("Scrobbling Events is complete"); + + await CleanupOldOrBuggedEvents(); + } + + /// + /// Calculates the net want-to-read decisions by considering all events. + /// Returns events that represent the final state for each user/series pair. + /// + /// List of events for adding to want-to-read + /// List of events for removing from want-to-read + /// List of events that represent the final state (add or remove) + private static List CalculateNetWantToReadDecisions(List addEvents, List removeEvents) + { + // Create a dictionary to track the latest event for each user/series combination + var latestEvents = new Dictionary<(int SeriesId, int AppUserId), ScrobbleEvent>(); + + // Process all add events + foreach (var addEvent in addEvents) + { + var key = (addEvent.SeriesId, addEvent.AppUserId); + + if (latestEvents.TryGetValue(key, out var value) && addEvent.CreatedUtc <= value.CreatedUtc) continue; + + value = addEvent; + latestEvents[key] = value; + } + + // Process all remove events + foreach (var removeEvent in removeEvents) + { + var key = (removeEvent.SeriesId, removeEvent.AppUserId); + + if (latestEvents.TryGetValue(key, out var value) && removeEvent.CreatedUtc <= value.CreatedUtc) continue; + + value = removeEvent; + latestEvents[key] = value; + } + + // Return all events that represent the final state + return latestEvents.Values.ToList(); + } + + private async Task ProcessWantToReadRatingEvents(ScrobbleSyncContext ctx) + { + await ProcessEvents(ctx.Decisions, ctx, evt => Task.FromResult(new ScrobbleDto + { + Format = evt.Format, + AniListId = evt.AniListId, + MALId = (int?) evt.MalId, + ScrobbleEventType = evt.ScrobbleEventType, + ChapterNumber = evt.ChapterNumber, + VolumeNumber = (int?) evt.VolumeNumber, + AniListToken = evt.AppUser.AniListAccessToken ?? "", + SeriesName = evt.Series.Name, + LocalizedSeriesName = evt.Series.LocalizedName, + Year = evt.Series.Metadata.ReleaseYear + })); + + // After decisions, we need to mark all the want to read and remove from want to read as completed + var processedDecisions = ctx.Decisions.Where(d => d.IsProcessed).ToList(); + if (processedDecisions.Count > 0) + { + foreach (var scrobbleEvent in processedDecisions) + { + scrobbleEvent.IsProcessed = true; + scrobbleEvent.ProcessDateUtc = DateTime.UtcNow; + _unitOfWork.ScrobbleRepository.Update(scrobbleEvent); + } + await _unitOfWork.CommitAsync(); + } + } + + private async Task ProcessRatingEvents(ScrobbleSyncContext ctx) + { + await ProcessEvents(ctx.RatingEvents, ctx, evt => Task.FromResult(new ScrobbleDto + { + Format = evt.Format, + AniListId = evt.AniListId, + MALId = (int?) evt.MalId, + ScrobbleEventType = evt.ScrobbleEventType, + AniListToken = evt.AppUser.AniListAccessToken ?? "", + SeriesName = evt.Series.Name, + LocalizedSeriesName = evt.Series.LocalizedName, + Rating = evt.Rating, + Year = evt.Series.Metadata.ReleaseYear + })); + } + + private async Task ProcessReadEvents(ScrobbleSyncContext ctx) + { + // Recalculate the highest volume/chapter + foreach (var readEvt in ctx.ReadEvents) + { + // Note: this causes skewing in the scrobble history because it makes it look like there are duplicate events + readEvt.VolumeNumber = + (int) await _unitOfWork.AppUserProgressRepository.GetHighestFullyReadVolumeForSeries(readEvt.SeriesId, + readEvt.AppUser.Id); + readEvt.ChapterNumber = + await _unitOfWork.AppUserProgressRepository.GetHighestFullyReadChapterForSeries(readEvt.SeriesId, + readEvt.AppUser.Id); + _unitOfWork.ScrobbleRepository.Update(readEvt); + } + + await ProcessEvents(ctx.ReadEvents, ctx, async evt => new ScrobbleDto + { + Format = evt.Format, + AniListId = evt.AniListId, + MALId = (int?) evt.MalId, + ScrobbleEventType = evt.ScrobbleEventType, + ChapterNumber = evt.ChapterNumber, + VolumeNumber = (int?) evt.VolumeNumber, + AniListToken = evt.AppUser.AniListAccessToken ?? "", + SeriesName = evt.Series.Name, + LocalizedSeriesName = evt.Series.LocalizedName, + ScrobbleDateUtc = evt.LastModifiedUtc, + Year = evt.Series.Metadata.ReleaseYear, + StartedReadingDateUtc = await _unitOfWork.AppUserProgressRepository.GetFirstProgressForSeries(evt.SeriesId, evt.AppUser.Id), + LatestReadingDateUtc = await _unitOfWork.AppUserProgressRepository.GetLatestProgressForSeries(evt.SeriesId, evt.AppUser.Id), + }); + } + + private async Task ProcessEvents(IEnumerable events, ScrobbleSyncContext ctx, Func> createEvent) + { + foreach (var evt in events.Where(CanProcessScrobbleEvent)) + { + _logger.LogDebug("Processing Scrobble Events: {Count} / {Total}", ctx.ProgressCounter, ctx.TotalCount); + ctx.ProgressCounter++; + + if (TokenService.HasTokenExpired(evt.AppUser.AniListAccessToken)) + { + _unitOfWork.ScrobbleRepository.Attach(new ScrobbleError + { + Comment = "AniList token has expired and needs rotating. Scrobbling wont work until then", + Details = $"User: {evt.AppUser.UserName}, Expired: {TokenService.GetTokenExpiry(evt.AppUser.AniListAccessToken)}", + LibraryId = evt.LibraryId, + SeriesId = evt.SeriesId + }); + await _unitOfWork.CommitAsync(); + continue; + } + + if (evt.Series.IsBlacklisted || evt.Series.DontMatch) + { + _logger.LogInformation("Series {SeriesName} ({SeriesId}) can't be matched and thus cannot scrobble this event", evt.Series.Name, evt.SeriesId); + _unitOfWork.ScrobbleRepository.Attach(new ScrobbleError() + { + Comment = UnknownSeriesErrorMessage, + Details = $"User: {evt.AppUser.UserName} Series: {evt.Series.Name}", + LibraryId = evt.LibraryId, + SeriesId = evt.SeriesId + }); + evt.IsErrored = true; + evt.ErrorDetails = UnknownSeriesErrorMessage; + evt.ProcessDateUtc = DateTime.UtcNow; + _unitOfWork.ScrobbleRepository.Update(evt); + await _unitOfWork.CommitAsync(); + + continue; + } + + var count = await SetAndCheckRateLimit(ctx.RateLimits, evt.AppUser, ctx.License); + ctx.RateLimits[evt.AppUserId] = count; + if (count == 0) + { + if (ctx.Users.Count == 1) break; + continue; + } + + try + { + var data = await createEvent(evt); + // We need to handle the encoding and changing it to the old one until we can update the API layer to handle these + // which could happen in v0.8.3 + if (data.VolumeNumber is Parser.SpecialVolumeNumber or Parser.DefaultChapterNumber) + { + data.VolumeNumber = 0; + } + + if (data.ChapterNumber is Parser.DefaultChapterNumber) + { + data.ChapterNumber = 0; + } + ctx.RateLimits[evt.AppUserId] = await PostScrobbleUpdate(data, ctx.License, evt); + evt.IsProcessed = true; + evt.ProcessDateUtc = DateTime.UtcNow; + _unitOfWork.ScrobbleRepository.Update(evt); + } + catch (FlurlHttpException) + { + // If a flurl exception occured, the API is likely down. Kill processing + throw; + } + catch (KavitaException ex) + { + if (ex.Message.Contains("Access token is invalid")) + { + _logger.LogCritical(ex, "Access Token for AppUserId: {AppUserId} needs to be regenerated/renewed to continue scrobbling", evt.AppUser.Id); + evt.IsErrored = true; + evt.ErrorDetails = AccessTokenErrorMessage; + _unitOfWork.ScrobbleRepository.Update(evt); + } + } + catch (Exception ex) + { + /* Swallow as it's already been handled in PostScrobbleUpdate */ + _logger.LogError(ex, "Error processing event {EventId}", evt.Id); + } + + await SaveToDb(ctx.ProgressCounter); + + // We can use count to determine how long to sleep based on rate gain. It might be specific to AniList, but we can model others + var delay = count > 10 ? TimeSpan.FromMilliseconds(ScrobbleSleepTime) : TimeSpan.FromSeconds(60); + await Task.Delay(delay); + } + + await SaveToDb(ctx.ProgressCounter, true); + } + + private async Task SaveToDb(int progressCounter, bool force = false) + { + if ((force || progressCounter % 5 == 0) && _unitOfWork.HasChanges()) + { + _logger.LogDebug("Saving Scrobbling Event Processing Progress"); + await _unitOfWork.CommitAsync(); + } + } + public async Task PostScrobbleUpdate(ScrobbleDto data, string license, ScrobbleEvent evt) { try @@ -609,6 +1116,8 @@ public class ScrobblingService : IScrobblingService } } + #endregion + /// /// This will backfill events from existing progress history, ratings, and want to read for users that have a valid license /// @@ -768,371 +1277,6 @@ public class ScrobblingService : IScrobblingService await _unitOfWork.CommitAsync(); } - /// - /// This is a task that is ran on a fixed schedule (every few hours or every day) that clears out the scrobble event table - /// and offloads the data to the API server which performs the syncing to the providers. - /// - [DisableConcurrentExecution(60 * 60 * 60)] - [AutomaticRetry(Attempts = 3, OnAttemptsExceeded = AttemptsExceededAction.Delete)] - public async Task ProcessUpdatesSinceLastSync() - { - // Check how many scrobble events we have available then only do those. - var userRateLimits = new Dictionary(); - - var progressCounter = 0; - - var librariesWithScrobbling = (await _unitOfWork.LibraryRepository.GetLibrariesAsync()) - .AsEnumerable() - .Where(l => l.AllowScrobbling) - .Select(l => l.Id) - .ToImmutableHashSet(); - - var errors = (await _unitOfWork.ScrobbleRepository.GetScrobbleErrors()) - .Where(e => e.Comment == "Unknown Series" || e.Comment == UnknownSeriesErrorMessage || e.Comment == AccessTokenErrorMessage) - .Select(e => e.SeriesId) - .ToList(); - - var readEvents = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.ChapterRead)) - .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) - .Where(e => !errors.Contains(e.SeriesId)) - .ToList(); - var addToWantToRead = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.AddWantToRead)) - .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) - .Where(e => !errors.Contains(e.SeriesId)) - .ToList(); - var removeWantToRead = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.RemoveWantToRead)) - .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) - .Where(e => !errors.Contains(e.SeriesId)) - .ToList(); - var ratingEvents = (await _unitOfWork.ScrobbleRepository.GetByEvent(ScrobbleEventType.ScoreUpdated)) - .Where(e => librariesWithScrobbling.Contains(e.LibraryId)) - .Where(e => !errors.Contains(e.SeriesId)) - .ToList(); - - var decisions = CalculateNetWantToReadDecisions(addToWantToRead, removeWantToRead); - - // Clear any events that are already on error table - var erroredEvents = await _unitOfWork.ScrobbleRepository.GetAllEventsWithSeriesIds(errors); - if (erroredEvents.Count > 0) - { - _unitOfWork.ScrobbleRepository.Remove(erroredEvents); - await _unitOfWork.CommitAsync(); - } - - var totalEvents = readEvents.Count + decisions.Count + ratingEvents.Count; - if (totalEvents == 0) return; - - // Get all the applicable users to scrobble and set their rate limits - var license = await _unitOfWork.SettingsRepository.GetSettingAsync(ServerSettingKey.LicenseKey); - var usersToScrobble = await PrepareUsersToScrobble(readEvents, addToWantToRead, removeWantToRead, ratingEvents, userRateLimits, license); - - - _logger.LogInformation("Scrobble Processing Details:" + - "\n Read Events: {ReadEventsCount}" + - "\n Want to Read Events: {WantToReadEventsCount}" + - "\n Rating Events: {RatingEventsCount}" + - "\n Users to Scrobble: {UsersToScrobbleCount}" + - "\n Total Events to Process: {TotalEvents}", - readEvents.Count, - decisions.Count, - ratingEvents.Count, - usersToScrobble.Count, - totalEvents); - - try - { - progressCounter = await ProcessReadEvents(readEvents, userRateLimits, usersToScrobble, totalEvents, progressCounter); - - progressCounter = await ProcessRatingEvents(ratingEvents, userRateLimits, usersToScrobble, totalEvents, progressCounter); - - progressCounter = await ProcessWantToReadRatingEvents(decisions, userRateLimits, usersToScrobble, totalEvents, progressCounter); - } - catch (FlurlHttpException ex) - { - _logger.LogError(ex, "Kavita+ API or a Scrobble service may be experiencing an outage. Stopping sending data"); - return; - } - - - await SaveToDb(progressCounter, true); - _logger.LogInformation("Scrobbling Events is complete"); - - // Cleanup any events that are due to bugs or legacy - try - { - var eventsWithoutAnilistToken = (await _unitOfWork.ScrobbleRepository.GetEvents()) - .Where(e => !e.IsProcessed && !e.IsErrored) - .Where(e => string.IsNullOrEmpty(e.AppUser.AniListAccessToken)); - - _unitOfWork.ScrobbleRepository.Remove(eventsWithoutAnilistToken); - await _unitOfWork.CommitAsync(); - } - catch (Exception ex) - { - _logger.LogError(ex, "There was an exception when trying to delete old scrobble events when the user has no active token"); - } - } - - /// - /// Calculates the net want-to-read decisions by considering all events. - /// Returns events that represent the final state for each user/series pair. - /// - /// List of events for adding to want-to-read - /// List of events for removing from want-to-read - /// List of events that represent the final state (add or remove) - private static List CalculateNetWantToReadDecisions(List addEvents, List removeEvents) - { - // Create a dictionary to track the latest event for each user/series combination - var latestEvents = new Dictionary<(int SeriesId, int AppUserId), ScrobbleEvent>(); - - // Process all add events - foreach (var addEvent in addEvents) - { - var key = (addEvent.SeriesId, addEvent.AppUserId); - - if (latestEvents.TryGetValue(key, out var value) && addEvent.CreatedUtc <= value.CreatedUtc) continue; - - value = addEvent; - latestEvents[key] = value; - } - - // Process all remove events - foreach (var removeEvent in removeEvents) - { - var key = (removeEvent.SeriesId, removeEvent.AppUserId); - - if (latestEvents.TryGetValue(key, out var value) && removeEvent.CreatedUtc <= value.CreatedUtc) continue; - - value = removeEvent; - latestEvents[key] = value; - } - - // Return all events that represent the final state - return latestEvents.Values.ToList(); - } - - private async Task ProcessWantToReadRatingEvents(List decisions, Dictionary userRateLimits, List usersToScrobble, int totalEvents, int progressCounter) - { - progressCounter = await ProcessEvents(decisions, userRateLimits, usersToScrobble.Count, progressCounter, - totalEvents, evt => Task.FromResult(new ScrobbleDto() - { - Format = evt.Format, - AniListId = evt.AniListId, - MALId = (int?) evt.MalId, - ScrobbleEventType = evt.ScrobbleEventType, - ChapterNumber = evt.ChapterNumber, - VolumeNumber = (int?) evt.VolumeNumber, - AniListToken = evt.AppUser.AniListAccessToken, - SeriesName = evt.Series.Name, - LocalizedSeriesName = evt.Series.LocalizedName, - Year = evt.Series.Metadata.ReleaseYear - })); - - // After decisions, we need to mark all the want to read and remove from want to read as completed - if (decisions.Any(d => d.IsProcessed)) - { - foreach (var scrobbleEvent in decisions.Where(d => d.IsProcessed)) - { - scrobbleEvent.IsProcessed = true; - scrobbleEvent.ProcessDateUtc = DateTime.UtcNow; - _unitOfWork.ScrobbleRepository.Update(scrobbleEvent); - } - await _unitOfWork.CommitAsync(); - } - - return progressCounter; - } - - private async Task ProcessRatingEvents(List ratingEvents, Dictionary userRateLimits, List usersToScrobble, - int totalEvents, int progressCounter) - { - return await ProcessEvents(ratingEvents, userRateLimits, usersToScrobble.Count, progressCounter, - totalEvents, evt => Task.FromResult(new ScrobbleDto() - { - Format = evt.Format, - AniListId = evt.AniListId, - MALId = (int?) evt.MalId, - ScrobbleEventType = evt.ScrobbleEventType, - AniListToken = evt.AppUser.AniListAccessToken, - SeriesName = evt.Series.Name, - LocalizedSeriesName = evt.Series.LocalizedName, - Rating = evt.Rating, - Year = evt.Series.Metadata.ReleaseYear - })); - } - - - private async Task ProcessReadEvents(List readEvents, Dictionary userRateLimits, List usersToScrobble, int totalEvents, - int progressCounter) - { - // Recalculate the highest volume/chapter - foreach (var readEvt in readEvents) - { - // Note: this causes skewing in the scrobble history because it makes it look like there are duplicate events - readEvt.VolumeNumber = - (int) await _unitOfWork.AppUserProgressRepository.GetHighestFullyReadVolumeForSeries(readEvt.SeriesId, - readEvt.AppUser.Id); - readEvt.ChapterNumber = - await _unitOfWork.AppUserProgressRepository.GetHighestFullyReadChapterForSeries(readEvt.SeriesId, - readEvt.AppUser.Id); - _unitOfWork.ScrobbleRepository.Update(readEvt); - } - - return await ProcessEvents(readEvents, userRateLimits, usersToScrobble.Count, progressCounter, totalEvents, - async evt => new ScrobbleDto() - { - Format = evt.Format, - AniListId = evt.AniListId, - MALId = (int?) evt.MalId, - ScrobbleEventType = evt.ScrobbleEventType, - ChapterNumber = evt.ChapterNumber, - VolumeNumber = (int?) evt.VolumeNumber, - AniListToken = evt.AppUser.AniListAccessToken!, - SeriesName = evt.Series.Name, - LocalizedSeriesName = evt.Series.LocalizedName, - ScrobbleDateUtc = evt.LastModifiedUtc, - Year = evt.Series.Metadata.ReleaseYear, - StartedReadingDateUtc = await _unitOfWork.AppUserProgressRepository.GetFirstProgressForSeries(evt.SeriesId, evt.AppUser.Id), - LatestReadingDateUtc = await _unitOfWork.AppUserProgressRepository.GetLatestProgressForSeries(evt.SeriesId, evt.AppUser.Id), - }); - } - - - private async Task> PrepareUsersToScrobble(List readEvents, List addToWantToRead, List removeWantToRead, List ratingEvents, - Dictionary userRateLimits, ServerSetting license) - { - // For all userIds, ensure that we can connect and have access - var usersToScrobble = readEvents.Select(r => r.AppUser) - .Concat(addToWantToRead.Select(r => r.AppUser)) - .Concat(removeWantToRead.Select(r => r.AppUser)) - .Concat(ratingEvents.Select(r => r.AppUser)) - .Where(user => !string.IsNullOrEmpty(user.AniListAccessToken)) - .Where(user => user.UserPreferences.AniListScrobblingEnabled) - .DistinctBy(u => u.Id) - .ToList(); - - foreach (var user in usersToScrobble) - { - await SetAndCheckRateLimit(userRateLimits, user, license.Value); - } - - return usersToScrobble; - } - - - private async Task ProcessEvents(IEnumerable events, Dictionary userRateLimits, - int usersToScrobble, int progressCounter, int totalProgress, Func> createEvent) - { - var license = await _unitOfWork.SettingsRepository.GetSettingAsync(ServerSettingKey.LicenseKey); - foreach (var evt in events) - { - _logger.LogDebug("Processing Reading Events: {Count} / {Total}", progressCounter, totalProgress); - progressCounter++; - - // Check if this media item can even be processed for this user - if (!CanProcessScrobbleEvent(evt)) - { - continue; - } - - if (TokenService.HasTokenExpired(evt.AppUser.AniListAccessToken)) - { - _unitOfWork.ScrobbleRepository.Attach(new ScrobbleError() - { - Comment = "AniList token has expired and needs rotating. Scrobbling wont work until then", - Details = $"User: {evt.AppUser.UserName}, Expired: {TokenService.GetTokenExpiry(evt.AppUser.AniListAccessToken)}", - LibraryId = evt.LibraryId, - SeriesId = evt.SeriesId - }); - await _unitOfWork.CommitAsync(); - continue; - } - - if (evt.Series.IsBlacklisted || evt.Series.DontMatch) - { - _logger.LogInformation("Series {SeriesName} ({SeriesId}) can't be matched and thus cannot scrobble this event", evt.Series.Name, evt.SeriesId); - _unitOfWork.ScrobbleRepository.Attach(new ScrobbleError() - { - Comment = UnknownSeriesErrorMessage, - Details = $"User: {evt.AppUser.UserName} Series: {evt.Series.Name}", - LibraryId = evt.LibraryId, - SeriesId = evt.SeriesId - }); - evt.IsErrored = true; - evt.ErrorDetails = UnknownSeriesErrorMessage; - evt.ProcessDateUtc = DateTime.UtcNow; - _unitOfWork.ScrobbleRepository.Update(evt); - await _unitOfWork.CommitAsync(); - - continue; - } - - var count = await SetAndCheckRateLimit(userRateLimits, evt.AppUser, license.Value); - userRateLimits[evt.AppUserId] = count; - if (count == 0) - { - if (usersToScrobble == 1) break; - continue; - } - - try - { - var data = await createEvent(evt); - // We need to handle the encoding and changing it to the old one until we can update the API layer to handle these - // which could happen in v0.8.3 - if (data.VolumeNumber is Parser.SpecialVolumeNumber or Parser.DefaultChapterNumber) - { - data.VolumeNumber = 0; - } - - if (data.ChapterNumber is Parser.DefaultChapterNumber) - { - data.ChapterNumber = 0; - } - userRateLimits[evt.AppUserId] = await PostScrobbleUpdate(data, license.Value, evt); - evt.IsProcessed = true; - evt.ProcessDateUtc = DateTime.UtcNow; - _unitOfWork.ScrobbleRepository.Update(evt); - } - catch (FlurlHttpException) - { - // If a flurl exception occured, the API is likely down. Kill processing - throw; - } - catch (KavitaException ex) - { - if (ex.Message.Contains("Access token is invalid")) - { - _logger.LogCritical(ex, "Access Token for AppUserId: {AppUserId} needs to be regenerated/renewed to continue scrobbling", evt.AppUser.Id); - evt.IsErrored = true; - evt.ErrorDetails = AccessTokenErrorMessage; - _unitOfWork.ScrobbleRepository.Update(evt); - } - } - catch (Exception ex) - { - /* Swallow as it's already been handled in PostScrobbleUpdate */ - _logger.LogError(ex, "Error processing event {EventId}", evt.Id); - } - await SaveToDb(progressCounter); - // We can use count to determine how long to sleep based on rate gain. It might be specific to AniList, but we can model others - var delay = count > 10 ? TimeSpan.FromMilliseconds(ScrobbleSleepTime) : TimeSpan.FromSeconds(60); - await Task.Delay(delay); - } - - await SaveToDb(progressCounter, true); - return progressCounter; - } - - private async Task SaveToDb(int progressCounter, bool force = false) - { - if ((force || progressCounter % 5 == 0) && _unitOfWork.HasChanges()) - { - _logger.LogDebug("Saving Scrobbling Event Processing Progress"); - await _unitOfWork.CommitAsync(); - } - } - private static bool CanProcessScrobbleEvent(ScrobbleEvent readEvent) { @@ -1160,78 +1304,6 @@ public class ScrobblingService : IScrobblingService return providers; } - /// - /// Extract an Id from a given weblink - /// - /// - /// - /// - public static T? ExtractId(string webLinks, string website) - { - var index = WeblinkExtractionMap[website]; - foreach (var webLink in webLinks.Split(',')) - { - if (!webLink.StartsWith(website)) continue; - var tokens = webLink.Split(website)[1].Split('/'); - var value = tokens[index]; - if (typeof(T) == typeof(int?)) - { - if (int.TryParse(value, CultureInfo.InvariantCulture, out var intValue)) - return (T)(object)intValue; - } - else if (typeof(T) == typeof(int)) - { - if (int.TryParse(value, CultureInfo.InvariantCulture, out var intValue)) - return (T)(object)intValue; - return default; - } - else if (typeof(T) == typeof(long?)) - { - if (long.TryParse(value, CultureInfo.InvariantCulture, out var longValue)) - return (T)(object)longValue; - } - else if (typeof(T) == typeof(string)) - { - return (T)(object)value; - } - } - - return default; - } - - /// - /// Generate a URL from a given ID and website - /// - /// Type of the ID (e.g., int, long, string) - /// The ID to embed in the URL - /// The base website URL - /// The generated URL or null if the website is not supported - public static string? GenerateUrl(T id, string website) - { - if (!WeblinkExtractionMap.ContainsKey(website)) - { - return null; // Unsupported website - } - - if (Equals(id, default(T))) - { - throw new ArgumentNullException(nameof(id), "ID cannot be null."); - } - - // Ensure the type of the ID matches supported types - if (typeof(T) == typeof(int) || typeof(T) == typeof(long) || typeof(T) == typeof(string)) - { - return $"{website}{id}"; - } - - throw new ArgumentException("Unsupported ID type. Supported types are int, long, and string.", nameof(id)); - } - - public static string CreateUrl(string url, long? id) - { - return id is null or 0 ? string.Empty : $"{url}{id}/"; - } - private async Task SetAndCheckRateLimit(IDictionary userRateLimits, AppUser user, string license) {