Examples
This page shows pipeline patterns in order of increasing complexity. Each example has a flow diagram, a short goal, and runnable code.
Contents
- Basics — sequential, parallel, Julia + shell, data passing and pipe
- Control flow — retry, fallback, branching
- Complex DAGs — multi-stage parallel, robust pipeline
- Bioinformatics — immune repertoire (single & multi-donor), variant calling
1. Basics
1.1 Basic Pipeline
Flow: Three steps in sequence.
download ──► process ──► upload
Goal: Create a file, process it, then copy the result (runnable without network).
using SimplePipelines
download = @step download = sh"(echo line3; echo line1; echo line2) > data.txt"
process = @step process = sh"sort data.txt > sorted.txt"
upload = @step upload = sh"cp sorted.txt uploaded.txt"
pipeline = download >> process >> upload
run(pipeline)
1.2 Parallel Processing
Flow: Three steps run in parallel, then one step merges.
┌── file_a ──┐
├── file_b ──┼──► archive
└── file_c ──┘
Goal: Create three files, compress them concurrently, then archive (runnable as-is).
using SimplePipelines
file_a = @step a = sh"echo content_a > file_a.txt && gzip -k file_a.txt"
file_b = @step b = sh"echo content_b > file_b.txt && gzip -k file_b.txt"
file_c = @step c = sh"echo content_c > file_c.txt && gzip -k file_c.txt"
archive = @step archive = sh"tar -cvf archive.tar file_a.txt.gz file_b.txt.gz file_c.txt.gz"
pipeline = (file_a & file_b & file_c) >> archive
run(pipeline)
1.3 Julia + Shell
Flow: Julia step → shell step → Julia step.
generate ──► process ──► report
 (Julia)     (shell)    (Julia)
Goal: Generate data in Julia, run a shell tool, then summarize in Julia.
using SimplePipelines
using DelimitedFiles
generate = @step generate = () -> begin
    data = rand(100, 10)
    writedlm("matrix.csv", data, ',')
    return "Generated $(size(data)) matrix"
end
process = @step process = sh"wc -l matrix.csv"
report = @step report = () -> begin
    lines = read("matrix.csv", String)
    nrows = count(==('\n'), lines)
    return "Matrix has $nrows rows"
end
pipeline = generate >> process >> report
run(pipeline)
1.4 Data passing and Pipe
Flow: Function steps receive the previous step's output. |> pipes output to the next step; .>> attaches the next step to each branch.
download() ──► process(path)   # >> passes path
ForEach .>> step               # each branch: branch >> step
Goal: Pass a downloaded path to a processor; use broadcast to process each branch output immediately.
using SimplePipelines
# Sequence: first step returns path, second receives it
download() = "data.csv"
process(path) = path * "_done"
pipeline = (@step dl = download) >> (@step proc = process)
run(pipeline)
# proc receives "data.csv"
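The same passing extends to longer chains: each return value becomes the next function's argument. A minimal sketch (function names are illustrative):
load() = [3, 1, 2]
clean(v) = sort(v)
describe(v) = "min=$(first(v)) max=$(last(v))"
chain = (@step load = load) >> (@step clean = clean) >> (@step describe = describe)
run(chain)
# describe receives [1, 2, 3]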
# Pipe: left output becomes right input (single or vector for multi-branch)
fetch = @step fetch = `echo "hello"`
pipeline = fetch |> @step process = (s -> "got: " * strip(String(s)))
run(pipeline)
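When the left side of |> is a parallel group, the right step receives a vector of branch outputs (per the "single or vector for multi-branch" note above). A sketch under that assumption:
a = @step a = `echo 1`
b = @step b = `echo 2`
combine = @step combine = (vs -> join(strip.(String.(vs)), "+"))
run((a & b) |> combine)
# combine receives both branch outputs as a vector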
# Broadcast: attach process to each branch (no wait for all)
fe = ForEach([1, 2]) do x
    Step(Symbol("gen_", x), `echo $x`)
end
pipeline = fe .>> @step process = (s -> "got_" * strip(String(s)))
run(pipeline, force=true, keep_outputs=:all)
# 4 results: gen_1, process("1"), gen_2, process("2")
2. Control flow
2.1 Retry and Fallback
Flow (retry then continue): Run a step up to N times, then continue.
[fetch^3] ──► process
 (retry)
Flow (fallback): If primary fails, run fallback.
primary ──► (on success) result
   │
   └──► (on failure) fallback ──► result
Goal: Retry flaky fetch; use fallback when primary step fails; combine retry and fallback.
using SimplePipelines
# Retry then continue (fetch creates file; no network)
fetch = @step fetch = sh"echo '{\"x\":1}' > data.json"
process = @step process = sh"wc -c data.json"
pipeline = Retry(fetch, 3, delay=0.1) >> process
run(pipeline)
# Fallback (create data.csv first so both branches have input)
run(@step setup = sh"(echo 'a,b'; echo '1,2') > data.csv", verbose=false)
fast = @step fast = sh"sort data.csv > sorted.csv"
slow = @step slow = sh"cat data.csv > sorted.csv"
pipeline = fast | slow
run(pipeline)
# Retry then fallback
pipeline = Retry(fast, 3) | slow
run(pipeline)
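To watch retries actually happen without a network, make a step deliberately flaky. A sketch, assuming Retry re-invokes a failing function step (the counter and error are illustrative only):
attempts = Ref(0)  # illustrative counter: fail twice, then succeed
flaky = @step flaky = () -> begin
    attempts[] += 1
    attempts[] < 3 && error("transient failure on attempt $(attempts[])")
    return "succeeded on attempt $(attempts[])"
end
run(Retry(flaky, 3, delay=0.1))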
2.2 Conditional Branching
Flow: One of two branches runs based on a condition.
           ┌── if true ───► branch_a
condition ─┤
           └── if false ──► branch_b
Goal: Choose processing path by file size or environment (e.g. DEBUG).
using SimplePipelines
# Create data.csv so the branch condition has a file to check
run(@step setup = sh"(echo 'a,b'; echo '1,2'; echo '3,4') > data.csv", verbose=false)
# By file size
small_pipeline = @step small = sh"head -n 1000 data.csv > sample.csv"
large_pipeline = (@step decompress = sh"gunzip -c data.csv.gz > data.csv") >> (@step process = sh"split -l 10000 data.csv chunk_")
pipeline = Branch(
    () -> filesize("data.csv") < 100_000_000,
    small_pipeline,
    large_pipeline
)
run(pipeline)
# By environment
debug_steps = @step debug = sh"echo 'debug mode'"
prod_steps = @step prod = sh"echo 'production'"
pipeline = Branch(() -> get(ENV, "DEBUG", "0") == "1", debug_steps, prod_steps)
run(pipeline)
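The condition is a zero-argument closure, so it is evaluated when the pipeline runs, not when it is built; flipping the variable and re-running switches the branch:
ENV["DEBUG"] = "1"
run(pipeline)  # re-run: now takes the debug branch (add force=true if cached outputs are skipped)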
3. Complex DAGs
3.1 Multi-stage Parallel
Flow: Two parallel fetch+transform branches, then merge, analyze, then report and archive in parallel.
┌── fetch_db ─────► transform_db ─────┐
│                                     ├──► merge ──► analyze ──┬──► report
└── fetch_files ──► transform_files ──┘                        └──► archive
Goal: Fetch from two sources in parallel (here: create two files), process each, merge, analyze, then report and archive (runnable without network).
using SimplePipelines
fetch_db = @step db = sh"echo '{\"db\":1}' > db_data.json"
fetch_files = @step files = sh"echo 'local_data' > local_data.txt"
transform_db = @step transform_db = sh"wc -c db_data.json > db_size.txt"
transform_files = @step transform_files = sh"wc -c local_data.txt > files_size.txt"
merge = @step merge = sh"cat db_size.txt files_size.txt > merged.txt"
analyze = @step analyze = sh"wc -l merged.txt > results.txt"
report = @step report = sh"cat results.txt"
archive = @step archive = sh"gzip -c merged.txt > merged.txt.gz"
db_branch = fetch_db >> transform_db
files_branch = fetch_files >> transform_files
pipeline = (db_branch & files_branch) >> merge >> analyze >> (report & archive)
run(pipeline)
3.2 Robust Pipeline (all features)
Flow: Retry+fallback fetch → conditional process (small vs full) → report and notify in parallel (notify with retry).
[primary^3 | backup] ──► [small? quick : full] ──┬──► report
                                                 └──► notify^2
Goal: Fetch with retry and fallback; branch on file size; run report and notify in parallel (runnable without network).
using SimplePipelines
primary_source = @step primary = sh"echo '{\"status\":\"ok\"}' > data.json"
backup_source = @step backup = sh"echo '{\"status\":\"fallback\"}' > data.json"
fetch = Retry(primary_source, 3, delay=0.1) | backup_source
quick_process = @step quick = sh"wc -c data.json > output.txt"
full_process = (@step parse = sh"wc -l data.json > output.txt") >> (@step validate = sh"wc -c output.txt >> output.txt")
process = Branch(
    () -> filesize("data.json") < 1_000_000,
    quick_process,
    full_process
)
report = @step report = sh"cat output.txt"
notify = @step notify = sh"echo 'Pipeline done'"
pipeline = fetch >> process >> (report & Retry(notify, 2))
run(Pipeline(pipeline, name="Robust ETL"))
4. Bioinformatics
Multiple donors / samples: Use ForEach with a {wildcard} pattern to automatically discover files and create parallel branches. The wildcard values propagate via normal Julia $() interpolation.
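A minimal, tool-free sketch of the construct (hypothetical samples/{id}.txt layout; wc stands in for a real tool):
pipeline = ForEach("samples/{id}.txt") do id
    sh("wc -l samples/$(id).txt > $(id).count")
end
run(pipeline)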
Note: Examples below use real tool names (PEAR, IgBLAST, BWA, etc.). They are runnable only if those tools are installed and input files exist; otherwise use echo placeholders as in the basic examples.
4.1 Immune Repertoire (single donor)
Flow: Paired-end FASTQ → merge (PEAR) → FASTQ→FASTA → IgBLAST (V/D/J) → Julia filter by identity.
R1.fastq ──┐
           ├──► pear ──► to_fasta ──► igblast ──► filter_identity ──► igblast_filtered.tsv
R2.fastq ──┘
Goal: Merge paired-end reads, run IgBLAST with V/D/J references, filter rows by v_identity and j_identity > 90% in Julia.
using SimplePipelines
using DelimitedFiles
pear = @step pear = sh"pear -f R1.fastq -r R2.fastq -o merged"
to_fasta = @step to_fasta = sh"seqtk seq -A merged.assembled.fastq > merged.assembled.fasta"
igblast = @step igblast = sh"igblastn -query merged.assembled.fasta -germline_db_V V.fasta -germline_db_D D.fasta -germline_db_J J.fasta -outfmt 19 -out igblast.tsv"  # AIRR TSV: has v_identity / j_identity columns
filter_identity = @step filter_identity = () -> begin
    data, header_row = readdlm("igblast.tsv", '\t', header=true)
    header = vec(header_row)
    v_idx = findfirst(isequal("v_identity"), header)
    j_idx = findfirst(isequal("j_identity"), header)
    to_float(x) = something(tryparse(Float64, string(x)), 0.0)
    keep = [to_float(data[i, v_idx]) > 90.0 && to_float(data[i, j_idx]) > 90.0 for i in axes(data, 1)]
    filtered = data[keep, :]
    writedlm("igblast_filtered.tsv", vcat(permutedims(header), filtered), '\t')
    return "Filtered $(count(keep)) sequences"
end
pipeline = pear >> to_fasta >> igblast >> filter_identity
run(Pipeline(pipeline, name="Immune Repertoire"))
4.2 Immune Repertoire (multiple donors)
Flow: ForEach discovers files matching pattern → one parallel branch per donor.
ForEach("fastq/{donor}_R1.fq.gz")
│
├── donor1: pear → fasta → igblast ──┐
├── donor2: pear → fasta → igblast ──┼── (parallel)
└── donorN: pear → fasta → igblast ──┘Goal: Run the same pipeline for every donor; files discovered automatically.
using SimplePipelines
# ForEach: pattern discovery + parallel branches in one construct
pipeline = ForEach("fastq/{donor}_R1.fq.gz") do donor
    sh("pear -f fastq/$(donor)_R1.fq.gz -r fastq/$(donor)_R2.fq.gz -o $(donor)_merged") >>
    sh("seqtk seq -A $(donor)_merged.assembled.fastq > $(donor).fasta") >>
    sh("igblastn -query $(donor).fasta -germline_db_V V.fasta -germline_db_D D.fasta -germline_db_J J.fasta -outfmt 19 -out $(donor)_igblast.tsv")
end
run(pipeline)
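To combine the per-donor tables once every branch has finished, a merge step can follow the ForEach. A sketch, assuming plain >> after a ForEach runs only after all branches complete (contrast .>> in 1.4):
merge_all = @step merge_all = sh"cat *_igblast.tsv > all_donors_igblast.tsv"
run(pipeline >> merge_all)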
4.3 Variant Calling
Flow: Paired-end reads → FastQC → trim → BWA (GRCh38) → sort/index → bcftools call → index → filter.
R1.fq.gz ──┐
           ├──► fastqc ──► trim ──► align ──► index ──► call ──► index_vcf ──► filter_vcf
R2.fq.gz ──┘
Goal: QC, trim, align to GRCh38, call variants with bcftools, filter by quality.
using SimplePipelines
fastqc = @step fastqc = sh"mkdir -p qc && fastqc -o qc/ R1.fq.gz R2.fq.gz"
trim = @step trim = sh"trimmomatic PE R1.fq.gz R2.fq.gz R1_trimmed.fq.gz R1_unpaired.fq.gz R2_trimmed.fq.gz R2_unpaired.fq.gz ILLUMINACLIP:adapters.fa:2:30:10 LEADING:3 TRAILING:3 SLIDINGWINDOW:4:15 MINLEN:36"
align = @step align = sh"bwa mem -t 8 GRCh38.fa R1_trimmed.fq.gz R2_trimmed.fq.gz | samtools sort -@ 4 -o aligned.bam -"
index = @step index = sh"samtools index aligned.bam"
call = @step call = sh"bcftools mpileup -f GRCh38.fa aligned.bam | bcftools call -mv -Oz -o variants.vcf.gz"
index_vcf = @step index_vcf = sh"bcftools index variants.vcf.gz"
filter_vcf = @step filter_vcf = sh"bcftools filter -i 'QUAL>=20' variants.vcf.gz -Oz -o filtered.vcf.gz"
pipeline = fastqc >> trim >> align >> index >> call >> index_vcf >> filter_vcf
run(Pipeline(pipeline, name="Variant Calling"))
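The multi-donor pattern from 4.2 carries over directly. A sketch, assuming reads are laid out as reads/{sample}_R1.fq.gz and reads/{sample}_R2.fq.gz:
pipeline = ForEach("reads/{sample}_R1.fq.gz") do sample
    sh("bwa mem -t 8 GRCh38.fa reads/$(sample)_R1.fq.gz reads/$(sample)_R2.fq.gz | samtools sort -o $(sample).bam -") >>
    sh("samtools index $(sample).bam") >>
    sh("bcftools mpileup -f GRCh38.fa $(sample).bam | bcftools call -mv -Oz -o $(sample).vcf.gz")
end
run(Pipeline(pipeline, name="Multi-sample Variant Calling"))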