Real-Time Transcription
Context
Argmax Pro SDK includes the WhisperKitPro framework, which implements an advanced streaming inference algorithm described in our paper. Key features include:
- Accuracy is identical to offline file transcription
- Dual output text streams can be leveraged in the user experience to build trust in stable and accurate results (Confirmed) while maximizing responsiveness (Hypothesis).
- Streaming API design that exposes event-based callbacks, minimizing the burden on the caller
Real-time transcription streams input audio and the corresponding output text continuously during a live recording session:
- Input audio stream: Capturing audio in small user-defined intervals
- Inference: Incremental speech-to-text inference on the input stream
- Output text streams:
- Confirmed: Finalized portion of the transcript that gets longer over time.
- Hypothesis: Preliminary transcript that may still be refined as more audio context arrives.
This approach creates an ultra-low-latency user experience where words appear on screen almost as they are spoken, with occasional refinements as the model gathers more context.
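For example, an app can keep the two streams as one piece of display state, appending each Confirmed segment to a stable transcript and replacing the Hypothesis as it changes. Below is a minimal sketch; the type and method names are illustrative and not part of the SDK:
// Minimal sketch (illustrative only, not an SDK type): combining the two output streams for display
struct TranscriptState {
    var confirmed = ""   // Finalized text; only ever grows
    var hypothesis = ""  // Preliminary text; replaced as newer hypotheses arrive

    // Text to render: the stable prefix followed by the latest hypothesis
    var displayText: String { confirmed + hypothesis }

    mutating func applyConfirmed(_ text: String) {
        confirmed += " " + text
        hypothesis = ""  // Superseded once the text is confirmed
    }

    mutating func applyHypothesis(_ text: String) {
        hypothesis = " " + text
    }
}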
Migration from Cloud APIs. The Confirmed output text stream is also referred to as immutable or final in other products. The Hypothesis output text stream is also referred to as mutable or interim in other products.
Real-time with Open-source SDK. The Argmax Open-source SDK includes the WhisperKit framework, which provides basic building blocks to approximate the real-time behavior of the Pro SDK. An open-source example app that demonstrates real-time transcription with the Open-source SDK is available here.
Basic Example
This is a complete and self-contained CLI example project that demonstrates the usage of Argmax Pro SDK for real-time transcription from a microphone input stream.
Step 0: Verify Pro SDK setup
Argmax Pro SDK access must be set up with SwiftPM before going through this example. If unsure, please see Upgrading to Pro SDK (Step 1 only).
Step 1: Create project directory
Create the project directory structure shown below, then insert the code that follows into Package.swift and ArgmaxTestCommand.swift
ArgmaxSDKRealTimeTranscriptionBasicExample
├── Package.swift
└── Sources
└── ArgmaxTestCLI
└── ArgmaxTestCommand.swift
Package.swift:
// swift-tools-version: 5.10
// The swift-tools-version declares the minimum version of Swift required to build this package.
import PackageDescription
let package = Package(
name: "Argmax Test CLI",
platforms: [
.macOS(.v14)
],
products: [
.executable(
name: "argmax-test-cli",
targets: ["ArgmaxTestCLI"]
)
],
dependencies: [
.package(id: "argmaxinc.argmax-sdk-swift", .upToNextMinor(from: "1.6.0")),
.package(url: "https://github.com/apple/swift-argument-parser.git", exact: "1.3.0")
],
targets: [
.executableTarget(
name: "ArgmaxTestCLI",
dependencies: [
.product(name: "Argmax", package: "argmaxinc.argmax-sdk-swift"),
.product(name: "ArgumentParser", package: "swift-argument-parser")
]
),
]
)
ArgmaxTestCommand.swift:
import Foundation
@preconcurrency import ArgumentParser
@preconcurrency import Argmax
import Combine
@main
struct ArgmaxTestCommand: AsyncParsableCommand {
static let configuration = CommandConfiguration(
abstract: "An example CLI tool for Argmax Pro SDK",
subcommands: [Transcribe.self]
)
struct Transcribe: AsyncParsableCommand {
static let configuration = CommandConfiguration(
abstract: "Real-time transcription using system microphone"
)
@Option(help: "Argmax Pro SDK API key")
var apiKey: String
@Option(help: "Model name: e.g. `tiny.en` or `large-v3-v20240930_626MB`. Default: `large-v3-v20240930_626MB`")
var modelName: String = "large-v3-v20240930_626MB"
@Option(help: "HuggingFace token if accessing Pro models")
var modelToken: String?
func run() async throws {
print("Initializing Argmax Pro SDK...")
let sdkConfig = ArgmaxConfig(apiKey: apiKey)
// Temporarily disable keychain access for debug
sdkConfig.keychainPersistence = false
await ArgmaxSDK.with(sdkConfig)
print("Downloading \(modelName) model using ModelStore...")
let modelStore = ModelStore()
let repoType: RepoType
if #available(macOS 15, *) {
repoType = .proRepo
} else if #available(macOS 13, *) {
repoType = .openSourceRepo
} else {
fatalError("Not avaialble on pre macOS 13")
}
// Track download progress
let progressCancellable = modelStore.$progress.sink { progress in
if let progress = progress {
let percentage = Int(progress.fractionCompleted * 100)
print("\rDownload progress: \(percentage)%", terminator: "")
fflush(stdout)
}
}
let downloadURL = try await modelStore.downloadModel(
name: modelName,
repo: repoType,
token: modelToken
)
// To cancel download if needed:
// modelStore.cancelDownload()
let modelFolder = downloadURL.path(percentEncoded: false)
print("\nDownload completed: \(modelFolder)")
progressCancellable.cancel() // Stop observing download progress
let liveTranscriber = try await setupLiveTranscriber(modelFolder: modelFolder)
try await transcribeStream(liveTranscriber: liveTranscriber)
}
private func setupLiveTranscriber(modelFolder: String) async throws -> LiveTranscriber {
print("Initializing WhisperKit Pro...")
let whisperConfig = WhisperKitProConfig(
modelFolder: modelFolder,
verbose: true,
logLevel: .debug
)
let whisperKitPro = try await WhisperKitPro(whisperConfig)
print("Loading WhisperKit Pro models... It may take up to 1 minute during first load after download")
try await whisperKitPro.loadModels()
print("Creating LiveTranscriber...")
let liveTranscriber = LiveTranscriber(whisperKit: whisperKitPro)
return liveTranscriber
}
private func transcribeStream(liveTranscriber: LiveTranscriber) async throws {
print("Transcribing while streaming audio from microphone...")
// Create Argmax stream source for device (microphone)
// On iOS: pass nil to use default microphone
// let deviceSource = ArgmaxSource(streamType: .device())
// Or on macOS: pass AudioDeviceID to select specific input device,
// use AudioProcessor.getAudioDevices() to list available devices
let macbookMicrophone = AudioProcessor.getAudioDevices().first(where: { $0.name == "MacBook Pro Microphone" })
let deviceSource = ArgmaxSource(streamType: .device(macbookMicrophone?.id))
// Configure decoding options
let options = DecodingOptionsPro(
base: .init(
verbose: true,
task: .transcribe,
wordTimestamps: true,
chunkingStrategy: .vad
),
transcribeInterval: 0.1,
streamTranscriptionMode: .voiceTriggered // .voiceTriggered or .alwaysOn
)
// Register stream with options and optional audio callback
try await liveTranscriber.registerStream(
streamSource: deviceSource,
options: options,
audioCallback: { audioData in
// Optional: Process audio data for visualization, etc.
// print("Received \(audioData.count) audio samples")
}
)
// Start transcription
let deviceResults = try await liveTranscriber.startTranscription(for: deviceSource)
// Consume results
let dateFormatter = DateFormatter()
dateFormatter.dateFormat = "HH:mm:ss.SSS"
let transcribeTask = Task {
var accumulatedConfirmedText = ""
for try await result in deviceResults {
let timestamp = dateFormatter.string(from: Date())
var hypothesisText = ""
switch result {
case .confirm(let text, _, _):
accumulatedConfirmedText += " " + text
case .hypothesis(let text, _):
hypothesisText = text
}
print("[\(timestamp)] \(accumulatedConfirmedText)\u{001B}[34m\(hypothesisText)\u{001B}[0m")
}
return accumulatedConfirmedText
}
// Capture Ctrl+C from the terminal to stop recording and clean up
signal(SIGINT, SIG_IGN)
let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: DispatchQueue.main)
signalSource.setEventHandler(handler: DispatchWorkItem(block: {
print("Stop recording...")
print("Finalizing transcription...")
Task.detached {
try? await liveTranscriber.stopAndRemoveStream(for: deviceSource)
let accumulatedConfirmedText = try! await transcribeTask.value
print("\n\nTranscription: \n\n\(accumulatedConfirmedText)\n")
}
}))
signalSource.resume()
let _ = try! await transcribeTask.value
}
}
}
Step 2: Build and run in Terminal
Run the following command in your Terminal from within the top-level project directory:
swift run argmax-test-cli transcribe --api-key <API_KEY>
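The optional flags declared in ArgmaxTestCommand.swift can be passed as well; for example, to transcribe with a smaller model (--model-token can similarly be added when downloading Pro models):
swift run argmax-test-cli transcribe --api-key <API_KEY> --model-name tiny.en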
If you observe the error no registry configured for 'argmaxinc' scope, go back to Step 0.
Upon a successful build and launch, the tool prints the model download progress, followed by timestamped transcript lines in which Confirmed text accumulates in the default color and the latest Hypothesis appears in blue.
Advanced Example
This example complements the basic example above by showing how to use the low-level WhisperKitPro API directly instead of the high-level LiveTranscriber API. All steps remain the same (Step 0 setup, Step 2 build and run), but you'll modify the code in ArgmaxTestCommand.swift to use the low-level API for more granular control.
The high-level API (LiveTranscriber) encapsulates a WhisperKitPro instance and calls these low-level APIs under the hood. To learn more, you can review the source code of LiveTranscriber inside the Argmax SDK.
Modified ArgmaxTestCommand.swift
Replace the setupLiveTranscriber() and transcribeStream() functions with this low-level implementation:
// Replace setupLiveTranscriber() with this:
private func setupWhisperKitPro() async throws -> WhisperKitPro {
// Download model using ModelStore
let modelStore = ModelStore()
let repoType: RepoType = .openSourceRepo
// Uncomment to access Pro models (requires `modelToken`)
// let repoType: RepoType = .proRepo
// Track download progress
let cancellable = modelStore.$progress.sink { progress in
if let progress = progress {
let percentage = Int(progress.fractionCompleted * 100)
print("\rDownload progress: \(percentage)%", terminator: "")
fflush(stdout)
}
}
let modelURL = try await modelStore.downloadModel(
name: "large-v3-v20240930_626MB",
repo: repoType,
token: modelToken
)
// To cancel download if needed:
// modelStore.cancelDownload()
let config = WhisperKitProConfig(
modelFolder: modelURL.path,
verbose: true,
logLevel: .debug
)
let whisperKitPro = try await WhisperKitPro(config)
// Load models before creating a stream session (as in the basic example)
try await whisperKitPro.loadModels()
return whisperKitPro
}
// Replace transcribeStream() with this:
private func transcribeStream(whisperKitPro: WhisperKitPro) async throws {
print("Transcribing with low-level WhisperKitPro API...")
// 1. Configure decoding options
let options = DecodingOptionsPro(
base: .init(
verbose: true,
task: .transcribe,
wordTimestamps: true,
chunkingStrategy: .vad
),
transcribeInterval: 0.1
)
// 2. Create audio stream
let (stream, continuation) = whisperKitPro.audioProcessor.startStreamingRecordingLive()
// 3. Create transcription session
let session = whisperKitPro.makeStreamSession(options: options)
// 4. Start processing
await session.start(audioInputStream: stream)
// 5. Process results
let dateFormatter = DateFormatter()
dateFormatter.dateFormat = "HH:mm:ss.SSS"
let transcribeTask = Task {
for try await result in session.results {
let timestamp = dateFormatter.string(from: Date())
if let hypothesis = result.hypothesisText {
print("[\(timestamp)] Hypothesis: \(hypothesis)")
}
if !result.text.isEmpty {
print("[\(timestamp)] Confirmed: \(result.text)")
}
}
}
// 6. Handle termination signal
signal(SIGINT, SIG_IGN)
let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: DispatchQueue.main)
signalSource.setEventHandler(handler: DispatchWorkItem(block: {
print("Stop recording...")
Task {
// Cleanup - finish the stream
continuation.finish()
}
}))
signalSource.resume()
// Wait for transcription task to complete
try await transcribeTask.value
}
// Update the run() method to use the new functions:
func run() async throws {
    // Initialize the SDK with your API key (as in the basic example)
    await ArgmaxSDK.with(ArgmaxConfig(apiKey: apiKey))
    let whisperKitPro = try await setupWhisperKitPro()
    try await transcribeStream(whisperKitPro: whisperKitPro)
}
Key Differences from High-level API
- Manual Setup: You create and configure the WhisperKitPro instance yourself
- Session Management: You manage the TranscribeStreamSession lifecycle directly
- Audio Stream Control: Direct access to audioProcessor for custom audio handling
- Resource Management: Manual cleanup of streams and sessions
High-level vs Low-level API
The high-level API (LiveTranscriber) simplifies the above process by:
- Automatically managing the WhisperKitPro instance
- Handling session creation and lifecycle
- Providing convenient source abstractions (ArgmaxSource, CustomSource)
- Simplifying cleanup with stopAndRemoveAllTranscriptions()
- Offering a more user-friendly API surface
For most use cases, the high-level API is recommended. Use the low-level API when you need the additional control and are comfortable managing the complexity.
Advanced Features
Pro Models
Pro SDK offers additional models with significantly higher speed, accuracy, and energy-efficiency.
Nvidia Parakeet Models
These models are not yet supported for real-time transcription. Coming soon.
Whisper Models
These Whisper models are further optimized for speed and energy efficiency on top of their open-source counterparts. Accuracy remains identical while speed improves.
To use the upgraded Whisper models, simply switch to .proRepo in your initial configuration code:
let downloadURL = try await modelStore.downloadModel(
    name: "large-v3-v20240930_626MB",
    repo: .proRepo // Replacing .openSourceRepo
)
OS Compatibility. Note that .proRepo models require iOS 18/macOS 15 or newer. For users still on iOS 17/macOS 14, please keep using .openSourceRepo.
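If your app still needs to run on the older OS versions, you can select the repository at runtime, mirroring the availability check in the basic example. The sketch below assumes the modelStore and modelToken values from the CLI example above:
// Select the model repository at runtime based on OS availability (sketch)
let repoType: RepoType
if #available(macOS 15, iOS 18, *) {
    repoType = .proRepo
} else {
    repoType = .openSourceRepo
}
let downloadURL = try await modelStore.downloadModel(
    name: "large-v3-v20240930_626MB",
    repo: repoType,
    token: modelToken // HuggingFace token, needed for Pro models
)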
Multiple Audio Streams
This feature allows multiple input audio streams to be transcribed in real time by the same LiveTranscriber instance. An example use case is concurrent real-time transcription of system audio and the microphone for meeting transcription.
Before implementing multi-stream transcription, ensure that the ArgmaxTestCommand from Step 1 works correctly, particularly its transcribeStream function, which demonstrates basic LiveTranscriber usage.
Multi-Stream Architecture
The same LiveTranscriber instance can efficiently handle multiple audio streams simultaneously. Each stream gets its own registered source that shares the same LiveTranscriber instance but maintains an independent processing context, allowing streams to run concurrently without interference.
Example Implementation
private func transcribeMultipleStreams() async throws {
// 1. Setup LiveTranscriber (same as single stream example)
let liveTranscriber = try await setupLiveTranscriber()
// 2. Create stream sources for each audio input
// System audio stream (custom stream - for capturing system/app audio)
let (systemStream, systemContinuation) = createSystemAudioStream() // Your implementation (see the sketch after this example)
let systemSource = CustomSource(
id: "system-audio",
audioStream: systemStream,
audioContinuation: systemContinuation
)
// Device microphone stream (using built-in device source)
let deviceSource = ArgmaxSource(streamType: .device())
// 3. Configure decoding options
let options = DecodingOptionsPro(
base: .init(
verbose: true,
task: .transcribe,
wordTimestamps: true,
chunkingStrategy: .vad
),
transcribeInterval: 0.1
)
// 4. Register both streams with LiveTranscriber
try await liveTranscriber.registerStream(streamSource: systemSource, options: options)
try await liveTranscriber.registerStream(streamSource: deviceSource, options: options)
// 5. Start transcription and get results stream for each source
let systemResults = try await liveTranscriber.startTranscription(for: systemSource)
let deviceResults = try await liveTranscriber.startTranscription(for: deviceSource)
// 6. Process results from both streams concurrently
try await withThrowingTaskGroup(of: Void.self) { group in
// System audio results processing
group.addTask {
for try await result in systemResults {
switch result {
case .confirm(let text, let seconds, _):
print("[SYSTEM] Confirmed: \(text) at \(seconds)s")
case .hypothesis(let text, let seconds):
print("[SYSTEM] Hypothesis: \(text) at \(seconds)s")
}
}
}
// Device microphone results processing
group.addTask {
for try await result in deviceResults {
switch result {
case .confirm(let text, let seconds, _):
print("[DEVICE] Confirmed: \(text) at \(seconds)s")
case .hypothesis(let text, let seconds):
print("[DEVICE] Hypothesis: \(text) at \(seconds)s")
}
}
}
// Wait for all processing to complete
try await group.waitForAll()
}
// 7. Cleanup - stop and remove all transcriptions
try await liveTranscriber.stopAndRemoveAllTranscriptions()
}
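The createSystemAudioStream() helper above is left to your implementation. A minimal sketch is shown below, assuming the custom source consumes the same [Float] sample buffers used elsewhere in this guide; wiring it to an actual capture API (for example, ScreenCaptureKit on macOS) is up to you:
// Hypothetical helper: creates an AsyncStream of Float sample buffers for system audio.
// The element type is an assumption; adapt it to what CustomSource expects in your SDK version.
private func createSystemAudioStream() -> (AsyncStream<[Float]>, AsyncStream<[Float]>.Continuation) {
    let (stream, continuation) = AsyncStream.makeStream(of: [Float].self)
    // In your capture callback, push samples into the stream:
    //   continuation.yield(samples)
    // and finish it when capture stops:
    //   continuation.finish()
    return (stream, continuation)
}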
Key Considerations
- Stream Management: Each audio stream requires its own source (either a built-in ArgmaxSource such as .device() or a CustomSource) registered with the LiveTranscriber.
- Individual Results Streams: Each source gets its own results stream from startTranscription(for: source) - no need to filter by stream ID.
- Resource Management: The shared LiveTranscriber instance efficiently manages computational resources across all streams while maintaining independent processing contexts.
- Graceful Termination: Call stopAndRemoveAllTranscriptions() to stop and remove all registered streams at once, ensuring proper cleanup and graceful termination of transcription sessions (see the sketch after this list).
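For example, a single stream can be torn down while the other keeps running by using the per-source cleanup call from the basic example, before removing everything at the end. A short sketch reusing the sources defined above:
// Stop only the system-audio stream; the device stream keeps transcribing
try await liveTranscriber.stopAndRemoveStream(for: systemSource)

// Later, stop and remove every stream that is still registered
try await liveTranscriber.stopAndRemoveAllTranscriptions()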
Migrate from transcribeWhileRecording (soon to be deprecated)
If you're currently using the older WhisperKitPro.transcribeWhileRecording API, here's how to migrate to the new stream-based approach for better performance and cleaner code.
Old API Pattern (whisperKitPro.transcribeWhileRecording and audioProcessor.startRecordingLive)
// Old approach - manual buffer management
private func oldTranscribeStream(whisperKitPro: WhisperKitPro) async throws {
let options = DecodingOptionsPro(
base: .init(
verbose: true,
task: .transcribe,
wordTimestamps: true,
chunkingStrategy: .vad
),
transcribeInterval: 0.1
)
// Manual buffer management
var audioBuffer: [Float] = []
let lock = NSLock()
try whisperKitPro.audioProcessor.startRecordingLive { samples in
lock.withLock {
audioBuffer.append(contentsOf: samples)
}
}
// Create recording task with callbacks
let recordingTask = whisperKitPro.transcribeWhileRecording(
options: options,
audioCallback: {
let samples = lock.withLock {
let samples = audioBuffer
audioBuffer.removeAll()
return samples
}
return AudioSamples(samples: samples)
},
resultCallback: { result in
print("Text: \(result.text)")
if let hypothesis = result.hypothesisText {
print("Hypothesis: \(hypothesis)")
}
return true
}
)
// Start and finalize
try await recordingTask.start()
let results = try await recordingTask.finalize()
}
New API Pattern (whisperKitPro.makeStreamSession and audioProcessor.startStreamingRecordingLive)
// New approach - stream-based with automatic management
private func newTranscribeStream(whisperKitPro: WhisperKitPro) async throws {
let options = DecodingOptionsPro(
base: .init(
verbose: true,
task: .transcribe,
wordTimestamps: true,
chunkingStrategy: .vad
),
transcribeInterval: 0.1
)
// Create stream (automatic audio handling)
let (stream, continuation) = whisperKitPro.audioProcessor.startStreamingRecordingLive()
// Create session
let session = whisperKitPro.makeStreamSession(options: options)
// Start processing
await session.start(audioInputStream: stream)
// Process results with async iteration
for try await result in session.results {
print("Text: \(result.text)")
if let hypothesis = result.hypothesisText {
print("Hypothesis: \(hypothesis)")
}
}
// Graceful termination
continuation.finish()
}
Migration Benefits
- Simplified Architecture: No manual buffer management or thread synchronization required
- Better Performance: Reduced memory overhead and improved audio processing efficiency
- Cleaner Error Handling: Automatic stream lifecycle management reduces potential failure points
- Async/Await Integration: Modern Swift concurrency support with for try await result iteration
- Resource Management: Automatic cleanup and better resource utilization
Key Migration Steps
- Replace audioProcessor.startRecordingLive() with audioProcessor.startStreamingRecordingLive()
- Replace whisperKitPro.transcribeWhileRecording() with whisperKitPro.makeStreamSession()
- Remove manual buffer management (audio buffers, locks, callback handling)
- Replace resultCallback with for try await result in session.results
- Replace recordingTask.start() and finalize() with session.start() and continuation.finish()
- Update error handling to use Swift's structured concurrency patterns