Monitor Class

The Monitor class manages buffering, batching, and flushing of traces and spans to the Adaline API. It handles automatic retries, background flushing, and failure tracking.

Overview

The Monitor acts as a central coordinator for all observability operations:
  • Buffers traces and spans in memory
  • Batches multiple entries for efficient API calls
  • Flushes automatically using an async background loop
  • Retries failed requests with exponential backoff
  • Drops entries that fail after retries, following OpenTelemetry error handling principles
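
The retry-then-drop behavior in the last two bullets can be sketched independently of the SDK. This is a minimal illustration, not the Monitor's actual implementation: `send_with_backoff`, `max_retries`, and `base_delay` are hypothetical names and defaults chosen for the example.

```python
import asyncio
from typing import Awaitable, Callable


async def send_with_backoff(
    send: Callable[[], Awaitable[None]],
    max_retries: int = 3,      # hypothetical default, not an SDK setting
    base_delay: float = 0.01,  # hypothetical default, in seconds
) -> bool:
    """Call `send` up to max_retries + 1 times; return False if every attempt fails."""
    for attempt in range(max_retries + 1):
        try:
            await send()
            return True
        except Exception:
            if attempt == max_retries:
                break
            # Double the wait after each failed attempt: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
    # All attempts failed: the caller would drop the entry and count it.
    return False
```

A caller that receives `False` would discard the entry and increment a drop counter rather than retry indefinitely, which keeps telemetry failures from blocking the application.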

Creation

Create a Monitor using the Adaline.init_monitor() method:
from adaline.main import Adaline

adaline = Adaline()

monitor = adaline.init_monitor(
    project_id="your-project-id",
    flush_interval_seconds=5,
    max_buffer_size=100
)

Properties

buffer

buffer: list
In-memory list storing trace and span entries waiting to be flushed. Each entry is a dict with ready, data, and category keys. Example:
print(f"Buffer size: {len(monitor.buffer)}")

for entry in monitor.buffer:
    if entry["category"] == "trace":
        print("Trace:", entry["data"].trace.trace.name)
    else:
        print("Span:", entry["data"].span.span.name)

sent_count

sent_count: int
Number of entries successfully sent to the API.

dropped_count

dropped_count: int
Number of entries dropped due to buffer overflow or send failure. Example:
print(f"Sent: {monitor.sent_count}, Dropped: {monitor.dropped_count}")
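
The two counters can be combined into a single drop ratio for alerting. The helper below is a small sketch; `drop_ratio` is a hypothetical name and is not part of the SDK.

```python
def drop_ratio(sent_count: int, dropped_count: int) -> float:
    """Return the fraction of finished entries that were dropped (0.0 if none finished)."""
    total = sent_count + dropped_count
    return dropped_count / total if total else 0.0
```

It would be called as `drop_ratio(monitor.sent_count, monitor.dropped_count)`.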

project_id

project_id: str
The project ID that all traces and spans are associated with.

default_content

default_content: LogSpanContent
Default span content used when no explicit content is provided. Defaults to LogSpanContent(actual_instance=LogSpanOtherContent(type="Other", input="{}", output="{}")).

Methods

log_trace()

Create a new trace and add it to the buffer.
log_trace(
    *,
    name: str,
    status: str = "unknown",
    session_id: Optional[str] = None,
    reference_id: Optional[str] = None,
    tags: Optional[List[str]] = None,
    attributes: Optional[Dict[str, Any]] = None
) -> Trace

Parameters

  • name (str, required): Human-readable name for this trace (e.g., “User Login”, “Generate Report”).
  • status (str, default: "unknown"): Initial status. One of 'success', 'failure', 'aborted', 'cancelled', 'pending', or 'unknown'.
  • session_id (str, optional): Session identifier to group related traces.
  • reference_id (str, optional): Custom reference ID. Auto-generated UUID if not provided.
  • tags (List[str], optional): List of tags for categorization and filtering.
  • attributes (Dict[str, Any], optional): Key-value metadata for additional context. Values must be str, int, float, or bool.
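
Because attribute values are limited to str, int, float, or bool, it can help to normalize arbitrary metadata before passing it in. The following is a sketch under assumptions: `to_primitive_attributes` is a hypothetical helper, not an SDK function, and its policy of dropping None and stringifying other values is one choice among several.

```python
from typing import Any, Dict, Union

Primitive = Union[str, int, float, bool]


def to_primitive_attributes(raw: Dict[str, Any]) -> Dict[str, Primitive]:
    """Keep primitive values, skip None, and stringify everything else."""
    cleaned: Dict[str, Primitive] = {}
    for key, value in raw.items():
        if value is None:
            continue  # assumption: omit missing values rather than send "None"
        if isinstance(value, (str, int, float, bool)):
            cleaned[key] = value
        else:
            cleaned[key] = str(value)  # e.g. lists and dicts become their repr text
    return cleaned
```

The result can then be passed as `attributes=to_primitive_attributes(metadata)`.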

Returns

  • trace (Trace): A new Trace instance. See Trace Class for details.

Examples

trace = monitor.log_trace(name="API Request")
trace.end()

flush()

Manually flush all ready entries in the buffer to the API. This is an async method.
async flush() -> None
The background loop calls this method automatically at the interval set by flush_interval_seconds. Manual calls are typically only needed during shutdown or testing.

Behavior

  1. Skips if a flush is already in progress
  2. Filters for entries marked as ready (via trace.end() or span.end())
  3. Sends each entry to the API concurrently with automatic retry
  4. Updates trace_id on traces after successful creation
  5. Removes successfully flushed entries from buffer
  6. Increments dropped_count for entries that fail after retries
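
The steps above can be sketched with a toy stand-in class. This is not the SDK's code: `SketchMonitor` and its `send` callback are hypothetical, retries and the trace_id update (step 4) are omitted, and the sketch assumes dropped entries are also removed from the buffer.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


class SketchMonitor:
    """Toy stand-in that mirrors the documented flush steps; not the SDK class."""

    def __init__(self, send: Callable[[Dict[str, Any]], Awaitable[None]]):
        self.buffer: List[Dict[str, Any]] = []
        self.sent_count = 0
        self.dropped_count = 0
        self._send = send
        self._flushing = False

    async def flush(self) -> None:
        if self._flushing:  # step 1: skip if a flush is already running
            return
        self._flushing = True
        try:
            # Step 2: only entries marked ready are sent.
            ready = [e for e in self.buffer if e["ready"]]
            # Step 3: send concurrently; failures come back as exception objects.
            results = await asyncio.gather(
                *(self._send(e) for e in ready), return_exceptions=True
            )
            for result in results:
                if isinstance(result, BaseException):
                    self.dropped_count += 1  # step 6
                else:
                    self.sent_count += 1
            # Step 5 (plus the assumption that dropped entries leave the buffer too).
            handled = {id(e) for e in ready}
            self.buffer = [e for e in self.buffer if id(e) not in handled]
        finally:
            self._flushing = False
```

Entries that are not yet ready stay in the buffer and are picked up by a later flush.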

Example

trace = monitor.log_trace(name="Important Event")
trace.end()

await monitor.flush()

stop()

Stop the background flush loop and cancel the flush task.
stop() -> None
After calling stop(), the monitor no longer flushes automatically. Because flush() is async, await it before calling stop() so that remaining ready entries are sent.

Example

await monitor.flush()
monitor.stop()
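
One way to make the flush-then-stop order hard to forget is to wrap it in an async context manager. The sketch below relies only on the `flush()` and `stop()` methods described above; `flushed_on_exit` is a hypothetical helper name, not part of the SDK.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def flushed_on_exit(monitor):
    """Yield the monitor, then flush and stop it even if the body raises."""
    try:
        yield monitor
    finally:
        try:
            await monitor.flush()  # send remaining ready entries
        finally:
            monitor.stop()  # always stop the background loop
```

Application code would then run inside `async with flushed_on_exit(monitor): ...`, and the cleanup happens on both normal exit and exceptions.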

Complete Examples

Basic Usage

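import asyncio
from adaline.main import Adaline

async def process_data():
    # Placeholder for application work; replace with real logic.
    await asyncio.sleep(0)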

async def main():
    adaline = Adaline()
    monitor = adaline.init_monitor(
        project_id="my-project",
        flush_interval_seconds=5,
        max_buffer_size=100
    )

    async def handle_request(user_id: str):
        trace = monitor.log_trace(
            name="User Request",
            session_id=user_id,
            tags=["api"]
        )

        span = trace.log_span(
            name="Process Data",
            tags=["processing"]
        )

        await process_data()

        span.update({"status": "success"})
        span.end()

        trace.update({"status": "success"})
        trace.end()

    await handle_request("user-123")
    await monitor.flush()
    monitor.stop()

asyncio.run(main())

Production Setup with Health Monitoring

import asyncio
from adaline.main import Adaline

async def main():
    adaline = Adaline(debug=True)

    monitor = adaline.init_monitor(
        project_id="production-app",
        flush_interval_seconds=5,
        max_buffer_size=200
    )

    # Periodic health check
    async def health_check():
        while True:
            print(f"Monitor Health: buffer={len(monitor.buffer)}, "
                  f"sent={monitor.sent_count}, dropped={monitor.dropped_count}")

            if monitor.dropped_count > 0:
                print("WARNING: Some telemetry entries were dropped")

            await asyncio.sleep(60)

    health_task = asyncio.create_task(health_check())

    try:
        # ... your application logic ...
        pass
    finally:
        health_task.cancel()
        await monitor.flush()
        monitor.stop()

asyncio.run(main())