503 lines
15 KiB
C#
Executable File
503 lines
15 KiB
C#
Executable File
using System.Collections.Concurrent;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using MarketAlly.AIPlugin.Context.Configuration;
|
|
|
|
namespace MarketAlly.AIPlugin.Context.Concurrency
|
|
{
|
|
/// <summary>
|
|
/// Thread-safe storage manager for context operations
|
|
/// </summary>
|
|
public class ThreadSafeStorage : IDisposable
|
|
{
|
|
private readonly ContextConfiguration _configuration;
|
|
private readonly ILogger<ThreadSafeStorage> _logger;
|
|
private readonly ConcurrentDictionary<string, SemaphoreSlim> _fileLocks;
|
|
private readonly ConcurrentDictionary<string, DateTime> _fileTimestamps;
|
|
private readonly SemaphoreSlim _globalWriteLock;
|
|
private readonly ReaderWriterLockSlim _indexLock;
|
|
private readonly Timer _lockCleanupTimer;
|
|
|
|
public ThreadSafeStorage(ContextConfiguration configuration, ILogger<ThreadSafeStorage> logger)
|
|
{
|
|
_configuration = configuration;
|
|
_logger = logger;
|
|
_fileLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
|
|
_fileTimestamps = new ConcurrentDictionary<string, DateTime>();
|
|
_globalWriteLock = new SemaphoreSlim(_configuration.Performance.MaxConcurrentOperations, _configuration.Performance.MaxConcurrentOperations);
|
|
_indexLock = new ReaderWriterLockSlim();
|
|
|
|
// Clean up unused locks every 5 minutes
|
|
_lockCleanupTimer = new Timer(CleanupUnusedLocks, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Safely stores a context entry with optimistic concurrency control
|
|
/// </summary>
|
|
public async Task<StorageResult> StoreContextEntryAsync(StoredContextEntry entry, string storagePath, CancellationToken cancellationToken = default)
|
|
{
|
|
var fileName = $"context-{DateTime.UtcNow:yyyy-MM}.json";
|
|
var filePath = Path.Combine(storagePath, fileName);
|
|
|
|
// Acquire global write semaphore to limit concurrent operations
|
|
await _globalWriteLock.WaitAsync(cancellationToken);
|
|
|
|
try
|
|
{
|
|
// Get or create file-specific lock
|
|
var fileLock = _fileLocks.GetOrAdd(filePath, _ => new SemaphoreSlim(1, 1));
|
|
|
|
await fileLock.WaitAsync(cancellationToken);
|
|
try
|
|
{
|
|
var result = await StoreEntryWithRetryAsync(entry, filePath, cancellationToken);
|
|
|
|
if (result.Success)
|
|
{
|
|
// Update index in a thread-safe manner
|
|
await UpdateIndexSafelyAsync(entry, storagePath, cancellationToken);
|
|
_fileTimestamps[filePath] = DateTime.UtcNow;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
finally
|
|
{
|
|
fileLock.Release();
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
_globalWriteLock.Release();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Safely reads context entries from a file
|
|
/// </summary>
|
|
public async Task<ReadResult> ReadContextEntriesAsync(string filePath, CancellationToken cancellationToken = default)
|
|
{
|
|
if (!File.Exists(filePath))
|
|
{
|
|
return new ReadResult { Success = false, Error = "File not found" };
|
|
}
|
|
|
|
// Get or create file-specific lock for reading
|
|
var fileLock = _fileLocks.GetOrAdd(filePath, _ => new SemaphoreSlim(1, 1));
|
|
|
|
await fileLock.WaitAsync(cancellationToken);
|
|
try
|
|
{
|
|
var fileInfo = new System.IO.FileInfo(filePath);
|
|
var entries = new List<StoredContextEntry>();
|
|
|
|
// Check if file has been modified since our last read
|
|
if (_fileTimestamps.TryGetValue(filePath, out var lastRead) && lastRead >= fileInfo.LastWriteTime)
|
|
{
|
|
_logger.LogDebug("File {FilePath} unchanged since last read", filePath);
|
|
}
|
|
|
|
using var fileStream = File.OpenRead(filePath);
|
|
var jsonContent = await new StreamReader(fileStream).ReadToEndAsync();
|
|
|
|
var deserializedEntries = JsonSerializer.Deserialize<List<StoredContextEntry>>(jsonContent);
|
|
if (deserializedEntries != null)
|
|
{
|
|
entries.AddRange(deserializedEntries);
|
|
}
|
|
|
|
_fileTimestamps[filePath] = DateTime.UtcNow;
|
|
|
|
return new ReadResult
|
|
{
|
|
Success = true,
|
|
Entries = entries,
|
|
LastModified = fileInfo.LastWriteTime
|
|
};
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to read context entries from {FilePath}", filePath);
|
|
return new ReadResult { Success = false, Error = ex.Message };
|
|
}
|
|
finally
|
|
{
|
|
fileLock.Release();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Safely deletes context entries with optimistic concurrency control
|
|
/// </summary>
|
|
public async Task<DeletionResult> DeleteContextEntriesAsync(string filePath, Func<StoredContextEntry, bool> predicate, CancellationToken cancellationToken = default)
|
|
{
|
|
if (!File.Exists(filePath))
|
|
{
|
|
return new DeletionResult { Success = false, Error = "File not found" };
|
|
}
|
|
|
|
// Acquire global write semaphore
|
|
await _globalWriteLock.WaitAsync(cancellationToken);
|
|
|
|
try
|
|
{
|
|
var fileLock = _fileLocks.GetOrAdd(filePath, _ => new SemaphoreSlim(1, 1));
|
|
|
|
await fileLock.WaitAsync(cancellationToken);
|
|
try
|
|
{
|
|
return await DeleteEntriesWithRetryAsync(filePath, predicate, cancellationToken);
|
|
}
|
|
finally
|
|
{
|
|
fileLock.Release();
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
_globalWriteLock.Release();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Safely updates the context index
|
|
/// </summary>
|
|
public async Task<bool> UpdateIndexSafelyAsync(StoredContextEntry entry, string storagePath, CancellationToken cancellationToken = default)
|
|
{
|
|
var indexPath = Path.Combine(storagePath, "context-index.json");
|
|
|
|
_indexLock.EnterWriteLock();
|
|
try
|
|
{
|
|
var indexEntries = new List<ContextIndexEntry>();
|
|
|
|
// Load existing index
|
|
if (File.Exists(indexPath))
|
|
{
|
|
var indexJson = await File.ReadAllTextAsync(indexPath, cancellationToken);
|
|
var existing = JsonSerializer.Deserialize<List<ContextIndexEntry>>(indexJson);
|
|
if (existing != null)
|
|
{
|
|
indexEntries = existing;
|
|
}
|
|
}
|
|
|
|
// Add new index entry
|
|
var indexEntry = new ContextIndexEntry
|
|
{
|
|
Id = entry.Id,
|
|
Type = entry.Type,
|
|
Summary = entry.Summary,
|
|
Tags = entry.Tags,
|
|
Priority = entry.Priority,
|
|
Timestamp = entry.Timestamp,
|
|
FileName = $"context-{entry.Timestamp:yyyy-MM}.json"
|
|
};
|
|
|
|
indexEntries.Add(indexEntry);
|
|
|
|
// Keep only the most recent entries
|
|
indexEntries = indexEntries.OrderByDescending(e => e.Timestamp)
|
|
.Take(1000)
|
|
.ToList();
|
|
|
|
// Save index atomically
|
|
var tempIndexPath = indexPath + ".tmp";
|
|
var indexJsonString = JsonSerializer.Serialize(indexEntries, new JsonSerializerOptions
|
|
{
|
|
WriteIndented = true
|
|
});
|
|
|
|
await File.WriteAllTextAsync(tempIndexPath, indexJsonString, cancellationToken);
|
|
File.Move(tempIndexPath, indexPath, overwrite: true);
|
|
|
|
return true;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to update index for entry {EntryId}", entry.Id);
|
|
return false;
|
|
}
|
|
finally
|
|
{
|
|
_indexLock.ExitWriteLock();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Safely reads the context index
|
|
/// </summary>
|
|
public async Task<List<ContextIndexEntry>> ReadIndexSafelyAsync(string storagePath, CancellationToken cancellationToken = default)
|
|
{
|
|
var indexPath = Path.Combine(storagePath, "context-index.json");
|
|
|
|
if (!File.Exists(indexPath))
|
|
{
|
|
return new List<ContextIndexEntry>();
|
|
}
|
|
|
|
_indexLock.EnterReadLock();
|
|
try
|
|
{
|
|
var indexJson = await File.ReadAllTextAsync(indexPath, cancellationToken);
|
|
var entries = JsonSerializer.Deserialize<List<ContextIndexEntry>>(indexJson);
|
|
return entries ?? new List<ContextIndexEntry>();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to read index from {IndexPath}", indexPath);
|
|
return new List<ContextIndexEntry>();
|
|
}
|
|
finally
|
|
{
|
|
_indexLock.ExitReadLock();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Performs parallel processing of multiple files with concurrency control
|
|
/// </summary>
|
|
public async Task<List<T>> ProcessFilesInParallelAsync<T>(
|
|
IEnumerable<string> filePaths,
|
|
Func<string, CancellationToken, Task<T>> processor,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
var results = new List<T>();
|
|
var semaphore = new SemaphoreSlim(_configuration.Performance.MaxConcurrentOperations, _configuration.Performance.MaxConcurrentOperations);
|
|
|
|
var tasks = filePaths.Select(async filePath =>
|
|
{
|
|
await semaphore.WaitAsync(cancellationToken);
|
|
try
|
|
{
|
|
return await processor(filePath, cancellationToken);
|
|
}
|
|
finally
|
|
{
|
|
semaphore.Release();
|
|
}
|
|
});
|
|
|
|
var completedResults = await Task.WhenAll(tasks);
|
|
return completedResults.ToList();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Stores entry with retry logic for handling concurrent modifications
|
|
/// </summary>
|
|
private async Task<StorageResult> StoreEntryWithRetryAsync(StoredContextEntry entry, string filePath, CancellationToken cancellationToken, int maxRetries = 3)
|
|
{
|
|
var attempt = 0;
|
|
while (attempt < maxRetries)
|
|
{
|
|
try
|
|
{
|
|
var entries = new List<StoredContextEntry>();
|
|
|
|
// Load existing entries if file exists
|
|
if (File.Exists(filePath))
|
|
{
|
|
var existingJson = await File.ReadAllTextAsync(filePath, cancellationToken);
|
|
var existing = JsonSerializer.Deserialize<List<StoredContextEntry>>(existingJson);
|
|
if (existing != null)
|
|
{
|
|
entries = existing;
|
|
}
|
|
}
|
|
|
|
// Add new entry
|
|
entries.Add(entry);
|
|
|
|
// Sort by timestamp (newest first)
|
|
entries = entries.OrderByDescending(e => e.Timestamp).ToList();
|
|
|
|
// Check file size limits
|
|
if (entries.Count > _configuration.Retention.MaxEntriesPerFile)
|
|
{
|
|
entries = entries.Take(_configuration.Retention.MaxEntriesPerFile).ToList();
|
|
}
|
|
|
|
// Write atomically using temporary file
|
|
var tempFilePath = filePath + $".tmp.{Guid.NewGuid():N}";
|
|
var json = JsonSerializer.Serialize(entries, new JsonSerializerOptions
|
|
{
|
|
WriteIndented = true,
|
|
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
|
|
});
|
|
|
|
await File.WriteAllTextAsync(tempFilePath, json, cancellationToken);
|
|
File.Move(tempFilePath, filePath, overwrite: true);
|
|
|
|
return new StorageResult
|
|
{
|
|
Success = true,
|
|
EntryId = entry.Id,
|
|
FilePath = filePath,
|
|
EntriesInFile = entries.Count
|
|
};
|
|
}
|
|
catch (IOException ex) when (attempt < maxRetries - 1)
|
|
{
|
|
// File might be locked by another process, retry after a short delay
|
|
_logger.LogWarning(ex, "IO error storing entry {EntryId}, attempt {Attempt} of {MaxRetries}",
|
|
entry.Id, attempt + 1, maxRetries);
|
|
|
|
await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken);
|
|
attempt++;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to store entry {EntryId} after {Attempts} attempts", entry.Id, attempt + 1);
|
|
return new StorageResult { Success = false, Error = ex.Message };
|
|
}
|
|
}
|
|
|
|
return new StorageResult { Success = false, Error = $"Failed after {maxRetries} attempts" };
|
|
}
|
|
|
|
/// <summary>
|
|
/// Deletes entries with retry logic
|
|
/// </summary>
|
|
private async Task<DeletionResult> DeleteEntriesWithRetryAsync(string filePath, Func<StoredContextEntry, bool> predicate, CancellationToken cancellationToken, int maxRetries = 3)
|
|
{
|
|
var attempt = 0;
|
|
while (attempt < maxRetries)
|
|
{
|
|
try
|
|
{
|
|
var fileContent = await File.ReadAllTextAsync(filePath, cancellationToken);
|
|
var entries = JsonSerializer.Deserialize<List<StoredContextEntry>>(fileContent);
|
|
|
|
if (entries == null)
|
|
{
|
|
return new DeletionResult { Success = false, Error = "Failed to parse file content" };
|
|
}
|
|
|
|
var originalCount = entries.Count;
|
|
var deletedEntries = entries.Where(predicate).ToList();
|
|
entries.RemoveAll(entry => predicate(entry));
|
|
|
|
// Write back atomically
|
|
var tempFilePath = filePath + $".tmp.{Guid.NewGuid():N}";
|
|
var updatedJson = JsonSerializer.Serialize(entries, new JsonSerializerOptions
|
|
{
|
|
WriteIndented = true,
|
|
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
|
|
});
|
|
|
|
await File.WriteAllTextAsync(tempFilePath, updatedJson, cancellationToken);
|
|
File.Move(tempFilePath, filePath, overwrite: true);
|
|
|
|
return new DeletionResult
|
|
{
|
|
Success = true,
|
|
DeletedCount = originalCount - entries.Count,
|
|
RemainingCount = entries.Count,
|
|
DeletedEntries = deletedEntries.Select(e => e.Id).ToList()
|
|
};
|
|
}
|
|
catch (IOException ex) when (attempt < maxRetries - 1)
|
|
{
|
|
_logger.LogWarning(ex, "IO error deleting entries from {FilePath}, attempt {Attempt} of {MaxRetries}",
|
|
filePath, attempt + 1, maxRetries);
|
|
|
|
await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken);
|
|
attempt++;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Failed to delete entries from {FilePath} after {Attempts} attempts", filePath, attempt + 1);
|
|
return new DeletionResult { Success = false, Error = ex.Message };
|
|
}
|
|
}
|
|
|
|
return new DeletionResult { Success = false, Error = $"Failed after {maxRetries} attempts" };
|
|
}
|
|
|
|
/// <summary>
|
|
/// Cleans up unused file locks to prevent memory leaks
|
|
/// </summary>
|
|
private void CleanupUnusedLocks(object? state)
|
|
{
|
|
try
|
|
{
|
|
var cutoffTime = DateTime.UtcNow.AddMinutes(-10); // Remove locks unused for 10+ minutes
|
|
var locksToRemove = new List<string>();
|
|
|
|
foreach (var kvp in _fileTimestamps)
|
|
{
|
|
if (kvp.Value < cutoffTime)
|
|
{
|
|
locksToRemove.Add(kvp.Key);
|
|
}
|
|
}
|
|
|
|
foreach (var filePath in locksToRemove)
|
|
{
|
|
if (_fileLocks.TryRemove(filePath, out var lockObject))
|
|
{
|
|
lockObject.Dispose();
|
|
}
|
|
_fileTimestamps.TryRemove(filePath, out _);
|
|
}
|
|
|
|
if (locksToRemove.Count > 0)
|
|
{
|
|
_logger.LogDebug("Cleaned up {Count} unused file locks", locksToRemove.Count);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error during lock cleanup");
|
|
}
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_lockCleanupTimer?.Dispose();
|
|
_globalWriteLock?.Dispose();
|
|
_indexLock?.Dispose();
|
|
|
|
foreach (var lockObject in _fileLocks.Values)
|
|
{
|
|
lockObject.Dispose();
|
|
}
|
|
_fileLocks.Clear();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Result of a storage operation
|
|
/// </summary>
|
|
public class StorageResult
|
|
{
|
|
public bool Success { get; set; }
|
|
public string? Error { get; set; }
|
|
public string EntryId { get; set; } = "";
|
|
public string FilePath { get; set; } = "";
|
|
public int EntriesInFile { get; set; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Result of a read operation
|
|
/// </summary>
|
|
public class ReadResult
|
|
{
|
|
public bool Success { get; set; }
|
|
public string? Error { get; set; }
|
|
public List<StoredContextEntry> Entries { get; set; } = new();
|
|
public DateTime LastModified { get; set; }
|
|
}
|
|
|
|
/// <summary>
|
|
/// Result of a deletion operation
|
|
/// </summary>
|
|
public class DeletionResult
|
|
{
|
|
public bool Success { get; set; }
|
|
public string? Error { get; set; }
|
|
public int DeletedCount { get; set; }
|
|
public int RemainingCount { get; set; }
|
|
public List<string> DeletedEntries { get; set; } = new();
|
|
}
|
|
} |