
Writing pipelines

Flow uses Nextflow as its pipeline orchestration engine, enabling scalable and reproducible bioinformatics workflows. This guide explains how to write pipelines that integrate seamlessly with Flow's platform.


Overview

Flow pipelines are standard Nextflow workflows with additional conventions and metadata that enable:

  • Automatic UI generation for pipeline parameters
  • Sample and data management integration
  • Progress tracking and real-time monitoring
  • Result organization and visualization
  • Version control and reproducibility

Pipeline Types

Flow supports several types of pipelines:

  1. Primary Pipelines - Full bioinformatics workflows (RNA-seq, ChIP-seq, etc.)
  2. Utility Pipelines - Data processing tools (demultiplexing, format conversion)
  3. Module Wrappers - Single-tool pipelines for specific analyses
  4. Subworkflow Wrappers - Reusable workflow components

Getting Started

Basic Pipeline Structure

A minimal Flow pipeline consists of:

// main.nf
nextflow.enable.dsl = 2

// Parameters with defaults
params.input = null
params.output = "results"

// Main workflow
workflow {
    // Input validation
    if (!params.input) {
        error "Please provide input data with --input"
    }
    
    // Run the analysis; results are published by the process
    PROCESS_DATA(file(params.input))
}

// Process definition
process PROCESS_DATA {
    publishDir params.output, mode: 'copy'
    
    input:
    path input_file
    
    output:
    path "*.txt", emit: results
    
    script:
    """
    # Your analysis commands here
    process_data.py ${input_file} > output.txt
    """
}

Flow Schema File

Every Flow pipeline requires a JSON schema file that describes inputs and parameters:

// schema.json
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "title": "My Pipeline",
  "description": "A pipeline that processes biological data",
  "type": "object",
  "definitions": {
    "input_output_options": {
      "title": "Input/output options",
      "type": "object",
      "properties": {
        "input": {
          "type": "string",
          "format": "file-path",
          "description": "Path to input data file",
          "help_text": "Accepts FASTQ, FASTA, or BAM formats"
        },
        "output": {
          "type": "string",
          "description": "Output directory for results",
          "default": "results"
        }
      },
      "required": ["input"]
    }
  },
  "allOf": [
    {"$ref": "#/definitions/input_output_options"}
  ]
}

Flow Integration Features

1. Sample Input Handling

Flow automatically provides sample data to pipelines. Access sample files using:

// Automatic sample detection
Channel
    .fromPath(params.input)
    .splitCsv(header: true)
    .map { row -> 
        tuple(row.sample_id, file(row.read1), file(row.read2))
    }
    .set { samples_ch }
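
For reference, the sample sheet this channel expects has sample_id, read1, and read2 columns; a minimal samples.csv (with placeholder file names) might look like:

sample_id,read1,read2
sample_A,sample_A_R1.fastq.gz,sample_A_R2.fastq.gz
sample_B,sample_B_R1.fastq.gz,sample_B_R2.fastq.gz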

2. Progress Reporting

Enable real-time progress tracking:

process ALIGN_READS {
    tag "$sample_id"
    label 'process_high'
    
    input:
    tuple val(sample_id), path(reads)
    
    output:
    tuple val(sample_id), path("*.bam"), emit: alignments
    
    script:
    """
    # Report progress to Flow
    echo "STATUS:Aligning reads for sample ${sample_id}"
    
    # Run alignment (bwa mem writes SAM; sort and convert to BAM)
    bwa mem reference.fa ${reads} | samtools sort -o ${sample_id}.bam -
    
    echo "STATUS:Alignment complete for ${sample_id}"
    """
}

3. Result Organization

Structure outputs for Flow's result viewer:

// Organize outputs by type
params.output_structure = [
    alignments: "${params.output}/alignments",
    qc: "${params.output}/qc",
    counts: "${params.output}/counts",
    reports: "${params.output}/reports"
]

// Publish to structured directories
process GENERATE_REPORT {
    publishDir params.output_structure.reports, 
               mode: 'copy',
               pattern: "*.html"
    
    // Process definition...
}

4. Metadata Integration

Access Flow metadata within pipelines:

// Flow provides metadata via params
params.flow_metadata = [
    project_id: null,
    sample_metadata: null,
    execution_id: null
]

// Use metadata in processes
process ANNOTATE_RESULTS {
    input:
    path results
    
    script:
    def metadata = params.flow_metadata
    """
    annotate.py \
        --results ${results} \
        --project ${metadata.project_id} \
        --metadata ${metadata.sample_metadata} \
        > annotated_results.txt
    """
}

Best Practices

1. Use Containers

Always specify containers for reproducibility:

process ANALYZE_DATA {
    container 'biocontainers/fastqc:v0.11.9_cv8'
    
    // For Singularity on HPC
    containerOptions params.singularity_bind_paths
    
    input:
    path reads
    
    script:
    """
    fastqc ${reads}
    """
}
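
Containers only take effect when a container engine is enabled in the configuration. One common, plain-Nextflow way to do this (a sketch, not Flow-specific) is to expose docker and singularity profiles:

// nextflow.config
profiles {
    docker {
        docker.enabled = true
    }
    singularity {
        singularity.enabled    = true
        singularity.autoMounts = true
    }
}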

2. Resource Management

Define resource requirements using labels:

// In nextflow.config
process {
    withLabel: process_low {
        cpus = 2
        memory = 4.GB
        time = 2.h
    }
    withLabel: process_medium {
        cpus = 6
        memory = 16.GB
        time = 8.h
    }
    withLabel: process_high {
        cpus = 12
        memory = 32.GB
        time = 24.h
    }
}

3. Error Handling

Implement robust error handling:

process VALIDATE_INPUT {
    errorStrategy 'terminate'
    
    input:
    path input_file
    
    output:
    path input_file, emit: validated
    
    script:
    """
    # Validate file format
    if ! validate_format.py ${input_file}; then
        echo "ERROR: Invalid input format" >&2
        exit 1
    fi
    
    # Check file integrity
    if ! check_integrity.py ${input_file}; then
        echo "ERROR: Corrupted input file" >&2
        exit 1
    fi
    
    # The validated file is passed through via the output declaration above
    """
}
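
Terminating is right for unrecoverable problems such as malformed input. For transient failures (out-of-memory kills, cluster preemption) a retry strategy usually works better; the sketch below uses standard Nextflow directives, and ASSEMBLE_CONTIGS / assemble.py are placeholder names:

process ASSEMBLE_CONTIGS {
    // Retry transient failures up to three times before giving up
    errorStrategy 'retry'
    maxRetries 3
    
    // Request more memory on each attempt
    memory { 8.GB * task.attempt }
    
    input:
    path reads
    
    output:
    path "contigs.fa", emit: contigs
    
    script:
    """
    assemble.py ${reads} > contigs.fa
    """
}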

4. Documentation

Include comprehensive documentation:

// main.nf header
/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    FLOW PIPELINE: MY ANALYSIS
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    Github : https://github.com/myorg/my-pipeline
    Website: https://flow.bio/pipelines/my-analysis
    Docs   : https://flow.bio/pipelines/my-analysis/docs
----------------------------------------------------------------------------------------
*/

// Process documentation (Nextflow has no docstrings, so use comments)
/*
 * COMPLEX_ANALYSIS
 *
 * Performs complex bioinformatics analysis.
 *
 * Implements the algorithm described in:
 * Smith et al. (2023) Nature Biotechnology
 *
 * Key parameters:
 * - min_quality: Minimum quality score (default: 20)
 * - max_errors: Maximum allowed errors (default: 3)
 */
process COMPLEX_ANALYSIS {
    // Process definition...
}

Testing Pipelines

Local Testing

Test your pipeline locally before deploying:

# Test with sample data
nextflow run main.nf \
    -profile test,docker \
    --input test_data/samples.csv \
    --output test_results

# Validate schema
nextflow run main.nf \
    --help \
    --schema_validate
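
The test profile used above has to be defined somewhere; a minimal sketch in nextflow.config, assuming the test_data/ layout described under Repository Setup, is:

// nextflow.config
profiles {
    test {
        params.input  = "${projectDir}/test_data/samples.csv"
        params.output = 'test_results'
    }
}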

Continuous Integration

Use GitHub Actions for automated testing:

# .github/workflows/test.yml
name: Pipeline Tests
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: nf-core/setup-nextflow@v1
      
      - name: Run pipeline tests
        run: |
          nextflow run main.nf -profile test,docker
      
      - name: Install nf-test
        run: |
          # See https://www.nf-test.com for current install instructions
          curl -fsSL https://get.nf-test.com | bash
          sudo mv nf-test /usr/local/bin/
      
      - name: Run nf-test
        run: |
          nf-test test tests/*.nf.test
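
The nf-test step assumes at least one test file under tests/. A minimal pipeline-level test, sketched here with the same test data paths as the local-testing example, could be:

// tests/main.nf.test
nextflow_pipeline {

    name "Smoke test with bundled test data"
    script "main.nf"

    test("runs to completion on the test sample sheet") {

        when {
            params {
                input  = "test_data/samples.csv"
                output = "test_results"
            }
        }

        then {
            assert workflow.success
        }
    }
}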

Deploying to Flow

1. Repository Setup

Your pipeline repository should include:

my-pipeline/
├── main.nf              # Main pipeline script
├── nextflow.config      # Configuration file
├── schema.json          # Flow schema definition
├── modules/             # Process modules
├── subworkflows/        # Reusable subworkflows
├── bin/                 # Helper scripts
├── docs/                # Documentation
│   └── output.md        # Output description
├── test_data/           # Test datasets
└── README.md            # Pipeline documentation

2. Version Tags

Tag releases following semantic versioning:

git tag -a v1.0.0 -m "First stable release"
git push origin v1.0.0
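
It also helps to keep the manifest block in nextflow.config in sync with the tag, so the version Nextflow reports matches the release (plain Nextflow; values are illustrative):

// nextflow.config
manifest {
    name            = 'myorg/my-pipeline'
    description     = 'A pipeline that processes biological data'
    version         = '1.0.0'
    nextflowVersion = '>=23.04.0'
}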

3. Flow Registration

Pipelines are automatically discovered when:

  1. Repository is public or Flow has access
  2. Contains valid main.nf and schema.json
  3. Has at least one version tag
  4. Passes validation checks

Advanced Topics

Dynamic Parameter Generation

Generate parameters based on input data:

// Detect genome automatically
process DETECT_GENOME {
    input:
    path sample_sheet
    
    output:
    env GENOME, emit: genome
    
    script:
    """
    GENOME=\$(detect_species.py ${sample_sheet})
    """
}

// Use detected genome
workflow {
    DETECT_GENOME(params.input)
    
    // samples_ch comes from the sample input handling example above;
    // first() makes the detected genome a value channel so it is reused for every sample
    ALIGN_READS(
        samples_ch,
        DETECT_GENOME.out.genome.first()
    )
}

Custom UI Components

Define custom UI elements in schema:

{
  "properties": {
    "aligner": {
      "type": "string",
      "description": "Alignment algorithm",
      "enum": ["bwa", "bowtie2", "star"],
      "enumNames": ["BWA-MEM", "Bowtie 2", "STAR"],
      "default": "star",
      "flow_ui": {
        "widget": "radio",
        "help_link": "https://docs.flow.bio/aligners"
      }
    }
  }
}
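
On the pipeline side, the selected enum value arrives as an ordinary parameter; a sketch of branching on it (STAR_ALIGN, BOWTIE2_ALIGN, and BWA_ALIGN are hypothetical modules) might look like:

// Select the aligner chosen in the UI
workflow ALIGN {
    take:
    samples_ch

    main:
    if (params.aligner == 'star') {
        STAR_ALIGN(samples_ch)
        bam_ch = STAR_ALIGN.out.bam
    }
    else if (params.aligner == 'bowtie2') {
        BOWTIE2_ALIGN(samples_ch)
        bam_ch = BOWTIE2_ALIGN.out.bam
    }
    else {
        BWA_ALIGN(samples_ch)
        bam_ch = BWA_ALIGN.out.bam
    }

    emit:
    bam = bam_ch
}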

Multi-Sample Processing

Handle complex sample relationships:

// Group samples by condition
Channel
    .fromPath(params.sample_sheet)
    .splitCsv(header: true)
    .map { row ->
        tuple(
            row.condition,
            row.sample_id,
            file(row.reads)
        )
    }
    .groupTuple(by: 0)
    .set { grouped_samples }

// Process groups
process DIFFERENTIAL_ANALYSIS {
    input:
    tuple val(condition), val(sample_ids), path(reads)
    
    script:
    """
    run_deseq2.R \
        --condition ${condition} \
        --samples ${sample_ids.join(',')} \
        --reads ${reads.join(' ')}
    """
}
