Skip to main content

Python SDK

The Python SDK provides a complete toolkit for integrating Adaline’s LLM deployment and observability features into your AI applications. The SDK is fully async-native, built on asyncio.

Installation

pip install adaline-client adaline-api
Requirements: Python >= 3.11

Quick Start

import asyncio
import json

from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent


async def main():
    """Fetch the latest deployment, then record one trace with a single
    LLM-call span and flush the telemetry buffer."""
    # Initialize client (API key from ADALINE_API_KEY env var)
    adaline = Adaline()

    # Get latest deployment
    deployment = await adaline.get_latest_deployment(
        prompt_id="your-prompt-id",
        deployment_environment_id="your-deployment-environment-id"
    )

    # Initialize monitoring
    monitor = adaline.init_monitor(
        project_id="your-project-id",
        flush_interval_seconds=5,
        max_buffer_size=100
    )

    # Create trace and spans
    trace = monitor.log_trace(
        name="User Request",
        session_id="session-123"
    )

    span = trace.log_span(
        name="LLM Call",
        prompt_id=deployment.prompt_id,
        deployment_id=deployment.id
    )

    # Call your LLM provider with the deployed prompt here. The original
    # snippet referenced `response` without ever defining it (NameError);
    # replace this placeholder with your provider's completion object.
    response = "..."

    # Update the span with the response
    span.update({
        "status": "success",
        "content": LogSpanContent(
            actual_instance=LogSpanModelContent(
                type="Model",
                provider=deployment.prompt.config.provider_name,
                model=deployment.prompt.config.model,
                input=json.dumps(str(deployment.prompt.messages)),
                output=json.dumps(str(response))
            )
        )
    })

    span.end()
    trace.end()
    await monitor.flush()
    monitor.stop()

asyncio.run(main())

Type Definitions

The SDK uses types from the adaline-api package:
# API types (deployment, messages, content, logging)
from adaline_api.models.deployment import Deployment
from adaline_api.models.deployment_prompt import DeploymentPrompt
from adaline_api.models.message import Message
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_other_content import LogSpanOtherContent

# SDK classes
from adaline.main import Adaline, Controller
from adaline.logs import Monitor, Trace, Span
See Types Reference for complete type documentation.

Error Handling

The SDK uses automatic retry logic with exponential backoff via the tenacity library:
  • 5xx errors: Automatically retried with exponential backoff (1s, 2s, 4s, … capped at 10s, 20s total budget)
  • 4xx errors: Fail immediately (no retry)
  • Network errors: Retried with exponential backoff
Failed flush entries are dropped and counted in monitor.dropped_count. Following the OpenTelemetry error handling principle, telemetry failures never propagate to your application.

Async-Native Design

All deployment methods are async and must be awaited:
import asyncio

from adaline.main import Adaline


async def main():
    """Show that every deployment-facing SDK call is a coroutine and
    must be awaited, while Monitor.stop() alone stays synchronous."""
    client = Adaline()

    # Fetch one specific deployment — awaited like all deployment methods.
    pinned = await client.get_deployment(
        prompt_id="...",
        deployment_id="..."
    )

    # Fetch whichever deployment is currently live in the environment.
    newest = await client.get_latest_deployment(
        prompt_id="...",
        deployment_environment_id="..."
    )

    # Controller keeps the latest deployment cached; creation is async too.
    controller = await client.init_latest_deployment(
        prompt_id="...",
        deployment_environment_id="..."
    )

    current = await controller.get()
    await controller.stop()  # stop() is also async

    # Monitor.flush() is awaitable; Monitor.stop() is not.
    monitor = client.init_monitor(project_id="...")
    await monitor.flush()
    monitor.stop()  # stop() is synchronous on Monitor

asyncio.run(main())

Real-World Examples

Example 1: RAG Pipeline

import asyncio
import json
from openai import AsyncOpenAI
from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent

# Module-level clients shared by every call to answer_question.
adaline = Adaline()
openai = AsyncOpenAI()

# NOTE(review): `vector_db` is used below but never defined in this example —
# it stands in for your vector-store client; presumably it exposes an async
# query(embedding=..., top_k=...) returning an object with .ids and
# .documents. Confirm against your actual store before running.

