# Integrate KugelAudio with language models for real-time voice
This guide covers best practices for integrating KugelAudio with large language models (LLMs) like GPT-4, Claude, and others to create real-time voice applications.
```python
import asyncio

from anthropic import AsyncAnthropic
from kugelaudio import KugelAudio

anthropic = AsyncAnthropic()
kugelaudio = KugelAudio(api_key="YOUR_API_KEY")

async def chat_with_claude(user_message: str):
    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model_id="kugel-1-turbo",
    ) as session:
        # Stream from Claude
        async with anthropic.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": user_message}],
        ) as stream:
            async for text in stream.text_stream:
                async for audio_chunk in session.send(text):
                    play_audio(audio_chunk.audio)

        # Flush remaining
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)
```
SYSTEM_PROMPT = """You are a helpful assistant. When you need to spell out text (like email addresses, codes, or acronyms), wrap it in <spell> tags.Examples:- "My email is <spell>kajo@kugelaudio.com</spell>"- "The code is <spell>ABC123</spell>"- "That stands for <spell>API</spell>, Application Programming Interface""""async def chat_with_spelling(user_message: str): stream = await openai.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_message} ], stream=True, ) async with kugelaudio.tts.streaming_session( voice_id=123, model_id="kugel-1-turbo", normalize=True, # Required for spell tags language="en", ) as session: async for chunk in stream: if chunk.choices[0].delta.content: token = chunk.choices[0].delta.content async for audio in session.send(token): play_audio(audio.audio) async for audio in session.flush(): play_audio(audio.audio)
Streaming Safety: Spell tags work seamlessly with streaming. If the LLM streams
<spell>test@ in one chunk and example.com</spell> in the next, the TTS system
automatically buffers until the tag is complete. If a connection drops mid-tag,
the tag is auto-closed so the content still gets spelled out.
Model recommendation: For clearer letter-by-letter pronunciation, use model_id="kugel-1" instead of kugel-1-turbo.
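The buffering and auto-closing behavior described above can be sketched in plain Python. This `SpellTagBuffer` is a hypothetical illustration of the logic, not part of the SDK; the real handling happens server-side:

```python
class SpellTagBuffer:
    """Sketch: text inside an unclosed <spell> tag is held back
    until the closing tag arrives."""

    def __init__(self):
        self.pending = ""   # text held back while a tag is open
        self.in_tag = False

    def feed(self, chunk: str) -> str:
        """Return the text that is safe to synthesize now."""
        self.pending += chunk
        out = ""
        while self.pending:
            if not self.in_tag:
                start = self.pending.find("<spell>")
                if start == -1:
                    # No open tag: everything is safe to emit.
                    # (A full implementation would also hold back a
                    # partial "<spell" prefix at the end of the chunk.)
                    out += self.pending
                    self.pending = ""
                else:
                    out += self.pending[:start]
                    self.pending = self.pending[start:]
                    self.in_tag = True
            else:
                end = self.pending.find("</spell>")
                if end == -1:
                    break  # tag not closed yet — keep buffering
                end += len("</spell>")
                out += self.pending[:end]
                self.pending = self.pending[end:]
                self.in_tag = False
        return out

    def close(self) -> str:
        """On disconnect, auto-close an open tag so the content
        still gets spelled out."""
        if self.in_tag and self.pending:
            out = self.pending + "</spell>"
            self.pending, self.in_tag = "", False
            return out
        return self.pending
```

With the example from above, `feed("My email is <spell>test@")` emits only `"My email is "`, and the tag content is released once `"example.com</spell>"` arrives.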
You do not need to implement client-side sentence buffering. The KugelAudio server already
accumulates LLM tokens internally and only starts generation once it has a natural sentence
boundary (or a configurable minimum character count). This means you can forward raw LLM tokens
one-by-one and the server will produce natural, fluent speech automatically.
The server’s text buffer follows this priority order:
1. First chunk — waits for at least two complete sentences so the TTS model has enough context for natural prosody from the very start.
2. Follow-up bundle — immediately after, any remaining complete sentences are bundled into one large second chunk.
3. Accumulate — subsequent text is buffered without splitting.
4. Buffer pressure — if the buffer exceeds 500 characters, it splits at the best available boundary.
5. Flush / close — remaining text is emitted when you close the session.
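As an illustration only (the real logic runs server-side), the priority order above can be approximated in a few lines. The regex sentence-boundary detection and the exact thresholds here are assumptions based on the description:

```python
import re

SENTENCE_END = re.compile(r"(?<=[.!?])\s+")  # split after ., !, ?
MAX_BUFFER = 500  # buffer-pressure threshold

class ServerBuffer:
    """Client-side model of the server's chunking priority order."""

    def __init__(self):
        self.buf = ""
        self.first_emitted = False

    def feed(self, tokens: str) -> list[str]:
        """Return the chunks the server would hand to the TTS model."""
        self.buf += tokens
        out = []
        parts = SENTENCE_END.split(self.buf)
        complete, remainder = parts[:-1], parts[-1]
        if not self.first_emitted and len(complete) >= 2:
            # 1. First chunk: two sentences for prosody context ...
            out.append(" ".join(complete[:2]))
            # 2. ... then bundle any further complete sentences at once.
            if len(complete) > 2:
                out.append(" ".join(complete[2:]))
            self.buf = remainder
            self.first_emitted = True
        elif self.first_emitted and len(self.buf) > MAX_BUFFER and complete:
            # 4. Buffer pressure: split at the best available boundary.
            out.append(" ".join(complete))
            self.buf = remainder
        # 3. Otherwise: keep accumulating without splitting.
        return out

    def flush(self) -> str:
        # 5. Flush / close: emit whatever remains.
        out, self.buf = self.buf, ""
        return out
```

Feeding `"Hello there. How "` emits nothing (only one complete sentence), while a follow-up `"are you? Great. Still here"` emits the two-sentence first chunk plus the bundled remainder, leaving `"Still here"` for the final flush.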
Do not call session.send(sentence, flush=True) or session.flush() between individual sentences.
Each explicit flush creates a hard turn boundary on the server, which causes a perceptible silence
gap between segments — even though the KV cache is preserved. For long-form content this can make
the output sound choppy, similar to playing back separate audio clips.Measured impact: word-level flushing is 3-5× slower to first audio per segment compared to
sentence-level flushing, and sentence-level is 2-3× slower than flushing the full turn at once.
Recommended pattern — stream tokens directly, flush only at the end of the turn:
```python
async def chat_with_voice(user_message: str):
    stream = await openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True,
    )

    async with kugelaudio.tts.streaming_session(
        voice_id=123,
        model_id="kugel-1-turbo",
    ) as session:
        # Forward every LLM token directly — the server accumulates them
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                async for audio_chunk in session.send(chunk.choices[0].delta.content):
                    play_audio(audio_chunk.audio)

        # Single flush at the very end of the turn to emit any trailing text
        async for audio_chunk in session.flush():
            play_audio(audio_chunk.audio)
```
Two StreamConfig parameters let you control how eagerly the server starts generating — no client-side flushing required:
| Parameter | Type | Default | Effect |
| --- | --- | --- | --- |
| `chunk_length_schedule` | `list[int]` | `[5, 80, 150, 250]` | Minimum buffer size (chars) before each successive chunk is auto-emitted |
| `auto_mode` | `bool` | `False` | Start at the very first clean sentence boundary (equivalent to ElevenLabs `auto_mode=true`) |
Low-latency preset (fastest first audio, slightly less prosody context):
```python
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    model_id="kugel-1-turbo",
    auto_mode=True,
    chunk_length_schedule=[50, 100, 150, 250],
) as session:
    async for chunk in llm_stream:
        if chunk.choices[0].delta.content:
            async for audio in session.send(chunk.choices[0].delta.content):
                play_audio(audio.audio)
    async for audio in session.flush():
        play_audio(audio.audio)
```
Quality-focused preset (larger buffers give the model more prosody context, at the cost of a later first audio chunk):

```python
async with kugelaudio.tts.streaming_session(
    voice_id=123,
    model_id="kugel-1-turbo",
    chunk_length_schedule=[120, 200, 300],
) as session:
    ...
```
Start with the default schedule and only lower it if TTFA matters more than prosody quality for your use-case. The schedule entries apply to chunks 0, 1, 2, … in order; the last entry repeats for all remaining chunks.
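The schedule lookup described above is simple enough to express directly. This helper is a hypothetical illustration, not an SDK function:

```python
def chunk_threshold(schedule: list[int], chunk_index: int) -> int:
    """Minimum buffered characters before chunk `chunk_index` is auto-emitted.
    Entries apply to chunks 0, 1, 2, ...; the last entry repeats thereafter."""
    return schedule[min(chunk_index, len(schedule) - 1)]
```

With the default schedule `[5, 80, 150, 250]`, chunk 0 is emitted after 5 buffered characters, chunk 2 after 150, and every chunk from index 3 onward after 250.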
Use send(text, flush=True) or an intermediate session.flush() only when you know the entire sentence or logical phrase is complete, for example when your application receives complete utterances (not token streams) from the LLM:
```python
# Good: flush only at natural turn boundaries
async def speak_turn(session, complete_turn_text: str):
    async for audio_chunk in session.send(complete_turn_text):
        play_audio(audio_chunk.audio)
    async for audio_chunk in session.flush():
        play_audio(audio_chunk.audio)
```
```python
# Pre-establish WebSocket connection
session = await kugelaudio.tts.streaming_session(
    voice_id=123,
    model_id="kugel-1-turbo",
).__aenter__()

# Keep session open for multiple turns
for user_message in conversation:
    llm_response = get_llm_response(user_message)
    async for audio in session.send(llm_response):
        play_audio(audio.audio)
    async for audio in session.flush():
        play_audio(audio.audio)

# Close the connection when the conversation is over
await session.__aexit__(None, None, None)
```
For real-time voice agents, per-segment latency (time between sentences) matters. Use optimize_streaming_latency to halve the diffusion steps, reducing per-segment TTFA by ~40-50%: