API Reference

Module

Types

SimplePipelines.AbstractNode (Type)
AbstractNode

Abstract supertype of all pipeline nodes (Step, Sequence, Parallel, Retry, Fallback, Branch, Timeout, Force, Reduce, ForEach, Pipe, SameInputPipe, BroadcastPipe). Constructors only build the struct; execution happens through the functor: calling node(v, forced) dispatches to run_node(node, v, forced).

SimplePipelines.StepResult (Type)
StepResult(step, success, duration, inputs, outputs, result)

Result of running one step. The concrete type is StepResult{S, I, O, V}; it is type-stable, with no Any fields.

Fields

  • inputs — Input file paths the step declared. Empty for steps that take no input files (e.g. download / start nodes).
  • outputs — Output file paths the step declared (files the step produces).
  • result — Execution result: stdout for shell steps, return value for function steps, or nothing when keep_outputs != :all.
SimplePipelines.Step (Type)
Step{F} <: AbstractNode

A single unit of work in a pipeline. F is the work type: Cmd (backtick or sh"..."), Function, or ShRun (from sh(f)).

Fields

  • name::Symbol — Step identifier (auto-generated if not provided)
  • work::F — The command or function to execute
  • inputs::Vector{String} — Input file dependencies
  • outputs::Vector{String} — Output file paths

Constructors

Step(work)                           # Auto-generated name
Step(name::Symbol, work)             # Named step
Step(name, work, inputs, outputs)    # Full specification
@step name = work                    # Macro form
@step name(inputs => outputs) = work # Macro with dependencies

See also: @step, is_fresh, Force

SimplePipelines.ShRun (Type)
ShRun{F}

Internal: runs a shell command string at execution time. Use sh(cmd_func) in steps; the pipeline prints the command when verbose=true.

SimplePipelines.Sequence (Type)
Sequence{T} <: AbstractNode
a >> b

Executes nodes sequentially, stopping on first failure. Data passing: the next node receives one value: the previous step's output (declared output paths when applicable, else the step's result). When the previous node produced multiple results (e.g. Parallel/ForEach), the next node receives only the last branch's output. Distinct from |> (which passes a vector of all) and .>> (which runs the next step once per branch).

SimplePipelines.Retry (Type)
Retry{N} <: AbstractNode

Retries a node up to max_attempts times on failure, with optional delay. Created by the ^ operator or Retry() constructor.
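
A minimal sketch of the `^` form; the step name and URL are illustrative:

```julia
using SimplePipelines

# Retry a flaky download up to 3 attempts; later attempts run only on failure.
@step fetch = sh"curl -fsSL -o data.csv http://example.com/data.csv"
run(fetch^3)
```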

SimplePipelines.Reduce (Type)
Reduce{F,N} <: AbstractNode

Runs a parallel node and combines successful step outputs with a reducer function.
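
A sketch assuming a `Reduce(reducer, node)` argument order (not confirmed above) and that the reducer receives the vector of branch outputs, as described under run's memory notes; the file names are illustrative:

```julia
using SimplePipelines

# Count lines of two files in parallel, then sum the per-branch counts.
count_a = Step(:count_a, () -> countlines("a.txt"))
count_b = Step(:count_b, () -> countlines("b.txt"))
total = Reduce(sum, count_a & count_b)   # reducer gets a vector of branch outputs
run(total)
```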

SimplePipelines.Pipe (Type)
Pipe{A, B} <: AbstractNode
a |> b

Run a, then run b with a's output(s) as b's input. RHS must be a function step. Same notion of output as >> (declared output paths when applicable, else result). Single result → one value; multiple results (ForEach/Parallel) → vector of all branch outputs. Distinct from >> (which passes only the last) and .>> (which runs the next step once per branch).

SimplePipelines.SameInputPipe (Type)
SameInputPipe{A, B} <: AbstractNode
a >>> b

Run a then b; both receive the same context input. b does not receive a's output. Distinct from >> and |> (where b receives a's output). Use when the next step should run on the same input (e.g. branch id) as the previous.
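
A sketch of the same-input pattern; run_aligner and run_indexer are illustrative names, not part of the package:

```julia
using SimplePipelines

# Both steps receive the same context input (e.g. a path handed to the branch);
# `index` does not see `align`'s output.
align = Step(:align, path -> run_aligner(path))
index = Step(:index, path -> run_indexer(path))
per_sample = align >>> index
```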

SimplePipelines.BroadcastPipe (Type)
BroadcastPipe{A, B} <: AbstractNode
a .>> b   (broadcast of >>)

Run the next step once per branch of the left node, each with that branch's output. For ForEach or Parallel, branches run in parallel. For a single-output node, equivalent to a >> b. Distinct from >> (one run, last branch only) and |> (one run with vector of all outputs).
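
A sketch assuming the ForEach(block, collection) form described under Functions; names are illustrative:

```julia
using SimplePipelines

# One branch per sample id; `.>>` runs `report` once per branch, in parallel,
# each call receiving that branch's output ("out_s1.txt", "out_s2.txt").
branches = ForEach(id -> "out_$(id).txt", ["s1", "s2"])
report   = Step(:report, path -> println("summarizing ", path))
run(branches .>> report)
```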


Macros

SimplePipelines.@step (Macro)
@step name = work
@step name(inputs => outputs) = work
@step work

Create a named step with optional file dependencies. Steps are lazy: if the right-hand side is a function call (other than sh(...)), it is wrapped in a thunk and runs only when the pipeline is run via run(pipeline).

Use sh"..." for literal commands; use sh("... " * var * " ...") when you need interpolation at construction time. For commands built at run time, use sh(cmd_func) where cmd_func returns the command string; with verbose=true, the command is printed before execution. For shell scripts that use shell variables, use shell_raw"..." or shell_raw"""...""" (multiline) so Julia does not interpret the dollar sign.

Examples

@step download = sh"curl -o data.csv http://example.com"
@step download([] => ["data.csv"]) = sh("curl -L -o " * repr(path) * " " * url)
@step process(["input.csv"] => ["output.csv"]) = sh"sort input.csv > output.csv"
@step call_tool([ref, bams] => [out]) = sh(() -> "bcftools mpileup -f " * repr(ref) * " " * join(repr.(bams), " "))
@step process("path") = process_file   # function by name, receives path at run time
@step sh"echo hello"
SimplePipelines.@sh_str (Macro)
sh"command"
sh(command::String)

Shell commands: use sh"..." for literals, sh("...") when you need interpolation.

Examples

sh"sort data.txt > sorted.txt"
sh("process $(sample)_R1.fq")
SimplePipelines.@shell_raw_str (Macro)
shell_raw"command"
shell_raw"""..."""

String literal for shell commands where the dollar sign is not interpreted by Julia. Use for scripts that use shell variables. Triple-quoted form keeps multiline scripts readable; shell variables (e.g. $VAR, ${var}) stay intact.

Example (multiline with loop and Julia interpolation)

donors = ["A", "B"]
cmd = shell_raw"""
for d in """ * join(donors, " ") * shell_raw"""
  do
    echo "Processing $d"
  done
"""
# Use in a step: sh(() -> cmd)

Operators

The package extends these operators for pipeline composition. Cmd and Function arguments are auto-wrapped in Step.

Operator  Name           Description
>>        Sequence       Run in order; pass previous output to next (function step)
&         Parallel       Run nodes concurrently
|         Fallback       Run fallback if primary fails
^         Retry          Wrap with retries, e.g. node^3
|>        Pipe           Run right with left's output(s) (single or vector)
>>>       SameInputPipe  Run both with the same input (e.g. branch id)
.>>       BroadcastPipe  Attach right to each branch of left (per-branch pipeline)

When the left has one output, >>, |>, and .>> all pass that value to the next step. When the left has multiple outputs (ForEach, Parallel):

Left side      >>               |> (Pipe)            .>>
Single output  step(one value)  step(one value)      step(one value)
Multi output   step(last only)  step(vector of all)  step per branch (one call each)
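
The same distinction in code, assuming a multi-output ForEach on the left (names are illustrative; these expressions only build nodes, nothing executes until run):

```julia
using SimplePipelines

multi     = ForEach(id -> "out_$(id).txt", ["a", "b", "c"])  # three branch outputs
summarize = Step(:summarize, x -> println(x))

multi >> summarize    # one run: summarize receives "out_c.txt" (last branch only)
multi |> summarize    # one run: summarize receives ["out_a.txt", "out_b.txt", "out_c.txt"]
multi .>> summarize   # three runs: one per branch output
```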

Functions

SimplePipelines.ForEach (Type)

Lazy node: run block over file matches (pattern string) or over a collection (vector). Dispatches on second argument.


Shell

The string macro shell_raw"..." (and triple-quoted shell_raw"""...""") is documented in @shell_raw_str; use it for scripts where the dollar sign must not be interpreted by Julia.

Execution

Execution is recursive: run(pipeline) calls run_node(root, ...) which dispatches on node type and recurses (Sequence in order, Parallel/ForEach with optional @spawn).

Base.run (Function)
run(p::Pipeline; verbose=true, dry_run=false, force=false, jobs=8, keep_outputs=:last) -> Vector{AbstractStepResult}
run(node::AbstractNode; kwargs...) -> Vector{AbstractStepResult}

Execute a pipeline or node, returning results for each step.

Keywords

  • verbose=true: Show colored progress output; when true, prints each shell command (for steps that run external tools) before running it
  • dry_run=false: If true, show DAG structure without executing
  • force=false: If true, run all steps regardless of freshness
  • jobs=8: Max concurrent branches for Parallel/ForEach. All branches run; when jobs > 0, they run in rounds of jobs, each round waiting for the previous; jobs=0 runs all branches at once (unbounded)
  • keep_outputs=:last: What to retain in each result's .result. :last (default) keeps only the last step's result (others get nothing); :all keeps every step's result; :none drops all.

When result is kept vs dropped

keep_outputs only affects the returned results vector: after the run we replace .result with nothing for non-final steps. During execution all step outputs stay in memory until the run finishes.

Memory: path-based I/O and streaming reducers

To avoid holding large data in memory:

  1. Steps: Write large results to files and return only the path (or a small summary). Then the runner only holds path strings, not file contents.
  2. Reduce: The reducer receives a vector of all branch outputs. If those are paths, implement a streaming reducer: open one path at a time, read/aggregate, then close. Do not load all files into memory inside the reducer. Example: reducer(paths) = (acc = init; for p in paths; acc = merge(acc, read_stats(p)); end; acc).
  3. Use keep_outputs=:last so the returned result vector does not retain every step's result.

If you follow (1) and (2), you only hold paths and small aggregates; full file contents are never all in memory. If a step returns a large object (e.g. a DataFrame of the whole file), that object is held until the run ends.
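
The streaming pattern in (2) can be written as an ordinary function; streaming_linecount is an illustrative name, and the aggregate here is a simple total line count:

```julia
# Streaming reducer: opens one path at a time and keeps only a running
# aggregate, never holding all file contents in memory at once.
function streaming_linecount(paths)
    total = 0
    for p in paths
        open(p) do io              # one file open at a time
            total += countlines(io)
        end
    end
    return total
end
```

Branches should then write their data to files and return the path, so the reducer (e.g. passed to Reduce) only ever sees path strings.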

Output

With verbose=true, shows tree-structured output: running, success, failure, up to date (not re-run). Shell commands are printed so you see exactly what is run.

Examples

run(pipeline)
run(pipeline, jobs=0)
run(pipeline, keep_outputs=:all)
run(pipeline, verbose=false)
run(pipeline, dry_run=true)
run(pipeline, force=true)

See also: is_fresh, Force, print_dag


Freshness and state

State is stored in .pipeline_state as a fixed-layout, memory-mapped file. Completions are batched and written when run() finishes.

SimplePipelines.is_fresh (Function)
is_fresh(step::Step) -> Bool

Check if a step can be skipped based on Make-like freshness rules.

Freshness Rules

  1. Has inputs and outputs: Fresh if all outputs exist and are newer than all inputs
  2. Has only outputs: Fresh if outputs exist and step was previously completed
  3. No file dependencies: Fresh if step was previously completed (state-based tracking)

State is persisted in .pipeline_state. Completions during a run are batched and written once when run() finishes.
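
Rule 1 in practice, assuming @step binds the step to the given name as in the composition examples above:

```julia
using SimplePipelines

# Fresh only once sorted.txt exists and is newer than data.txt,
# so a re-run of the pipeline skips this step.
@step sorter(["data.txt"] => ["sorted.txt"]) = sh"sort data.txt > sorted.txt"
is_fresh(sorter)   # true after a successful run, false once data.txt changes
```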

See also: Force, clear_state!

SimplePipelines.clear_state! (Function)
clear_state!()

Remove the pipeline state file (.pipeline_state), forcing all steps to run on the next execution regardless of freshness. The file uses the fixed binary layout defined in the StateFormat module.

See also: is_fresh, Force


State file format

The state file uses a fixed binary layout (see src/StateFormat.jl) for random access and mmap.

SimplePipelines.StateFormat (Module)
StateFormat

Fixed binary layout for the pipeline state file. Single source of truth for offsets, sizes, and all I/O: memory-mapped file with random access to the hash array.

Layout (on disk):

  [0..8)   magic (8 bytes)
  [8..16)  count (UInt64, number of valid hashes)
  [16..)   hashes (max_hashes × sizeof(UInt64), mmap-able for random access)

All read/write/mmap of the state file goes through this module (state_init, state_read, state_write, state_append).
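
Independent of the package, the documented layout can be read with plain Julia and the Mmap stdlib; the magic value is not specified here (the real constant lives in src/StateFormat.jl), so this sketch simply skips past it:

```julia
using Mmap

# Read the hash array from a file laid out as: magic(8) | count(UInt64) | hashes.
function read_state_hashes(path)
    open(path, "r") do io
        seek(io, 8)                      # skip the 8-byte magic
        count = read(io, UInt64)         # [8..16): number of valid hashes
        hashes = Mmap.mmap(io, Vector{UInt64}, Int(count), 16)  # [16..): hash array
        return copy(hashes)              # copy so the mapping can be released
    end
end
```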


Utilities

SimplePipelines.print_dag (Function)
print_dag(node [; color=true])
print_dag(io, node [, indent])

Print a tree visualization of the pipeline DAG. With color=true (default when writing to a terminal), uses colors for node types and status. Steps with no inputs (start nodes) are shown with ◆ in light cyan; steps with inputs use ○ in cyan. See also run and display(pipeline).


Index