Flow Logo

Resource APIs

Pipeline Execution Guide

Flow provides access to a comprehensive suite of bioinformatics pipelines for analyzing genomic data. This guide covers discovering available pipelines, executing analyses, monitoring progress, and retrieving results.


Understanding Pipelines

Pipelines in Flow are versioned, reproducible workflows that process biological data.

Pipeline Structure

# A typical pipeline contains:
pipeline = {
    "id": 1,
    "name": "RNA-seq",
    "description": "Quantify gene expression from RNA sequencing data",
    "category": "RNA Analysis",
    "versions": [
        {
            "id": 10,
            "version": "3.19.0",
            "is_latest": True,
            "parameters": [...],
            "requirements": {
                "min_samples": 1,
                "required_data_types": ["FASTQ"],
                "supported_organisms": ["human", "mouse", "rat"]
            }
        }
    ],
    "upstream_pipelines": [],  # Can chain pipelines
    "estimated_runtime": "4-8 hours per sample"
}

Discovering Pipelines

List All Pipelines

# Python client
pipelines = client.get_pipelines()

for pipeline in pipelines:
    print(f"{pipeline.name}: {pipeline.description}")
    print(f"  Latest version: {pipeline.latest_version.version}")
    print(f"  Category: {pipeline.category.name}")

REST API Pipeline Discovery

curl -H "Authorization: Bearer <token>" \
  https://api.flow.bio/pipelines

GraphQL Pipeline Query

query GetPipelines {
  pipelines {
    id
    name
    description
    category {
      id
      name
    }
    versions {
      id
      version
      isLatest
      canRun
    }
    requiredDataTypes {
      name
    }
    supportedOrganisms {
      name
    }
  }
}

Find Compatible Pipelines

# Find pipelines for your samples
samples = [sample1, sample2, sample3]
compatible = client.get_compatible_pipelines(sample_ids=[s.id for s in samples])

for pipeline in compatible:
    print(f"{pipeline.name} can process your samples")

Pipeline Categories

RNA Analysis

# RNA-seq pipeline
rnaseq = client.get_pipeline("RNA-seq")
print(f"Description: {rnaseq.description}")
print(f"Latest version: {rnaseq.latest_version.version}")

# Parameters for RNA-seq
params = rnaseq.latest_version.parameters
for param in params:
    print(f"- {param.name}: {param.description}")
    print(f"  Type: {param.type}, Default: {param.default}")

DNA Analysis

# Available DNA pipelines
dna_pipelines = client.get_pipelines(category="DNA Analysis")
# Includes: Variant Calling, CNV Analysis, SV Detection

Epigenomics

# ChIP-seq, ATAC-seq, Bisulfite-seq pipelines
epigenome_pipelines = client.get_pipelines(category="Epigenomics")

Single Cell

# Single-cell RNA-seq pipelines
sc_pipelines = client.get_pipelines(category="Single Cell")

Running Pipelines

Basic Pipeline Execution

# Run pipeline with default parameters
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=[sample1.id, sample2.id, sample3.id],
    name="My RNA-seq Analysis"
)

print(f"Execution started: {execution.id}")
print(f"Status: {execution.status}")

REST API Pipeline Execution

curl -X POST https://api.flow.bio/pipelines/versions/10/run \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My RNA-seq Analysis",
    "samples": [123, 124, 125],
    "parameters": {
      "aligner": "star",
      "strandedness": "reverse"
    }
  }'

GraphQL Pipeline Execution

mutation RunPipeline($input: RunPipelineInput!) {
  runPipeline(input: $input) {
    id
    name
    status
    created
    pipeline {
      name
    }
    pipelineVersion {
      version
    }
  }
}

# Variables
{
  "input": {
    "pipelineVersionId": "10",
    "name": "RNA-seq Analysis",
    "samples": ["123", "124", "125"],
    "parameters": {
      "aligner": "star_salmon",
      "min_trimmed_reads": 10000
    }
  }
}

Advanced Execution Options

