API Reference
Module
Types
SimplePipelines.AbstractNode — Type
AbstractNode

Abstract supertype of all pipeline nodes (Step, Sequence, Parallel, Retry, Fallback, Branch, Timeout, Force, Reduce, ForEach, Pipe, SameInputPipe, BroadcastPipe). Constructors only build the struct; execution is via the functor: calling (node)(v, forced) dispatches to run_node(node, v, forced).
SimplePipelines.AbstractStepResult — Type
Supertype of all step results; use for Vector{AbstractStepResult} (e.g. return of run).
SimplePipelines.StepResult — Type
StepResult(step, success, duration, inputs, outputs, result)

Result of running one step. The concrete type is StepResult{S, I, O, V}; it is type-stable (no Any fields).
Fields

- inputs — Input file paths the step declared. Empty for steps that take no input files (e.g. download / start nodes).
- outputs — Output file paths the step declared (files the step produces).
- result — Execution result: stdout for shell steps, the return value for function steps, or nothing when keep_outputs != :all.
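A minimal sketch of inspecting the returned results, assuming the package is installed and that the fields behave as documented above (the step name `greet` is illustrative):

```julia
using SimplePipelines

node = @step greet = sh"echo hello"
results = run(node; verbose=false)   # Vector{AbstractStepResult}

r = last(results)
r.success    # Bool: did the step succeed?
r.outputs    # declared output paths (empty here: no files declared)
r.result     # stdout of the shell step (kept: it is the final step)
```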
SimplePipelines.Step — Type
Step{F} <: AbstractNode

A single unit of work in a pipeline. F is the work type: Cmd (backtick or sh"..."), Function, or ShRun (from sh(f)).
Fields
- name::Symbol — Step identifier (auto-generated if not provided)
- work::F — The command or function to execute
- inputs::Vector{String} — Input file dependencies
- outputs::Vector{String} — Output file paths
Constructors
Step(work) # Auto-generated name
Step(name::Symbol, work) # Named step
Step(name, work, inputs, outputs) # Full specification
@step name = work # Macro form
@step name(inputs => outputs) = work # Macro with dependencies
SimplePipelines.ShRun — Type
ShRun{F}

Internal: runs a shell command string at execution time. Use sh(cmd_func) in steps; the pipeline prints the command when verbose=true.
SimplePipelines.Sequence — Type
Sequence{T} <: AbstractNode
a >> b

Executes nodes sequentially, stopping on the first failure. Data passing: the next node receives one value, the previous step's output (declared output paths when applicable, else the step's result). When the previous node produced multiple results (e.g. Parallel/ForEach), the next node receives only the last branch's output. Distinct from |> (which passes a vector of all outputs) and .>> (which runs the next step once per branch).
SimplePipelines.Parallel — Type
Parallel{T} <: AbstractNode

Executes nodes concurrently using threads. Created automatically by the & operator.
SimplePipelines.Retry — Type
Retry{N} <: AbstractNode

Retries a node up to max_attempts times on failure, with an optional delay. Created by the ^ operator or the Retry() constructor.
SimplePipelines.Fallback — Type
Fallback{A,B} <: AbstractNode

Executes the fallback node if the primary fails. Created by the | operator.
SimplePipelines.Branch — Type
Branch{C,T,F} <: AbstractNode

Conditional execution based on a predicate function.
SimplePipelines.Timeout — Type
Timeout{N} <: AbstractNode

Wraps a node with a time limit. Returns failure if the time limit is exceeded.
SimplePipelines.Reduce — Type
Reduce{F,N} <: AbstractNode

Runs a parallel node and combines successful step outputs with a reducer function.
SimplePipelines.Force — Type
Force{N} <: AbstractNode

Forces execution of a node, bypassing freshness checks. See also is_fresh, clear_state!.
SimplePipelines.Pipeline — Type
Pipeline{N<:AbstractNode}

A named pipeline wrapping a root node for execution.
SimplePipelines.Pipe — Type
Pipe{A, B} <: AbstractNode
a |> b

Run a, then run b with a's output(s) as b's input. The right-hand side must be a function step. Same notion of output as >> (declared output paths when applicable, else the result). A single result is passed as one value; multiple results (ForEach/Parallel) are passed as a vector of all branch outputs. Distinct from >> (which passes only the last) and .>> (which runs the next step once per branch).
SimplePipelines.SameInputPipe — Type
SameInputPipe{A, B} <: AbstractNode
a >>> b

Run a then b; both receive the same context input, and b does not receive a's output. Distinct from >> and |> (where b receives a's output). Use when the next step should run on the same input (e.g. a branch id) as the previous one.
SimplePipelines.BroadcastPipe — Type
BroadcastPipe{A, B} <: AbstractNode
a .>> b (broadcast of >>)

Run the next step once per branch of the left node, each with that branch's output. For ForEach or Parallel, branches run in parallel. For a single-output node, this is equivalent to a >> b. Distinct from >> (one run, last branch only) and |> (one run with a vector of all outputs).
Macros
SimplePipelines.@step — Macro
@step name = work
@step name(inputs => outputs) = work
@step work

Create a named step with optional file dependencies. Steps are lazy: if the right-hand side is a function call (other than sh(...)), it is wrapped in a thunk and runs only when the pipeline is executed via run(pipeline).
Use sh"..." for literal commands; use sh("... " * var * " ...") when you need interpolation at construction time. For commands built at run time, use sh(cmd_func) where cmd_func returns the command string; with verbose=true, the command is printed before execution. For shell scripts that use shell variables, use shell_raw"..." or shell_raw"""...""" (multiline) so Julia does not interpret the dollar sign.
Examples
@step download = sh"curl -o data.csv http://example.com"
@step download([] => ["data.csv"]) = sh("curl -L -o " * repr(path) * " " * url)
@step process(["input.csv"] => ["output.csv"]) = sh"sort input.csv > output.csv"
@step call_tool([ref, bams] => [out]) = sh(() -> "bcftools mpileup -f " * repr(ref) * " " * join(repr.(bams), " "))
@step process("path") = process_file # function by name, receives path at run time
@step sh"echo hello"
SimplePipelines.@sh_str — Macro
sh"command"
sh(command::String)

Shell commands: use sh"..." for literals and sh("...") when you need interpolation.
Examples
sh"sort data.txt > sorted.txt"
sh("process \$(sample)_R1.fq")
SimplePipelines.@shell_raw_str — Macro
shell_raw"command"
shell_raw"""..."""

String literal for shell commands where the dollar sign is not interpreted by Julia. Use for scripts that use shell variables. The triple-quoted form keeps multiline scripts readable; shell variables (e.g. $VAR, ${var}) stay intact.
Example (multiline with loop and Julia interpolation)
donors = ["A", "B"]
cmd = shell_raw"""
for d in """ * join(donors, " ") * shell_raw"""
do
echo "Processing $d"
done
"""
# Use in a step: sh(() -> cmd)
Operators
The package extends these operators for pipeline composition. Cmd and Function arguments are auto-wrapped in Step.
| Operator | Name | Description |
|---|---|---|
| `>>` | Sequence | Run in order; pass previous output to next (function step) |
| `&` | Parallel | Run nodes concurrently |
| `\|` | Fallback | Run fallback if primary fails |
| `^` | Retry | Wrap with retries, e.g. node^3 |
| `\|>` | Pipe | Run right with left's output(s) (single or vector) |
| `>>>` | SameInputPipe | Run both with the same input (e.g. branch id) |
| `.>>` | BroadcastPipe | Attach right to each branch of left (per-branch pipeline) |
When the left has one output, >>, |>, and .>> all pass that value to the next step. When the left has multiple outputs (ForEach, Parallel):
| Left side | `>>` | `\|>` (Pipe) | `.>>` |
|---|---|---|---|
| Single output | step(one value) | step(one value) | step(one value) |
| Multi output | step(last only) | step(vector of all) | step per branch (one call each) |
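A minimal sketch contrasting the three operators on a multi-output left side, assuming the package is installed; the step names and the identity functions are illustrative:

```julia
using SimplePipelines

left = Step(:x, () -> 1) & Step(:y, () -> 2)   # Parallel: two branches

last_only  = left >>  Step(:f, v -> v)   # f runs once, with the last branch's output
all_vector = left |>  Step(:g, v -> v)   # g runs once, with a vector of both outputs
per_branch = left .>> Step(:h, v -> v)   # h runs once per branch, with that branch's output
```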
Functions
SimplePipelines.ForEach — Type
Lazy node: run block over file matches (pattern string) or over a collection (vector). Dispatches on second argument.
SimplePipelines.fe — Type
Short alias for ForEach. Use fe("pattern") do x ... end.
Shell
SimplePipelines.sh — Function
sh(command::String) builds the shell command string at construction time. For a command built at run time, use sh(cmd_func). See also @sh_str.
The string macro shell_raw"..." (and triple-quoted shell_raw\"\"\"...\"\"\") is documented in @shell_raw_str; use it for scripts where the dollar sign must not be interpreted by Julia.
Execution
Execution is recursive: run(pipeline) calls run_node(root, ...) which dispatches on node type and recurses (Sequence in order, Parallel/ForEach with optional @spawn).
Base.run — Function
run(p::Pipeline; verbose=true, dry_run=false, force=false, jobs=8, keep_outputs=:last) -> Vector{AbstractStepResult}
run(node::AbstractNode; kwargs...) -> Vector{AbstractStepResult}

Execute a pipeline or node, returning results for each step.
Keywords
- verbose=true: Show colored progress output; when true, prints each shell command (for steps that run external tools) before running it.
- dry_run=false: If true, show the DAG structure without executing.
- force=false: If true, run all steps regardless of freshness.
- jobs=8: Max concurrent branches for Parallel/ForEach. All branches run; when jobs > 0, they run in rounds of jobs (each round waits for the previous). jobs=0 means unbounded (all at once).
- keep_outputs=:last: What to retain in each result's .result. :last (default) keeps only the last step's result (others get nothing); :all keeps every step's result; :none drops all.
When result is kept vs dropped
keep_outputs only affects the returned results vector: after the run we replace .result with nothing for non-final steps. During execution all step outputs stay in memory until the run finishes.
Memory: path-based I/O and streaming reducers
To avoid holding large data in memory:
- Steps: Write large results to files and return only the path (or a small summary). Then the runner only holds path strings, not file contents.
- Reduce: The reducer receives a vector of all branch outputs. If those are paths, implement a streaming reducer: open one path at a time, read/aggregate, then close; do not load all files into memory inside the reducer. Example: reducer(paths) = (acc = init; for p in paths; acc = merge(acc, read_stats(p)); end; acc).
- Use keep_outputs=:last so the returned result vector does not retain every step's result.
If you follow the first two points, you only hold paths and small aggregates; full file contents are never all in memory at once. If a step returns a large object (e.g. a DataFrame of the whole file), that object is held until the run ends.
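The streaming-reducer pattern can be sketched with plain Julia; `read_stats` here is a hypothetical per-file summary (line counts), not part of the package:

```julia
# Hypothetical per-file summary: cheap to compute, small to hold.
read_stats(path) = (lines = countlines(path),)

# Streaming reducer: visits one path at a time, keeping only a running
# aggregate in memory, never all file contents at once.
function reducer(paths)
    acc = 0
    for p in paths
        acc += read_stats(p).lines
    end
    return acc
end
```

Attach such a function via a Reduce node (see the Reduce type above); the key property is that memory use is bounded by one file plus the accumulator.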
Output
With verbose=true, shows tree-structured output: ▶ running, ✓ success, ✗ failure, ⊳ up to date (not re-run). Shell commands are printed so you see exactly what is run.
Examples
run(pipeline)
run(pipeline, jobs=0)
run(pipeline, keep_outputs=:all)
run(pipeline, verbose=false)
run(pipeline, dry_run=true)
run(pipeline, force=true)
Freshness and state
State is stored in .pipeline_state as a fixed-layout, memory-mapped file. Completions are batched and written when run() finishes.
SimplePipelines.is_fresh — Function
is_fresh(step::Step) -> Bool

Check if a step can be skipped based on Make-like freshness rules.
Freshness Rules
- Has inputs and outputs: Fresh if all outputs exist and are newer than all inputs
- Has only outputs: Fresh if outputs exist and step was previously completed
- No file dependencies: Fresh if step was previously completed (state-based tracking)
State is persisted in .pipeline_state. Completions during a run are batched and written once when run() finishes.
See also: Force, clear_state!
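A sketch of the first freshness rule in use, assuming the package is installed (the file names are illustrative, and the Force(node) constructor form is assumed from the Force docs above):

```julia
using SimplePipelines

# Has inputs and outputs: fresh iff sorted.csv exists and is newer than data.csv.
sorter = @step sort_it(["data.csv"] => ["sorted.csv"]) = sh"sort data.csv > sorted.csv"

is_fresh(steps(sorter)[1])   # true after a successful run, until data.csv changes
clear_state!()               # forget recorded completions; file-based rules still apply
run(Force(sorter))           # bypass freshness and run regardless
```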
SimplePipelines.clear_state! — Function
clear_state!()

Remove the pipeline state file (.pipeline_state), forcing all steps to run on the next execution regardless of freshness. The file uses the fixed binary layout defined in the StateFormat module.
State file format
The state file uses a fixed binary layout (see src/StateFormat.jl) for random access and mmap.
SimplePipelines.StateFormat — Module
StateFormat

Fixed binary layout for the pipeline state file. Single source of truth for offsets, sizes, and all I/O: a memory-mapped file with random access to the hash array.
Layout (on disk):
- [0..8) magic (8 bytes)
- [8..16) count (UInt64, number of valid hashes)
- [16..) hashes (max_hashes × sizeof(UInt64), mmap-able for random access)
All read/write/mmap of the state file goes through this module (state_init, state_read, state_write, state_append).
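The layout above can be read with only the standard library; this sketch is illustrative (the function name is hypothetical, the magic bytes are not validated here, and the real I/O lives in src/StateFormat.jl):

```julia
using Mmap

# Read completed-step hashes from a file with the documented layout:
# 8 bytes magic, then a UInt64 count, then `count` UInt64 hashes.
function read_hashes_sketch(path)
    open(path, "r") do io
        magic = read(io, 8)          # [0..8)  magic bytes (not checked here)
        count = read(io, UInt64)     # [8..16) number of valid hashes
        # [16..) hash array, memory-mapped starting at the current position
        hashes = Mmap.mmap(io, Vector{UInt64}, Int(count))
        return Set(hashes)
    end
end
```

The fixed header is what makes mmap with random access possible: every hash slot lives at offset 16 + 8i, so no parsing is needed.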
SimplePipelines.StateFormat.StateFileLayout — Type
StateFileLayout

Descriptor for the fixed binary layout of the state file. Holds the magic bytes, header length, offsets for count and hashes, and the maximum capacity.
SimplePipelines.StateFormat.state_init — Function
Ensure the state file exists and has the correct size; create or truncate if needed.
SimplePipelines.StateFormat.state_read — Function
Read completed step hashes from the state file (memory-mapped, random access). Returns Set{UInt64}.
SimplePipelines.StateFormat.state_write — Function
Write all completed hashes to the state file (capped at layout.max_hashes).
SimplePipelines.StateFormat.state_append — Function
Append one hash; returns false if at capacity (the caller should then fall back to state_write(union(state_read(), [h]))).
Utilities
SimplePipelines.count_steps — Function
count_steps(node) -> Int

Return the number of steps in the DAG (leaf count for execution).
SimplePipelines.steps — Function
steps(node) -> Vector{Step}

Return all leaf steps in the DAG (flattened).
SimplePipelines.print_dag — Function
print_dag(node [; color=true])
print_dag(io, node [, indent])

Print a tree visualization of the pipeline DAG. With color=true (the default when writing to a terminal), uses colors for node types and status. Steps with no inputs (start nodes) are shown with ◆ in light cyan; steps with inputs use ○ in cyan. See also run and display(pipeline).
Index
- SimplePipelines.StateFormat
- SimplePipelines.AbstractNode
- SimplePipelines.AbstractStepResult
- SimplePipelines.Branch
- SimplePipelines.BroadcastPipe
- SimplePipelines.Fallback
- SimplePipelines.ForEach
- SimplePipelines.Force
- SimplePipelines.Parallel
- SimplePipelines.Pipe
- SimplePipelines.Pipeline
- SimplePipelines.Reduce
- SimplePipelines.Retry
- SimplePipelines.SameInputPipe
- SimplePipelines.Sequence
- SimplePipelines.ShRun
- SimplePipelines.StateFormat.StateFileLayout
- SimplePipelines.Step
- SimplePipelines.StepResult
- SimplePipelines.Timeout
- SimplePipelines.fe
- Base.run
- SimplePipelines.StateFormat.state_append
- SimplePipelines.StateFormat.state_init
- SimplePipelines.StateFormat.state_read
- SimplePipelines.StateFormat.state_write
- SimplePipelines.clear_state!
- SimplePipelines.count_steps
- SimplePipelines.is_fresh
- SimplePipelines.print_dag
- SimplePipelines.sh
- SimplePipelines.steps
- SimplePipelines.@sh_str
- SimplePipelines.@shell_raw_str
- SimplePipelines.@step