Pipeline Execution Guide
Flow provides access to a comprehensive suite of bioinformatics pipelines for analyzing genomic data. This guide covers discovering available pipelines, executing analyses, monitoring progress, and retrieving results.
Understanding Pipelines
Pipelines in Flow are versioned, reproducible workflows that process biological data.
Pipeline Structure
# A typical pipeline contains:
pipeline = {
    "id": 1,
    "name": "RNA-seq",
    "description": "Quantify gene expression from RNA sequencing data",
    "category": "RNA Analysis",
    "versions": [
        {
            "id": 10,
            "version": "3.19.0",
            "is_latest": True,
            "parameters": [...],
            "requirements": {
                "min_samples": 1,
                "required_data_types": ["FASTQ"],
                "supported_organisms": ["human", "mouse", "rat"]
            }
        }
    ],
    "upstream_pipelines": [],  # Can chain pipelines
    "estimated_runtime": "4-8 hours per sample"
}
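A version's requirements block is what decides whether a set of samples can be submitted to it. A minimal client-side pre-check is sketched below; the data_type and organism sample attributes are illustrative names rather than part of the documented schema, and the server-side check under "Find Compatible Pipelines" remains the authoritative answer.
def meets_requirements(samples, requirements):
    """Rough client-side check against a pipeline version's requirements."""
    if len(samples) < requirements["min_samples"]:
        return False
    for sample in samples:
        if sample.data_type not in requirements["required_data_types"]:
            return False
        if sample.organism not in requirements["supported_organisms"]:
            return False
    return True

version = pipeline["versions"][0]
if meets_requirements(samples, version["requirements"]):
    print(f"Samples look eligible for {pipeline['name']} {version['version']}")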
Discovering Pipelines
List All Pipelines
# Python client
pipelines = client.get_pipelines()
for pipeline in pipelines:
    print(f"{pipeline.name}: {pipeline.description}")
    print(f" Latest version: {pipeline.latest_version.version}")
    print(f" Category: {pipeline.category.name}")
REST API Pipeline Discovery
curl -H "Authorization: Bearer <token>" \
https://api.flow.bio/pipelines
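The same discovery call from plain Python with requests, for scripts that do not use the Flow client; token holds your API token, and the response is assumed here to be a JSON list of pipeline objects shaped like the structure above.
import requests

headers = {"Authorization": f"Bearer {token}"}
response = requests.get("https://api.flow.bio/pipelines", headers=headers)
response.raise_for_status()
for pipeline in response.json():
    print(pipeline["name"], "-", pipeline["description"])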
GraphQL Pipeline Query
query GetPipelines {
  pipelines {
    id
    name
    description
    category {
      id
      name
    }
    versions {
      id
      version
      isLatest
      canRun
    }
    requiredDataTypes {
      name
    }
    supportedOrganisms {
      name
    }
  }
}
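If you are not using a dedicated GraphQL client, the query can be posted with requests. Note that the /graphql endpoint path below is an assumption and may differ in your deployment.
import requests

query = """
query GetPipelines {
  pipelines {
    id
    name
    versions { id version isLatest }
  }
}
"""

response = requests.post(
    "https://api.flow.bio/graphql",  # assumed endpoint path
    json={"query": query},
    headers={"Authorization": f"Bearer {token}"},
)
for pipeline in response.json()["data"]["pipelines"]:
    print(pipeline["name"])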
Find Compatible Pipelines
# Find pipelines for your samples
samples = [sample1, sample2, sample3]
compatible = client.get_compatible_pipelines(sample_ids=[s.id for s in samples])
for pipeline in compatible:
    print(f"{pipeline.name} can process your samples")
Pipeline Categories
RNA Analysis
# RNA-seq pipeline
rnaseq = client.get_pipeline("RNA-seq")
print(f"Description: {rnaseq.description}")
print(f"Latest version: {rnaseq.latest_version.version}")
# Parameters for RNA-seq
params = rnaseq.latest_version.parameters
for param in params:
    print(f"- {param.name}: {param.description}")
    print(f" Type: {param.type}, Default: {param.default}")
DNA Analysis
# Available DNA pipelines
dna_pipelines = client.get_pipelines(category="DNA Analysis")
# Includes: Variant Calling, CNV Analysis, SV Detection
Epigenomics
# ChIP-seq, ATAC-seq, Bisulfite-seq pipelines
epigenome_pipelines = client.get_pipelines(category="Epigenomics")
Single Cell
# Single-cell RNA-seq pipelines
sc_pipelines = client.get_pipelines(category="Single Cell")
Running Pipelines
Basic Pipeline Execution
# Run pipeline with default parameters
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=[sample1.id, sample2.id, sample3.id],
    name="My RNA-seq Analysis"
)
print(f"Execution started: {execution.id}")
print(f"Status: {execution.status}")
REST API Pipeline Execution
curl -X POST https://api.flow.bio/pipelines/versions/10/run \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My RNA-seq Analysis",
    "samples": [123, 124, 125],
    "parameters": {
      "aligner": "star",
      "strandedness": "reverse"
    }
  }'
GraphQL Pipeline Execution
mutation RunPipeline($input: RunPipelineInput!) {
  runPipeline(input: $input) {
    id
    name
    status
    created
    pipeline {
      name
    }
    pipelineVersion {
      version
    }
  }
}
# Variables
{
  "input": {
    "pipelineVersionId": "10",
    "name": "RNA-seq Analysis",
    "samples": ["123", "124", "125"],
    "parameters": {
      "aligner": "star_salmon",
      "min_trimmed_reads": 10000
    }
  }
}
Advanced Execution Options
# Run with custom parameters
execution = client.run_pipeline(
    pipeline="RNA-seq",
    version="3.19.0",  # Specific version
    samples=sample_ids,
    name="Custom RNA-seq Run",
    parameters={
        # Alignment
        "aligner": "star_salmon",
        "star_index": "GRCh38",
        # Trimming
        "clip_r1": 5,
        "clip_r2": 5,
        "three_prime_clip_r1": 0,
        "three_prime_clip_r2": 0,
        "trim_nextseq": 20,
        # Filtering
        "min_trimmed_reads": 10000,
        "skip_trimming": False,
        "skip_markduplicates": True,
        # Quantification
        "pseudo_aligner": "salmon",
        "salmon_quant_libtype": "ISR",
        # QC
        "skip_qc": False,
        "skip_fastqc": False,
        "skip_multiqc": False,
        # Differential Expression
        "deseq2_vst": True,
        "stringtie_ignore_gtf": False
    },
    metadata={
        "experiment": "drug_treatment",
        "analysis_type": "differential_expression"
    }
)
Pipeline Parameters
Common Parameters
# Get available parameters for a pipeline
pipeline_version = client.get_pipeline_version(10)
for param in pipeline_version.parameters:
    print(f"\nParameter: {param.name}")
    print(f" Description: {param.description}")
    print(f" Type: {param.type}")
    print(f" Default: {param.default}")
    print(f" Required: {param.required}")
    if param.choices:
        print(f" Choices: {param.choices}")
    if param.validation:
        print(f" Validation: {param.validation}")
Parameter Types
| Type    | Description        | Example              |
|---------|--------------------|----------------------|
| string  | Text value         | "star"               |
| integer | Whole number       | 10000                |
| float   | Decimal number     | 0.05                 |
| boolean | True/False         | true                 |
| choice  | Predefined options | "auto"               |
| file    | File reference     | "s3://bucket/file"   |
| array   | List of values     | ["gene1", "gene2"]   |
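Put together, a parameters dict mixing these types might look like the following; the parameter names are illustrative, so check the pipeline version's own parameter list before using them.
parameters = {
    "aligner": "star_salmon",             # choice
    "min_trimmed_reads": 10000,           # integer
    "fdr_threshold": 0.05,                # float
    "skip_trimming": False,               # boolean
    "gtf": "s3://bucket/annotation.gtf",  # file
    "gene_list": ["TP53", "BRCA1"],       # array
}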
Parameter Validation
def validate_pipeline_parameters(pipeline_name, parameters):
    """Validate parameters before execution"""
    pipeline = client.get_pipeline(pipeline_name)
    version = pipeline.latest_version
    errors = []
    for param in version.parameters:
        if param.required and param.name not in parameters:
            errors.append(f"Missing required parameter: {param.name}")
        if param.name in parameters:
            value = parameters[param.name]
            # Type validation
            if param.type == "integer" and not isinstance(value, int):
                errors.append(f"{param.name} must be an integer")
            # Choice validation
            if param.choices and value not in param.choices:
                errors.append(f"{param.name} must be one of {param.choices}")
            # Range validation
            if param.min is not None and value < param.min:
                errors.append(f"{param.name} must be >= {param.min}")
            if param.max is not None and value > param.max:
                errors.append(f"{param.name} must be <= {param.max}")
    return len(errors) == 0, errors
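For example, run the check before submitting:
params = {"aligner": "star_salmon", "min_trimmed_reads": 10000}
ok, errors = validate_pipeline_parameters("RNA-seq", params)
if ok:
    execution = client.run_pipeline(
        pipeline="RNA-seq",
        samples=sample_ids,
        name="Validated RNA-seq run",
        parameters=params
    )
else:
    print("Fix these before running:", errors)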
Monitoring Executions
Check Execution Status
# Get execution details
execution = client.get_execution(456)
print(f"Pipeline: {execution.pipeline.name}")
print(f"Status: {execution.status}")
print(f"Progress: {execution.progress}%")
print(f"Started: {execution.started}")
print(f"Duration: {execution.duration} seconds")
Monitor Progress
import time

# Poll for updates
def monitor_execution(execution_id, interval=30):
    """Monitor execution until completion"""
    while True:
        execution = client.get_execution(execution_id)
        print(f"\r[{execution.progress}%] {execution.status}", end="")
        if execution.status in ["completed", "failed", "cancelled"]:
            print()  # New line
            break
        time.sleep(interval)
    return execution

# Wait for completion
final_status = monitor_execution(execution.id)
print(f"Final status: {final_status.status}")
Real-time Updates with GraphQL
subscription ExecutionUpdates($id: ID!) {
  executionUpdates(id: $id) {
    id
    status
    progress
    processExecutions {
      process
      status
      progress
      startTime
      endTime
    }
    logs {
      timestamp
      process
      message
      level
    }
  }
}
Get Execution Logs
# Get detailed logs
logs = execution.get_logs()
for log in logs:
    print(f"[{log.timestamp}] {log.process}: {log.message}")
# Filter logs by process
fastqc_logs = [log for log in logs if log.process == "FASTQC"]
# Get error logs only
errors = [log for log in logs if log.level == "ERROR"]
Process Tracking
Monitor Individual Processes
# Get process-level details
execution = client.get_execution(456)
for process in execution.process_executions:
    print(f"\nProcess: {process.name}")
    print(f" Status: {process.status}")
    print(f" Progress: {process.progress}%")
    print(f" Start: {process.start_time}")
    print(f" End: {process.end_time}")
    print(f" Exit Code: {process.exit_code}")
    if process.resources:
        print(f" CPU: {process.resources.cpu_percent}%")
        print(f" Memory: {process.resources.memory_gb} GB")
Execution Timeline
def create_execution_timeline(execution_id):
    """Create a timeline visualization of an execution"""
    execution = client.get_execution(execution_id)
    timeline = []
    for process in execution.process_executions:
        if process.start_time and process.end_time:
            duration = (process.end_time - process.start_time).total_seconds()
            timeline.append({
                "process": process.name,
                "start": process.start_time,
                "end": process.end_time,
                "duration": duration,
                "status": process.status
            })
    # Sort by start time
    timeline.sort(key=lambda x: x["start"])
    # Print a simple text Gantt chart: offset from the first process, then a duration bar
    for item in timeline:
        start_offset = (item["start"] - timeline[0]["start"]).total_seconds()
        bar = "=" * max(1, int(item["duration"] // 60))
        print(f"{item['process']:20} +{start_offset:6.0f}s |{bar}| {item['duration']:.0f}s")
    return timeline
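Called on a finished execution, this prints one bar per process and returns the timeline data for further use:
timeline = create_execution_timeline(456)
total_minutes = sum(item["duration"] for item in timeline) / 60
print(f"{len(timeline)} processes, {total_minutes:.1f} minutes of total process time")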
Retrieving Results
List Execution Outputs
# Get all outputs
execution = client.get_execution(456)
for output in execution.outputs:
    print(f"File: {output.filename}")
    print(f" Size: {output.size / 1e6:.1f} MB")
    print(f" Type: {output.data_type}")
    print(f" Path: {output.path}")
Download Results
# Download specific file
execution.download_output(
    "multiqc_report.html",
    "/local/path/multiqc_report.html"
)

# Download all results
execution.download_all_outputs("/results/execution_456/")

# Download specific output types
import os

def download_output_type(execution, output_type, dest_dir):
    """Download all outputs of a specific type"""
    for output in execution.outputs:
        if output.data_type == output_type:
            dest_path = os.path.join(dest_dir, output.filename)
            execution.download_output(output.filename, dest_path)
            print(f"Downloaded {output.filename}")

# Download all BAM files
download_output_type(execution, "BAM", "/results/alignments/")
Access Results Programmatically
import pandas as pd

# Get count matrices directly
counts_url = execution.get_output_url("salmon.merged.gene_counts.tsv")
counts_df = pd.read_csv(counts_url, sep="\t", index_col=0)
print(f"Expression matrix: {counts_df.shape}")
print(counts_df.head())

# Get MultiQC data
multiqc_data = execution.get_output_json("multiqc_data.json")
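With the counts loaded, quick sanity checks are one line each; for example, per-sample library sizes (this assumes one column per sample, as in the merged gene counts table above, and the 1e6 threshold is only illustrative):
library_sizes = counts_df.sum(axis=0)
print(library_sizes.sort_values())

low_depth = library_sizes[library_sizes < 1e6]
if not low_depth.empty:
    print("Low-depth samples:", list(low_depth.index))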
Pipeline Chaining
Sequential Pipelines
# Run RNA-seq followed by pathway analysis
# First: RNA-seq
rnaseq_exec = client.run_pipeline(
    pipeline="RNA-seq",
    samples=sample_ids,
    name="RNA-seq for pathway analysis"
)

# Wait for completion
rnaseq_exec.wait_until_complete()

# Second: Use RNA-seq outputs for pathway analysis
pathway_exec = client.run_pipeline(
    pipeline="Pathway-Analysis",
    upstream_execution=rnaseq_exec.id,
    name="Pathway analysis of DE genes",
    parameters={
        "fdr_threshold": 0.05,
        "log2fc_threshold": 1.0
    }
)
Parallel Pipeline Execution
import time

# Run multiple analyses in parallel
def run_parallel_analyses(samples, pipelines):
    """Run multiple pipelines on the same samples"""
    executions = []
    for pipeline_name in pipelines:
        execution = client.run_pipeline(
            pipeline=pipeline_name,
            samples=samples,
            name=f"{pipeline_name} analysis"
        )
        executions.append(execution)
        print(f"Started {pipeline_name}: {execution.id}")
    # Poll until every execution reaches a terminal state
    while any(e.status not in ["completed", "failed", "cancelled"] for e in executions):
        for execution in executions:
            execution.refresh()
        time.sleep(60)
    return executions

# Run RNA-seq and ChIP-seq in parallel
results = run_parallel_analyses(
    samples=[s.id for s in samples],
    pipelines=["RNA-seq", "ChIP-seq"]
)
Error Handling
Common Pipeline Errors
try:
    execution = client.run_pipeline(
        pipeline="RNA-seq",
        samples=sample_ids,
        name="Test run"
    )
except PipelineError as e:
    if e.code == "INSUFFICIENT_PERMISSIONS":
        print("You don't have permission to run pipelines")
    elif e.code == "INVALID_SAMPLES":
        print("Some samples are not compatible with this pipeline")
    elif e.code == "MISSING_PARAMETERS":
        print(f"Missing required parameters: {e.missing_params}")
    else:
        print(f"Pipeline error: {e}")
Handle Failed Executions
def handle_failed_execution(execution_id):
    """Diagnose and handle a failed execution"""
    execution = client.get_execution(execution_id)
    if execution.status != "failed":
        return
    # Find failed processes
    failed_processes = [
        p for p in execution.process_executions
        if p.status == "failed"
    ]
    for process in failed_processes:
        print(f"\nFailed process: {process.name}")
        print(f"Exit code: {process.exit_code}")
        # Get error logs
        error_logs = [
            log for log in execution.logs
            if log.process == process.name and log.level == "ERROR"
        ]
        for log in error_logs[-10:]:  # Last 10 error messages
            print(f"  {log.message}")
        # Common fixes
        if any("memory" in log.message.lower() for log in error_logs):
            print("\nSuggestion: Try running with increased memory allocation")
        if any("disk space" in log.message.lower() for log in error_logs):
            print("\nSuggestion: Check available disk space")
    return failed_processes
Retry Failed Executions
def retry_execution_with_fixes(failed_execution_id):
    """Retry an execution with parameter adjustments"""
    failed = client.get_execution(failed_execution_id)
    # Copy original parameters
    new_params = failed.parameters.copy()
    # Apply fixes based on the failure
    if "memory" in str(failed.error):
        new_params["max_memory"] = "32.GB"
    if "time limit" in str(failed.error):
        new_params["max_time"] = "48.h"
    # Create new execution
    retry = client.run_pipeline(
        pipeline=failed.pipeline.name,
        version=failed.pipeline_version.version,
        samples=[s.id for s in failed.samples],
        name=f"{failed.name} (Retry)",
        parameters=new_params,
        metadata={
            "retry_of": failed_execution_id,
            "retry_reason": failed.error
        }
    )
    return retry
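Typical usage is to diagnose the failure first, then resubmit:
failed_processes = handle_failed_execution(456)
if failed_processes:
    retry = retry_execution_with_fixes(456)
    print(f"Resubmitted as execution {retry.id}")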
Pipeline Management
Cancel Running Executions
# Cancel single execution
execution.cancel()

# Batch cancel
running_execs = client.get_executions(status="running")
for execution in running_execs:
    if execution.name.startswith("test_"):
        execution.cancel()
        print(f"Cancelled {execution.name}")
Execution Priorities
# Run high-priority analysis
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=urgent_samples,
    name="Urgent Analysis",
    priority="high",  # high, normal, low
    metadata={
        "reason": "Clinical deadline",
        "deadline": "2024-01-20"
    }
)
Resource Allocation
# Specify resource requirements
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=sample_ids,
    name="Large Dataset Analysis",
    resources={
        "cpus": 16,
        "memory": "64GB",
        "queue": "highmem",
        "time_limit": "48h"
    }
)
Best Practices
1. Parameter Selection
def get_optimal_parameters(pipeline_name, sample_characteristics):
    """Get recommended parameters based on the data"""
    params = {}
    if pipeline_name == "RNA-seq":
        # Strandedness
        if sample_characteristics.get("library_prep") == "TruSeq Stranded":
            params["strandedness"] = "reverse"
        else:
            params["strandedness"] = "auto"
        # Aligner
        if sample_characteristics.get("read_length", 0) >= 150:
            params["aligner"] = "star_salmon"
        else:
            params["aligner"] = "star_rsem"
        # Trimming
        if sample_characteristics.get("adapter_content", 0) > 0.1:
            params["skip_trimming"] = False
            params["adapter"] = "auto"
    return params
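The returned dict plugs straight into run_pipeline; the characteristics keys are whatever metadata you track for your libraries (the values below are illustrative):
characteristics = {"library_prep": "TruSeq Stranded", "read_length": 150}
params = get_optimal_parameters("RNA-seq", characteristics)
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=sample_ids,
    name="Parameter-tuned RNA-seq",
    parameters=params
)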
2. Batch Processing
def batch_process_by_group(samples, pipeline_name, group_by="condition"):
"""Process samples in batches by grouping"""
# Group samples
groups = {}
for sample in samples:
group_key = sample.metadata.get(group_by, "default")
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(sample)
# Run pipeline for each group
executions = {}
for group_name, group_samples in groups.items():
exec = client.run_pipeline(
pipeline=pipeline_name,
samples=[s.id for s in group_samples],
name=f"{pipeline_name} - {group_name}",
metadata={"group": group_name}
)
executions[group_name] = exec
return executions
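For example, to launch one run per condition and wait on them together:
executions = batch_process_by_group(samples, "RNA-seq", group_by="condition")
for group_name, execution in executions.items():
    execution.wait_until_complete()
    print(f"{group_name}: {execution.status}")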
3. Results Organization
import os

def organize_pipeline_results(execution_id, output_dir):
    """Organize pipeline outputs by type"""
    execution = client.get_execution(execution_id)
    # Create directory structure
    dirs = {
        "qc": os.path.join(output_dir, "01_QC"),
        "alignments": os.path.join(output_dir, "02_Alignments"),
        "counts": os.path.join(output_dir, "03_Counts"),
        "de": os.path.join(output_dir, "04_DifferentialExpression"),
        "reports": os.path.join(output_dir, "05_Reports")
    }
    for dir_path in dirs.values():
        os.makedirs(dir_path, exist_ok=True)
    # Organize outputs
    for output in execution.outputs:
        if "multiqc" in output.filename or "fastqc" in output.filename:
            dest = dirs["qc"]
        elif output.filename.endswith(".bam") or output.filename.endswith(".bai"):
            dest = dirs["alignments"]
        elif "counts" in output.filename or "tpm" in output.filename:
            dest = dirs["counts"]
        elif "deseq2" in output.filename or "differential" in output.filename:
            dest = dirs["de"]
        else:
            dest = dirs["reports"]
        output_path = os.path.join(dest, output.filename)
        execution.download_output(output.filename, output_path)
    print(f"Results organized in {output_dir}")
Pipeline Development
Custom Pipeline Integration
# Register custom pipeline
custom_pipeline = client.register_pipeline(
    name="Custom-Analysis",
    repository="https://github.com/myorg/custom-pipeline",
    version="1.0.0",
    config={
        "executor": "slurm",
        "container": "docker://myorg/custom:latest",
        "schema": "assets/schema.json"
    }
)

# Test custom pipeline
test_execution = client.run_pipeline(
    pipeline="Custom-Analysis",
    samples=[test_sample.id],
    name="Test Custom Pipeline",
    test_mode=True  # Runs with minimal resources
)
Next Steps
- Sample Management - Preparing samples for analysis
- Search API - Finding executions and results
- Download Guide - Retrieving pipeline outputs
- Error Handling - Handling pipeline errors