# Run with custom parameters
execution = client.run_pipeline(
    pipeline="RNA-seq",
    version="3.19.0",  # Specific version
    samples=sample_ids,
    name="Custom RNA-seq Run",
    parameters={
        # Alignment
        "aligner": "star_salmon",
        "star_index": "GRCh38",
        
        # Trimming
        "clip_r1": 5,
        "clip_r2": 5,
        "three_prime_clip_r1": 0,
        "three_prime_clip_r2": 0,
        "trim_nextseq": 20,
        
        # Filtering
        "min_trimmed_reads": 10000,
        "skip_trimming": False,
        "skip_markduplicates": True,
        
        # Quantification
        "pseudo_aligner": "salmon",
        "salmon_quant_libtype": "ISR",
        
        # QC
        "skip_qc": False,
        "skip_fastqc": False,
        "skip_multiqc": False,
        
        # Differential Expression
        "deseq2_vst": True,
        "stringtie_ignore_gtf": False
    },
    metadata={
        "experiment": "drug_treatment",
        "analysis_type": "differential_expression"
    }
)

Pipeline Parameters

Common Parameters

# Get available parameters for a pipeline
pipeline_version = client.get_pipeline_version(10)

for param in pipeline_version.parameters:
    print(f"\nParameter: {param.name}")
    print(f"  Description: {param.description}")
    print(f"  Type: {param.type}")
    print(f"  Default: {param.default}")
    print(f"  Required: {param.required}")
    
    if param.choices:
        print(f"  Choices: {param.choices}")
    
    if param.validation:
        print(f"  Validation: {param.validation}")

Parameter Types

| Type | Description | Example |
|---|---|---|
| string | Text value | "star" |
| integer | Whole number | 10000 |
| float | Decimal number | 0.05 |
| boolean | True/False | true |
| choice | Predefined options | "auto" |
| file | File reference | "s3://bucket/file" |
| array | List of values | ["gene1", "gene2"] |

Parameter Validation

def validate_pipeline_parameters(pipeline_name, parameters):
    """Validate parameters against a pipeline's latest version before execution.

    Args:
        pipeline_name: Name passed to ``client.get_pipeline``.
        parameters: Mapping of parameter name to the value to be submitted.

    Returns:
        A ``(is_valid, errors)`` tuple. ``errors`` is a list of human-readable
        messages; ``is_valid`` is True only when that list is empty.
    """
    pipeline = client.get_pipeline(pipeline_name)
    version = pipeline.latest_version

    errors = []

    for param in version.parameters:
        if param.name not in parameters:
            if param.required:
                errors.append(f"Missing required parameter: {param.name}")
            continue

        value = parameters[param.name]
        type_ok = True

        # Type validation. bool is a subclass of int, so isinstance(True, int)
        # is True; reject booleans explicitly for integer parameters.
        if param.type == "integer" and (
            isinstance(value, bool) or not isinstance(value, int)
        ):
            errors.append(f"{param.name} must be an integer")
            type_ok = False

        # Choice validation
        if param.choices and value not in param.choices:
            errors.append(f"{param.name} must be one of {param.choices}")

        # Range validation. Skipped when the type check already failed, since
        # comparing e.g. a string against a numeric bound raises TypeError
        # instead of producing a useful message.
        if not type_ok:
            continue

        try:
            if param.min is not None and value < param.min:
                errors.append(f"{param.name} must be >= {param.min}")

            if param.max is not None and value > param.max:
                errors.append(f"{param.name} must be <= {param.max}")
        except TypeError:
            # The value cannot be ordered against the declared bounds.
            errors.append(
                f"{param.name} has a value that cannot be compared "
                f"with its allowed range"
            )

    return not errors, errors

Monitoring Executions

Check Execution Status

# Get execution details
execution = client.get_execution(456)

print(f"Pipeline: {execution.pipeline.name}")
print(f"Status: {execution.status}")
print(f"Progress: {execution.progress}%")
print(f"Started: {execution.started}")
print(f"Duration: {execution.duration} seconds")

Monitor Progress

# Poll for updates
def monitor_execution(execution_id, interval=30, timeout=None):
    """Poll an execution until it reaches a terminal status.

    An execution is considered finished when its status is "completed",
    "failed" or "cancelled".

    Args:
        execution_id: Identifier passed to ``client.get_execution``.
        interval: Seconds to sleep between polls.
        timeout: Optional upper bound, in seconds, on the total wait.
            ``None`` (the default) waits indefinitely.

    Returns:
        The execution object returned by the final poll.

    Raises:
        TimeoutError: If ``timeout`` elapses before a terminal status is seen.
    """
    import time  # local import keeps the snippet runnable on its own

    terminal_statuses = ("completed", "failed", "cancelled")
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        execution = client.get_execution(execution_id)

        # "\r" redraws the same line. No newline is written, so flush
        # explicitly or the progress may sit in the output buffer.
        print(f"\r[{execution.progress}%] {execution.status}", end="", flush=True)

        if execution.status in terminal_statuses:
            print()  # New line
            return execution

        if deadline is not None and time.monotonic() >= deadline:
            print()  # New line
            raise TimeoutError(
                f"Execution {execution_id} did not finish within {timeout} seconds"
            )

        time.sleep(interval)

# Wait for completion
final_status = monitor_execution(execution.id)
print(f"Final status: {final_status.status}")

Real-time Updates with GraphQL

subscription ExecutionUpdates($id: ID!) {
  executionUpdates(id: $id) {
    id
    status
    progress
    processExecutions {
      process
      status
      progress
      startTime
      endTime
    }
    logs {
      timestamp
      process
      message
      level
    }
  }
}

Get Execution Logs

# Get detailed logs
logs = execution.get_logs()

for log in logs:
    print(f"[{log.timestamp}] {log.process}: {log.message}")

# Filter logs by process
fastqc_logs = [log for log in logs if log.process == "FASTQC"]

# Get error logs only
errors = [log for log in logs if log.level == "ERROR"]

Process Tracking

Monitor Individual Processes

# Get process-level details
execution = client.get_execution(456)

for process in execution.process_executions:
    print(f"\nProcess: {process.name}")
    print(f"  Status: {process.status}")
    print(f"  Progress: {process.progress}%")
    print(f"  Start: {process.start_time}")
    print(f"  End: {process.end_time}")
    print(f"  Exit Code: {process.exit_code}")
    
    if process.resources:
        print(f"  CPU: {process.resources.cpu_percent}%")
        print(f"  Memory: {process.resources.memory_gb} GB")

Execution Timeline

