245 lines
7.0 KiB
C#
Executable File
245 lines
7.0 KiB
C#
Executable File
using System.Runtime.CompilerServices;
using System.Text.Json;

using MarketAlly.AIPlugin.Context.Configuration;

using Microsoft.Extensions.Logging;
|
|
|
|
namespace MarketAlly.AIPlugin.Context.Performance
|
|
{
|
|
/// <summary>
|
|
/// Provides streaming JSON processing capabilities for large context files
|
|
/// </summary>
|
|
public class StreamingJsonProcessor
{
    private readonly ContextConfiguration _configuration;
    private readonly ILogger<StreamingJsonProcessor> _logger;

    /// <summary>
    /// Creates a processor bound to the given configuration and logger.
    /// </summary>
    /// <param name="configuration">Context settings; the retention policy is read by <see cref="CompactFileAsync"/>.</param>
    /// <param name="logger">Destination for diagnostic output.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public StreamingJsonProcessor(ContextConfiguration configuration, ILogger<StreamingJsonProcessor> logger)
    {
        // Fail fast instead of deferring a NullReferenceException to first use.
        ArgumentNullException.ThrowIfNull(configuration);
        ArgumentNullException.ThrowIfNull(logger);

        _configuration = configuration;
        _logger = logger;
    }

    /// <summary>
    /// Streams context entries from a file one at a time. Entries that fail to
    /// deserialize are skipped with a warning rather than aborting the stream.
    /// NOTE(review): <see cref="JsonDocument.ParseAsync(Stream, JsonDocumentOptions, CancellationToken)"/>
    /// still materializes the whole DOM, so peak memory is proportional to the
    /// file size; only the per-entry yielding is lazy.
    /// </summary>
    /// <param name="filePath">Path to a JSON file whose root is an array of entries.</param>
    /// <param name="cancellationToken">Stops the enumeration between entries.</param>
    /// <returns>Each successfully deserialized non-null entry, in file order.</returns>
    public async IAsyncEnumerable<StoredContextEntry> StreamContextEntriesAsync(
        string filePath,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (!File.Exists(filePath))
        {
            _logger.LogWarning("Context file not found: {FilePath}", filePath);
            yield break;
        }

        // FileStream is async-capable; dispose it asynchronously.
        await using var fileStream = File.OpenRead(filePath);
        using var document = await JsonDocument.ParseAsync(fileStream, cancellationToken: cancellationToken);

        if (document.RootElement.ValueKind != JsonValueKind.Array)
        {
            _logger.LogError("Invalid JSON format in context file: {FilePath}", filePath);
            yield break;
        }

        var entriesProcessed = 0;
        var index = 0; // array position, distinct from the count of yielded entries
        foreach (var element in document.RootElement.EnumerateArray())
        {
            if (cancellationToken.IsCancellationRequested)
            {
                _logger.LogInformation("Streaming operation cancelled after processing {Count} entries from {FilePath}",
                    entriesProcessed, filePath);
                yield break;
            }

            StoredContextEntry? entry = null;
            try
            {
                // Deserialize straight from the element; GetRawText() would
                // allocate an intermediate string for every entry.
                entry = element.Deserialize<StoredContextEntry>();
            }
            catch (JsonException ex)
            {
                // Skip corrupt entries; the rest of the file is still usable.
                _logger.LogWarning(ex, "Failed to deserialize context entry at index {Index} in file {FilePath}",
                    index, filePath);
            }

            index++;

            if (entry != null)
            {
                entriesProcessed++;
                yield return entry;
            }
        }

        _logger.LogDebug("Streamed {Count} entries from {FilePath}", entriesProcessed, filePath);
    }

    /// <summary>
    /// Streams context entries from a file, yielding only those accepted by the filter.
    /// </summary>
    /// <param name="filePath">Path to the context file.</param>
    /// <param name="filter">Predicate applied to each entry; true keeps the entry.</param>
    /// <param name="cancellationToken">Stops the enumeration between entries.</param>
    public async IAsyncEnumerable<StoredContextEntry> StreamAndFilterEntriesAsync(
        string filePath,
        Func<StoredContextEntry, bool> filter,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var entry in StreamContextEntriesAsync(filePath, cancellationToken))
        {
            if (filter(entry))
            {
                yield return entry;
            }
        }
    }

    /// <summary>
    /// Processes multiple context files concurrently, collecting the filtered
    /// entries from each. Results from all files are flattened in completion order
    /// of the underlying tasks (per-file order is preserved within each file).
    /// </summary>
    /// <param name="filePaths">Files to process.</param>
    /// <param name="filter">Predicate applied to each entry.</param>
    /// <param name="maxConcurrency">Upper bound on files processed at once.</param>
    /// <param name="cancellationToken">Cancels waiting and streaming.</param>
    public async Task<IEnumerable<StoredContextEntry>> ProcessMultipleFilesAsync(
        IEnumerable<string> filePaths,
        Func<StoredContextEntry, bool> filter,
        int maxConcurrency = 5,
        CancellationToken cancellationToken = default)
    {
        // SemaphoreSlim is IDisposable; the original version leaked it.
        using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        var tasks = filePaths.Select(async filePath =>
        {
            await semaphore.WaitAsync(cancellationToken);
            try
            {
                var fileResults = new List<StoredContextEntry>();
                await foreach (var entry in StreamAndFilterEntriesAsync(filePath, filter, cancellationToken))
                {
                    fileResults.Add(entry);
                }
                return fileResults;
            }
            finally
            {
                semaphore.Release();
            }
        });

        var fileResults = await Task.WhenAll(tasks);
        // The per-file lists are already materialized, so this flatten is cheap
        // and safe to enumerate after the semaphore is disposed.
        return fileResults.SelectMany(entries => entries);
    }

    /// <summary>
    /// Writes context entries to a file as an indented JSON array, flushing
    /// periodically so the writer's buffer does not grow without bound.
    /// </summary>
    /// <param name="filePath">Destination path; an existing file is overwritten.</param>
    /// <param name="entries">Entries to serialize, consumed lazily.</param>
    /// <param name="cancellationToken">Stops consuming entries; the array written so far is still closed.</param>
    public async Task WriteContextEntriesStreamAsync(
        string filePath,
        IAsyncEnumerable<StoredContextEntry> entries,
        CancellationToken cancellationToken = default)
    {
        await using var fileStream = File.Create(filePath);
        await using var writer = new Utf8JsonWriter(fileStream, new JsonWriterOptions
        {
            Indented = true,
            Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
        });

        writer.WriteStartArray();

        var entryCount = 0;
        await foreach (var entry in entries)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                _logger.LogInformation("Write operation cancelled after processing {Count} entries", entryCount);
                break;
            }

            // Serialize directly into the writer; avoids allocating one JSON
            // string per entry just to write it raw.
            JsonSerializer.Serialize(writer, entry);
            entryCount++;

            // Flush periodically to avoid memory buildup
            if (entryCount % 100 == 0)
            {
                await writer.FlushAsync(cancellationToken);
            }
        }

        writer.WriteEndArray();
        await writer.FlushAsync(cancellationToken);

        _logger.LogDebug("Wrote {Count} entries to {FilePath}", entryCount, filePath);
    }

    /// <summary>
    /// Compacts a context file by rewriting it without entries older than the
    /// configured retention period, then atomically replacing the original.
    /// </summary>
    /// <param name="filePath">Path of the context file to compact.</param>
    /// <param name="cancellationToken">Cancels streaming and writing.</param>
    /// <returns>
    /// Size and entry statistics; <see cref="CompactionResult.Success"/> is false
    /// (with <see cref="CompactionResult.Error"/> set) on any failure, including a
    /// missing input file.
    /// </returns>
    public async Task<CompactionResult> CompactFileAsync(string filePath, CancellationToken cancellationToken = default)
    {
        var cutoffDate = DateTime.UtcNow.AddDays(-_configuration.Retention.RetentionDays);
        var tempFilePath = filePath + ".tmp";

        var retainedEntries = 0;
        var removedEntries = 0;

        try
        {
            // Moved inside the try: a missing file now yields a failed result
            // instead of an unhandled FileNotFoundException.
            var originalSize = new System.IO.FileInfo(filePath).Length;

            // The counters are mutated from inside the filter; this is safe
            // because the write below consumes the stream strictly sequentially.
            var filteredEntries = StreamAndFilterEntriesAsync(filePath, entry =>
            {
                if (entry.Timestamp >= cutoffDate)
                {
                    retainedEntries++;
                    return true;
                }

                removedEntries++;
                return false;
            }, cancellationToken);

            await WriteContextEntriesStreamAsync(tempFilePath, filteredEntries, cancellationToken);

            // Replace original file with compacted version
            File.Move(tempFilePath, filePath, overwrite: true);

            var newSize = new System.IO.FileInfo(filePath).Length;
            var result = new CompactionResult
            {
                OriginalSizeBytes = originalSize,
                NewSizeBytes = newSize,
                EntriesRetained = retainedEntries,
                EntriesRemoved = removedEntries,
                SpaceSavedBytes = originalSize - newSize,
                Success = true
            };

            _logger.LogInformation("Compacted {FilePath}: removed {RemovedEntries} entries, saved {SpaceSaved} bytes",
                filePath, removedEntries, result.SpaceSavedBytes);

            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to compact context file: {FilePath}", filePath);

            // Clean up temp file if it exists
            if (File.Exists(tempFilePath))
            {
                File.Delete(tempFilePath);
            }

            return new CompactionResult { Success = false, Error = ex.Message };
        }
    }
}
|
|
|
|
/// <summary>
|
|
/// Result of a file compaction operation
|
|
/// </summary>
|
|
public class CompactionResult
{
    /// <summary>Whether compaction completed and the original file was replaced.</summary>
    public bool Success { get; set; }

    /// <summary>Failure message when <see cref="Success"/> is false; otherwise null.</summary>
    public string? Error { get; set; }

    /// <summary>File size in bytes before compaction.</summary>
    public long OriginalSizeBytes { get; set; }

    /// <summary>File size in bytes after compaction.</summary>
    public long NewSizeBytes { get; set; }

    /// <summary>Entries kept because they fell within the retention window.</summary>
    public int EntriesRetained { get; set; }

    /// <summary>Entries dropped as older than the retention cutoff.</summary>
    public int EntriesRemoved { get; set; }

    /// <summary>Bytes reclaimed: original size minus new size.</summary>
    public long SpaceSavedBytes { get; set; }

    /// <summary>
    /// New size expressed as a fraction of the original size (lower is better);
    /// reported as 1.0 when the original size is zero or unset.
    /// </summary>
    public double CompressionRatio
        => OriginalSizeBytes <= 0 ? 1.0 : NewSizeBytes / (double)OriginalSizeBytes;
}
|
|
} |