
Real Time Transcription

Context

Argmax Pro SDK includes the WhisperKitPro framework, which implements an advanced streaming inference algorithm described in our paper. Key features include:

  • Accuracy is identical to offline file transcription
  • Dual output text streams can be leveraged in the user experience to build trust in stable and accurate results (Confirmed) while maximizing responsiveness (Hypothesis).
  • Streaming API design that exposes event-based callbacks, minimizing the burden on the caller

Real-time transcription streams input audio and the corresponding output text continuously during a live recording session:

  1. Input audio stream: Capturing audio in small user-defined intervals
  2. Inference: Incremental speech-to-text inference on the input stream
  3. Output text streams:
    • Confirmed: Finalized portion of the transcript that gets longer over time.
    • Hypothesis: Preliminary transcript that may still be refined as more audio context arrives.

This approach creates an ultra-low-latency user experience where words appear on screen almost as they are spoken, with occasional refinements as the model gathers more context.
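
To make the dual-stream behavior concrete, here is a minimal, self-contained sketch of how a caller might compose the two streams into display text. It uses a stand-in enum rather than the SDK's actual result type (the real `.confirm`/`.hypothesis` cases are consumed in the examples below): confirmed text only ever grows, while the hypothesis is overwritten on every update.

// Stand-in enum mirroring the confirm/hypothesis cases consumed from LiveTranscriber below
enum StreamUpdate {
    case confirm(String)
    case hypothesis(String)
}

func renderTranscript(_ updates: [StreamUpdate]) {
    var confirmedText = ""   // finalized; only ever grows
    var hypothesisText = ""  // preliminary; replaced on every update
    for update in updates {
        switch update {
        case .confirm(let text):
            confirmedText += (confirmedText.isEmpty ? "" : " ") + text
            hypothesisText = ""
        case .hypothesis(let text):
            hypothesisText = text
        }
        // Render the stable prefix followed by the tentative suffix
        print(confirmedText + (hypothesisText.isEmpty ? "" : " " + hypothesisText))
    }
}

// "Hello" is confirmed, "wor" then "world" arrive as hypotheses, then "world" is confirmed
renderTranscript([.confirm("Hello"), .hypothesis("wor"), .hypothesis("world"), .confirm("world")])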



Basic Example

This is a complete and self-contained CLI example project that demonstrates the usage of Argmax Pro SDK for real-time transcription from a microphone input stream.

Step 0: Verify Pro SDK setup

Argmax Pro SDK access must be set up with SwiftPM before going through this example. If unsure, please see Upgrading to Pro SDK (Step 1 only).
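
If the scope has not been configured yet, the setup amounts to registering the Argmax registry with SwiftPM. The commands below are only a hedged sketch: `<REGISTRY_URL>` and `<SDK_TOKEN>` are placeholders whose actual values come from the Upgrading to Pro SDK guide.

# Sketch only: register the argmaxinc scope with SwiftPM
# Replace <REGISTRY_URL> and <SDK_TOKEN> with the values from Upgrading to Pro SDK
swift package-registry set --scope argmaxinc <REGISTRY_URL>
swift package-registry login <REGISTRY_URL> --token <SDK_TOKEN>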

Step 1: Create project directory

Create a project directory with the structure shown below, then insert the code that follows into ArgmaxTestCommand.swift and Package.swift.

ArgmaxSDKRealTimeTranscriptionBasicExample
├── Package.swift
└── Sources
    └── ArgmaxTestCLI
        └── ArgmaxTestCommand.swift

Package.swift:

// swift-tools-version: 5.10
// The swift-tools-version declares the minimum version of Swift required to build this package.
 
import PackageDescription
 
let package = Package(
    name: "Argmax Test CLI",
    platforms: [
        .macOS(.v14)
    ],
    products: [
        .executable(
            name: "argmax-test-cli",
            targets: ["ArgmaxTestCLI"]
        )
    ],
    dependencies: [
        .package(id: "argmaxinc.argmax-sdk-swift", .upToNextMinor(from: "1.6.0")),
        .package(url: "https://github.com/apple/swift-argument-parser.git", exact: "1.3.0")
    ],
    targets: [
        .executableTarget(
            name: "ArgmaxTestCLI",
            dependencies: [
                .product(name: "Argmax", package: "argmaxinc.argmax-sdk-swift"),
                .product(name: "ArgumentParser", package: "swift-argument-parser")
            ]
        ),
    ]
)
 

ArgmaxTestCommand.swift:

import Foundation
@preconcurrency import ArgumentParser
@preconcurrency import Argmax
import Combine
 
@main
struct ArgmaxTestCommand: AsyncParsableCommand {
    static let configuration = CommandConfiguration(
        abstract: "An example CLI tool for Argmax Pro SDK",
        subcommands: [Transcribe.self]
    )
 
    struct Transcribe: AsyncParsableCommand {
        static let configuration = CommandConfiguration(
            abstract: "Real-time transcription using system microphone"
        )
 
        @Option(help: "Argmax Pro SDK API key")
        var apiKey: String
 
        @Option(help: "Model name: e.g. `tiny.en` or `large-v3-v20240930_626MB`. Default: `large-v3-v20240930_626MB`")
        var modelName: String = "large-v3-v20240930_626MB"
 
        @Option(help: "HuggingFace token if accessing Pro models")
        var modelToken: String?
        
        func run() async throws {
 
            print("Initializing Argmax Pro SDK...")
 
            let sdkConfig = ArgmaxConfig(apiKey: apiKey)
            // Temporarily disable keychain access for debug
            sdkConfig.keychainPersistence = false
            await ArgmaxSDK.with(sdkConfig)
 
            print("Downloading \(modelName) model using ModelStore...")
            let modelStore = ModelStore()
            let repoType: RepoType
            
            if #available(macOS 15, *) {
                repoType = .proRepo
            } else if #available(macOS 13, *) {
                repoType = .openSourceRepo
            } else {
                fatalError("Not available before macOS 13")
            }
            
            // Track download progress (keep a reference so the subscription stays alive)
            let progressCancellable = modelStore.$progress.sink { progress in
                if let progress = progress {
                    let percentage = Int(progress.fractionCompleted * 100)
                    print("\rDownload progress: \(percentage)%", terminator: "")
                    fflush(stdout)
                }
            }
            
            let downloadURL = try await modelStore.downloadModel(
                name: modelName,
                repo: repoType,
                token: modelToken
            )
            
            // To cancel the download if needed:
            // modelStore.cancelDownload()

            // Stop observing progress once the download has completed
            progressCancellable.cancel()

            let modelFolder = downloadURL.path(percentEncoded: false)
            print("\nDownload completed: \(modelFolder)")
 
            let liveTranscriber = try await setupLiveTranscriber(modelFolder: modelFolder)
            try await transcribeStream(liveTranscriber: liveTranscriber)
        }
 
        private func setupLiveTranscriber(modelFolder: String) async throws -> LiveTranscriber {
            print("Initializing WhisperKit Pro...")
            let whisperConfig = WhisperKitProConfig(
                modelFolder: modelFolder,
                verbose: true,
                logLevel: .debug
            )
            let whisperKitPro = try await WhisperKitPro(whisperConfig)
 
            print("Loading WhisperKit Pro models... This may take up to a minute on the first load after a download")
            try await whisperKitPro.loadModels()
 
            print("Creating LiveTranscriber...")
            let liveTranscriber = LiveTranscriber(whisperKit: whisperKitPro)
 
            return liveTranscriber
        }
 
        private func transcribeStream(liveTranscriber: LiveTranscriber) async throws {
            print("Transcribing while streaming audio from microphone...")
 
            // Create Argmax stream source for device (microphone)
            // On iOS: pass nil to use default microphone
            // let deviceSource = ArgmaxSource(streamType: .device())
 
            // Or on macOS: pass an AudioDeviceID to select a specific input device;
            //  use AudioProcessor.getAudioDevices() to list available devices
            let macbookMicrophone = AudioProcessor.getAudioDevices().first(where: { $0.name == "MacBook Pro Microphone" })
            let deviceSource = ArgmaxSource(streamType: .device(macbookMicrophone?.id))
 
            // Configure decoding options
            let options = DecodingOptionsPro(
                base: .init(
                        verbose: true,
                        task: .transcribe,
                        wordTimestamps: true,
                        chunkingStrategy: .vad
                    ),
                transcribeInterval: 0.1,
                streamTranscriptionMode: .voiceTriggered  // .voiceTriggered or .alwaysOn
            )
 
            // Register stream with options and optional audio callback
            try await liveTranscriber.registerStream(
                streamSource: deviceSource,
                options: options,
                audioCallback: { audioData in
                    // Optional: Process audio data for visualization, etc.
                    // print("Received \(audioData.count) audio samples")
                }
            )
 
            // Start transcription
            let deviceResults = try await liveTranscriber.startTranscription(for: deviceSource)
            
            // Consume results
            let dateFormatter = DateFormatter()
            dateFormatter.dateFormat = "HH:mm:ss.SSS"
            let transcribeTask = Task {
                var accumulatedConfirmedText = ""
                for try await result in deviceResults {
                    let timestamp = dateFormatter.string(from: Date())
                    var hypothesisText = ""
                    switch result {
                    case .confirm(let text, _, _):
                        accumulatedConfirmedText += " " + text
                    case .hypothesis(let text, _):
                        hypothesisText = text
                    }
                    print("[\(timestamp)] \(accumulatedConfirmedText)\u{001B}[34m\(hypothesisText)\u{001B}[0m")
                }
                return accumulatedConfirmedText
            }
            
            // Capture Ctrl+C (SIGINT) from the terminal, then stop recording and clean up
            signal(SIGINT, SIG_IGN)
            let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: DispatchQueue.main)
            signalSource.setEventHandler(handler: DispatchWorkItem(block: {
                print("Stop recording...")
                print("Finalizing transcription...")
                Task.detached {
                    try? await liveTranscriber.stopAndRemoveStream(for: deviceSource)
                    let accumulatedConfirmedText = try! await transcribeTask.value
                    print("\n\nTranscription: \n\n\(accumulatedConfirmedText)\n")
                }
            }))
            
            signalSource.resume()
            let _ = try! await transcribeTask.value
        }
    }
}
 

Step 2: Build and run in Terminal

Run the following command in your Terminal from within the top-level project directory:

Example usage:

swift run argmax-test-cli transcribe --api-key <API_KEY>

If you observe the error `no registry configured for 'argmaxinc' scope`, go back to Step 0.

Here is an example output upon successful build and launch:


Advanced Example

This example complements the basic example above by showing how to use the low-level WhisperKitPro API directly instead of the high-level LiveTranscriber API. All steps remain the same (Step 0 setup, Step 2 build and run), but you'll modify the code in ArgmaxTestCommand.swift to use the low-level API for more granular control.

The high-level API (LiveTranscriber) encapsulates a WhisperKitPro instance and calls these low-level APIs under the hood. To learn more, you can review the source code of LiveTranscriber inside Argmax SDK.

Modified ArgmaxTestCommand.swift

Replace the setupLiveTranscriber() and transcribeStream() functions with this low-level implementation:

// Replace setupLiveTranscriber() with this:
private func setupWhisperKitPro() async throws -> WhisperKitPro {
    // Download model using ModelStore
    let modelStore = ModelStore()
    
    let repoType: RepoType = .openSourceRepo
    // Uncomment to access Pro models (requires `modelToken`)
    // let repoType: RepoType = .proRepo
    
    // Track download progress
    let cancellable = modelStore.$progress.sink { progress in
        if let progress = progress {
            let percentage = Int(progress.fractionCompleted * 100)
            print("\rDownload progress: \(percentage)%", terminator: "")
            fflush(stdout)
        }
    }
 
    let modelURL = try await modelStore.downloadModel(
        name: "large-v3-v20240930_626MB",
        repo: repoType,
        token: modelToken
    )
    
    // To cancel the download if needed:
    // modelStore.cancelDownload()

    // Stop observing progress once the download has completed
    cancellable.cancel()
    
    let config = WhisperKitProConfig(
        modelFolder: modelURL.path,
        verbose: true,
        logLevel: .debug
    )
    
    let whisperKitPro = try await WhisperKitPro(config)
    return whisperKitPro
}
 
// Replace transcribeStream() with this:
private func transcribeStream(whisperKitPro: WhisperKitPro) async throws {
    print("Transcribing with low-level WhisperKitPro API...")
    
    // 1. Configure decoding options
    let options = DecodingOptionsPro(
        base: .init(
                verbose: true,
                task: .transcribe,
                wordTimestamps: true,
                chunkingStrategy: .vad
            ),
        transcribeInterval: 0.1
    )
    
    // 2. Create audio stream
    let (stream, continuation) = whisperKitPro.audioProcessor.startStreamingRecordingLive()
    
    // 3. Create transcription session
    let session = whisperKitPro.makeStreamSession(options: options)
    
    // 4. Start processing
    await session.start(audioInputStream: stream)
    
    // 5. Process results
    let dateFormatter = DateFormatter()
    dateFormatter.dateFormat = "HH:mm:ss.SSS"
    let transcribeTask = Task {
        for try await result in session.results {
            let timestamp = dateFormatter.string(from: Date())
            if let hypothesis = result.hypothesisText {
                print("[\(timestamp)] Hypothesis: \(hypothesis)")
            }
            if !result.text.isEmpty {
                print("[\(timestamp)] Confirmed: \(result.text)")
            }
        }
    }
    
    // 6. Handle termination signal
    signal(SIGINT, SIG_IGN)
    let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: DispatchQueue.main)
    signalSource.setEventHandler(handler: DispatchWorkItem(block: {
        print("Stop recording...")
        Task {
            // Cleanup - finish the stream
            continuation.finish()
        }
    }))
    signalSource.resume()
    
    // Wait for transcription task to complete
    try await transcribeTask.value
}
 
// Update the run() method to use the new functions:
func run() async throws {
    // Initialize the SDK with your API key first (same as the basic example)
    await ArgmaxSDK.with(ArgmaxConfig(apiKey: apiKey))

    let whisperKitPro = try await setupWhisperKitPro()
    try await transcribeStream(whisperKitPro: whisperKitPro)
}

Key Differences from High-level API

  • Manual Setup: You create and configure the WhisperKitPro instance yourself
  • Session Management: You manage TranscribeStreamSession lifecycle directly
  • Audio Stream Control: Direct access to audioProcessor for custom audio handling
  • Resource Management: Manual cleanup of streams and sessions

High-level vs Low-level API

The high-level API (LiveTranscriber) simplifies the above process by:

  • Automatically managing the WhisperKitPro instance
  • Handling session creation and lifecycle
  • Providing convenient source abstractions (ArgmaxSource, CustomSource)
  • Simplifying cleanup with stopAndRemoveAllTranscriptions()
  • Offering a more user-friendly API surface

For most use cases, the high-level API is recommended. Use the low-level API when you need the additional control and are comfortable managing the complexity.
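
As a concrete illustration of the difference in cleanup, the short fragment below contrasts the two teardown paths; it reuses `liveTranscriber` from the basic example and `continuation` from the low-level example above rather than standing on its own.

// High-level: one call stops and removes every registered stream
try await liveTranscriber.stopAndRemoveAllTranscriptions()

// Low-level: finish the audio continuation yourself, which ends the session's
// input stream and lets iteration over session.results complete
continuation.finish()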

Advanced Features

Pro Models

Pro SDK offers additional models with significantly higher speed, accuracy, and energy efficiency.

Nvidia Parakeet Models

These models are not yet supported for real-time transcription. Coming soon.

Whisper Models

This second set consists of Whisper models that are further optimized for speed and energy efficiency on top of their open-source counterparts. Accuracy remains identical while speed improves.

In order to use upgraded Whisper models, simply switch to .proRepo in your initial configuration code:

let downloadURL = try await modelStore.downloadModel(
    name: "large-v3-v20240930_626MB",
    repo: .proRepo // Replacing .openSourceRepo
)
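
If the Pro repository requires authentication, also pass your HuggingFace access token via the `token:` parameter (the `--model-token` option in the basic example):

let downloadURL = try await modelStore.downloadModel(
    name: "large-v3-v20240930_626MB",
    repo: .proRepo,
    token: modelToken // HuggingFace token for Pro models
)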

Multiple Audio Streams

This feature allows multiple input audio streams to be transcribed in real time by the same LiveTranscriber instance. An example use case is concurrent real-time transcription of system audio and the microphone for meeting transcription.

Before implementing multi-stream transcription, ensure that the ArgmaxTestCommand from Step 1 works correctly, particularly its transcribeStream function which demonstrates the basic LiveTranscriber usage.

Multi-Stream Architecture

The same LiveTranscriber instance can efficiently handle multiple audio streams simultaneously. Each stream gets its own registered source that shares the same LiveTranscriber instance but maintains independent processing context, allowing them to run concurrently without interference.

Example Implementation

private func transcribeMultipleStreams() async throws {
    // 1. Setup LiveTranscriber (same as single stream example)
    let liveTranscriber = try await setupLiveTranscriber()
    
    // 2. Create stream sources for each audio input
    
    // System audio stream (custom stream - for capturing system/app audio)
    let (systemStream, systemContinuation) = createSystemAudioStream() // Your implementation; see the sketch after this example
    let systemSource = CustomSource(
        id: "system-audio",
        audioStream: systemStream,
        audioContinuation: systemContinuation
    )
    
    // Device microphone stream (using built-in device source)
    let deviceSource = ArgmaxSource(streamType: .device())
    
    // 3. Configure decoding options
    let options = DecodingOptionsPro(
        base: .init(
                verbose: true,
                task: .transcribe,
                wordTimestamps: true,
                chunkingStrategy: .vad
            ),
        transcribeInterval: 0.1
    )
    
    // 4. Register both streams with LiveTranscriber
    try await liveTranscriber.registerStream(streamSource: systemSource, options: options)
    try await liveTranscriber.registerStream(streamSource: deviceSource, options: options)
    
    // 5. Start transcription and get results stream for each source
    let systemResults = try await liveTranscriber.startTranscription(for: systemSource)
    let deviceResults = try await liveTranscriber.startTranscription(for: deviceSource)
    
    // 6. Process results from both streams concurrently
    try await withThrowingTaskGroup(of: Void.self) { group in
        // System audio results processing
        group.addTask {
            for try await result in systemResults {
                switch result {
                case .confirm(let text, let seconds, _):
                    print("[SYSTEM] Confirmed: \(text) at \(seconds)s")
                case .hypothesis(let text, let seconds):
                    print("[SYSTEM] Hypothesis: \(text) at \(seconds)s")
                }
            }
        }
        
        // Device microphone results processing
        group.addTask {
            for try await result in deviceResults {
                switch result {
                case .confirm(let text, let seconds, _):
                    print("[DEVICE] Confirmed: \(text) at \(seconds)s")
                case .hypothesis(let text, let seconds):
                    print("[DEVICE] Hypothesis: \(text) at \(seconds)s")
                }
            }
        }
        
        // Wait for all processing to complete
        try await group.waitForAll()
    }
    
    // 7. Cleanup - stop and remove all transcriptions
    try await liveTranscriber.stopAndRemoveAllTranscriptions()
}
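
The `createSystemAudioStream()` call above is left as your implementation. The sketch below is a hypothetical minimal version: the `[Float]` element type and the `(stream, continuation)` shape are assumptions inferred from how `CustomSource` is constructed above, and a real implementation would capture system audio (for example with ScreenCaptureKit on macOS) and yield sample buffers into the continuation.

// Hypothetical sketch: wire up an AsyncStream pair for system audio.
// The element type [Float] is an assumption based on the CustomSource usage above.
private func createSystemAudioStream() -> (AsyncStream<[Float]>, AsyncStream<[Float]>.Continuation) {
    let (stream, continuation) = AsyncStream<[Float]>.makeStream()
    // A real implementation would install a system-audio capture tap here,
    // call continuation.yield(samples) for each captured buffer,
    // and call continuation.finish() when capture stops.
    return (stream, continuation)
}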
 
 

Key Considerations

  • Stream Management: Each audio stream requires its own ArgmaxSource (either built-in like .device() or custom like CustomSource) registered with the LiveTranscriber.

  • Individual Results Streams: Each source gets its own results stream from startTranscription(for: source) - no need to filter by stream ID.

  • Resource Management: The shared LiveTranscriber instance efficiently manages computational resources across all streams while maintaining independent processing contexts.

  • Graceful Termination: Call stopAndRemoveAllTranscriptions() to stop and remove all registered streams at once, ensuring proper cleanup and graceful termination of transcription sessions.

Migrate from transcribeWhileRecording (soon to be deprecated)

If you're currently using the older WhisperKitPro.transcribeWhileRecording API, here's how to migrate to the new stream-based approach for better performance and cleaner code.

Old API Pattern (whisperKitPro.transcribeWhileRecording and audioProcessor.startRecordingLive)

// Old approach - manual buffer management
private func oldTranscribeStream(whisperKitPro: WhisperKitPro) async throws {
    let options = DecodingOptionsPro(
        base: .init(
                verbose: true,
                task: .transcribe,
                wordTimestamps: true,
                chunkingStrategy: .vad
            ),
        transcribeInterval: 0.1
    )
    
    // Manual buffer management
    var audioBuffer: [Float] = []
    let lock = NSLock()
    
    try whisperKitPro.audioProcessor.startRecordingLive { samples in
        lock.withLock {
            audioBuffer.append(contentsOf: samples)
        }
    }
    
    // Create recording task with callbacks
    let recordingTask = whisperKitPro.transcribeWhileRecording(
        options: options,
        audioCallback: {
            let samples = lock.withLock {
                let samples = audioBuffer
                audioBuffer.removeAll()
                return samples
            }
            return AudioSamples(samples: samples)
        },
        resultCallback: { result in
            print("Text: \(result.text)")
            if let hypothesis = result.hypothesisText {
                print("Hypothesis: \(hypothesis)")
            }
            return true
        }
    )
    
    // Start and finalize
    try await recordingTask.start()
    let results = try await recordingTask.finalize()
}

New API Pattern (whisperKitPro.makeStreamSession and audioProcessor.startStreamingRecordingLive)

// New approach - stream-based with automatic management
private func newTranscribeStream(whisperKitPro: WhisperKitPro) async throws {
    let options = DecodingOptionsPro(
        base: .init(
                verbose: true,
                task: .transcribe,
                wordTimestamps: true,
                chunkingStrategy: .vad
            ),
        transcribeInterval: 0.1
    )
    
    // Create stream (automatic audio handling)
    let (stream, continuation) = whisperKitPro.audioProcessor.startStreamingRecordingLive()
    
    // Create session
    let session = whisperKitPro.makeStreamSession(options: options)
    
    // Start processing
    await session.start(audioInputStream: stream)
    
    // Process results with async iteration
    for try await result in session.results {
        print("Text: \(result.text)")
        if let hypothesis = result.hypothesisText {
            print("Hypothesis: \(hypothesis)")
        }
    }
    
    // Graceful termination
    continuation.finish()
}

Migration Benefits

  1. Simplified Architecture: No manual buffer management or thread synchronization required
  2. Better Performance: Reduced memory overhead and improved audio processing efficiency
  3. Cleaner Error Handling: Automatic stream lifecycle management reduces potential failure points
  4. Async/Await Integration: Modern Swift concurrency support with for try await result iteration
  5. Resource Management: Automatic cleanup and better resource utilization

Key Migration Steps

  1. Replace audioProcessor.startRecordingLive() with audioProcessor.startStreamingRecordingLive()
  2. Replace whisperKitPro.transcribeWhileRecording() with whisperKitPro.makeStreamSession()
  3. Remove manual buffer management (audio buffers, locks, callback handling)
  4. Replace resultCallback with for try await result in session.results
  5. Replace recordingTask.start() and finalize() with session.start() and continuation.finish()
  6. Update error handling to use Swift's structured concurrency patterns