def create_execution_timeline(execution_id):
    """Build and print a text timeline of an execution's finished processes.

    Only processes with both a start and an end time are included. Each bar
    is one "=" per full minute of duration and is indented by one space per
    full minute between the earliest start and that process's start.

    Args:
        execution_id: Identifier passed to ``client.get_execution``.

    Returns:
        A list of dicts with keys "process", "start", "end", "duration"
        (whole seconds) and "status", sorted by start time.
    """
    execution = client.get_execution(execution_id)

    timeline = []

    for process in execution.process_executions:
        # Skip processes that have not both started and finished.
        if not (process.start_time and process.end_time):
            continue

        elapsed = process.end_time - process.start_time

        timeline.append({
            "process": process.name,
            "start": process.start_time,
            "end": process.end_time,
            # total_seconds() includes whole days; timedelta.seconds alone
            # wraps around every 24 hours.
            "duration": int(elapsed.total_seconds()),
            "status": process.status
        })

    # Sort by start time
    timeline.sort(key=lambda entry: entry["start"])

    # Create Gantt chart data
    if timeline:
        origin = timeline[0]["start"]
        for item in timeline:
            offset_minutes = int((item["start"] - origin).total_seconds()) // 60
            bar = "=" * (item["duration"] // 60)
            print(
                f"{item['process']:20} {' ' * offset_minutes}|{bar}| "
                f"{item['duration']}s"
            )

    return timeline

Retrieving Results

List Execution Outputs

# Get all outputs
execution = client.get_execution(456)

for output in execution.outputs:
    print(f"File: {output.filename}")
    print(f"  Size: {output.size / 1e6:.1f} MB")
    print(f"  Type: {output.data_type}")
    print(f"  Path: {output.path}")

Download Results

# Download specific file
execution.download_output(
    "multiqc_report.html",
    "/local/path/multiqc_report.html"
)

# Download all results
execution.download_all_outputs("/results/execution_456/")

# Download specific output types
def download_output_type(execution, output_type, dest_dir):
    """Download every output of ``execution`` whose data type matches.

    Args:
        execution: Object exposing ``outputs`` (items with ``filename`` and
            ``data_type``) and ``download_output(filename, dest_path)``.
        output_type: Value compared against each output's ``data_type``.
        dest_dir: Directory the files are written into; created if missing.

    Returns:
        List of destination paths, in the order the files were downloaded.
    """
    # Make sure the target directory exists before the first download.
    os.makedirs(dest_dir, exist_ok=True)

    downloaded = []
    for output in execution.outputs:
        if output.data_type != output_type:
            continue

        dest_path = os.path.join(dest_dir, output.filename)
        execution.download_output(output.filename, dest_path)
        print(f"Downloaded {output.filename}")
        downloaded.append(dest_path)

    return downloaded

# Download all BAM files
download_output_type(execution, "BAM", "/results/alignments/")

Access Results Programmatically

# Get count matrices directly
counts_url = execution.get_output_url("salmon.merged.gene_counts.tsv")
counts_df = pd.read_csv(counts_url, sep='\t', index_col=0)

print(f"Expression matrix: {counts_df.shape}")
print(counts_df.head())

# Get MultiQC data
multiqc_data = execution.get_output_json("multiqc_data.json")

Pipeline Chaining

Sequential Pipelines

# Run RNA-seq followed by pathway analysis
# First: RNA-seq
rnaseq_exec = client.run_pipeline(
    pipeline="RNA-seq",
    samples=sample_ids,
    name="RNA-seq for pathway analysis"
)

# Wait for completion
rnaseq_exec.wait_until_complete()

# Second: Use RNA-seq outputs for pathway analysis
pathway_exec = client.run_pipeline(
    pipeline="Pathway-Analysis",
    upstream_execution=rnaseq_exec.id,
    name="Pathway analysis of DE genes",
    parameters={
        "fdr_threshold": 0.05,
        "log2fc_threshold": 1.0
    }
)

Parallel Pipeline Execution

# Run multiple analyses in parallel
def run_parallel_analyses(samples, pipelines):
    """Start several pipelines on the same samples and wait for all to finish.

    Args:
        samples: Sample identifiers passed unchanged to every pipeline run.
        pipelines: Iterable of pipeline names to start.

    Returns:
        The list of execution objects, in the order they were started, after
        every one has reached "completed", "failed" or "cancelled".
    """
    import time  # local import keeps the snippet runnable on its own

    terminal_statuses = ("completed", "failed", "cancelled")

    executions = []

    for pipeline_name in pipelines:
        execution = client.run_pipeline(
            pipeline=pipeline_name,
            samples=samples,
            name=f"{pipeline_name} analysis"
        )
        executions.append(execution)
        print(f"Started {pipeline_name}: {execution.id}")

    # Monitor all executions. Wait on "not yet terminal" rather than on
    # "running": an execution that has not started running yet would
    # otherwise end the wait immediately.
    while any(e.status not in terminal_statuses for e in executions):
        time.sleep(60)

        for execution in executions:
            execution.refresh()

    return executions

# Run RNA-seq and ChIP-seq in parallel
results = run_parallel_analyses(
    samples=[s.id for s in samples],
    pipelines=["RNA-seq", "ChIP-seq"]
)

Error Handling

Common Pipeline Errors

try:
    execution = client.run_pipeline(
        pipeline="RNA-seq",
        samples=sample_ids,
        name="Test run"
    )
except PipelineError as e:
    if e.code == "INSUFFICIENT_PERMISSIONS":
        print("You don't have permission to run pipelines")
    elif e.code == "INVALID_SAMPLES":
        print("Some samples are not compatible with this pipeline")
    elif e.code == "MISSING_PARAMETERS":
        print(f"Missing required parameters: {e.missing_params}")
    else:
        print(f"Pipeline error: {e}")

Handle Failed Executions

def handle_failed_execution(execution_id):
    """Print diagnostics and suggestions for a failed execution.

    Args:
        execution_id: Identifier passed to ``client.get_execution``.

    Returns:
        The list of process executions whose status is "failed", or ``None``
        when the execution itself is not in the "failed" state.
    """
    execution = client.get_execution(execution_id)

    if execution.status != "failed":
        return None

    # Find failed processes
    failed_processes = [
        p for p in execution.process_executions
        if p.status == "failed"
    ]

    # Read the logs once and collect the error logs of every failed process,
    # so the suggestions below consider all of them (and the name is always
    # defined, even when no process is marked as failed).
    logs = execution.logs
    all_error_logs = []

    for process in failed_processes:
        print(f"\nFailed process: {process.name}")
        print(f"Exit code: {process.exit_code}")

        # Get error logs
        error_logs = [
            log for log in logs
            if log.process == process.name and log.level == "ERROR"
        ]
        all_error_logs.extend(error_logs)

        for log in error_logs[-10:]:  # Last 10 error messages
            print(f"  {log.message}")

    # Common fixes
    messages = [log.message.lower() for log in all_error_logs]

    if any("memory" in message for message in messages):
        print("\nSuggestion: Try running with increased memory allocation")

    if any("disk space" in message for message in messages):
        print("\nSuggestion: Check available disk space")

    return failed_processes

Retry Failed Executions

def retry_execution_with_fixes(failed_execution_id):
    """Re-run a failed execution with parameters adjusted for its error.

    The original parameters are copied. "max_memory" is set to "32.GB" when
    the error text mentions memory, and "max_time" to "48.h" when it mentions
    a time limit. Matching is case-insensitive.

    Args:
        failed_execution_id: Identifier passed to ``client.get_execution``.

    Returns:
        The new execution returned by ``client.run_pipeline``.
    """
    failed = client.get_execution(failed_execution_id)

    # Copy original parameters (tolerating an execution with none recorded)
    new_params = dict(failed.parameters or {})

    # Apply fixes based on failure. Lower-case once so that "Memory" or
    # "Time limit" in the error text is matched as well.
    error_text = str(failed.error).lower()

    if "memory" in error_text:
        new_params["max_memory"] = "32.GB"

    if "time limit" in error_text:
        new_params["max_time"] = "48.h"

    # Create new execution
    retry = client.run_pipeline(
        pipeline=failed.pipeline.name,
        version=failed.pipeline_version.version,
        samples=[s.id for s in failed.samples],
        name=f"{failed.name} (Retry)",
        parameters=new_params,
        metadata={
            "retry_of": failed_execution_id,
            "retry_reason": failed.error
        }
    )

    return retry

Pipeline Management

Cancel Running Executions

# Cancel single execution
execution.cancel()

# Batch cancel
running_execs = client.get_executions(status="running")
# Avoid naming the loop variable "exec": that would shadow the builtin.
for running in running_execs:
    if running.name.startswith("test_"):
        running.cancel()
        print(f"Cancelled {running.name}")

Execution Priorities

# Run high-priority analysis
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=urgent_samples,
    name="Urgent Analysis",
    priority="high",  # high, normal, low
    metadata={
        "reason": "Clinical deadline",
        "deadline": "2024-01-20"
    }
)

