This guide covers best practices for integrating KugelAudio with large language models (LLMs) like GPT-4, Claude, and others to create real-time voice applications.

Overview

When building voice agents, the typical flow is:
  1. User speaks → Speech-to-Text (STT)
  2. LLM processes → Generates response tokens
  3. TTS streams → Audio plays as tokens arrive
KugelAudio’s streaming capabilities minimize the latency between LLM output and audio playback.

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   User      │────▶│    STT      │────▶│    LLM      │
│   Speech    │     │  (Whisper)  │     │  (GPT-4)    │
└─────────────┘     └─────────────┘     └──────┬──────┘

                                               │ tokens

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Speaker   │◀────│   Buffer    │◀────│ KugelAudio  │
│   Output    │     │  & Play     │     │    TTS      │
└─────────────┘     └─────────────┘     └─────────────┘

Basic Integration

OpenAI GPT-4

import asyncio
from openai import AsyncOpenAI
from kugelaudio import KugelAudio

openai = AsyncOpenAI()
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")

async def chat_with_voice(user_message: str):
    # Stream response from GPT-4
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )
    
    # Stream tokens to TTS
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model_id="kugel-1-turbo",
    ) as session:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                
                # Send token to TTS
                async for audio_chunk in session.send(token):
                    play_audio(audio_chunk.audio)
        
        # Flush remaining audio
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)

asyncio.run(chat_with_voice("Tell me a short story"))

Anthropic Claude

import asyncio
from anthropic import AsyncAnthropic
from kugelaudio import KugelAudio

anthropic = AsyncAnthropic()
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")

async def chat_with_claude(user_message: str):
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model_id="kugel-1-turbo",
    ) as session:
        # Stream from Claude
        async with anthropic.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}],
        ) as stream:
            async for text in stream.text_stream:
                async for audio_chunk in session.send(text):
                    play_audio(audio_chunk.audio)
        
        # Flush remaining
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)

Spelling Out Text

Use <spell> tags to spell out text letter by letter. This is useful when your LLM needs to communicate:
  • Email addresses
  • Codes or serial numbers
  • Acronyms
  • Any text that should be pronounced character by character

Prompting Your LLM

Add instructions to your system prompt:
SYSTEM_PROMPT = """You are a helpful assistant. When you need to spell out text 
(like email addresses, codes, or acronyms), wrap it in <spell> tags.

Examples:
- "My email is <spell>kajo@kugelaudio.com</spell>"
- "The code is <spell>ABC123</spell>"
- "That stands for <spell>API</spell>, Application Programming Interface"
"""

async def chat_with_spelling(user_message: str):
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ],
        stream=True,
    )
    
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model_id="kugel-1-turbo",
        normalize=True,  # Required for spell tags
        language="en",
    ) as session:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                async for audio in session.send(token):
                    play_audio(audio.audio)
        
        async for audio in session.flush():
            play_audio(audio.audio)
Streaming Safety: Spell tags work seamlessly with streaming. If the LLM streams <spell>test@ in one chunk and example.com</spell> in the next, the TTS system automatically buffers until the tag is complete. If a connection drops mid-tag, the tag is auto-closed so the content still gets spelled out.
Model recommendation: For clearer letter-by-letter pronunciation, use model_id="kugel-1" instead of kugel-1-turbo.
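The tag-buffering behavior described in the note above can be modeled client-side. The sketch below is purely illustrative (the server does this for you automatically) and deliberately simplified: it only tracks complete <spell> / </spell> markers, not tags split mid-name; SpellTagBuffer is a hypothetical class, not part of the SDK.

```python
# Illustrative model of the server's spell-tag buffering (simplified:
# assumes tag markers arrive whole, even if the tagged content is split).

class SpellTagBuffer:
    """Holds back text while a <spell> tag is open, releases it once closed."""

    def __init__(self):
        self._pending = ""
        self._in_tag = False

    def feed(self, token: str) -> str:
        """Add a streamed token; return any text that is safe to emit."""
        self._pending += token
        # We are inside an unclosed span if opens outnumber closes
        if self._pending.count("<spell>") > self._pending.count("</spell>"):
            self._in_tag = True
            return ""  # buffer until the tag completes
        self._in_tag = False
        out, self._pending = self._pending, ""
        return out

    def close(self) -> str:
        """Connection dropped mid-tag: auto-close so content is still spelled."""
        out = self._pending + ("</spell>" if self._in_tag else "")
        self._pending, self._in_tag = "", False
        return out


buf = SpellTagBuffer()
assert buf.feed("My email is <spell>test@") == ""  # held back
assert buf.feed("example.com</spell>") == "My email is <spell>test@example.com</spell>"
```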

Language-Specific Spelling

Special characters are translated based on the language:
Character  English     German       French      Spanish
@          at          ät           arobase     arroba
.          dot         Punkt        point       punto
-          dash        Strich       tiret       guión
_          underscore  Unterstrich  underscore  guión bajo
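For reference, the table above can be expressed as a lookup. This snippet only mirrors the documentation (the actual replacement happens server-side when normalize=True is set); SPELL_CHAR_WORDS and spell_out are hypothetical names, not part of the SDK.

```python
# Docs-mirror of the language-specific character readings. The server
# applies these during normalization; this is for reference only.

SPELL_CHAR_WORDS = {
    "@": {"en": "at", "de": "ät", "fr": "arobase", "es": "arroba"},
    ".": {"en": "dot", "de": "Punkt", "fr": "point", "es": "punto"},
    "-": {"en": "dash", "de": "Strich", "fr": "tiret", "es": "guión"},
    "_": {"en": "underscore", "de": "Unterstrich", "fr": "underscore", "es": "guión bajo"},
}

def spell_out(text: str, language: str = "en") -> str:
    """Approximate how <spell> content is read: character by character,
    with special characters replaced by their spoken word."""
    return " ".join(
        SPELL_CHAR_WORDS.get(ch, {}).get(language, ch) for ch in text
    )

print(spell_out("a@b.de", "de"))  # a ät b Punkt d e
```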

How the Server Buffers Text

You do not need to implement client-side sentence buffering. The KugelAudio server already accumulates LLM tokens internally and only starts generation once it reaches a natural sentence boundary (or a configurable minimum character count). This means you can forward raw LLM tokens one by one and the server will produce natural, fluent speech automatically.
The server’s text buffer follows this priority order:
  1. First chunk — waits for at least two complete sentences so the TTS model has enough context for natural prosody from the very start.
  2. Follow-up bundle — immediately after, any remaining complete sentences are bundled into one large second chunk.
  3. Accumulate — subsequent text is buffered without splitting.
  4. Buffer pressure — if the buffer exceeds 500 characters, it splits at the best available boundary.
  5. Flush / close — remaining text is emitted when you close the session.
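The five rules above can be sketched as a rough client-side model. This is purely illustrative, assuming a simple sentence-boundary regex and the 500-character threshold from rule 4; the real server implementation is internal and certainly differs in detail.

```python
import re

# Simplified model of the server-side buffering priority described above.
SENTENCE_END = re.compile(r"(?<=[.!?])\s+")
MAX_BUFFER = 500  # buffer-pressure threshold (rule 4)

class ServerTextBuffer:
    def __init__(self):
        self.buffer = ""
        self.chunks_emitted = 0

    def feed(self, text: str) -> list[str]:
        self.buffer += text
        out = []
        sentences = SENTENCE_END.split(self.buffer)
        if self.chunks_emitted == 0:
            # Rule 1: wait for at least two complete sentences
            if len(sentences) >= 3:  # two complete + trailing remainder
                out.append(" ".join(sentences[:2]))
                self.buffer = " ".join(sentences[2:])
                self.chunks_emitted += 1
        elif self.chunks_emitted == 1 and len(sentences) >= 2:
            # Rule 2: bundle remaining complete sentences into one chunk
            out.append(" ".join(sentences[:-1]))
            self.buffer = sentences[-1]
            self.chunks_emitted += 1
        elif len(self.buffer) > MAX_BUFFER and len(sentences) > 1:
            # Rule 4: buffer pressure, split at the best available boundary
            out.append(" ".join(sentences[:-1]))
            self.buffer = sentences[-1]
            self.chunks_emitted += 1
        # Rule 3: otherwise just accumulate
        return out

    def flush(self) -> list[str]:
        # Rule 5: emit whatever remains when the session closes
        out = [self.buffer] if self.buffer else []
        self.buffer = ""
        return out
```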

What to avoid: explicit per-sentence flushing

Do not call session.send(sentence, flush=True) or session.flush() between individual sentences. Each explicit flush creates a hard turn boundary on the server, which causes a perceptible silence gap between segments — even though the KV cache is preserved. For long-form content this can make the output sound choppy, similar to playing back separate audio clips.
Measured impact: word-level flushing is 3-5× slower to first audio per segment compared to sentence-level flushing, and sentence-level is 2-3× slower than flushing the full turn at once.
Recommended pattern — stream tokens directly, flush only at the end of the turn:
async def chat_with_voice(user_message: str):
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )

    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model_id="kugel-1-turbo",
    ) as session:
        # Forward every LLM token directly — the server accumulates them
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                async for audio_chunk in session.send(chunk.choices[0].delta.content):
                    play_audio(audio_chunk.audio)

        # Single flush at the very end of the turn to emit any trailing text
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)

Tuning server-side auto-chunking

Two StreamConfig parameters let you control how eagerly the server starts generating — no client-side flushing required:
Parameter               Type       Default            Effect
chunk_length_schedule   list[int]  [5, 80, 150, 250]  Minimum buffer size (chars) before each successive chunk is auto-emitted
auto_mode               bool       False              Start at the very first clean sentence boundary (equivalent to ElevenLabs auto_mode=true)
Low-latency preset (fastest first audio, slightly less prosody context):
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    model_id="kugel-1-turbo",
    auto_mode=True,
    chunk_length_schedule=[50, 100, 150, 250],
) as session:
    async for chunk in llm_stream:
        if chunk.choices[0].delta.content:
            async for audio in session.send(chunk.choices[0].delta.content):
                play_audio(audio.audio)
    async for audio in session.flush():
        play_audio(audio.audio)
High-quality preset (more prosody context, slightly higher TTFA):
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    model_id="kugel-1-turbo",
    chunk_length_schedule=[120, 200, 300],
) as session:
    ...
Start with the default schedule and only lower it if TTFA matters more than prosody quality for your use case. The schedule entries apply to chunks 0, 1, 2, … in order; the last entry repeats for all remaining chunks.
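The schedule-to-chunk mapping can be sketched in one line. min_chars_for_chunk is a hypothetical helper for illustration only, since the lookup actually happens server-side:

```python
# Entry i of chunk_length_schedule applies to chunk i; the final entry
# repeats for every chunk after the schedule runs out.

def min_chars_for_chunk(schedule: list[int], chunk_index: int) -> int:
    """Minimum buffered characters required before this chunk is emitted."""
    return schedule[min(chunk_index, len(schedule) - 1)]

default = [5, 80, 150, 250]
print([min_chars_for_chunk(default, i) for i in range(6)])
# [5, 80, 150, 250, 250, 250]
```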

When explicit flushing IS appropriate

Use send(text, flush=True) or an intermediate session.flush() only when you know the entire sentence or logical phrase is complete, for example when your application receives complete utterances (not token streams) from the LLM:
# Good: flush only at natural turn boundaries
async def speak_turn(session, complete_turn_text: str):
    async for audio_chunk in session.send(complete_turn_text):
        play_audio(audio_chunk.audio)
    async for audio_chunk in session.flush():
        play_audio(audio_chunk.audio)

Handling Interruptions

Allow users to interrupt the AI mid-speech:
import asyncio

class InterruptibleConversation:
    def __init__(self, kugelaudio_client, openai_client):
        self.kugelaudio = kugelaudio_client
        self.openai = openai_client
        self.is_speaking = False
        self.should_stop = False
    
    async def speak(self, text: str):
        self.is_speaking = True
        self.should_stop = False
        
        try:
            async with self.kugelaudio.tts.streaming_session(
                voice_id=123,
                model_id="kugel-1-turbo",
            ) as session:
                async for audio in session.send(text):
                    if self.should_stop:
                        break
                    play_audio(audio.audio)
                
                if not self.should_stop:
                    async for audio in session.flush():
                        if self.should_stop:
                            break
                        play_audio(audio.audio)
        finally:
            self.is_speaking = False
    
    def interrupt(self):
        """Call this when user starts speaking."""
        if self.is_speaking:
            self.should_stop = True
            stop_audio_playback()

# Usage
conversation = InterruptibleConversation(kugelaudio, openai)

# When user starts speaking (detected by VAD)
conversation.interrupt()

Optimizing Latency

1. Pre-warm Connections

# Pre-establish WebSocket connection
session = await kugelaudio.tts.streaming_session(
    voice_id=123,
    model_id="kugel-1-turbo",
).__aenter__()

# Keep session open for multiple turns
for user_message in conversation:
    llm_response = get_llm_response(user_message)
    async for audio in session.send(llm_response):
        play_audio(audio.audio)
    async for audio in session.flush():
        play_audio(audio.audio)

# Close the session when the conversation ends
await session.__aexit__(None, None, None)

2. Use the Right Model

# For real-time conversations, use turbo
tts = kugelaudio.TTS(model_id="kugel-1-turbo")  # ~39ms TTFA

# For pre-recorded content, use premium
tts = kugelaudio.TTS(model_id="kugel-1")  # ~77ms TTFA, higher quality

3. Use auto_mode for the lowest TTFA

# Emit audio at the first clean sentence boundary
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    auto_mode=True,
    chunk_length_schedule=[50, 100, 150, 250],
) as session:
    ...

4. Tune CFG Scale

# Lower CFG = faster, slightly less expressive
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    cfg_scale=1.5,  # Faster than default 2.0
) as session:
    ...

5. Optimize Per-Segment Latency

For real-time voice agents, per-segment latency (time between sentences) matters. Use optimize_streaming_latency to halve the diffusion steps, reducing per-segment TTFA by ~40-50%:
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    auto_mode=True,
    optimize_streaming_latency=True,  # ~2x faster per-segment
) as session:
    ...
For fine-grained control, set diffusion steps explicitly:
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    num_diffusion_steps=5,  # fewer steps = lower latency
) as session:
    ...

6. Parallel Processing

async def process_turn(user_message: str):
    # Start TTS session immediately
    session_task = asyncio.create_task(
        kugelaudio.tts.streaming_session(voice_id=123).__aenter__()
    )
    
    # Get LLM response in parallel
    llm_task = asyncio.create_task(
        get_llm_stream(user_message)
    )
    
    session = await session_task
    llm_stream = await llm_task
    
    # Now stream with minimal delay
    async for token in llm_stream:
        async for audio in session.send(token):
            play_audio(audio.audio)

    # Emit trailing audio and close the session
    async for audio in session.flush():
        play_audio(audio.audio)
    await session.__aexit__(None, None, None)

Complete Example

Here’s a complete voice assistant using OpenAI and KugelAudio:
import asyncio
from openai import AsyncOpenAI
from kugelaudio import KugelAudio
import pyaudio
import numpy as np

class VoiceAssistant:
    def __init__(self):
        self.openai = AsyncOpenAI()
        self.kugelaudio = KugelAudio(api_key="YOUR_API_KEY")
        self.audio_player = AudioPlayer()
        self.conversation = []
    
    async def chat(self, user_message: str):
        # Add to conversation history
        self.conversation.append({
            "role": "user",
            "content": user_message
        })
        
        # Stream from GPT-4
        stream = await self.openai.chat.completions.create(
            model="gpt-4o",
            messages=self.conversation,
            stream=True,
        )
        
        # Collect full response for history
        full_response = ""
        
        # Stream to TTS
        async with self.kugelaudio.tts.streaming_session(
            voice_id=123,
            model_id="kugel-1-turbo",
            cfg_scale=2.0,
        ) as session:
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    
                    # Send to TTS
                    async for audio in session.send(token):
                        self.audio_player.play(audio.audio)
            
            # Flush remaining
            async for audio in session.flush():
                self.audio_player.play(audio.audio)
        
        # Add to history
        self.conversation.append({
            "role": "assistant",
            "content": full_response
        })
        
        return full_response

class AudioPlayer:
    def __init__(self, sample_rate=24000):
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            output=True,
        )
    
    def play(self, audio_bytes: bytes):
        self.stream.write(audio_bytes)
    
    def close(self):
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()

# Usage
async def main():
    assistant = VoiceAssistant()
    
    response = await assistant.chat("Hello! Tell me a joke.")
    print(f"Assistant: {response}")
    
    response = await assistant.chat("That was funny! Tell me another one.")
    print(f"Assistant: {response}")

asyncio.run(main())

Next Steps

Streaming

Advanced streaming techniques

Text Processing

Normalization and spell tags reference