Skip to main content
Streaming is essential for real-time applications. This guide covers advanced streaming patterns and best practices.

Why Streaming?

Traditional TTS generates the entire audio before returning it. Streaming returns audio chunks as they’re generated, providing:
  • Lower latency: First audio arrives in ~39ms instead of waiting for full generation
  • Better UX: Users hear audio immediately while more is being generated
  • LLM integration: Process text token-by-token as it arrives from language models

Streaming Patterns

Pattern 1: Simple Streaming

The simplest pattern - stream a complete text:
# Synthesize one complete text and play each audio chunk as it streams back.
stream = client.tts.stream(
    text="Hello, this is streaming audio.",
    model="kugel-1-turbo",
)
for part in stream:
    # Events without an `audio` attribute are status/metadata, not audio.
    if hasattr(part, 'audio'):
        play_audio(part.audio)

Pattern 2: LLM Token Streaming

Stream text token-by-token as it arrives from an LLM:
async def stream_from_llm(llm_response):
    """Feed LLM tokens into a streaming TTS session, playing audio as it arrives."""
    session_cm = client.tts.streaming_session(
        voice_id=123,
        cfg_scale=2.0,
        flush_timeout_ms=500,  # Auto-flush after 500ms of silence
    )
    async with session_cm as session:
        # Forward each token to the session; play any audio chunks it yields.
        async for token in llm_response:
            async for part in session.send(token):
                play_audio(part.audio)

        # Force synthesis of any text still buffered inside the session.
        async for part in session.flush():
            play_audio(part.audio)

Pattern 3: Sentence-Based Streaming

Buffer text until sentence boundaries for more natural speech:
import re

def split_sentences(text: str) -> list[str]:
    """Break *text* into sentences at whitespace that follows '.', '!', or '?'."""
    boundary = re.compile(r'(?<=[.!?])\s+')
    return boundary.split(text)

async def stream_by_sentence(llm_response):
    """Accumulate LLM tokens and synthesize one complete sentence at a time."""
    pending = ""

    async with client.tts.streaming_session(voice_id=123) as session:
        async for token in llm_response:
            pending += token

            # Re-split the whole buffer; every element except the last
            # is guaranteed to end at a sentence boundary.
            parts = split_sentences(pending)

            for finished in parts[:-1]:
                async for chunk in session.send(finished + " "):
                    play_audio(chunk.audio)

            # The trailing fragment may still be mid-sentence — keep buffering it.
            pending = parts[-1] if parts else ""

        # Token stream ended: synthesize whatever text remains.
        if pending:
            async for chunk in session.send(pending):
                play_audio(chunk.audio)

        async for chunk in session.flush():
            play_audio(chunk.audio)

WebSocket Protocol

KugelAudio Native Protocol

The native WebSocket endpoint is at /ws/tts/stream:
import websockets
import json

async def native_streaming():
    """Drive the raw KugelAudio WebSocket protocol end to end.

    Sends config, text, flush and close messages, then plays the audio
    chunks the server returns until the final message arrives.
    """
    import base64  # FIX: required by b64decode below; snippet never imported it

    async with websockets.connect(
        "wss://api.kugelaudio.com/ws/tts/stream",
        extra_headers={"Authorization": "Bearer YOUR_API_KEY"}
    ) as ws:
        # 1. Send initial config
        await ws.send(json.dumps({
            "voice_id": 123,
            "cfg_scale": 2.0,
            "sample_rate": 24000,
        }))

        # 2. Send text chunks
        for text in ["Hello, ", "this is ", "streaming."]:
            await ws.send(json.dumps({"text": text}))

        # 3. Signal end of text
        await ws.send(json.dumps({"flush": True}))
        await ws.send(json.dumps({"close": True}))

        # 4. Receive audio chunks until the server marks generation complete
        async for message in ws:
            data = json.loads(message)

            if "audio" in data:
                # Base64-encoded PCM16 audio
                audio_bytes = base64.b64decode(data["audio"])
                play_audio(audio_bytes)

            if data.get("final"):
                print(f"Generation complete: {data}")
                break

Message Types

Client → Server:
| Message | Description |
| --- | --- |
| `{"voice_id": 123, "cfg_scale": 2.0}` | Initial configuration |
| `{"text": "chunk"}` | Text to synthesize |
| `{"flush": true}` | Force generation of buffered text |
| `{"close": true}` | Close the session |
Server → Client:
| Message | Description |
| --- | --- |
| `{"audio": "base64...", "idx": 0}` | Audio chunk |
| `{"generation_started": true, "text": "..."}` | Generation started |
| `{"final": true, "dur_ms": 1234}` | Generation complete |
| `{"session_closed": true}` | Session closed |

Audio Playback

Browser Playback

import { decodePCM16 } from 'kugelaudio';

const audioContext = new AudioContext();
// FIX: must be `let` — this is advanced after every chunk so playback is
// gapless; the original `const` throws on reassignment below.
let scheduledTime = audioContext.currentTime;

// Decode one PCM16 chunk and schedule it immediately after the previous one.
function playChunk(chunk: AudioChunk) {
  const float32Data = decodePCM16(chunk.audio);

  const audioBuffer = audioContext.createBuffer(
    1, // mono
    float32Data.length,
    chunk.sampleRate
  );
  audioBuffer.copyToChannel(float32Data, 0);

  const source = audioContext.createBufferSource();
  source.buffer = audioBuffer;
  source.connect(audioContext.destination);

  // Schedule back-to-back: each chunk starts where the last one ends.
  source.start(scheduledTime);
  scheduledTime += audioBuffer.duration;
}