Resource Allocation

# Specify resource requirements
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=sample_ids,
    name="Large Dataset Analysis",
    resources={
        "cpus": 16,
        "memory": "64GB",
        "queue": "highmem",
        "time_limit": "48h"
    }
)

Best Practices

1. Parameter Selection

def get_optimal_parameters(pipeline_name, sample_characteristics):
    """Suggest pipeline parameters from a mapping of sample characteristics.

    Only the "RNA-seq" pipeline has recommendations; any other name yields an
    empty dict. The keys read from ``sample_characteristics`` are
    "library_prep", "read_length" and "adapter_content".
    """
    if pipeline_name != "RNA-seq":
        return {}

    # Strandedness follows the library prep kit; aligner follows read length.
    stranded_prep = sample_characteristics.get("library_prep") == "TruSeq Stranded"
    long_reads = sample_characteristics.get("read_length", 0) >= 150

    recommended = {
        "strandedness": "reverse" if stranded_prep else "auto",
        "aligner": "star_salmon" if long_reads else "star_rsem",
    }

    # Trimming is only configured when adapter content exceeds 0.1.
    if sample_characteristics.get("adapter_content", 0) > 0.1:
        recommended["skip_trimming"] = False
        recommended["adapter"] = "auto"

    return recommended

2. Batch Processing

