irontelemetry-dotnet/OfflineQueue.cs

257 lines
6.7 KiB
C#

using System.Text.Json;
using System.Text.Json.Serialization;
namespace IronTelemetry.Client;
/// <summary>
/// File-based offline queue for telemetry items.
/// Persists failed sends to disk and retries with exponential backoff.
/// </summary>
public class OfflineQueue : IDisposable
{
private readonly string _queueFilePath;
private readonly int _maxQueueSize;
private readonly object _fileLock = new();
private readonly SemaphoreSlim _retrySemaphore = new(1, 1);
private readonly Timer? _retryTimer;
private readonly Func<List<EnvelopeItem>, Task<bool>> _sendFunc;
private int _retryAttempt;
private bool _disposed;
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
WriteIndented = false
};
/// <summary>
/// Creates a new offline queue.
/// </summary>
/// <param name="sendFunc">Function to send items to server. Returns true on success.</param>
/// <param name="queueDirectory">Directory to store queue file. Defaults to app data.</param>
/// <param name="maxQueueSize">Maximum items to store. Oldest items dropped when exceeded.</param>
/// <param name="enableAutoRetry">Whether to automatically retry sending queued items.</param>
public OfflineQueue(
Func<List<EnvelopeItem>, Task<bool>> sendFunc,
string? queueDirectory = null,
int maxQueueSize = 1000,
bool enableAutoRetry = true)
{
_sendFunc = sendFunc;
_maxQueueSize = maxQueueSize;
var directory = queueDirectory ?? GetDefaultQueueDirectory();
Directory.CreateDirectory(directory);
_queueFilePath = Path.Combine(directory, "telemetry_queue.json");
if (enableAutoRetry)
{
// Start retry timer - initial delay 30 seconds, then every 60 seconds
_retryTimer = new Timer(
_ => _ = RetryQueuedItemsAsync(),
null,
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(60));
}
}
/// <summary>
/// Number of items currently in the queue.
/// </summary>
public int Count
{
get
{
lock (_fileLock)
{
var items = LoadQueue();
return items.Count;
}
}
}
/// <summary>
/// Enqueue items that failed to send.
/// </summary>
public void Enqueue(List<EnvelopeItem> items)
{
if (items.Count == 0) return;
lock (_fileLock)
{
var queue = LoadQueue();
queue.AddRange(items);
// Trim to max size (remove oldest)
while (queue.Count > _maxQueueSize)
{
queue.RemoveAt(0);
}
SaveQueue(queue);
}
}
/// <summary>
/// Try to send all queued items.
/// </summary>
public async Task<bool> RetryQueuedItemsAsync()
{
if (!await _retrySemaphore.WaitAsync(0))
{
// Already retrying
return false;
}
try
{
List<EnvelopeItem> items;
lock (_fileLock)
{
items = LoadQueue();
}
if (items.Count == 0)
{
_retryAttempt = 0;
return true;
}
// Try to send
var success = await _sendFunc(items);
if (success)
{
// Clear the queue
lock (_fileLock)
{
SaveQueue(new List<EnvelopeItem>());
}
_retryAttempt = 0;
return true;
}
else
{
// Exponential backoff - adjust retry timer
_retryAttempt++;
return false;
}
}
catch
{
_retryAttempt++;
return false;
}
finally
{
_retrySemaphore.Release();
}
}
/// <summary>
/// Clear all queued items without sending.
/// </summary>
public void Clear()
{
lock (_fileLock)
{
SaveQueue(new List<EnvelopeItem>());
}
}
/// <summary>
/// Get all queued items (for display/export).
/// </summary>
public List<EnvelopeItem> GetQueuedItems()
{
lock (_fileLock)
{
return LoadQueue();
}
}
private List<EnvelopeItem> LoadQueue()
{
try
{
if (!File.Exists(_queueFilePath))
{
return new List<EnvelopeItem>();
}
var json = File.ReadAllText(_queueFilePath);
return JsonSerializer.Deserialize<List<EnvelopeItem>>(json, JsonOptions)
?? new List<EnvelopeItem>();
}
catch
{
return new List<EnvelopeItem>();
}
}
private void SaveQueue(List<EnvelopeItem> items)
{
try
{
var json = JsonSerializer.Serialize(items, JsonOptions);
File.WriteAllText(_queueFilePath, json);
}
catch
{
// Ignore write errors
}
}
private static string GetDefaultQueueDirectory()
{
return Path.Combine(
Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
"IronTelemetry",
"Queue");
}
public void Dispose()
{
if (!_disposed)
{
_retryTimer?.Dispose();
_retrySemaphore.Dispose();
_disposed = true;
}
}
}
/// <summary>
/// Options for offline queue behavior.
/// </summary>
public class OfflineQueueOptions
{
/// <summary>
/// Directory to store queue files. Defaults to LocalApplicationData/IronTelemetry/Queue.
/// </summary>
public string? QueueDirectory { get; set; }
/// <summary>
/// Maximum number of items to store in the queue. Oldest items dropped when exceeded.
/// Default: 1000
/// </summary>
public int MaxQueueSize { get; set; } = 1000;
/// <summary>
/// Whether to automatically retry sending queued items in the background.
/// Default: true
/// </summary>
public bool EnableAutoRetry { get; set; } = true;
/// <summary>
/// Initial retry delay after a failed send. Default: 30 seconds.
/// </summary>
public TimeSpan InitialRetryDelay { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Maximum retry delay (for exponential backoff). Default: 5 minutes.
/// </summary>
public TimeSpan MaxRetryDelay { get; set; } = TimeSpan.FromMinutes(5);
}