Streaming is essential for real-time applications. This guide covers advanced streaming patterns and best practices.

Why Streaming?

Traditional TTS generates the entire audio before returning it. Streaming returns audio chunks as they’re generated, providing:
  • Lower latency: First audio arrives in ~39ms instead of waiting for full generation
  • Better UX: Users hear audio immediately while more is being generated
  • LLM integration: Process text token-by-token as it arrives from language models

Streaming Patterns

Pattern 1: Simple Streaming

The simplest pattern is to stream a complete text:
for chunk in client.tts.stream(
    text="Hello, this is streaming audio.",
    model_id="kugel-1-turbo",
):
    if hasattr(chunk, 'audio'):
        play_audio(chunk.audio)
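
The examples in this guide call a play_audio helper that is not part of the SDK. A minimal sketch, assuming chunk.audio is raw mono 16-bit PCM at 24 kHz (the format the WebSocket protocol below labels pcm_s16le) and using the third-party sounddevice package:
import sounddevice as sd

# Assumes mono 16-bit PCM at 24 kHz; adjust if you request a different sample rate
_output = sd.RawOutputStream(samplerate=24000, channels=1, dtype="int16")
_output.start()

def play_audio(audio_bytes: bytes) -> None:
    """Write a raw PCM chunk straight to the default output device."""
    _output.write(audio_bytes)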

Pattern 2: LLM Token Streaming

Stream text token-by-token as it arrives from an LLM:
async def stream_from_llm(llm_response):
    async with client.tts.streaming_session(
        voice_id=123,
        cfg_scale=2.0,
        flush_timeout_ms=500,  # Auto-flush after 500ms of silence
    ) as session:
        # Stream tokens as they arrive
        async for token in llm_response:
            async for chunk in session.send(token):
                play_audio(chunk.audio)
        
        # Flush remaining text
        async for chunk in session.flush():
            play_audio(chunk.audio)
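
The llm_response argument can be any async iterable that yields text tokens. For a quick local test, a hypothetical mock generator can stand in for a real LLM stream:
import asyncio

async def fake_llm_response():
    """Yield tokens with a small delay to mimic an LLM streaming its output."""
    for token in ["Once ", "upon ", "a ", "time, ", "there ", "was ", "a ", "robot."]:
        await asyncio.sleep(0.05)
        yield token

asyncio.run(stream_from_llm(fake_llm_response()))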

Pattern 3: Spelling Out Text

Use <spell> tags to spell out text letter by letter (requires normalize: true):
# Spell out email addresses, codes, or abbreviations
text = "Contact us at <spell>[email protected]</spell> for help."

for chunk in client.tts.stream(
    text=text,
    model_id="kugel-1-turbo",
    normalize=True,
    language="en",
):
    if hasattr(chunk, 'audio'):
        play_audio(chunk.audio)

# Output: "Contact us at S, U, P, P, O, R, T, at, K, U, G, E, L..."
Streaming with Spell Tags: When streaming text token-by-token, spell tags that span multiple chunks are automatically handled. The system buffers text until the closing </spell> tag arrives before generating audio. If the stream ends unexpectedly, incomplete tags are auto-closed.
Model recommendation: For clearer letter-by-letter pronunciation, use model_id: "kugel-1" instead of kugel-1-turbo.
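
For example, a spell tag split across several session.send calls only produces audio once the closing tag has been received. A sketch reusing the Pattern 2 streaming session; whether streaming_session accepts the normalize flag directly is an assumption:
async def spell_over_stream():
    async with client.tts.streaming_session(voice_id=123, normalize=True) as session:
        # The <spell> tag is split across three sends; the spelled-out part is
        # buffered server-side until the closing </spell> arrives
        for token in ["Your code is <spe", "ll>X7", "K9</spell>, please note it down."]:
            async for chunk in session.send(token):
                play_audio(chunk.audio)

        async for chunk in session.flush():
            play_audio(chunk.audio)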

Pattern 4: Sentence-Based Streaming

Buffer text until sentence boundaries for more natural speech:
import re

def split_sentences(text: str) -> list[str]:
    """Split text into sentences."""
    return re.split(r'(?<=[.!?])\s+', text)

async def stream_by_sentence(llm_response):
    buffer = ""
    
    async with client.tts.streaming_session(voice_id=123) as session:
        async for token in llm_response:
            buffer += token
            
            # Check for complete sentences
            sentences = split_sentences(buffer)
            
            # Process all complete sentences
            for sentence in sentences[:-1]:
                async for chunk in session.send(sentence + " "):
                    play_audio(chunk.audio)
            
            # Keep incomplete sentence in buffer
            buffer = sentences[-1] if sentences else ""
        
        # Flush remaining buffer
        if buffer:
            async for chunk in session.send(buffer):
                play_audio(chunk.audio)
        
        async for chunk in session.flush():
            play_audio(chunk.audio)

Multi-Context Streaming

For advanced use cases like multi-speaker conversations or pre-buffering audio, use the multi-context WebSocket endpoint. This allows managing up to 5 independent audio streams over a single connection.

Use Cases

  • Multi-speaker conversations: Generate audio for different speakers concurrently
  • Pre-buffering: Start generating the next response while the current one plays
  • Interleaved audio: Dynamically switch between speakers in real-time

Example

import asyncio
import websockets
import json
import base64

async def multi_speaker_demo():
    async with websockets.connect(
        "wss://api.kugelaudio.com/ws/tts/multi?api_key=YOUR_API_KEY"
    ) as ws:
        # Initialize narrator context
        await ws.send(json.dumps({
            "text": " ",
            "context_id": "narrator",
            "voice_settings": {"voice_id": 123},
        }))
        
        # Create character context
        await ws.send(json.dumps({
            "text": " ",
            "context_id": "character",
            "voice_settings": {"voice_id": 456},
        }))
        
        # Send text to different speakers
        await ws.send(json.dumps({
            "text": "The story begins.",
            "context_id": "narrator",
            "flush": True,
        }))
        
        await ws.send(json.dumps({
            "text": "Hello, I'm the main character!",
            "context_id": "character",
            "flush": True,
        }))
        
        # Receive audio from both contexts, then close the connection
        finished_contexts = set()
        async for message in ws:
            data = json.loads(message)
            
            if "audio" in data:
                context_id = data["context_id"]
                audio_bytes = base64.b64decode(data["audio"])
                play_audio(context_id, audio_bytes)
            
            if data.get("is_final"):
                finished_contexts.add(data["context_id"])
                if finished_contexts == {"narrator", "character"}:
                    # All contexts have finished generating - close the socket
                    await ws.send(json.dumps({"close_socket": True}))
            
            if data.get("session_closed"):
                break

asyncio.run(multi_speaker_demo())

Multi-Context Protocol

Endpoint: /ws/tts/multi

Client → Server Messages:

| Message | Description |
|---------|-------------|
| {"text": " ", "context_id": "ctx1", "voice_settings": {"voice_id": 123}} | Initialize context with voice |
| {"text": "Hello", "context_id": "ctx1"} | Send text to context |
| {"text": "...", "context_id": "ctx1", "flush": true} | Send text and flush buffer |
| {"flush": true, "context_id": "ctx1"} | Flush context buffer |
| {"close_context": true, "context_id": "ctx1"} | Close specific context |
| {"text": "", "context_id": "ctx1"} | Keep-alive (reset inactivity timeout) |
| {"close_socket": true} | Close all contexts and connection |

Server → Client Messages:

| Message | Description |
|---------|-------------|
| {"context_created": true, "context_id": "ctx1"} | Context created |
| {"generation_started": true, "context_id": "ctx1", "chunk_id": 0, "text": "..."} | Generation started for text chunk |
| {"audio": "base64...", "enc": "pcm_s16le", "context_id": "ctx1", "idx": 0, "sr": 24000, "samples": 4800} | Audio chunk with context |
| {"chunk_complete": true, "context_id": "ctx1", "chunk_id": 0, "audio_seconds": 1.2} | Text chunk generation complete |
| {"word_timestamps": [...], "context_id": "ctx1", "chunk_id": 0} | Word-level time alignments (when enabled) |
| {"is_final": true, "context_id": "ctx1"} | All generation complete for context |
| {"context_closed": true, "context_id": "ctx1"} | Context closed |
| {"session_closed": true, "total_audio_seconds": 5.4} | Session ended with stats |

Limits

  • Maximum 5 concurrent contexts per connection
  • Contexts auto-close after 20 seconds of inactivity
  • Send empty text {"text": "", "context_id": "..."} to reset the timeout (see the keep-alive sketch below)
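
If a context has to stay open across pauses longer than 20 seconds (for example while waiting on a slow upstream LLM), a small keep-alive task that periodically sends the empty-text message is enough. A sketch; the 10-second interval is an arbitrary choice below the 20-second limit:
import asyncio
import json

async def keep_alive(ws, context_id: str, interval: float = 10.0):
    """Periodically reset the inactivity timeout for one context."""
    while True:
        await asyncio.sleep(interval)
        await ws.send(json.dumps({"text": "", "context_id": context_id}))

# Run alongside your main logic and cancel it when the context is no longer needed:
# task = asyncio.create_task(keep_alive(ws, "narrator"))
# ...
# task.cancel()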

WebSocket Protocol

KugelAudio Native Protocol

The single-stream WebSocket endpoint is at /ws/tts/stream:
import asyncio
import websockets
import json
import base64

async def native_streaming():
    async with websockets.connect(
        "wss://api.kugelaudio.com/ws/tts/stream",
        extra_headers={"Authorization": "Bearer YOUR_API_KEY"}
    ) as ws:
        # 1. Send initial config
        await ws.send(json.dumps({
            "voice_id": 123,
            "model_id": "kugel-1-turbo",
            "cfg_scale": 2.0,
            "sample_rate": 24000,
        }))
        
        # 2. Send text chunks
        for text in ["Hello, ", "this is ", "streaming."]:
            await ws.send(json.dumps({"text": text}))
        
        # 3. Signal end of text
        await ws.send(json.dumps({"flush": True}))
        await ws.send(json.dumps({"close": True}))
        
        # 4. Receive audio chunks
        async for message in ws:
            data = json.loads(message)
            
            if "audio" in data:
                audio_bytes = base64.b64decode(data["audio"])
                play_audio(audio_bytes)
            
            if data.get("session_closed"):
                print(f"Session closed: {data}")
                break

asyncio.run(native_streaming())

Message Types

Client → Server:

| Message | Description |
|---------|-------------|
| {"voice_id": 123, "model_id": "kugel-1-turbo", "cfg_scale": 2.0, "word_timestamps": true} | Initial configuration |
| {"text": "chunk"} | Text to synthesize |
| {"flush": true} | Force generation of buffered text |
| {"close": true} | Close the session |

Server → Client:

| Message | Description |
|---------|-------------|
| {"generation_started": true, "chunk_id": 0, "text": "..."} | Generation started for a text chunk |
| {"audio": "base64...", "enc": "pcm_s16le", "idx": 0, "sr": 24000, "samples": 4800, "chunk_id": 0} | Audio chunk |
| {"chunk_complete": true, "chunk_id": 0, "audio_seconds": 1.2, "gen_ms": 150} | Text chunk generation complete |
| {"word_timestamps": [...], "chunk_id": 0} | Word-level time alignments for a chunk (when word_timestamps: true) |
| {"session_closed": true, "total_audio_seconds": 5.4, ...} | Session closed with stats |

Word-Level Timestamps

When word_timestamps: true is set in the initial configuration, the server performs forced alignment on each generated audio chunk and sends a word_timestamps message shortly after the corresponding audio. Each timestamp contains:
{
  "word_timestamps": [
    {"word": "Hello", "start_ms": 0, "end_ms": 320, "char_start": 0, "char_end": 5, "score": 0.98},
    {"word": "world", "start_ms": 350, "end_ms": 680, "char_start": 7, "char_end": 12, "score": 0.95}
  ],
  "chunk_id": 0
}
| Field | Type | Description |
|-------|------|-------------|
| word | string | The aligned word |
| start_ms | int | Start time in milliseconds (relative to chunk start) |
| end_ms | int | End time in milliseconds (relative to chunk start) |
| char_start | int | Start character offset in the original text |
| char_end | int | End character offset in the original text |
| score | float | Alignment confidence score (0.0 - 1.0) |
Word timestamps add no extra audio latency. The alignment model runs on the same GPU as TTS and timestamps arrive ~50-200ms after the corresponding audio chunk.
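
Putting this together with the native protocol above: enable the flag in the initial configuration and handle word_timestamps messages in the receive loop. A sketch based on the message tables in this guide:
# 1. Request timestamps in the initial configuration
await ws.send(json.dumps({
    "voice_id": 123,
    "model_id": "kugel-1-turbo",
    "word_timestamps": True,
}))

# 2. Handle them in the receive loop
async for message in ws:
    data = json.loads(message)
    
    if "audio" in data:
        play_audio(base64.b64decode(data["audio"]))
    
    if "word_timestamps" in data:
        for ts in data["word_timestamps"]:
            # e.g. drive captions or highlight the current word in a UI
            print(f'{ts["word"]}: {ts["start_ms"]}-{ts["end_ms"]} ms')
    
    if data.get("session_closed"):
        break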

Audio Playback

For browser playback, schedule each decoded PCM chunk to start exactly when the previous one ends so the stream plays back without gaps:

import { decodePCM16 } from 'kugelaudio';

const audioContext = new AudioContext();
let scheduledTime = audioContext.currentTime;

function playChunk(chunk: AudioChunk) {
  // Convert 16-bit PCM samples to Float32 for the Web Audio API
  const float32Data = decodePCM16(chunk.audio);
  
  const audioBuffer = audioContext.createBuffer(
    1, // mono
    float32Data.length,
    chunk.sampleRate
  );
  audioBuffer.copyToChannel(float32Data, 0);
  
  const source = audioContext.createBufferSource();
  source.buffer = audioBuffer;
  source.connect(audioContext.destination);
  
  // Never schedule in the past, or chunks overlap after a playback stall
  scheduledTime = Math.max(scheduledTime, audioContext.currentTime);
  source.start(scheduledTime);
  scheduledTime += audioBuffer.duration;
}

Performance Tips

1. Pre-warm Connections

For latency-critical applications, pre-establish WebSocket connections:
# Pre-warm the connection and keep a handle so it can be closed later
session_cm = client.tts.streaming_session(voice_id=123)
session = await session_cm.__aenter__()

# Later, when you need to generate
async for chunk in session.send("Hello!"):
    play_audio(chunk.audio)

# When finished, close the pre-warmed connection
await session_cm.__aexit__(None, None, None)

2. Use Appropriate Chunk Sizes

The server automatically chunks text at natural boundaries. For custom control:
# Let the server handle chunking (recommended)
async for chunk in session.send(long_text):
    play_audio(chunk.audio)

# Or chunk manually for more control
for sentence in split_sentences(long_text):
    async for chunk in session.send(sentence):
        play_audio(chunk.audio)

3. Handle Backpressure

If audio arrives faster than you can play it:
import asyncio

async def stream_with_backpressure(text: str):
    buffer = asyncio.Queue(maxsize=10)  # Limit buffer size
    
    async def producer():
        async for chunk in client.tts.stream_async(text=text, model_id="kugel-1-turbo"):
            if hasattr(chunk, 'audio'):
                await buffer.put(chunk.audio)
        await buffer.put(None)  # Signal end
    
    async def consumer():
        while True:
            audio = await buffer.get()
            if audio is None:
                break
            play_audio(audio)
            await asyncio.sleep(len(audio) / 2 / 24000)  # Playback time for 16-bit PCM at 24 kHz
    
    await asyncio.gather(producer(), consumer())

4. Monitor Metrics

Track streaming performance:
import time

start_time = time.time()
first_audio_time = None
total_audio_ms = 0

for chunk in client.tts.stream(text=text, model_id="kugel-1-turbo"):
    if hasattr(chunk, 'audio'):
        if first_audio_time is None:
            first_audio_time = time.time()
            ttfa = (first_audio_time - start_time) * 1000
            print(f"Time to first audio: {ttfa:.0f}ms")
        
        total_audio_ms += len(chunk.audio) / 2 / 24000 * 1000  # 16-bit PCM at 24 kHz

total_time = (time.time() - start_time) * 1000
rtf = total_time / total_audio_ms
print(f"Real-time factor: {rtf:.2f}x")

Error Handling

import asyncio
import websockets

async def robust_streaming():
    max_retries = 3
    
    for attempt in range(max_retries):
        try:
            async for chunk in client.tts.stream_async(
                text="Hello!",
                model_id="kugel-1-turbo",
            ):
                if hasattr(chunk, 'audio'):
                    play_audio(chunk.audio)
            break  # Success
            
        except websockets.ConnectionClosed as e:
            if attempt < max_retries - 1:
                print(f"Connection closed, retrying... ({attempt + 1}/{max_retries})")
                await asyncio.sleep(1)
            else:
                raise
        
        except Exception as e:
            print(f"Streaming error: {e}")
            raise