Writing pipelines
Flow uses Nextflow as its pipeline orchestration engine, enabling scalable and reproducible bioinformatics workflows. This guide explains how to write pipelines that integrate seamlessly with Flow's platform.
Overview
Flow pipelines are standard Nextflow workflows with additional conventions and metadata that enable:
- Automatic UI generation for pipeline parameters
- Sample and data management integration
- Progress tracking and real-time monitoring
- Result organization and visualization
- Version control and reproducibility
Pipeline Types
Flow supports several types of pipelines:
- Primary Pipelines - Full bioinformatics workflows (RNA-seq, ChIP-seq, etc.)
- Utility Pipelines - Data processing tools (demultiplexing, format conversion)
- Module Wrappers - Single-tool pipelines for specific analyses (see the sketch after this list)
- Subworkflow Wrappers - Reusable workflow components
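For example, a module wrapper is usually nothing more than an entry workflow around one included process. The sketch below is illustrative only: the module path, process name, and channel shape are assumptions, not requirements imposed by Flow.

// Hypothetical module-wrapper pipeline: a single tool exposed as a Flow pipeline
include { RUN_FASTQC } from './modules/run_fastqc'

workflow {
    // One (sample_id, reads) pair per sample sheet row
    samples_ch = Channel
        .fromPath(params.input)
        .splitCsv(header: true)
        .map { row -> tuple(row.sample_id, file(row.read1)) }

    RUN_FASTQC(samples_ch)
}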
Getting Started
Basic Pipeline Structure
A minimal Flow pipeline consists of:
// main.nf
nextflow.enable.dsl = 2

// Parameters with defaults
params.input = null
params.output = "results"

// Main workflow
workflow {
    // Input validation
    if (!params.input) {
        error "Please provide input data with --input"
    }

    // Run the main process; results are published to params.output via publishDir
    PROCESS_DATA(params.input)
}

// Process definition
process PROCESS_DATA {
    publishDir params.output, mode: 'copy'

    input:
    path input_file

    output:
    path "*.txt", emit: results

    script:
    """
    # Your analysis commands here
    process_data.py ${input_file} > output.txt
    """
}
Flow Schema File
Every Flow pipeline requires a JSON schema file that describes inputs and parameters:
// schema.json
{
    "$schema": "http://json-schema.org/draft-07/schema",
    "title": "My Pipeline",
    "description": "A pipeline that processes biological data",
    "type": "object",
    "definitions": {
        "input_output_options": {
            "title": "Input/output options",
            "type": "object",
            "properties": {
                "input": {
                    "type": "string",
                    "format": "file-path",
                    "description": "Path to input data file",
                    "help_text": "Accepts FASTQ, FASTA, or BAM formats"
                },
                "output": {
                    "type": "string",
                    "description": "Output directory for results",
                    "default": "results"
                }
            },
            "required": ["input"]
        }
    },
    "allOf": [
        {"$ref": "#/definitions/input_output_options"}
    ]
}
Flow Integration Features
1. Sample Input Handling
Flow automatically provides sample data to pipelines. Access sample files using:
// Automatic sample detection
Channel
    .fromPath(params.input)
    .splitCsv(header: true)
    .map { row ->
        tuple(row.sample_id, file(row.read1), file(row.read2))
    }
    .set { samples_ch }
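The channel above assumes a comma-separated sample sheet with sample_id, read1, and read2 columns; the file paths below are placeholders:

sample_id,read1,read2
control_rep1,/data/control_rep1_R1.fastq.gz,/data/control_rep1_R2.fastq.gz
treated_rep1,/data/treated_rep1_R1.fastq.gz,/data/treated_rep1_R2.fastq.gz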
2. Progress Reporting
Enable real-time progress tracking:
process ALIGN_READS {
    tag "$sample_id"
    label 'process_high'

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("*.bam"), emit: alignments

    script:
    """
    # Report progress to Flow
    echo "STATUS:Aligning reads for sample ${sample_id}"

    # Run alignment
    bwa mem reference.fa ${reads} > ${sample_id}.bam

    echo "STATUS:Alignment complete for ${sample_id}"
    """
}
3. Result Organization
Structure outputs for Flow's result viewer:
// Organize outputs by type
params.output_structure = [
    alignments: "${params.output}/alignments",
    qc: "${params.output}/qc",
    counts: "${params.output}/counts",
    reports: "${params.output}/reports"
]

// Publish to structured directories
process GENERATE_REPORT {
    publishDir params.output_structure.reports,
        mode: 'copy',
        pattern: "*.html"

    // Process definition...
}
4. Metadata Integration
Access Flow metadata within pipelines:
// Flow provides metadata via params
params.flow_metadata = [
    project_id: null,
    sample_metadata: null,
    execution_id: null
]

// Use metadata in processes
process ANNOTATE_RESULTS {
    input:
    path results

    script:
    def metadata = params.flow_metadata
    """
    annotate.py \
        --results ${results} \
        --project ${metadata.project_id} \
        --metadata ${metadata.sample_metadata} \
        > annotated_results.txt
    """
}
Best Practices
1. Use Containers
Always specify containers for reproducibility:
process ANALYZE_DATA {
    container 'biocontainers/fastqc:v0.11.9_cv8'

    // For Singularity on HPC
    containerOptions params.singularity_bind_paths

    input:
    path reads

    script:
    """
    fastqc ${reads}
    """
}
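The container engine itself is usually selected per execution environment rather than per process. A minimal sketch of the matching profiles in nextflow.config, assuming Docker on workstations and Singularity on HPC:

// nextflow.config (sketch): select the engine with -profile docker or -profile singularity
profiles {
    docker {
        docker.enabled = true
    }
    singularity {
        singularity.enabled    = true
        singularity.autoMounts = true   // auto-bind common host paths
    }
}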
2. Resource Management
Define resource requirements using labels:
// In nextflow.config
process {
    withLabel: process_low {
        cpus = 2
        memory = 4.GB
        time = 2.h
    }
    withLabel: process_medium {
        cpus = 6
        memory = 16.GB
        time = 8.h
    }
    withLabel: process_high {
        cpus = 12
        memory = 32.GB
        time = 24.h
    }
}
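Directive values can also be closures, which lets a retried task request more memory on each attempt. A sketch, assuming exit codes 137 and 140 indicate the task exceeded its allocation (adjust for your scheduler):

// Scale memory with each attempt for 'process_high' tasks
process {
    withLabel: process_high {
        cpus          = 12
        memory        = { 32.GB * task.attempt }
        time          = 24.h
        errorStrategy = { task.exitStatus in [137, 140] ? 'retry' : 'finish' }
        maxRetries    = 2
    }
}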
3. Error Handling
Implement robust error handling:
process VALIDATE_INPUT {
    errorStrategy 'terminate'

    input:
    path input_file

    output:
    path input_file, emit: validated

    script:
    """
    # Validate file format
    if ! validate_format.py ${input_file}; then
        echo "ERROR: Invalid input format" >&2
        exit 1
    fi

    # Check file integrity
    if ! check_integrity.py ${input_file}; then
        echo "ERROR: Corrupted input file" >&2
        exit 1
    fi

    # If both checks pass, the staged input is re-emitted as the 'validated' output
    """
}
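Validation can also happen before any task is scheduled, by failing fast while the input channel is built. A sketch, assuming sample_id and read1 columns in the sample sheet:

// Reject malformed sample sheet rows up front
Channel
    .fromPath(params.input, checkIfExists: true)
    .splitCsv(header: true)
    .map { row ->
        if (!row.sample_id || !row.read1) {
            error "Sample sheet row is missing 'sample_id' or 'read1': ${row}"
        }
        tuple(row.sample_id, file(row.read1))
    }
    .set { validated_samples }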
4. Documentation
Include comprehensive documentation:
// main.nf header
/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    FLOW PIPELINE: MY ANALYSIS
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    Github : https://github.com/myorg/my-pipeline
    Website: https://flow.bio/pipelines/my-analysis
    Docs   : https://flow.bio/pipelines/my-analysis/docs
----------------------------------------------------------------------------------------
*/

// Process documentation
process COMPLEX_ANALYSIS {
    /*
    Performs complex bioinformatics analysis.

    This process implements the algorithm described in:
    Smith et al. (2023) Nature Biotechnology

    Key parameters:
    - min_quality: Minimum quality score (default: 20)
    - max_errors: Maximum allowed errors (default: 3)
    */

    // Process definition...
}
Testing Pipelines
Local Testing
Test your pipeline locally before deploying:
# Test with sample data
nextflow run main.nf \
    -profile test,docker \
    --input test_data/samples.csv \
    --output test_results

# Validate schema
nextflow run main.nf \
    --help \
    --schema_validate
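The test profile used above is typically a small config that pins tiny bundled inputs. A sketch with placeholder paths; only input and output are parameters this guide has defined:

// conf/test.config (sketch), wired up in nextflow.config with:
//   profiles { test { includeConfig 'conf/test.config' } }
params {
    input  = "${projectDir}/test_data/samples.csv"
    output = 'test_results'
}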
Continuous Integration
Use GitHub Actions for automated testing:
# .github/workflows/test.yml
name: Pipeline Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: nf-core/setup-nextflow@v1
      - name: Run pipeline tests
        run: |
          nextflow run main.nf -profile test,docker
      - name: Run nf-test
        run: |
          nf-test test tests/*.nf.test
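The nf-test step assumes test files such as tests/main.nf.test exist. A minimal sketch that only asserts the pipeline completes; variable names like outputDir and baseDir come from nf-test, and the paths are placeholders:

// tests/main.nf.test (sketch)
nextflow_pipeline {

    name "Smoke test"
    script "main.nf"

    test("completes on the bundled test data") {
        when {
            params {
                input  = "${baseDir}/test_data/samples.csv"
                output = "${outputDir}"
            }
        }
        then {
            assert workflow.success
        }
    }
}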
Deploying to Flow
1. Repository Setup
Your pipeline repository should include:
my-pipeline/
├── main.nf # Main pipeline script
├── nextflow.config # Configuration file
├── schema.json # Flow schema definition
├── modules/ # Process modules
├── subworkflows/ # Reusable subworkflows
├── bin/ # Helper scripts
├── docs/ # Documentation
│ └── output.md # Output description
├── test_data/ # Test datasets
└── README.md # Pipeline documentation
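The nextflow.config in that layout should also carry a manifest block, which Nextflow reads for the pipeline's name, version, and entry point. Whether Flow surfaces these exact fields is an assumption, so treat this as a sketch:

// nextflow.config (sketch)
manifest {
    name            = 'myorg/my-pipeline'
    description     = 'A pipeline that processes biological data'
    mainScript      = 'main.nf'
    version         = '1.0.0'
    nextflowVersion = '>=23.04.0'
}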
2. Version Tags
Tag releases following semantic versioning:
git tag -a v1.0.0 -m "First stable release"
git push origin v1.0.0
3. Flow Registration
Pipelines are automatically discovered when:
- Repository is public or Flow has access
- Contains valid main.nf and schema.json files
- Has at least one version tag
- Passes validation checks
Advanced Topics
Dynamic Parameter Generation
Generate parameters based on input data:
// Detect genome automatically
process DETECT_GENOME {
    input:
    path sample_sheet

    output:
    env GENOME, emit: genome

    script:
    """
    GENOME=\$(detect_species.py ${sample_sheet})
    """
}

// Use detected genome
workflow {
    DETECT_GENOME(params.input)

    ALIGN_READS(
        samples_ch,
        DETECT_GENOME.out.genome
    )
}
Custom UI Components
Define custom UI elements in schema:
{
    "properties": {
        "aligner": {
            "type": "string",
            "description": "Alignment algorithm",
            "enum": ["bwa", "bowtie2", "star"],
            "enumNames": ["BWA-MEM", "Bowtie 2", "STAR"],
            "default": "star",
            "flow_ui": {
                "widget": "radio",
                "help_link": "https://docs.flow.bio/aligners"
            }
        }
    }
}
Multi-Sample Processing
Handle complex sample relationships:
// Group samples by condition
Channel
    .fromPath(params.sample_sheet)
    .splitCsv(header: true)
    .map { row ->
        tuple(
            row.condition,
            row.sample_id,
            file(row.reads)
        )
    }
    .groupTuple(by: 0)
    .set { grouped_samples }

// Process groups
process DIFFERENTIAL_ANALYSIS {
    input:
    tuple val(condition), val(sample_ids), path(reads)

    script:
    """
    run_deseq2.R \
        --condition ${condition} \
        --samples ${sample_ids.join(',')} \
        --reads ${reads.join(' ')}
    """
}
Resources
- Pipeline Template: github.com/goodwright/pipeline-template
- Flow Modules: github.com/goodwright/flow-nf
- Example Pipelines: github.com/goodwright/flow-pipelines
- Nextflow Documentation: nextflow.io/docs
- Community Forum: community.flow.bio/pipelines