async def answer_question(session_id: str, question: str):
    """Answer a question via a three-step RAG pipeline — embed the query,
    retrieve documents, generate an answer — logging each step as a span
    on a single Adaline trace.

    Args:
        session_id: Identifier grouping this trace with others from the
            same user session.
        question: Natural-language question to answer.

    Returns:
        The chat completion's answer text.

    Raises:
        Any exception from the pipeline is re-raised after the trace is
        tagged as failed; telemetry is flushed either way (see finally).
    """
    monitor = adaline.init_monitor(project_id="rag-system")

    trace = monitor.log_trace(
        name="RAG Query",
        session_id=session_id,
        tags=["rag", "qa"]
    )

    try:
        # Step 1: Generate embedding
        embed_span = trace.log_span(
            name="Generate Query Embedding",
            tags=["embedding"]
        )

        embedding_response = await openai.embeddings.create(
            model="text-embedding-3-large",
            input=question
        )
        embedding = embedding_response.data[0].embedding

        # Log only the vector's dimensionality, never the raw embedding.
        embed_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanEmbeddingsContent(
                    type="Embeddings",
                    input=json.dumps({"query": question}),
                    output=json.dumps({"dimensions": len(embedding)})
                )
            )
        })
        embed_span.end()

        # Step 2: Retrieve documents
        retrieval_span = trace.log_span(
            name="Vector Search",
            tags=["retrieval"]
        )

        results = await vector_db.query(embedding=embedding, top_k=5)

        retrieval_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": question, "top_k": 5}),
                    output=json.dumps({"document_ids": results.ids})
                )
            )
        })
        retrieval_span.end()

        # Step 3: Generate answer with deployment
        deployment = await adaline.get_latest_deployment(
            prompt_id="rag-answer-prompt",
            deployment_environment_id="environment_abc123"
        )

        llm_span = trace.log_span(
            name="Generate Answer",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["llm", "answer"]
        )

        # Retrieved documents become the grounding context for the answer.
        context = "\n\n".join(results.documents)

        completion = await openai.chat.completions.create(
            model=deployment.prompt.config.model,
            messages=[
                *deployment.prompt.messages,
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
            **deployment.prompt.config.settings
        )

        answer = completion.choices[0].message.content

        # NOTE(review): json.dumps(str(...)) stores the Python repr wrapped
        # in a JSON string, not structured JSON — confirm this is the
        # intended input format for span content.
        llm_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps(str(deployment.prompt.messages)),
                    output=json.dumps(completion.choices[0].message.model_dump())
                )
            ),
            "attributes": {"context_docs": len(results.ids)}
        })
        llm_span.end()

        trace.update({"status": "success"})
        return answer

    except Exception as error:
        # Mark the trace failed, then let the caller see the original error.
        trace.update({"status": "failure", "attributes": {"error": str(error)}})
        raise

    finally:
        # Runs on success and failure alike: close the trace and push any
        # buffered telemetry before tearing the monitor down.
        trace.end()
        await monitor.flush()
        monitor.stop()

Example 2: Multi-Step Agent

import asyncio
import json

from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
from adaline_api.models.log_span_other_content import LogSpanOtherContent


async def run_agent():
    """Walk two demo queries through a three-stage agent pipeline —
    intent classification, knowledge retrieval, response generation —
    logging one trace per query with one span per stage."""
    sdk = Adaline(debug=True)

    # Keep the live deployment cached and refreshed in the background.
    controller = await sdk.init_latest_deployment(
        prompt_id="agent-prompt",
        deployment_environment_id="environment_abc123",
        refresh_interval=60
    )

    monitor = sdk.init_monitor(
        project_id="agent-project",
        flush_interval_seconds=5
    )

    session_id = "demo-session"
    demo_queries = [
        "What are the latest AI trends?",
        "Compare Python and Rust for ML workloads"
    ]

    for query in demo_queries:
        deployment = await controller.get()

        trace = monitor.log_trace(
            name="Agent Query",
            session_id=session_id,
            tags=["agent", "research"],
            attributes={"query_length": len(query)}
        )

        # Stage 1: classify the user's intent.
        classify_span = trace.log_span(
            name="Intent Classification",
            tags=["model", "classification"]
        )
        classify_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider="openai",
                    model="gpt-4o-mini",
                    input=json.dumps({"query": query}),
                    output=json.dumps({"intent": "research"})
                )
            )
        })
        classify_span.end()

        # Stage 2: pull supporting knowledge.
        lookup_span = trace.log_span(
            name="Knowledge Retrieval",
            tags=["retrieval"]
        )
        lookup_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": query}),
                    output=json.dumps({"chunks": ["result1", "result2"]})
                )
            )
        })
        lookup_span.end()

        # Stage 3: generate the final answer with the deployed prompt.
        generate_span = trace.log_span(
            name="Response Generation",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["model", "generation"]
        )
        generate_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps({"query": query, "context": "..."}),
                    output=json.dumps({"response": "Generated answer..."})
                )
            )
        })
        generate_span.end()

        trace.update({"status": "success"})
        trace.end()

    await monitor.flush()
    monitor.stop()
    await controller.stop()

asyncio.run(run_agent())