Skip to main content
This guide covers best practices for integrating KugelAudio with large language models (LLMs) like GPT-4, Claude, and others to create real-time voice applications.

Overview

When building voice agents, the typical flow is:
  1. User speaks → Speech-to-Text (STT)
  2. LLM processes → Generates response tokens
  3. TTS streams → Audio plays as tokens arrive
KugelAudio’s streaming capabilities minimize the latency between LLM output and audio playback.

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   User      │────▶│    STT      │────▶│    LLM      │
│   Speech    │     │  (Whisper)  │     │  (GPT-4)    │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                                               │ tokens
                                               ▼
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Speaker   │◀────│   Buffer    │◀────│ KugelAudio  │
│   Output    │     │  & Play     │     │    TTS      │
└─────────────┘     └─────────────┘     └─────────────┘

Basic Integration

OpenAI GPT-4

import asyncio
from openai import AsyncOpenAI
from kugelaudio import KugelAudio

# Async OpenAI client; used below to stream chat completions
openai = AsyncOpenAI()
# KugelAudio client; used below to open streaming TTS sessions
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")

async def chat_with_voice(user_message: str):
    """Stream a GPT-4o reply into a KugelAudio TTS session and play it.

    Each text delta from the chat-completion stream is forwarded to the TTS
    session as soon as it arrives, and every audio chunk the session yields
    is handed to ``play_audio``. The session is flushed once the LLM stream
    is exhausted.

    Args:
        user_message: The user's prompt, sent as a single ``user`` message.
    """
    # Stream response from GPT-4
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )

    # Stream tokens to TTS
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model="kugel-1-turbo",
    ) as session:
        async for chunk in stream:
            # A streamed chunk can arrive with an empty ``choices`` list;
            # skip it instead of failing with IndexError on ``choices[0]``
            if not chunk.choices:
                continue

            token = chunk.choices[0].delta.content
            if not token:
                continue

            # Send token to TTS
            async for audio_chunk in session.send(token):
                play_audio(audio_chunk.audio)

        # Flush remaining audio
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)

# Entry point: run the coroutine to completion
asyncio.run(chat_with_voice("Tell me a short story"))

Anthropic Claude

import asyncio
from anthropic import AsyncAnthropic
from kugelaudio import KugelAudio

# Async Anthropic client; used below to stream Claude messages
anthropic = AsyncAnthropic()
# KugelAudio client; used below to open streaming TTS sessions
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")

async def chat_with_claude(user_message: str):
    """Speak Claude's streamed reply to ``user_message`` through KugelAudio.

    Text pieces from Claude's ``text_stream`` are sent to the TTS session one
    by one; the session is flushed after the Claude stream has been closed.
    """

    async def _play_all(audio_chunks):
        # Play every chunk produced by one send()/flush() call
        async for piece in audio_chunks:
            play_audio(piece.audio)

    tts_session = kugelaudio.tts.streaming_session(
        voice_id=123,
        model="kugel-1-turbo",
    )
    async with tts_session as session:
        claude_request = anthropic.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}],
        )
        # Stream from Claude
        async with claude_request as stream:
            async for delta in stream.text_stream:
                await _play_all(session.send(delta))

        # Flush remaining
        await _play_all(session.flush())

Sentence Buffering

For more natural speech, buffer the LLM output and send it to TTS one complete sentence at a time:
import re

class SentenceBuffer:
    """Accumulates streamed text and releases it one whole sentence at a time."""

    def __init__(self):
        # Text received so far that has not been returned as a sentence yet
        self.buffer = ""

    def add(self, text: str) -> list[str]:
        """Append ``text`` and return the sentences that are now complete.

        A sentence boundary is a run of whitespace that directly follows
        ``.``, ``!`` or ``?``. Whatever comes after the last boundary stays
        in the buffer until more text arrives or ``flush`` is called.
        """
        self.buffer += text

        # The last piece is always the still-open tail (possibly empty)
        *finished, pending = re.split(r'(?<=[.!?])\s+', self.buffer)
        if not finished:
            return []

        self.buffer = pending
        return finished

    def flush(self) -> str:
        """Empty the buffer and return whatever text it was holding."""
        leftover, self.buffer = self.buffer, ""
        return leftover

