MarketAlly.AIPlugin.Extensions/MarketAlly.AIPlugin.Refacto.../Performance/AdaptiveConcurrencyManager.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace MarketAlly.AIPlugin.Refactoring.Performance
{
    public interface IConcurrencyOptimizer
    {
        int CalculateOptimalConcurrency();
        void RecordTaskCompletion(TimeSpan duration, bool success);
    }

    public class SystemResourceOptimizer : IConcurrencyOptimizer
    {
        private readonly ConcurrentQueue<TaskMetrics> _recentTasks = new();
        private readonly object _lock = new();
        private int _currentOptimalConcurrency = Environment.ProcessorCount;
        private DateTime _lastOptimizationCheck = DateTime.UtcNow;

        private record TaskMetrics(TimeSpan Duration, bool Success, DateTime CompletedAt);

        public int CalculateOptimalConcurrency()
        {
            lock (_lock)
            {
                // Recalculate every 30 seconds
                if (DateTime.UtcNow - _lastOptimizationCheck < TimeSpan.FromSeconds(30))
                {
                    return _currentOptimalConcurrency;
                }
                _lastOptimizationCheck = DateTime.UtcNow;

                // Clean up old metrics (keep last 5 minutes)
                var cutoff = DateTime.UtcNow.AddMinutes(-5);
                var recentMetrics = new List<TaskMetrics>();
                while (_recentTasks.TryDequeue(out var metric))
                {
                    if (metric.CompletedAt > cutoff)
                    {
                        recentMetrics.Add(metric);
                    }
                }

                // Re-add recent metrics
                foreach (var metric in recentMetrics)
                {
                    _recentTasks.Enqueue(metric);
                }

                if (recentMetrics.Count < 10)
                {
                    // Not enough data, use default
                    return _currentOptimalConcurrency;
                }

                // Calculate metrics
                var successRate = recentMetrics.Count(m => m.Success) / (double)recentMetrics.Count;
                var averageDuration = recentMetrics.Average(m => m.Duration.TotalMilliseconds);
                var cpuUsage = GetCurrentCpuUsage();
                var memoryPressure = GetMemoryPressure();

                // Adaptive algorithm
                if (successRate > 0.95 && cpuUsage < 0.8 && memoryPressure < 0.7)
                {
                    // System is performing well, can increase concurrency
                    _currentOptimalConcurrency = Math.Min(_currentOptimalConcurrency + 1, Environment.ProcessorCount * 2);
                }
                else if (successRate < 0.8 || cpuUsage > 0.9 || memoryPressure > 0.8)
                {
                    // System is struggling, reduce concurrency
                    _currentOptimalConcurrency = Math.Max(_currentOptimalConcurrency - 1, 1);
                }

                return _currentOptimalConcurrency;
            }
        }

        public void RecordTaskCompletion(TimeSpan duration, bool success)
        {
            _recentTasks.Enqueue(new TaskMetrics(duration, success, DateTime.UtcNow));

            // Keep queue size manageable
            while (_recentTasks.Count > 1000)
            {
                _recentTasks.TryDequeue(out _);
            }
        }
        private double GetCurrentCpuUsage()
        {
            try
            {
                using var process = Process.GetCurrentProcess();
                // Approximate utilization as a 0..1 fraction (matching the 0.8/0.9 thresholds
                // above): CPU time consumed by this process divided by its wall-clock lifetime
                // across all logical cores.
                var uptimeMs = (DateTime.UtcNow - process.StartTime.ToUniversalTime()).TotalMilliseconds;
                if (uptimeMs <= 0)
                {
                    return 0.5;
                }
                return Math.Min(1.0, process.TotalProcessorTime.TotalMilliseconds / (uptimeMs * Environment.ProcessorCount));
            }
            catch
            {
                return 0.5; // Default assumption
            }
        }

        private double GetMemoryPressure()
        {
            try
            {
                // Simple heuristic: managed heap size normalized to 1 GB, clamped to 1.0
                var totalMemory = GC.GetTotalMemory(false);
                return Math.Min(1.0, totalMemory / (1024.0 * 1024 * 1024));
            }
            catch
            {
                return 0.5; // Default assumption
            }
        }
    }
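
    // Illustrative sketch only (not part of the original file): the IConcurrencyOptimizer
    // seam makes it possible to inject a fixed-rate optimizer for deterministic tests.
    // FixedConcurrencyOptimizer and its behavior are assumptions, not library API.
    //
    //     public class FixedConcurrencyOptimizer : IConcurrencyOptimizer
    //     {
    //         private readonly int _degree;
    //         public FixedConcurrencyOptimizer(int degree) => _degree = degree;
    //         public int CalculateOptimalConcurrency() => _degree;
    //         public void RecordTaskCompletion(TimeSpan duration, bool success) { /* no-op */ }
    //     }
    //
    //     var manager = new AdaptiveConcurrencyManager(new FixedConcurrencyOptimizer(4));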
    public class AdaptiveConcurrencyManager : IDisposable
    {
        private readonly IConcurrencyOptimizer _optimizer;
        private readonly SemaphoreSlim _semaphore;
        private int _currentConcurrency;

        public AdaptiveConcurrencyManager(IConcurrencyOptimizer? optimizer = null)
        {
            _optimizer = optimizer ?? new SystemResourceOptimizer();
            _currentConcurrency = _optimizer.CalculateOptimalConcurrency();
            // No explicit maxCount: UpdateConcurrencyAsync must be able to release
            // additional permits when the optimizer raises the concurrency level.
            _semaphore = new SemaphoreSlim(_currentConcurrency);
        }
        public async Task<T[]> ProcessConcurrentlyAsync<T>(
            IEnumerable<Func<Task<T>>> tasks,
            CancellationToken cancellationToken = default)
        {
            var taskList = tasks.ToList();
            if (!taskList.Any())
                return Array.Empty<T>();

            // Update concurrency if needed
            await UpdateConcurrencyAsync();

            var results = new T[taskList.Count];
            var exceptions = new ConcurrentBag<Exception>();

            // Use a channel for work distribution
            var channel = Channel.CreateUnbounded<WorkItem<T>>();
            var writer = channel.Writer;

            // Queue all work items
            for (int i = 0; i < taskList.Count; i++)
            {
                await writer.WriteAsync(new WorkItem<T>(i, taskList[i]), cancellationToken);
            }
            writer.Complete();

            // Start worker tasks; each worker exits once the channel is drained
            var workers = new List<Task>();
            for (int i = 0; i < _currentConcurrency; i++)
            {
                workers.Add(ProcessWorkAsync(channel.Reader, results, exceptions, cancellationToken));
            }

            // Wait for all workers. Per-item failures are collected rather than thrown
            // from the worker loop, so this completes even when individual items fail.
            await Task.WhenAll(workers);

            if (exceptions.Any())
            {
                throw new AggregateException(exceptions);
            }
            return results;
        }

        private async Task ProcessWorkAsync<T>(
            ChannelReader<WorkItem<T>> reader,
            T[] results,
            ConcurrentBag<Exception> exceptions,
            CancellationToken cancellationToken)
        {
            await foreach (var workItem in reader.ReadAllAsync(cancellationToken))
            {
                var stopwatch = Stopwatch.StartNew();
                bool success = false;
                try
                {
                    await _semaphore.WaitAsync(cancellationToken);
                    try
                    {
                        results[workItem.Index] = await workItem.Task();
                        success = true;
                    }
                    finally
                    {
                        _semaphore.Release();
                        stopwatch.Stop();
                        _optimizer.RecordTaskCompletion(stopwatch.Elapsed, success);
                    }
                }
                catch (Exception ex)
                {
                    exceptions.Add(ex);
                }
            }
        }
        private async Task UpdateConcurrencyAsync()
        {
            var optimalConcurrency = _optimizer.CalculateOptimalConcurrency();
            if (optimalConcurrency != _currentConcurrency)
            {
                var difference = optimalConcurrency - _currentConcurrency;
                if (difference > 0)
                {
                    // Increase concurrency
                    _semaphore.Release(difference);
                }
                else
                {
                    // Decrease concurrency by waiting for permits
                    for (int i = 0; i < Math.Abs(difference); i++)
                    {
                        await _semaphore.WaitAsync();
                    }
                }
                _currentConcurrency = optimalConcurrency;
            }
        }
        public async Task<T> ProcessSingleAsync<T>(
            Func<Task<T>> task,
            CancellationToken cancellationToken = default)
        {
            var results = await ProcessConcurrentlyAsync(new[] { task }, cancellationToken);
            return results[0];
        }

        public int CurrentConcurrency => _currentConcurrency;

        public void Dispose()
        {
            _semaphore?.Dispose();
        }

        private record WorkItem<T>(int Index, Func<Task<T>> Task);
    }
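
    // Minimal usage sketch of the manager itself (FetchPageAsync and urls below are
    // hypothetical placeholders for illustration, not part of this file):
    //
    //     using var manager = new AdaptiveConcurrencyManager();
    //     var work = urls.Select(url => new Func<Task<string>>(() => FetchPageAsync(url)));
    //     string[] pages = await manager.ProcessConcurrentlyAsync(work);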
    // Extension methods for common scenarios
    public static class ConcurrencyExtensions
    {
        public static async Task<TResult[]> ProcessWithAdaptiveConcurrencyAsync<TSource, TResult>(
            this IEnumerable<TSource> source,
            Func<TSource, Task<TResult>> processor,
            CancellationToken cancellationToken = default)
        {
            using var manager = new AdaptiveConcurrencyManager();
            var tasks = source.Select(item => new Func<Task<TResult>>(() => processor(item)));
            return await manager.ProcessConcurrentlyAsync(tasks, cancellationToken);
        }

        public static async Task ProcessWithAdaptiveConcurrencyAsync<TSource>(
            this IEnumerable<TSource> source,
            Func<TSource, Task> processor,
            CancellationToken cancellationToken = default)
        {
            using var manager = new AdaptiveConcurrencyManager();
            // Wrap the void-returning processor so failures propagate through the
            // result pipeline instead of being swallowed by a detached continuation.
            var tasks = source.Select(item => new Func<Task<object?>>(async () =>
            {
                await processor(item);
                return null;
            }));
            await manager.ProcessConcurrentlyAsync(tasks, cancellationToken);
        }
    }
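
    // Illustrative use of the extension methods (ids, LoadRecordAsync, and SaveRecordAsync
    // are hypothetical placeholders, not part of this file):
    //
    //     var records = await ids.ProcessWithAdaptiveConcurrencyAsync(id => LoadRecordAsync(id));
    //     await records.ProcessWithAdaptiveConcurrencyAsync(record => SaveRecordAsync(record));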
}