Skip to content

File I/O Operations Guide

This guide covers all file I/O operations in functional_list, including reading from and writing to various file formats.

📋 Supported Formats Overview

Format Read Method Write Method Requires Best For
CSV from_csv() to_csv() Built-in Tabular data
JSON from_json() to_json() Built-in Structured data
JSONL from_jsonl() to_jsonl() Built-in Streaming logs, large JSON
Parquet from_parquet() to_parquet() pyarrow Big data, columnar storage
Text from_text() to_text() Built-in Plain text, logs

📦 Installation

Basic I/O (CSV, JSON, JSONL, Text)

These formats work out of the box - no extra dependencies needed:

pip install functional-list

Parquet Support

For Parquet files, install PyArrow:

# Install with Parquet support
pip install functional-list[io]

# Or install PyArrow separately
pip install pyarrow

📖 Reading Files

CSV Files

Read CSV files with flexible options:

Basic CSV Reading

from functional_list import ListMapper

# Simple read (each row is a list)
rows = ListMapper.from_csv("data.csv")
print(rows[0])  # ['column1_value', 'column2_value', ...]

CSV with Options

from functional_list import ListMapper
from functional_list.io import CSVReadOptions

# Skip header row
rows = ListMapper.from_csv(
    "users.csv",
    options=CSVReadOptions(skip_header=True)
)

# Custom delimiter
rows = ListMapper.from_csv(
    "data.tsv",
    options=CSVReadOptions(delimiter="\t")
)

# Custom encoding
rows = ListMapper.from_csv(
    "international.csv",
    options=CSVReadOptions(encoding="utf-8")
)

Transforming CSV Rows

Transform rows to dictionaries for easier access:

from functional_list import ListMapper
from functional_list.io import CSVReadOptions

# Transform to dictionaries
users = ListMapper.from_csv(
    "users.csv",
    options=CSVReadOptions(skip_header=True),
    transform=lambda row: {
        "name": row[0],
        "age": int(row[1]),
        "email": row[2],
        "city": row[3]
    }
)

# Now you can easily filter and map
adults = users.filter(lambda u: u["age"] >= 18)
names = adults.map(lambda u: u["name"])

CSVReadOptions Reference

from functional_list.io import CSVReadOptions

options = CSVReadOptions(
    skip_header=True,      # Skip first row (default: False)
    delimiter=",",         # Column delimiter (default: ",")
    encoding="utf-8"       # File encoding (default: "utf-8")
)

JSON Files

Read JSON files containing arrays or single objects:

Basic JSON Reading

from functional_list import ListMapper

# Read JSON array
# File: [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
users = ListMapper.from_json("users.json")
print(users[0])  # {"name": "Alice", "age": 30}

# Read single JSON object (becomes 1-element list)
# File: {"status": "ok", "data": [...]}
response = ListMapper.from_json("response.json")
print(len(response))  # 1

Processing JSON Data

from functional_list import ListMapper

# Load and process
users = ListMapper.from_json("users.json")

# Extract fields
names = users.map(lambda u: u["name"])

# Filter and transform
adults = (
    users
    .filter(lambda u: u.get("age", 0) >= 18)
    .map(lambda u: {
        "name": u["name"],
        "email": u.get("email", "N/A")
    })
)

JSONL (JSON Lines) Files

JSONL format has one JSON object per line - perfect for logs and streaming data:

Basic JSONL Reading

from functional_list import ListMapper

# Each line is a separate JSON object
# File:
# {"timestamp": "2024-01-01", "level": "INFO", "message": "Started"}
# {"timestamp": "2024-01-01", "level": "ERROR", "message": "Failed"}
# {"timestamp": "2024-01-01", "level": "INFO", "message": "Completed"}

events = ListMapper.from_jsonl("events.jsonl")
print(events[0])  # {"timestamp": "2024-01-01", "level": "INFO", ...}

Processing Log Files

from functional_list import ListMapper

# Load JSONL log file
logs = ListMapper.from_jsonl("application.log")

# Find all errors
errors = logs.filter(lambda e: e.get("level") == "ERROR")