async def chat_with_sentence_buffering(user_message: str):
    """Speak a GPT-4o reply sentence by sentence instead of token by token.

    Tokens from the chat-completion stream are collected in a
    ``SentenceBuffer``; each completed sentence is sent to the TTS session
    followed by a single space. After the stream ends, any unfinished text
    left in the buffer is sent and the session is flushed.

    Args:
        user_message: The user's prompt, sent as a single ``user`` message.
    """
    buffer = SentenceBuffer()

    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model="kugel-1-turbo",
    ) as session:
        stream = await openai.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": user_message}],
            stream=True,
        )

        async for chunk in stream:
            # A streamed chunk can arrive with an empty ``choices`` list;
            # skip it instead of failing with IndexError on ``choices[0]``
            if not chunk.choices:
                continue

            token = chunk.choices[0].delta.content
            if not token:
                continue

            # Buffer until sentence complete
            for sentence in buffer.add(token):
                async for audio in session.send(sentence + " "):
                    play_audio(audio.audio)

        # Flush remaining buffer
        remaining = buffer.flush()
        if remaining:
            async for audio in session.send(remaining):
                play_audio(audio.audio)

        async for audio in session.flush():
            play_audio(audio.audio)

Handling Interruptions

Allow users to interrupt the AI mid-speech:
import asyncio

class InterruptibleConversation:
    """Speaks text through KugelAudio and lets the caller cut it short."""

    def __init__(self, kugelaudio_client, openai_client):
        self.kugelaudio = kugelaudio_client
        self.openai = openai_client
        # True only while speak() is running
        self.is_speaking = False
        # Set by interrupt(); checked before each audio chunk is played
        self.should_stop = False

    async def _play_until_stopped(self, audio_chunks):
        # Play chunks in order, bailing out as soon as a stop was requested
        async for piece in audio_chunks:
            if self.should_stop:
                break
            play_audio(piece.audio)

    async def speak(self, text: str):
        """Synthesize ``text`` and play it until done or interrupted.

        Any stop request from a previous call is cleared on entry, and
        ``is_speaking`` is reset on exit even if an error occurs.
        """
        self.is_speaking, self.should_stop = True, False

        try:
            tts_session = self.kugelaudio.tts.streaming_session(
                voice_id=123,
                model="kugel-1-turbo",
            )
            async with tts_session as session:
                await self._play_until_stopped(session.send(text))

                # Skip the flush entirely once an interrupt has been seen
                if self.should_stop:
                    return
                await self._play_until_stopped(session.flush())
        finally:
            self.is_speaking = False

    def interrupt(self):
        """Call this when user starts speaking."""
        if not self.is_speaking:
            return
        self.should_stop = True
        stop_audio_playback()

# Usage: build the conversation from the KugelAudio and OpenAI clients
conversation = InterruptibleConversation(kugelaudio, openai)

# When user starts speaking (detected by VAD), stop the current speech.
# interrupt() does nothing unless a speak() call is in progress.
conversation.interrupt()

Optimizing Latency

1. Pre-warm Connections

# Pre-establish WebSocket connection
session = await kugelaudio.tts.streaming_session(
    voice_id=123,
    model="kugel-1-turbo",
).__aenter__()

# Keep session open for multiple turns
for user_message in conversation:
    llm_response = get_llm_response(user_message)
    async for audio in session.send(llm_response):
        play_audio(audio.audio)
    async for audio in session.flush():
        play_audio(audio.audio)

2. Use the Right Model

# For real-time conversations, use turbo (the lower-latency option of the two)
# NOTE(review): other examples reach TTS through `kugelaudio.tts`, not
# `kugelaudio.TTS(...)` — confirm which object `kugelaudio` refers to here
tts = kugelaudio.TTS(model="kugel-1-turbo")  # ~39ms TTFA

# For pre-recorded content, use premium
tts = kugelaudio.TTS(model="kugel-1")  # ~77ms TTFA, higher quality

3. Tune CFG Scale

# Lower CFG = faster, slightly less expressive
# NOTE(review): the other examples enter the session with `async with`;
# confirm that awaiting streaming_session(...) directly is supported
session = await kugelaudio.tts.streaming_session(
    voice_id=123,
    cfg_scale=1.5,  # Faster than default 2.0
)

4. Parallel Processing

