using System.Runtime.CompilerServices;
using System.Text.Json;
using MarketAlly.AIPlugin.Context.Configuration;
using Microsoft.Extensions.Logging;
namespace MarketAlly.AIPlugin.Context.Performance
{
/// <summary>
/// Provides streaming JSON processing capabilities for large context files.
/// </summary>
public class StreamingJsonProcessor
{
private readonly ContextConfiguration _configuration;
private readonly ILogger<StreamingJsonProcessor> _logger;
public StreamingJsonProcessor(ContextConfiguration configuration, ILogger<StreamingJsonProcessor> logger)
{
_configuration = configuration;
_logger = logger;
}
/// <summary>
/// Streams context entries from a file, deserializing and yielding them one at a time
/// instead of materializing the full list in memory.
/// </summary>
public async IAsyncEnumerable<StoredContextEntry> StreamContextEntriesAsync(
string filePath,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (!File.Exists(filePath))
{
_logger.LogWarning("Context file not found: {FilePath}", filePath);
yield break;
}
await using var fileStream = File.OpenRead(filePath);
// JsonDocument.ParseAsync buffers the parsed document, but entries are deserialized
// lazily, one element at a time, as the array is enumerated below.
using var document = await JsonDocument.ParseAsync(fileStream, cancellationToken: cancellationToken);
if (document.RootElement.ValueKind != JsonValueKind.Array)
{
_logger.LogError("Invalid JSON format in context file: {FilePath}", filePath);
yield break;
}
var entriesProcessed = 0;
foreach (var element in document.RootElement.EnumerateArray())
{
if (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Streaming operation cancelled after processing {Count} entries from {FilePath}",
entriesProcessed, filePath);
yield break;
}
StoredContextEntry? entry = null;
try
{
// Deserialize directly from the element, avoiding a re-serialized text round trip
entry = element.Deserialize<StoredContextEntry>();
}
catch (JsonException ex)
{
_logger.LogWarning(ex, "Failed to deserialize context entry at index {Index} in file {FilePath}",
entriesProcessed, filePath);
continue;
}
if (entry != null)
{
entriesProcessed++;
yield return entry;
}
}
_logger.LogDebug("Streamed {Count} entries from {FilePath}", entriesProcessed, filePath);
}
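// Example (illustrative sketch, not part of the class): consuming the stream with
// await foreach keeps at most one deserialized entry alive at a time.
//
//   await foreach (var entry in processor.StreamContextEntriesAsync("context.json", token))
//   {
//       Process(entry); // Process is a hypothetical consumer
//   }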
/// <summary>
/// Streams and filters context entries based on search criteria.
/// </summary>
public async IAsyncEnumerable<StoredContextEntry> StreamAndFilterEntriesAsync(
string filePath,
Func<StoredContextEntry, bool> filter,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await foreach (var entry in StreamContextEntriesAsync(filePath, cancellationToken))
{
if (filter(entry))
{
yield return entry;
}
}
}
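// Example (illustrative sketch): the filter is applied per entry as it streams, e.g.
// keeping only entries from the last week via the Timestamp property:
//
//   var cutoff = DateTime.UtcNow.AddDays(-7);
//   await foreach (var entry in processor.StreamAndFilterEntriesAsync(
//       "context.json", e => e.Timestamp >= cutoff, token))
//   {
//       // ...
//   }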
/// <summary>
/// Processes multiple context files in parallel using streaming.
/// </summary>
public async Task<IEnumerable<StoredContextEntry>> ProcessMultipleFilesAsync(
IEnumerable<string> filePaths,
Func<StoredContextEntry, bool> filter,
int maxConcurrency = 5,
CancellationToken cancellationToken = default)
{
// Throttle concurrent file reads with a semaphore
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
var tasks = filePaths.Select(async filePath =>
{
await semaphore.WaitAsync(cancellationToken);
try
{
// Named to avoid colliding with the fileResults aggregate declared in the enclosing scope
var fileEntries = new List<StoredContextEntry>();
await foreach (var entry in StreamAndFilterEntriesAsync(filePath, filter, cancellationToken))
{
fileEntries.Add(entry);
}
return fileEntries;
}
finally
{
semaphore.Release();
}
});
var fileResults = await Task.WhenAll(tasks);
return fileResults.SelectMany(entries => entries);
}
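// Example (illustrative sketch; contextDir and cutoff are hypothetical): fan out over a
// directory of context files while capping concurrent reads at three.
//
//   var matches = await processor.ProcessMultipleFilesAsync(
//       Directory.EnumerateFiles(contextDir, "*.json"),
//       e => e.Timestamp >= cutoff,
//       maxConcurrency: 3,
//       token);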
/// <summary>
/// Writes context entries to a file using a streaming approach.
/// </summary>
public async Task WriteContextEntriesStreamAsync(
string filePath,
IAsyncEnumerable<StoredContextEntry> entries,
CancellationToken cancellationToken = default)
{
await using var fileStream = File.Create(filePath);
await using var writer = new Utf8JsonWriter(fileStream, new JsonWriterOptions
{
Indented = true,
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping
});
writer.WriteStartArray();
var entryCount = 0;
await foreach (var entry in entries)
{
if (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Write operation cancelled after processing {Count} entries", entryCount);
break;
}
// Serialize directly into the writer, avoiding an intermediate string allocation
JsonSerializer.Serialize(writer, entry);
entryCount++;
// Flush periodically to avoid memory buildup
if (entryCount % 100 == 0)
{
await writer.FlushAsync(cancellationToken);
}
}
writer.WriteEndArray();
await writer.FlushAsync(cancellationToken);
_logger.LogDebug("Wrote {Count} entries to {FilePath}", entryCount, filePath);
}
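// Example (illustrative sketch): because the writer accepts an IAsyncEnumerable, a
// filtered copy can be produced by piping one stream into the other without buffering:
//
//   var recent = processor.StreamAndFilterEntriesAsync(source, e => e.Timestamp >= cutoff, token);
//   await processor.WriteContextEntriesStreamAsync(destination, recent, token);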
/// <summary>
/// Compacts a context file by removing entries older than the retention period.
/// </summary>
public async Task<CompactionResult> CompactFileAsync(string filePath, CancellationToken cancellationToken = default)
{
var originalSize = new FileInfo(filePath).Length;
var cutoffDate = DateTime.UtcNow.AddDays(-_configuration.Retention.RetentionDays);
var tempFilePath = filePath + ".tmp";
var retainedEntries = 0;
var removedEntries = 0;
try
{
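// The filter below runs lazily while WriteContextEntriesStreamAsync enumerates the
// stream, so the retained/removed counters are only final after that call completes.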
var filteredEntries = StreamAndFilterEntriesAsync(filePath, entry =>
{
if (entry.Timestamp >= cutoffDate)
{
retainedEntries++;
return true;
}
else
{
removedEntries++;
return false;
}
}, cancellationToken);
await WriteContextEntriesStreamAsync(tempFilePath, filteredEntries, cancellationToken);
// Replace original file with compacted version
File.Move(tempFilePath, filePath, overwrite: true);
var newSize = new FileInfo(filePath).Length;
var result = new CompactionResult
{
OriginalSizeBytes = originalSize,
NewSizeBytes = newSize,
EntriesRetained = retainedEntries,
EntriesRemoved = removedEntries,
SpaceSavedBytes = originalSize - newSize,
Success = true
};
_logger.LogInformation("Compacted {FilePath}: removed {RemovedEntries} entries, saved {SpaceSaved} bytes",
filePath, removedEntries, result.SpaceSavedBytes);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to compact context file: {FilePath}", filePath);
// Clean up temp file if it exists
if (File.Exists(tempFilePath))
{
File.Delete(tempFilePath);
}
return new CompactionResult { Success = false, Error = ex.Message };
}
}
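// Example (illustrative sketch): compaction reports how much space was reclaimed.
//
//   var result = await processor.CompactFileAsync("context.json", token);
//   if (result.Success)
//       logger.LogInformation("Compaction ratio: {Ratio:P0}", result.CompressionRatio);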
}
/// <summary>
/// Result of a file compaction operation.
/// </summary>
public class CompactionResult
{
public bool Success { get; set; }
public string? Error { get; set; }
public long OriginalSizeBytes { get; set; }
public long NewSizeBytes { get; set; }
public int EntriesRetained { get; set; }
public int EntriesRemoved { get; set; }
public long SpaceSavedBytes { get; set; }
public double CompressionRatio => OriginalSizeBytes > 0 ? (double)NewSizeBytes / OriginalSizeBytes : 1.0;
}
}