Python Playback with PyAudio

import pyaudio
import threading
import queue

class AudioPlayer:
    """Plays raw PCM16 mono audio chunks on a background thread via PyAudio."""

    def __init__(self, sample_rate=24000):
        self.sample_rate = sample_rate
        self.queue = queue.Queue()  # chunks waiting to be written to the device
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(
            format=pyaudio.paInt16,  # 16-bit signed PCM
            channels=1,
            rate=sample_rate,
            output=True,
        )
        self.running = True
        self.thread = threading.Thread(target=self._play_loop)
        self.thread.start()

    def _play_loop(self):
        # Drain the queue until close() clears the flag; the short timeout
        # lets the loop re-check self.running periodically instead of
        # blocking forever on an empty queue.
        while self.running:
            try:
                chunk = self.queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self.stream.write(chunk)

    def play(self, audio_bytes: bytes):
        """Enqueue raw PCM16 bytes for playback (non-blocking)."""
        self.queue.put(audio_bytes)

    def close(self):
        """Stop the playback thread and release the audio device."""
        self.running = False
        self.thread.join()
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()

# Usage: route every streamed audio chunk through the background player.
player = AudioPlayer()

stream = client.tts.stream(text="Hello!", model="kugel-1-turbo")
for part in stream:
    # Skip status/metadata events that carry no audio.
    if hasattr(part, 'audio'):
        player.play(part.audio)

player.close()

Performance Tips

1. Pre-warm Connections

For latency-critical applications, pre-establish WebSocket connections:
# Pre-warm the connection
# NOTE(review): entering the context manager by hand means you are responsible
# for cleanup — presumably `await session.__aexit__(None, None, None)` when
# done, or the underlying connection leaks; confirm the SDK's close API.
session = await client.tts.streaming_session(voice_id=123).__aenter__()

# Later, when you need to generate
async for chunk in session.send("Hello!"):
    play_audio(chunk.audio)

2. Use Appropriate Chunk Sizes

The server automatically chunks text at natural boundaries. For custom control:
# Let the server handle chunking (recommended)
# NOTE(review): top-level `await` only works in a REPL/notebook; wrap these
# calls in an async function in real scripts.
await session.send(long_text)

# Or chunk manually for more control
# NOTE(review): earlier examples consume `session.send(...)` with
# `async for chunk in ...`; confirm whether a bare await discards audio here.
for sentence in split_sentences(long_text):
    await session.send(sentence)

3. Handle Backpressure

If audio arrives faster than you can play it:
import asyncio

async def stream_with_backpressure():
    """Generate and play concurrently, capping how far generation runs ahead."""
    audio_q = asyncio.Queue(maxsize=10)  # Limit buffer size

    async def producer():
        # Fills the queue; `put` suspends once 10 chunks are waiting,
        # which throttles generation to playback speed.
        async for chunk in client.tts.stream_async(text=text, model=model):
            if hasattr(chunk, 'audio'):
                await audio_q.put(chunk.audio)
        await audio_q.put(None)  # Signal end

    async def consumer():
        # Drains the queue until the end-of-stream sentinel arrives.
        while True:
            piece = await audio_q.get()
            if piece is None:
                break
            play_audio(piece)
            await asyncio.sleep(len(piece) / 2 / 24000)  # Simulate playback time

    await asyncio.gather(producer(), consumer())

4. Monitor Metrics

Track streaming performance:
import time

# Track time-to-first-audio (TTFA) and real-time factor (RTF) for one stream.
start_time = time.time()
first_audio_time = None
total_audio_ms = 0

for chunk in client.tts.stream(text=text, model=model):
    if hasattr(chunk, 'audio'):
        if first_audio_time is None:
            first_audio_time = time.time()
            ttfa = (first_audio_time - start_time) * 1000
            print(f"Time to first audio: {ttfa:.0f}ms")

        # PCM16 mono at 24 kHz: 2 bytes per sample, 24000 samples per second.
        total_audio_ms += len(chunk.audio) / 2 / 24000 * 1000

total_time = (time.time() - start_time) * 1000
# FIX: guard against ZeroDivisionError when the stream yielded no audio.
if total_audio_ms > 0:
    rtf = total_time / total_audio_ms
    print(f"Real-time factor: {rtf:.2f}x")
else:
    print("No audio received; cannot compute real-time factor.")

Error Handling

import websockets

async def robust_streaming():
    """Stream TTS with automatic retry on dropped WebSocket connections.

    Retries up to ``max_retries`` times on ``ConnectionClosed``; any other
    exception is logged and re-raised immediately.
    """
    import asyncio  # FIX: needed for the retry back-off; snippet never imported it

    max_retries = 3

    for attempt in range(max_retries):
        try:
            async for chunk in client.tts.stream_async(
                text="Hello!",
                model="kugel-1-turbo",
            ):
                if hasattr(chunk, 'audio'):
                    play_audio(chunk.audio)
            break  # Success

        except websockets.ConnectionClosed:
            # Transient drop: back off briefly and retry, up to max_retries.
            if attempt < max_retries - 1:
                print(f"Connection closed, retrying... ({attempt + 1}/{max_retries})")
                await asyncio.sleep(1)
            else:
                raise

        except Exception as e:
            # Anything else is unexpected — surface it to the caller.
            print(f"Streaming error: {e}")
            raise