AsyncSequence and AsyncStream: Processing Values Over Time in Swift


Some data does not arrive all at once — it flows: WebSocket messages, a file read line by line, render progress updates from a background job, sensor readings from CoreLocation. AsyncSequence brings the elegance of a for-in loop to this world of asynchronous data, and AsyncStream gives you a bridge for the callback-based APIs you are already using.

This guide covers consuming AsyncSequence with for await, the built-in async sequences Apple provides, bridging callbacks with AsyncStream and AsyncThrowingStream, useful transformation operators, and the cancellation story. We will not cover Combine Publisher — that comparison has its own post.

The Problem

Pixar’s rendering pipeline emits progress updates as each frame is processed — percentage complete, current scene name, estimated time remaining. The traditional way to consume these updates is a delegate or a completion-handler-per-chunk pattern:

// ❌ Callback-based progress: fragile, no back-pressure, manual lifetime management
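class RenderEngine {
    var onProgress: ((RenderProgress) -> Void)?
    var onComplete: (() -> Void)?
    var onError: ((Error) -> Void)?

    func start(film: PixarFilm) { /* begins rendering and fires the callbacks above */ }
    func stop() { /* halts rendering */ }
}

class FilmViewController: UIViewController {
    let engine = RenderEngine()

    func startRender(film: PixarFilm) {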
        engine.onProgress = { [weak self] progress in
            // Must dispatch to main manually
            DispatchQueue.main.async {
                self?.updateProgressBar(progress.percentage)
            }
        }
        engine.onComplete = { [weak self] in
            DispatchQueue.main.async { self?.showCompletionBanner() }
        }
        engine.onError = { [weak self] error in
            DispatchQueue.main.async { self?.showError(error) }
        }
        engine.start(film: film)
    }
}

This works, but it is brittle. Cancellation requires setting all three closures to nil and calling a stop() method — easy to forget. The [weak self] dance is repeated everywhere. There is no concept of back-pressure; the producer fires callbacks at its own rate regardless of whether the consumer is ready. And three separate callbacks for what is logically one stream of events makes the call site hard to reason about.

AsyncSequence and AsyncStream unify this into a single for await loop with automatic cancellation.

Consuming AsyncSequence with for await

AsyncSequence is a protocol that mirrors Sequence, but each element is delivered asynchronously. Consuming one uses the same for-in syntax with the addition of await:

// Conceptual shape of AsyncSequence consumption
for await element in someAsyncSequence {
    process(element)
}
// Code here runs after the sequence is exhausted or the task is cancelled

The loop suspends when waiting for the next element and resumes when one arrives. If the enclosing Task is cancelled, the loop terminates cleanly — no cleanup closures, no manual cancellation tokens.
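
As a rough mental model, a for await loop behaves much like a while loop that repeatedly awaits next() on an async iterator. The sketch below is only an illustration: numberStream, collectManually, and collectWithLoop are hypothetical names, and the stream is pre-filled so the example stays self-contained.

```swift
// Hypothetical helper: a stream that yields 1, 2, 3 and then finishes.
func numberStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in 1...3 { continuation.yield(value) }
        continuation.finish()
    }
}

// Roughly what `for await value in stream { ... }` expands to.
func collectManually(_ stream: AsyncStream<Int>) async -> [Int] {
    var iterator = stream.makeAsyncIterator()
    var collected: [Int] = []
    // next() suspends until an element is available and returns nil at the end.
    while let value = await iterator.next() {
        collected.append(value)
    }
    return collected
}

// The same traversal written with for await.
func collectWithLoop(_ stream: AsyncStream<Int>) async -> [Int] {
    var collected: [Int] = []
    for await value in stream { collected.append(value) }
    return collected
}
```

Both functions should return the same elements in the same order; the loop form simply hides the iterator.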

Built-In Async Sequences

Apple ships several async sequences in the standard library and system frameworks. These are the ones you will reach for most often.

URLSession.bytes(from:) — Streaming HTTP

URLSession.bytes(from:delegate:) streams an HTTP response body one byte at a time, which is ideal for large downloads or server-sent events.

@available(iOS 15.0, *)
func streamFilmScript(url: URL) async throws {
    let (bytes, response) = try await URLSession.shared.bytes(from: url)

    guard let httpResponse = response as? HTTPURLResponse,
          httpResponse.statusCode == 200 else {
        throw URLError(.badServerResponse)
    }

    // Process line by line — each iteration suspends until the next line arrives
    for try await line in bytes.lines {
        parseScriptLine(line)
    }
}

bytes.lines is an AsyncLineSequence that assembles raw bytes into complete lines, giving you one String per \n. It handles both Unix and Windows line endings.

FileHandle.AsyncBytes — Async File Reading

FileHandle exposes a bytes property of type FileHandle.AsyncBytes for reading files without blocking a thread:

@available(iOS 15.0, *)
func readSubtitleFile(url: URL) async throws -> [String] {
    let handle = try FileHandle(forReadingFrom: url)
    var lines: [String] = []

    for try await line in handle.bytes.lines {
        lines.append(line)
    }

    return lines
}

NotificationCenter.notifications(named:object:) — Async Notifications

NotificationCenter.notifications(named:object:) converts NotificationCenter posts into an AsyncSequence, removing the addObserver/removeObserver dance:

@available(iOS 15.0, *)
func watchForRenderCompletion() async {
    let notifications = NotificationCenter.default
        .notifications(named: .renderJobCompleted)

    for await notification in notifications {
        guard let job = notification.userInfo?["job"] as? PixarJob else { continue }
        await MainActor.run { updateJobStatus(job) }
    }
    // Loop ends when the task is cancelled — no removeObserver needed
}

Note: The notifications sequence never terminates on its own — it runs until the enclosing task is cancelled. This is the intended pattern: wrap the call in a Task stored as a property, and cancel it when the observer is no longer needed.

Bridging Callbacks with AsyncStream

Most third-party SDKs and older Apple frameworks use delegate callbacks or completion handlers. AsyncStream is the official bridge from that world to AsyncSequence.

Basic AsyncStream Usage

The initializer takes a closure that receives a Continuation, which you use to yield values and signal completion:

@available(iOS 15.0, *)
func makeRenderProgressStream(engine: RenderEngine) -> AsyncStream<RenderProgress> {
    AsyncStream { continuation in
        // Bridge the callback-based API to the async stream
        engine.onProgress = { progress in
            continuation.yield(progress)
        }

        engine.onComplete = {
            continuation.finish() // Ends the for-await loop
        }

        // Handle task cancellation — stop the engine when the consumer stops listening
        continuation.onTermination = { _ in
            engine.stop()
        }
    }
}

// Consuming the stream
@available(iOS 15.0, *)
func renderFilm(_ film: PixarFilm) async {
    let progressStream = makeRenderProgressStream(engine: renderEngine)

    for await progress in progressStream {
        await MainActor.run {
            progressBar.progress = Float(progress.percentage / 100.0)
            statusLabel.text = "Rendering \(progress.currentScene)..."
        }
    }
    // We arrive here when the engine calls continuation.finish(),
    // and also when the consuming task is cancelled
    guard !Task.isCancelled else { return }
    await MainActor.run { showCompletionBanner() }
}

continuation.onTermination is the key detail: it fires when the for await loop ends, whether that is because the stream finished naturally or because the consuming Task was cancelled. Use it to clean up the underlying resources — stop the engine, unsubscribe from the delegate, release the callback.
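
The onTermination handler receives a reason, so cleanup code can tell a natural finish from a cancellation. The following is a minimal sketch that does not use the render engine: TerminationLog and makeTicker are hypothetical helpers introduced only to make the behaviour observable.

```swift
import Foundation

// Hypothetical thread-safe log; onTermination may run on any thread.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func record(_ entry: String) {
        lock.lock()
        entries.append(entry)
        lock.unlock()
    }

    var all: [String] {
        lock.lock()
        defer { lock.unlock() }
        return entries
    }
}

// Yields 0..<count and finishes; with count == nil it never finishes on its own.
func makeTicker(log: TerminationLog, count: Int?) -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Install the handler first so that finish() below can trigger it.
        continuation.onTermination = { reason in
            switch reason {
            case .finished: log.record("finished")
            case .cancelled: log.record("cancelled")
            @unknown default: log.record("unknown")
            }
        }
        if let count = count {
            for value in 0..<count { continuation.yield(value) }
            continuation.finish()
        }
    }
}
```

Draining makeTicker(log:count:) with a count should record "finished", while cancelling a task that iterates the never-ending variant should record "cancelled".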

AsyncThrowingStream for Failable Streams

When the underlying source can produce an error, use AsyncThrowingStream:

@available(iOS 15.0, *)
func makeRenderStream(engine: RenderEngine) -> AsyncThrowingStream<RenderProgress, any Error> {
    AsyncThrowingStream { continuation in
        engine.onProgress = { progress in
            continuation.yield(progress)
        }

        engine.onComplete = {
            continuation.finish() // Normal termination
        }

        engine.onError = { error in
            continuation.finish(throwing: error) // Terminates with an error
        }

        continuation.onTermination = { _ in
            engine.stop()
        }
    }
}

// The consumer uses try await instead of just await
@available(iOS 15.0, *)
func renderWithErrorHandling(_ film: PixarFilm) async {
    do {
        for try await progress in makeRenderStream(engine: renderEngine) {
            await MainActor.run { update(progress) }
        }
    } catch {
        await MainActor.run { showError(error) }
    }
}
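
To see the ordering of values and errors in isolation, here is a small self-contained sketch. RenderFailure, makeFailingStream, and drain are hypothetical names, and the stream is pre-filled rather than driven by a real engine. Elements that were yielded before finish(throwing:) are still delivered, and the error surfaces afterwards.

```swift
// Hypothetical error type for the illustration.
struct RenderFailure: Error, Equatable {
    let frame: Int
}

// Yields two values, then terminates with an error.
func makeFailingStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish(throwing: RenderFailure(frame: 3))
    }
}

// Collects values until the stream ends and reports the error, if any.
func drain(_ stream: AsyncThrowingStream<Int, Error>) async -> (values: [Int], error: Error?) {
    var values: [Int] = []
    do {
        for try await value in stream { values.append(value) }
        return (values, nil)
    } catch {
        return (values, error)
    }
}
```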

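Warning: AsyncStream takes a bufferingPolicy parameter that defaults to .unbounded. In a high-throughput scenario (a render engine emitting thousands of progress updates per second), an unbounded buffer can grow without limit if the consumer falls behind. Use .bufferingNewest(n) or .bufferingOldest(n) to cap the buffer. Neither policy applies back-pressure: yield never suspends the producer, and once the buffer is full, elements are dropped (the oldest buffered ones with .bufferingNewest, the incoming ones with .bufferingOldest).

// Keep only the 10 most recent progress updates; older ones are dropped
AsyncStream(RenderProgress.self, bufferingPolicy: .bufferingNewest(10)) { continuation in
    engine.onProgress = { continuation.yield($0) }
    engine.onComplete = { continuation.finish() }
    continuation.onTermination = { _ in engine.stop() }
}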
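
The effect of each policy is easiest to see when the producer runs entirely before the consumer. In this sketch (surviving is a hypothetical helper), five values are yielded up front and the function returns whatever the buffer kept.

```swift
// Yields 1...5 before any consumer runs, then collects what the buffer retained.
func surviving(_ policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        for value in 1...5 { continuation.yield(value) }
        continuation.finish()
    }
    var kept: [Int] = []
    for await value in stream { kept.append(value) }
    return kept
}
```

With a limit of 2, .bufferingNewest should keep the last two values and .bufferingOldest the first two, while .unbounded keeps all five. For progress reporting, the newest values are usually the ones worth keeping.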

Advanced Usage

Transforming Sequences with Standard Operators

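AsyncSequence supports a subset of the Sequence operators. Since Swift 5.5, you can use map, filter, compactMap, prefix, dropFirst, and contains directly on any AsyncSequence:

@available(iOS 15.0, *)
func filteredProgress(engine: RenderEngine)
    -> AsyncPrefixSequence<AsyncMapSequence<AsyncFilterSequence<AsyncStream<RenderProgress>>, String>> {
    makeRenderProgressStream(engine: engine)
        // Every 10%; assumes the engine reports exact multiples of 10
        .filter { $0.percentage.truncatingRemainder(dividingBy: 10) == 0 }
        .map { "Rendering \($0.currentScene): \(Int($0.percentage))%" }
        .prefix(11) // 0%, 10%, 20%, ..., 100%
}

Note: The standard library operators return named concrete types (AsyncFilterSequence, AsyncMapSequence, AsyncPrefixSequence, and so on), so the return type of a chained pipeline nests quickly. A bare some AsyncSequence return type compiles but hides the element type from callers. Swift 6 added primary associated types and a Failure type to AsyncSequence, which lets you write some AsyncSequence<String, Never> instead, but that spelling needs the Swift 6 standard library (iOS 18 and macOS 15 or later). Always check the minimum deployment target before relying on it.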
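
The operators are lazy in the same way as their synchronous counterparts: nothing runs until the result is iterated. A minimal sketch on plain integers (firstEvenSquares is a hypothetical helper, and the source stream is pre-filled) shows the chaining without any rendering types.

```swift
// Returns the squares of the first `count` even numbers from 1...20.
func firstEvenSquares(_ count: Int) async -> [Int] {
    let numbers = AsyncStream<Int> { continuation in
        for value in 1...20 { continuation.yield(value) }
        continuation.finish()
    }

    // Each operator wraps the previous sequence; no work happens yet.
    let pipeline = numbers
        .filter { $0 % 2 == 0 }
        .map { $0 * $0 }
        .prefix(count)

    var result: [Int] = []
    for await value in pipeline { result.append(value) }
    return result
}
```

Because every closure here is non-throwing and AsyncStream itself does not throw, the loop needs only await, not try await.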

Custom AsyncSequence Conformance

You can conform any type to AsyncSequence by implementing makeAsyncIterator(), which returns a type conforming to AsyncIteratorProtocol. This is useful when you need full control over the iteration logic.

// A custom AsyncSequence that yields pages of Pixar films from a paginated API
struct PixarFilmPageSequence: AsyncSequence {
    typealias Element = [PixarFilm]

    let startPage: Int
    let service: FilmService

    func makeAsyncIterator() -> Iterator {
        Iterator(page: startPage, service: service)
    }

    struct Iterator: AsyncIteratorProtocol {
        var page: Int
        let service: FilmService

        mutating func next() async throws -> [PixarFilm]? {
            let films = try await service.fetchPage(page)
            guard !films.isEmpty else { return nil } // nil ends the sequence
            page += 1
            return films
        }
    }
}

// Usage reads like a regular for-in loop
for try await page in PixarFilmPageSequence(startPage: 1, service: filmService) {
    await MainActor.run { appendToList(page) }
}

Cancellation Is Automatic

The for await loop participates in Swift’s cooperative cancellation. When the enclosing Task is cancelled, the loop terminates at the next suspension point. For AsyncStream, continuation.onTermination fires immediately.

class FilmListViewController: UIViewController {
    private var streamTask: Task<Void, Never>?

    func startStreamingUpdates() {
        streamTask = Task {
            for await update in liveUpdateStream {
                guard !Task.isCancelled else { break }
                await MainActor.run { apply(update) }
            }
        }
    }

    override func viewDidDisappear(_ animated: Bool) {
        super.viewDidDisappear(animated)
        streamTask?.cancel() // Cancels the task; onTermination fires on the stream
    }
}

Tip: You do not need to check Task.isCancelled inside the loop body for most sequences — the for await loop itself will stop producing elements once the task is cancelled. The explicit check is useful only if the loop body performs additional async work that should also stop early.
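
Cancellation is cooperative, so a hand-written iterator only stops early if its next() checks for cancellation or awaits something that does. The sketch below assumes an in-memory page source; PagedNumbers and loadAll are hypothetical names, and the sleep merely simulates network latency.

```swift
// Hypothetical paginated source backed by an in-memory array.
struct PagedNumbers: AsyncSequence {
    typealias Element = [Int]
    let pages: [[Int]]

    struct AsyncIterator: AsyncIteratorProtocol {
        let pages: [[Int]]
        var index = 0

        mutating func next() async throws -> [Int]? {
            // A custom iterator has to check for cancellation itself.
            try Task.checkCancellation()
            guard index < pages.count else { return nil }
            // Simulated latency; Task.sleep also throws if the task is cancelled.
            try await Task.sleep(nanoseconds: 20_000_000)
            defer { index += 1 }
            return pages[index]
        }
    }

    func makeAsyncIterator() -> AsyncIterator { AsyncIterator(pages: pages) }
}

// Collects every page, or returns nil if iteration was cut short by cancellation.
func loadAll(_ source: PagedNumbers) async -> [Int]? {
    var all: [Int] = []
    do {
        for try await page in source { all.append(contentsOf: page) }
        return all
    } catch {
        return nil // CancellationError is the only error this sketch can throw
    }
}
```

Without the checkCancellation call and the throwing sleep, the loop would keep fetching pages after the task was cancelled.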

AsyncChannel for Multi-Consumer Scenarios

The standard AsyncStream is single-consumer: only one for await loop should consume from a given AsyncStream instance. When several tasks need to pull from one source, the swift-async-algorithms package provides AsyncChannel. An AsyncChannel can be iterated by multiple consumers, but each element is delivered to exactly one of them, so it distributes values rather than broadcasting them. Its send method also suspends until a consumer has received the value, which gives you real back-pressure:

// AsyncChannel from swift-async-algorithms can be shared by multiple consumers
import AsyncAlgorithms

let renderChannel = AsyncChannel<RenderProgress>()

// Multiple consumers can await the same channel; each value reaches only one of them
Task { for await p in renderChannel { updateProgressBar(p) } }
Task { for await p in renderChannel { logProgress(p) } }

// send suspends until one consumer has received the value
await renderChannel.send(RenderProgress(percentage: 45, currentScene: "Nemo swims"))

Note: swift-async-algorithms is an open-source Apple package available at github.com/apple/swift-async-algorithms. It is not part of the Swift standard library but is maintained by the Swift team and considered production-ready.
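
If every consumer needs to see every value, one option that relies only on the standard library is to give each subscriber its own AsyncStream and yield into all of them. The following is a minimal sketch under that assumption; Broadcaster is a hypothetical type, not part of any Apple framework, and it uses unbounded buffers for simplicity.

```swift
// A minimal fan-out sketch: every subscriber gets its own AsyncStream.
actor Broadcaster<Element: Sendable> {
    private var continuations: [Int: AsyncStream<Element>.Continuation] = [:]
    private var nextID = 0

    // Registers a new subscriber and returns the stream it should iterate.
    func subscribe() -> AsyncStream<Element> {
        let id = nextID
        nextID += 1
        return AsyncStream { continuation in
            continuations[id] = continuation
            // Drop the registration when this subscriber finishes or is cancelled.
            // The strong capture of self is released once the stream terminates.
            continuation.onTermination = { _ in
                Task { await self.remove(id) }
            }
        }
    }

    // Delivers the value to every current subscriber.
    func send(_ value: Element) {
        for continuation in continuations.values { continuation.yield(value) }
    }

    // Ends every subscriber's for await loop.
    func finish() {
        for continuation in continuations.values { continuation.finish() }
        continuations.removeAll()
    }

    private func remove(_ id: Int) {
        continuations[id] = nil
    }
}
```

A subscriber only receives values sent after it subscribed, and a slow subscriber accumulates a backlog, so a bounded buffering policy may be preferable in production code.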

When to Use (and When Not To)

Scenario | Recommendation
--- | ---
Streaming HTTP response bodies or server-sent events | URLSession.bytes(from:) with for await
Bridging a delegate or callback-based API to async code | AsyncStream or AsyncThrowingStream
Observing NotificationCenter events in an async context | NotificationCenter.notifications(named:)
Paginated API where you need lazy page fetching | Custom AsyncSequence conformance
Sharing one source among multiple concurrent consumers, with back-pressure | AsyncChannel from swift-async-algorithms (each value goes to one consumer)
Reactive UI bindings that transform and combine multiple streams | Combine Publisher (richer operator set and SwiftUI integration)
One-shot async operations (fetch once, no streaming) | Plain async/await (simpler and more appropriate)

Summary

  • AsyncSequence gives you for await loops for any asynchronous stream of values — the loop suspends waiting for each element and resumes when one arrives.
  • Apple provides built-in async sequences for the most common cases: URLSession.bytes, FileHandle.AsyncBytes, and NotificationCenter.notifications.
  • AsyncStream and AsyncThrowingStream are the official bridges from callback-based APIs to AsyncSequence, with continuation.onTermination handling cleanup when the consumer stops listening.
  • Buffer policy matters for high-throughput streams: use .bufferingNewest(n) to cap memory growth when the producer is faster than the consumer, at the cost of dropping older elements.
  • Cancellation is automatic: cancelling the enclosing Task terminates the for await loop and fires onTermination on any active AsyncStream continuations.

With AsyncSequence and AsyncStream in your toolkit, you can model almost any stream of asynchronous events as a first-class sequence. For protecting the mutable state that these streams update, see Actors in Swift: Eliminating Data Races at Compile Time.