# Real-Time Streaming ASR
2026-03-21
Real-time transcription requires a different architecture than file-based transcription. Instead of feeding a complete recording, audio arrives as a continuous stream of chunks that must be transcribed with minimal latency.
## Architecture Overview
```
Microphone → [Ring Buffer] → VAD State Machine → [Speech Buffer]
                                                       ↓
                                            faster-whisper (segment)
                                                       ↓
                                             Transcript Callback
```

The VAD state machine is the key component: it decides when to flush the speech buffer to Whisper and keeps silence from wasting inference cycles.
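Distilled to its core, the state machine has two states (silent / speaking) and one decision: flush when trailing silence exceeds a threshold. Here is a minimal sketch; `is_speech` and `transcribe` are placeholders for the real Silero VAD and faster-whisper calls in the full pipeline below:

```python
# Minimal two-state endpointing sketch (placeholders, not the full pipeline).
SILENCE_LIMIT = 20  # e.g. 20 × 30 ms chunks = 600 ms of trailing silence

speaking, silence, buffer = False, 0, []

def on_chunk(chunk):
    global speaking, silence
    if is_speech(chunk):                # placeholder for the VAD call
        speaking, silence = True, 0
        buffer.append(chunk)
    elif speaking:
        buffer.append(chunk)            # keep trailing pad
        silence += 1
        if silence >= SILENCE_LIMIT:    # end of utterance → flush
            transcribe(buffer)          # placeholder for the ASR call
            buffer.clear()
            speaking, silence = False, 0
```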
## VAD + faster-whisper Pipeline
```python
# realtime_asr.py
import queue
import threading
from collections import deque

import numpy as np
import sounddevice as sd
import torch
from faster_whisper import WhisperModel


# ---------- VAD (minimal Silero wrapper) ----------
class SileroVAD:
    def __init__(self, sr: int = 16000, threshold: float = 0.5):
        self.sr = sr
        self.threshold = threshold
        model, utils = torch.hub.load(
            repo_or_dir="snakers4/silero-vad",
            model="silero_vad",
            force_reload=False,
        )
        self.model = model
        self._get_speech_ts = utils[0]  # get_speech_timestamps (unused here)
        self.model.eval()

    def is_speech(self, chunk: np.ndarray) -> bool:
        """chunk: float32 numpy array at self.sr Hz.

        NOTE: recent Silero releases (v5) require exactly 512 samples
        per call at 16 kHz; see Chunk Size Guidelines below.
        """
        tensor = torch.tensor(chunk, dtype=torch.float32)
        with torch.no_grad():
            prob = self.model(tensor, self.sr).item()
        return prob > self.threshold


# ---------- Streamer ----------
class RealtimeASR:
    def __init__(self,
                 model_size: str = "base.en",
                 device: str = "cpu",
                 compute_type: str = "int8",
                 sample_rate: int = 16000,
                 chunk_ms: int = 30,
                 silence_ms: int = 600,
                 speech_pad_ms: int = 200,
                 on_transcript=None):
        """
        Args:
            model_size: faster-whisper model size
            chunk_ms: mic chunk size in ms (30 ms recommended for VAD)
            silence_ms: ms of silence to trigger transcription
            speech_pad_ms: ms of audio to prepend/append to each segment
            on_transcript: callback(text: str, is_final: bool)
        """
        self.sr = sample_rate
        self.chunk_size = int(sample_rate * chunk_ms / 1000)
        self.silence_chunks = int(silence_ms / chunk_ms)
        self.pad_chunks = int(speech_pad_ms / chunk_ms)
        self.on_transcript = on_transcript or (
            lambda text, final: print(f"[{'FINAL' if final else 'partial'}] {text}")
        )

        print(f"Loading Whisper {model_size}...")
        self.asr = WhisperModel(model_size, device=device, compute_type=compute_type)
        self.vad = SileroVAD(sr=sample_rate)

        self._audio_q: queue.Queue[np.ndarray] = queue.Queue()
        self._running = False
        self._speech_buffer: list[np.ndarray] = []
        # Ring buffer of recent silence so we can prepend pre-speech padding.
        self._preroll: deque = deque(maxlen=self.pad_chunks)
        self._silence_count = 0
        self._is_speaking = False

    # ---------- Transcription ----------
    def _transcribe_buffer(self, is_final: bool = True):
        if not self._speech_buffer:
            return
        audio = np.concatenate(self._speech_buffer)
        if len(audio) < self.sr * 0.3:  # skip segments shorter than 300 ms
            return
        segments, _ = self.asr.transcribe(
            audio,
            beam_size=3 if is_final else 1,
            language="en",
            condition_on_previous_text=False,
            vad_filter=False,  # we already did VAD
        )
        text = " ".join(s.text.strip() for s in segments).strip()
        if text:
            self.on_transcript(text, is_final)

    # ---------- Processing loop ----------
    def _process_loop(self):
        while self._running:
            try:
                chunk = self._audio_q.get(timeout=0.1)
            except queue.Empty:
                continue

            speech = self.vad.is_speech(chunk)
            if speech:
                self._silence_count = 0
                if not self._is_speaking:
                    self._is_speaking = True
                    # Prepend padding captured just before speech started
                    self._speech_buffer.extend(self._preroll)
                    self._preroll.clear()
                self._speech_buffer.append(chunk)
            else:
                if self._is_speaking:
                    self._speech_buffer.append(chunk)  # trailing pad
                    self._silence_count += 1
                    if self._silence_count >= self.silence_chunks:
                        # End of utterance → transcribe
                        self._transcribe_buffer(is_final=True)
                        self._speech_buffer.clear()
                        self._is_speaking = False
                        self._silence_count = 0
                else:
                    self._preroll.append(chunk)

    # ---------- Microphone callback ----------
    def _mic_callback(self, indata, frames, time_info, status):
        chunk = indata[:, 0].copy()
        self._audio_q.put(chunk)

    # ---------- Public API ----------
    def start(self):
        self._running = True
        self._worker = threading.Thread(target=self._process_loop, daemon=True)
        self._worker.start()
        self._stream = sd.InputStream(
            samplerate=self.sr,
            channels=1,
            dtype="float32",
            blocksize=self.chunk_size,
            callback=self._mic_callback,
        )
        self._stream.start()
        print("Listening... (Ctrl+C to stop)")

    def stop(self):
        self._running = False
        self._stream.stop()
        self._stream.close()
        self._worker.join(timeout=3)
        # Flush any remaining audio
        self._transcribe_buffer(is_final=True)


if __name__ == "__main__":
    import time

    transcripts = []

    def on_result(text, is_final):
        if is_final:
            transcripts.append(text)
            print(f">> {text}")

    asr = RealtimeASR(
        model_size="base.en",
        on_transcript=on_result,
    )
    try:
        asr.start()
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        asr.stop()
        print("\nFull transcript:")
        print(" ".join(transcripts))
```

## RealtimeSTT (Higher-Level API)
```python
# realtimestt_example.py
from RealtimeSTT import AudioToTextRecorder


def on_realtime(text: str):
    print(f"\r{text} ", end="", flush=True)


def on_final(text: str):
    print(f"\n>> {text}")


recorder = AudioToTextRecorder(
    model="base.en",
    language="en",
    spinner=False,
    enable_realtime_transcription=True,   # required for the realtime callbacks
    silero_sensitivity=0.4,               # Silero VAD sensitivity (0–1)
    webrtc_sensitivity=2,                 # WebRTC VAD aggressiveness (0–3)
    min_length_of_recording=0.5,          # seconds
    min_gap_between_recordings=0.3,       # seconds
    on_realtime_transcription_update=on_realtime,
    on_realtime_transcription_stabilized=on_final,
)

print("Speak now (Ctrl+C to stop)...")
try:
    while True:
        recorder.text(on_final)  # blocks until an utterance is transcribed
except KeyboardInterrupt:
    recorder.shutdown()
```

## Vosk Partial Results (Word-Level Live Output)
```python
# vosk_streaming.py
import json
import queue

import sounddevice as sd
from vosk import KaldiRecognizer, Model

MODEL_PATH = "vosk-model-small-en-us-0.15"
SAMPLE_RATE = 16000

model = Model(MODEL_PATH)
rec = KaldiRecognizer(model, SAMPLE_RATE)
rec.SetWords(True)

audio_queue: queue.Queue[bytes] = queue.Queue()


def mic_callback(indata, frames, time_info, status):
    audio_queue.put(bytes(indata))


last_partial = ""
with sd.RawInputStream(
    samplerate=SAMPLE_RATE,
    blocksize=8000,
    dtype="int16",
    channels=1,
    callback=mic_callback,
):
    print("Listening with Vosk... (Ctrl+C to stop)")
    try:
        while True:
            data = audio_queue.get()
            if rec.AcceptWaveform(data):
                result = json.loads(rec.Result())
                if result.get("text"):
                    print(f"\n>> {result['text']}")
                last_partial = ""
            else:
                partial = json.loads(rec.PartialResult()).get("partial", "")
                if partial != last_partial:
                    print(f"\r... {partial} ", end="", flush=True)
                    last_partial = partial
    except KeyboardInterrupt:
        pass
```

## Async Producer-Consumer Pattern
For integration with async frameworks (FastAPI, asyncio):
```python
# async_asr.py
import asyncio

import numpy as np
import sounddevice as sd
from faster_whisper import WhisperModel


class AsyncRealtimeASR:
    def __init__(self, model_size: str = "base.en"):
        self.model = WhisperModel(model_size, device="cpu", compute_type="int8")
        self.sample_rate = 16000
        self.chunk_ms = 30

    def _transcribe(self, audio: np.ndarray, beam_size: int) -> str:
        # Must run in a worker thread: faster-whisper returns a lazy
        # generator, so the segments have to be consumed here, off the
        # event loop, or the decode would block it.
        segments, _ = self.model.transcribe(audio, beam_size=beam_size)
        return " ".join(s.text.strip() for s in segments).strip()

    async def transcribe_stream(self, audio_gen):
        """
        Async generator: yields (text, is_final) tuples.
        audio_gen: async iterator yielding float32 numpy chunks
        """
        buffer = []
        silence_count = 0
        MAX_SILENCE = 20  # 20 × 30 ms = 600 ms

        async for chunk in audio_gen:
            # Crude energy-based VAD (RMS threshold) instead of a model
            energy = np.sqrt(np.mean(chunk ** 2))
            is_speech = energy > 0.01

            if is_speech:
                silence_count = 0
                buffer.append(chunk)
            elif buffer:
                silence_count += 1
                buffer.append(chunk)
                if silence_count >= MAX_SILENCE:
                    # End of utterance → final transcription
                    audio = np.concatenate(buffer)
                    text = await asyncio.to_thread(self._transcribe, audio, 3)
                    if text:
                        yield text, True
                    buffer.clear()
                    silence_count = 0
                elif len(buffer) % 10 == 0:
                    # Emit an interim partial with greedy decoding
                    partial_audio = np.concatenate(buffer)
                    text = await asyncio.to_thread(self._transcribe, partial_audio, 1)
                    if text:
                        yield text, False


async def mic_source(sr: int = 16000, chunk_ms: int = 30):
    """Async generator that yields mic chunks."""
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()
    chunk_size = int(sr * chunk_ms / 1000)

    def callback(indata, frames, t, status):
        # sounddevice invokes this from its own thread; asyncio.Queue is
        # not thread-safe, so hand the chunk off via the event loop.
        loop.call_soon_threadsafe(q.put_nowait, indata[:, 0].copy())

    with sd.InputStream(samplerate=sr, channels=1, dtype="float32",
                        blocksize=chunk_size, callback=callback):
        try:
            while True:
                yield await q.get()
        except asyncio.CancelledError:
            pass


async def main():
    asr = AsyncRealtimeASR("base.en")
    async for text, is_final in asr.transcribe_stream(mic_source()):
        prefix = "FINAL" if is_final else "partial"
        print(f"[{prefix}] {text}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
```

## Latency Optimization Tips
| Technique | Latency Reduction | Trade-off |
|---|---|---|
| `beam_size=1` (greedy) | ~50% faster | Slightly lower WER |
| `compute_type="int8"` | 2× faster on CPU | Negligible accuracy loss |
| `tiny.en` model | 10× faster than `large-v3` | Lower accuracy |
| Reduce chunk size (30 ms → 20 ms) | More responsive VAD | More tiny fragments |
| `condition_on_previous_text=False` | Avoids hallucination loops | Less context continuity |
| Cap the speech buffer at 30 s | Prevents runaway latency | May cut long utterances |
| `vad_filter=False` (manual VAD done) | Skips a redundant VAD pass | Must do your own VAD |
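As a quick reference, here is a minimal sketch combining the table's CPU-oriented settings into one `transcribe` call; it assumes the caller enforces the 30 s buffer cap and has already run VAD upstream:

```python
# Low-latency configuration combining the settings from the table above.
from faster_whisper import WhisperModel

model = WhisperModel("tiny.en", device="cpu", compute_type="int8")

def transcribe_low_latency(audio):  # audio: float32 numpy array, ≤ 30 s
    segments, _ = model.transcribe(
        audio,
        beam_size=1,                       # greedy decoding
        condition_on_previous_text=False,  # avoid hallucination loops
        vad_filter=False,                  # VAD already done upstream
        language="en",
    )
    return " ".join(s.text.strip() for s in segments)
```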
## Chunk Size Guidelines

| `chunk_ms` | Latency | VAD Accuracy | CPU Usage |
|---|---|---|---|
| 10 ms | Lowest | Poor | High |
| 30 ms (recommended) | Low | Good | Medium |
| 50 ms | Medium | Better | Lower |
| 100 ms | High | Best | Very low |

Note that Silero VAD expects fixed-size windows: recent releases (v5) require exactly 512 samples (32 ms) per call at 16 kHz, so 30 ms (480-sample) mic chunks must be re-buffered before VAD; older releases are more permissive.
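If your Silero build enforces fixed 512-sample windows, a small re-buffering shim can adapt arbitrary mic chunk sizes. A minimal sketch (the `VadRebuffer` helper is hypothetical, not part of any library):

```python
import numpy as np

class VadRebuffer:
    """Re-buffers arbitrary mic chunks into exact fixed-size VAD windows."""

    def __init__(self, window: int = 512):
        self.window = window
        self._pending = np.zeros(0, dtype=np.float32)

    def push(self, chunk: np.ndarray):
        """Yield complete windows; keep any remainder buffered."""
        self._pending = np.concatenate([self._pending, chunk])
        while len(self._pending) >= self.window:
            yield self._pending[:self.window]
            self._pending = self._pending[self.window:]

# Usage with 30 ms (480-sample) mic chunks: windows lag slightly behind
# the mic, but the VAD always sees exactly 512 samples per call.
# rebuf = VadRebuffer(512)
# for window in rebuf.push(chunk):
#     speech = vad.is_speech(window)
```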
## See Also
- ASR Implementation (File-Based)
- ASR Integration Guide
- Real-Time Streaming VAD — The VAD state machine described there feeds utterance audio directly into the ASR pipeline here
- Real-Time Streaming TTS — TTS playback runs concurrently; VAD barge-in detection must interrupt ongoing TTS
- ASR Cheatsheet