
Backends Guide

Backends control where and how ListMapper runs map, filter, foreach, and reduce.

This project provides:

  • SerialBackend: pure Python, single-threaded, in-process
  • LocalBackend: local execution (serial / threads / processes); in serial mode, it can optionally use Cython-compiled C extensions (if built)
  • AsyncBackend: asyncio concurrency for async def functions (I/O bound)
  • RayBackend: distributed execution with Ray (optional dependency)
  • DaskBackend: execution with Dask (optional dependency)


1) SerialBackend (default)

What it is

  • Pure Python
  • Runs in the current process
  • Best for small workloads, debugging, deterministic behavior

How to use

from functional_list import ListMapper

lm = ListMapper[int](1, 2, 3, 4)
print(lm.map(lambda x: x * x).to_list())

2) LocalBackend

What it is

Runs on your machine using one of three modes:

  • serial: in-process execution
  • threads: ThreadPoolExecutor (good for I/O-bound work)
  • processes: ProcessPoolExecutor (good for CPU-bound work)

Important: LocalBackend serial mode can use Cython (C code)

In LocalBackend(mode="serial") we call helper functions from functional_list.accelerators.

  • If the wheel includes the optional Cython extensions, those helpers run as compiled C code.
  • If not, the package automatically falls back to pure Python.

So:

  • SerialBackend = pure Python list comprehensions only
  • LocalBackend(serial) = pure Python by default, optionally accelerated by Cython/C

Note: Cython helps mostly with internal looping/overhead and some numeric reductions. Arbitrary user callables (e.g. lambda x: ...) are still Python-level.
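The automatic fallback described above is a common Python pattern: try to import the compiled module, and define a pure-Python version if that import fails. The sketch below assumes a hypothetical extension module named `_fast_helpers` exposing a hypothetical `map_list` function; the real layout of functional_list.accelerators is not documented here.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")

try:
    # Hypothetical compiled helper; stands in for an optional Cython extension.
    from _fast_helpers import map_list as _map_list
    ACCELERATED = True
except ImportError:
    ACCELERATED = False

    def _map_list(fn: Callable[[T], U], items: Iterable[T]) -> List[U]:
        """Pure-Python fallback: a plain list comprehension."""
        return [fn(x) for x in items]


def serial_map(fn: Callable[[T], U], items: Iterable[T]) -> List[U]:
    # Either path still calls `fn` as a Python-level callable, which is why
    # arbitrary user lambdas gain little from a compiled loop.
    return _map_list(fn, items)


print(ACCELERATED, serial_map(lambda x: x + 1, [1, 2, 3]))
```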

How to use: serial

from functional_list import ListMapper
from functional_list.backend import LocalBackend

backend = LocalBackend(mode="serial")
out = ListMapper[int](1, 2, 3).map(lambda x: x + 1, backend=backend)
print(out.to_list())

How to use: threads

from functional_list import ListMapper
from functional_list.backend import LocalBackend

backend = LocalBackend(mode="threads", workers=8)
out = ListMapper[int](1, 2, 3).map(lambda x: x * x, backend=backend)
print(out.to_list())

How to use: processes

Use this for CPU-bound work. The function must be picklable, so define it at module level (lambdas and nested functions cannot be pickled).

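from functional_list import ListMapper
from functional_list.backend import LocalBackend

def square(x: int) -> int:
    # Module-level function: picklable, unlike a lambda
    return x * x

if __name__ == "__main__":
    # Needed when workers start with "spawn" (the default on Windows and macOS)
    backend = LocalBackend(mode="processes", workers=4)
    out = ListMapper[int](1, 2, 3, 4).map(square, backend=backend)
    print(out.to_list())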
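ProcessPoolExecutor sends the function to worker processes using the standard pickle module, which stores functions by reference (module plus qualified name). A quick way to check a callable before handing it to LocalBackend(mode="processes") is a pickle round trip. This helper is a sketch and is not part of functional_list:

```python
import math
import pickle
from typing import Any


def is_picklable(obj: Any) -> bool:
    """Return True if `obj` survives a round trip through the standard pickle module."""
    try:
        pickle.loads(pickle.dumps(obj))
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable(math.sqrt))        # module-level function: found again by name
print(is_picklable(lambda x: x * x))  # a lambda's name "<lambda>" cannot be looked up
```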

3) AsyncBackend

What it is

  • Uses asyncio to run many coroutine calls concurrently
  • Great for I/O bound work (HTTP, database, filesystem)
  • Not multi-process

How to use

Only use with async def functions:

import asyncio

from functional_list import ListMapper
from functional_list.backend import AsyncBackend

async def fetch(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

out = ListMapper[int](1, 2, 3).map(fetch, backend=AsyncBackend(concurrency=50))
print(out.to_list())
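One common way to implement a concurrency limit like the `concurrency` argument above is to wrap each call in an asyncio.Semaphore and collect results with asyncio.gather, which returns them in input order. This is a sketch of that technique under that assumption, not AsyncBackend's actual source; `bounded_map` is a hypothetical name:

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def bounded_map(
    fn: Callable[[T], Awaitable[U]], items: Iterable[T], concurrency: int
) -> List[U]:
    """Run fn over items with at most `concurrency` calls in flight; keeps input order."""
    sem = asyncio.Semaphore(concurrency)

    async def run_one(x: T) -> U:
        async with sem:  # wait for a free slot before starting the call
            return await fn(x)

    # gather returns results in the order the awaitables were passed in
    return list(await asyncio.gather(*(run_one(x) for x in items)))


async def fetch(x: int) -> int:
    await asyncio.sleep(0.01)  # stands in for network or disk I/O
    return x * 2


print(asyncio.run(bounded_map(fetch, [1, 2, 3], concurrency=50)))
```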

Jupyter note

AsyncBackend calls asyncio.run() internally. If you are already inside a running event loop (common in notebooks), this backend can raise a BackendError.
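asyncio.run() refuses to start when an event loop is already running in the current thread (it raises RuntimeError). You can detect that situation with asyncio.get_running_loop(), as in the sketch below. In a notebook cell, where a loop is already running, you can usually await a coroutine directly instead. The helper name `in_running_loop` is hypothetical:

```python
import asyncio


def in_running_loop() -> bool:
    """True if called from code already running inside an asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:  # raised when no loop is running in this thread
        return False
    return True


async def check_inside() -> bool:
    return in_running_loop()


print(in_running_loop())            # plain script: no loop running yet
print(asyncio.run(check_inside()))  # inside asyncio.run(): a loop is running
```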


4) RayBackend

What it is

Distributed execution using Ray.

Install:

pip install ray
# or
uv add ray

Features

  • Distributed computing across multiple machines
  • Automatic fault tolerance
  • Optimized operations for large datasets

Optimized Operations

RayBackend includes optimized implementations of reduce, distinct, and sort that leverage Ray's distributed computing capabilities:

Tree Reduction (reduce)

For datasets ≥ 100 elements, uses distributed tree reduction:

  • Parallel chunk reduction using Ray tasks
  • O(log n) depth vs O(n) sequential
  • Minimizes data transfer between workers

from functional_list import ListMapper
from functional_list.backend import RayBackend

backend = RayBackend(chunk_size=10000)
data = ListMapper(*range(1_000_000))
result = data.reduce(lambda x, y: x + y, backend=backend)
# 1M elements (≥ 100), so this uses the distributed tree reduction
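The tree-reduction idea can be illustrated without Ray: reduce fixed-size chunks, then reduce the list of partial results, and repeat until one value remains. With chunk size c the number of levels grows like log_c(n) rather than n sequential steps. The sketch below is sequential and single-process (in a distributed version, each chunk at a level would be its own task); it assumes the function is associative, which tree reduction requires, and `tree_reduce` is a hypothetical name, not RayBackend's code:

```python
import operator
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def tree_reduce(fn: Callable[[T, T], T], items: Sequence[T], chunk_size: int) -> Tuple[T, int]:
    """Reduce items level by level; returns (result, number_of_levels).

    Chunks within one level are independent, so a cluster could run them in
    parallel. `fn` must be associative for the result to match functools.reduce.
    """
    if not items:
        raise ValueError("tree_reduce() of empty sequence")
    if chunk_size < 2:
        raise ValueError("chunk_size must be at least 2")
    level: List[T] = list(items)
    levels = 0
    while len(level) > 1:
        partials: List[T] = []
        for start in range(0, len(level), chunk_size):
            chunk = level[start:start + chunk_size]
            acc = chunk[0]
            for x in chunk[1:]:
                acc = fn(acc, x)
            partials.append(acc)  # one partial result per chunk, kept in order
        level = partials
        levels += 1
    return level[0], levels


total, depth = tree_reduce(operator.add, list(range(1_000_000)), chunk_size=100)
print(total, depth)  # 1,000,000 -> 10,000 -> 100 -> 1 values, i.e. 3 levels
```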

Parallel Deduplication (distinct)

For datasets ≥ 1,000 elements, uses parallel chunk deduplication:

  • Deduplicate chunks in parallel
  • Combine and deduplicate globally
  • Preserves first occurrence order

backend = RayBackend(chunk_size=5000)
data = ListMapper(*([i % 10000 for i in range(1_000_000)]))
unique = data.distinct(backend=backend)
# Processes chunks in parallel
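The two-phase deduplication can be sketched in plain Python. The first phase treats each chunk independently, which is the part a distributed backend can parallelize; the second phase merges the smaller partial results in chunk order. This sketch assumes hashable elements, relies on dict preserving insertion order, and uses a hypothetical `chunked_distinct` name; it is not RayBackend's implementation:

```python
from typing import Hashable, List, Sequence, TypeVar

H = TypeVar("H", bound=Hashable)


def chunked_distinct(items: Sequence[H], chunk_size: int) -> List[H]:
    """Deduplicate per chunk, then globally, keeping first-occurrence order."""
    # Phase 1: each chunk is deduplicated on its own (parallelizable).
    # dict.fromkeys keeps only the first occurrence of each key, in order.
    partials = [
        list(dict.fromkeys(items[i:i + chunk_size]))
        for i in range(0, len(items), chunk_size)
    ]
    # Phase 2: concatenate partials in chunk order and deduplicate again.
    return list(dict.fromkeys(x for part in partials for x in part))


print(chunked_distinct([3, 1, 3, 2, 1, 4, 2, 5], chunk_size=3))  # [3, 1, 2, 4, 5]
```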

Distributed Sort-Merge (sort)

For datasets ≥ 1,000 elements, uses parallel sort-merge:

  • Sort chunks in parallel
  • Merge using efficient k-way merge (heapq)
  • Supports key functions and reverse sorting

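import random

backend = RayBackend(chunk_size=10000)
data = ListMapper(*random.sample(range(1_000_000), 1_000_000))  # 0..999,999 shuffled
sorted_data = data.sort(backend=backend)

# With a key function; user_list is example data (dicts with a "score" field)
user_list = [{"name": "a", "score": 3}, {"name": "b", "score": 7}, {"name": "c", "score": 5}]
users = ListMapper(*user_list)
sorted_users = users.sort(
    key=lambda u: u["score"],
    reverse=True,
    backend=RayBackend(chunk_size=5000),
)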
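The sort-merge strategy maps directly onto the standard library: sort chunks independently, then combine the sorted runs with heapq.merge, which performs a lazy k-way merge and accepts `key` and `reverse` arguments (Python 3.5+). A single-process sketch with a hypothetical `chunked_sort` name, not RayBackend's code:

```python
import heapq
import random
from typing import Any, Callable, List, Optional, Sequence


def chunked_sort(
    items: Sequence[Any],
    chunk_size: int,
    key: Optional[Callable[[Any], Any]] = None,
    reverse: bool = False,
) -> List[Any]:
    """Sort each chunk independently, then k-way merge the sorted runs."""
    runs = [
        sorted(items[i:i + chunk_size], key=key, reverse=reverse)
        for i in range(0, len(items), chunk_size)
    ]
    # heapq.merge requires every run to be sorted in the direction it is told:
    # with reverse=True, each run must be sorted largest-first (as above).
    return list(heapq.merge(*runs, key=key, reverse=reverse))


data = random.sample(range(10_000), 10_000)
print(chunked_sort(data, 1_000)[:5])  # [0, 1, 2, 3, 4]

users = [{"name": "a", "score": 3}, {"name": "b", "score": 7}, {"name": "c", "score": 5}]
ranked = chunked_sort(users, 2, key=lambda u: u["score"], reverse=True)
print([u["name"] for u in ranked])  # ['b', 'c', 'a']
```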

Configuration

backend = RayBackend(
    address=None,           # Start local cluster (or "auto" to connect)
    chunk_size=10000,       # Elements per Ray task
    init_kwargs={"num_cpus": 8}  # Extra Ray init args
)

Performance Tips

Chunk Size Guidelines:

  • Small datasets (< 10,000): chunk_size=100
  • Medium datasets (10,000 - 1M): chunk_size=1000
  • Large datasets (> 1M): chunk_size=10000

When to Use:

  • Very large datasets (> 1M elements)
  • Distributed computing across machines
  • CPU-intensive transformations
  • Already using Ray in your stack
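Because each chunk becomes one Ray task, the chunk size guidelines above are really about keeping the task count moderate: the number of tasks is ceil(n / chunk_size). The helpers below are hypothetical (not part of functional_list) and simply encode those guidelines, showing that the suggested sizes keep the task count in the tens to hundreds:

```python
def suggest_chunk_size(n: int) -> int:
    """Encode the chunk size guidelines: <10k -> 100, 10k..1M -> 1000, >1M -> 10000."""
    if n < 10_000:
        return 100
    if n <= 1_000_000:
        return 1_000
    return 10_000


def num_tasks(n: int, chunk_size: int) -> int:
    """One task per chunk; integer ceiling division so a partial last chunk counts."""
    return (n + chunk_size - 1) // chunk_size


for n in (5_000, 500_000, 5_000_000):
    size = suggest_chunk_size(n)
    print(f"{n:>9,} elements -> chunk_size={size:>6,} -> {num_tasks(n, size):,} tasks")
```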


5) DaskBackend

What it is

Execution using Dask with delayed task graph optimization.

Install:

pip install dask
# or
uv add dask

Features

  • Lazy evaluation with task graph optimization
  • Multiple scheduler options (threads/processes/distributed)
  • Integration with Dask ecosystem
  • Optimized operations for large datasets

Optimized Operations

DaskBackend includes optimized implementations leveraging Dask's delayed execution:

Delayed Tree Reduction (reduce)

For datasets ≥ 100 elements:

  • Dask delayed task graph with tree reduction
  • Optimal task scheduling
  • Logarithmic reduction depth

from functional_list import ListMapper
from functional_list.backend import DaskBackend

backend = DaskBackend(scheduler="threads", chunk_size=1000)
data = ListMapper(*range(1_000_000))
result = data.reduce(lambda x, y: x + y, backend=backend)
# Uses Dask delayed tree reduction

Delayed Deduplication (distinct)

For datasets ≥ 1,000 elements:

  • Parallel chunk deduplication
  • Task graph optimization
  • Order preservation

backend = DaskBackend(scheduler="processes", chunk_size=5000)
data = ListMapper(*([i % 10000 for i in range(1_000_000)]))
unique = data.distinct(backend=backend)

Delayed Sort-Merge (sort)

For datasets ≥ 1,000 elements:

  • Parallel chunk sorting with Dask delayed
  • Efficient merge operation
  • Full key function support

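import random

# Example data: shuffled mixed-case strings
shuffled_data = [f"Item{i}" for i in range(10_000)]
random.shuffle(shuffled_data)

backend = DaskBackend(scheduler="threads", chunk_size=10000)
data = ListMapper(*shuffled_data)
sorted_data = data.sort(
    key=lambda x: x.lower(),  # case-insensitive ordering
    backend=backend,
)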

Scheduler Options

# Threads - good for I/O bound, GIL-friendly operations
backend = DaskBackend(scheduler="threads", chunk_size=1000)

# Processes - good for CPU-bound operations  
backend = DaskBackend(scheduler="processes", chunk_size=5000)

# Distributed - for multi-machine clusters
backend = DaskBackend(scheduler="distributed", chunk_size=10000)

Performance Tips

When to Use Threads:

  • I/O-bound operations (file reading, network calls)
  • Operations that release the GIL
  • Need shared memory

When to Use Processes:

  • CPU-bound operations
  • Need true parallelism
  • Each task is substantial

Chunk Size Guidelines:

  • Threads: 1,000 - 5,000
  • Processes: 5,000 - 10,000
  • Distributed: 10,000+
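The threads-versus-processes guidance rests on the GIL: threads overlap well while they are blocked on I/O (or running code that releases the GIL), but pure-Python CPU work does not speed up under threads. The stdlib-only sketch below uses time.sleep as a stand-in for I/O and is independent of Dask:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(x: int) -> int:
    time.sleep(0.1)  # sleeping releases the GIL, like waiting on a socket or disk
    return x


items = list(range(8))

start = time.perf_counter()
serial = [io_task(x) for x in items]  # 8 waits, one after another
serial_s = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    threaded = list(pool.map(io_task, items))  # waits overlap; map keeps input order
threaded_s = time.perf_counter() - start

print(f"serial: {serial_s:.2f}s  threads: {threaded_s:.2f}s")
```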


Performance Comparison

Typical performance on 1M elements (8-core machine):

Operation   Base (default)   LocalBackend   RayBackend   DaskBackend
reduce      2.5s             0.8s           0.3s         0.4s
distinct    3.1s             1.2s           0.5s         0.6s
sort        8.2s             3.5s           1.2s         1.4s

Note: Ray and Dask backends are especially beneficial for:

  • Very large datasets (> 1M elements)
  • Distributed computing scenarios
  • When already using Ray/Dask in your stack


RayBackend quick start (optional)

What it is

  • Distributed execution using Ray
  • Useful when you want to scale beyond a single process/machine

Speed tip: use chunk_size

For benchmarks or lightweight per-item functions, set chunk_size so Ray runs one task per chunk instead of one task per element.

Install

uv add ray

How to use

from functional_list import ListMapper
from functional_list.backend import RayBackend

backend = RayBackend(address=None, chunk_size=10_000)  # one Ray task per 10k items
out = ListMapper[int](*range(100_000)).map(lambda x: x * x, backend=backend)
print(out.to_list()[:5])
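The chunk_size tip works the same way for any task-based executor: scheduling cost is paid per task, so grouping items cuts the task count from n to ceil(n / chunk_size). The sketch below uses concurrent.futures.ThreadPoolExecutor as a stand-in for Ray or Dask tasks; it illustrates batching only, `chunked_map` is a hypothetical name, and this is not either backend's implementation:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def chunked_map(
    fn: Callable[[T], U], items: Sequence[T], chunk_size: int, workers: int = 4
) -> List[U]:
    """Submit one task per chunk instead of one task per element."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    def run_chunk(chunk: Sequence[T]) -> List[U]:
        return [fn(x) for x in chunk]  # the per-item loop runs inside one task

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields chunk results in submission order, so flattening keeps input order
        return [y for part in pool.map(run_chunk, chunks) for y in part]


out = chunked_map(lambda x: x * x, list(range(100_000)), chunk_size=10_000)  # 10 tasks
print(out[:5])  # [0, 1, 4, 9, 16]
```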

DaskBackend quick start (optional)

What it is

  • Parallel/distributed execution with Dask
  • Supports thread/process schedulers and cluster schedulers

Speed tip: use chunk_size

For small per-item work, Dask can be dominated by task graph overhead. Setting chunk_size makes one task process a batch of items.

Install

uv add dask

How to use

from functional_list import ListMapper
from functional_list.backend import DaskBackend

backend = DaskBackend(scheduler="threads", chunk_size=10_000)
out = ListMapper[int](*range(100_000)).map(lambda x: x * x, backend=backend)
print(out.to_list()[:5])

Choosing a backend (quick rules)

  • Small data / simplest: SerialBackend
  • Local I/O-bound (many waits): LocalBackend(mode="threads") or AsyncBackend
  • Local CPU-bound: LocalBackend(mode="processes")
  • Distributed: RayBackend or DaskBackend

If you benchmark and see threads/processes slower than serial, it usually means:

  • your per-item task is too small (overhead dominates)
  • you need larger chunks / fewer tasks
  • the workload is not CPU-bound (processes won’t help)
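A quick way to check the first cause is to time the same tiny function serially and through a thread pool that submits one task per element. This is a stdlib-only sketch with hypothetical helper names; absolute numbers depend on your machine, and for work this small the per-item pool is typically the slower of the two:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def timed(label: str, run: Callable[[], List[int]]) -> List[int]:
    """Call run() once, print its wall-clock time, and return its result."""
    start = time.perf_counter()
    result = run()
    print(f"{label:<18} {time.perf_counter() - start:.4f}s")
    return result


def tiny(x: int) -> int:
    return x + 1  # far too little work to pay for per-task scheduling


items = list(range(50_000))

serial = timed("serial", lambda: [tiny(x) for x in items])
with ThreadPoolExecutor(max_workers=8) as pool:
    # One future per element: executor overhead dwarfs the work itself.
    threaded = timed("threads, per item", lambda: list(pool.map(tiny, items)))
```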