diff --git a/LiveBot.Core/Repository/Static/Queues.cs b/LiveBot.Core/Repository/Static/Queues.cs index ba9ca4a..5ec4d91 100644 --- a/LiveBot.Core/Repository/Static/Queues.cs +++ b/LiveBot.Core/Repository/Static/Queues.cs @@ -4,13 +4,15 @@ namespace LiveBot.Core.Repository.Static { public class Queues { - public static string QueueURL - { - get => $"rabbitmq://{Environment.GetEnvironmentVariable("RabbitMQ_URL")}"; - } + private static readonly string _queueHost = Environment.GetEnvironmentVariable("RabbitMQ_URL") + ?? throw new InvalidOperationException("Required environment variable 'RabbitMQ_URL' is not set."); - public static readonly string QueueUsername = Environment.GetEnvironmentVariable("RabbitMQ_Username"); - public static readonly string QueuePassword = Environment.GetEnvironmentVariable("RabbitMQ_Password"); + public static string QueueURL => $"rabbitmq://{_queueHost}"; + + public static readonly string QueueUsername = Environment.GetEnvironmentVariable("RabbitMQ_Username") + ?? throw new InvalidOperationException("Required environment variable 'RabbitMQ_Username' is not set."); + public static readonly string QueuePassword = Environment.GetEnvironmentVariable("RabbitMQ_Password") + ?? throw new InvalidOperationException("Required environment variable 'RabbitMQ_Password' is not set."); public static readonly ushort PrefetchCount = 32; diff --git a/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordChannelDeleteConsumer.cs b/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordChannelDeleteConsumer.cs index 4d370f9..3910634 100644 --- a/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordChannelDeleteConsumer.cs +++ b/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordChannelDeleteConsumer.cs @@ -26,11 +26,18 @@ public async Task Consume(ConsumeContext context) var subscriptions = await _work.SubscriptionRepository.FindAsync(i => i.DiscordChannel.DiscordId == message.ChannelId && i.DiscordGuild.DiscordId == message.GuildId); foreach (var subscription in subscriptions) { - _logger.LogInformation("Removing Stream Subscription for {Username} on {ServiceType} because channel was delete {GuildId} {ChannelId} {ChannelName} - {SubscriptionId}", subscription.User.Username, subscription.User.ServiceType, channel.DiscordGuild.DiscordId, channel.DiscordId, channel.Name, subscription.Id); - var rolesToMention = await _work.RoleToMentionRepository.FindAsync(i => i.StreamSubscription == subscription); - foreach (var roleToMention in rolesToMention) - await _work.RoleToMentionRepository.RemoveAsync(roleToMention.Id); - await _work.SubscriptionRepository.RemoveAsync(subscription.Id); + try + { + _logger.LogInformation("Removing Stream Subscription for {Username} on {ServiceType} because channel was delete {GuildId} {ChannelId} {ChannelName} - {SubscriptionId}", subscription.User.Username, subscription.User.ServiceType, channel.DiscordGuild.DiscordId, channel.DiscordId, channel.Name, subscription.Id); + var rolesToMention = await _work.RoleToMentionRepository.FindAsync(i => i.StreamSubscription == subscription); + foreach (var roleToMention in rolesToMention) + await _work.RoleToMentionRepository.RemoveAsync(roleToMention.Id); + await _work.SubscriptionRepository.RemoveAsync(subscription.Id); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to remove Stream Subscription {SubscriptionId} for channel {ChannelId}", subscription.Id, channel.DiscordId); + } } var guildConfig = await _work.GuildConfigRepository.SingleOrDefaultAsync(i => i.DiscordGuild.DiscordId == message.GuildId); diff --git a/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordGuildDeleteConsumer.cs b/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordGuildDeleteConsumer.cs index fd497db..c2c1489 100644 --- a/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordGuildDeleteConsumer.cs +++ b/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordGuildDeleteConsumer.cs @@ -33,16 +33,32 @@ public async Task Consume(ConsumeContext context) // Remove Stream Subscriptions for this Guild foreach (var streamSubscription in streamSubscriptions) { - _logger.LogInformation("Removing Stream Subscription for {Username} on {ServiceType} because have left the Guild {GuildId} - {SubscriptionId}", streamSubscription.User.Username, streamSubscription.User.ServiceType, discordGuild.DiscordId, streamSubscription.Id); - var rolesToMention = await _work.RoleToMentionRepository.FindAsync(i => i.StreamSubscription == streamSubscription); - foreach (var roleToMention in rolesToMention) - await _work.RoleToMentionRepository.RemoveAsync(roleToMention.Id); - await _work.SubscriptionRepository.RemoveAsync(streamSubscription.Id); + try + { + _logger.LogInformation("Removing Stream Subscription for {Username} on {ServiceType} because have left the Guild {GuildId} - {SubscriptionId}", streamSubscription.User.Username, streamSubscription.User.ServiceType, discordGuild.DiscordId, streamSubscription.Id); + var rolesToMention = await _work.RoleToMentionRepository.FindAsync(i => i.StreamSubscription == streamSubscription); + foreach (var roleToMention in rolesToMention) + await _work.RoleToMentionRepository.RemoveAsync(roleToMention.Id); + await _work.SubscriptionRepository.RemoveAsync(streamSubscription.Id); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to remove Stream Subscription {SubscriptionId} for guild {GuildId}", streamSubscription.Id, discordGuild.DiscordId); + } } // Remove Discord Channels for this Guild foreach (var discordChannel in discordChannels) - await _work.ChannelRepository.RemoveAsync(discordChannel.Id); + { + try + { + await _work.ChannelRepository.RemoveAsync(discordChannel.Id); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to remove Discord Channel {ChannelId} for guild {GuildId}", discordChannel.DiscordId, discordGuild.DiscordId); + } + } // Remove Discord Guild await _work.GuildRepository.RemoveAsync(discordGuild.Id); diff --git a/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordMemberLiveConsumer.cs b/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordMemberLiveConsumer.cs index 60cfada..2a4ef5f 100644 --- a/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordMemberLiveConsumer.cs +++ b/LiveBot.Discord.SlashCommands/Consumers/Discord/DiscordMemberLiveConsumer.cs @@ -57,19 +57,22 @@ public async Task Consume(ConsumeContext context) if (userGame == null) return; + var seenGuildIds = new HashSet(); var mutualGuilds = new List(); foreach (DiscordSocketClient shard in _client.Shards) { foreach (SocketGuild guild in shard.Guilds) { - if (guild.GetUser(user.Id) != null) - if (!mutualGuilds.Any(i => i.Id == guild.Id)) - mutualGuilds.Add(guild); + if (guild.GetUser(user.Id) != null && seenGuildIds.Add(guild.Id)) + mutualGuilds.Add(guild); } } foreach (var guild in mutualGuilds) { + try + { + if (guild?.Id == null) continue; var discordGuild = await _work.GuildRepository.SingleOrDefaultAsync(i => i.DiscordId == guild.Id); @@ -241,6 +244,12 @@ public async Task Consume(ConsumeContext context) ); } } + + } // end per-guild try + catch (Exception ex) + { + _logger.LogError(ex, "Unhandled error processing member live notification for guild {GuildId}", guild?.Id); + } } } } diff --git a/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOfflineConsumer.cs b/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOfflineConsumer.cs index c313808..f2c98da 100644 --- a/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOfflineConsumer.cs +++ b/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOfflineConsumer.cs @@ -39,7 +39,7 @@ public async Task Consume(ConsumeContext context) var streamSubscriptions = await GetStreamSubscriptionsAsync(streamUser); if (!streamSubscriptions.Any()) { - _streamOfflineLogger.LogInformation("No subscriptions found for {Username} ({UserId})", + _streamOfflineLogger.LogDebug("No subscriptions found for {Username} ({UserId})", user.Username, user.Id); return; } diff --git a/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOnlineConsumer.cs b/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOnlineConsumer.cs index 69ecb8e..f656e0c 100644 --- a/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOnlineConsumer.cs +++ b/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamOnlineConsumer.cs @@ -289,10 +289,22 @@ private StreamNotification CreateStreamNotification(ILiveBotStream stream, Strea private async Task ObtainLock(string recordId, Guid lockGuid, TimeSpan lockTimeout) { + var deadline = DateTime.UtcNow.Add(lockTimeout); + int delayMs = 50; bool obtainedLock; do { obtainedLock = await _cache.ObtainLockAsync(recordId: recordId, identifier: lockGuid, expiryTime: lockTimeout); + if (!obtainedLock) + { + if (DateTime.UtcNow >= deadline) + { + _streamOnlineLogger.LogWarning("Timed out waiting for lock on {RecordId} after {Timeout}s", recordId, lockTimeout.TotalSeconds); + return false; + } + await Task.Delay(delayMs); + delayMs = Math.Min(delayMs * 2, 1000); + } } while (!obtainedLock); @@ -542,14 +554,13 @@ private double CalculateNotificationDelay(StreamNotification streamNotification, private async Task SendDiscordMessage(SocketTextChannel channel, string notificationMessage, Embed embed, TimeSpan lockTimeout) { - CancellationTokenSource cancellationToken = new(); - cancellationToken.CancelAfter((int)lockTimeout.TotalMilliseconds); + using var cts = new CancellationTokenSource((int)lockTimeout.TotalMilliseconds); var messageRequestOptions = new RequestOptions() { RetryMode = RetryMode.AlwaysFail, Timeout = (int)lockTimeout.TotalMilliseconds, - CancelToken = cancellationToken.Token + CancelToken = cts.Token }; return await channel.SendMessageAsync(text: notificationMessage, embed: embed, options: messageRequestOptions); diff --git a/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamUpdateConsumer.cs b/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamUpdateConsumer.cs index 3d83f89..f7c33a4 100644 --- a/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamUpdateConsumer.cs +++ b/LiveBot.Discord.SlashCommands/Consumers/Streams/StreamUpdateConsumer.cs @@ -33,17 +33,35 @@ public async Task Consume(ConsumeContext context) if (monitor == null) return; - ILiveBotUser user = stream.User ?? await monitor.GetUserById(stream.UserId); + ILiveBotUser? user = stream.User ?? await monitor.GetUserById(stream.UserId); + if (user == null) + { + _logger.LogWarning("Could not resolve user for stream {StreamId} on {ServiceType} during stream update; skipping", + stream.Id, stream.ServiceType); + return; + } + ILiveBotGame game = stream.Game ?? await monitor.GetGame(stream.GameId); - Expression> templateGamePredicate = (i => i.ServiceType == stream.ServiceType && i.SourceId == "0"); - var templateGame = await _work.GameRepository.SingleOrDefaultAsync(templateGamePredicate); var streamUser = await _work.UserRepository.SingleOrDefaultAsync(i => i.ServiceType == stream.ServiceType && i.SourceID == user.Id); + + if (streamUser == null) + { + _logger.LogWarning("StreamUser not found for {UserId} on {ServiceType} during stream update; skipping", + user.Id, stream.ServiceType); + return; + } + var streamSubscriptions = await _work.SubscriptionRepository.FindAsync(i => i.User == streamUser); - StreamGame streamGame; + if (!streamSubscriptions.Any()) + return; + + // Ensure the game record exists in the database + Expression> templateGamePredicate = (i => i.ServiceType == stream.ServiceType && i.SourceId == "0"); if (game.Id == "0" || string.IsNullOrEmpty(game.Id)) { + var templateGame = await _work.GameRepository.SingleOrDefaultAsync(templateGamePredicate); if (templateGame == null) { StreamGame newStreamGame = new StreamGame @@ -54,9 +72,7 @@ public async Task Consume(ConsumeContext context) ThumbnailURL = "" }; await _work.GameRepository.AddOrUpdateAsync(newStreamGame, templateGamePredicate); - templateGame = await _work.GameRepository.SingleOrDefaultAsync(templateGamePredicate); } - streamGame = templateGame; } else { @@ -68,12 +84,8 @@ public async Task Consume(ConsumeContext context) ThumbnailURL = game.ThumbnailURL }; await _work.GameRepository.AddOrUpdateAsync(newStreamGame, i => i.ServiceType == stream.ServiceType && i.SourceId == stream.GameId); - streamGame = await _work.GameRepository.SingleOrDefaultAsync(i => i.ServiceType == stream.ServiceType && i.SourceId == stream.GameId); } - if (!streamSubscriptions.Any()) - return; - bool hasValidSubscriptions = false; foreach (StreamSubscription streamSubscription in streamSubscriptions) diff --git a/LiveBot.Discord.SlashCommands/Helpers/MonitorUtils.cs b/LiveBot.Discord.SlashCommands/Helpers/MonitorUtils.cs index a433ae1..ad7cf61 100644 --- a/LiveBot.Discord.SlashCommands/Helpers/MonitorUtils.cs +++ b/LiveBot.Discord.SlashCommands/Helpers/MonitorUtils.cs @@ -45,7 +45,10 @@ internal static Embed GetSubscriptionEmbed(SocketGuild guild, StreamSubscription { var roles = new List(); if (subscription.RolesToMention.Any()) - roles = subscription.RolesToMention.Select(i => guild.GetRole(i.DiscordRoleId)).ToList(); + roles = subscription.RolesToMention + .Select(i => guild.GetRole(i.DiscordRoleId)) + .Where(r => r != null) + .ToList(); var roleStrings = new List(); foreach (var role in roles.OrderBy(i => i.Name)) @@ -86,7 +89,8 @@ internal static async Task ConsolidateRoleMentions(IUnitOfWork work, Strea if (currentRolesToMention.Any()) { var rolesToDelete = currentRolesToMention.Where(i => !roleIds.Contains(i.DiscordRoleId)).ToList(); - rolesToDelete.ForEach(async i => await work.RoleToMentionRepository.RemoveAsync(i.Id)); + foreach (var roleToDelete in rolesToDelete) + await work.RoleToMentionRepository.RemoveAsync(roleToDelete.Id); RolesUpdated = true; } foreach (var roleId in roleIds) diff --git a/LiveBot.Discord.SlashCommands/Helpers/NotificationHelpers.cs b/LiveBot.Discord.SlashCommands/Helpers/NotificationHelpers.cs index ff4d4fe..8ea72b2 100644 --- a/LiveBot.Discord.SlashCommands/Helpers/NotificationHelpers.cs +++ b/LiveBot.Discord.SlashCommands/Helpers/NotificationHelpers.cs @@ -10,8 +10,10 @@ namespace LiveBot.Discord.SlashCommands.Helpers { public static class NotificationHelpers { - public static string EscapeSpecialDiscordCharacters(string input) + public static string EscapeSpecialDiscordCharacters(string? input) { + if (string.IsNullOrEmpty(input)) + return string.Empty; return Format.Sanitize(input); } @@ -121,10 +123,10 @@ public static Embed GetStreamEmbed(ILiveBotStream stream, ILiveBotUser user, ILi .WithThumbnailUrl(user.AvatarURL); // Add Game field - builder.AddField(name: "Game", value: game.Name, inline: true); + builder.AddField(name: "Game", value: string.IsNullOrWhiteSpace(game.Name) ? "[Not Set]" : game.Name, inline: true); // Add Stream URL field - builder.AddField(name: "Stream", value: stream.StreamURL, inline: true); + builder.AddField(name: "Stream", value: string.IsNullOrWhiteSpace(stream.StreamURL) ? user.ProfileURL : stream.StreamURL, inline: true); // Add Status Field //builder.AddField(name: "Status", value: "", inline: false); diff --git a/LiveBot.Discord.SlashCommands/InteractionHandler.cs b/LiveBot.Discord.SlashCommands/InteractionHandler.cs index 5a35e0b..547a2ec 100644 --- a/LiveBot.Discord.SlashCommands/InteractionHandler.cs +++ b/LiveBot.Discord.SlashCommands/InteractionHandler.cs @@ -46,7 +46,10 @@ internal async Task ReadyAsync(DiscordSocketClient client) _logger.LogInformation(message: "Finished registering AdminModule with shard {ShardId}", client.ShardId); RegisteredCommands = true; } - catch { } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to register AdminModule on shard {ShardId}", client.ShardId); + } } } @@ -65,8 +68,9 @@ internal async Task HandleInteraction(SocketInteraction interaction) await Task.CompletedTask; } - catch + catch (Exception ex) { + _logger.LogError(ex, "Unhandled exception executing interaction {InteractionType}", interaction.Type); // If Slash Command execution fails it is most likely that the original interaction acknowledgement will persist. It is a good idea to delete the original // response, or at least let the user know that something went wrong during the command execution. if (interaction.Type is InteractionType.ApplicationCommand) @@ -89,7 +93,10 @@ internal async Task InteractionExecuted(ICommandInfo commandInfo, IInteractionCo await context.Interaction.FollowupAsync(ephemeral: true, embed: embed); } - catch { } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to send error followup for {CommandName}", commandInfo?.Name ?? "Unknown"); + } } var logLevel = LogLevel.Information; diff --git a/LiveBot.Discord.SlashCommands/LiveBotDiscordEventHandlers.cs b/LiveBot.Discord.SlashCommands/LiveBotDiscordEventHandlers.cs index cb6c4aa..602f42c 100644 --- a/LiveBot.Discord.SlashCommands/LiveBotDiscordEventHandlers.cs +++ b/LiveBot.Discord.SlashCommands/LiveBotDiscordEventHandlers.cs @@ -73,8 +73,8 @@ public async Task GuildJoined(SocketGuild guild) public async Task GuildUpdated(SocketGuild beforeGuild, SocketGuild afterGuild) { if ( - beforeGuild.Name.Equals(afterGuild.Name, StringComparison.InvariantCultureIgnoreCase) && - beforeGuild.IconUrl.Equals(afterGuild.IconUrl, StringComparison.InvariantCultureIgnoreCase) + string.Equals(beforeGuild.Name, afterGuild.Name, StringComparison.InvariantCultureIgnoreCase) && + string.Equals(beforeGuild.IconUrl, afterGuild.IconUrl, StringComparison.InvariantCultureIgnoreCase) ) return; var context = new DiscordGuildUpdate { GuildId = afterGuild.Id, GuildName = afterGuild.Name, IconUrl = afterGuild.IconUrl }; diff --git a/LiveBot.Discord.SlashCommands/Modules/MonitorModule.cs b/LiveBot.Discord.SlashCommands/Modules/MonitorModule.cs index fbc46b3..c2f4832 100644 --- a/LiveBot.Discord.SlashCommands/Modules/MonitorModule.cs +++ b/LiveBot.Discord.SlashCommands/Modules/MonitorModule.cs @@ -612,6 +612,9 @@ private ILiveBotMonitor GetMonitor(Uri uri) private async Task GetStreamUserAsync(ILiveBotMonitor monitor, Uri uri) { var monitorUser = await monitor.GetUser(profileURL: uri.AbsoluteUri); + if (monitorUser == null) + throw new ArgumentException($"Could not find a user for the provided URL: {Format.EscapeUrl(uri.AbsoluteUri)}"); + StreamUser streamUser = new() { ServiceType = monitorUser.ServiceType, diff --git a/LiveBot.Repository/ModelRepository.cs b/LiveBot.Repository/ModelRepository.cs index a325a69..152063e 100644 --- a/LiveBot.Repository/ModelRepository.cs +++ b/LiveBot.Repository/ModelRepository.cs @@ -234,11 +234,18 @@ public virtual async Task AddOrUpdateAsync(TEntity entity, Expression @@ -254,6 +261,8 @@ public async Task RemoveAsync(long Id) { await syncLock.WaitAsync().ConfigureAwait(false); TEntity entity = await DbSet.FindAsync(Id).ConfigureAwait(false); + if (entity == null) + return; DbSet.Remove(entity); await Context.SaveChangesAsync().ConfigureAwait(false); } diff --git a/LiveBot.Watcher.Twitch/DatabaseSetup.cs b/LiveBot.Watcher.Twitch/DatabaseSetup.cs index 5481f76..3a27e4e 100644 --- a/LiveBot.Watcher.Twitch/DatabaseSetup.cs +++ b/LiveBot.Watcher.Twitch/DatabaseSetup.cs @@ -25,7 +25,7 @@ public static WebApplicationBuilder SetupLiveBot(this WebApplicationBuilder buil builder.Host.UseSerilog((ctx, lc) => lc - .MinimumLevel.Information() + .MinimumLevel.Is(IsDebug ? Serilog.Events.LogEventLevel.Debug : Serilog.Events.LogEventLevel.Information) .WriteTo.Console(outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] {Message:lj}{NewLine}{Exception}") .WriteTo.DatadogLogs(apiKey: apiKey, source: source, service: service, host: hostname, tags: tags) .Enrich.FromLogContext() diff --git a/LiveBot.Watcher.Twitch/TwitchMonitor.cs b/LiveBot.Watcher.Twitch/TwitchMonitor.cs index 8d933e6..6593084 100644 --- a/LiveBot.Watcher.Twitch/TwitchMonitor.cs +++ b/LiveBot.Watcher.Twitch/TwitchMonitor.cs @@ -124,52 +124,70 @@ public void Monitor_OnServiceStarted(object? sender, OnServiceStartedArgs e) public async void Monitor_OnStreamOnline(object? sender, OnStreamOnlineArgs e) { if (!IsWatcher) return; - - ILiveBotUser? user = await GetUserById(e.Stream.UserId); - if (user == null) + try { - _logger.LogWarning("Could not find user {UserId} for online event", e.Stream.UserId); - return; + ILiveBotUser? user = await GetUserById(e.Stream.UserId); + if (user == null) + { + _logger.LogWarning("Could not find user {UserId} for online event", e.Stream.UserId); + return; + } + + ILiveBotGame game = await GetGame(e.Stream.GameId); + ILiveBotStream stream = new TwitchStream(BaseURL, ServiceType, e.Stream, user, game); + + await PublishStreamOnline(stream); + } + catch (Exception ex) + { + _logger.LogError(ex, "Unhandled exception in Monitor_OnStreamOnline for user {UserId}", e.Stream.UserId); } - - ILiveBotGame game = await GetGame(e.Stream.GameId); - ILiveBotStream stream = new TwitchStream(BaseURL, ServiceType, e.Stream, user, game); - - await PublishStreamOnline(stream); } public async void Monitor_OnStreamUpdate(object? sender, OnStreamUpdateArgs e) { if (!IsWatcher) return; - - ILiveBotUser? user = await GetUserById(e.Stream.UserId); - if (user == null) + try + { + ILiveBotUser? user = await GetUserById(e.Stream.UserId); + if (user == null) + { + _logger.LogWarning("Could not find user {UserId} for update event", e.Stream.UserId); + return; + } + + ILiveBotGame game = await GetGame(e.Stream.GameId); + ILiveBotStream stream = new TwitchStream(BaseURL, ServiceType, e.Stream, user, game); + + await PublishStreamUpdate(stream); + } + catch (Exception ex) { - _logger.LogWarning("Could not find user {UserId} for update event", e.Stream.UserId); - return; + _logger.LogError(ex, "Unhandled exception in Monitor_OnStreamUpdate for user {UserId}", e.Stream.UserId); } - - ILiveBotGame game = await GetGame(e.Stream.GameId); - ILiveBotStream stream = new TwitchStream(BaseURL, ServiceType, e.Stream, user, game); - - await PublishStreamUpdate(stream); } public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) { if (!IsWatcher) return; - - ILiveBotUser? user = await GetUserById(e.Stream.UserId); - if (user == null) + try + { + ILiveBotUser? user = await GetUserById(e.Stream.UserId); + if (user == null) + { + _logger.LogWarning("Could not find user {UserId} for offline event", e.Stream.UserId); + return; + } + + ILiveBotGame game = await GetGame(e.Stream.GameId); + ILiveBotStream stream = new TwitchStream(BaseURL, ServiceType, e.Stream, user, game); + + await PublishStreamOffline(stream); + } + catch (Exception ex) { - _logger.LogWarning("Could not find user {UserId} for offline event", e.Stream.UserId); - return; + _logger.LogError(ex, "Unhandled exception in Monitor_OnStreamOffline for user {UserId}", e.Stream.UserId); } - - ILiveBotGame game = await GetGame(e.Stream.GameId); - ILiveBotStream stream = new TwitchStream(BaseURL, ServiceType, e.Stream, user, game); - - await PublishStreamOffline(stream); } #endregion Events @@ -198,9 +216,13 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) catch (Exception e) when (e is InvalidCredentialException || e is BadScopeException) { _logger.LogError(exception: e, message: "Error getting {ServiceType} Game", ServiceType); - await UpdateAuth(force: true); - await Task.Delay(RetryDelay); - return await API_GetGame(gameId); + if (retryCount <= ApiRetryCount) + { + await UpdateAuth(force: true); + await Task.Delay(RetryDelay); + return await API_GetGame(gameId, retryCount + 1); + } + return null; } } @@ -226,9 +248,13 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) catch (Exception e) when (e is InvalidCredentialException || e is BadScopeException) { _logger.LogError(exception: e, message: "Error getting {ServiceType} User by Login", ServiceType); - await UpdateAuth(force: true); - await Task.Delay(RetryDelay); - return await API_GetUserByLogin(username); + if (retryCount <= ApiRetryCount) + { + await UpdateAuth(force: true); + await Task.Delay(RetryDelay); + return await API_GetUserByLogin(username, retryCount + 1); + } + return null; } } @@ -254,9 +280,13 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) catch (Exception e) when (e is InvalidCredentialException || e is BadScopeException) { _logger.LogError(exception: e, message: "Error getting {ServiceType} User by Id", ServiceType); - await UpdateAuth(force: true); - await Task.Delay(RetryDelay); - return await API_GetUserById(userId); + if (retryCount <= ApiRetryCount) + { + await UpdateAuth(force: true); + await Task.Delay(RetryDelay); + return await API_GetUserById(userId, retryCount + 1); + } + return null; } } @@ -281,9 +311,13 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) catch (Exception e) when (e is InvalidCredentialException || e is BadScopeException) { _logger.LogError(exception: e, message: "Error getting {ServiceType} Users by Id", ServiceType); - await UpdateAuth(force: true); - await Task.Delay(RetryDelay); - return await API_GetUsersById(userIdList); + if (retryCount <= ApiRetryCount) + { + await UpdateAuth(force: true); + await Task.Delay(RetryDelay); + return await API_GetUsersById(userIdList, retryCount + 1); + } + return null; } } @@ -308,9 +342,13 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) catch (Exception e) when (e is InvalidCredentialException || e is BadScopeException) { _logger.LogError(exception: e, message: "Error getting {ServiceType} User by URL", ServiceType); - await UpdateAuth(force: true); - await Task.Delay(RetryDelay); - return await API_GetUserByURL(url); + if (retryCount <= ApiRetryCount) + { + await UpdateAuth(force: true); + await Task.Delay(RetryDelay); + return await API_GetUserByURL(url, retryCount + 1); + } + return null; } } @@ -339,9 +377,13 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) catch (Exception e) when (e is InvalidCredentialException || e is BadScopeException) { _logger.LogError(exception: e, message: "Error getting {ServiceType} Stream", ServiceType); - await UpdateAuth(force: true); - await Task.Delay(RetryDelay); - return await API_GetStream(user); + if (retryCount <= ApiRetryCount) + { + await UpdateAuth(force: true); + await Task.Delay(RetryDelay); + return await API_GetStream(user, retryCount + 1); + } + return null; } } @@ -350,15 +392,27 @@ public async void Monitor_OnStreamOffline(object? sender, OnStreamOfflineArgs e) #region Misc Functions /// - /// Wait for an Auth lock to not be in place + /// Wait for an Auth lock to not be in place, with exponential backoff and a timeout /// /// private async Task WaitForAuthUnlockAsync() { + var deadline = DateTime.UtcNow.AddSeconds(30); + int delayMs = 100; bool authLocked; do { authLocked = await _cache.CheckForLockAsync(recordId: _authCacheName); + if (authLocked) + { + if (DateTime.UtcNow >= deadline) + { + _logger.LogWarning("Timed out waiting for auth lock to be released after 30s; proceeding anyway"); + return; + } + await Task.Delay(delayMs); + delayMs = Math.Min(delayMs * 2, 2000); + } } while (authLocked); } @@ -378,6 +432,12 @@ private async Task GetAuthAsync() { var tempAuth = await _work.AuthRepository.SingleOrDefaultAsync(i => i.ServiceType == ServiceType && i.ClientId == ClientId && i.Expired == false); + if (tempAuth == null) + { + _logger.LogError("No active auth entry found for {ServiceType} with ClientId {ClientId}", ServiceType, ClientId); + throw new InvalidOperationException($"No active auth entry found for {ServiceType} with ClientId {ClientId}"); + } + auth = new TwitchAuth { Id = tempAuth.Id, @@ -420,13 +480,13 @@ public async Task GetAndSetActiveAuth() public async Task UpdateAuth(bool force = false) { - _logger.LogInformation(message: "Updating Auth for {ServiceType} with Client Id {ClientId}", ServiceType, ClientId); + _logger.LogDebug(message: "Updating Auth for {ServiceType} with Client Id {ClientId}", ServiceType, ClientId); if (!IsWatcher) { try { var activeAuth = await GetAndSetActiveAuth(); - _logger.LogInformation(message: "Successfully set AccessToken for {ServiceType} with Client Id {ClientId} to active auth", ServiceType, ClientId); + _logger.LogDebug(message: "Successfully set AccessToken for {ServiceType} with Client Id {ClientId} to active auth", ServiceType, ClientId); // Trigger it 5 minutes before expiration time to be safe var timeToExpire = activeAuth.ExpiresAt - DateTime.UtcNow; @@ -448,10 +508,22 @@ public async Task UpdateAuth(bool force = false) if (oldAuth.Expired || oldAuth.ExpiresAt <= DateTime.UtcNow || force) { + var lockDeadline = DateTime.UtcNow.AddSeconds(30); + int lockDelayMs = 100; do { // Obtain a lock for a maximum of 30 seconds obtainedLock = await _cache.ObtainLockAsync(recordId: _authCacheName, identifier: lockGuid, expiryTime: TimeSpan.FromSeconds(30)); + if (!obtainedLock) + { + if (DateTime.UtcNow >= lockDeadline) + { + _logger.LogWarning("Timed out waiting for auth lock; skipping token refresh"); + break; + } + await Task.Delay(lockDelayMs); + lockDelayMs = Math.Min(lockDelayMs * 2, 2000); + } } while (!obtainedLock); @@ -474,7 +546,7 @@ public async Task UpdateAuth(bool force = false) await _cache.SetRecordAsync(recordId: _authCacheName, data: newAuth, expiryTime: timeToExpire.Duration()); oldAuth.Expired = true; - await _work.AuthRepository.AddOrUpdateAsync(oldAuth, i => i.ServiceType == ServiceType && i.ClientId == ClientId && i.AccessToken == oldAuth.AccessToken); + await _work.AuthRepository.UpdateAsync(oldAuth); AccessToken = newAuth.AccessToken; _logger.LogDebug("{ServiceType} Expiration time: {ExpirationSeconds}", ServiceType, refreshResponse.ExpiresIn < 1800 ? 1800 : refreshResponse.ExpiresIn); @@ -501,16 +573,25 @@ public async Task UpdateAuth(bool force = false) private void SetupAuthTimer(TimeSpan timeSpan) { - RefreshAuthTimer?.Stop(); - if (timeSpan.TotalSeconds < 1800) timeSpan = TimeSpan.FromSeconds(1800); - RefreshAuthTimer = new System.Timers.Timer(timeSpan.Duration().TotalMilliseconds) + if (RefreshAuthTimer == null) { - AutoReset = false - }; - RefreshAuthTimer.Elapsed += async (sender, e) => await UpdateAuth(); + RefreshAuthTimer = new System.Timers.Timer { AutoReset = false }; + RefreshAuthTimer.Elapsed += (sender, e) => + { + _ = UpdateAuth().ContinueWith(t => + _logger.LogError(t.Exception, "Unhandled exception in auth refresh timer for {ServiceType}", ServiceType), + TaskContinuationOptions.OnlyOnFaulted); + }; + } + else + { + RefreshAuthTimer.Stop(); + } + + RefreshAuthTimer.Interval = timeSpan.Duration().TotalMilliseconds; RefreshAuthTimer.Start(); } @@ -525,14 +606,7 @@ public async Task UpdateUsers() foreach (User user in users.Users) { var twitchUser = new TwitchUser(BaseURL, ServiceType, user); - if (_userCache.ContainsKey(user.Id)) - { - _userCache[user.Id] = twitchUser; - } - else - { - _userCache.TryAdd(user.Id, twitchUser); - } + _userCache[user.Id] = twitchUser; await _cache.SetListItemAsync(recordId: _userCacheName, fieldName: twitchUser.Id, data: twitchUser); try {