Span Class
The Span class represents a specific operation within a trace. Spans are the building blocks of observability, tracking individual steps like LLM calls, tool executions, database queries, or any custom operation in your LLM pipeline.
Overview
A span captures:
Operation details : name, timing, status
Content : input/output data for different span types (Model, Tool, Retrieval, etc.)
Metadata : tags, attributes for filtering and analysis
Hierarchy : parent-child relationships for nested operations
Evaluation : optional evaluator execution
Common span types:
Model - LLM inference calls
ModelStream - Streaming LLM responses
Tool - Function/API executions
Retrieval - RAG and vector searches
Embeddings - Embedding generation
Function - Custom application logic
Guardrail - Safety/compliance checks
Other - Any custom operation
Creation
Create spans from a trace or parent span:
# From trace
span = trace.log_span( name = "LLM Call" , tags = [ "llm" ])
# From parent span (nested)
child_span = parent_span.log_span( name = "Tool Execution" , tags = [ "tool" ])
Properties
span
span: CreateLogSpanRequest
The underlying span request object containing all span data.
Methods
log_span()
Create a nested child span under this span.
log_span(
* ,
name: str ,
status: str = "unknown" ,
reference_id: Optional[ str ] = None ,
prompt_id: Optional[ str ] = None ,
deployment_id: Optional[ str ] = None ,
run_evaluation: Optional[ bool ] = None ,
tags: Optional[List[ str ]] = None ,
attributes: Optional[Dict[ str , Any]] = None ,
content: Optional[LogSpanContent] = None
) -> Span
This allows you to create hierarchical span relationships like:
Trace
+-- Parent Span
+-- Child Span 1
+-- Child Span 2
+-- Grandchild Span
Parameters
Same parameters as Trace.log_span(). See Trace.log_span() for details.
Example
parent_span = trace.log_span( name = "RAG Pipeline" )
embedding_span = parent_span.log_span(
name = "Generate Embedding" ,
tags = [ "embedding" ]
)
embedding_span.end()
retrieval_span = parent_span.log_span(
name = "Vector Search" ,
tags = [ "retrieval" ]
)
retrieval_span.end()
parent_span.end()
update()
Update span metadata and content.
update(updates: dict ) -> Span
Parameters
Dictionary of fields to update. Only the keys "name", "status", "tags", "attributes", "run_evaluation", and "content" are applied; all other keys are silently ignored.
Returns
Returns self for method chaining.
Examples
Update Status
Update Content
Method Chaining
span = trace.log_span( name = "API Call" )
try :
await call_api()
span.update({ "status" : "success" })
except Exception :
span.update({ "status" : "failure" })
span.end()
end()
Mark the span as complete and ready to be flushed.
Behavior
Sets ended_at timestamp
Marks the span as ready in the monitor’s buffer
Recursively ends all child spans
Returns the span’s reference ID
Idempotent: subsequent calls return the reference ID without side effects
Always call end() on your spans! Spans that are never ended will never be flushed to the API.
Example
span = trace.log_span( name = "Operation" )
try :
await do_work()
span.update({ "status" : "success" })
finally :
span.end()
Span Content Types
Different content types for different operations. In Python, span content uses the LogSpanContent wrapper with actual_instance.
Model (LLM Inference)
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanModelContent(
type = "Model" ,
provider = "openai" ,
model = "gpt-4o" ,
input = json.dumps([{ "role" : "user" , "content" : "Hello" }]),
output = json.dumps({ "role" : "assistant" , "content" : "Hi!" }),
cost = 0.002 ,
variables = { "user_name" : "John" }
)
)
})
ModelStream (Streaming LLM)
from adaline_api.models.log_span_model_stream_content import LogSpanModelStreamContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanModelStreamContent(
type = "ModelStream" ,
provider = "anthropic" ,
model = "claude-3-opus" ,
input = json.dumps(messages),
output = raw_chunks,
aggregate_output = json.dumps({ "role" : "assistant" , "content" : full_response}),
cost = 0.005
)
)
})
from adaline_api.models.log_span_tool_content import LogSpanToolContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanToolContent(
type = "Tool" ,
input = json.dumps({ "function" : "get_weather" , "city" : "Paris" }),
output = json.dumps({ "temperature" : 24 , "conditions" : "sunny" })
)
)
})
Retrieval (RAG/Vector Search)
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanRetrievalContent(
type = "Retrieval" ,
input = json.dumps({ "query" : "What is ML?" , "top_k" : 5 }),
output = json.dumps({ "documents" : [{ "id" : "doc1" , "score" : 0.95 }]})
)
)
})
Embeddings
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanEmbeddingsContent(
type = "Embeddings" ,
input = json.dumps({ "texts" : [ "text1" , "text2" ]}),
output = json.dumps({ "dimensions" : 3072 })
)
)
})
Function (Custom Logic)
from adaline_api.models.log_span_function_content import LogSpanFunctionContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanFunctionContent(
type = "Function" ,
input = json.dumps({ "operation" : "process" , "id" : 123 }),
output = json.dumps({ "result" : "success" , "items" : 42 })
)
)
})
Guardrail (Safety Check)
from adaline_api.models.log_span_guardrail_content import LogSpanGuardrailContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanGuardrailContent(
type = "Guardrail" ,
input = json.dumps({ "text" : "User input..." , "checks" : [ "toxicity" , "pii" ]}),
output = json.dumps({ "safe" : True , "scores" : { "toxicity" : 0.05 }})
)
)
})
Other (Custom)
from adaline_api.models.log_span_other_content import LogSpanOtherContent
span.update({
"content" : LogSpanContent(
actual_instance = LogSpanOtherContent(
type = "Other" ,
input = json.dumps({ "custom" : "input" }),
output = json.dumps({ "custom" : "output" })
)
)
})
Complete Examples
LLM Call with Deployment
import json
from openai import AsyncOpenAI
from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
adaline = Adaline()
openai = AsyncOpenAI()
monitor = adaline.init_monitor( project_id = "my-project" )
async def generate_response ( user_id : str , message : str ):
deployment = await adaline.get_latest_deployment(
prompt_id = "chat-prompt" ,
deployment_environment_id = "environment_abc123"
)
trace = monitor.log_trace(
name = "Chat Completion" ,
session_id = user_id
)
span = trace.log_span(
name = "LLM Completion" ,
prompt_id = deployment.prompt_id,
deployment_id = deployment.id,
run_evaluation = True ,
tags = [ "llm" , deployment.prompt.config.provider_name]
)
try :
response = await openai.chat.completions.create(
model = deployment.prompt.config.model,
messages = [
* deployment.prompt.messages,
{ "role" : "user" , "content" : message}
],
** deployment.prompt.config.settings
)
reply = response.choices[ 0 ].message.content
span.update({
"status" : "success" ,
"content" : LogSpanContent(
actual_instance = LogSpanModelContent(
type = "Model" ,
provider = deployment.prompt.config.provider_name,
model = deployment.prompt.config.model,
input = json.dumps( str (deployment.prompt.messages)),
output = json.dumps(response.choices[ 0 ].message.model_dump())
)
)
})
return reply
except Exception as error:
span.update({
"status" : "failure" ,
"attributes" : { "error" : str (error)}
})
raise
finally :
span.end()
trace.end()
Nested RAG Pipeline
import json
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
async def rag_pipeline ( question : str ):
trace = monitor.log_trace( name = "RAG Pipeline" , tags = [ "rag" ])
pipeline_span = trace.log_span( name = "RAG Workflow" , tags = [ "pipeline" ])
try :
# Step 1: Embedding
embed_span = pipeline_span.log_span(
name = "Generate Embedding" ,
tags = [ "embedding" ]
)
embed_response = await openai.embeddings.create(
model = "text-embedding-3-large" ,
input = question
)
embedding = embed_response.data[ 0 ].embedding
embed_span.update({
"status" : "success" ,
"content" : LogSpanContent(
actual_instance = LogSpanEmbeddingsContent(
type = "Embeddings" ,
input = json.dumps({ "text" : question}),
output = json.dumps({ "dimensions" : len (embedding)})
)
)
})
embed_span.end()
# Step 2: Retrieval
retrieval_span = pipeline_span.log_span(
name = "Vector Search" ,
tags = [ "retrieval" ]
)
results = await vector_db.query( embedding = embedding, top_k = 5 )
retrieval_span.update({
"status" : "success" ,
"content" : LogSpanContent(
actual_instance = LogSpanRetrievalContent(
type = "Retrieval" ,
input = json.dumps({ "query" : question, "top_k" : 5 }),
output = json.dumps({ "document_ids" : results.ids})
)
),
"attributes" : { "documents_found" : len (results.ids)}
})
retrieval_span.end()
# Step 3: LLM Generation
llm_span = pipeline_span.log_span(
name = "Generate Answer" ,
run_evaluation = True ,
tags = [ "llm" , "answer" ]
)
context = " \n\n " .join(results.documents)
completion = await openai.chat.completions.create(
model = "gpt-4o" ,
messages = [
{ "role" : "system" , "content" : "Answer based on the provided context." },
{ "role" : "user" , "content" : f "Context: \n { context } \n\n Question: { question } " }
]
)
answer = completion.choices[ 0 ].message.content
llm_span.update({
"status" : "success" ,
"content" : LogSpanContent(
actual_instance = LogSpanModelContent(
type = "Model" ,
provider = "openai" ,
model = "gpt-4o" ,
input = json.dumps({ "question" : question}),
output = json.dumps({ "answer" : answer})
)
)
})
llm_span.end()
pipeline_span.update({ "status" : "success" })
pipeline_span.end()
trace.update({ "status" : "success" })
return answer
except Exception :
pipeline_span.update({ "status" : "failure" })
trace.update({ "status" : "failure" })
raise
finally :
trace.end()
Best Practices
1. Use Appropriate Content Types
# LLM calls -> Model content
llm_span.update({ "content" : LogSpanContent(
actual_instance = LogSpanModelContent( type = "Model" , ... )
)})
# Tool calls -> Tool content
tool_span.update({ "content" : LogSpanContent(
actual_instance = LogSpanToolContent( type = "Tool" , ... )
)})
# Retrieval -> Retrieval content
search_span.update({ "content" : LogSpanContent(
actual_instance = LogSpanRetrievalContent( type = "Retrieval" , ... )
)})
2. Always End Spans
span = trace.log_span( name = "Op" )
try :
await work()
finally :
span.end()
span = trace.log_span(
name = "OpenAI Call" ,
tags = [ "llm" , "openai" , "gpt-4o" , "production" ],
attributes = {
"user_id" : "user-123" ,
"region" : "us-east-1" ,
"cached" : False ,
}
)