async def process_turn(user_message: str):
    """Open the TTS session and request the LLM stream concurrently, then speak.

    The LLM request is scheduled as a task before the TTS session is
    entered, so it makes progress while the session is being established.
    The session is entered with ``async with`` so it is always closed, and
    it is flushed after the last token so remaining audio is played.

    Args:
        user_message: The user's input for this turn.
    """
    # Get LLM response in parallel: start it first, await it once TTS is ready
    llm_task = asyncio.create_task(
        get_llm_stream(user_message)
    )

    try:
        # Start TTS session immediately
        async with kugelaudio.tts.streaming_session(voice_id=123) as session:
            llm_stream = await llm_task

            # Now stream with minimal delay
            async for token in llm_stream:
                async for audio in session.send(token):
                    play_audio(audio.audio)

            # Flush remaining audio
            async for audio in session.flush():
                play_audio(audio.audio)
    finally:
        # Don't leave the request running if the session failed to open;
        # cancel() has no effect on a task that has already finished
        llm_task.cancel()

Complete Example

Here’s a complete voice assistant using OpenAI and KugelAudio:
import asyncio
from openai import AsyncOpenAI
from kugelaudio import KugelAudio
import pyaudio
import numpy as np

class VoiceAssistant:
    """Multi-turn voice assistant: GPT-4o writes the reply, KugelAudio speaks it.

    The message history is kept in ``conversation`` so every call to
    ``chat`` is sent together with the earlier turns.
    """

    def __init__(self):
        self.openai = AsyncOpenAI()
        self.kugelaudio = KugelAudio(api_key="YOUR_API_KEY")
        self.audio_player = AudioPlayer()
        # Chat history as {"role": ..., "content": ...} dicts, oldest first
        self.conversation = []

    async def chat(self, user_message: str):
        """Send one user turn, play the spoken reply, and return its text.

        Args:
            user_message: The user's input for this turn.

        Returns:
            The assistant's full reply text, which is also appended to
            ``conversation``.
        """
        # Add to conversation history
        self.conversation.append({
            "role": "user",
            "content": user_message
        })

        # Stream from GPT-4
        stream = await self.openai.chat.completions.create(
            model="gpt-4o",
            messages=self.conversation,
            stream=True,
        )

        # Collect full response for history
        pieces = []

        # Stream to TTS
        async with self.kugelaudio.tts.streaming_session(
            voice_id=123,
            model="kugel-1-turbo",
            cfg_scale=2.0,
        ) as session:
            async for chunk in stream:
                # A streamed chunk can arrive with an empty ``choices`` list;
                # skip it instead of failing with IndexError on ``choices[0]``
                if not chunk.choices:
                    continue

                token = chunk.choices[0].delta.content
                if not token:
                    continue
                pieces.append(token)

                # Send to TTS
                async for audio in session.send(token):
                    self.audio_player.play(audio.audio)

            # Flush remaining
            async for audio in session.flush():
                self.audio_player.play(audio.audio)

        full_response = "".join(pieces)

        # Add to history
        self.conversation.append({
            "role": "assistant",
            "content": full_response
        })

        return full_response

class AudioPlayer:
    """Plays raw audio bytes through a PyAudio output stream.

    The stream is opened as mono ``paInt16`` at ``sample_rate``. Instances
    can be used as a context manager, which calls ``close`` on exit.
    """

    def __init__(self, sample_rate=24000):
        self.p = pyaudio.PyAudio()
        try:
            self.stream = self.p.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=sample_rate,
                output=True,
            )
        except Exception:
            # Don't leak the PyAudio handle when the output can't be opened
            self.p.terminate()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def play(self, audio_bytes: bytes):
        """Write ``audio_bytes`` to the output stream."""
        self.stream.write(audio_bytes)

    def close(self):
        """Stop and close the stream, then terminate PyAudio."""
        try:
            self.stream.stop_stream()
            self.stream.close()
        finally:
            # Terminate even if stopping or closing the stream raised
            self.p.terminate()

# Usage
async def main():
    """Run a two-turn demo conversation and print each reply."""
    assistant = VoiceAssistant()

    try:
        response = await assistant.chat("Hello! Tell me a joke.")
        print(f"Assistant: {response}")

        response = await assistant.chat("That was funny! Tell me another one.")
        print(f"Assistant: {response}")
    finally:
        # Release the audio output device even if a turn fails
        assistant.audio_player.close()

# Entry point: run the demo conversation to completion
asyncio.run(main())

Next Steps