## Overview
When building voice agents, the typical flow is:

- User speaks → Speech-to-Text (STT)
- LLM processes → Generates response tokens
- TTS streams → Audio plays as tokens arrive
## Architecture
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    User     │────▶│     STT     │────▶│     LLM     │
│   Speech    │     │  (Whisper)  │     │   (GPT-4)   │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                                               │ tokens
                                               ▼
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Speaker   │◀────│   Buffer    │◀────│ KugelAudio  │
│   Output    │     │   & Play    │     │     TTS     │
└─────────────┘     └─────────────┘     └─────────────┘
```
## Basic Integration
### OpenAI GPT-4
```python
import asyncio

from openai import AsyncOpenAI
from kugelaudio import KugelAudio

openai = AsyncOpenAI()
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")


async def chat_with_voice(user_message: str):
    # Stream the response from GPT-4
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )

    # Stream tokens to TTS as they arrive
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model="kugel-1-turbo",
    ) as session:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                # Send the token to TTS
                async for audio_chunk in session.send(token):
                    play_audio(audio_chunk.audio)

        # Flush remaining audio
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)


asyncio.run(chat_with_voice("Tell me a short story"))
```
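The snippets on this page call `play_audio`, which is a placeholder for your playback layer. A minimal sketch using PyAudio, assuming the session returns 16-bit mono PCM at 24 kHz (the same format the Complete Example below uses):

```python
import pyaudio

# Hypothetical helper: one global PyAudio output stream for the snippets above.
_pa = pyaudio.PyAudio()
_stream = _pa.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)


def play_audio(audio_bytes: bytes):
    # Blocking write; fine for a demo, but use a queue/thread in production
    # so playback doesn't stall the event loop.
    _stream.write(audio_bytes)
```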
### Anthropic Claude
```python
import asyncio

from anthropic import AsyncAnthropic
from kugelaudio import KugelAudio

anthropic = AsyncAnthropic()
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")


async def chat_with_claude(user_message: str):
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model="kugel-1-turbo",
    ) as session:
        # Stream from Claude
        async with anthropic.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}],
        ) as stream:
            async for text in stream.text_stream:
                async for audio_chunk in session.send(text):
                    play_audio(audio_chunk.audio)

        # Flush remaining audio
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)
```
## Sentence Buffering
For more natural speech, buffer text until a sentence boundary before sending it to TTS:

```python
import re


class SentenceBuffer:
    def __init__(self):
        self.buffer = ""

    def add(self, text: str) -> list[str]:
        """Add text and return any complete sentences."""
        self.buffer += text
        # Split on whitespace that follows sentence-ending punctuation
        sentences = re.split(r'(?<=[.!?])\s+', self.buffer)
        if len(sentences) > 1:
            # Everything except the last fragment is a complete sentence
            complete = sentences[:-1]
            self.buffer = sentences[-1]
            return complete
        return []

    def flush(self) -> str:
        """Return any remaining text."""
        remaining = self.buffer
        self.buffer = ""
        return remaining


async def chat_with_sentence_buffering(user_message: str):
    buffer = SentenceBuffer()

    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model="kugel-1-turbo",
    ) as session:
        stream = await openai.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": user_message}],
            stream=True,
        )

        async for chunk in stream:
            if chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                # Hold tokens until a sentence is complete
                for sentence in buffer.add(token):
                    async for audio in session.send(sentence + " "):
                        play_audio(audio.audio)

        # Send whatever is left in the buffer
        remaining = buffer.flush()
        if remaining:
            async for audio in session.send(remaining):
                play_audio(audio.audio)

        async for audio in session.flush():
            play_audio(audio.audio)
```
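`SentenceBuffer` is plain Python, so it's easy to sanity-check on its own before wiring it into the stream:

```python
buf = SentenceBuffer()
print(buf.add("Hello wor"))     # [] — no sentence boundary yet
print(buf.add("ld. How are "))  # ['Hello world.']
print(buf.add("you? Fine."))    # ['How are you?']
print(buf.flush())              # 'Fine.'
```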
## Handling Interruptions
Allow users to interrupt the assistant mid-speech:

```python
import asyncio


class InterruptibleConversation:
    def __init__(self, kugelaudio_client, openai_client):
        self.kugelaudio = kugelaudio_client
        self.openai = openai_client
        self.is_speaking = False
        self.should_stop = False

    async def speak(self, text: str):
        self.is_speaking = True
        self.should_stop = False
        try:
            async with self.kugelaudio.tts.streaming_session(
                voice_id=123,
                model="kugel-1-turbo",
            ) as session:
                async for audio in session.send(text):
                    if self.should_stop:
                        break
                    play_audio(audio.audio)
                if not self.should_stop:
                    async for audio in session.flush():
                        if self.should_stop:
                            break
                        play_audio(audio.audio)
        finally:
            self.is_speaking = False

    def interrupt(self):
        """Call this when the user starts speaking."""
        if self.is_speaking:
            self.should_stop = True
            stop_audio_playback()  # your player's stop/clear hook


# Usage
conversation = InterruptibleConversation(kugelaudio, openai)

# When the user starts speaking (detected by VAD)
conversation.interrupt()
```
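How `interrupt()` gets called depends on your voice activity detection. A minimal sketch of the wiring, assuming a hypothetical `vad_events()` async iterator that yields when speech starts:

```python
import asyncio


async def run_conversation_loop(conversation: InterruptibleConversation):
    # Speak in a background task so VAD events can be handled concurrently
    speak_task = asyncio.create_task(
        conversation.speak("Here is a fairly long answer...")
    )

    # vad_events() is a hypothetical async event source from your VAD library
    async for event in vad_events():
        if event == "speech_start":
            conversation.interrupt()  # sets should_stop; speak() exits its loops
            break

    await speak_task  # returns promptly once interrupted
```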
## Optimizing Latency
### 1. Pre-warm Connections
```python
# Pre-establish the WebSocket connection
session = await kugelaudio.tts.streaming_session(
    voice_id=123,
    model="kugel-1-turbo",
).__aenter__()

# Keep the session open across multiple turns
for user_message in conversation:
    llm_response = get_llm_response(user_message)
    async for audio in session.send(llm_response):
        play_audio(audio.audio)
    async for audio in session.flush():
        play_audio(audio.audio)
```
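Entering the context manager by hand means you are responsible for calling `__aexit__` when the conversation ends. The standard library's `contextlib.AsyncExitStack` handles that bookkeeping for you (a sketch, using the same assumed session API and placeholder helpers as above):

```python
from contextlib import AsyncExitStack


async def run_turns(conversation):
    async with AsyncExitStack() as stack:
        # Enter the session once; AsyncExitStack closes it on exit or error
        session = await stack.enter_async_context(
            kugelaudio.tts.streaming_session(
                voice_id=123,
                model="kugel-1-turbo",
            )
        )
        for user_message in conversation:
            llm_response = get_llm_response(user_message)
            async for audio in session.send(llm_response):
                play_audio(audio.audio)
            async for audio in session.flush():
                play_audio(audio.audio)
```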
### 2. Use the Right Model
```python
# For real-time conversations, use turbo
tts = kugelaudio.TTS(model="kugel-1-turbo")  # ~39ms TTFA

# For pre-recorded content, use premium
tts = kugelaudio.TTS(model="kugel-1")  # ~77ms TTFA, higher quality
```
### 3. Tune CFG Scale
```python
# Lower CFG = faster, slightly less expressive
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    cfg_scale=1.5,  # Faster than the default of 2.0
) as session:
    ...
```
### 4. Parallel Processing
```python
async def process_turn(user_message: str):
    # Start opening the TTS session immediately
    session_task = asyncio.create_task(
        kugelaudio.tts.streaming_session(voice_id=123).__aenter__()
    )
    # Request the LLM response in parallel
    llm_task = asyncio.create_task(
        get_llm_stream(user_message)  # your LLM streaming helper
    )

    session = await session_task
    llm_stream = await llm_task

    # Now stream with minimal delay
    async for token in llm_stream:
        async for audio in session.send(token):
            play_audio(audio.audio)
```
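Because both tasks start before either finishes, the WebSocket handshake overlaps with the LLM's time-to-first-token. As in the pre-warming example, a session entered via `__aenter__` should eventually be closed: call `__aexit__`, or manage it with `AsyncExitStack` as sketched above.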
## Complete Example
Here's a complete voice assistant using OpenAI and KugelAudio:

```python
import asyncio

import pyaudio
from openai import AsyncOpenAI
from kugelaudio import KugelAudio


class VoiceAssistant:
    def __init__(self):
        self.openai = AsyncOpenAI()
        self.kugelaudio = KugelAudio(api_key="YOUR_API_KEY")
        self.audio_player = AudioPlayer()
        self.conversation = []

    async def chat(self, user_message: str):
        # Add the user turn to the conversation history
        self.conversation.append({
            "role": "user",
            "content": user_message,
        })

        # Stream from GPT-4
        stream = await self.openai.chat.completions.create(
            model="gpt-4o",
            messages=self.conversation,
            stream=True,
        )

        # Collect the full response for the history
        full_response = ""

        # Stream to TTS
        async with self.kugelaudio.tts.streaming_session(
            voice_id=123,
            model="kugel-1-turbo",
            cfg_scale=2.0,
        ) as session:
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    # Send the token to TTS
                    async for audio in session.send(token):
                        self.audio_player.play(audio.audio)

            # Flush remaining audio
            async for audio in session.flush():
                self.audio_player.play(audio.audio)

        # Add the assistant turn to the history
        self.conversation.append({
            "role": "assistant",
            "content": full_response,
        })
        return full_response


class AudioPlayer:
    def __init__(self, sample_rate=24000):
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            output=True,
        )

    def play(self, audio_bytes: bytes):
        self.stream.write(audio_bytes)

    def close(self):
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()


# Usage
async def main():
    assistant = VoiceAssistant()
    try:
        response = await assistant.chat("Hello! Tell me a joke.")
        print(f"Assistant: {response}")

        response = await assistant.chat("That was funny! Tell me another one.")
        print(f"Assistant: {response}")
    finally:
        assistant.audio_player.close()


asyncio.run(main())
```
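`AudioPlayer` assumes 16-bit mono PCM at 24 kHz; if your streaming session is configured for a different sample rate or output format, match the PyAudio stream parameters to it.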