Combinators
High-level functional orchestration tools for concurrent and sequential pipelines.
result.combinators
Result Combinators: High-level functional utilities for Result patterns.
This module provides advanced orchestration tools like error accumulation, short-circuiting iteration, concurrent mapping, and functional pipelining.
add_context
Enrich an error payload with additional context if the result is an Err.
This utility is polymorphic: - If error is a list, it applies context to every item. - If error is an Exception, it converts it to str before prepending context. - Otherwise, it stringifies the error and prepends context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
res
|
Result[T, E]
|
The original Result. |
required |
context
|
str
|
The string to prepend to the error. |
required |
Returns:
| Type | Description |
|---|---|
Result[T, Any]
|
The original result if Ok, or a new Err with the context prepended. |
Examples:
>>> # Handling lists (e.g. from validate())
>>> add_context(Err(["e1", "e2"]), "Batch")
Err(['Batch: e1', 'Batch: e2'])
Source code in src/result/combinators.py
combine_outcomes
Aggregate a collection of outcomes into a single master Outcome.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
outcomes
|
Iterable[Outcome[T, E]]
|
An iterable of Outcome instances. |
required |
Returns:
| Type | Description |
|---|---|
Outcome[list[T], list[E]]
|
A master Outcome containing all values and a flat list of all errors. |
Examples:
>>> from result import Outcome
>>> combine_outcomes([Outcome(1, "err1"), Outcome(2, None)])
Outcome(value=[1, 2], error=['err1'])
Source code in src/result/combinators.py
ensure
ensure(condition: bool, error: E) -> Result[None, E]
Lift a boolean condition into a Result.
Often used inside @do_notation to guard logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
condition
|
bool
|
The boolean check to perform. |
required |
error
|
E
|
The error value to return if the condition is False. |
required |
Returns:
| Type | Description |
|---|---|
Result[None, E]
|
Ok(None) if True, Err(error) if False. |
Examples:
Source code in src/result/combinators.py
flow
Sequential pipeline orchestrator. Pipes data through fallible functions.
Equivalent to chaining .and_then() repeatedly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
initial
|
Any
|
The starting value. |
required |
*funcs
|
Callable[[Any], Result[Any, Any]]
|
Multiple functions that take a value and return a Result. |
()
|
Returns:
| Type | Description |
|---|---|
Result[Any, Any]
|
The final Result of the pipeline, or the first Err encountered. |
Examples:
Source code in src/result/combinators.py
gather_outcomes
async
gather_outcomes(
*coroutines: Awaitable[Outcome[T, Any]],
panic: bool = True,
) -> Any
Concurrently await multiple Outcome-returning tasks and merge them.
Ensures that every task completes and all diagnostics are gathered.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*coroutines
|
Awaitable[Outcome[T, Any]]
|
Async tasks returning Outcomes. |
()
|
panic
|
bool
|
If True (default), raises TypeError on contract violations. If False, returns Outcome([], TypeError). |
True
|
Returns:
| Type | Description |
|---|---|
Any
|
A master Outcome containing all values and combined errors. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If a task returns a non-Outcome type and panic is True. |
Source code in src/result/combinators.py
gather_results
async
gather_results(
*coroutines: Awaitable[Result[T, E]],
cancel_on_err: bool = True,
panic: bool = True,
) -> Any
Monadic 'All-or-Nothing' async concurrency.
Runs multiple Result-returning tasks concurrently. Resolves to the first Err encountered, or Ok(list) if all succeed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*coroutines
|
Awaitable[Result[T, E]]
|
Async tasks that return a Result. |
()
|
cancel_on_err
|
bool
|
If True, cancels all pending tasks when an Err is encountered. |
True
|
panic
|
bool
|
If True (default), raises TypeError on contract violations. If False, returns Err(TypeError). |
True
|
Returns:
| Type | Description |
|---|---|
Any
|
The first Err found, or Ok containing a list of all results in order. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If a task returns a non-Result type and panic is True. |
Examples:
>>> async def worker(n):
... return Ok(n * 2)
>>> await gather_results(worker(1), worker(2))
Ok([2, 4])
Source code in src/result/combinators.py
partition_exceptions
Partition a mixed iterable of values and Python Exceptions into Oks and Errs.
Ideal for handling raw results from asyncio.gather(..., return_exceptions=True).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T | E]
|
An iterable containing either values of type T or Exceptions of type E. |
required |
Returns:
| Type | Description |
|---|---|
tuple[list[Ok[T]], list[Err[E]]]
|
A tuple of (ok_variants, err_variants). |
Examples:
>>> items = [1, ValueError("fail"), 2]
>>> oks, errs = partition_exceptions(items)
>>> oks[0]
Ok(1)
>>> errs[0]
Err(ValueError('fail'))
Source code in src/result/combinators.py
partition_map
partition_map(
items: Iterable[T],
func: Callable[[T], Result[U, E]],
) -> tuple[list[tuple[T, U]], list[tuple[T, E]]]
Map a fallible function over an iterable and partition into successes and failures.
Unlike partition_results, this preserves the original input item alongside
the result, making it easy to identify which inputs failed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T]
|
An iterable of inputs. |
required |
func
|
Callable[[T], Result[U, E]]
|
A function returning a Result for each input. |
required |
Returns:
| Type | Description |
|---|---|
tuple[list[tuple[T, U]], list[tuple[T, E]]]
|
A tuple of (ok_pairs, err_pairs) where each pair is (original_item, value_or_error). |
Examples:
>>> def parse(s: str) -> Result[int, str]:
... return Ok(int(s)) if s.isdigit() else Err(f"not a digit: {s}")
>>> oks, errs = partition_map(["1", "a", "2"], parse)
>>> oks
[('1', 1), ('2', 2)]
>>> errs
[('a', 'not a digit: a')]
Source code in src/result/combinators.py
partition_results
partition_results(
results: Iterable[Result[T, E]],
) -> tuple[list[T], list[E]]
Procedurally slice a completed batch of Results into values and errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
results
|
Iterable[Result[T, E]]
|
An iterable of Results. |
required |
Returns:
| Type | Description |
|---|---|
tuple[list[T], list[E]]
|
A tuple of (values, errors). |
Examples:
Source code in src/result/combinators.py
succeeds
succeeds(results: Iterable[Result[T, E]]) -> list[T]
Filter out all Errs and return a list of all success values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
results
|
Iterable[Result[T, E]]
|
An iterable of Results. |
required |
Returns:
| Type | Description |
|---|---|
list[T]
|
A list of all values contained in Ok variants. |
Examples:
Source code in src/result/combinators.py
traverse
Map a fallible function over an iterable, short-circuiting on the first error.
Ideal for bulk processing where any single failure invalidates the batch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T]
|
An iterable of items to process. |
required |
func
|
Callable[[T], Result[U, E]]
|
A function returning a Result for each item. |
required |
Returns:
| Type | Description |
|---|---|
Result[list[U], E]
|
Ok(list) of all transformed values, or the first Err encountered. |
Examples:
>>> # Short-circuits on '!'
>>> traverse(["1", "!", "2"], lambda x: Ok(int(x)) if x.isdigit() else Err(f"bad: {x}"))
Err('bad: !')
Source code in src/result/combinators.py
traverse_async
async
traverse_async(
items: Iterable[T],
func: Callable[[T], Awaitable[Result[U, E]]],
*,
limit: int | None = None,
cancel_on_err: bool = True,
panic: bool = True,
) -> Any
Concurrently map a fallible async function over an iterable.
Short-circuits on the first error. Includes an optional semaphore limit to prevent resource flooding.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T]
|
An iterable of inputs. |
required |
func
|
Callable[[T], Awaitable[Result[U, E]]]
|
An async function returning a Result. |
required |
limit
|
int | None
|
Optional concurrency limit (semaphore). |
None
|
cancel_on_err
|
bool
|
If True, cancels pending tasks on first error. |
True
|
panic
|
bool
|
If True (default), raises TypeError on contract violations. |
True
|
Returns:
| Type | Description |
|---|---|
Any
|
Ok(list) of transformed values, or the first Err found. |
Source code in src/result/combinators.py
traverse_async_outcome
async
traverse_async_outcome(
items: Iterable[T],
func: Callable[[T], Awaitable[Outcome[U, Any]]],
*,
panic: bool = True,
) -> Any
Concurrent map-reduce for fault-tolerant workloads.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T]
|
An iterable of inputs. |
required |
func
|
Callable[[T], Awaitable[Outcome[U, Any]]]
|
An async function returning an Outcome. |
required |
panic
|
bool
|
If True (default), raises TypeError on contract violations. |
True
|
Returns:
| Type | Description |
|---|---|
Any
|
A master Outcome of the batch. |
Source code in src/result/combinators.py
try_fold
Fallible reduction (fold). Short-circuits on the first error.
Useful for building state (like a symbol table) from a list of operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Iterable[T]
|
An iterable of items to reduce. |
required |
initial
|
U
|
The starting accumulator value. |
required |
func
|
Callable[[U, T], Result[U, E]]
|
A function taking (accumulator, item) and returning a Result. |
required |
Returns:
| Type | Description |
|---|---|
Result[U, E]
|
The final accumulator value wrapped in Ok, or the first Err encountered. |
Examples:
>>> def build_path(acc, part):
... return Ok(f"{acc}/{part}") if part else Err("empty part")
>>> try_fold(["usr", "", "bin"], "", build_path)
Err('empty part')
Source code in src/result/combinators.py
validate
Accumulate all errors from multiple results, or return a tuple of all values.
Unlike standard monadic chaining (.and_then), this does not short-circuit. It is the 'Applicative' alternative to fail-fast logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*results
|
Result[Any, Any]
|
Multiple Result instances to validate. |
()
|
Returns:
| Type | Description |
|---|---|
Result[tuple[Any, ...], list[Any]]
|
Ok(tuple) if all are Ok, otherwise Err(list) containing all failures. |
Examples:
>>> # Accumulates multiple errors
>>> validate(Ok(1), Err("fail1"), Err("fail2"))
Err(['fail1', 'fail2'])
Source code in src/result/combinators.py
validate_async
async
validate_async(
*coroutines: Awaitable[Result[T, E]],
panic: bool = True,
) -> Any
Applicative async concurrency.
Waits for ALL tasks to finish. If any failed, returns Err containing a list of ALL accumulated errors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*coroutines
|
Awaitable[Result[T, E]]
|
Async tasks that return a Result. |
()
|
panic
|
bool
|
If True (default), raises TypeError on contract violations. If False, returns Err([TypeError]). |
True
|
Returns:
| Type | Description |
|---|---|
Any
|
Ok(list) if all succeed, or Err(list) containing all errors. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If a task returns a non-Result type and panic is True. |
Examples:
>>> async def fail(msg):
... return Err(msg)
>>> await validate_async(fail("e1"), fail("e2"))
Err(['e1', 'e2'])