MarketAlly.AIPlugin.Extensions/MarketAlly.AIPlugin.Context/Concurrency/ThreadSafeStorage.cs

503 lines
15 KiB
C#
Executable File

using System.Collections.Concurrent;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using MarketAlly.AIPlugin.Context.Configuration;
namespace MarketAlly.AIPlugin.Context.Concurrency
{
/// <summary>
/// Thread-safe storage manager for context operations
/// </summary>
public class ThreadSafeStorage : IDisposable
{
private readonly ContextConfiguration _configuration;
private readonly ILogger<ThreadSafeStorage> _logger;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _fileLocks;
private readonly ConcurrentDictionary<string, DateTime> _fileTimestamps;
private readonly SemaphoreSlim _globalWriteLock;
private readonly ReaderWriterLockSlim _indexLock;
private readonly Timer _lockCleanupTimer;
public ThreadSafeStorage(ContextConfiguration configuration, ILogger<ThreadSafeStorage> logger)
{
_configuration = configuration;
_logger = logger;
_fileLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
_fileTimestamps = new ConcurrentDictionary<string, DateTime>();
_globalWriteLock = new SemaphoreSlim(_configuration.Performance.MaxConcurrentOperations, _configuration.Performance.MaxConcurrentOperations);
_indexLock = new ReaderWriterLockSlim();
// Clean up unused locks every 5 minutes
_lockCleanupTimer = new Timer(CleanupUnusedLocks, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
}
/// <summary>
/// Safely stores a context entry with optimistic concurrency control
/// </summary>
public async Task<StorageResult> StoreContextEntryAsync(StoredContextEntry entry, string storagePath, CancellationToken cancellationToken = default)
{
var fileName = $"context-{DateTime.UtcNow:yyyy-MM}.json";
var filePath = Path.Combine(storagePath, fileName);
// Acquire global write semaphore to limit concurrent operations
await _globalWriteLock.WaitAsync(cancellationToken);
try
{
// Get or create file-specific lock
var fileLock = _fileLocks.GetOrAdd(filePath, _ => new SemaphoreSlim(1, 1));
await fileLock.WaitAsync(cancellationToken);
try
{
var result = await StoreEntryWithRetryAsync(entry, filePath, cancellationToken);
if (result.Success)
{
// Update index in a thread-safe manner
await UpdateIndexSafelyAsync(entry, storagePath, cancellationToken);
_fileTimestamps[filePath] = DateTime.UtcNow;
}
return result;
}
finally
{
fileLock.Release();
}
}
finally
{
_globalWriteLock.Release();
}
}
/// <summary>
/// Safely reads context entries from a file
/// </summary>
public async Task<ReadResult> ReadContextEntriesAsync(string filePath, CancellationToken cancellationToken = default)
{
if (!File.Exists(filePath))
{
return new ReadResult { Success = false, Error = "File not found" };
}
// Get or create file-specific lock for reading
var fileLock = _fileLocks.GetOrAdd(filePath, _ => new SemaphoreSlim(1, 1));
await fileLock.WaitAsync(cancellationToken);
try
{
var fileInfo = new System.IO.FileInfo(filePath);
var entries = new List<StoredContextEntry>();
// Check if file has been modified since our last read
if (_fileTimestamps.TryGetValue(filePath, out var lastRead) && lastRead >= fileInfo.LastWriteTime)
{
_logger.LogDebug("File {FilePath} unchanged since last read", filePath);
}
using var fileStream = File.OpenRead(filePath);
var jsonContent = await new StreamReader(fileStream).ReadToEndAsync();
var deserializedEntries = JsonSerializer.Deserialize<List<StoredContextEntry>>(jsonContent);
if (deserializedEntries != null)
{
entries.AddRange(deserializedEntries);
}
_fileTimestamps[filePath] = DateTime.UtcNow;
return new ReadResult
{
Success = true,
Entries = entries,
LastModified = fileInfo.LastWriteTime
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to read context entries from {FilePath}", filePath);
return new ReadResult { Success = false, Error = ex.Message };
}
finally
{
fileLock.Release();
}
}
/// <summary>
/// Safely deletes context entries with optimistic concurrency control
/// </summary>
public async Task<DeletionResult> DeleteContextEntriesAsync(string filePath, Func<StoredContextEntry, bool> predicate, CancellationToken cancellationToken = default)
{
if (!File.Exists(filePath))
{
return new DeletionResult { Success = false, Error = "File not found" };
}
// Acquire global write semaphore
await _globalWriteLock.WaitAsync(cancellationToken);
try
{
var fileLock = _fileLocks.GetOrAdd(filePath, _ => new SemaphoreSlim(1, 1));
await fileLock.WaitAsync(cancellationToken);
try
{
return await DeleteEntriesWithRetryAsync(filePath, predicate, cancellationToken);
}
finally
{
fileLock.Release();
}
}
finally
{
_globalWriteLock.Release();
}
}
/// <summary>
/// Safely updates the context index
/// </summary>
public async Task<bool> UpdateIndexSafelyAsync(StoredContextEntry entry, string storagePath, CancellationToken cancellationToken = default)
{
var indexPath = Path.Combine(storagePath, "context-index.json");
_indexLock.EnterWriteLock();
try
{
var indexEntries = new List<ContextIndexEntry>();
// Load existing index
if (File.Exists(indexPath))
{
var indexJson = await File.ReadAllTextAsync(indexPath, cancellationToken);
var existing = JsonSerializer.Deserialize<List<ContextIndexEntry>>(indexJson);
if (existing != null)
{
indexEntries = existing;
}
}
// Add new index entry
var indexEntry = new ContextIndexEntry
{
Id = entry.Id,
Type = entry.Type,
Summary = entry.Summary,
Tags = entry.Tags,
Priority = entry.Priority,
Timestamp = entry.Timestamp,
FileName = $"context-{entry.Timestamp:yyyy-MM}.json"
};
indexEntries.Add(indexEntry);
// Keep only the most recent entries
indexEntries = indexEntries.OrderByDescending(e => e.Timestamp)
.Take(1000)
.ToList();
// Save index atomically
var tempIndexPath = indexPath + ".tmp";
var indexJsonString = JsonSerializer.Serialize(indexEntries, new JsonSerializerOptions
{
WriteIndented = true
});
await File.WriteAllTextAsync(tempIndexPath, indexJsonString, cancellationToken);
File.Move(tempIndexPath, indexPath, overwrite: true);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to update index for entry {EntryId}", entry.Id);
return false;
}
finally
{
_indexLock.ExitWriteLock();
}
}
/// <summary>
/// Safely reads the context index
/// </summary>
public async Task<List<ContextIndexEntry>> ReadIndexSafelyAsync(string storagePath, CancellationToken cancellationToken = default)
{
var indexPath = Path.Combine(storagePath, "context-index.json");
if (!File.Exists(indexPath))
{
return new List<ContextIndexEntry>();
}
_indexLock.EnterReadLock();
try
{
var indexJson = await File.ReadAllTextAsync(indexPath, cancellationToken);
var entries = JsonSerializer.Deserialize<List<ContextIndexEntry>>(indexJson);
return entries ?? new List<ContextIndexEntry>();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to read index from {IndexPath}", indexPath);
return new List<ContextIndexEntry>();
}
finally
{
_indexLock.ExitReadLock();
}
}
/// <summary>
/// Performs parallel processing of multiple files with concurrency control
/// </summary>
public async Task<List<T>> ProcessFilesInParallelAsync<T>(
IEnumerable<string> filePaths,
Func<string, CancellationToken, Task<T>> processor,
CancellationToken cancellationToken = default)
{
var results = new List<T>();
var semaphore = new SemaphoreSlim(_configuration.Performance.MaxConcurrentOperations, _configuration.Performance.MaxConcurrentOperations);
var tasks = filePaths.Select(async filePath =>
{
await semaphore.WaitAsync(cancellationToken);
try
{
return await processor(filePath, cancellationToken);
}
finally
{
semaphore.Release();
}
});
var completedResults = await Task.WhenAll(tasks);
return completedResults.ToList();
}
/// <summary>
/// Stores entry with retry logic for handling concurrent modifications
/// </summary>
private async Task<StorageResult> StoreEntryWithRetryAsync(StoredContextEntry entry, string filePath, CancellationToken cancellationToken, int maxRetries = 3)
{
var attempt = 0;
while (attempt < maxRetries)
{
try
{
var entries = new List<StoredContextEntry>();
// Load existing entries if file exists
if (File.Exists(filePath))
{
var existingJson = await File.ReadAllTextAsync(filePath, cancellationToken);
var existing = JsonSerializer.Deserialize<List<StoredContextEntry>>(existingJson);
if (existing != null)
{
entries = existing;
}
}
// Add new entry
entries.Add(entry);
// Sort by timestamp (newest first)
entries = entries.OrderByDescending(e => e.Timestamp).ToList();
// Check file size limits
if (entries.Count > _configuration.Retention.MaxEntriesPerFile)
{
entries = entries.Take(_configuration.Retention.MaxEntriesPerFile).ToList();
}
// Write atomically using temporary file
var tempFilePath = filePath + $".tmp.{Guid.NewGuid():N}";
var json = JsonSerializer.Serialize(entries, new JsonSerializerOptions
{
WriteIndented = true,
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
});
await File.WriteAllTextAsync(tempFilePath, json, cancellationToken);
File.Move(tempFilePath, filePath, overwrite: true);
return new StorageResult
{
Success = true,
EntryId = entry.Id,
FilePath = filePath,
EntriesInFile = entries.Count
};
}
catch (IOException ex) when (attempt < maxRetries - 1)
{
// File might be locked by another process, retry after a short delay
_logger.LogWarning(ex, "IO error storing entry {EntryId}, attempt {Attempt} of {MaxRetries}",
entry.Id, attempt + 1, maxRetries);
await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken);
attempt++;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to store entry {EntryId} after {Attempts} attempts", entry.Id, attempt + 1);
return new StorageResult { Success = false, Error = ex.Message };
}
}
return new StorageResult { Success = false, Error = $"Failed after {maxRetries} attempts" };
}
/// <summary>
/// Deletes entries with retry logic
/// </summary>
private async Task<DeletionResult> DeleteEntriesWithRetryAsync(string filePath, Func<StoredContextEntry, bool> predicate, CancellationToken cancellationToken, int maxRetries = 3)
{
var attempt = 0;
while (attempt < maxRetries)
{
try
{
var fileContent = await File.ReadAllTextAsync(filePath, cancellationToken);
var entries = JsonSerializer.Deserialize<List<StoredContextEntry>>(fileContent);
if (entries == null)
{
return new DeletionResult { Success = false, Error = "Failed to parse file content" };
}
var originalCount = entries.Count;
var deletedEntries = entries.Where(predicate).ToList();
entries.RemoveAll(entry => predicate(entry));
// Write back atomically
var tempFilePath = filePath + $".tmp.{Guid.NewGuid():N}";
var updatedJson = JsonSerializer.Serialize(entries, new JsonSerializerOptions
{
WriteIndented = true,
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
});
await File.WriteAllTextAsync(tempFilePath, updatedJson, cancellationToken);
File.Move(tempFilePath, filePath, overwrite: true);
return new DeletionResult
{
Success = true,
DeletedCount = originalCount - entries.Count,
RemainingCount = entries.Count,
DeletedEntries = deletedEntries.Select(e => e.Id).ToList()
};
}
catch (IOException ex) when (attempt < maxRetries - 1)
{
_logger.LogWarning(ex, "IO error deleting entries from {FilePath}, attempt {Attempt} of {MaxRetries}",
filePath, attempt + 1, maxRetries);
await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken);
attempt++;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to delete entries from {FilePath} after {Attempts} attempts", filePath, attempt + 1);
return new DeletionResult { Success = false, Error = ex.Message };
}
}
return new DeletionResult { Success = false, Error = $"Failed after {maxRetries} attempts" };
}
/// <summary>
/// Cleans up unused file locks to prevent memory leaks
/// </summary>
private void CleanupUnusedLocks(object? state)
{
try
{
var cutoffTime = DateTime.UtcNow.AddMinutes(-10); // Remove locks unused for 10+ minutes
var locksToRemove = new List<string>();
foreach (var kvp in _fileTimestamps)
{
if (kvp.Value < cutoffTime)
{
locksToRemove.Add(kvp.Key);
}
}
foreach (var filePath in locksToRemove)
{
if (_fileLocks.TryRemove(filePath, out var lockObject))
{
lockObject.Dispose();
}
_fileTimestamps.TryRemove(filePath, out _);
}
if (locksToRemove.Count > 0)
{
_logger.LogDebug("Cleaned up {Count} unused file locks", locksToRemove.Count);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during lock cleanup");
}
}
public void Dispose()
{
_lockCleanupTimer?.Dispose();
_globalWriteLock?.Dispose();
_indexLock?.Dispose();
foreach (var lockObject in _fileLocks.Values)
{
lockObject.Dispose();
}
_fileLocks.Clear();
}
}
/// <summary>
/// Result of a storage operation
/// </summary>
public class StorageResult
{
public bool Success { get; set; }
public string? Error { get; set; }
public string EntryId { get; set; } = "";
public string FilePath { get; set; } = "";
public int EntriesInFile { get; set; }
}
/// <summary>
/// Result of a read operation
/// </summary>
public class ReadResult
{
public bool Success { get; set; }
public string? Error { get; set; }
public List<StoredContextEntry> Entries { get; set; } = new();
public DateTime LastModified { get; set; }
}
/// <summary>
/// Result of a deletion operation
/// </summary>
public class DeletionResult
{
public bool Success { get; set; }
public string? Error { get; set; }
public int DeletedCount { get; set; }
public int RemainingCount { get; set; }
public List<string> DeletedEntries { get; set; } = new();
}
}