Real-Time Transcription
Argmax Pro SDK includes the WhisperKitPro framework, which implements an advanced streaming inference algorithm described in our ICML 2025 paper.
Key features:
- Accuracy in real-time mode is identical to pre-recorded transcription
- Dual output text streams: Build trust in stable and accurate results with Confirmed Text while maximizing responsiveness with Hypothesis Text.
- Streaming API design that exposes event-based callbacks, minimizing the burden on the caller
In the video above, Confirmed Text is rendered in white while Hypothesis Text is rendered in gray.
This page includes minimal CLI examples to help you get familiar with the Real-time Transcription API in Argmax Pro SDK.
If you are looking for an end-to-end example app, check out our open-source Argmax Playground project which is also published to TestFlight and used for the demo video above.
Introduction
Real-time transcription streams input audio and the corresponding output text continuously during a live recording session:
- Input audio stream: Capturing audio in small user-defined intervals
- Inference: Incremental speech-to-text inference on the input stream
- Output text streams:
- Confirmed: Finalized portion of the transcript that gets longer over time.
- Hypothesis: Preliminary transcript that may still be refined as more audio context arrives.
This approach creates an ultra low-latency user experience where words appear on the screen almost as they're spoken, with occasional refinements to the most recent words as the model gathers more context.
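To make the two output streams concrete, here is a minimal sketch of how a caller might fold them into display state. It assumes the LiveResult cases used in the Basic Example below (.confirm and .hypothesis); the view model type and property names are illustrative and not part of the SDK.

import Combine
import Argmax

// Minimal sketch (illustrative, not SDK API): fold the two output streams
// into display state. Confirmed Text only grows; Hypothesis Text is replaced.
@MainActor
final class TranscriptViewModel: ObservableObject {
    @Published var confirmedText = ""   // stable, finalized transcript
    @Published var hypothesisText = ""  // preliminary, may still change

    func handle(_ result: LiveResult) {
        switch result {
        case .confirm(let text, _, _):
            confirmedText += " " + text
            hypothesisText = ""          // the pending hypothesis is now superseded
        case .hypothesis(let text, _, _):
            hypothesisText = text        // replace rather than append
        }
    }
}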
Migration from Cloud APIs. The Confirmed output text stream is also referred to as immutable or final in other products. The Hypothesis output text stream is also referred to as mutable or interim in other products.
Real-time with Open-source SDK. Argmax Open-source SDK includes the WhisperKit framework, which provides basic building blocks to approximate the real-time behavior of the Pro SDK. An open-source example app that demonstrates real-time transcription with the Open-source SDK is available here.
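For reference, a rough approximation with the Open-source SDK might look like the sketch below: it simply re-transcribes a growing microphone buffer on a fixed interval using open-source WhisperKit APIs (audioProcessor.startRecordingLive and transcribe(audioArray:)). This is not the Pro streaming algorithm and does not produce Confirmed/Hypothesis streams; the interval and buffer handling are illustrative.

import Foundation
import WhisperKit

// Naive real-time approximation with the Open-source SDK (sketch only).
// Assumes `whisperKit` already has its models loaded.
func approximateRealTime(whisperKit: WhisperKit) async throws {
    var audioBuffer: [Float] = []
    let lock = NSLock()

    // Append microphone samples as they arrive
    try whisperKit.audioProcessor.startRecordingLive { samples in
        lock.withLock { audioBuffer.append(contentsOf: samples) }
    }

    while !Task.isCancelled {
        try await Task.sleep(nanoseconds: 1_000_000_000) // re-transcribe roughly every second
        let snapshot = lock.withLock { audioBuffer }
        guard !snapshot.isEmpty else { continue }
        let results = try await whisperKit.transcribe(audioArray: snapshot)
        print(results.map(\.text).joined(separator: " "))
    }
}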
Basic Example
This is a complete and self-contained CLI example project that demonstrates the usage of Argmax Pro SDK for real-time transcription from a microphone input stream.
Step 0: Verify Pro SDK setup
Argmax Pro SDK access must be set up with SwiftPM before going through this example. If unsure, please see Upgrading to Pro SDK (Step 1 only).
Step 1: Create project directory
Create a project directory as shown below and insert the code shared below into ArgmaxTestCommand.swift and Package.swift:
ArgmaxSDKRealTimeTranscriptionBasicExample
├── Package.swift
└── Sources
└── ArgmaxTestCLI
└── ArgmaxTestCommand.swift
Package.swift:
// swift-tools-version: 5.10
// The swift-tools-version declares the minimum version of Swift required to build this package.
import PackageDescription

let package = Package(
    name: "Argmax Test CLI",
    platforms: [
        .macOS(.v14)
    ],
    products: [
        .executable(
            name: "argmax-test-cli",
            targets: ["ArgmaxTestCLI"]
        )
    ],
    dependencies: [
        .package(id: "argmaxinc.argmax-sdk-swift", .upToNextMinor(from: "1.7.0")),
        .package(url: "https://github.com/apple/swift-argument-parser.git", exact: "1.3.0")
    ],
    targets: [
        .executableTarget(
            name: "ArgmaxTestCLI",
            dependencies: [
                .product(name: "Argmax", package: "argmaxinc.argmax-sdk-swift"),
                .product(name: "ArgumentParser", package: "swift-argument-parser")
            ]
        ),
    ]
)
ArgmaxTestCommand.swift:
import Foundation
@preconcurrency import ArgumentParser
@preconcurrency import Argmax
import Combine

@main
struct ArgmaxTestCommand: AsyncParsableCommand {
    static let configuration = CommandConfiguration(
        abstract: "An example CLI tool for Argmax Pro SDK",
        subcommands: [Transcribe.self]
    )

    struct Transcribe: AsyncParsableCommand {
        static let configuration = CommandConfiguration(
            abstract: "Real-time transcription using system microphone"
        )

        @Option(help: "Argmax Pro SDK API key")
        var apiKey: String

        @Option(help: "Model name: e.g. `parakeet-v2_476MB`, `tiny.en`, `large-v3-v20240930_626MB`. Default: `parakeet-v2_476MB`")
        var modelName: String = "parakeet-v2_476MB"

        @Option(help: "Mode: e.g. `alwaysOn`, `voiceTriggered`, `batteryOptimized`. Default: `voiceTriggered`")
        var mode: String = "voiceTriggered"

        func run() async throws {
            print("Initializing Argmax Pro SDK...")
            let sdkConfig = ArgmaxConfig(apiKey: apiKey)
            // Temporarily disable keychain access for debug
            sdkConfig.keychainPersistence = false
            await ArgmaxSDK.with(sdkConfig)

            print("Downloading \(modelName) model using ModelStore...")
            let modelStore = ModelStore()
            let repoType: RepoType
            // Pick the right model repo
            if modelName.lowercased().contains("parakeet") {
                // Use Nvidia Parakeet models
                repoType = .parakeetRepo
            } else if #available(macOS 15, *) {
                // Use Pro Whisper models
                repoType = .proRepo
            } else if #available(macOS 13, *) {
                // Use Open-source Whisper models
                repoType = .openSourceRepo
            } else {
                fatalError("Oldest supported macOS is 13.")
            }

            // Track download progress. Keep the Combine subscription alive in a
            // named constant: discarding it with `let _` cancels it immediately.
            let progressCancellable = modelStore.$progress.sink { progress in
                if let progress = progress {
                    let percentage = Int(progress.fractionCompleted * 100)
                    print("\rDownload progress: \(percentage)%", terminator: "")
                    fflush(stdout)
                }
            }

            let downloadURL = try await modelStore.downloadModel(
                name: modelName,
                repo: repoType
            )
            // To cancel download if needed:
            // modelStore.cancelDownload()
            progressCancellable.cancel()
            let modelFolder = downloadURL.path(percentEncoded: false)
            print("\nDownload completed: \(modelFolder)")

            let liveTranscriber = try await setupLiveTranscriber(modelFolder: modelFolder)

            // Set stream transcription mode
            let streamMode: StreamTranscriptionMode
            switch mode {
            case "alwaysOn":
                streamMode = .alwaysOn
            case "voiceTriggered":
                // minProcessInterval is 0 by default which is too fast for most use cases
                streamMode = .voiceTriggered(minProcessInterval: 0.3)
            case "batteryOptimized":
                streamMode = .batteryOptimized
            default:
                streamMode = .voiceTriggered(minProcessInterval: 0.3)
            }
            print("\nStream mode: \(streamMode)")

            try await transcribeStream(liveTranscriber: liveTranscriber, mode: streamMode)
        }

        private func setupLiveTranscriber(modelFolder: String) async throws -> LiveTranscriber {
            print("Initializing WhisperKit Pro...")
            let whisperConfig = WhisperKitProConfig(
                modelFolder: modelFolder,
                verbose: true,
                logLevel: .debug
            )
            let whisperKitPro = try await WhisperKitPro(whisperConfig)

            print("Loading WhisperKit Pro models... It may take up to 1 minute during first load after download")
            try await whisperKitPro.loadModels()

            print("Creating LiveTranscriber...")
            let liveTranscriber = LiveTranscriber(whisperKit: whisperKitPro)
            return liveTranscriber
        }

        private func transcribeStream(liveTranscriber: LiveTranscriber, mode: StreamTranscriptionMode) async throws {
            print("Transcribing while streaming audio from microphone...")

            // Create Argmax stream source for device (microphone)
            // On iOS: pass nil to use default microphone
            // let deviceSource = ArgmaxSource(streamType: .device())
            // Or on macOS: pass AudioDeviceID to select specific input device,
            // use AudioProcessor.getAudioDevices() to list available devices
            let macbookMicrophone = AudioProcessor.getAudioDevices().first(where: { $0.name == "MacBook Pro Microphone" })
            let deviceSource = ArgmaxSource(streamType: .device(macbookMicrophone?.id))

            // Configure decoding options
            let options = DecodingOptionsPro(
                base: .init(
                    task: .transcribe,
                    wordTimestamps: true,
                    chunkingStrategy: .vad
                ),
                transcribeInterval: 0.1,
                streamTranscriptionMode: mode,
                alignTimestampsToGlobal: true
            )

            // Register stream with options and optional audio callback
            try await liveTranscriber.registerStream(
                streamSource: deviceSource,
                options: options,
                audioCallback: { audioData in
                    // Optional: Process audio data for visualization, etc.
                    // print("Received \(audioData.count) audio samples")
                }
            )

            // Start transcription
            let deviceResults = try await liveTranscriber.startTranscription(for: deviceSource)

            // Consume results
            let dateFormatter = DateFormatter()
            dateFormatter.dateFormat = "HH:mm:ss.SSS"
            let transcribeTask = Task {
                var accumulatedConfirmedText = ""
                for try await result in deviceResults {
                    let timestamp = dateFormatter.string(from: Date())
                    var hypothesisText = ""
                    // LiveResult parameters:
                    // - text: Transcribed text (confirmed or hypothesis)
                    // - seconds: Elapsed time since transcription started (monotonic)
                    // - result: Complete TranscriptionResultPro (segments, timings, metadata)
                    switch result {
                    case .confirm(let text, let seconds, let result):
                        accumulatedConfirmedText += " " + text
                    case .hypothesis(let text, let seconds, let result):
                        hypothesisText = text
                    }
                    print("[\(timestamp)] \(accumulatedConfirmedText)\u{001B}[34m\(hypothesisText)\u{001B}[0m")
                }
                return accumulatedConfirmedText
            }

            // Capture Ctrl+C (SIGINT) from the terminal, stop recording, and clean up
            signal(SIGINT, SIG_IGN)
            let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: DispatchQueue.main)
            signalSource.setEventHandler(handler: DispatchWorkItem(block: {
                print("Stop recording...")
                print("Finalizing transcription...")
                Task.detached {
                    try? await liveTranscriber.stopAndRemoveStream(for: deviceSource)
                    let accumulatedConfirmedText = try! await transcribeTask.value
                    print("\n\nTranscription: \n\n\(accumulatedConfirmedText)\n")
                }
            }))
            signalSource.resume()

            let _ = try await transcribeTask.value
        }
    }
}
Step 2: Build and run in Terminal
Run the following command in your Terminal from within the top-level project directory. Example usage:
swift run argmax-test-cli transcribe --api-key <API_KEY>
If you observe error: no registry configured for 'argmaxinc' scope, go back to Step 0.
Here is an example output upon successful build and launch with --model-name large-v3-v20240930_turbo.
Advanced Example
The key differences between the Basic Example and Advanced Example are:
Feature | Basic Example | Advanced Example |
---|---|---|
Instance Management | Automatically manages WhisperKitPro instance via LiveTranscriber | Manual setup and configuration of WhisperKitPro |
Session Lifecycle | Handles session creation and lifecycle | Manual TranscribeStreamSession lifecycle management |
Audio Source Abstractions | Provides convenient abstractions (ArgmaxSource, CustomSource) | Direct access to audioProcessor for custom audio handling |
Cleanup | Simplifies cleanup with stopAndRemoveAllTranscriptions() | Manual cleanup of streams and sessions |
API Surface | User-friendly, higher-level API | Lower-level, more flexible but requires more code |
For most use cases, the LiveTranscriber API is recommended. Use the low-level API when you need the additional control and are comfortable managing the complexity.
To set up the Advanced Example, simply modify the ArgmaxTestCommand.swift file from the Basic Example to the following:
ArgmaxTestCommand.swift:
import Foundation
@preconcurrency import ArgumentParser
@preconcurrency import Argmax
import Combine

@main
struct ArgmaxTestCommand: AsyncParsableCommand {
    static let configuration = CommandConfiguration(
        abstract: "An example CLI tool for Argmax Pro SDK",
        subcommands: [Transcribe.self]
    )

    struct Transcribe: AsyncParsableCommand {
        static let configuration = CommandConfiguration(
            abstract: "Real-time transcription using system microphone"
        )

        @Option(help: "Argmax Pro SDK API key")
        var apiKey: String

        @Option(help: "Model name: e.g. `parakeet-v2_476MB`, `tiny.en`, `large-v3-v20240930_626MB`. Default: `parakeet-v2_476MB`")
        var modelName: String = "parakeet-v2_476MB"

        @Option(help: "Mode: e.g. `alwaysOn`, `voiceTriggered`, `batteryOptimized`. Default: `voiceTriggered`")
        var mode: String = "voiceTriggered"

        func run() async throws {
            print("Initializing Argmax Pro SDK...")
            let sdkConfig = ArgmaxConfig(apiKey: apiKey)
            // Temporarily disable keychain access for debug
            sdkConfig.keychainPersistence = false
            await ArgmaxSDK.with(sdkConfig)

            print("Downloading \(modelName) model using ModelStore...")
            let modelStore = ModelStore()
            let repoType: RepoType
            // Pick the right model repo
            if modelName.lowercased().contains("parakeet") {
                // Use Nvidia Parakeet models
                repoType = .parakeetRepo
            } else if #available(macOS 15, *) {
                // Use Pro Whisper models
                repoType = .proRepo
            } else if #available(macOS 13, *) {
                // Use Open-source Whisper models
                repoType = .openSourceRepo
            } else {
                fatalError("Oldest supported macOS is 13.")
            }

            // Track download progress. Keep the Combine subscription alive in a
            // named constant: discarding it with `let _` cancels it immediately.
            let progressCancellable = modelStore.$progress.sink { progress in
                if let progress = progress {
                    let percentage = Int(progress.fractionCompleted * 100)
                    print("\rDownload progress: \(percentage)%", terminator: "")
                    fflush(stdout)
                }
            }

            let downloadURL = try await modelStore.downloadModel(
                name: modelName,
                repo: repoType
            )
            // To cancel download if needed:
            // modelStore.cancelDownload()
            progressCancellable.cancel()
            let modelFolder = downloadURL.path(percentEncoded: false)
            print("\nDownload completed: \(modelFolder)")

            let whisperKitPro = try await setupWhisperKitPro(modelFolder: modelFolder)

            // Set stream transcription mode
            let streamMode: StreamTranscriptionMode
            switch mode {
            case "alwaysOn":
                streamMode = .alwaysOn
            case "voiceTriggered":
                streamMode = .voiceTriggered(minProcessInterval: 0.3)
            case "batteryOptimized":
                streamMode = .batteryOptimized
            default:
                streamMode = .voiceTriggered(minProcessInterval: 0.3) // fallback to default
            }
            print("\nStream mode: \(streamMode)")

            try await transcribeStream(whisperKitPro: whisperKitPro, mode: streamMode)
        }

        private func setupWhisperKitPro(modelFolder: String) async throws -> WhisperKitPro {
            let config = WhisperKitProConfig(
                modelFolder: modelFolder,
                verbose: false,
                logLevel: .debug
            )
            let whisperKitPro = try await WhisperKitPro(config)
            // Load models before creating a stream session (same as the Basic Example)
            try await whisperKitPro.loadModels()
            return whisperKitPro
        }

        private func transcribeStream(whisperKitPro: WhisperKitPro, mode: StreamTranscriptionMode) async throws {
            print("Transcribing with low-level WhisperKitPro API...")

            // 1. Configure decoding options
            let options = DecodingOptionsPro(
                base: .init(
                    verbose: true,
                    task: .transcribe,
                    skipSpecialTokens: true,
                    wordTimestamps: true,
                    chunkingStrategy: .vad
                ),
                transcribeInterval: 0.1,
                streamTranscriptionMode: mode
            )

            // 2. Create audio stream
            let (stream, continuation) = whisperKitPro.audioProcessor.startStreamingRecordingLive()

            // 3. Create transcription session
            let session = whisperKitPro.makeStreamSession(options: options)

            // 4. Start processing
            await session.start(audioInputStream: stream)

            // 5. Process results
            let dateFormatter = DateFormatter()
            dateFormatter.dateFormat = "HH:mm:ss.SSS"
            let transcribeTask = Task {
                for try await result in session.results {
                    let timestamp = dateFormatter.string(from: Date())
                    if let hypothesis = result.hypothesisText {
                        print("[\(timestamp)] Hypothesis: \(hypothesis)")
                    }
                    if !result.text.isEmpty {
                        print("[\(timestamp)] Confirmed: \(result.text)")
                    }
                    // // Access word-level timestamps for words in Hypothesis Text
                    // if !result.hypothesisSegments.isEmpty {
                    //     print("\nHypothesis Text Word Timestamps:")
                    //     result.hypothesisSegments.forEach { segment in
                    //         segment.words?.forEach { word in
                    //             print("\(word) - \(word.start), \(word.end)")
                    //         }
                    //     }
                    // }
                }
            }

            // 6. Handle termination signal
            signal(SIGINT, SIG_IGN)
            let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: DispatchQueue.main)
            signalSource.setEventHandler(handler: DispatchWorkItem(block: {
                print("Stop recording...")
                Task {
                    // Cleanup - finish the stream
                    continuation.finish()
                }
            }))
            signalSource.resume()

            // Wait for transcription task to complete
            try await transcribeTask.value
        }
    }
}
Advanced Features
Modes
Real-time transcription continuously processes audio, which is suboptimal when there is no active talker. Downsides may include:
- False positive predictions from background noise
- Device resources being utilized indiscriminately
For this purpose, we have built StreamTranscriptionMode, which lets developers set adaptive behavior based on input audio and other use case-related intent.
.voiceTriggered
Voice-triggered mode processes audio only when there is sufficiently high energy in the input audio. To see it in action, please refer to the CLI example video above.
This mode's behavior is customizable with silenceThreshold and maxBufferLength, but the default values work for a wide range of use cases.
.batteryOptimized
Battery-optimized mode is built for use cases where battery life and thermal sustainability are the top optimization objectives. It builds on the voice-triggered mode but inserts additional adaptive delays to throttle inference while keeping latency as low as possible.
When should I use this mode? Use this mode when end-users run real-time transcription for 2+ hours/day or 30+ minutes/session. It is designed to avoid:
- End-user frustration with battery life when used for 2+ hours/day
- Device heat-up after 30+ minutes of continuous operation
.alwaysOn
This mode disables adaptive behavior entirely and is not recommended unless there is a specific use case requirement for it.
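All three modes are selected at the decoding-options level, as in the Basic Example. A minimal sketch follows (values are illustrative; the silenceThreshold and maxBufferLength tuning mentioned above is omitted because its exact parameter spelling is not shown on this page):

// Choose a mode and pass it to the decoder (following the Basic Example above)
let streamMode: StreamTranscriptionMode = .voiceTriggered(minProcessInterval: 0.3)
// Alternatives: .batteryOptimized for long sessions, .alwaysOn to disable adaptive behavior

let options = DecodingOptionsPro(
    base: .init(task: .transcribe, chunkingStrategy: .vad),
    transcribeInterval: 0.1,
    streamTranscriptionMode: streamMode
)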
Background Processing on iOS
Real-time transcription can be sustained even after your app is backgrounded if the following is inserted into your app's Info.plist:
+ <key>UIBackgroundModes</key>
+ <array>
+ <string>audio</string>
+ </array>
This works for whisperKitPro.audioProcessor.startStreamingRecordingLive() because it uses AVAudioSession under the hood. Other audio sources built with AVAudioSession will also work in the background with this change.
Limitation. Recording must begin while the app is in the foreground. For this reason, audio interruptions (e.g. an incoming call or a Siri invocation) that occur while the app is in the background cannot be resumed. Please see Handling Audio Interruptions - Apple if you want to learn more.
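If you capture audio yourself (instead of using startStreamingRecordingLive()), the sketch below shows a typical AVAudioSession configuration that is compatible with the audio background mode, plus an interruption observer. This is standard AVFoundation usage rather than Argmax-specific API; the category and options are illustrative choices.

import AVFoundation

// Sketch for custom iOS audio sources only; the SDK's built-in device source
// configures its audio session under the hood.
func configureBackgroundCapableSession() throws {
    let session = AVAudioSession.sharedInstance()
    try session.setCategory(.playAndRecord, mode: .default, options: [.allowBluetooth])
    try session.setActive(true)

    // Interruptions (incoming call, Siri) end background capture; observe them so
    // the app can ask the user to restart recording from the foreground (see above).
    NotificationCenter.default.addObserver(
        forName: AVAudioSession.interruptionNotification,
        object: session,
        queue: .main
    ) { notification in
        guard let info = notification.userInfo,
              let rawType = info[AVAudioSessionInterruptionTypeKey] as? UInt,
              let type = AVAudioSession.InterruptionType(rawValue: rawType)
        else { return }
        if type == .ended {
            // Prompt the user to bring the app to the foreground and restart recording
        }
    }
}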
Multiple Audio Streams
This feature allows multiple input audio streams to be transcribed in real time by the same LiveTranscriber instance. An example use case is concurrent real-time transcription of system audio and the microphone for meeting transcription.
Before implementing multi-stream transcription, ensure that the ArgmaxTestCommand from Step 1 works correctly, particularly its transcribeStream function, which demonstrates basic LiveTranscriber usage.
Multi-Stream Architecture
The same LiveTranscriber instance can efficiently handle multiple audio streams simultaneously. Each stream gets its own registered source that shares the same LiveTranscriber instance but maintains independent processing context, allowing them to run concurrently without interference.
Example Implementation
private func transcribeMultipleStreams() async throws {
    // 1. Setup LiveTranscriber (same as single stream example)
    let liveTranscriber = try await setupLiveTranscriber()

    // 2. Create stream sources for each audio input
    // System audio stream (custom stream - for capturing system/app audio)
    let (systemStream, systemContinuation) = createSystemAudioStream() // Your implementation
    let systemSource = CustomSource(
        id: "system-audio",
        audioStream: systemStream,
        audioContinuation: systemContinuation
    )

    // Device microphone stream (using built-in device source)
    let deviceSource = ArgmaxSource(streamType: .device())

    // 3. Configure decoding options
    let options = DecodingOptionsPro(
        base: .init(
            verbose: true,
            task: .transcribe,
            skipSpecialTokens: true,
            wordTimestamps: true,
            chunkingStrategy: .vad
        ),
        transcribeInterval: 0.1
    )

    // 4. Register both streams with LiveTranscriber
    try await liveTranscriber.registerStream(streamSource: systemSource, options: options)
    try await liveTranscriber.registerStream(streamSource: deviceSource, options: options)

    // 5. Start transcription and get results stream for each source
    let systemResults = try await liveTranscriber.startTranscription(for: systemSource)
    let deviceResults = try await liveTranscriber.startTranscription(for: deviceSource)

    // 6. Process results from both streams concurrently
    // LiveResult parameters:
    // - text: Transcribed text (confirmed or hypothesis)
    // - seconds: Elapsed time since transcription started (monotonic)
    // - result: Complete TranscriptionResultPro (segments, timings, metadata)
    // Use a throwing task group since iterating the results streams can throw
    try await withThrowingTaskGroup(of: Void.self) { group in
        // System audio results processing
        group.addTask {
            for try await result in systemResults {
                switch result {
                case .confirm(let text, let seconds, let result):
                    print("[SYSTEM] Confirmed: \(text) at \(seconds)s")
                case .hypothesis(let text, let seconds, let result):
                    print("[SYSTEM] Hypothesis: \(text) at \(seconds)s")
                }
            }
        }

        // Device microphone results processing
        group.addTask {
            for try await result in deviceResults {
                switch result {
                case .confirm(let text, let seconds, let result):
                    print("[DEVICE] Confirmed: \(text) at \(seconds)s")
                case .hypothesis(let text, let seconds, let result):
                    print("[DEVICE] Hypothesis: \(text) at \(seconds)s")
                }
            }
        }

        // Wait for all processing to complete
        try await group.waitForAll()
    }

    // 7. Cleanup - stop and remove all transcriptions
    try await liveTranscriber.stopAndRemoveAllTranscriptions()
}
Key Considerations
- Stream Management: Each audio stream requires its own ArgmaxSource (either built-in like .device() or custom like CustomSource) registered with the LiveTranscriber.
- Individual Results Streams: Each source gets its own results stream from startTranscription(for: source) - no need to filter by stream ID.
- Resource Management: The shared LiveTranscriber instance efficiently manages computational resources across all streams while maintaining independent processing contexts.
- Graceful Termination: Call stopAndRemoveAllTranscriptions() to stop and remove all registered streams at once, ensuring proper cleanup and graceful termination of transcription sessions.
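The createSystemAudioStream() call in the example above is left to your implementation. The sketch below shows the general shape, assuming the stream element is a chunk of Float samples as elsewhere on this page; the actual element type expected by CustomSource and the capture mechanism (e.g. ScreenCaptureKit or an audio tap) are assumptions outside the scope of this page.

// Sketch: wiring an arbitrary audio producer into a CustomSource.
// The [Float] element type and the producer are illustrative assumptions.
func makeSystemAudioSource() -> CustomSource {
    let (stream, continuation) = AsyncStream<[Float]>.makeStream()

    // From your capture pipeline, feed 16 kHz mono samples:
    //   continuation.yield(samples)
    // ...and when capture stops:
    //   continuation.finish()

    return CustomSource(
        id: "system-audio",
        audioStream: stream,
        audioContinuation: continuation
    )
}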
Migrate from transcribeWhileRecording (to be deprecated soon)
If you're currently using the older WhisperKitPro.transcribeWhileRecording API, here's how to migrate to the new stream-based approach for better performance and cleaner code.
Old API Pattern (whisperKitPro.transcribeWhileRecording and audioProcessor.startRecordingLive)
// Old approach - manual buffer management
private func oldTranscribeStream(whisperKitPro: WhisperKitPro) async throws {
    let options = DecodingOptionsPro(
        base: .init(
            verbose: true,
            task: .transcribe,
            wordTimestamps: true,
            chunkingStrategy: .vad
        ),
        transcribeInterval: 0.1
    )

    // Manual buffer management
    var audioBuffer: [Float] = []
    let lock = NSLock()
    try whisperKitPro.audioProcessor.startRecordingLive { samples in
        lock.withLock {
            audioBuffer.append(contentsOf: samples)
        }
    }

    // Create recording task with callbacks
    let recordingTask = whisperKitPro.transcribeWhileRecording(
        options: options,
        audioCallback: {
            let samples = lock.withLock {
                let samples = audioBuffer
                audioBuffer.removeAll()
                return samples
            }
            return AudioSamples(samples: samples)
        },
        resultCallback: { result in
            print("Text: \(result.text)")
            if let hypothesis = result.hypothesisText {
                print("Hypothesis: \(hypothesis)")
            }
            return true
        }
    )

    // Start and finalize
    try await recordingTask.start()
    let results = try await recordingTask.finalize()
}
New API Pattern (whisperKitPro.makeStreamSession and audioProcessor.startStreamingRecordingLive)
// New approach - stream-based with automatic management
private func newTranscribeStream(whisperKitPro: WhisperKitPro) async throws {
    let options = DecodingOptionsPro(
        base: .init(
            verbose: true,
            task: .transcribe,
            wordTimestamps: true,
            chunkingStrategy: .vad
        ),
        transcribeInterval: 0.1
    )

    // Create stream (automatic audio handling)
    let (stream, continuation) = whisperKitPro.audioProcessor.startStreamingRecordingLive()

    // Create session
    let session = whisperKitPro.makeStreamSession(options: options)

    // Start processing
    await session.start(audioInputStream: stream)

    // Process results with async iteration
    for try await result in session.results {
        print("Text: \(result.text)")
        if let hypothesis = result.hypothesisText {
            print("Hypothesis: \(hypothesis)")
        }
    }

    // Graceful termination
    continuation.finish()
}
Migration Benefits
- Simplified Architecture: No manual buffer management or thread synchronization required
- Better Performance: Reduced memory overhead and improved audio processing efficiency
- Cleaner Error Handling: Automatic stream lifecycle management reduces potential failure points
- Async/Await Integration: Modern Swift concurrency support with for try await result iteration
- Resource Management: Automatic cleanup and better resource utilization
Key Migration Steps
- Replace audioProcessor.startRecordingLive() with audioProcessor.startStreamingRecordingLive()
- Replace whisperKitPro.transcribeWhileRecording() with whisperKitPro.makeStreamSession()
- Remove manual buffer management (audio buffers, locks, callback handling)
- Replace resultCallback with for try await result in session.results
- Replace recordingTask.start() and finalize() with session.start() and continuation.finish()
- Update error handling to use Swift's structured concurrency patterns