Instrumenting your audio application to send events and traces to Arize involves capturing key events from the OpenAI Realtime API's WebSocket and converting them into spans that provide meaningful insights into your system's behavior.
Key Events for Instrumentation
We have identified the following events from the OpenAI Realtime API's WebSocket as the most valuable for LLM observability. The API emits many other events, but listening for these captures the majority of useful information:
Session Events
session.created: Indicates the creation of a new session.
session.updated: Denotes updates to the session's parameters or state.
Audio Input Events
input_audio_buffer.speech_started: Signals the start of speech input.
input_audio_buffer.speech_stopped: Indicates the end of speech input.
input_audio_buffer.committed: Confirms that the audio input buffer has been committed for processing.
Conversation Events
conversation.item.created: Represents the creation of a new conversation item, such as a user message.
Response Events
response.audio_transcript.delta: Provides incremental transcripts of the audio response.
response.audio_transcript.done: Indicates the completion of the audio transcript.
response.done: Marks the completion of the response generation.
response.audio.delta: Contains incremental chunks of the output audio bytes.
Error Events
error: Conveys any errors encountered during processing.
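Before mapping these events to spans, it helps to see where they arrive: a single dispatch loop over the WebSocket can route each event type to a handler. The sketch below assumes a client that yields raw JSON messages; the handler names are placeholders for the logic described in the next section.

import json

async def listen_for_events(websocket):
    """Route key Realtime API events to instrumentation handlers (illustrative)."""
    async for message in websocket:
        event = json.loads(message)
        event_type = event.get("type", "")
        if event_type.startswith("session."):
            handle_session_event(event)        # session.created / session.updated
        elif event_type.startswith("input_audio_buffer."):
            handle_audio_input(event)          # speech_started / speech_stopped / committed
        elif event_type == "conversation.item.created":
            handle_conversation_item(event)    # user and system messages
        elif event_type.startswith("response."):
            handle_response_event(event)       # transcripts, audio deltas, completion
        elif event_type == "error":
            handle_error(event)                # record on the active span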
Mapping Events to Spans
For each of these key events, you can create corresponding spans to capture the event's context and metadata:
Session Management
Upon receiving session.created, start a new span to represent the session's lifecycle.
Update the span with any changes when session.updated is received.
Audio Input Handling
Start a span when input_audio_buffer.speech_started is detected.
Attach attributes such as input audio URL, MIME type, and transcript as they become available.
End the span upon receiving input_audio_buffer.speech_stopped.
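A minimal sketch of this pattern, assuming a module-level variable holds the open span between the two events (tracer is your OpenTelemetry tracer; the attribute values are illustrative):

active_audio_span = None

def handle_audio_input(event):
    global active_audio_span
    event_type = event.get("type")
    if event_type == "input_audio_buffer.speech_started":
        # Open a span that stays alive until the matching speech_stopped event
        active_audio_span = tracer.start_span("Audio Input")
        active_audio_span.set_attribute("input.audio.mime_type", "audio/wav")
    elif event_type == "input_audio_buffer.speech_stopped" and active_audio_span is not None:
        # Attach the audio URL and transcript here if they are already available
        active_audio_span.end()
        active_audio_span = None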
Conversation Tracking
Create a span for each conversation.item.created event to monitor user inputs and system messages.
Include attributes like message role and content.
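For example (treat the exact item fields as assumptions to verify against your event payloads):

import json

def handle_conversation_item(event):
    item = event.get("item", {})
    with tracer.start_as_current_span("Conversation Item") as span:
        span.set_attribute("message.role", item.get("role", "unknown"))
        # Content may arrive as a list of parts; serialize it for the attribute
        span.set_attribute("message.content", json.dumps(item.get("content", [])))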
Response Generation
Initiate a span when response generation begins.
Update the span with incremental transcripts from response.audio_transcript.delta.
Finalize the span upon receiving response.done, adding attributes such as output audio URL, MIME type, and any function call details.
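One way to implement the accumulation, sketched under the assumption that the deltas and the final response.done arrive at the same handler (the response payload fields are worth verifying against the API reference):

transcript_parts = []

def handle_response_event(event):
    event_type = event.get("type")
    if event_type == "response.audio_transcript.delta":
        # Buffer incremental transcript text as it streams in
        transcript_parts.append(event.get("delta", ""))
    elif event_type == "response.done":
        response = event.get("response", {})
        with tracer.start_as_current_span("Response") as span:
            span.set_attribute("output.audio.transcript", "".join(transcript_parts))
            span.set_attribute("response.id", response.get("id", ""))
            span.set_attribute("response.status", response.get("status", ""))
        transcript_parts.clear()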
Error Handling
For any error events, log the error details within the relevant active span to aid in debugging and observability.
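For instance, using the OpenTelemetry status API to record the failure on whichever span is currently active:

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

def handle_error(event):
    error = event.get("error", {})
    span = trace.get_current_span()
    span.set_attribute("error.type", error.get("type", "unknown"))
    span.set_attribute("error.message", error.get("message", ""))
    span.set_status(Status(StatusCode.ERROR, error.get("message", "")))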
Listening for Events and Setting Spans
Session Creation: When receiving a session.created event, start a parent span to represent the session lifecycle.
if event.get("type") == "session.created":
with tracer.start_as_current_span("session.lifecycle") as parent_span:
parent_span.set_attribute("session.id", event["session"]["id"])
log_event("Session Created", f"Session ID: {event['session']['id']}")
Response Handling: Log output audio transcripts and set response attributes.
if event.get("type") == "response.audio_transcript.done":
transcript = event.get("transcript", "")
with tracer.start_as_current_span("Audio Output") as span:
span.set_attribute("output.audio.transcript", transcript)
Tool Calls and Nested Spans: For response.function_call_arguments.done, create nested spans to track tool invocations.
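A minimal sketch of the nesting; because the span is started while the response span is current, it becomes a child automatically. The execute_tool helper is hypothetical:

if event.get("type") == "response.function_call_arguments.done":
    with tracer.start_as_current_span("Tool Call") as tool_span:
        tool_span.set_attribute("tool.name", event.get("name", ""))
        tool_span.set_attribute("tool.arguments", event.get("arguments", ""))
        result = execute_tool(event.get("name"), event.get("arguments"))  # hypothetical helper
        tool_span.set_attribute("tool.result", str(result))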
When processing tool calls, you may need to extract attributes and metadata about the tools and set them in spans for observability. Below is an example implementation for processing tools within a session update event. This is just one example and can be adapted for your specific use case.
import json

def process_tools(session_update_event, _span):
    """
    Process tools in the session update event and set their attributes.
    """
    tools = session_update_event["session"].get("tools", [])
    for i, tool in enumerate(tools):
        tool_name = tool.get("name", f"tool_{i}")
        tool_type = tool.get("type", "unknown")
        tool_description = tool.get("description", "")
        tool_parameters = tool.get("parameters", {})

        # Create a JSON schema-like attribute for the tool
        tool_json_schema = json.dumps(tool_parameters)

        # Set tool attributes in the span
        _span.set_attribute(f"llm.tools.{i}.tool.name", tool_name)
        _span.set_attribute(f"llm.tools.{i}.tool.type", tool_type)
        _span.set_attribute(f"llm.tools.{i}.tool.description", tool_description)
        _span.set_attribute(f"llm.tools.{i}.tool.json_schema", tool_json_schema)

        # Log the tool processing (log_event is an application-defined helper)
        log_event(
            "Tool Processed",
            f"Processed tool {tool_name}: Type={tool_type}, Description={tool_description}",
        )
Adding URLs: Add input and output audio URLs to the span whenever they become available.
When working with URLs, you may need to save audio files or other data to a storage service such as Google Cloud Storage (GCS). Below is an example implementation for GCS; this is just one approach, and you may need to adjust the code for your specific storage solution.
See our integrations page for more information on granting access to your files with other providers.
import os
import time

from google.cloud import storage

def upload_to_gcs(file_path, bucket_name, destination_blob_name, make_public=False):
    """Uploads a file to Google Cloud Storage."""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(file_path)
        if make_public:
            blob.make_public()
            return blob.public_url
        else:
            return destination_blob_name
    except Exception as e:
        raise RuntimeError(f"Failed to upload {file_path} to GCS: {e}")

def process_audio_and_upload(pcm16_audio, span):
    """Processes audio, saves as WAV, uploads to GCS, and cleans up."""
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    file_name = f"audio_{timestamp}.wav"
    file_path = file_name
    bucket_name = "jz999"
    try:
        # save_audio_to_wav is assumed to be defined elsewhere; see the
        # TypeScript saveAudioToWav below for one approach
        save_audio_to_wav(pcm16_audio, file_path)
        gcs_url = upload_to_gcs(file_path, bucket_name, f"sallyann/audio/{file_name}")
        span.set_attribute("input.audio.url", gcs_url)
    finally:
        if os.path.exists(file_path):
            os.remove(file_path)
    return gcs_url
import { Storage } from '@google-cloud/storage';
import { Span } from '@opentelemetry/api';
import * as fs from 'fs';
import * as path from 'path';

/**
 * Uploads a file to Google Cloud Storage and returns the URL.
 * @param filePath - The local path to the file to upload.
 * @param bucketName - The GCS bucket name.
 * @param destinationBlobName - The destination path in the GCS bucket.
 * @param makePublic - Whether to make the file public.
 */
async function uploadToGcs(
  filePath: string,
  bucketName: string,
  destinationBlobName: string,
  makePublic: boolean = false
): Promise<string> {
  const storage = new Storage();
  const bucket = storage.bucket(bucketName);
  const blob = bucket.file(destinationBlobName);
  try {
    // Upload the file to the specified bucket
    await bucket.upload(filePath, {
      destination: destinationBlobName,
    });
    if (makePublic) {
      // Make the file public if requested
      await blob.makePublic();
      return blob.publicUrl();
    } else {
      return `gs://${bucketName}/${destinationBlobName}`;
    }
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    throw new Error(`Failed to upload ${filePath} to GCS: ${message}`);
  }
}

/**
 * Processes PCM16 audio data, converts it to WAV, uploads it to GCS, and sets the URL in the span.
 * @param pcm16Audio - The audio data in PCM16 format.
 * @param span - The current tracing span.
 */
async function processAudioAndUpload(pcm16Audio: Buffer, span: Span): Promise<string> {
  const timestamp = new Date().toISOString().replace(/[-:.]/g, '_');
  const fileName = `audio_${timestamp}.wav`;
  const filePath = path.join(__dirname, fileName);
  const bucketName = 'jz999';
  const destinationBlobName = `sallyann/audio/${fileName}`;
  try {
    // Save audio as a WAV file locally
    await saveAudioToWav(pcm16Audio, filePath);
    // Upload the file to GCS
    const gcsUrl = await uploadToGcs(filePath, bucketName, destinationBlobName, true);
    // Set the GCS URL as a span attribute
    span.setAttribute('input.audio.url', gcsUrl);
    return gcsUrl;
  } finally {
    // Clean up the local file after upload
    if (fs.existsSync(filePath)) {
      fs.unlinkSync(filePath);
    }
  }
}

/**
 * Converts PCM16 audio data into a WAV file and saves it locally.
 * @param pcm16Audio - The audio data in PCM16 format.
 * @param outputPath - The path to save the WAV file.
 */
async function saveAudioToWav(pcm16Audio: Buffer, outputPath: string): Promise<void> {
  // Implement WAV file conversion logic here (e.g., prepend a RIFF header).
  // For demonstration, we assume the audio buffer is directly saved as a WAV.
  fs.writeFileSync(outputPath, pcm16Audio);
}
Save the Audio to a Local File: Converts the PCM16 audio data into a WAV file.
Upload to GCS: Uploads the WAV file to the specified GCS bucket.
Set Span Attribute: Adds the GCS URL as an attribute to the span for observability.
Clean Up: Deletes the local file after it has been uploaded to GCS.
Notes:
Replace bucket_name, destination_blob_name, and file_path with your own values.
This is an example specific to Google Cloud Storage. You can adapt a similar pattern for other storage providers like AWS S3 or Azure Blob Storage.
If you need the file to be public, set the make_public parameter to True.
This example illustrates one way to handle storage, but always tailor the implementation to fit your infrastructure and application needs.
Semantic Conventions
The following semantic conventions define attributes for sessions, audio, conversations, responses, and errors.
Session Attributes
session.id: Unique identifier for the session.
session.status: Current status of the session (e.g., active, completed).
Audio Attributes
input.audio.url: URL of the input audio file.
input.audio.mime_type: MIME type of the input audio (e.g., audio/wav).
input.audio.transcript: Transcript of the input audio.
output.audio.url: URL of the output audio file.
output.audio.mime_type: MIME type of the output audio.
output.audio.transcript: Transcript of the output audio.
Conversation Attributes
message.role: Role of the message sender (e.g., user, system).
message.content: Content of the message.
Response Attributes
response.id: Unique identifier for the response.
response.status: Status of the response (e.g., in_progress, completed).
response.token_count: Number of tokens in the response.
Error Attributes
error.type: Type of error encountered.
error.message: Detailed error message.
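Applied to a span, the conventions look like this (all values are placeholders):

span.set_attribute("session.id", "sess_001")
span.set_attribute("input.audio.mime_type", "audio/wav")
span.set_attribute("input.audio.transcript", "What's the weather today?")
span.set_attribute("response.status", "completed")
span.set_attribute("response.token_count", 128)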
Implementation Considerations
While this guide provides a framework for instrumentation, tailor the implementation to fit your application's architecture. Ensure that your instrumentation captures the specified key events to provide comprehensive observability into your application's interactions with the OpenAI Realtime API.