# Group by error type
by_type = errors.group_by(lambda e: e.get("error_type", "unknown"))

# Get error messages
messages = errors.map(lambda e: e["message"])

Why Use JSONL?

  • Streaming-friendly: Process line by line
  • Append-safe: Add new entries without parsing entire file
  • Fault-tolerant: One bad line doesn't corrupt the whole file
  • Perfect for logs: Each log entry is independent

Text Files

Read plain text files line by line:

Basic Text Reading

from functional_list import ListMapper

# Read all lines
lines = ListMapper.from_text("document.txt")
print(lines[0])  # First line as string

Text with Options

from functional_list import ListMapper
from functional_list.io import TextReadOptions

# Strip whitespace and skip empty lines
lines = ListMapper.from_text(
    "log.txt",
    options=TextReadOptions(
        strip_lines=True,   # Remove leading/trailing whitespace
        skip_empty=True     # Skip blank lines
    )
)

# Custom encoding
lines = ListMapper.from_text(
    "unicode.txt",
    options=TextReadOptions(encoding="utf-8")
)

Processing Text Files

from functional_list import ListMapper
from functional_list.io import TextReadOptions

# Read and process log file
log_lines = ListMapper.from_text(
    "app.log",
    options=TextReadOptions(strip_lines=True, skip_empty=True)
)

# Extract error lines
errors = log_lines.filter(lambda line: "ERROR" in line)

# Parse log format: "2024-01-01 12:00:00 ERROR message"
parsed = errors.map(lambda line: {
    "timestamp": line.split()[0] + " " + line.split()[1],
    "level": line.split()[2],
    "message": " ".join(line.split()[3:])
})

# Count errors by date
by_date = (
    parsed
    .map(lambda e: (e["timestamp"].split()[0], 1))
    .reduce_by_key(lambda x, y: x + y)
)

TextReadOptions Reference

from functional_list.io import TextReadOptions

options = TextReadOptions(
    strip_lines=True,      # Strip whitespace (default: False)
    skip_empty=True,       # Skip empty lines (default: False)
    encoding="utf-8"       # File encoding (default: "utf-8")
)

Parquet Files

Parquet is a columnar storage format, great for analytics and big data:

Basic Parquet Reading

from functional_list import ListMapper

# Read entire Parquet file
data = ListMapper.from_parquet("data.parquet")
print(data[0])  # Dictionary with all columns

Reading Specific Columns

from functional_list import ListMapper

# Read only specific columns (more efficient!)
users = ListMapper.from_parquet(
    "users.parquet",
    columns=["user_id", "name", "signup_date"]
)

# Each row is a dict with only specified columns
print(users[0])  # {"user_id": 1, "name": "Alice", "signup_date": "2024-01-01"}

Working with Large Parquet Files

from functional_list import ListMapper

# Load large file
data = ListMapper.from_parquet("large_dataset.parquet", columns=["id", "value"])

# Process efficiently with lazy evaluation
result = (
    data
    .lazy()                                    # Switch to lazy for memory efficiency
    .filter(lambda row: row["value"] > 1000)  # Filter
    .map(lambda row: row["id"])               # Extract IDs
    .distinct()                                # Get unique
    .collect()                                 # Materialize when needed
)

📝 Writing Files

Writing JSON

from functional_list import ListMapper

# Create data
users = ListMapper[dict](
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
    {"name": "Charlie", "age": 35}
)

# Write to JSON file
users.to_json("output.json")

# Result: [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}, ...]

Writing JSONL

from functional_list import ListMapper

# Create log entries
logs = ListMapper[dict](
    {"timestamp": "2024-01-01 10:00", "level": "INFO", "msg": "Started"},
    {"timestamp": "2024-01-01 10:01", "level": "ERROR", "msg": "Failed"},
    {"timestamp": "2024-01-01 10:02", "level": "INFO", "msg": "Retry"}
)

# Write to JSONL (one object per line)
logs.to_jsonl("events.log")

# Result (file content):
# {"timestamp": "2024-01-01 10:00", "level": "INFO", "msg": "Started"}
# {"timestamp": "2024-01-01 10:01", "level": "ERROR", "msg": "Failed"}
# {"timestamp": "2024-01-01 10:02", "level": "INFO", "msg": "Retry"}