def batch_process_by_group(samples, pipeline_name, group_by="condition"):
    """Run one pipeline execution per group of samples.

    Samples are grouped by ``sample.metadata.get(group_by, "default")``, so
    samples without that metadata key end up in the "default" group.

    Args:
        samples: Iterable of sample objects exposing ``id`` and ``metadata``.
        pipeline_name: Pipeline to run for every group.
        group_by: Metadata key used to form the groups.

    Returns:
        Dict mapping each group name to the execution started for it.
    """
    # Group samples
    groups = {}
    for sample in samples:
        group_key = sample.metadata.get(group_by, "default")
        groups.setdefault(group_key, []).append(sample)

    # Run pipeline for each group
    executions = {}
    for group_name, group_samples in groups.items():
        executions[group_name] = client.run_pipeline(
            pipeline=pipeline_name,
            samples=[s.id for s in group_samples],
            name=f"{pipeline_name} - {group_name}",
            metadata={"group": group_name}
        )

    return executions

3. Results Organization

def organize_pipeline_results(execution_id, output_dir):
    """Download an execution's outputs into numbered, per-category folders.

    Each output is routed by its filename to QC, alignments, counts or
    differential expression; anything unmatched goes to the reports folder.
    """
    run = client.get_execution(execution_id)

    # Category key -> sub-folder name under output_dir
    subfolders = {
        "qc": "01_QC",
        "alignments": "02_Alignments",
        "counts": "03_Counts",
        "de": "04_DifferentialExpression",
        "reports": "05_Reports",
    }
    dirs = {key: os.path.join(output_dir, sub) for key, sub in subfolders.items()}

    for folder in dirs.values():
        os.makedirs(folder, exist_ok=True)

    def category_for(filename):
        # Checks run in priority order; the first match wins.
        if "multiqc" in filename or "fastqc" in filename:
            return "qc"
        if filename.endswith((".bam", ".bai")):
            return "alignments"
        if "counts" in filename or "tpm" in filename:
            return "counts"
        if "deseq2" in filename or "differential" in filename:
            return "de"
        return "reports"

    for artifact in run.outputs:
        target_dir = dirs[category_for(artifact.filename)]
        target_path = os.path.join(target_dir, artifact.filename)
        run.download_output(artifact.filename, target_path)

    print(f"Results organized in {output_dir}")

Pipeline Development

Custom Pipeline Integration

# Register custom pipeline
custom_pipeline = client.register_pipeline(
    name="Custom-Analysis",
    repository="https://github.com/myorg/custom-pipeline",
    version="1.0.0",
    config={
        "executor": "slurm",
        "container": "docker://myorg/custom:latest",
        "schema": "assets/schema.json"
    }
)

# Test custom pipeline
test_execution = client.run_pipeline(
    pipeline="Custom-Analysis",
    samples=[test_sample.id],
    name="Test Custom Pipeline",
    test_mode=True  # Runs with minimal resources
)

Next Steps

Previous
Project Management