Skip to main content

Python SDK

The Python SDK provides a complete toolkit for integrating Adaline’s LLM deployment and observability features into your AI applications. The SDK is fully async and uses asyncio throughout.

Installation

pip
pip install adaline-client adaline-api

Quick Start

import asyncio
from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent

async def main():
    """Fetch the latest deployment, log one LLM span inside a trace, and flush.

    Demonstrates the core loop: deployment lookup -> monitor -> trace -> span.
    """
    adaline = Adaline()

    # Pull the latest deployment for the prompt in the given environment.
    deployment = await adaline.get_latest_deployment(
        prompt_id="your-prompt-id",
        deployment_environment_id="your-environment-id"
    )

    # Buffered log submission: flushes every second or once 1000 entries queue up.
    monitor = adaline.init_monitor(
        project_id="your-project-id",
        flush_interval_seconds=1,
        max_buffer_size=1000
    )

    trace = monitor.log_trace(
        name="Chat Completion",
        session_id="user-session-123",
        tags=["production"]
    )

    llm_span = trace.log_span(
        name="OpenAI GPT-4 Call",
        prompt_id=deployment.prompt_id,
        deployment_id=deployment.id
    )

    # ... make your LLM call here ...

    llm_span.update({
        "status": "success",
        "content": LogSpanContent(
            actual_instance=LogSpanModelContent(
                type="Model",
                provider=deployment.prompt.config.provider_name,
                model=deployment.prompt.config.model,
                input=str(deployment.prompt.messages),
                output="LLM response here"
            )
        )
    })
    # Close the span before ending its parent trace (mirrors the RAG example,
    # where every span is explicitly ended).
    llm_span.end()

    trace.update({"status": "success"})
    trace.end()

    # Drain the buffer before stopping the background monitor.
    await monitor.flush()
    monitor.stop()

asyncio.run(main())

Type Definitions

The SDK uses types from the adaline_api package:
from adaline_api.models.deployment import Deployment
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_model_stream_content import LogSpanModelStreamContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_function_content import LogSpanFunctionContent
from adaline_api.models.log_span_tool_content import LogSpanToolContent
from adaline_api.models.log_span_guardrail_content import LogSpanGuardrailContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
from adaline_api.models.log_span_other_content import LogSpanOtherContent

Resource management

Beyond deployments and monitoring, Adaline exposes seven namespace clients that cover the platform’s full REST surface. Each method is async, retries 5xx responses, aborts on 4xx, and uses keyword-only arguments:
import asyncio
from adaline.main import Adaline
from adaline_api.models.create_dataset_request import CreateDatasetRequest
from adaline_api.models.add_dataset_rows_request import AddDatasetRowsRequest
from adaline_api.models.create_evaluation_request import CreateEvaluationRequest

async def main():
    """Tour the namespace clients: projects, prompts, datasets, evaluations."""
    adaline = Adaline()

    # Projects and prompts
    all_projects = await adaline.projects.list()
    project_prompts = await adaline.prompts.list(project_id="project_abc123")

    # Datasets: create one, then append a row keyed by column name.
    eval_dataset = await adaline.datasets.create(
        dataset=CreateDatasetRequest(project_id="project_abc123", title="Eval set"),
    )
    new_rows = [{"values": {"question": "How do I reset my password?"}}]
    await adaline.datasets.rows.create(
        dataset_id=eval_dataset.id,
        values_by="columnName",
        rows=new_rows,
    )

    # Evaluations: start a run against the dataset, then poll for results.
    run = await adaline.prompts.evaluations.create(
        prompt_id="prompt_abc123",
        evaluation=CreateEvaluationRequest(
            dataset_id=eval_dataset.id,
            evaluator_ids=["evaluator_abc123"],
            deployment_id="deploy_abc123",
        ),
    )

    poller = await adaline.init_evaluation_results(
        prompt_id="prompt_abc123",
        evaluation_id=run.id,
        expand="row",
        refresh_interval=30,
    )
    page = await poller.get()

asyncio.run(main())
See the per-client pages for full method lists. Need the same retry behavior on a raw call? Use the exported with_retry helper:
from adaline.clients import with_retry

