298 lines
10 KiB
C#
Executable File
298 lines
10 KiB
C#
Executable File
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Channels;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace MarketAlly.AIPlugin.Refactoring.Performance
|
|
{
|
|
/// <summary>
/// Strategy that decides how many tasks should run at the same time,
/// informed by the outcomes of tasks that have already finished.
/// </summary>
public interface IConcurrencyOptimizer
{
    /// <summary>
    /// Reports the outcome of one finished task to the optimizer.
    /// </summary>
    /// <param name="duration">How long the task took.</param>
    /// <param name="success"><c>true</c> when the task completed without failing.</param>
    void RecordTaskCompletion(TimeSpan duration, bool success);

    /// <summary>
    /// Gets the degree of concurrency the optimizer currently recommends.
    /// </summary>
    /// <returns>The recommended number of tasks to run at once.</returns>
    int CalculateOptimalConcurrency();
}
|
|
|
|
/// <summary>
/// <see cref="IConcurrencyOptimizer"/> that nudges the recommended concurrency up or
/// down by one step at a time, based on the recent task success rate, the CPU usage
/// of the current process and a managed-heap memory heuristic.
/// </summary>
public class SystemResourceOptimizer : IConcurrencyOptimizer
{
    /// <summary>Minimum time between two recalculations of the recommendation.</summary>
    private static readonly TimeSpan RecalculationInterval = TimeSpan.FromSeconds(30);

    /// <summary>Task samples older than this are ignored and discarded.</summary>
    private static readonly TimeSpan MetricsRetention = TimeSpan.FromMinutes(5);

    /// <summary>Fewer recent samples than this means "not enough data to adapt".</summary>
    private const int MinimumSampleCount = 10;

    /// <summary>Upper bound on the number of samples kept in the queue.</summary>
    private const int MaxTrackedTasks = 1000;

    /// <summary>Value reported when a resource reading cannot be obtained.</summary>
    private const double FallbackUsage = 0.5;

    private const double BytesPerGigabyte = 1024.0 * 1024 * 1024;

    private readonly ConcurrentQueue<TaskMetrics> _recentTasks = new();
    private readonly object _lock = new();
    private int _currentOptimalConcurrency = Environment.ProcessorCount;
    private DateTime _lastOptimizationCheck = DateTime.UtcNow;

    // CPU sampling state. Only read and written while _lock is held
    // (GetCurrentCpuUsage is called exclusively from CalculateOptimalConcurrency).
    private bool _hasCpuSample;
    private TimeSpan _lastCpuTime;
    private long _lastCpuSampleTimestamp;

    private record TaskMetrics(TimeSpan Duration, bool Success, DateTime CompletedAt);

    /// <summary>
    /// Returns the recommended concurrency. The value is recalculated at most once
    /// every 30 seconds; in between, the cached value is returned.
    /// </summary>
    /// <returns>
    /// A value between 1 and twice <see cref="Environment.ProcessorCount"/>
    /// (the initial value is <see cref="Environment.ProcessorCount"/>).
    /// </returns>
    public int CalculateOptimalConcurrency()
    {
        lock (_lock)
        {
            var now = DateTime.UtcNow;
            if (now - _lastOptimizationCheck < RecalculationInterval)
            {
                return _currentOptimalConcurrency;
            }

            _lastOptimizationCheck = now;

            // Discard expired samples from the head of the queue. Trimming in place
            // (instead of draining and re-enqueueing everything) keeps samples that
            // are recorded concurrently in their arrival order.
            var cutoff = now - MetricsRetention;
            while (_recentTasks.TryPeek(out var oldest) && oldest.CompletedAt <= cutoff)
            {
                _recentTasks.TryDequeue(out _);
            }

            // Enumerating a ConcurrentQueue yields a snapshot; the filter covers any
            // slightly out-of-order stale sample that is not at the head yet.
            var recentMetrics = _recentTasks.Where(m => m.CompletedAt > cutoff).ToList();

            if (recentMetrics.Count < MinimumSampleCount)
            {
                // Not enough data to adapt; keep the current recommendation.
                return _currentOptimalConcurrency;
            }

            var successRate = recentMetrics.Count(m => m.Success) / (double)recentMetrics.Count;
            var cpuUsage = GetCurrentCpuUsage();
            var memoryPressure = GetMemoryPressure();

            if (successRate > 0.95 && cpuUsage < 0.8 && memoryPressure < 0.7)
            {
                // Healthy: allow one more concurrent task, capped at 2x the core count.
                _currentOptimalConcurrency = Math.Min(_currentOptimalConcurrency + 1, Environment.ProcessorCount * 2);
            }
            else if (successRate < 0.8 || cpuUsage > 0.9 || memoryPressure > 0.8)
            {
                // Struggling: back off by one, but never below a single task.
                _currentOptimalConcurrency = Math.Max(_currentOptimalConcurrency - 1, 1);
            }

            return _currentOptimalConcurrency;
        }
    }

    /// <summary>
    /// Records the outcome of a finished task, stamped with the current UTC time.
    /// </summary>
    /// <param name="duration">How long the task took.</param>
    /// <param name="success"><c>true</c> when the task completed without failing.</param>
    public void RecordTaskCompletion(TimeSpan duration, bool success)
    {
        _recentTasks.Enqueue(new TaskMetrics(duration, success, DateTime.UtcNow));

        // Keep the queue bounded by evicting the oldest samples.
        while (_recentTasks.Count > MaxTrackedTasks)
        {
            _recentTasks.TryDequeue(out _);
        }
    }

    /// <summary>
    /// Measures the CPU usage of the current process as a fraction in [0, 1] of the
    /// total capacity of all logical processors.
    /// </summary>
    /// <remarks>
    /// The first call averages over the lifetime of the process; later calls measure
    /// the interval since the previous call. The result is a fraction (not a
    /// percentage) so that it is directly comparable with the 0.8 / 0.9 thresholds.
    /// </remarks>
    /// <returns>The usage fraction, or 0.5 when it cannot be determined.</returns>
    private double GetCurrentCpuUsage()
    {
        try
        {
            using var process = Process.GetCurrentProcess();
            var cpuTime = process.TotalProcessorTime;
            var timestamp = Stopwatch.GetTimestamp();

            TimeSpan cpuDelta;
            double wallMilliseconds;
            if (_hasCpuSample)
            {
                cpuDelta = cpuTime - _lastCpuTime;
                wallMilliseconds = (timestamp - _lastCpuSampleTimestamp) * 1000.0 / Stopwatch.Frequency;
            }
            else
            {
                cpuDelta = cpuTime;
                wallMilliseconds = (DateTime.UtcNow - process.StartTime.ToUniversalTime()).TotalMilliseconds;
            }

            _lastCpuTime = cpuTime;
            _lastCpuSampleTimestamp = timestamp;
            _hasCpuSample = true;

            if (wallMilliseconds <= 0)
            {
                return FallbackUsage;
            }

            var usage = cpuDelta.TotalMilliseconds / (wallMilliseconds * Environment.ProcessorCount);
            return Math.Clamp(usage, 0.0, 1.0);
        }
        catch
        {
            // Best effort: process information may be unavailable on some platforms.
            return FallbackUsage;
        }
    }

    /// <summary>
    /// Estimates memory pressure as the managed heap size relative to 1 GiB,
    /// capped at 1.0.
    /// </summary>
    /// <returns>A value in [0, 1], or 0.5 when the heap size cannot be read.</returns>
    private double GetMemoryPressure()
    {
        try
        {
            // Simple heuristic: 1 GiB or more of managed allocations counts as full pressure.
            var totalMemory = GC.GetTotalMemory(false);
            return Math.Min(1.0, totalMemory / BytesPerGigabyte);
        }
        catch
        {
            return FallbackUsage;
        }
    }
}
|
|
|
|
/// <summary>
/// Runs batches of asynchronous work items with a degree of parallelism that is
/// obtained from an <see cref="IConcurrencyOptimizer"/> and re-evaluated before
/// every batch. A shared semaphore bounds the number of work items in flight
/// across all batches processed by the same manager.
/// </summary>
public class AdaptiveConcurrencyManager : IDisposable
{
    private readonly IConcurrencyOptimizer _optimizer;
    private readonly SemaphoreSlim _semaphore;
    private int _currentConcurrency;

    /// <summary>
    /// Creates a manager.
    /// </summary>
    /// <param name="optimizer">
    /// Optimizer to consult; a <see cref="SystemResourceOptimizer"/> is used when <c>null</c>.
    /// </param>
    public AdaptiveConcurrencyManager(IConcurrencyOptimizer? optimizer = null)
    {
        _optimizer = optimizer ?? new SystemResourceOptimizer();

        // At least one permit is required, otherwise no work could ever run.
        _currentConcurrency = Math.Max(1, _optimizer.CalculateOptimalConcurrency());

        // No explicit maximum: the permit count must be able to grow later, and a
        // maximum equal to the initial count makes Release() throw
        // SemaphoreFullException as soon as the concurrency is increased.
        _semaphore = new SemaphoreSlim(_currentConcurrency);
    }

    /// <summary>The number of permits the manager currently hands out.</summary>
    public int CurrentConcurrency => _currentConcurrency;

    /// <summary>
    /// Executes all work items and returns their results in input order.
    /// </summary>
    /// <typeparam name="T">Result type of each work item.</typeparam>
    /// <param name="tasks">Factories that start the individual work items.</param>
    /// <param name="cancellationToken">Token that cancels the batch.</param>
    /// <returns>One result per work item, at the same index as its factory.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is <c>null</c>.</exception>
    /// <exception cref="AggregateException">
    /// One or more work items failed; the remaining items are still executed.
    /// </exception>
    /// <exception cref="OperationCanceledException">The batch was cancelled.</exception>
    public async Task<T[]> ProcessConcurrentlyAsync<T>(
        IEnumerable<Func<Task<T>>> tasks,
        CancellationToken cancellationToken = default)
    {
        if (tasks is null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        var taskList = tasks.ToList();
        if (taskList.Count == 0)
        {
            return Array.Empty<T>();
        }

        // Pick up the optimizer's latest recommendation before starting.
        await UpdateConcurrencyAsync(cancellationToken);

        var results = new T[taskList.Count];
        var exceptions = new ConcurrentBag<Exception>();

        // A channel distributes the work: every worker pulls the next free item.
        var channel = Channel.CreateUnbounded<WorkItem<T>>();
        for (var i = 0; i < taskList.Count; i++)
        {
            await channel.Writer.WriteAsync(new WorkItem<T>(i, taskList[i]), cancellationToken);
        }

        channel.Writer.Complete();

        // Never start more workers than there are items to process.
        var workerCount = Math.Min(_currentConcurrency, taskList.Count);
        var workers = new Task[workerCount];
        for (var i = 0; i < workerCount; i++)
        {
            workers[i] = ProcessWorkAsync(channel.Reader, results, exceptions, cancellationToken);
        }

        // The workers finish once the completed channel is drained (or on
        // cancellation), so no separate completion counter is needed.
        await Task.WhenAll(workers);

        if (!exceptions.IsEmpty)
        {
            throw new AggregateException(exceptions);
        }

        return results;
    }

    /// <summary>
    /// Executes a single work item through the same throttling as a batch.
    /// </summary>
    /// <typeparam name="T">Result type of the work item.</typeparam>
    /// <param name="task">Factory that starts the work item.</param>
    /// <param name="cancellationToken">Token that cancels the operation.</param>
    /// <returns>The result of the work item.</returns>
    public async Task<T> ProcessSingleAsync<T>(
        Func<Task<T>> task,
        CancellationToken cancellationToken = default)
    {
        var results = await ProcessConcurrentlyAsync(new[] { task }, cancellationToken);
        return results[0];
    }

    /// <summary>Releases the semaphore owned by this manager.</summary>
    public void Dispose()
    {
        _semaphore?.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Worker loop: takes items from the channel until it is drained, runs each one
    /// under a semaphore permit, stores its result and reports the outcome to the optimizer.
    /// </summary>
    private async Task ProcessWorkAsync<T>(
        ChannelReader<WorkItem<T>> reader,
        T[] results,
        ConcurrentBag<Exception> exceptions,
        CancellationToken cancellationToken)
    {
        await foreach (var workItem in reader.ReadAllAsync(cancellationToken))
        {
            try
            {
                await _semaphore.WaitAsync(cancellationToken);

                // Started after the permit is acquired so that only the work itself
                // is timed, not the time spent queueing for a permit.
                var stopwatch = Stopwatch.StartNew();
                var success = false;
                try
                {
                    results[workItem.Index] = await workItem.Work();
                    success = true;
                }
                finally
                {
                    _semaphore.Release();
                    stopwatch.Stop();
                    _optimizer.RecordTaskCompletion(stopwatch.Elapsed, success);
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The batch itself was cancelled: stop this worker instead of
                // churning through the remaining items.
                throw;
            }
            catch (Exception ex)
            {
                // A failing item must not stop the others; failures are reported together.
                exceptions.Add(ex);
            }
        }
    }

    /// <summary>
    /// Adjusts the number of semaphore permits to the optimizer's current recommendation.
    /// </summary>
    private async Task UpdateConcurrencyAsync(CancellationToken cancellationToken)
    {
        var target = Math.Max(1, _optimizer.CalculateOptimalConcurrency());
        var difference = target - _currentConcurrency;

        if (difference > 0)
        {
            // Increase: hand out additional permits.
            _semaphore.Release(difference);
            _currentConcurrency = target;
            return;
        }

        // Decrease: permanently take permits out of circulation. The counter is
        // updated per permit so it stays accurate if the wait is cancelled midway.
        for (var remaining = -difference; remaining > 0; remaining--)
        {
            await _semaphore.WaitAsync(cancellationToken);
            _currentConcurrency--;
        }
    }

    /// <summary>A unit of work together with the index of its slot in the result array.</summary>
    private sealed record WorkItem<T>(int Index, Func<Task<T>> Work);
}
|
|
|
|
// Extension methods for common scenarios
|
|
/// <summary>
/// Convenience extensions that process a sequence through a freshly created
/// <see cref="AdaptiveConcurrencyManager"/>.
/// </summary>
public static class ConcurrencyExtensions
{
    /// <summary>
    /// Applies <paramref name="processor"/> to every element of <paramref name="source"/>
    /// with adaptive concurrency and returns the results in source order.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <typeparam name="TResult">Result type produced for each element.</typeparam>
    /// <param name="source">The elements to process.</param>
    /// <param name="processor">Asynchronous operation to run for each element.</param>
    /// <param name="cancellationToken">Token that cancels the processing.</param>
    /// <returns>One result per element, in source order.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="processor"/> is <c>null</c>.
    /// </exception>
    public static async Task<TResult[]> ProcessWithAdaptiveConcurrencyAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> processor,
        CancellationToken cancellationToken = default)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (processor is null)
        {
            throw new ArgumentNullException(nameof(processor));
        }

        var manager = new AdaptiveConcurrencyManager();
        try
        {
            var tasks = source.Select(item => new Func<Task<TResult>>(() => processor(item)));
            return await manager.ProcessConcurrentlyAsync(tasks, cancellationToken);
        }
        finally
        {
            // The manager owns a semaphore; release it once the batch is done.
            manager.Dispose();
        }
    }

    /// <summary>
    /// Applies <paramref name="processor"/> to every element of <paramref name="source"/>
    /// with adaptive concurrency.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">The elements to process.</param>
    /// <param name="processor">Asynchronous operation to run for each element.</param>
    /// <param name="cancellationToken">Token that cancels the processing.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="processor"/> is <c>null</c>.
    /// </exception>
    public static async Task ProcessWithAdaptiveConcurrencyAsync<TSource>(
        this IEnumerable<TSource> source,
        Func<TSource, Task> processor,
        CancellationToken cancellationToken = default)
    {
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (processor is null)
        {
            throw new ArgumentNullException(nameof(processor));
        }

        var manager = new AdaptiveConcurrencyManager();
        try
        {
            // Await the processor inside the wrapper so that its failure or
            // cancellation propagates. A ContinueWith continuation would run
            // regardless of the outcome and silently turn failures into successes.
            var tasks = source.Select(item => new Func<Task<object?>>(async () =>
            {
                await processor(item);
                return null;
            }));

            await manager.ProcessConcurrentlyAsync(tasks, cancellationToken);
        }
        finally
        {
            // The manager owns a semaphore; release it once the batch is done.
            manager.Dispose();
        }
    }
}
|
|
} |