JAF (Just Another Flow) is a powerful streaming data processing system for JSON/JSONL data with a focus on lazy evaluation, composability, and a fluent API.
- 🚀 Streaming Architecture - Process large datasets without loading everything into memory
- 🔗 Lazy Evaluation - Build complex pipelines that only execute when needed
- 🎯 Fluent API - Intuitive method chaining for readable code
- 🧩 Composable - Combine operations freely, integrate with other tools
- 📦 Multiple Sources - Files, directories, stdin, memory, compressed files, infinite streams
- 🛠️ Unix Philosophy - Works great with pipes and other command-line tools
```bash
pip install jaf
```

```bash
# Filter JSON data using S-expressions (lazy by default)
jaf filter users.jsonl '(gt? @age 25)'

# Or use JSON array syntax
jaf filter users.jsonl '["gt?", "@age", 25]'

# Or use infix DSL (note: paths need @ prefix)
jaf filter users.jsonl '@age > 25'

# Evaluate immediately with --eval
jaf filter users.jsonl '(gt? @age 25)' --eval

# Chain operations
jaf filter users.jsonl '(eq? @status "active")' | \
  jaf map - "@email" | \
  jaf eval -

# Complex queries with nested logic
jaf filter logs.jsonl '(and (eq? @level "ERROR") (gt? @timestamp "2024-01-01"))' --eval

# Combine with other tools
jaf filter logs.jsonl '(eq? @level "ERROR")' --eval | \
  ja groupby service
```

```python
from jaf import stream

# Build a pipeline
pipeline = stream("users.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(["dict", "name", "@name", "email", "@email"]) \
    .take(10)

# Execute when ready
for user in pipeline.evaluate():
    print(user)
```

Operations don't execute until you call .evaluate() or use --eval:
```python
# This doesn't read any data yet
pipeline = stream("huge_file.jsonl") \
    .filter(["contains?", "@tags", "important"]) \
    .map("@message")

# Now it processes data
for message in pipeline.evaluate():
    process(message)
```

JAF supports multiple query syntaxes for flexibility:
```
# Simple comparisons
(eq? @status "active")        # status == "active"
(gt? @age 25)                 # age > 25
(contains? @tags "python")    # "python" in tags

# Boolean logic
(and
  (gte? @age 18)
  (eq? @verified true))

# Nested expressions
(or (eq? @role "admin")
    (and (eq? @role "user")
         (gt? @score 100)))
```

```
# Same queries in JSON array format
["eq?", "@status", "active"]
["gt?", "@age", 25]
["contains?", "@tags", "python"]

["and",
  ["gte?", "@age", 18],
  ["eq?", "@verified", true]
]
```

```
# Natural infix notation (paths need @ prefix)
@status == "active"
@age > 25 and @verified == true
@role == "admin" or (@role == "user" and @score > 100)
```

All three syntaxes compile to the same internal representation. Use whichever feels most natural for your use case!
- filter - Keep items matching a predicate
- map - Transform each item
- take/skip - Limit or paginate results
- batch - Group items into chunks
- Boolean ops - AND, OR, NOT on filtered streams
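As a rough mental model, these operations compose like ordinary Python generator pipelines: each stage lazily pulls from the previous one. The sketch below is plain Python for intuition only, not JAF's actual implementation:

```python
from itertools import islice

def filter_stream(items, pred):
    # Lazily keep items matching a predicate (like JAF's filter)
    return (x for x in items if pred(x))

def map_stream(items, fn):
    # Lazily transform each item (like JAF's map)
    return (fn(x) for x in items)

def batch_stream(items, size):
    # Group items into fixed-size chunks (like JAF's batch)
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

users = [{"age": a} for a in range(1, 8)]
adults = filter_stream(users, lambda u: u["age"] > 3)
ages = map_stream(adults, lambda u: u["age"])
batches = list(batch_stream(ages, 2))
# batches == [[4, 5], [6, 7]]
```

Nothing runs until the final `list(...)` consumes the pipeline, which is the same property JAF's `.evaluate()` exposes.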
- Getting Started - Installation and first steps
- API Guide - Complete Python API reference
- Query Language - Query syntax and operators
- CLI Reference - Command-line usage
- Cookbook - Practical examples
```python
# Find errors in specific services
errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["in?", "@service", ["api", "auth"]]
    ]) \
    .map(["dict",
        "time", "@timestamp",
        "service", "@service",
        "message", "@message"
    ]) \
    .evaluate()
```

```python
# Find invalid records
invalid = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", "^[^@]+@[^@]+\\.[^@]+$"]]
    ]) \
    .evaluate()
```

```python
# Transform and filter data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "amount", "@amount",
        "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
    ]) \
    .batch(1000)

# Process in chunks
for batch in pipeline.evaluate():
    bulk_insert(batch)
```

JAF works seamlessly with other tools:
```bash
# With jsonl-algebra
jaf filter orders.jsonl '["gt?", "@amount", 100]' --eval | \
  ja groupby customer_id --aggregate 'total:amount:sum'

# With jq
jaf filter data.jsonl '["exists?", "@metadata"]' --eval | \
  jq '.metadata'

# With standard Unix tools
jaf map users.jsonl "@email" --eval | sort | uniq -c
```

JAF is designed for streaming large datasets:
- Processes one item at a time
- Minimal memory footprint
- Early termination (e.g., with take)
- Efficient pipeline composition
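Laziness is what makes early termination cheap: a generator-based pipeline stops pulling from the source as soon as downstream stages (such as take) are satisfied, so the rest of the input is never read. A plain-Python sketch of the idea (not JAF internals):

```python
from itertools import islice

reads = 0

def source():
    # Simulates a huge JSONL file; counts how many records are pulled
    global reads
    for i in range(1_000_000):
        reads += 1
        yield {"id": i}

# Taking 3 items pulls only 3 records from the source
first_three = list(islice(source(), 3))
# reads == 3, not 1_000_000
```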
JAF supports windowed operations for memory-efficient processing of large datasets:

- distinct, groupby, join, intersect, and except all support a window_size parameter
- Use window_size=float('inf') for exact results (the default)
- Finite windows trade accuracy for memory efficiency
- Warning: for intersect/except, the window size must be large enough to capture overlapping items
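One way to picture the tradeoff (a conceptual sketch, not JAF's implementation): windowed distinct remembers only the last window_size keys, so a duplicate is caught only if it reappears within the window.

```python
from collections import deque

def windowed_distinct(items, window_size):
    # Drop duplicates seen within the last `window_size` emitted items;
    # duplicates arriving outside the window slip through
    seen = set()
    order = deque()
    for x in items:
        if x in seen:
            continue
        yield x
        seen.add(x)
        order.append(x)
        if len(order) > window_size:
            seen.discard(order.popleft())

data = [1, 2, 1, 3, 1]
exact = list(windowed_distinct(data, window_size=10))  # [1, 2, 3]
small = list(windowed_distinct(data, window_size=1))   # every repeat slips through
```

With a large window the result is exact; with window_size=1 the memory is constant but each duplicate of 1 escapes deduplication.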
```python
# Exact distinct (uses more memory)
stream("data.jsonl").distinct(window_size=float('inf'))

# Windowed distinct (bounded memory)
stream("data.jsonl").distinct(window_size=1000)

# Tumbling window groupby
stream("logs.jsonl").groupby(key="@level", window_size=100)
```

- Bloom Filters for memory-efficient approximate set operations (intersect, except, distinct)
- Count-Min Sketch for frequency estimation and heavy hitters detection
- HyperLogLog for cardinality estimation
- These would provide controllable accuracy/memory tradeoffs with theoretical guarantees
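These structures are roadmap items, not part of JAF today. For intuition, here is a toy Bloom filter (hypothetical, illustration only): it uses a fixed-size bit array, never produces false negatives, and may produce false positives:

```python
import hashlib

class BloomFilter:
    # Toy Bloom filter: k hash positions over an m-bit array.
    # "Maybe present" can be wrong; "definitely absent" never is.
    def __init__(self, m=1024, k=3):
        self.m, self.k = m, k
        self.bits = bytearray(m)

    def _positions(self, item):
        # Derive k deterministic bit positions from the item
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, item):
        for p in self._positions(item):
            self.bits[p] = 1

    def might_contain(self, item):
        return all(self.bits[p] for p in self._positions(item))

bf = BloomFilter()
for word in ["api", "auth", "billing"]:
    bf.add(word)
# Added items always report present
assert bf.might_contain("auth")
```

Memory is fixed at m bits regardless of how many items are added, which is the appeal for streaming set operations.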
- Top-K operations - Find most frequent items in streams
- Sampling strategies - Reservoir sampling, stratified sampling
- Time-based windows - Process data in time intervals
- Exactly-once semantics - Checkpointing and recovery
- Parallel processing - Multi-threaded stream processing
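As an example of what a sampling strategy could look like, here is a standard reservoir-sampling sketch (Algorithm R); the function name and API are hypothetical, not part of JAF. It keeps a uniform fixed-size sample from a stream of unknown length in a single pass:

```python
import random

def reservoir_sample(items, k, rng=None):
    # Maintain a uniform random sample of k items from a stream
    # without knowing the stream's length in advance (Algorithm R)
    rng = rng or random.Random()
    sample = []
    for n, x in enumerate(items):
        if n < k:
            sample.append(x)
        else:
            j = rng.randint(0, n)  # inclusive bounds
            if j < k:
                sample[j] = x
    return sample

sample = reservoir_sample(range(10_000), k=5, rng=random.Random(42))
assert len(sample) == 5
```

Only k items are ever held in memory, matching the streaming constraints the rest of JAF operates under.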
- FastAPI - REST API for stream processing
- Model Context Protocol (MCP) - LLM integration
- Apache Kafka - Stream from/to Kafka topics
- Cloud Storage - S3, GCS, Azure Blob support
Contributions are welcome! Please read our Contributing Guide for details.
JAF is licensed under the MIT License. See LICENSE for details.
- jsonl-algebra - Relational operations on JSONL
- jq - Command-line JSON processor
- dotsuite - Pedagogical ecosystem demonstrating the concepts behind JAF through simple, composable tools. Great for understanding the theory and building blocks that JAF productionizes.