Extending SimplePipelines
SimplePipelines is designed for easy extension via Julia's multiple dispatch.
Testing and coverage
Run the test suite:
julia --project=. -e 'using Pkg; Pkg.test()'Building the docs
From the repo root, install Documenter (if needed) and run the doc build. Note: Pkg.activate does not accept an extras argument; use Pkg.add("Documenter") to pull in the docs dependency.
julia --project=. -e 'using Pkg; Pkg.add("Documenter"); using Documenter; using SimplePipelines; cd("docs"); include("make.jl")'Or in the Julia REPL (start with julia --project=.): run using Pkg; Pkg.add("Documenter"); cd("docs"); include("make.jl").
Run tests with code coverage (for local profiling or CI):
julia --project=. -e 'using Pkg; Pkg.test(coverage=true)'Coverage is reported in lcov.info; CI uploads it to Codecov.
Architecture
All nodes inherit from AbstractNode. To add custom behavior, define a new subtype and implement dispatch methods:
AbstractNode
├── Step{F} # Leaf node
├── Sequence{T} # Sequential
├── Parallel{T} # Concurrent
├── Retry{N} # Retry on failure
├── Fallback{A,B} # Try A, else B
└── Branch{C,T,F} # ConditionalCustom Node in 4 Steps
using SimplePipelines
import SimplePipelines: AbstractNode, run_node, print_dag, count_steps, steps
# 1. Define type (parametric for type stability)
struct Timeout{N<:AbstractNode} <: AbstractNode
node::N
seconds::Float64
end
# 2. Implement execution (signature: v, forced::Bool, context_input=nothing)
function run_node(t::Timeout, v, forced::Bool, context_input=nothing)
# ... timeout logic ...
return run_node(t.node, v, forced, context_input)
end
# 3. Implement visualization (signature: io, node, pre, cont, color)
function print_dag(io::IO, t::Timeout, pre::String, cont::String, color::Bool)
print(io, pre, "⏱ Timeout ", t.seconds, "s\n")
print_dag(io, t.node, cont * " ", cont * " ", color)
end
# 4. Implement utilities
count_steps(t::Timeout) = count_steps(t.node)
steps(t::Timeout) = steps(t.node)Custom Operator
import Base: ^
# Node^3 means repeat 3 times
^(node::AbstractNode, n::Int) = Repeat(node, n)Custom Step Executor
Extend execute for new work types:
struct HTTPGet
url::String
end
function SimplePipelines.execute(step::Step{HTTPGet})
start = time()
resp = HTTP.get(step.work.url)
return StepResult(step, resp.status == 200, time() - start, step.inputs, step.outputs, String(resp.body))
end
# Usage: Step(:fetch, HTTPGet("https://api.example.com"))Type Stability Rules
- Use parametric types:
struct MyNode{T<:AbstractNode}notchildren::Vector{AbstractNode} - Use tuple recursion for heterogeneous collections
- Dispatch on types, don't check with
isa