# NOTE(review): top-level `await` is only valid inside an async function (or a
# REPL that supports it) — run this line from within your own `async def`.
response = await with_retry(
    lambda: adaline.deployments_api.get_deployment("prompt_abc123", "deploy_xyz789")
)

Error Handling

The SDK uses automatic retry logic with exponential backoff for API calls:
  • 5xx errors: Automatically retried with exponential backoff (1s, 2s, 4s, … capped at 10s) within a 20s budget
  • 4xx errors: Fail immediately (no retry)
Failed flush entries are dropped and counted via monitor.dropped_count. Successfully sent entries are tracked via monitor.sent_count.

Real-World Example: RAG Pipeline

import asyncio
from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
import openai
import json

async def answer_question(
    adaline: Adaline,
    monitor,
    session_id: str,
    question: str
):
    """Answer *question* with a RAG pipeline, logging each stage as a span.

    Stages: (1) embed the query, (2) vector search, (3) generate the answer
    with the latest deployment of the "rag-answer-prompt". Returns the model
    reply content. The trace is always ended in the ``finally`` block; on any
    exception the trace status is set to "failure" and the error re-raised.

    NOTE(review): spans are only ended on the success path — an exception
    mid-step leaves the active span un-ended (only the trace is closed).
    Confirm whether Span requires an explicit end() on failure as well.
    """
    trace = monitor.log_trace(
        name="RAG Query",
        session_id=session_id,
        tags=["rag", "qa"]
    )

    try:
        # Step 1: Generate embedding
        embed_span = trace.log_span(
            name="Generate Query Embedding",
            tags=["embedding"]
        )

        # Async OpenAI client is reused for the chat completion in step 3.
        client = openai.AsyncOpenAI()
        embedding_response = await client.embeddings.create(
            model="text-embedding-3-large",
            input=question
        )

        # Log only the embedding dimensionality, not the raw vector.
        embed_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanEmbeddingsContent(
                    type="Embeddings",
                    input=json.dumps({"model": "text-embedding-3-large", "input": question}),
                    output=json.dumps({"dimensions": len(embedding_response.data[0].embedding)})
                )
            )
        })
        embed_span.end()

        # Step 2: Retrieve documents
        retrieval_span = trace.log_span(
            name="Vector Search",
            tags=["retrieval"]
        )

        # ... perform vector search ...
        search_results = {"ids": ["doc1", "doc2"], "scores": [0.95, 0.87]}

        retrieval_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": question, "top_k": 5}),
                    output=json.dumps(search_results)
                )
            )
        })
        retrieval_span.end()

        # Step 3: Get LLM deployment and generate answer
        deployment = await adaline.get_latest_deployment(
            prompt_id="rag-answer-prompt",
            deployment_environment_id="environment_abc123"
        )

        # run_evaluation=True asks Adaline to evaluate this span's output.
        llm_span = trace.log_span(
            name="Generate Answer",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["llm"]
        )

        # Deployment messages form the prefix; the user question is appended last.
        messages = [
            *[{"role": m.role, "content": m.content} for m in deployment.prompt.messages],
            {"role": "user", "content": question}
        ]

        completion = await client.chat.completions.create(
            model=deployment.prompt.config.model,
            messages=messages
        )

        llm_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps(messages),
                    output=json.dumps(completion.choices[0].message.model_dump())
                )
            )
        })
        llm_span.end()

        trace.update({"status": "success"})
        return completion.choices[0].message.content

    except Exception as e:
        # Record the failure on the trace, then propagate to the caller.
        trace.update({"status": "failure", "attributes": {"error": str(e)}})
        raise

    finally:
        # Always close the trace, whether we returned or raised.
        trace.end()

async def main():
    """Run one RAG query end to end, then flush and stop the monitor."""
    sdk = Adaline()
    rag_monitor = sdk.init_monitor(project_id="rag-system")

    reply = await answer_question(
        sdk,
        rag_monitor,
        session_id="session-1",
        question="How does authentication work?"
    )
    print(reply)

    await rag_monitor.flush()
    rag_monitor.stop()

asyncio.run(main())

API Reference

Adaline Class

Core client for deployments and monitoring.

Monitor Class

Buffered log submission with automatic flushing.

Trace Class

High-level operation tracking.

Span Class

Granular operation tracking with content types.