Initial SDK implementation

- Core client with exception capture and message logging
- Journey and step tracking for user flows
- Breadcrumb management
- Offline queue with JSON file persistence
- HTTP transport with httpx (sync and async)
- Full type hints
- Python 3.8+ support
This commit is contained in:
David Friedel 2025-12-25 10:12:21 +00:00
parent e26f16ad1b
commit 44f702acb8
11 changed files with 1672 additions and 2 deletions

78
.gitignore vendored Normal file
View File

@ -0,0 +1,78 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.idea/
.vscode/
*.swp
*.swo
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# ruff
.ruff_cache/
# OS
.DS_Store
Thumbs.db

188
README.md
View File

@ -1,2 +1,186 @@
# irontelemetry-python # IronTelemetry SDK for Python
IronTelemetry SDK for Python - Error monitoring and crash reporting
Error monitoring and crash reporting SDK for Python applications. Capture exceptions, track user journeys, and get insights to fix issues faster.
[![PyPI](https://img.shields.io/pypi/v/irontelemetry.svg)](https://pypi.org/project/irontelemetry/)
[![Python](https://img.shields.io/pypi/pyversions/irontelemetry.svg)](https://pypi.org/project/irontelemetry/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## Installation
```bash
pip install irontelemetry
```
## Quick Start
### Basic Exception Capture
```python
import irontelemetry
# Initialize with your DSN
irontelemetry.init("https://pk_live_xxx@irontelemetry.com")
# Capture exceptions
try:
do_something()
except Exception as e:
irontelemetry.capture_exception(e)
raise
```
### Journey Tracking
Track user journeys to understand the context of errors:
```python
import irontelemetry
# Track a complete user journey
with irontelemetry.start_journey("Checkout Flow"):
irontelemetry.set_user("user-123", "user@example.com")
with irontelemetry.start_step("Validate Cart", "business"):
validate_cart()
with irontelemetry.start_step("Process Payment", "business"):
process_payment()
with irontelemetry.start_step("Send Confirmation", "notification"):
send_confirmation_email()
```
Any exceptions captured during the journey are automatically correlated.
## Configuration
```python
from irontelemetry import TelemetryOptions, init
init(TelemetryOptions(
dsn="https://pk_live_xxx@irontelemetry.com",
environment="production",
app_version="1.2.3",
sample_rate=1.0, # 100% of events
debug=False,
before_send=lambda event: event if "expected" not in (event.message or "") else None,
))
```
### Configuration Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `dsn` | str | required | Your Data Source Name |
| `environment` | str | 'production' | Environment name |
| `app_version` | str | '0.0.0' | Application version |
| `sample_rate` | float | 1.0 | Sample rate (0.0 to 1.0) |
| `max_breadcrumbs` | int | 100 | Max breadcrumbs to keep |
| `debug` | bool | False | Enable debug logging |
| `before_send` | callable | None | Hook to filter/modify events |
| `enable_offline_queue` | bool | True | Enable offline queue |
| `max_offline_queue_size` | int | 500 | Max offline queue size |
## Features
- **Automatic Exception Capture**: Capture and report exceptions with full stack traces
- **Journey Tracking**: Track user flows and correlate errors with context
- **Breadcrumbs**: Leave a trail of events leading up to an error
- **User Context**: Associate errors with specific users
- **Tags & Extras**: Add custom metadata to your events
- **Offline Queue**: Events are queued when offline and sent when connectivity returns
- **Async Support**: Full async/await support with `capture_exception_async` and `capture_message_async`
- **Type Hints**: Full type annotations for IDE support
## Breadcrumbs
```python
from irontelemetry import add_breadcrumb, BreadcrumbCategory
# Add breadcrumbs to understand what happened before an error
add_breadcrumb("User clicked checkout button", BreadcrumbCategory.UI)
add_breadcrumb("Payment API called", BreadcrumbCategory.HTTP)
# Or with full control
add_breadcrumb(
"User logged in",
category=BreadcrumbCategory.AUTH,
level=SeverityLevel.INFO,
data={"user_id": "123"},
)
```
## Global Exception Handling
```python
import irontelemetry
irontelemetry.init("your-dsn")
irontelemetry.use_unhandled_exception_handler()
```
This sets up a handler for `sys.excepthook` to capture uncaught exceptions.
## Helper Methods
```python
from irontelemetry import track_step
# Track a step with automatic error handling
track_step("Process Order", lambda: process_order())
# With return value
result = track_step("Calculate Total", lambda: calculate_total())
```
## Async Support
```python
import irontelemetry
# Async exception capture
await irontelemetry.capture_exception_async(error)
# Async message capture
await irontelemetry.capture_message_async("Something happened")
# Async flush
await irontelemetry.flush_async()
```
## Flushing
```python
# Flush pending events before app shutdown
irontelemetry.flush()
```
## Type Support
This package includes full type annotations:
```python
from irontelemetry import (
TelemetryOptions,
TelemetryEvent,
Breadcrumb,
SeverityLevel,
BreadcrumbCategory,
)
```
## Python Version Support
- Python 3.8+
- Full type hints support
- async/await support
## Links
- [Documentation](https://www.irontelemetry.com/docs)
- [Dashboard](https://www.irontelemetry.com)
## License
MIT License - see [LICENSE](LICENSE) for details.

73
pyproject.toml Normal file
View File

@ -0,0 +1,73 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "irontelemetry"
version = "0.1.0"
description = "Error monitoring and crash reporting SDK for Python applications"
readme = "README.md"
license = "MIT"
requires-python = ">=3.8"
authors = [
{ name = "IronServices", email = "support@ironservices.com" }
]
keywords = [
"error-monitoring",
"crash-reporting",
"telemetry",
"exception-tracking",
"irontelemetry",
"ironservices"
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
"Typing :: Typed",
]
dependencies = [
"httpx>=0.24.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"mypy>=1.0.0",
"ruff>=0.1.0",
]
[project.urls]
Homepage = "https://www.irontelemetry.com"
Documentation = "https://www.irontelemetry.com/docs"
Repository = "https://github.com/IronServices/irontelemetry-python"
Issues = "https://github.com/IronServices/irontelemetry-python/issues"
[tool.hatch.build.targets.wheel]
packages = ["src/irontelemetry"]
[tool.mypy]
python_version = "3.8"
strict = true
warn_return_any = true
warn_unused_ignores = true
[tool.ruff]
target-version = "py38"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

View File

@ -0,0 +1,221 @@
"""IronTelemetry SDK for Python.
Error monitoring and crash reporting SDK for Python applications.
"""
import sys
from typing import Any, Callable, Dict, Optional, TypeVar
from .client import TelemetryClient
from .journey import Journey, JourneyScope, Step
from .types import (
Breadcrumb,
BreadcrumbCategory,
ExceptionInfo,
JourneyContext,
ParsedDsn,
PlatformInfo,
SendResult,
SeverityLevel,
StackFrame,
TelemetryEvent,
TelemetryOptions,
User,
)
__version__ = "0.1.0"
__all__ = [
# Client
"TelemetryClient",
"TelemetryOptions",
# Types
"Breadcrumb",
"BreadcrumbCategory",
"ExceptionInfo",
"JourneyContext",
"ParsedDsn",
"PlatformInfo",
"SendResult",
"SeverityLevel",
"StackFrame",
"TelemetryEvent",
"User",
# Journey
"Journey",
"JourneyScope",
"Step",
# Functions
"init",
"get_client",
"capture_exception",
"capture_message",
"add_breadcrumb",
"set_user",
"clear_user",
"set_tag",
"set_extra",
"start_journey",
"start_step",
"flush",
"close",
"use_unhandled_exception_handler",
"track_step",
]
# Global client instance
_client: Optional[TelemetryClient] = None
T = TypeVar("T")
def init(dsn_or_options: str | TelemetryOptions) -> TelemetryClient:
"""Initialize the global IronTelemetry client."""
global _client
if isinstance(dsn_or_options, str):
options = TelemetryOptions(dsn=dsn_or_options)
else:
options = dsn_or_options
_client = TelemetryClient(options)
return _client
def get_client() -> Optional[TelemetryClient]:
"""Get the global client instance."""
return _client
def capture_exception(
error: BaseException,
extra: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Capture an exception using the global client."""
if not _client:
print("[IronTelemetry] Client not initialized. Call init() first.", file=sys.stderr)
return SendResult(success=False, error="Client not initialized")
return _client.capture_exception(error, extra)
def capture_message(
message: str,
level: SeverityLevel = SeverityLevel.INFO,
) -> SendResult:
"""Capture a message using the global client."""
if not _client:
print("[IronTelemetry] Client not initialized. Call init() first.", file=sys.stderr)
return SendResult(success=False, error="Client not initialized")
return _client.capture_message(message, level)
def add_breadcrumb(
message: str,
category: BreadcrumbCategory = BreadcrumbCategory.CUSTOM,
level: SeverityLevel = SeverityLevel.INFO,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""Add a breadcrumb using the global client."""
if not _client:
print("[IronTelemetry] Client not initialized. Call init() first.", file=sys.stderr)
return
_client.add_breadcrumb(message, category, level, data)
def set_user(
id: str,
email: Optional[str] = None,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""Set user context using the global client."""
if not _client:
print("[IronTelemetry] Client not initialized. Call init() first.", file=sys.stderr)
return
_client.set_user(id, email, data)
def clear_user() -> None:
"""Clear user context using the global client."""
if _client:
_client.clear_user()
def set_tag(key: str, value: str) -> None:
"""Set a tag using the global client."""
if not _client:
print("[IronTelemetry] Client not initialized. Call init() first.", file=sys.stderr)
return
_client.set_tag(key, value)
def set_extra(key: str, value: Any) -> None:
"""Set extra context using the global client."""
if not _client:
print("[IronTelemetry] Client not initialized. Call init() first.", file=sys.stderr)
return
_client.set_extra(key, value)
def start_journey(name: str) -> JourneyScope:
"""Start a journey using the global client."""
if not _client:
raise RuntimeError("[IronTelemetry] Client not initialized. Call init() first.")
return _client.start_journey(name)
def start_step(name: str, category: Optional[str] = None) -> Step:
"""Start a step in the current journey using the global client."""
if not _client:
raise RuntimeError("[IronTelemetry] Client not initialized. Call init() first.")
return _client.start_step(name, category)
def flush() -> None:
"""Flush pending events using the global client."""
if _client:
_client.flush()
def close() -> None:
"""Close the global client."""
global _client
if _client:
_client.close()
_client = None
def use_unhandled_exception_handler() -> None:
"""Set up global unhandled exception handler."""
original_hook = sys.excepthook
def exception_handler(
exc_type: type,
exc_value: BaseException,
exc_traceback: Any,
) -> None:
"""Handle uncaught exceptions."""
if _client:
_client.capture_exception(exc_value)
_client.flush()
# Call the original hook
original_hook(exc_type, exc_value, exc_traceback)
sys.excepthook = exception_handler
def track_step(
name: str,
fn: Callable[[], T],
category: Optional[str] = None,
) -> T:
"""Track a step with automatic error handling."""
if not _client:
return fn()
with start_step(name, category) as step:
try:
return fn()
except Exception:
step.fail()
raise

View File

@ -0,0 +1,56 @@
"""Breadcrumb management for IronTelemetry SDK."""
from datetime import datetime
from typing import Any, Dict, List, Optional
from .types import Breadcrumb, BreadcrumbCategory, SeverityLevel
class BreadcrumbManager:
"""Manages breadcrumbs for an SDK instance."""
def __init__(self, max_breadcrumbs: int = 100):
self._max_breadcrumbs = max_breadcrumbs
self._breadcrumbs: List[Breadcrumb] = []
def add(
self,
message: str,
category: BreadcrumbCategory = BreadcrumbCategory.CUSTOM,
level: SeverityLevel = SeverityLevel.INFO,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""Add a breadcrumb."""
breadcrumb = Breadcrumb(
timestamp=datetime.now(),
category=category,
message=message,
level=level,
data=data,
)
self._breadcrumbs.append(breadcrumb)
# Trim to max size
if len(self._breadcrumbs) > self._max_breadcrumbs:
self._breadcrumbs = self._breadcrumbs[-self._max_breadcrumbs:]
def add_breadcrumb(self, breadcrumb: Breadcrumb) -> None:
"""Add a full Breadcrumb object."""
self._breadcrumbs.append(breadcrumb)
if len(self._breadcrumbs) > self._max_breadcrumbs:
self._breadcrumbs = self._breadcrumbs[-self._max_breadcrumbs:]
def get_all(self) -> List[Breadcrumb]:
"""Get all breadcrumbs."""
return list(self._breadcrumbs)
def clear(self) -> None:
"""Clear all breadcrumbs."""
self._breadcrumbs = []
@property
def count(self) -> int:
"""Get the number of breadcrumbs."""
return len(self._breadcrumbs)

308
src/irontelemetry/client.py Normal file
View File

@ -0,0 +1,308 @@
"""Main IronTelemetry client class."""
import platform
import random
import sys
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from .breadcrumbs import BreadcrumbManager
from .config import generate_event_id, resolve_options
from .journey import Journey, JourneyScope, Step
from .queue import OfflineQueue
from .transport import Transport
from .types import (
Breadcrumb,
BreadcrumbCategory,
ExceptionInfo,
PlatformInfo,
SendResult,
SeverityLevel,
StackFrame,
TelemetryEvent,
TelemetryOptions,
User,
)
class TelemetryClient:
"""Main IronTelemetry client class."""
def __init__(self, options: TelemetryOptions):
self._options, self._parsed_dsn = resolve_options(options)
self._transport = Transport(
self._parsed_dsn,
self._options.api_base_url or self._parsed_dsn.api_base_url,
self._options.debug,
)
self._queue: Optional[OfflineQueue] = None
if self._options.enable_offline_queue:
self._queue = OfflineQueue(
self._options.max_offline_queue_size,
self._options.debug,
)
self._breadcrumbs = BreadcrumbManager(self._options.max_breadcrumbs)
self._tags: Dict[str, str] = {}
self._extra: Dict[str, Any] = {}
self._user: Optional[User] = None
self._current_journey: Optional[Journey] = None
if self._options.debug:
print(f"[IronTelemetry] Initialized with DSN: {self._options.dsn}")
def capture_exception(
self,
error: BaseException,
extra: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Capture an exception."""
exception = self._parse_exception(error)
event = self._create_event(SeverityLevel.ERROR, exception.message, exception)
if extra:
event.extra.update(extra)
return self._send_event(event)
async def capture_exception_async(
self,
error: BaseException,
extra: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Capture an exception asynchronously."""
exception = self._parse_exception(error)
event = self._create_event(SeverityLevel.ERROR, exception.message, exception)
if extra:
event.extra.update(extra)
return await self._send_event_async(event)
def capture_message(
self,
message: str,
level: SeverityLevel = SeverityLevel.INFO,
) -> SendResult:
"""Capture a message."""
event = self._create_event(level, message)
return self._send_event(event)
async def capture_message_async(
self,
message: str,
level: SeverityLevel = SeverityLevel.INFO,
) -> SendResult:
"""Capture a message asynchronously."""
event = self._create_event(level, message)
return await self._send_event_async(event)
def add_breadcrumb(
self,
message: str,
category: BreadcrumbCategory = BreadcrumbCategory.CUSTOM,
level: SeverityLevel = SeverityLevel.INFO,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""Add a breadcrumb."""
self._breadcrumbs.add(message, category, level, data)
def set_user(
self,
id: str,
email: Optional[str] = None,
data: Optional[Dict[str, Any]] = None,
) -> None:
"""Set user context."""
self._user = User(id=id, email=email, data=data)
def clear_user(self) -> None:
"""Clear user context."""
self._user = None
def set_tag(self, key: str, value: str) -> None:
"""Set a tag."""
self._tags[key] = value
def set_extra(self, key: str, value: Any) -> None:
"""Set extra context."""
self._extra[key] = value
def start_journey(self, name: str) -> JourneyScope:
"""Start a new journey."""
self._current_journey = Journey(name)
# Copy user context to journey
if self._user:
self._current_journey.set_user(
self._user.id,
self._user.email,
self._user.data,
)
def on_complete() -> None:
self._current_journey = None
return JourneyScope(self._current_journey, on_complete)
def start_step(self, name: str, category: Optional[str] = None) -> Step:
"""Start a step in the current journey."""
if not self._current_journey:
raise RuntimeError("No active journey. Call start_journey() first.")
return self._current_journey.start_step(name, category)
def flush(self) -> None:
"""Flush pending events."""
self._process_queue()
async def flush_async(self) -> None:
"""Flush pending events asynchronously."""
await self._process_queue_async()
def close(self) -> None:
"""Close the client."""
self._transport.close()
def _create_event(
self,
level: SeverityLevel,
message: Optional[str] = None,
exception: Optional[ExceptionInfo] = None,
) -> TelemetryEvent:
"""Create a telemetry event."""
user = self._current_journey.get_user() if self._current_journey else self._user
return TelemetryEvent(
event_id=generate_event_id(),
timestamp=datetime.now(),
level=level,
message=message,
exception=exception,
user=user,
tags=dict(self._tags),
extra=dict(self._extra),
breadcrumbs=self._breadcrumbs.get_all(),
journey=self._current_journey.get_context() if self._current_journey else None,
environment=self._options.environment,
app_version=self._options.app_version,
platform=self._get_platform_info(),
)
def _send_event(self, event: TelemetryEvent) -> SendResult:
"""Send an event."""
# Check sample rate
if random.random() > self._options.sample_rate:
if self._options.debug:
print("[IronTelemetry] Event dropped due to sample rate")
return SendResult(success=True, event_id=event.event_id)
# Apply before_send hook
if self._options.before_send:
result = self._options.before_send(event)
if result is None:
if self._options.debug:
print("[IronTelemetry] Event dropped by before_send hook")
return SendResult(success=True, event_id=event.event_id)
event = result
# Try to send
result = self._transport.send(event)
if not result.success and self._queue:
self._queue.enqueue(event)
return SendResult(
success=result.success,
event_id=event.event_id,
error=result.error,
queued=True,
)
return result
async def _send_event_async(self, event: TelemetryEvent) -> SendResult:
"""Send an event asynchronously."""
# Check sample rate
if random.random() > self._options.sample_rate:
if self._options.debug:
print("[IronTelemetry] Event dropped due to sample rate")
return SendResult(success=True, event_id=event.event_id)
# Apply before_send hook
if self._options.before_send:
result = self._options.before_send(event)
if result is None:
if self._options.debug:
print("[IronTelemetry] Event dropped by before_send hook")
return SendResult(success=True, event_id=event.event_id)
event = result
# Try to send
result = await self._transport.send_async(event)
if not result.success and self._queue:
self._queue.enqueue(event)
return SendResult(
success=result.success,
event_id=event.event_id,
error=result.error,
queued=True,
)
return result
def _process_queue(self) -> None:
"""Process offline queue."""
if not self._queue or self._queue.is_empty:
return
if not self._transport.is_online():
return
for event in self._queue.get_all():
result = self._transport.send(event)
if result.success:
self._queue.remove(event.event_id)
async def _process_queue_async(self) -> None:
"""Process offline queue asynchronously."""
if not self._queue or self._queue.is_empty:
return
if not self._transport.is_online():
return
for event in self._queue.get_all():
result = await self._transport.send_async(event)
if result.success:
self._queue.remove(event.event_id)
def _parse_exception(self, error: BaseException) -> ExceptionInfo:
"""Parse an error into exception info."""
tb = error.__traceback__
frames: List[StackFrame] = []
if tb:
for frame_summary in traceback.extract_tb(tb):
frames.append(
StackFrame(
function=frame_summary.name,
filename=frame_summary.filename,
lineno=frame_summary.lineno,
)
)
return ExceptionInfo(
type=type(error).__name__,
message=str(error),
stacktrace=frames if frames else None,
)
def _get_platform_info(self) -> PlatformInfo:
"""Get platform information."""
return PlatformInfo(
name="python",
version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
os=platform.system(),
)

View File

@ -0,0 +1,52 @@
"""Configuration handling for IronTelemetry SDK."""
import uuid
from urllib.parse import urlparse
from .types import ParsedDsn, TelemetryOptions
def parse_dsn(dsn: str) -> ParsedDsn:
"""Parse a DSN string into its components.
Format: https://pk_live_xxx@irontelemetry.com
"""
try:
parsed = urlparse(dsn)
public_key = parsed.username
if not public_key or not public_key.startswith("pk_"):
raise ValueError("DSN must contain a valid public key starting with pk_")
protocol = parsed.scheme
host = parsed.hostname or ""
return ParsedDsn(
public_key=public_key,
host=host,
protocol=protocol,
api_base_url=f"{protocol}://{host}",
)
except Exception as e:
if "pk_" in str(e):
raise
raise ValueError(f"Invalid DSN format: {dsn}") from e
def generate_event_id() -> str:
"""Generate a unique event ID."""
return str(uuid.uuid4())
def resolve_options(options: TelemetryOptions) -> tuple[TelemetryOptions, ParsedDsn]:
"""Validate and resolve options with defaults."""
parsed_dsn = parse_dsn(options.dsn)
# Clamp sample rate
options.sample_rate = max(0.0, min(1.0, options.sample_rate))
# Set API base URL if not provided
if not options.api_base_url:
options.api_base_url = parsed_dsn.api_base_url
return options, parsed_dsn

View File

@ -0,0 +1,159 @@
"""Journey tracking for IronTelemetry SDK."""
from datetime import datetime
from typing import Any, Dict, Optional
from .config import generate_event_id
from .types import JourneyContext, User
class Journey:
"""Represents an active journey tracking session."""
def __init__(self, name: str):
self._id = generate_event_id()
self._name = name
self._started_at = datetime.now()
self._metadata: Dict[str, Any] = {}
self._user: Optional[User] = None
self._current_step: Optional["Step"] = None
self._completed = False
self._failed = False
def set_user(
self,
id: str,
email: Optional[str] = None,
data: Optional[Dict[str, Any]] = None,
) -> "Journey":
"""Set user context for this journey."""
self._user = User(id=id, email=email, data=data)
return self
def set_metadata(self, key: str, value: Any) -> "Journey":
"""Set metadata for this journey."""
self._metadata[key] = value
return self
def start_step(self, name: str, category: Optional[str] = None) -> "Step":
"""Start a new step in this journey."""
# Complete any existing step
if self._current_step and self._current_step._status == "in_progress":
self._current_step._status = "completed"
self._current_step._ended_at = datetime.now()
step = Step(name, category, self)
self._current_step = step
return step
def complete(self) -> None:
"""Mark the journey as completed."""
if self._current_step and self._current_step._status == "in_progress":
self._current_step._status = "completed"
self._current_step._ended_at = datetime.now()
self._completed = True
def fail(self) -> None:
"""Mark the journey as failed."""
if self._current_step and self._current_step._status == "in_progress":
self._current_step._status = "failed"
self._current_step._ended_at = datetime.now()
self._failed = True
def get_context(self) -> JourneyContext:
"""Get the journey context for an event."""
return JourneyContext(
journey_id=self._id,
name=self._name,
current_step=self._current_step._name if self._current_step else None,
started_at=self._started_at,
metadata=self._metadata,
)
def get_user(self) -> Optional[User]:
"""Get the user context for this journey."""
return self._user
@property
def is_complete(self) -> bool:
"""Check if the journey is complete."""
return self._completed or self._failed
@property
def journey_id(self) -> str:
"""Get journey ID."""
return self._id
class Step:
"""Represents a step within a journey."""
def __init__(self, name: str, category: Optional[str], journey: Journey):
self._name = name
self._category = category
self._journey = journey
self._started_at = datetime.now()
self._ended_at: Optional[datetime] = None
self._status = "in_progress"
self._data: Dict[str, Any] = {}
def set_data(self, key: str, value: Any) -> "Step":
"""Set data for this step."""
self._data[key] = value
return self
def complete(self) -> None:
"""Mark the step as completed."""
self._status = "completed"
self._ended_at = datetime.now()
def fail(self) -> None:
"""Mark the step as failed."""
self._status = "failed"
self._ended_at = datetime.now()
@property
def name(self) -> str:
"""Get the step name."""
return self._name
def get_journey(self) -> Journey:
"""Get the parent journey."""
return self._journey
def __enter__(self) -> "Step":
"""Enter the step context."""
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the step context."""
if exc_type is not None:
self.fail()
elif self._status == "in_progress":
self.complete()
class JourneyScope:
"""Journey scope that auto-completes on exit."""
def __init__(self, journey: Journey, on_complete: Optional[callable] = None):
self._journey = journey
self._on_complete = on_complete
def get_journey(self) -> Journey:
"""Get the underlying journey."""
return self._journey
def __enter__(self) -> "JourneyScope":
"""Enter the journey scope."""
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit the journey scope."""
if exc_type is not None:
self._journey.fail()
elif not self._journey.is_complete:
self._journey.complete()
if self._on_complete:
self._on_complete()

224
src/irontelemetry/queue.py Normal file
View File

@ -0,0 +1,224 @@
"""Offline queue for storing events when the network is unavailable."""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from .types import (
Breadcrumb,
BreadcrumbCategory,
ExceptionInfo,
JourneyContext,
PlatformInfo,
SeverityLevel,
StackFrame,
TelemetryEvent,
User,
)
class OfflineQueue:
"""Offline queue for storing events when the network is unavailable."""
def __init__(
self,
max_size: int = 500,
debug: bool = False,
storage_path: Optional[Path] = None,
):
self._max_size = max_size
self._debug = debug
self._queue: List[TelemetryEvent] = []
# Default to user's local app data directory
if storage_path is None:
app_data = os.environ.get("LOCALAPPDATA") or os.environ.get("HOME") or "."
storage_path = Path(app_data) / ".irontelemetry"
self._storage_path = storage_path
self._queue_file = self._storage_path / "queue.json"
self._load()
def enqueue(self, event: TelemetryEvent) -> None:
"""Add an event to the queue."""
if len(self._queue) >= self._max_size:
self._queue.pop(0)
if self._debug:
print("[IronTelemetry] Queue full, dropping oldest event")
self._queue.append(event)
self._save()
if self._debug:
print(f"[IronTelemetry] Event queued, queue size: {len(self._queue)}")
def get_all(self) -> List[TelemetryEvent]:
"""Get all queued events."""
return list(self._queue)
def remove(self, event_id: str) -> None:
"""Remove an event from the queue."""
self._queue = [e for e in self._queue if e.event_id != event_id]
self._save()
def clear(self) -> None:
"""Clear all queued events."""
self._queue = []
self._save()
@property
def size(self) -> int:
"""Get the number of queued events."""
return len(self._queue)
@property
def is_empty(self) -> bool:
"""Check if the queue is empty."""
return len(self._queue) == 0
def _load(self) -> None:
"""Load queue from persistent storage."""
try:
if self._queue_file.exists():
with open(self._queue_file, "r") as f:
data = json.load(f)
self._queue = [self._deserialize_event(e) for e in data]
except Exception as e:
if self._debug:
print(f"[IronTelemetry] Failed to load queue from storage: {e}")
def _save(self) -> None:
"""Save queue to persistent storage."""
try:
self._storage_path.mkdir(parents=True, exist_ok=True)
with open(self._queue_file, "w") as f:
json.dump([self._serialize_event(e) for e in self._queue], f)
except Exception as e:
if self._debug:
print(f"[IronTelemetry] Failed to save queue to storage: {e}")
def _serialize_event(self, event: TelemetryEvent) -> dict:
"""Serialize an event for storage."""
return {
"event_id": event.event_id,
"timestamp": event.timestamp.isoformat(),
"level": event.level.value,
"message": event.message,
"exception": {
"type": event.exception.type,
"message": event.exception.message,
"stacktrace": [
{"function": f.function, "filename": f.filename, "lineno": f.lineno}
for f in (event.exception.stacktrace or [])
] if event.exception.stacktrace else None,
} if event.exception else None,
"user": {
"id": event.user.id,
"email": event.user.email,
"data": event.user.data,
} if event.user else None,
"tags": event.tags,
"extra": event.extra,
"breadcrumbs": [
{
"timestamp": b.timestamp.isoformat(),
"category": b.category.value,
"message": b.message,
"level": b.level.value,
"data": b.data,
}
for b in event.breadcrumbs
],
"journey": {
"journey_id": event.journey.journey_id,
"name": event.journey.name,
"current_step": event.journey.current_step,
"started_at": event.journey.started_at.isoformat(),
"metadata": event.journey.metadata,
} if event.journey else None,
"environment": event.environment,
"app_version": event.app_version,
"platform": {
"name": event.platform.name,
"version": event.platform.version,
"os": event.platform.os,
},
}
def _deserialize_event(self, data: dict) -> TelemetryEvent:
"""Deserialize an event from storage."""
exception = None
if data.get("exception"):
exc_data = data["exception"]
stacktrace = None
if exc_data.get("stacktrace"):
stacktrace = [
StackFrame(
function=f.get("function"),
filename=f.get("filename"),
lineno=f.get("lineno"),
)
for f in exc_data["stacktrace"]
]
exception = ExceptionInfo(
type=exc_data["type"],
message=exc_data["message"],
stacktrace=stacktrace,
)
user = None
if data.get("user"):
user_data = data["user"]
user = User(
id=user_data["id"],
email=user_data.get("email"),
data=user_data.get("data"),
)
journey = None
if data.get("journey"):
j_data = data["journey"]
journey = JourneyContext(
journey_id=j_data["journey_id"],
name=j_data["name"],
current_step=j_data.get("current_step"),
started_at=datetime.fromisoformat(j_data["started_at"]),
metadata=j_data.get("metadata", {}),
)
breadcrumbs = [
Breadcrumb(
timestamp=datetime.fromisoformat(b["timestamp"]),
category=BreadcrumbCategory(b["category"]),
message=b["message"],
level=SeverityLevel(b["level"]),
data=b.get("data"),
)
for b in data.get("breadcrumbs", [])
]
platform_data = data.get("platform", {})
platform = PlatformInfo(
name=platform_data.get("name", "python"),
version=platform_data.get("version"),
os=platform_data.get("os"),
)
return TelemetryEvent(
event_id=data["event_id"],
timestamp=datetime.fromisoformat(data["timestamp"]),
level=SeverityLevel(data["level"]),
message=data.get("message"),
exception=exception,
user=user,
tags=data.get("tags", {}),
extra=data.get("extra", {}),
breadcrumbs=breadcrumbs,
journey=journey,
environment=data.get("environment"),
app_version=data.get("app_version"),
platform=platform,
)

View File

@ -0,0 +1,182 @@
"""HTTP transport for sending events to the server."""
from datetime import datetime
from typing import Any, Dict
import httpx
from .types import ParsedDsn, SendResult, TelemetryEvent
class Transport:
"""HTTP transport for sending events to the server."""
def __init__(
self,
parsed_dsn: ParsedDsn,
api_base_url: str,
debug: bool = False,
timeout: float = 30.0,
):
self._api_base_url = api_base_url
self._public_key = parsed_dsn.public_key
self._debug = debug
self._timeout = timeout
self._client = httpx.Client(timeout=timeout)
def send(self, event: TelemetryEvent) -> SendResult:
"""Send an event to the server."""
url = f"{self._api_base_url}/api/v1/events"
try:
response = self._client.post(
url,
headers={
"Content-Type": "application/json",
"X-Public-Key": self._public_key,
},
json=self._serialize_event(event),
)
if response.status_code >= 400:
if self._debug:
print(f"[IronTelemetry] Failed to send event: {response.status_code}")
return SendResult(
success=False,
error=f"HTTP {response.status_code}: {response.text}",
)
result = response.json()
if self._debug:
print(f"[IronTelemetry] Event sent successfully: {event.event_id}")
return SendResult(
success=True,
event_id=result.get("eventId", event.event_id),
)
except Exception as e:
if self._debug:
print(f"[IronTelemetry] Failed to send event: {e}")
return SendResult(success=False, error=str(e))
async def send_async(self, event: TelemetryEvent) -> SendResult:
"""Send an event to the server asynchronously."""
url = f"{self._api_base_url}/api/v1/events"
try:
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.post(
url,
headers={
"Content-Type": "application/json",
"X-Public-Key": self._public_key,
},
json=self._serialize_event(event),
)
if response.status_code >= 400:
if self._debug:
print(f"[IronTelemetry] Failed to send event: {response.status_code}")
return SendResult(
success=False,
error=f"HTTP {response.status_code}: {response.text}",
)
result = response.json()
if self._debug:
print(f"[IronTelemetry] Event sent successfully: {event.event_id}")
return SendResult(
success=True,
event_id=result.get("eventId", event.event_id),
)
except Exception as e:
if self._debug:
print(f"[IronTelemetry] Failed to send event: {e}")
return SendResult(success=False, error=str(e))
def is_online(self) -> bool:
"""Check if the server is reachable."""
try:
response = self._client.get(
f"{self._api_base_url}/api/v1/health",
headers={"X-Public-Key": self._public_key},
)
return response.status_code == 200
except Exception:
return False
def close(self) -> None:
"""Close the transport."""
self._client.close()
def _serialize_event(self, event: TelemetryEvent) -> Dict[str, Any]:
"""Serialize an event for sending."""
return {
"eventId": event.event_id,
"timestamp": event.timestamp.isoformat(),
"level": event.level.value,
"message": event.message,
"exception": self._serialize_exception(event.exception) if event.exception else None,
"user": self._serialize_user(event.user) if event.user else None,
"tags": event.tags,
"extra": event.extra,
"breadcrumbs": [self._serialize_breadcrumb(b) for b in event.breadcrumbs],
"journey": self._serialize_journey(event.journey) if event.journey else None,
"environment": event.environment,
"appVersion": event.app_version,
"platform": {
"name": event.platform.name,
"version": event.platform.version,
"os": event.platform.os,
},
}
def _serialize_exception(self, exc: Any) -> Dict[str, Any]:
"""Serialize exception info."""
return {
"type": exc.type,
"message": exc.message,
"stacktrace": [
{
"function": f.function,
"filename": f.filename,
"lineno": f.lineno,
"colno": f.colno,
}
for f in (exc.stacktrace or [])
] if exc.stacktrace else None,
}
def _serialize_user(self, user: Any) -> Dict[str, Any]:
"""Serialize user info."""
return {
"id": user.id,
"email": user.email,
"name": user.name,
"data": user.data,
}
def _serialize_breadcrumb(self, breadcrumb: Any) -> Dict[str, Any]:
"""Serialize a breadcrumb."""
return {
"timestamp": breadcrumb.timestamp.isoformat(),
"category": breadcrumb.category.value,
"message": breadcrumb.message,
"level": breadcrumb.level.value,
"data": breadcrumb.data,
}
def _serialize_journey(self, journey: Any) -> Dict[str, Any]:
"""Serialize journey context."""
return {
"journeyId": journey.journey_id,
"name": journey.name,
"currentStep": journey.current_step,
"startedAt": journey.started_at.isoformat(),
"metadata": journey.metadata,
}

133
src/irontelemetry/types.py Normal file
View File

@ -0,0 +1,133 @@
"""Type definitions for IronTelemetry SDK."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class SeverityLevel(str, Enum):
"""Severity levels for events."""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
FATAL = "fatal"
class BreadcrumbCategory(str, Enum):
"""Breadcrumb categories."""
UI = "ui"
HTTP = "http"
NAVIGATION = "navigation"
CONSOLE = "console"
AUTH = "auth"
BUSINESS = "business"
NOTIFICATION = "notification"
CUSTOM = "custom"
@dataclass
class Breadcrumb:
"""A breadcrumb representing an event leading up to an error."""
timestamp: datetime
category: BreadcrumbCategory
message: str
level: SeverityLevel = SeverityLevel.INFO
data: Optional[Dict[str, Any]] = None
@dataclass
class User:
"""User information for context."""
id: str
email: Optional[str] = None
name: Optional[str] = None
data: Optional[Dict[str, Any]] = None
@dataclass
class StackFrame:
"""Stack frame information."""
function: Optional[str] = None
filename: Optional[str] = None
lineno: Optional[int] = None
colno: Optional[int] = None
context: Optional[List[str]] = None
@dataclass
class ExceptionInfo:
"""Exception/error information."""
type: str
message: str
stacktrace: Optional[List[StackFrame]] = None
@dataclass
class PlatformInfo:
"""Platform/runtime information."""
name: str
version: Optional[str] = None
os: Optional[str] = None
@dataclass
class JourneyContext:
"""Journey context for tracking user flows."""
journey_id: str
name: str
started_at: datetime
current_step: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TelemetryEvent:
"""Event payload sent to the server."""
event_id: str
timestamp: datetime
level: SeverityLevel
platform: PlatformInfo
tags: Dict[str, str] = field(default_factory=dict)
extra: Dict[str, Any] = field(default_factory=dict)
breadcrumbs: List[Breadcrumb] = field(default_factory=list)
message: Optional[str] = None
exception: Optional[ExceptionInfo] = None
user: Optional[User] = None
journey: Optional[JourneyContext] = None
environment: Optional[str] = None
app_version: Optional[str] = None
@dataclass
class ParsedDsn:
"""Parsed DSN components."""
public_key: str
host: str
protocol: str
api_base_url: str
@dataclass
class SendResult:
"""Result of sending an event."""
success: bool
event_id: Optional[str] = None
error: Optional[str] = None
queued: bool = False
@dataclass
class TelemetryOptions:
"""Options for initializing the SDK."""
dsn: str
environment: str = "production"
app_version: str = "0.0.0"
sample_rate: float = 1.0
max_breadcrumbs: int = 100
debug: bool = False
before_send: Optional[Callable[[TelemetryEvent], Optional[TelemetryEvent]]] = None
enable_offline_queue: bool = True
max_offline_queue_size: int = 500
api_base_url: Optional[str] = None