Benchmarks: eager vs lazy and backends¶
This notebook benchmarks functional_list across:
- Eager mode:
ListMapper(...).map(...).filter(...) - Lazy mode:
ListMapper(...).lazy().map(...).filter(...).collect(...)
Backends:
SerialBackend(default)LocalBackend(threads/processes)AsyncBackend(for async functions; mostly I/O-bound workloads)RayBackend(optional dependency)DaskBackend(optional dependency)
Notes¶
- For CPU-bound work, threads may not help due to the GIL; prefer
processesor distributed backends. - Ray/Dask are optional; cells will auto-skip if they aren't installed.
AsyncBackendcannot run inside an active Jupyter event loop viaasyncio.run. This notebook benchmarks theAsyncBackendby spawning a fresh Python subprocess.
In [1]:
Copied!
import os
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Callable, List
from functional_list import ListMapper
from functional_list.backend import LocalBackend, SerialBackend
try:
from functional_list.backend import RayBackend
except Exception:
RayBackend = None
try:
from functional_list.backend import DaskBackend
except Exception:
DaskBackend = None
DATA_SIZE = int(os.environ.get('BENCH_DATA_SIZE', '200_000'))
N_RUNS = int(os.environ.get('BENCH_RUNS', '5'))
WARMUP = int(os.environ.get('BENCH_WARMUP', '1'))
print('Python:', sys.version)
print('DATA_SIZE:', DATA_SIZE, 'N_RUNS:', N_RUNS)
import os
import statistics
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Callable, List
from functional_list import ListMapper
from functional_list.backend import LocalBackend, SerialBackend
try:
from functional_list.backend import RayBackend
except Exception:
RayBackend = None
try:
from functional_list.backend import DaskBackend
except Exception:
DaskBackend = None
DATA_SIZE = int(os.environ.get('BENCH_DATA_SIZE', '200_000'))
N_RUNS = int(os.environ.get('BENCH_RUNS', '5'))
WARMUP = int(os.environ.get('BENCH_WARMUP', '1'))
print('Python:', sys.version)
print('DATA_SIZE:', DATA_SIZE, 'N_RUNS:', N_RUNS)
2026-01-09 21:06:04,742 INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
Python: 3.12.3 (main, Nov 6 2025, 13:44:16) [GCC 13.3.0] DATA_SIZE: 200000 N_RUNS: 5
Benchmark helpers¶
In [2]:
Copied!
@dataclass
class BenchResult:
name: str
seconds: List[float]
@property
def mean(self) -> float:
return statistics.mean(self.seconds)
@property
def stdev(self) -> float:
return statistics.pstdev(self.seconds)
def timeit(fn: Callable[[], object], *, runs: int, warmup: int = 1) -> List[float]:
for _ in range(warmup):
_ = fn()
out: List[float] = []
for _ in range(runs):
t0 = time.perf_counter()
_ = fn()
out.append(time.perf_counter() - t0)
return out
def summarize(results: List[BenchResult]) -> None:
width = max(len(r.name) for r in results)
print(f"{'name':<{width}} mean(s) stdev(s) runs")
for r in sorted(results, key=lambda x: x.mean):
print(f"{r.name:<{width}} {r.mean:8.4f} {r.stdev:8.4f} {len(r.seconds)}")
@dataclass
class BenchResult:
name: str
seconds: List[float]
@property
def mean(self) -> float:
return statistics.mean(self.seconds)
@property
def stdev(self) -> float:
return statistics.pstdev(self.seconds)
def timeit(fn: Callable[[], object], *, runs: int, warmup: int = 1) -> List[float]:
for _ in range(warmup):
_ = fn()
out: List[float] = []
for _ in range(runs):
t0 = time.perf_counter()
_ = fn()
out.append(time.perf_counter() - t0)
return out
def summarize(results: List[BenchResult]) -> None:
width = max(len(r.name) for r in results)
print(f"{'name':<{width}} mean(s) stdev(s) runs")
for r in sorted(results, key=lambda x: x.mean):
print(f"{r.name:<{width}} {r.mean:8.4f} {r.stdev:8.4f} {len(r.seconds)}")
Workload definition¶
We benchmark a simple map+filter pipeline that keeps the output size stable but not too small.
- map:
x * 3 + 1 - filter: keep only values divisible by 3
In [3]:
Copied!
def map_fn(x: int) -> int:
return x * 3 + 1
def pred_fn(x: int) -> bool:
return (x % 3) == 0
data = list(range(DATA_SIZE))
print('data[0:5]=', data[:5])
def map_fn(x: int) -> int:
return x * 3 + 1
def pred_fn(x: int) -> bool:
return (x % 3) == 0
data = list(range(DATA_SIZE))
print('data[0:5]=', data[:5])
data[0:5]= [0, 1, 2, 3, 4]
Eager mode (ListMapper)¶
In eager mode, each step materializes an intermediate list.
In [4]:
Copied!
def eager_pipeline(backend=None) -> int:
lm = ListMapper(*data)
out = lm.map(map_fn, backend=backend).filter(pred_fn, backend=backend)
return len(out.to_list())
def eager_pipeline(backend=None) -> int:
lm = ListMapper(*data)
out = lm.map(map_fn, backend=backend).filter(pred_fn, backend=backend)
return len(out.to_list())
In [5]:
Copied!
results: List[BenchResult] = [
BenchResult('eager/serial', timeit(lambda: eager_pipeline(SerialBackend()), runs=N_RUNS, warmup=WARMUP)),
BenchResult('eager/local[threads]',
timeit(lambda: eager_pipeline(LocalBackend(mode='threads')), runs=N_RUNS, warmup=WARMUP)),
BenchResult('eager/local[processes]',
timeit(lambda: eager_pipeline(LocalBackend(mode='processes')), runs=N_RUNS, warmup=0))]
summarize(results)
results: List[BenchResult] = [
BenchResult('eager/serial', timeit(lambda: eager_pipeline(SerialBackend()), runs=N_RUNS, warmup=WARMUP)),
BenchResult('eager/local[threads]',
timeit(lambda: eager_pipeline(LocalBackend(mode='threads')), runs=N_RUNS, warmup=WARMUP)),
BenchResult('eager/local[processes]',
timeit(lambda: eager_pipeline(LocalBackend(mode='processes')), runs=N_RUNS, warmup=0))]
summarize(results)
name mean(s) stdev(s) runs eager/serial 0.0466 0.0118 5 eager/local[threads] 8.2720 0.3542 5 eager/local[processes] 52.1400 0.4650 5
Lazy mode (LazyListMapper)¶
Lazy mode records the plan and only materializes at collect() / to_list().
When you pass a backend to collect(), the plan is executed via that backend during materialization.
In [5]:
Copied!
def lazy_pipeline(backend=None) -> int:
lazy = ListMapper(*data).lazy().map(map_fn).filter(pred_fn)
out = lazy.collect(backend=backend)
return len(out.to_list())
def lazy_pipeline(backend=None) -> int:
lazy = ListMapper(*data).lazy().map(map_fn).filter(pred_fn)
out = lazy.collect(backend=backend)
return len(out.to_list())
In [6]:
Copied!
lazy_results: List[BenchResult] = [
BenchResult('lazy/serial', timeit(lambda: lazy_pipeline(SerialBackend()), runs=N_RUNS, warmup=WARMUP)),
BenchResult('lazy/local[threads]',
timeit(lambda: lazy_pipeline(LocalBackend(mode='threads')), runs=N_RUNS,
warmup=WARMUP)), BenchResult('lazy/local[processes]',
timeit(lambda: lazy_pipeline(LocalBackend(mode='processes')),
runs=N_RUNS, warmup=0))]
summarize(lazy_results)
lazy_results: List[BenchResult] = [
BenchResult('lazy/serial', timeit(lambda: lazy_pipeline(SerialBackend()), runs=N_RUNS, warmup=WARMUP)),
BenchResult('lazy/local[threads]',
timeit(lambda: lazy_pipeline(LocalBackend(mode='threads')), runs=N_RUNS,
warmup=WARMUP)), BenchResult('lazy/local[processes]',
timeit(lambda: lazy_pipeline(LocalBackend(mode='processes')),
runs=N_RUNS, warmup=0))]
summarize(lazy_results)
name mean(s) stdev(s) runs lazy/serial 0.0297 0.0032 5 lazy/local[threads] 8.6404 0.4340 5 lazy/local[processes] 53.0344 0.3797 5
In [8]:
Copied!
if RayBackend is None:
print('RayBackend not available (ray not installed). Skipping.')
else:
ray_backend = RayBackend(address=None)
ray_results: List[BenchResult] = [
BenchResult('eager/ray', timeit(lambda: eager_pipeline(ray_backend), runs=max(1, N_RUNS // 2), warmup=0)),
BenchResult('lazy/ray', timeit(lambda: lazy_pipeline(ray_backend), runs=max(1, N_RUNS // 2), warmup=0))]
summarize(ray_results)
if RayBackend is None:
print('RayBackend not available (ray not installed). Skipping.')
else:
ray_backend = RayBackend(address=None)
ray_results: List[BenchResult] = [
BenchResult('eager/ray', timeit(lambda: eager_pipeline(ray_backend), runs=max(1, N_RUNS // 2), warmup=0)),
BenchResult('lazy/ray', timeit(lambda: lazy_pipeline(ray_backend), runs=max(1, N_RUNS // 2), warmup=0))]
summarize(ray_results)
2026-01-09 15:28:11,590 INFO worker.py:2007 -- Started a local Ray instance. /home/andri/OtherProject/list-function-python-project/.venv/lib/python3.12/site-packages/ray/_private/worker.py:2046: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0 warnings.warn( (pid=gcs_server) [2026-01-09 15:28:40,106 E 39536 39536] (gcs_server) gcs_server.cc:303: Failed to establish connection to the event+metrics exporter agent. Events and metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14 (raylet) [2026-01-09 15:28:41,527 E 39700 39700] (raylet) main.cc:1032: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14 (_apply pid=39771) [2026-01-09 15:28:42,138 E 39771 39880] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14 [2026-01-09 15:28:42,921 E 38971 39770] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14 (_a pid=39782) pply (_apply pid=39773) [2026-01-09 15:28:42,629 E 39773 40928] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14 [repeated 21x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
name mean(s) stdev(s) runs lazy/ray 86.9034 0.5679 2 eager/ray 92.9674 2.3085 2
In [ ]:
Copied!
if DaskBackend is None:
print('DaskBackend not available (dask not installed). Skipping.')
else:
dask_backend = DaskBackend(scheduler=None)
dask_results: List[BenchResult] = [
BenchResult('eager/dask', timeit(lambda: eager_pipeline(dask_backend), runs=max(1, N_RUNS // 2), warmup=0)),
BenchResult('lazy/dask', timeit(lambda: lazy_pipeline(dask_backend), runs=max(1, N_RUNS // 2), warmup=0))]
summarize(dask_results)
if DaskBackend is None:
print('DaskBackend not available (dask not installed). Skipping.')
else:
dask_backend = DaskBackend(scheduler=None)
dask_results: List[BenchResult] = [
BenchResult('eager/dask', timeit(lambda: eager_pipeline(dask_backend), runs=max(1, N_RUNS // 2), warmup=0)),
BenchResult('lazy/dask', timeit(lambda: lazy_pipeline(dask_backend), runs=max(1, N_RUNS // 2), warmup=0))]
summarize(dask_results)
Async backend benchmark (subprocess)¶
AsyncBackend uses asyncio.run(...) internally. In Jupyter, an event loop is already running, so calling it in-process can fail.
This cell benchmarks AsyncBackend by running a small script in a fresh Python process.
In [ ]:
Copied!
async_script = r'''
import os, time, statistics
from functional_list import ListMapper
from functional_list.backend import AsyncBackend
DATA_SIZE = int(os.environ.get('BENCH_DATA_SIZE', '20_000'))
RUNS = int(os.environ.get('BENCH_RUNS', '5'))
data = list(range(DATA_SIZE))
async def map_fn(x: int) -> int:
# simulate tiny IO
import asyncio
await asyncio.sleep(0)
return x + 1
async def pred_fn(x: int) -> bool:
import asyncio
await asyncio.sleep(0)
return (x % 2) == 0
backend = AsyncBackend(concurrency=500)
def eager_async():
lm = ListMapper(*data)
out = lm.map(map_fn, backend=backend).filter(pred_fn, backend=backend)
return len(out.to_list())
def bench(fn):
times = []
for _ in range(RUNS):
t0 = time.perf_counter()
fn()
times.append(time.perf_counter() - t0)
return times
times = bench(eager_async)
print('async/eager mean(s)=', statistics.mean(times), 'stdev=', statistics.pstdev(times), 'runs=', len(times))
'''
env = dict(os.environ)
env.setdefault('BENCH_DATA_SIZE', str(min(DATA_SIZE, 20_000)))
env.setdefault('BENCH_RUNS', str(max(3, N_RUNS)))
proc = subprocess.run([sys.executable, '-c', async_script], env=env, capture_output=True, text=True)
print(proc.stdout)
if proc.returncode != 0:
print(proc.stderr)
raise RuntimeError('Async subprocess failed')
async_script = r'''
import os, time, statistics
from functional_list import ListMapper
from functional_list.backend import AsyncBackend
DATA_SIZE = int(os.environ.get('BENCH_DATA_SIZE', '20_000'))
RUNS = int(os.environ.get('BENCH_RUNS', '5'))
data = list(range(DATA_SIZE))
async def map_fn(x: int) -> int:
# simulate tiny IO
import asyncio
await asyncio.sleep(0)
return x + 1
async def pred_fn(x: int) -> bool:
import asyncio
await asyncio.sleep(0)
return (x % 2) == 0
backend = AsyncBackend(concurrency=500)
def eager_async():
lm = ListMapper(*data)
out = lm.map(map_fn, backend=backend).filter(pred_fn, backend=backend)
return len(out.to_list())
def bench(fn):
times = []
for _ in range(RUNS):
t0 = time.perf_counter()
fn()
times.append(time.perf_counter() - t0)
return times
times = bench(eager_async)
print('async/eager mean(s)=', statistics.mean(times), 'stdev=', statistics.pstdev(times), 'runs=', len(times))
'''
env = dict(os.environ)
env.setdefault('BENCH_DATA_SIZE', str(min(DATA_SIZE, 20_000)))
env.setdefault('BENCH_RUNS', str(max(3, N_RUNS)))
proc = subprocess.run([sys.executable, '-c', async_script], env=env, capture_output=True, text=True)
print(proc.stdout)
if proc.returncode != 0:
print(proc.stderr)
raise RuntimeError('Async subprocess failed')