Backends Guide
Backends control where and how ListMapper runs map, filter, foreach, and reduce.
This project provides:
- SerialBackend: pure Python, single-threaded, in-process
- LocalBackend: local execution (serial / threads / processes)
  - in serial mode, it can optionally use Cython-compiled C extensions (if built)
- AsyncBackend: asyncio concurrency for async def functions (I/O bound)
- RayBackend: distributed execution with Ray (optional dependency)
- DaskBackend: execution with Dask (optional dependency)
1) SerialBackend (default)
What it is
- Pure Python
- Runs in the current process
- Best for small workloads, debugging, deterministic behavior
How to use
from functional_list import ListMapper
lm = ListMapper[int](1, 2, 3, 4)
print(lm.map(lambda x: x * x).to_list())
2) LocalBackend
What it is
Runs on your machine using one of three modes:
- serial: in-process execution
- threads: ThreadPoolExecutor (good for I/O bound work)
- processes: ProcessPoolExecutor (good for CPU bound work)
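As a rough mental model (not LocalBackend's actual code), the three modes correspond to these standard-library strategies. `run_local` is a hypothetical helper name used only for illustration.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_local(fn: Callable[[T], R], items: Iterable[T], mode: str = "serial", workers: int = 4) -> List[R]:
    """Hypothetical helper: apply fn to items using one of three local modes."""
    if mode == "serial":
        # In-process, one item at a time.
        return [fn(x) for x in items]
    if mode == "threads":
        # Threads share memory; useful when fn mostly waits on I/O.
        with ThreadPoolExecutor(max_workers=workers) as pool:
            return list(pool.map(fn, items))
    if mode == "processes":
        # Separate interpreters sidestep the GIL; fn and items must be picklable.
        with ProcessPoolExecutor(max_workers=workers) as pool:
            return list(pool.map(fn, items))
    raise ValueError(f"unknown mode: {mode!r}")


def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    print(run_local(square, [1, 2, 3], mode="threads"))  # [1, 4, 9]
```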
Important: LocalBackend serial mode can use Cython (C code)
In LocalBackend(mode="serial") we call helper functions from functional_list.accelerators.
- If the wheel includes the optional Cython extensions, those helpers run as compiled C code.
- If not, the package automatically falls back to pure Python.
So:
- SerialBackend = pure Python list comprehensions only
- LocalBackend(serial) = pure Python by default, optionally accelerated by Cython/C

Note: Cython helps mostly with internal looping overhead and some numeric reductions. Arbitrary user callables (e.g. lambda x: ...) still run as Python code.
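An automatic fallback like the one described above is commonly written as a guarded import. This is a minimal sketch assuming a hypothetical compiled module `_fast_ops` that exposes `sum_ints`; the real layout of `functional_list.accelerators` may differ.

```python
from typing import Sequence

try:
    # Hypothetical compiled extension; only importable when the wheel ships it.
    from _fast_ops import sum_ints  # type: ignore[import-not-found]
    HAS_ACCELERATORS = True
except ImportError:
    HAS_ACCELERATORS = False

    def sum_ints(values: Sequence[int]) -> int:
        """Pure-Python fallback with the same behavior as the compiled helper."""
        total = 0
        for v in values:
            total += v
        return total


print(HAS_ACCELERATORS, sum_ints([1, 2, 3]))
```

Either way the caller sees the same function name, which is why the fallback is invisible to users.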
How to use: serial
from functional_list import ListMapper
from functional_list.backend import LocalBackend
backend = LocalBackend(mode="serial")
out = ListMapper[int](1, 2, 3).map(lambda x: x + 1, backend=backend)
print(out.to_list())
How to use: threads
from functional_list import ListMapper
from functional_list.backend import LocalBackend
backend = LocalBackend(mode="threads", workers=8)
out = ListMapper[int](1, 2, 3).map(lambda x: x * x, backend=backend)
print(out.to_list())
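How to use: processes
Use this for CPU-bound work. The function must be picklable, so define it at module level (lambdas and nested functions cannot be pickled). On platforms that start workers with spawn (Windows, and macOS by default), also guard the entry point with if __name__ == "__main__":.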
from functional_list import ListMapper
from functional_list.backend import LocalBackend

def square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    # The guard keeps spawned worker processes from re-running this block on import.
    backend = LocalBackend(mode="processes", workers=4)
    out = ListMapper[int](1, 2, 3, 4).map(square, backend=backend)
    print(out.to_list())
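A quick way to see the picklability requirement in action, using only the standard library: worker processes receive the function through `pickle`, which stores module-level functions by name. `is_picklable` is a hypothetical helper for illustration.

```python
import pickle


def square(x: int) -> int:
    return x * x


def is_picklable(obj: object) -> bool:
    """Return True if obj survives pickle.dumps, which ProcessPoolExecutor relies on."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable(square))           # True: module-level function, pickled by name
print(is_picklable(lambda x: x * x))  # False: a lambda has no importable name
```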
3) AsyncBackend
What it is
- Uses asyncio to run many coroutine calls concurrently
- Great for I/O-bound work (HTTP, database, filesystem)
- Not multi-process
How to use
Only use with async def functions:
import asyncio
from functional_list import ListMapper
from functional_list.backend import AsyncBackend
async def fetch(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2
out = ListMapper[int](1, 2, 3).map(fetch, backend=AsyncBackend(concurrency=50))
print(out.to_list())
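A common way to cap the number of coroutines in flight is an `asyncio.Semaphore`. This sketch assumes that `concurrency=50` means "at most 50 calls running at once", which the example suggests but the text does not state; `bounded_map` is a hypothetical name, not AsyncBackend's API.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_map(fn: Callable[[T], Awaitable[R]], items: List[T], concurrency: int) -> List[R]:
    """Run fn over items with at most `concurrency` coroutines in flight."""
    sem = asyncio.Semaphore(concurrency)

    async def run_one(x: T) -> R:
        async with sem:  # waits here while `concurrency` calls are already running
            return await fn(x)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(run_one(x) for x in items))


async def fetch(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


print(asyncio.run(bounded_map(fetch, [1, 2, 3], concurrency=50)))  # [2, 4, 6]
```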
Jupyter note
AsyncBackend calls asyncio.run() internally. If you are already inside a running
event loop (common in notebooks), this backend can raise a BackendError.
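One general workaround, not tested against this library, is to make the blocking call from a worker thread: a fresh thread has no running event loop, so asyncio.run() works there. In a notebook you would pass a zero-argument callable that performs the ListMapper(...).map(..., backend=AsyncBackend(...)) call; below, `blocking_job` stands in for it, and `call_outside_loop` is a hypothetical helper.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

R = TypeVar("R")


def call_outside_loop(fn: Callable[[], R]) -> R:
    """Call fn directly, or in a worker thread if this thread already runs an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return fn()  # no running loop: asyncio.run() inside fn is safe here
    # A loop is running (e.g. Jupyter): a fresh thread has none, so asyncio.run() works there.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(fn).result()


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


def blocking_job() -> int:
    # Stands in for a call that uses asyncio.run() internally.
    return asyncio.run(double(21))


async def notebook_cell() -> int:
    # Simulates code executing while an event loop is already running.
    return call_outside_loop(blocking_job)


print(call_outside_loop(blocking_job))  # 42
print(asyncio.run(notebook_cell()))     # 42
```

The outer loop is blocked while the thread works, which is acceptable for a one-off call in a notebook cell.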
4) RayBackend
What it is
Distributed execution using Ray.
Install:
Features
- Distributed computing across multiple machines
- Automatic fault tolerance
- Optimized operations for large datasets
Optimized Operations
RayBackend includes optimized implementations of reduce, distinct, and sort that leverage Ray's distributed computing capabilities:
Tree Reduction (reduce)
For datasets with ≥ 100 elements, RayBackend uses a distributed tree reduction:
- Parallel chunk reduction using Ray tasks
- O(log n) combine depth instead of O(n) sequential steps
- Minimizes data transfer between workers
from functional_list import ListMapper
from functional_list.backend import RayBackend
backend = RayBackend(chunk_size=10000)
data = ListMapper(*range(1_000_000))
result = data.reduce(lambda x, y: x + y, backend=backend)
# With ≥ 100 elements, this runs as a distributed tree reduction
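A plain-Python sketch of the tree-reduction idea (not RayBackend's code): reduce each chunk, then combine the partial results pairwise, level by level. Here everything runs sequentially; in a distributed backend each chunk reduction would be a separate task. `tree_reduce` is a hypothetical name.

```python
from functools import reduce
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def tree_reduce(fn: Callable[[T, T], T], items: Sequence[T], chunk_size: int) -> T:
    """Reduce chunks independently, then combine partials pairwise, level by level."""
    if not items:
        raise ValueError("tree_reduce() of empty sequence")
    # Level 0: one partial result per chunk (the parallelizable tasks).
    partials: List[T] = [
        reduce(fn, items[i : i + chunk_size]) for i in range(0, len(items), chunk_size)
    ]
    # Each pass halves the number of partials, so there are about log2(#chunks) passes.
    while len(partials) > 1:
        paired = [fn(partials[i], partials[i + 1]) for i in range(0, len(partials) - 1, 2)]
        if len(partials) % 2:  # the odd one out carries over to the next level
            paired.append(partials[-1])
        partials = paired
    return partials[0]


print(tree_reduce(lambda x, y: x + y, range(1_000_000), chunk_size=10_000))  # 499999500000
```

Adjacent partials are always combined in order, so the result matches a sequential reduce for any associative function, even a non-commutative one such as string concatenation.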
Parallel Deduplication (distinct)
For datasets with ≥ 1,000 elements, RayBackend uses parallel chunk deduplication:
- Deduplicate chunks in parallel
- Combine the results and deduplicate globally
- Preserve the order of first occurrences
backend = RayBackend(chunk_size=5000)
data = ListMapper(*([i % 10000 for i in range(1_000_000)]))
unique = data.distinct(backend=backend)
# Processes chunks in parallel
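The two-phase deduplication above can be sketched in plain Python (an illustration of the technique, not the backend's implementation; `chunked_distinct` is a hypothetical name). Because each chunk keeps its first occurrences and the chunks are concatenated in order, the global pass still keeps the first occurrence of every value.

```python
from typing import Hashable, List, Sequence, TypeVar

T = TypeVar("T", bound=Hashable)


def dedupe_chunk(chunk: Sequence[T]) -> List[T]:
    """Order-preserving dedup of one chunk (the part that can run in parallel)."""
    # dict keys keep insertion order, so the first occurrence wins.
    return list(dict.fromkeys(chunk))


def chunked_distinct(items: Sequence[T], chunk_size: int) -> List[T]:
    # Phase 1: shrink each chunk independently.
    partials = [dedupe_chunk(items[i : i + chunk_size]) for i in range(0, len(items), chunk_size)]
    # Phase 2: concatenate in chunk order and dedup globally.
    merged: List[T] = []
    for part in partials:
        merged.extend(part)
    return dedupe_chunk(merged)


data = [i % 10_000 for i in range(1_000_000)]
unique = chunked_distinct(data, chunk_size=5_000)
print(len(unique), unique[:3])  # 10000 [0, 1, 2]
```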
Distributed Sort-Merge (sort)
For datasets with ≥ 1,000 elements, RayBackend uses a parallel sort-merge:
- Sort chunks in parallel
- Merge the sorted chunks with a k-way merge (heapq)
- Supports key functions and reverse sorting
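import random
from functional_list import ListMapper
from functional_list.backend import RayBackend

backend = RayBackend(chunk_size=10000)
data = ListMapper(*random.sample(range(1_000_000), 1_000_000))
sorted_data = data.sort(backend=backend)

# With a key function (example records; use your own list of dicts with a "score" key)
user_list = [{"name": "a", "score": 3}, {"name": "b", "score": 9}]
users = ListMapper(*user_list)
sorted_users = users.sort(
    key=lambda u: u["score"],
    reverse=True,
    backend=RayBackend(chunk_size=5000),
)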
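The sort-merge strategy can be sketched with the standard library alone: sort each chunk, then combine the sorted runs with `heapq.merge`, which accepts the same `key` and `reverse` arguments as `sorted`. This is an illustration of the technique, not RayBackend's code; `sort_merge` is a hypothetical name.

```python
import heapq
import random
from typing import Any, Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")


def sort_merge(
    items: Sequence[T],
    chunk_size: int,
    key: Optional[Callable[[T], Any]] = None,
    reverse: bool = False,
) -> List[T]:
    # Phase 1: sort each chunk independently (the parallel part).
    runs = [
        sorted(items[i : i + chunk_size], key=key, reverse=reverse)
        for i in range(0, len(items), chunk_size)
    ]
    # Phase 2: k-way merge; heapq.merge requires every run sorted the same way.
    return list(heapq.merge(*runs, key=key, reverse=reverse))


data = random.sample(range(100_000), 100_000)
print(sort_merge(data, chunk_size=10_000)[:5])  # [0, 1, 2, 3, 4]

users = [{"name": "a", "score": 3}, {"name": "b", "score": 9}, {"name": "c", "score": 5}]
print([u["name"] for u in sort_merge(users, chunk_size=2, key=lambda u: u["score"], reverse=True)])
# ['b', 'c', 'a']
```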
Configuration
backend = RayBackend(
address=None, # Start local cluster (or "auto" to connect)
chunk_size=10000, # Elements per Ray task
init_kwargs={"num_cpus": 8} # Extra Ray init args
)
Performance Tips
Chunk Size Guidelines:
- Small datasets (< 10,000): chunk_size=100
- Medium datasets (10,000 - 1M): chunk_size=1000
- Large datasets (> 1M): chunk_size=10000
When to Use:
- Very large datasets (> 1M elements)
- Distributed computing across machines
- CPU-intensive transformations
- Already using Ray in your stack
5) DaskBackend
What it is
Execution using Dask with delayed task graph optimization.
Install:
Features
- Lazy evaluation with task graph optimization
- Multiple scheduler options (threads/processes/distributed)
- Integration with Dask ecosystem
- Optimized operations for large datasets
Optimized Operations
DaskBackend includes optimized implementations that leverage Dask's delayed execution:
Delayed Tree Reduction (reduce)
For datasets with ≥ 100 elements:
- Dask delayed task graph with tree reduction
- Optimal task scheduling
- Logarithmic reduction depth
from functional_list import ListMapper
from functional_list.backend import DaskBackend
backend = DaskBackend(scheduler="threads", chunk_size=1000)
data = ListMapper(*range(1_000_000))
result = data.reduce(lambda x, y: x + y, backend=backend)
# Uses Dask delayed tree reduction
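Tree reduction, whether through Dask or Ray, regroups the calls, so it only matches a left-to-right reduce when the function is associative. This small standard-library check uses subtraction, which is not associative, to show the difference.

```python
from functools import reduce

values = list(range(1, 9))  # [1, 2, ..., 8]


def sub(x: int, y: int) -> int:
    return x - y


# Left to right: ((((1 - 2) - 3) - 4) ...) - 8
sequential = reduce(sub, values)

# Tree shape with chunks of 2: (1-2), (3-4), (5-6), (7-8), then combine pairwise.
level1 = [sub(values[i], values[i + 1]) for i in range(0, len(values), 2)]
level2 = [sub(level1[i], level1[i + 1]) for i in range(0, len(level1), 2)]
tree = sub(level2[0], level2[1])

print(sequential, tree)  # -34 0
# Addition is associative, so any grouping gives the same answer.
print(reduce(lambda a, b: a + b, values) == (1 + 2) + (3 + 4) + (5 + 6) + (7 + 8))  # True
```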
Delayed Deduplication (distinct)
For datasets with ≥ 1,000 elements:
- Parallel chunk deduplication
- Task graph optimization
- Order preservation
backend = DaskBackend(scheduler="processes", chunk_size=5000)
data = ListMapper(*([i % 10000 for i in range(1_000_000)]))
unique = data.distinct(backend=backend)
Delayed Sort-Merge (sort)
For datasets with ≥ 1,000 elements:
- Parallel chunk sorting with Dask delayed
- Efficient merge operation
- Full key function support
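# shuffled_data: any list of strings (example values below)
shuffled_data = ["banana", "Apple", "cherry"]
backend = DaskBackend(scheduler="threads", chunk_size=10000)
data = ListMapper(*shuffled_data)
sorted_data = data.sort(
    key=lambda x: x.lower(),  # case-insensitive ordering
    backend=backend,
)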
Scheduler Options
# Threads - good for I/O bound, GIL-friendly operations
backend = DaskBackend(scheduler="threads", chunk_size=1000)
# Processes - good for CPU-bound operations
backend = DaskBackend(scheduler="processes", chunk_size=5000)
# Distributed - for multi-machine clusters
backend = DaskBackend(scheduler="distributed", chunk_size=10000)
Performance Tips
When to Use Threads:
- I/O-bound operations (file reading, network calls)
- Operations that release the GIL
- Workloads that need shared memory
When to Use Processes:
- CPU-bound operations
- Workloads that need true parallelism
- Substantial work per task
Chunk Size Guidelines:
- Threads: 1,000-5,000
- Processes: 5,000-10,000
- Distributed: 10,000+
Performance Comparison
Typical performance on 1M elements (8-core machine):
| Operation | Base | LocalBackend | RayBackend | DaskBackend |
|---|---|---|---|---|
| reduce | 2.5s | 0.8s | 0.3s | 0.4s |
| distinct | 3.1s | 1.2s | 0.5s | 0.6s |
| sort | 8.2s | 3.5s | 1.2s | 1.4s |
Note: Ray and Dask backends are especially beneficial for:
- Very large datasets (> 1M elements)
- Distributed computing scenarios
- Stacks that already use Ray or Dask
RayBackend: basic map with chunk_size
What it is¶
- Distributed execution using Ray
- Useful when you want to scale beyond a single process/machine
Speed tip: use chunk_size
For benchmarks or lightweight per-item functions, set chunk_size so Ray runs
one task per chunk instead of one task per element.
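The batching idea can be sketched with the standard library, using a `ThreadPoolExecutor` as a stand-in for Ray tasks (this is not RayBackend's code; `chunked_map` is a hypothetical name). With 100,000 items and chunk_size=10_000, the pool receives 10 tasks instead of 100,000.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def _apply_chunk(fn: Callable[[T], R], chunk: Sequence[T]) -> List[R]:
    # One task processes a whole batch, so scheduling cost is paid once per chunk.
    return [fn(x) for x in chunk]


def chunked_map(pool: Executor, fn: Callable[[T], R], items: Sequence[T], chunk_size: int) -> List[R]:
    chunks = [items[i : i + chunk_size] for i in range(0, len(items), chunk_size)]
    futures = [pool.submit(_apply_chunk, fn, c) for c in chunks]
    out: List[R] = []
    for fut in futures:  # collect in submission order to keep the input order
        out.extend(fut.result())
    return out


with ThreadPoolExecutor(max_workers=4) as pool:
    result = chunked_map(pool, lambda x: x * x, range(100_000), chunk_size=10_000)
print(len(result), result[:5])  # 100000 [0, 1, 4, 9, 16]
```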
Install
How to use
from functional_list import ListMapper
from functional_list.backend import RayBackend
backend = RayBackend(address=None, chunk_size=10_000) # one Ray task per 10k items
out = ListMapper[int](*range(100_000)).map(lambda x: x * x, backend=backend)
print(out.to_list()[:5])
DaskBackend: basic map with chunk_size
What it is
- Parallel/distributed execution with Dask
- Supports thread/process schedulers and cluster schedulers
Speed tip: use chunk_size
For small per-item work, Dask can be dominated by task graph overhead.
Setting chunk_size makes one task process a batch of items.
Install
How to use
from functional_list import ListMapper
from functional_list.backend import DaskBackend
backend = DaskBackend(scheduler="threads", chunk_size=10_000)
out = ListMapper[int](*range(100_000)).map(lambda x: x * x, backend=backend)
print(out.to_list()[:5])
Choosing a backend (quick rules)
- Small data / simplest: SerialBackend
- Local I/O-bound (many waits): LocalBackend(mode="threads") or AsyncBackend
- Local CPU-bound: LocalBackend(mode="processes")
- Distributed: RayBackend or DaskBackend

If you benchmark and see threads/processes slower than serial, it usually means:
- your per-item task is too small (overhead dominates)
- you need larger chunks / fewer tasks
- the workload is not CPU-bound (processes won’t help)
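To check this on your own workload, a small timing harness is enough. This standard-library sketch compares a serial loop, one thread-pool task per item, and one task per 10,000-item chunk; `tiny` and `tiny_batch` are placeholders for your own per-item function. For work this cheap, per-item thread tasks are typically slower than serial, but measure on your machine rather than relying on that.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def tiny(x: int) -> int:
    return x + 1  # deliberately cheap per-item work, so overhead dominates


def tiny_batch(chunk: List[int]) -> List[int]:
    # One task handles a whole batch, so scheduling cost is paid per chunk.
    return [tiny(x) for x in chunk]


def timed(label: str, run: Callable[[], List[int]]) -> List[int]:
    start = time.perf_counter()
    out = run()
    print(f"{label:>8}: {time.perf_counter() - start:.4f}s")
    return out


data = list(range(100_000))
chunks = [data[i : i + 10_000] for i in range(0, len(data), 10_000)]  # 10 batches

serial = timed("serial", lambda: [tiny(x) for x in data])
with ThreadPoolExecutor(max_workers=8) as pool:
    threaded = timed("threads", lambda: list(pool.map(tiny, data)))
    chunked = timed("chunked", lambda: [y for part in pool.map(tiny_batch, chunks) for y in part])

# Same results in every mode; only the timings differ.
assert serial == threaded == chunked
```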