Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/Exceptionless.Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ public static void RegisterServices(IServiceCollection services, AppOptions appO
services.AddSingleton<WorkItemHandlers>(s =>
{
var handlers = new WorkItemHandlers();
handlers.Register<FixStackStatsWorkItem>(s.GetRequiredService<FixStackStatsWorkItemHandler>);
handlers.Register<OrganizationMaintenanceWorkItem>(s.GetRequiredService<OrganizationMaintenanceWorkItemHandler>);
handlers.Register<OrganizationNotificationWorkItem>(s.GetRequiredService<OrganizationNotificationWorkItemHandler>);
handlers.Register<ProjectMaintenanceWorkItem>(s.GetRequiredService<ProjectMaintenanceWorkItemHandler>);
handlers.Register<ReindexWorkItem>(s.GetRequiredService<ReindexWorkItemHandler>);
handlers.Register<RemoveStacksWorkItem>(s.GetRequiredService<RemoveStacksWorkItemHandler>);
handlers.Register<RemoveBotEventsWorkItem>(s.GetRequiredService<RemoveBotEventsWorkItemHandler>);
handlers.Register<RemoveStacksWorkItem>(s.GetRequiredService<RemoveStacksWorkItemHandler>);
handlers.Register<SetLocationFromGeoWorkItem>(s.GetRequiredService<SetLocationFromGeoWorkItemHandler>);
handlers.Register<SetProjectIsConfiguredWorkItem>(s.GetRequiredService<SetProjectIsConfiguredWorkItemHandler>);
handlers.Register<OrganizationMaintenanceWorkItem>(s.GetRequiredService<OrganizationMaintenanceWorkItemHandler>);
handlers.Register<OrganizationNotificationWorkItem>(s.GetRequiredService<OrganizationNotificationWorkItemHandler>);
handlers.Register<ProjectMaintenanceWorkItem>(s.GetRequiredService<ProjectMaintenanceWorkItemHandler>);
handlers.Register<UserMaintenanceWorkItem>(s.GetRequiredService<UserMaintenanceWorkItemHandler>);
return handlers;
});
Expand Down
2 changes: 1 addition & 1 deletion src/Exceptionless.Core/Jobs/EventPostsJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Exceptionless.Core.Plugins.EventParser;
using Exceptionless.Core.Queues.Models;
using Exceptionless.Core.Repositories;
using Exceptionless.Core.Repositories.Base;
using Foundatio.Repositories.Exceptions;
using Exceptionless.Core.Services;
using Exceptionless.Core.Validation;
using FluentValidation;
Expand Down
2 changes: 1 addition & 1 deletion src/Exceptionless.Core/Jobs/EventUserDescriptionsJob.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Exceptionless.Core.Models.Data;
using Exceptionless.Core.Queues.Models;
using Exceptionless.Core.Repositories;
using Exceptionless.Core.Repositories.Base;
using Foundatio.Repositories.Exceptions;
using Foundatio.Jobs;
using Foundatio.Queues;
using Foundatio.Repositories.Extensions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
using Exceptionless.Core.Models;
using Exceptionless.Core.Models.WorkItems;
using Exceptionless.Core.Repositories;
using Exceptionless.DateTimeExtensions;
using Foundatio.Jobs;
using Foundatio.Lock;
using Foundatio.Repositories;
using Foundatio.Repositories.Models;
using Microsoft.Extensions.Logging;

namespace Exceptionless.Core.Jobs.WorkItemHandlers;

public class FixStackStatsWorkItemHandler : WorkItemHandlerBase
{
private readonly IStackRepository _stackRepository;
private readonly IEventRepository _eventRepository;
private readonly ILockProvider _lockProvider;
private readonly TimeProvider _timeProvider;

public FixStackStatsWorkItemHandler(IStackRepository stackRepository, IEventRepository eventRepository, ILockProvider lockProvider, TimeProvider timeProvider, ILoggerFactory loggerFactory)
: base(loggerFactory)
{
_stackRepository = stackRepository;
_eventRepository = eventRepository;
_lockProvider = lockProvider;
_timeProvider = timeProvider;
}

public override Task<ILock> GetWorkItemLockAsync(object workItem, CancellationToken cancellationToken = default)
{
return _lockProvider.AcquireAsync(nameof(FixStackStatsWorkItemHandler), TimeSpan.FromHours(1), cancellationToken);
}

public override async Task HandleItemAsync(WorkItemContext context)
{
var wi = context.GetData<FixStackStatsWorkItem>();
var utcEnd = wi.UtcEnd ?? _timeProvider.GetUtcNow().UtcDateTime;

Log.LogInformation("Starting stack stats repair for {UtcStart:O} to {UtcEnd:O}. OrganizationId={Organization}", wi.UtcStart, utcEnd, wi.OrganizationId);
await context.ReportProgressAsync(0, $"Starting stack stats repair for window {wi.UtcStart:O} – {utcEnd:O}");

var organizationIds = await GetOrganizationIdsAsync(wi, utcEnd);
Log.LogInformation("Found {OrganizationCount} organizations to process", organizationIds.Count);

int repaired = 0;
int skipped = 0;

for (int index = 0; index < organizationIds.Count; index++)
{
if (context.CancellationToken.IsCancellationRequested)
break;

var (organizationRepaired, organizationSkipped) = await ProcessOrganizationAsync(context, organizationIds[index], wi.UtcStart, utcEnd);
repaired += organizationRepaired;
skipped += organizationSkipped;

int percentage = (int)Math.Min(99, (index + 1) * 100.0 / organizationIds.Count);
await context.ReportProgressAsync(percentage, $"Organization {index + 1}/{organizationIds.Count} ({percentage}%): repaired {repaired}, skipped {skipped}");
}

Log.LogInformation("Stack stats repair complete: Repaired={Repaired} Skipped={Skipped}", repaired, skipped);
await context.ReportProgressAsync(100, $"Done. Repaired {repaired} stacks, skipped={skipped}.");
}

private async Task<IReadOnlyList<string>> GetOrganizationIdsAsync(FixStackStatsWorkItem wi, DateTime utcEnd)
{
if (wi.OrganizationId is not null)
return [wi.OrganizationId];

var countResult = await _eventRepository.CountAsync(q => q
.DateRange(wi.UtcStart, utcEnd, (PersistentEvent e) => e.Date)
.Index(wi.UtcStart, utcEnd)
.AggregationsExpression("terms:(organization_id~65536)"));

return countResult.Aggregations.Terms<string>("terms_organization_id")?.Buckets
.Select(b => b.Key)
.ToList() ?? [];
}

private async Task<(int Repaired, int Skipped)> ProcessOrganizationAsync(WorkItemContext context, string organizationId, DateTime utcStart, DateTime utcEnd)
{
using var _ = Log.BeginScope(new ExceptionlessState().Organization(organizationId));
await context.RenewLockAsync();

var countResult = await _eventRepository.CountAsync(q => q
.Organization(organizationId)
.DateRange(utcStart, utcEnd, (PersistentEvent e) => e.Date)
.Index(utcStart, utcEnd)
.AggregationsExpression("terms:(stack_id~65536 min:date max:date)"));

var stackBuckets = countResult.Aggregations.Terms<string>("terms_stack_id")?.Buckets ?? [];
if (stackBuckets.Count is 0)
return (0, 0);

var statsByStackId = new Dictionary<string, StackEventStats>(stackBuckets.Count);
foreach (var bucket in stackBuckets)
{
var firstOccurrence = bucket.Aggregations.Min<DateTime>("min_date")?.Value;
var lastOccurrence = bucket.Aggregations.Max<DateTime>("max_date")?.Value;
if (firstOccurrence is null || lastOccurrence is null || bucket.Total is null)
continue;

statsByStackId[bucket.Key] = new StackEventStats(firstOccurrence.Value, lastOccurrence.Value, bucket.Total.Value);
}

int repaired = 0;
int skipped = 0;

foreach (string[] batch in statsByStackId.Keys.Chunk(100))
{
if (context.CancellationToken.IsCancellationRequested)
break;

await context.RenewLockAsync();

var stacks = await _stackRepository.GetByIdsAsync(batch);
foreach (var stack in stacks)
{
if (!statsByStackId.TryGetValue(stack.Id, out var stats))
{
skipped++;
continue;
}

bool shouldUpdateFirst = stack.FirstOccurrence.IsAfter(stats.FirstOccurrence);
bool shouldUpdateLast = stack.LastOccurrence.IsBefore(stats.LastOccurrence);
bool shouldUpdateTotal = stats.TotalOccurrences > stack.TotalOccurrences;
if (!shouldUpdateFirst && !shouldUpdateLast && !shouldUpdateTotal)
{
skipped++;
continue;
}

var newFirst = shouldUpdateFirst ? stats.FirstOccurrence : stack.FirstOccurrence;
var newLast = shouldUpdateLast ? stats.LastOccurrence : stack.LastOccurrence;
long newTotal = shouldUpdateTotal ? stats.TotalOccurrences : stack.TotalOccurrences;

Log.LogInformation(
"Repairing stack {StackId}: first={OldFirst:O}->{NewFirst:O} last={OldLast:O}->{NewLast:O} total={OldTotal}->{NewTotal}",
stack.Id,
stack.FirstOccurrence, newFirst,
stack.LastOccurrence, newLast,
stack.TotalOccurrences, newTotal);

await _stackRepository.SetEventCounterAsync(stack.Id, newFirst, newLast, newTotal, sendNotifications: false);
repaired++;
}
}

Log.LogDebug("Processed organization: Repaired={Repaired} Skipped={Skipped}", repaired, skipped);
return (repaired, skipped);
}
}

internal record StackEventStats(DateTime FirstOccurrence, DateTime LastOccurrence, long TotalOccurrences);
14 changes: 14 additions & 0 deletions src/Exceptionless.Core/Models/WorkItems/FixStackStatsWorkItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Exceptionless.Core.Models.WorkItems;

public record FixStackStatsWorkItem
{
public DateTime UtcStart { get; init; }

public DateTime? UtcEnd { get; init; }

/// <summary>
/// When set, only stacks belonging to this organization are repaired.
/// When null, all organizations with events in the time window are processed.
/// </summary>
public string? OrganizationId { get; init; }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public interface IStackRepository : IRepositoryOwnedByOrganizationAndProject<Sta
Task<FindResults<Stack>> GetExpiredSnoozedStatuses(DateTime utcNow, CommandOptionsDescriptor<Stack>? options = null);
Task MarkAsRegressedAsync(string stackId);
Task<bool> IncrementEventCounterAsync(string organizationId, string projectId, string stackId, DateTime minOccurrenceDateUtc, DateTime maxOccurrenceDateUtc, int count, bool sendNotifications = true);
Task<bool> SetEventCounterAsync(string stackId, DateTime firstOccurrenceUtc, DateTime lastOccurrenceUtc, long totalOccurrences, bool sendNotifications = true);
Task<FindResults<Stack>> GetStacksForCleanupAsync(string organizationId, DateTime cutoff);
Task<FindResults<Stack>> GetSoftDeleted();
Task<long> SoftDeleteByProjectIdAsync(string organizationId, string projectId);
Expand Down
83 changes: 68 additions & 15 deletions src/Exceptionless.Core/Repositories/StackRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using Exceptionless.Core.Repositories.Configuration;
using FluentValidation;
using Foundatio.Repositories;
using Foundatio.Repositories.Exceptions;
using Foundatio.Repositories.Models;
using Foundatio.Repositories.Options;
using Microsoft.Extensions.Logging;
using Nest;

namespace Exceptionless.Core.Repositories;
Expand Down Expand Up @@ -63,37 +63,90 @@ Instant parseDate(def dt) {
if (ctx._source.total_occurrences == 0 || parseDate(ctx._source.first_occurrence).isAfter(parseDate(params.minOccurrenceDateUtc))) {
ctx._source.first_occurrence = params.minOccurrenceDateUtc;
}

if (parseDate(ctx._source.last_occurrence).isBefore(parseDate(params.maxOccurrenceDateUtc))) {
ctx._source.last_occurrence = params.maxOccurrenceDateUtc;
}

if (parseDate(ctx._source.updated_utc).isBefore(parseDate(params.updatedUtc))) {
ctx._source.updated_utc = params.updatedUtc;
}

ctx._source.total_occurrences += params.count;";

var request = new UpdateRequest<Stack, Stack>(ElasticIndex.GetIndex(stackId), stackId)
var operation = new ScriptPatch(script.TrimScript())
{
Script = new InlineScript(script.TrimScript())
Params = new Dictionary<string, object>(4)
{
Params = new Dictionary<string, object>(3) {
{ "minOccurrenceDateUtc", minOccurrenceDateUtc },
{ "maxOccurrenceDateUtc", maxOccurrenceDateUtc },
{ "count", count },
{ "updatedUtc", _timeProvider.GetUtcNow().UtcDateTime }
}
{ "minOccurrenceDateUtc", minOccurrenceDateUtc },
{ "maxOccurrenceDateUtc", maxOccurrenceDateUtc },
{ "count", count },
{ "updatedUtc", _timeProvider.GetUtcNow().UtcDateTime }
}
};

var result = await _client.UpdateAsync(request);
if (!result.IsValid)
try
{
await PatchAsync(stackId, operation, o => o.Notifications(false));
}
catch (DocumentNotFoundException)
{
_logger.LogError(result.OriginalException, "Error occurred incrementing total event occurrences on stack {Stack}. Error: {Message}", stackId, result.ServerError?.Error);
return result.ServerError?.Status == 404;
return true;
}

await Cache.RemoveAsync(stackId);
if (sendNotifications)
await PublishMessageAsync(CreateEntityChanged(ChangeType.Saved, organizationId, projectId, null, stackId), TimeSpan.FromSeconds(1.5));
await PublishMessageAsync(CreateEntityChanged(ChangeType.Saved, organizationId, projectId, null, stackId));

return true;
}

public async Task<bool> SetEventCounterAsync(string stackId, DateTime firstOccurrenceUtc, DateTime lastOccurrenceUtc, long totalOccurrences, bool sendNotifications = true)
{
const string script = @"
Instant parseDate(def dt) {
if (dt != null) {
try {
return Instant.parse(dt);
} catch(DateTimeParseException e) {}
}
return Instant.MIN;
}

if (ctx._source.total_occurrences == null || ctx._source.total_occurrences < params.totalOccurrences) {
ctx._source.total_occurrences = params.totalOccurrences;
}

if (parseDate(ctx._source.first_occurrence).isAfter(parseDate(params.firstOccurrenceUtc))) {
ctx._source.first_occurrence = params.firstOccurrenceUtc;
}

if (parseDate(ctx._source.last_occurrence).isBefore(parseDate(params.lastOccurrenceUtc))) {
ctx._source.last_occurrence = params.lastOccurrenceUtc;
}

if (parseDate(ctx._source.updated_utc).isBefore(parseDate(params.updatedUtc))) {
ctx._source.updated_utc = params.updatedUtc;
}";

var operation = new ScriptPatch(script.TrimScript())
{
Params = new Dictionary<string, object>(4)
{
{ "firstOccurrenceUtc", firstOccurrenceUtc },
{ "lastOccurrenceUtc", lastOccurrenceUtc },
{ "totalOccurrences", totalOccurrences },
{ "updatedUtc", _timeProvider.GetUtcNow().UtcDateTime }
}
};

try
{
await PatchAsync(stackId, operation, o => o.Notifications(sendNotifications));
}
catch (DocumentNotFoundException)
{
return true;
}

return true;
}
Expand Down
Loading