222 lines
7.9 KiB
C#
Executable File
222 lines
7.9 KiB
C#
Executable File
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace MarketAlly.AIPlugin.DevOps.Performance
|
|
{
|
|
/// <summary>
/// Runs an asynchronous analyzer delegate over a collection of inputs while limiting how many
/// invocations are in flight at once.
/// </summary>
/// <remarks>
/// A single <see cref="SemaphoreSlim"/> owned by the instance gates every entry point, so the
/// concurrency limit is shared by all analysis calls made on the same instance. Dispose the
/// instance when it is no longer needed to release the semaphore.
/// </remarks>
/// <typeparam name="TInput">Type of the items handed to the analyzer.</typeparam>
/// <typeparam name="TResult">Type produced by the analyzer for each item.</typeparam>
public class ParallelAnalyzer<TInput, TResult> : IDisposable
{
    /// <summary>Pause inserted between consecutive batches in <see cref="AnalyzeBatchAsync"/>.</summary>
    private const int InterBatchDelayMilliseconds = 100;

    private readonly ILogger<ParallelAnalyzer<TInput, TResult>> _logger;
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConcurrency;

    /// <summary>
    /// Creates an analyzer with the requested concurrency limit.
    /// </summary>
    /// <param name="maxConcurrency">
    /// Desired number of concurrent analyzer invocations. The value is clamped to the range
    /// [1, <c>Environment.ProcessorCount * 2</c>].
    /// </param>
    /// <param name="logger">Optional logger; when <c>null</c> nothing is logged.</param>
    public ParallelAnalyzer(int maxConcurrency = 4, ILogger<ParallelAnalyzer<TInput, TResult>> logger = null)
    {
        _maxConcurrency = Math.Max(1, Math.Min(maxConcurrency, Environment.ProcessorCount * 2));
        _semaphore = new SemaphoreSlim(_maxConcurrency, _maxConcurrency);
        _logger = logger;
    }

    /// <summary>
    /// Analyzes every input and returns the non-null results.
    /// </summary>
    /// <param name="inputs">Items to analyze; enumerated once up front.</param>
    /// <param name="analyzer">Delegate invoked once per item.</param>
    /// <param name="cancellationToken">
    /// Observed while waiting for a concurrency slot. It is not passed to <paramref name="analyzer"/>.
    /// </param>
    /// <returns>
    /// The non-null analyzer results. They are collected in a <see cref="ConcurrentBag{T}"/>, so the
    /// order of the returned list is unspecified.
    /// </returns>
    /// <remarks>
    /// Exceptions thrown by <paramref name="analyzer"/> are logged and counted but not rethrown.
    /// </remarks>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="inputs"/> or <paramref name="analyzer"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while an item was waiting for a slot.
    /// </exception>
    public async Task<IList<TResult>> AnalyzeAsync(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> analyzer,
        CancellationToken cancellationToken = default)
    {
        if (inputs == null)
        {
            throw new ArgumentNullException(nameof(inputs));
        }

        if (analyzer == null)
        {
            throw new ArgumentNullException(nameof(analyzer));
        }

        var inputList = inputs.ToList();
        var results = new ConcurrentBag<TResult>();
        var exceptions = new ConcurrentBag<Exception>();

        _logger?.LogInformation("Starting parallel analysis of {Count} items with {MaxConcurrency} max concurrency",
            inputList.Count, _maxConcurrency);

        var tasks = inputList
            .Select(input => RunThrottledAsync(
                input,
                analyzer,
                result => results.Add(result),
                ex =>
                {
                    exceptions.Add(ex);
                    _logger?.LogError(ex, "Analysis failed for input: {Input}", input);
                },
                cancellationToken))
            .ToList();

        await Task.WhenAll(tasks);

        if (!exceptions.IsEmpty)
        {
            _logger?.LogWarning("Analysis completed with {ExceptionCount} errors out of {TotalCount} items",
                exceptions.Count, inputList.Count);
        }
        else
        {
            _logger?.LogInformation("Analysis completed successfully for all {Count} items", inputList.Count);
        }

        return results.ToList();
    }

    /// <summary>
    /// Analyzes every input and returns the non-null results keyed by the input that produced them.
    /// </summary>
    /// <param name="inputs">Items to analyze; enumerated once up front.</param>
    /// <param name="analyzer">Delegate invoked once per item.</param>
    /// <param name="cancellationToken">
    /// Observed while waiting for a concurrency slot. It is not passed to <paramref name="analyzer"/>.
    /// </param>
    /// <returns>
    /// A dictionary from input to result. Results are stored with <c>TryAdd</c>, so when the same
    /// key occurs more than once only the first stored result is kept.
    /// </returns>
    /// <remarks>
    /// Exceptions thrown by <paramref name="analyzer"/> (or while storing a result) are logged and
    /// counted but not rethrown.
    /// </remarks>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="inputs"/> or <paramref name="analyzer"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while an item was waiting for a slot.
    /// </exception>
    public async Task<IDictionary<TInput, TResult>> AnalyzeWithKeysAsync(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> analyzer,
        CancellationToken cancellationToken = default)
    {
        if (inputs == null)
        {
            throw new ArgumentNullException(nameof(inputs));
        }

        if (analyzer == null)
        {
            throw new ArgumentNullException(nameof(analyzer));
        }

        var inputList = inputs.ToList();
        var results = new ConcurrentDictionary<TInput, TResult>();
        var exceptions = new ConcurrentBag<Exception>();

        _logger?.LogInformation("Starting keyed parallel analysis of {Count} items", inputList.Count);

        var tasks = inputList
            .Select(input => RunThrottledAsync(
                input,
                analyzer,
                result => results.TryAdd(input, result),
                ex =>
                {
                    exceptions.Add(ex);
                    _logger?.LogError(ex, "Keyed analysis failed for input: {Input}", input);
                },
                cancellationToken))
            .ToList();

        await Task.WhenAll(tasks);

        _logger?.LogInformation("Keyed analysis completed: {SuccessCount} successful, {ErrorCount} errors",
            results.Count, exceptions.Count);

        return results;
    }

    /// <summary>
    /// Analyzes the inputs in consecutive batches, waiting for each batch to finish before the
    /// next one starts, and reports both results and per-item failures.
    /// </summary>
    /// <param name="inputs">Items to analyze; enumerated once up front.</param>
    /// <param name="analyzer">Delegate invoked once per item.</param>
    /// <param name="batchSize">Maximum number of items per batch; must be greater than zero.</param>
    /// <param name="cancellationToken">
    /// Observed while waiting for a concurrency slot and during the pause between batches. It is
    /// not passed to <paramref name="analyzer"/>.
    /// </param>
    /// <returns>
    /// An <see cref="AnalysisBatch{TInput, TResult}"/> holding the non-null results, one
    /// <see cref="AnalysisError{TInput}"/> per failed item (tagged with its 1-based batch number),
    /// and the corresponding counts.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="inputs"/> or <paramref name="analyzer"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> is zero or negative.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while waiting for a slot or between batches.
    /// </exception>
    public async Task<AnalysisBatch<TInput, TResult>> AnalyzeBatchAsync(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TResult>> analyzer,
        int batchSize = 10,
        CancellationToken cancellationToken = default)
    {
        if (inputs == null)
        {
            throw new ArgumentNullException(nameof(inputs));
        }

        if (analyzer == null)
        {
            throw new ArgumentNullException(nameof(analyzer));
        }

        // A non-positive batch size would make the batching loop never advance.
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
        }

        var inputList = inputs.ToList();
        var batches = CreateBatches(inputList, batchSize);
        var allResults = new List<TResult>();
        var allErrors = new List<AnalysisError<TInput>>();

        _logger?.LogInformation("Starting batch analysis: {TotalItems} items in {BatchCount} batches of size {BatchSize}",
            inputList.Count, batches.Count, batchSize);

        for (var batchIndex = 0; batchIndex < batches.Count; batchIndex++)
        {
            // Per-iteration local so the closures below capture a value that never changes.
            var batchNumber = batchIndex + 1;
            _logger?.LogDebug("Processing batch {BatchNumber}/{TotalBatches}", batchNumber, batches.Count);

            var batchResults = new ConcurrentBag<TResult>();
            var batchErrors = new ConcurrentBag<AnalysisError<TInput>>();

            var batchTasks = batches[batchIndex]
                .Select(input => RunThrottledAsync(
                    input,
                    analyzer,
                    result => batchResults.Add(result),
                    ex => batchErrors.Add(new AnalysisError<TInput>
                    {
                        Input = input,
                        Exception = ex,
                        BatchNumber = batchNumber
                    }),
                    cancellationToken))
                .ToList();

            await Task.WhenAll(batchTasks);

            allResults.AddRange(batchResults);
            allErrors.AddRange(batchErrors);

            // Brief pause between batches (not after the last one) to avoid overwhelming the system.
            if (batchNumber < batches.Count)
            {
                await Task.Delay(InterBatchDelayMilliseconds, cancellationToken);
            }
        }

        _logger?.LogInformation("Batch analysis completed: {SuccessCount} successful, {ErrorCount} errors",
            allResults.Count, allErrors.Count);

        return new AnalysisBatch<TInput, TResult>
        {
            Results = allResults,
            Errors = allErrors,
            TotalProcessed = inputList.Count,
            SuccessCount = allResults.Count,
            ErrorCount = allErrors.Count
        };
    }

    /// <summary>
    /// Releases the semaphore used to limit concurrency.
    /// </summary>
    public void Dispose()
    {
        _semaphore.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Waits for a concurrency slot, runs the analyzer for one item, and routes the outcome to the
    /// supplied callbacks. The slot is always released once it has been acquired.
    /// </summary>
    /// <param name="input">Item to analyze.</param>
    /// <param name="analyzer">Delegate that performs the analysis.</param>
    /// <param name="onSuccess">Invoked with the result when it is not <c>null</c>.</param>
    /// <param name="onFailure">Invoked with any exception thrown by the analyzer or by <paramref name="onSuccess"/>.</param>
    /// <param name="cancellationToken">Observed only while waiting for the slot.</param>
    private async Task RunThrottledAsync(
        TInput input,
        Func<TInput, Task<TResult>> analyzer,
        Action<TResult> onSuccess,
        Action<Exception> onFailure,
        CancellationToken cancellationToken)
    {
        // Acquire outside the try block: if the wait is cancelled no slot was taken,
        // so the finally block must not release one.
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            var result = await analyzer(input);
            if (result != null)
            {
                onSuccess(result);
            }
        }
        catch (Exception ex)
        {
            onFailure(ex);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Splits <paramref name="inputs"/> into consecutive chunks of at most
    /// <paramref name="batchSize"/> items, preserving order.
    /// </summary>
    /// <param name="inputs">Items to split.</param>
    /// <param name="batchSize">Maximum chunk size; callers must pass a positive value.</param>
    /// <returns>The list of chunks; empty when <paramref name="inputs"/> is empty.</returns>
    private static List<List<TInput>> CreateBatches(IList<TInput> inputs, int batchSize)
    {
        var batches = new List<List<TInput>>();
        var start = 0;

        while (start < inputs.Count)
        {
            // Bounded by the remaining item count, so advancing 'start' cannot overflow.
            var size = Math.Min(batchSize, inputs.Count - start);
            var batch = new List<TInput>(size);

            for (var i = start; i < start + size; i++)
            {
                batch.Add(inputs[i]);
            }

            batches.Add(batch);
            start += size;
        }

        return batches;
    }
}
|
|
|
|
/// <summary>
/// Summary of a batched analysis run: the collected results, the per-item errors, and the
/// associated counters.
/// </summary>
/// <typeparam name="TInput">Type of the analyzed items.</typeparam>
/// <typeparam name="TResult">Type of the analysis results.</typeparam>
public class AnalysisBatch<TInput, TResult>
{
    /// <summary>Results gathered during the run; starts as an empty list.</summary>
    public IList<TResult> Results { get; set; } = new List<TResult>();

    /// <summary>Errors gathered during the run; starts as an empty list.</summary>
    public IList<AnalysisError<TInput>> Errors { get; set; } = new List<AnalysisError<TInput>>();

    /// <summary>Total number of items processed.</summary>
    public int TotalProcessed { get; set; }

    /// <summary>Number of items counted as successful.</summary>
    public int SuccessCount { get; set; }

    /// <summary>Number of items counted as failed.</summary>
    public int ErrorCount { get; set; }

    /// <summary>
    /// <see cref="SuccessCount"/> as a percentage of <see cref="TotalProcessed"/>, or 0 when
    /// <see cref="TotalProcessed"/> is not positive.
    /// </summary>
    public double SuccessRate
    {
        get
        {
            if (TotalProcessed <= 0)
            {
                return 0d;
            }

            return (double)SuccessCount / TotalProcessed * 100;
        }
    }
}
|
|
|
|
/// <summary>
/// Describes a single failed analysis: the input involved, the exception raised, the batch it
/// belonged to, and when the error object was created.
/// </summary>
/// <typeparam name="TInput">Type of the analyzed item.</typeparam>
public class AnalysisError<TInput>
{
    /// <summary>The item whose analysis failed.</summary>
    public TInput Input { get; set; }

    /// <summary>The exception associated with the failure.</summary>
    public Exception Exception { get; set; }

    /// <summary>Number of the batch the item belonged to.</summary>
    public int BatchNumber { get; set; }

    /// <summary>UTC time stamp; defaults to the moment this instance is constructed.</summary>
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
|
|
} |