import asyncio
from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
import openai
import json
async def answer_question(
    adaline: Adaline,
    monitor,
    session_id: str,
    question: str
) -> str:
    """Answer *question* via a RAG pipeline: embed -> retrieve -> generate.

    Every step is recorded as a span on a single Adaline trace. On failure
    the currently open span (if any) and the trace are marked "failure"
    and the exception is re-raised; the trace is always ended.

    Args:
        adaline: Adaline client used to fetch the answer-prompt deployment.
        monitor: Adaline monitor that owns the trace (untyped in SDK usage here).
        session_id: Session identifier attached to the trace.
        question: The user's natural-language question.

    Returns:
        The generated answer text from the chat completion.

    Raises:
        Exception: Propagates any failure from the embedding, retrieval,
            or generation step after logging it on the trace.
    """
    trace = monitor.log_trace(
        name="RAG Query",
        session_id=session_id,
        tags=["rag", "qa"]
    )
    # Track the currently open span so a mid-pipeline exception does not
    # leak an unended span — only the trace was closed in `finally` before.
    active_span = None
    try:
        # Step 1: Generate embedding for the query.
        embed_span = trace.log_span(
            name="Generate Query Embedding",
            tags=["embedding"]
        )
        active_span = embed_span
        client = openai.AsyncOpenAI()
        embedding_response = await client.embeddings.create(
            model="text-embedding-3-large",
            input=question
        )
        embed_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanEmbeddingsContent(
                    type="Embeddings",
                    input=json.dumps({"model": "text-embedding-3-large", "input": question}),
                    output=json.dumps({"dimensions": len(embedding_response.data[0].embedding)})
                )
            )
        })
        embed_span.end()
        active_span = None

        # Step 2: Retrieve documents from the vector store.
        retrieval_span = trace.log_span(
            name="Vector Search",
            tags=["retrieval"]
        )
        active_span = retrieval_span
        # ... perform vector search ...
        search_results = {"ids": ["doc1", "doc2"], "scores": [0.95, 0.87]}
        retrieval_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": question, "top_k": 5}),
                    output=json.dumps(search_results)
                )
            )
        })
        retrieval_span.end()
        active_span = None

        # Step 3: Fetch the managed prompt deployment and generate the answer.
        deployment = await adaline.get_latest_deployment(
            prompt_id="rag-answer-prompt",
            deployment_environment_id="environment_abc123"
        )
        llm_span = trace.log_span(
            name="Generate Answer",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["llm"]
        )
        active_span = llm_span
        # Deployment prompt messages first, then the live user question.
        messages = [
            *[{"role": m.role, "content": m.content} for m in deployment.prompt.messages],
            {"role": "user", "content": question}
        ]
        completion = await client.chat.completions.create(
            model=deployment.prompt.config.model,
            messages=messages
        )
        llm_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps(messages),
                    output=json.dumps(completion.choices[0].message.model_dump())
                )
            )
        })
        llm_span.end()
        active_span = None

        trace.update({"status": "success"})
        return completion.choices[0].message.content
    except Exception as e:
        # Close whichever span was open when the failure occurred so it is
        # not leaked, then record the failure on the trace and re-raise.
        if active_span is not None:
            active_span.update({"status": "failure", "attributes": {"error": str(e)}})
            active_span.end()
        trace.update({"status": "failure", "attributes": {"error": str(e)}})
        raise
    finally:
        trace.end()
async def main():
    """Run a sample RAG query against the monitor and print the answer."""
    adaline = Adaline()
    monitor = adaline.init_monitor(project_id="rag-system")
    try:
        answer = await answer_question(
            adaline, monitor,
            session_id="session-1",
            question="How does authentication work?"
        )
        print(answer)
    finally:
        # Flush and stop even when the query fails, so buffered log data
        # is not lost and background workers shut down cleanly.
        await monitor.flush()
        monitor.stop()
# Guard the entry point so importing this module does not start the event
# loop and run the sample query as a side effect.
if __name__ == "__main__":
    asyncio.run(main())