Writing CSV

from functional_list import ListMapper

# If supported, write to CSV
data = ListMapper[list](
    ["Alice", "30", "alice@example.com"],
    ["Bob", "25", "bob@example.com"]
)

# Note: Check the API if to_csv() is available
# data.to_csv("output.csv")

🔄 Complete ETL Example

Here's a complete Extract-Transform-Load example:

from functional_list import ListMapper
from functional_list.io import CSVReadOptions, TextReadOptions

# Extract: Load from multiple sources
csv_users = ListMapper.from_csv(
    "users.csv",
    options=CSVReadOptions(skip_header=True),
    transform=lambda row: {
        "id": int(row[0]),
        "name": row[1],
        "email": row[2],
        "source": "csv"
    }
)

json_users = ListMapper.from_json("new_users.json").map(
    lambda u: {**u, "source": "json"}
)

# Transform: Combine and process
all_users = (
    csv_users
    .union(json_users)                              # Combine sources
    .distinct()                                     # Remove duplicates
    .filter(lambda u: u.get("email"))              # Only valid emails
    .map(lambda u: {
        **u,
        "email": u["email"].lower(),               # Normalize email
        "name": u["name"].strip().title()          # Normalize name
    })
    .sort(key=lambda u: u["id"])                   # Sort by ID
)

# Load: Save results
all_users.to_json("processed_users.json")
all_users.to_jsonl("processed_users.jsonl")

# Also create summary
summary = {
    "total": len(all_users),
    "sources": all_users.group_by(lambda u: u["source"])
}
ListMapper[dict](summary).to_json("summary.json")

🛠️ Advanced Patterns

Reading Large Files Lazily

For very large files, use lazy evaluation to avoid loading everything into memory:

from functional_list import ListMapper

# Read large file lazily
lazy_data = (
    ListMapper.from_parquet("huge_file.parquet")
    .lazy()                                    # Don't load all into memory
    .filter(lambda row: row["active"])         # Filter as we stream
    .map(lambda row: transform(row))           # Transform as we stream
)

# Only materialize top 1000 results
top_1000 = lazy_data.take(1000)

Batch Processing Multiple Files

from functional_list import ListMapper
import glob

# Find all CSV files
csv_files = glob.glob("data/*.csv")

# Process all files
all_data = ListMapper[str](*csv_files).flat_map(
    lambda filename: ListMapper.from_csv(filename)
)

# Now process combined data
result = all_data.filter(...).map(...)

Error Handling

from functional_list import ListMapper

def safe_parse(row):
    """Safely parse row, return None on error"""
    try:
        return {
            "id": int(row[0]),
            "value": float(row[1])
        }
    except (ValueError, IndexError):
        return None

# Read and filter out parse errors
data = (
    ListMapper.from_csv("data.csv")
    .map(safe_parse)
    .filter(lambda x: x is not None)  # Remove failed parses
)

💡 Best Practices

Choose the Right Format

  • CSV: Simple tabular data, human-readable
  • JSON: Structured data with nesting
  • JSONL: Logs, streaming data, append-only files
  • Parquet: Large datasets, analytics, columnar queries
  • Text: Logs, simple line-based data

Use Column Selection with Parquet

Only read columns you need to save memory and I/O:

# Good: Only load needed columns
data = ListMapper.from_parquet("big.parquet", columns=["id", "name"])

# Avoid: Loading all columns when you only need few
data = ListMapper.from_parquet("big.parquet")  # Loads everything

Process Large Files Lazily

Use lazy mode for large files:

big_data = ListMapper.from_parquet("huge.parquet").lazy().filter(...)

Watch File Encodings

Always specify encoding for non-ASCII files:

data = ListMapper.from_text("file.txt", options=TextReadOptions(encoding="utf-8"))

Alternative: ListMapperIO

You can also use the ListMapperIO utility class:

from functional_list import ListMapperIO
rows = ListMapperIO.read_csv("data.csv", options=CSVReadOptions(...))

🎓 Next Steps