Instrumenting your audio application to send events and traces to Arize involves capturing key events from the OpenAI Realtime API's WebSocket and converting them into spans that provide meaningful insights into your system's behavior.
Key Events for Instrumentation
We have identified the following events from the OpenAI Realtime API's WebSocket as the most valuable for LLM observability. There are many other events, but most of the useful information can be captured by listening for this subset:
Session Events
session.created: Indicates the creation of a new session.
session.updated: Denotes updates to the session's parameters or state.
Audio Input Events
input_audio_buffer.speech_started: Signals the start of speech input.
input_audio_buffer.speech_stopped: Indicates the end of speech input.
input_audio_buffer.committed: Confirms that the audio input buffer has been committed for processing.
Conversation Events
conversation.item.created: Represents the creation of a new conversation item, such as a user message.
Response Events
response.audio_transcript.delta: Provides incremental transcripts of the audio response.
response.audio_transcript.done: Indicates the completion of the audio transcript.
response.done: Marks the completion of the response generation.
response.audio.delta: Contains incremental output audio bytes.
Error Events
error: Conveys any errors encountered during processing.
Mapping Events to Spans
For each of these key events, you can create corresponding spans that capture the event's context and metadata; a minimal dispatcher sketch follows this list:
Session Management
Upon receiving session.created, start a new span to represent the session's lifecycle.
Update the span with any changes when session.updated is received.
Audio Input Handling
Start a span when input_audio_buffer.speech_started is detected.
Attach attributes such as input audio URL, MIME type, and transcript as they become available.
End the span upon receiving input_audio_buffer.speech_stopped.
Conversation Tracking
Create a span for each conversation.item.created event to monitor user inputs and system messages.
Include attributes like message role and content.
Response Generation
Initiate a span when response generation begins.
Update the span with incremental transcripts from response.audio_transcript.delta.
Finalize the span upon receiving response.done, adding attributes such as output audio URL, MIME type, and any function call details.
Error Handling
For any error events, log the error details within the relevant active span to aid in debugging and observability.
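One way to wire these mappings together is an event dispatcher that inspects each incoming WebSocket message. The sketch below is a minimal illustration, assuming an OpenTelemetry tracer and a handle_event function invoked once per parsed event; field names such as item.role and the error payload should be verified against the Realtime API reference, and the attribute keys follow the semantic conventions later in this guide.
from opentelemetry import trace

tracer = trace.get_tracer(__name__)
active_audio_span = None  # tracks the in-flight audio input span

def handle_event(event):
    """Route key Realtime API events to spans (sketch only)."""
    global active_audio_span
    event_type = event.get("type")

    if event_type == "input_audio_buffer.speech_started":
        # Open a span covering the user's speech turn
        active_audio_span = tracer.start_span("Audio Input")

    elif event_type == "input_audio_buffer.speech_stopped" and active_audio_span:
        # Attach input audio attributes here if already available, then close the span
        active_audio_span.end()
        active_audio_span = None

    elif event_type == "conversation.item.created":
        item = event.get("item", {})
        with tracer.start_as_current_span("conversation.item") as span:
            span.set_attribute("message.role", item.get("role", "unknown"))

    elif event_type == "error":
        # Record error details on whichever span is currently active
        span = trace.get_current_span()
        error = event.get("error", {})
        span.set_attribute("error.type", error.get("type", "unknown"))
        span.set_attribute("error.message", error.get("message", ""))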
Listening for Events and Setting Spans
Session Creation: When receiving a session.created event, start a parent span to represent the session lifecycle.
if event.get("type") == "session.created":
with tracer.start_as_current_span("session.lifecycle") as parent_span:
parent_span.set_attribute("session.id", event["session"]["id"])
log_event("Session Created", f"Session ID: {event['session']['id']}")
Response Handling: Log output audio transcripts and set response attributes.
if event.get("type") == "response.audio_transcript.done":
transcript = event.get("transcript", "")
with tracer.start_as_current_span("Audio Output") as span:
span.set_attribute("output.audio.transcript", transcript)
Tool Calls and Nested Spans: For response.function_call_arguments.done, create nested spans to track tool invocations.
When processing tool calls, you may need to extract attributes and metadata about the tools and set them on spans for observability. Below is one example implementation for processing the tool definitions carried in a session update event; adapt it to your specific use case. A sketch of the nested tool-call span itself follows the example.
def process_tools(session_update_event, _span):
"""
Process tools in the session update event and set their attributes.
"""
tools = session_update_event["session"].get("tools", [])
for i, tool in enumerate(tools):
tool_name = tool.get("name", f"tool_{i}")
tool_type = tool.get("type", "unknown")
tool_description = tool.get("description", "")
tool_parameters = tool.get("parameters", {})
# Create a JSON schema-like attribute for the tool
tool_json_schema = json.dumps(tool_parameters)
# Set tool attributes in the span
_span.set_attribute(f"llm.tools.{i}.tool.name", tool_name)
_span.set_attribute(f"llm.tools.{i}.tool.type", tool_type)
_span.set_attribute(f"llm.tools.{i}.tool.description", tool_description)
_span.set_attribute(f"llm.tools.{i}.tool.json_schema", tool_json_schema)
# Log the tool processing
log_event(
"Tool Processed",
f"Processed tool {tool_name}: Type={tool_type}, Description={tool_description}",
)
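The tool invocation itself can be traced as a nested span when response.function_call_arguments.done arrives; because the span is opened while the response span is still active, it appears as a child in the trace. This is a sketch only: the event fields used here (name, call_id, arguments) and the attribute keys are assumptions to check against the Realtime API reference and your own conventions.
if event.get("type") == "response.function_call_arguments.done":
    # Nested span for the tool invocation, created under the active response span
    with tracer.start_as_current_span("Tool Call") as tool_span:
        tool_span.set_attribute("tool.name", event.get("name", "unknown"))
        tool_span.set_attribute("tool.call_id", event.get("call_id", ""))
        tool_span.set_attribute("tool.arguments", event.get("arguments", "{}"))
        log_event("Tool Call", f"Arguments received for {event.get('name', 'unknown')}")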
Adding URLs: Add input and output audio URLs to the span whenever they become available.
When working with URLs, you may need to save audio files or other data to a storage service such as Google Cloud Storage (GCS). Below is an example implementation for GCS; it is just one approach, and you may need to adjust the code for your storage solution.
See our integrations page for more information on granting access to your files with other providers.
def upload_to_gcs(file_path, bucket_name, destination_blob_name, make_public=False):
"""Uploads a file to Google Cloud Storage."""
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(file_path)
if make_public:
blob.make_public()
return blob.public_url
else:
return destination_blob_name
except Exception as e:
raise RuntimeError(f"Failed to upload {file_path} to GCS: {e}")
def process_audio_and_upload(pcm16_audio, span):
"""Processes audio, saves as WAV, uploads to GCS, and cleans up."""
timestamp = time.strftime("%Y%m%d_%H%M%S")
file_name = f"audio_{timestamp}.wav"
file_path = file_name
bucket_name = "jz999"
try:
save_audio_to_wav(pcm16_audio, file_path)
gcs_url = upload_to_gcs(file_path, bucket_name, f"sallyann/audio/{file_name}")
span.set_attribute("input.audio.url", gcs_url)
finally:
if os.path.exists(file_path):
os.remove(file_path)
return gcs_url
The same flow in TypeScript:
import { Storage } from '@google-cloud/storage';
import { Span, trace } from '@opentelemetry/api';
import * as fs from 'fs';
import * as path from 'path';
/**
* Uploads a file to Google Cloud Storage and returns the URL.
* @param filePath - The local path to the file to upload.
* @param bucketName - The GCS bucket name.
* @param destinationBlobName - The destination path in the GCS bucket.
* @param makePublic - Whether to make the file public.
*/
async function uploadToGcs(
filePath: string,
bucketName: string,
destinationBlobName: string,
makePublic: boolean = false
): Promise<string> {
const storage = new Storage();
const bucket = storage.bucket(bucketName);
const blob = bucket.file(destinationBlobName);
try {
// Upload the file to the specified bucket
await bucket.upload(filePath, {
destination: destinationBlobName,
});
if (makePublic) {
// Make the file public if requested
await blob.makePublic();
return blob.publicUrl();
} else {
return `gs://${bucketName}/${destinationBlobName}`;
}
} catch (error) {
throw new Error(`Failed to upload ${filePath} to GCS: ${error.message}`);
}
}
/**
* Processes PCM16 audio data, converts it to WAV, uploads it to GCS, and sets the URL in the span.
* @param pcm16Audio - The audio data in PCM16 format.
* @param span - The current tracing span.
*/
async function processAudioAndUpload(pcm16Audio: Buffer, span: Span): Promise<string> {
const timestamp = new Date().toISOString().replace(/[-:.]/g, '_');
const fileName = `audio_${timestamp}.wav`;
const filePath = path.join(__dirname, fileName);
const bucketName = 'jz999';
const destinationBlobName = `sallyann/audio/${fileName}`;
try {
// Save audio as a WAV file locally
await saveAudioToWav(pcm16Audio, filePath);
// Upload the file to GCS
const gcsUrl = await uploadToGcs(filePath, bucketName, destinationBlobName, true);
// Set the GCS URL as a span attribute
span.setAttribute('input.audio.url', gcsUrl);
return gcsUrl;
} finally {
// Clean up the local file after upload
if (fs.existsSync(filePath)) {
fs.unlinkSync(filePath);
}
}
}
/**
* Converts PCM16 audio data into a WAV file and saves it locally.
* @param pcm16Audio - The audio data in PCM16 format.
* @param outputPath - The path to save the WAV file.
*/
async function saveAudioToWav(pcm16Audio: Buffer, outputPath: string): Promise<void> {
// Implement WAV file conversion logic here
// For demonstration, we assume the audio buffer is directly saved as a WAV
fs.writeFileSync(outputPath, pcm16Audio);
}
Save the Audio to a Local File: Converts the PCM16 audio data into a WAV file.
Upload to GCS: Uploads the WAV file to the specified GCS bucket.
Set Span Attribute: Adds the GCS URL as an attribute to the span for observability.
Clean Up: Deletes the local file after it has been uploaded to GCS.
Notes:
Replace bucket_name, destination_blob_name, and file_path with your own values.
This is an example specific to Google Cloud Storage. You can adapt a similar pattern for other storage providers like AWS S3 or Azure Blob Storage; a short S3 sketch follows these notes.
If you need the file to be public, set the make_public parameter to True.
This example illustrates one way to handle storage, but always tailor the implementation to fit your infrastructure and application needs.
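As a point of comparison, a minimal AWS S3 variant of the upload helper might look like the sketch below. It assumes boto3 is installed and credentials are configured; the bucket and key names are placeholders.
import boto3

def upload_to_s3(file_path, bucket_name, destination_key):
    """Uploads a local file to S3 and returns an s3:// URI for the span attribute."""
    s3 = boto3.client("s3")
    s3.upload_file(file_path, bucket_name, destination_key)
    return f"s3://{bucket_name}/{destination_key}"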
Semantic Conventions
The following semantic conventions define attributes for sessions, audio, conversations, responses, and errors; a short example of attaching them to a span follows below.
Session Attributes
session.id: Unique identifier for the session.
session.status: Current status of the session (e.g., active, completed).
Audio Attributes
input.audio.url: URL of the input audio file.
input.audio.mime_type: MIME type of the input audio (e.g., audio/wav).
input.audio.transcript: Transcript of the input audio.
output.audio.url: URL of the output audio file.
output.audio.mime_type: MIME type of the output audio.
output.audio.transcript: Transcript of the output audio.
Conversation Attributes
message.role: Role of the message sender (e.g., user, system).
message.content: Content of the message.
Response Attributes
response.id: Unique identifier for the response.
response.status: Status of the response (e.g., in_progress, completed).
response.token_count: Number of tokens in the response.
Error Attributes
error.type: Type of error encountered.
error.message: Detailed error message.
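As an illustration of how these attributes come together on a single span (a sketch only; response_id, output_audio_url, and transcript are placeholders for values produced by your own pipeline):
with tracer.start_as_current_span("Audio Response") as span:
    # Response attributes, populated from the response.done payload
    span.set_attribute("response.id", response_id)
    span.set_attribute("response.status", "completed")
    # Output audio attributes, e.g. the storage URL produced earlier
    span.set_attribute("output.audio.url", output_audio_url)
    span.set_attribute("output.audio.mime_type", "audio/wav")
    span.set_attribute("output.audio.transcript", transcript)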
Implementation Considerations
While this guide provides a framework for instrumentation, tailor the implementation to fit your application's architecture. Ensure that your instrumentation captures the specified key events to provide comprehensive observability into your application's interactions with the OpenAI Realtime API.
Evaluating Voice Applications
Currently, voice evaluations are supported exclusively with OpenAI models. Support for additional models is planned and will be available soon.
This guide provides instructions on how to evaluate voice applications using OpenAI models within the Phoenix framework. The example notebook linked below demonstrates the process of configuring and running evaluations.
Prerequisites
Phoenix Installation: Make sure the phoenix package is installed in your Python environment.
OpenAI API Key: Obtain an API key for the OpenAI model you plan to use.
Audio Data: Prepare the audio data required for evaluation. This can be in the form of raw audio bytes, base64-encoded strings, or URLs pointing to audio files. If you have existing data in Arize, you can use our export client to retrieve it.
Python Environment: Ensure you are using Python version 3.7 or higher.
Steps to Evaluate
1. Set Up the Model
Use the OpenAIModel class to define the OpenAI model for evaluation. Replace the placeholder API key with your own.
from phoenix.evals import OpenAIModel
model = OpenAIModel(model="gpt-4o-audio-preview", api_key="your_openai_api_key")
2. Define the Template
Templates are used to configure prompts sent to the OpenAI model, ensuring that the task is clearly defined and the model's responses are constrained to valid outputs. Templates consist of rails (the set of valid responses) and a sequence of prompt parts that define the type and content of the input or instructions.
In addition to custom templates, we offer an out-of-the-box template for emotion detection. This template streamlines setup, allowing you to start classifying audio with minimal configuration.
Below is an example template for tone classification.
from phoenix.evals.templates import (
ClassificationTemplate,
PromptPartContentType,
PromptPartTemplate,
)
# Define valid classification labels (rails)
TONE_EMOTION_RAILS = ["positive", "neutral", "negative"]
# Create the classification template
template = ClassificationTemplate(
rails=TONE_EMOTION_RAILS, # Specify the valid output labels
template=[
# Prompt part 1: Task description
PromptPartTemplate(
content_type=PromptPartContentType.TEXT,
template="""
You are a helpful AI bot that checks for the tone of the audio.
Analyze the audio file and determine the tone (e.g., positive, neutral, negative).
Your evaluation should provide a multiclass label from the following options: ['positive', 'neutral', 'negative'].
Here is the audio:
""",
),
# Prompt part 2: Insert the audio data
PromptPartTemplate(
content_type=PromptPartContentType.AUDIO,
template="{audio}", # Placeholder for the audio content
),
# Prompt part 3: Define the response format
PromptPartTemplate(
content_type=PromptPartContentType.TEXT,
template="""
Your response must be a string, either positive, neutral, or negative, and should not contain any text or characters aside from that.
""",
),
],
)
How It Works
Prompt Parts
Part 1: Provides task instructions and specifies valid response labels.
Part 2: Dynamically inserts the audio data for analysis using the placeholder. You'll want to ensure that the prompt variable you choose corresponds to the column that holds your base64-encoded audio data.
Part 3: Ensures the model outputs a response in the desired format (a single-word string: positive, neutral, or negative).
Rails
Rails define the set of valid outputs for the classification task: ["positive", "neutral", "negative"].
Any response outside this set can be flagged as invalid.
This modular approach ensures flexibility, allowing you to reuse and adapt templates for different use cases or models.
If you are evaluating text (e.g., a transcript) instead of audio, you can use a plain string template and skip the audio prompt part entirely.
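For instance, a transcript-based tone check can be a single string template. The {transcript} variable below is an assumption and should match the name of the column in your DataFrame that holds the transcript text; the string can then be passed as the template argument to llm_classify together with the same rails.
TONE_TRANSCRIPT_TEMPLATE = """
You are a helpful AI bot that checks the tone of a conversation transcript.
Classify the tone as positive, neutral, or negative.

Here is the transcript:
{transcript}

Your response must be a single word: positive, neutral, or negative.
"""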
3. Prepare the Data Processor (Optional)
Using a data processor with Phoenix enables parallel processing of your audio data, improving efficiency and scalability. A data processor is responsible for transforming raw audio data into base64-encoded strings, which can then be utilized by your models.
Processor Requirements
To ensure compatibility with Phoenix, your data processor must meet the following criteria:
Consistent Input and Output Types
The processor's input and output must be the same type.
For example, if you process a DataFrame row by row, the input is a Series (a single row) and the output is a Series (the updated row containing the encoded audio).
Audio Link Processing
The processor must fetch audio from a provided link (either from cloud storage or local storage) and produce a base64-encoded string.
Column Assignment Consistency
The encoded string must be assigned to the same column referenced in your prompt template.
For example, if you are using the EMOTION_AUDIO_TEMPLATE, the base64-encoded audio string should be assigned to the "audio" column.
Example: Fetching and Encoding Audio from Google Cloud Storage
Below is an example data processor that demonstrates how to fetch audio from Google Cloud Storage, encode it as a base64 string, and assign it to the appropriate column in the dataframe:
async def async_fetch_gcloud_data(row: pd.Series) -> pd.Series:
"""
Fetches data from a Google Cloud Storage URL and returns the content as a base64-encoded string.
"""
token = None
try:
# Execute the gcloud command to fetch the access token
output = await asyncio.create_subprocess_exec(
"gcloud",
"auth",
"print-access-token",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await output.communicate()
if output.returncode != 0:
raise RuntimeError(f"Error executing gcloud command: {stderr.decode('UTF-8').strip()}")
token = stdout.decode("UTF-8").strip()
# Ensure the token is not empty or None
if not token:
raise ValueError("Failed to retrieve a valid access token. Token is empty.")
except Exception as e:
# Catch any other exceptions and re-raise them with additional context
raise RuntimeError(f"An unexpected error occurred: {str(e)}")
# Set the token in the header
gcloud_header = {"Authorization": f"Bearer {token}"}
    # Ensure the URL begins with https://storage.googleapis.com/ rather than https://storage.cloud.google.com/
url = row["attributes.input.audio.url"]
G_API_HOST = "https://storage.googleapis.com/"
not_googleapis = url.startswith("https://storage.cloud.google.com/") or url.startswith("gs://")
g_api_url = (
url.replace("https://storage.cloud.google.com/", G_API_HOST)
if url and not_googleapis
else url
)
# Get a response back, present the status
async with aiohttp.ClientSession() as session:
async with session.get(g_api_url, headers=gcloud_header) as response:
response.raise_for_status()
content = await response.read()
encoded_string = base64.b64encode(content).decode("utf-8")
row["audio"] = encoded_string
return row
If your audio data is already a base64-encoded string, you can skip this step.
4. Perform the Evaluation
To run an evaluation, use the llm_classify function. This function accepts a DataFrame, a list of audio URLs, or raw audio bytes as input. In the example below, data is exported directly from Arize to perform the evaluation.
from datetime import datetime

import pandas as pd

# The export client below assumes the Arize Python SDK is installed;
# see the Arize export documentation for authentication details.
from arize.exporter import ArizeExportClient
from arize.utils.types import Environments
from phoenix.evals.classify import llm_classify

# Export traces from Arize into a DataFrame
client = ArizeExportClient()
df = client.export_model_to_df(
    space_id='SPACE_ID',
    model_id='PROJECT_NAME',
    environment=Environments.TRACING,
    start_time=datetime.fromisoformat('2024-12-23T07:00:00.000+00:00'),
    end_time=datetime.fromisoformat('2024-12-31T06:59:59.999+00:00'),
)
# Run the evaluation using Phoenix's built-in emotion template and rails
# (EMOTION_PROMPT_TEMPLATE, EMOTION_AUDIO_RAILS); the custom template defined
# above can be substituted here instead.
results = llm_classify(
model=model,
data=df,
data_processor=async_fetch_gcloud_data,
template=EMOTION_PROMPT_TEMPLATE,
rails=EMOTION_AUDIO_RAILS,
provide_explanation=True,
)
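llm_classify returns a DataFrame of results; with provide_explanation=True it should also include the model's reasoning alongside each label. The column names below reflect typical phoenix.evals output and are worth verifying against your installed version.
# Inspect predicted labels alongside the model's explanations
print(results[["label", "explanation"]].head())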
Considerations
Data Processors: Custom functions can transform audio paths, URLs, or raw data to the required format.
Templates: Modify the templates to fit your specific evaluation needs.
Remember: template variables must have the same names as the corresponding columns in the DataFrame.
Explanations: Enable provide_explanation=True to include detailed reasoning in the evaluation output.