API Workflows
This guide demonstrates common integration patterns and workflows using Flow's APIs. These examples show how to combine multiple API operations to accomplish real-world tasks.
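All of the examples below assume an authenticated client. A minimal setup, reading credentials from environment variables as these examples do, looks like this:

```python
import os
import flowbio

# Create a client and authenticate once; reuse it for all operations.
client = flowbio.Client()
client.login(os.environ["FLOW_USERNAME"], os.environ["FLOW_PASSWORD"])
```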
Automated Sample Processing
Workflow: Process Samples from Sequencing Facility
This workflow automatically processes new samples as they arrive from a sequencing facility.
```python
import flowbio
import os
import time
import json
from pathlib import Path


class SequencingProcessor:
    def __init__(self, watch_dir, project_id):
        self.client = flowbio.Client()
        self.client.login(os.environ['FLOW_USERNAME'], os.environ['FLOW_PASSWORD'])
        self.watch_dir = Path(watch_dir)
        self.project_id = project_id
        self.processed_files = set()

    def find_sample_pairs(self):
        """Find R1/R2 pairs in the watch directory."""
        r1_files = list(self.watch_dir.glob("*_R1_*.fastq.gz"))
        pairs = []
        for r1 in r1_files:
            r2 = Path(str(r1).replace("_R1_", "_R2_"))
            if r2.exists() and r1 not in self.processed_files:
                sample_name = r1.stem.split("_R1_")[0]
                pairs.append((sample_name, r1, r2))
        return pairs

    def process_new_samples(self):
        """Upload new samples and start analysis."""
        pairs = self.find_sample_pairs()
        if not pairs:
            return

        uploaded_samples = []
        for sample_name, r1, r2 in pairs:
            print(f"Uploading {sample_name}...")

            # Extract metadata from the filename or a sidecar file
            metadata = self.parse_metadata(sample_name)

            # Upload sample
            sample = self.client.upload_sample(
                name=sample_name,
                read1=str(r1),
                read2=str(r2),
                project_id=self.project_id,
                metadata=metadata,
                progress=True
            )
            uploaded_samples.append(sample)
            self.processed_files.add(r1)

            # Archive processed files
            self.archive_files(r1, r2)

        if uploaded_samples:
            # Run pipeline on the new samples
            self.run_analysis(uploaded_samples)

    def parse_metadata(self, sample_name):
        """Parse metadata from a sidecar file or the filename."""
        metadata = {}

        # Look for a metadata sidecar file first
        metadata_file = self.watch_dir / f"{sample_name}_metadata.json"
        if metadata_file.exists():
            with open(metadata_file) as f:
                metadata = json.load(f)
        else:
            # Fall back to the filename convention, e.g.
            # PatientID_Tissue_TimePoint_Rep1_R1_001.fastq.gz
            parts = sample_name.split("_")
            if len(parts) >= 4:
                metadata = {
                    "patient_id": parts[0],
                    "tissue": parts[1],
                    "time_point": parts[2],
                    "replicate": parts[3]
                }
        return metadata

    def archive_files(self, r1, r2):
        """Move processed files to the archive directory."""
        archive_dir = self.watch_dir / "archived"
        archive_dir.mkdir(exist_ok=True)
        r1.rename(archive_dir / r1.name)
        r2.rename(archive_dir / r2.name)

    def run_analysis(self, samples):
        """Start the RNA-seq pipeline on uploaded samples."""
        print(f"Starting analysis for {len(samples)} samples...")
        execution = self.client.run_pipeline(
            pipeline="RNA-seq",
            samples=[s.id for s in samples],
            params={
                "aligner": "star_salmon",
                "skip_qc": False
            }
        )
        print(f"Pipeline started: {execution.id}")
        self.monitor_execution(execution)

    def monitor_execution(self, execution):
        """Monitor a pipeline execution and notify on completion."""
        while execution.status in ["pending", "running"]:
            time.sleep(60)
            execution.refresh()

        if execution.status == "completed":
            self.notify_completion(execution)
        else:
            self.notify_failure(execution)

    def notify_completion(self, execution):
        """Send a notification when a pipeline completes."""
        # Send email, Slack message, etc.
        print(f"Pipeline {execution.id} completed successfully!")

    def notify_failure(self, execution):
        """Send a notification when a pipeline fails."""
        print(f"Pipeline {execution.id} failed: {execution.error}")

    def run(self, check_interval=300):
        """Run the processor continuously."""
        print(f"Watching {self.watch_dir} for new samples...")
        while True:
            try:
                self.process_new_samples()
            except Exception as e:
                print(f"Error processing samples: {e}")
            time.sleep(check_interval)


# Usage
processor = SequencingProcessor(
    watch_dir="/data/sequencing/incoming",
    project_id="project-123"
)
processor.run()
```
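`run()` blocks forever, which suits a long-lived service. To drive the processor from cron or another scheduler instead, call `process_new_samples()` once per invocation:

```python
# One-shot invocation, e.g. from a cron job, instead of the built-in loop
processor = SequencingProcessor(
    watch_dir="/data/sequencing/incoming",
    project_id="project-123"
)
processor.process_new_samples()
```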
Batch Quality Control
Workflow: QC Analysis Across Multiple Projects
This workflow runs quality control analysis across multiple projects and generates a summary report.
```python
import json
import time
import flowbio
import pandas as pd
from datetime import datetime, timedelta


class BatchQCAnalyzer:
    def __init__(self):
        self.client = flowbio.Client()
        self.client.login()

    def analyze_recent_samples(self, days=7):
        """Run QC on samples created in the last N days."""
        # Get recent samples
        cutoff_date = datetime.now() - timedelta(days=days)
        samples = self.client.search_samples(
            created_after=cutoff_date,
            sample_type="RNA-seq",
            is_owned=True
        )

        # Group by project
        projects = {}
        for sample in samples:
            project_id = sample.project.id if sample.project else "no_project"
            if project_id not in projects:
                projects[project_id] = []
            projects[project_id].append(sample)

        # Run QC for each project
        results = []
        for project_id, project_samples in projects.items():
            if len(project_samples) >= 3:  # Minimum samples for QC
                result = self.run_project_qc(project_id, project_samples)
                results.append(result)

        # Generate summary report
        self.generate_qc_report(results)
        return results

    def run_project_qc(self, project_id, samples):
        """Run the QC pipeline on a project's samples."""
        print(f"Running QC for project {project_id} ({len(samples)} samples)")

        # Run FastQC + MultiQC pipeline
        execution = self.client.run_pipeline(
            pipeline="QC-Analysis",
            samples=[s.id for s in samples],
            params={
                "skip_fastqc": False,
                "skip_multiqc": False,
                "extra_multiqc_config": "--cl_config 'max_table_rows: 500'"
            }
        )

        # Wait for completion
        while execution.status in ["pending", "running"]:
            time.sleep(30)
            execution.refresh()

        if execution.status == "completed":
            return self.extract_qc_metrics(execution, project_id)
        else:
            return {"project_id": project_id, "status": "failed", "error": execution.error}

    def extract_qc_metrics(self, execution, project_id):
        """Extract QC metrics from execution results."""
        metrics = {
            "project_id": project_id,
            "execution_id": execution.id,
            "status": "completed",
            "samples_analyzed": len(execution.samples),
            "metrics": {}
        }

        # Download MultiQC data
        try:
            multiqc_data = execution.get_output("multiqc_data.json")
            data = json.loads(multiqc_data.read())

            # Extract key metrics
            metrics["metrics"] = {
                "avg_read_count": self.calculate_avg_metric(data, "total_reads"),
                "avg_gc_content": self.calculate_avg_metric(data, "gc_percent"),
                "avg_duplication": self.calculate_avg_metric(data, "duplication_rate"),
                "samples_passed_qc": self.count_passing_samples(data)
            }
        except Exception as e:
            metrics["error"] = str(e)
        return metrics

    def calculate_avg_metric(self, data, metric_name):
        """Calculate the average of a metric across samples."""
        values = []
        for sample_data in data.get("report_general_stats_data", []):
            if metric_name in sample_data:
                values.append(sample_data[metric_name])
        return sum(values) / len(values) if values else 0

    def count_passing_samples(self, data):
        """Count samples passing QC thresholds."""
        passing = 0
        for sample_data in data.get("report_general_stats_data", []):
            if (sample_data.get("total_reads", 0) > 1000000 and
                    30 < sample_data.get("gc_percent", 0) < 70):
                passing += 1
        return passing

    def generate_qc_report(self, results):
        """Generate a summary QC report."""
        # Create summary DataFrame
        summary_data = []
        for result in results:
            if result["status"] == "completed":
                summary_data.append({
                    "Project": result["project_id"],
                    "Samples": result["samples_analyzed"],
                    "Avg Reads (M)": result["metrics"]["avg_read_count"] / 1e6,
                    "Avg GC%": result["metrics"]["avg_gc_content"],
                    "Avg Dup%": result["metrics"]["avg_duplication"],
                    "Passed QC": result["metrics"]["samples_passed_qc"]
                })
        df = pd.DataFrame(summary_data)

        # Generate report
        report = f"""
# Batch QC Report

Generated: {datetime.now().strftime("%Y-%m-%d %H:%M")}

## Summary Statistics

- Projects analyzed: {len(results)}
- Total samples: {df['Samples'].sum()}
- Average read count: {df['Avg Reads (M)'].mean():.1f}M
- Samples passing QC: {df['Passed QC'].sum()} / {df['Samples'].sum()}

## Project Details

{df.to_markdown(index=False)}

## Recommendations
"""
        # Add recommendations based on metrics
        for _, row in df.iterrows():
            if row['Avg Reads (M)'] < 20:
                report += f"\n- Project {row['Project']}: Low sequencing depth, consider resequencing"
            if row['Passed QC'] / row['Samples'] < 0.8:
                report += f"\n- Project {row['Project']}: High QC failure rate, investigate sample quality"

        # Save report
        with open(f"qc_report_{datetime.now().strftime('%Y%m%d')}.md", "w") as f:
            f.write(report)
        print("QC report generated successfully!")


# Usage
analyzer = BatchQCAnalyzer()
analyzer.analyze_recent_samples(days=7)
```
Data Integration
Workflow: Integrate Flow with LIMS
This workflow shows how to integrate Flow with a Laboratory Information Management System (LIMS).
```python
import json
import flowbio
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text


class LIMSIntegration:
    def __init__(self, lims_db_url):
        self.flow_client = flowbio.Client()
        self.flow_client.login()
        self.lims_engine = create_engine(lims_db_url)

    def sync_samples_from_lims(self):
        """Sync new samples from the LIMS to Flow."""
        # Query the LIMS for unsynced, sequenced samples
        query = """
            SELECT
                sample_id,
                sample_name,
                project_code,
                organism,
                sample_type,
                metadata
            FROM samples
            WHERE flow_id IS NULL
              AND status = 'sequenced'
        """
        lims_samples = pd.read_sql(query, self.lims_engine)

        for _, sample in lims_samples.iterrows():
            try:
                # Create or find the project in Flow
                flow_project = self.get_or_create_project(sample['project_code'])

                # Upload the sample to Flow
                flow_sample = self.create_flow_sample(sample, flow_project.id)

                # Record the Flow ID back in the LIMS
                self.update_lims_sample(sample['sample_id'], flow_sample.id)
            except Exception as e:
                print(f"Error syncing sample {sample['sample_id']}: {e}")

    def get_or_create_project(self, project_code):
        """Get an existing project or create a new one."""
        # Search for an existing project
        projects = self.flow_client.search_projects(name=project_code)
        if projects:
            return projects[0]

        # Get project details from the LIMS (bound parameter rather than
        # string interpolation, to avoid SQL injection)
        project_info = pd.read_sql(
            text("SELECT * FROM projects WHERE code = :code"),
            self.lims_engine,
            params={"code": project_code}
        ).iloc[0]

        # Create the project in Flow
        return self.flow_client.create_project(
            name=project_code,
            description=project_info['description']
        )

    def create_flow_sample(self, lims_sample, project_id):
        """Create a sample in Flow from LIMS data."""
        # Parse metadata
        metadata = json.loads(lims_sample['metadata']) if lims_sample['metadata'] else {}

        # Locate the data files in storage
        r1_path = f"/data/sequencing/{lims_sample['sample_id']}_R1.fastq.gz"
        r2_path = f"/data/sequencing/{lims_sample['sample_id']}_R2.fastq.gz"

        # Upload to Flow
        flow_sample = self.flow_client.upload_sample(
            name=lims_sample['sample_name'],
            read1=r1_path,
            read2=r2_path,
            project_id=project_id,
            metadata={
                **metadata,
                "lims_id": lims_sample['sample_id'],
                "organism": lims_sample['organism'],
                "sample_type": lims_sample['sample_type']
            }
        )
        return flow_sample

    def update_lims_sample(self, lims_id, flow_id):
        """Update the LIMS with the Flow sample ID."""
        update_query = text("""
            UPDATE samples
            SET flow_id = :flow_id,
                flow_sync_date = NOW()
            WHERE sample_id = :lims_id
        """)
        # engine.begin() commits the transaction on success
        with self.lims_engine.begin() as conn:
            conn.execute(update_query, {"flow_id": flow_id, "lims_id": lims_id})

    def sync_results_to_lims(self):
        """Sync analysis results from Flow back to the LIMS."""
        # Get recently completed executions
        executions = self.flow_client.get_executions(
            status="completed",
            created_after=datetime.now() - timedelta(days=1)
        )

        for execution in executions:
            # Match samples to LIMS records via the metadata set at upload
            for sample in execution.samples:
                if 'lims_id' in sample.metadata:
                    self.update_lims_with_results(
                        sample.metadata['lims_id'],
                        execution
                    )

    def update_lims_with_results(self, lims_id, execution):
        """Update the LIMS with analysis results."""
        # Extract key metrics from the execution
        metrics = self.extract_execution_metrics(execution)

        update_query = text("""
            UPDATE samples
            SET
                analysis_status = 'completed',
                analysis_date = NOW(),
                qc_status = :qc_status,
                total_reads = :total_reads,
                mapped_reads = :mapped_reads,
                analysis_url = :analysis_url
            WHERE sample_id = :lims_id
        """)
        with self.lims_engine.begin() as conn:
            conn.execute(update_query, {
                "qc_status": metrics['qc_status'],
                "total_reads": metrics['total_reads'],
                "mapped_reads": metrics['mapped_reads'],
                "analysis_url": f"https://flow.bio/executions/{execution.id}",
                "lims_id": lims_id
            })

    def extract_execution_metrics(self, execution):
        """Extract key metrics from execution results."""
        metrics = {
            'qc_status': 'passed',
            'total_reads': 0,
            'mapped_reads': 0
        }
        try:
            # Get the MultiQC summary
            multiqc_data = execution.get_output("multiqc_data.json")
            data = json.loads(multiqc_data.read())

            # Extract metrics
            general_stats = data.get('report_general_stats_data', [{}])[0]
            metrics['total_reads'] = general_stats.get('total_sequences', 0)
            metrics['mapped_reads'] = general_stats.get('mapped_reads', 0)

            # Determine QC status
            if metrics['total_reads'] < 1000000:
                metrics['qc_status'] = 'failed'
            elif metrics['mapped_reads'] / metrics['total_reads'] < 0.7:
                metrics['qc_status'] = 'warning'
        except Exception as e:
            print(f"Error extracting metrics: {e}")
        return metrics

    def run_sync(self):
        """Run a full synchronization."""
        print("Syncing samples from LIMS to Flow...")
        self.sync_samples_from_lims()
        print("Syncing results from Flow to LIMS...")
        self.sync_results_to_lims()
        print("Synchronization complete!")


# Usage
integration = LIMSIntegration("postgresql://user:pass@lims-db/lims")
integration.run_sync()
```
Comparative Analysis
Workflow: Compare Multiple Experiments
This workflow compares results across multiple experiments to identify consistent patterns.
```python
import flowbio
import pandas as pd
import numpy as np
from scipy import stats


class ComparativeAnalyzer:
    def __init__(self):
        self.client = flowbio.Client()
        self.client.login()

    def compare_experiments(self, experiment_ids):
        """Compare gene expression across multiple experiments."""
        results = {}

        # Get the execution for each experiment
        for exp_id in experiment_ids:
            execution = self.client.get_execution(exp_id)
            if execution.status == "completed":
                # Download gene expression data
                counts = self.get_expression_data(execution)
                results[exp_id] = counts

        # Perform comparative analysis
        comparison = self.analyze_experiments(results)

        # Generate report
        self.generate_comparison_report(comparison)
        return comparison

    def get_expression_data(self, execution):
        """Extract gene expression data from an execution."""
        # Download DESeq2 results
        deseq_results = execution.get_output("deseq2_results.csv")
        df = pd.read_csv(deseq_results.get_stream())

        # Keep only the relevant columns
        return df[['gene_id', 'gene_name', 'log2FoldChange', 'padj']]

    def analyze_experiments(self, results):
        """Perform comparative analysis across experiments."""
        # Collect the union of genes across all experiments
        all_genes = set()
        for exp_data in results.values():
            all_genes.update(exp_data['gene_id'])

        # Build the comparison matrix
        comparison_data = []
        for gene in all_genes:
            gene_data = {'gene_id': gene}

            # Collect fold changes across experiments
            fold_changes = []
            for exp_id, exp_data in results.items():
                gene_row = exp_data[exp_data['gene_id'] == gene]
                if not gene_row.empty:
                    fc = gene_row.iloc[0]['log2FoldChange']
                    padj = gene_row.iloc[0]['padj']
                    gene_data[f'{exp_id}_log2FC'] = fc
                    gene_data[f'{exp_id}_padj'] = padj
                    if padj < 0.05:  # Significant
                        fold_changes.append(fc)

            # Calculate consistency metrics
            if len(fold_changes) >= 2:
                gene_data['consistency_score'] = 1 - stats.variation(fold_changes)
                gene_data['mean_log2FC'] = np.mean(fold_changes)
                gene_data['experiments_significant'] = len(fold_changes)
            comparison_data.append(gene_data)

        comparison_df = pd.DataFrame(comparison_data)

        # Identify genes changed consistently across experiments
        consistent_genes = comparison_df[
            (comparison_df['experiments_significant'] >= len(results) * 0.7) &
            (comparison_df['consistency_score'] > 0.8)
        ].sort_values('mean_log2FC', key=abs, ascending=False)

        return {
            'full_comparison': comparison_df,
            'consistent_genes': consistent_genes,
            'experiment_count': len(results)
        }

    def generate_comparison_report(self, comparison):
        """Generate a comparative analysis report."""
        consistent = comparison['consistent_genes']
        report = f"""
# Comparative Analysis Report

## Summary

- Experiments compared: {comparison['experiment_count']}
- Total genes analyzed: {len(comparison['full_comparison'])}
- Consistently changed genes: {len(consistent)}

## Top Consistently Upregulated Genes

{consistent[consistent['mean_log2FC'] > 0].head(20)[['gene_id', 'mean_log2FC', 'consistency_score']].to_markdown()}

## Top Consistently Downregulated Genes

{consistent[consistent['mean_log2FC'] < 0].head(20)[['gene_id', 'mean_log2FC', 'consistency_score']].to_markdown()}

## Pathway Analysis
"""
        # Perform enrichment analysis on the consistent genes
        enrichment = self.perform_enrichment(consistent['gene_id'].tolist())
        report += enrichment

        # Save the report and detailed results
        with open("comparative_analysis_report.md", "w") as f:
            f.write(report)
        comparison['full_comparison'].to_csv("full_comparison_results.csv", index=False)
        consistent.to_csv("consistent_genes.csv", index=False)
        print("Comparative analysis complete!")

    def perform_enrichment(self, gene_list):
        """Perform pathway enrichment analysis."""
        # This would typically call an enrichment API or library;
        # placeholder output for demonstration.
        return """
### Enriched Pathways

1. Cell cycle regulation (p < 0.001)
2. DNA repair (p < 0.01)
3. Apoptosis (p < 0.01)
"""


# Usage
analyzer = ComparativeAnalyzer()
analyzer.compare_experiments([
    "execution-123",
    "execution-456",
    "execution-789"
])
```
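The `perform_enrichment` method above is a placeholder. One way to make it real is the `gseapy` package's Enrichr wrapper; this is a sketch that assumes `gseapy` is installed (`pip install gseapy`) and that your gene IDs are symbols Enrichr recognizes:

```python
import gseapy


def perform_enrichment(gene_list):
    """Run Enrichr-based pathway enrichment on a gene list."""
    enr = gseapy.enrichr(
        gene_list=gene_list,
        gene_sets=["KEGG_2021_Human", "GO_Biological_Process_2021"],
        outdir=None  # keep results in memory instead of writing files
    )
    # Keep significant terms and format them for the markdown report
    hits = enr.results[enr.results["Adjusted P-value"] < 0.05]
    lines = ["### Enriched Pathways", ""]
    for _, row in hits.head(10).iterrows():
        lines.append(f"- {row['Term']} (adj. p = {row['Adjusted P-value']:.2g})")
    return "\n".join(lines)
```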
Automation with CI/CD
Workflow: GitHub Actions Integration
This example shows how to integrate Flow with GitHub Actions for automated testing of bioinformatics pipelines.
```yaml
# .github/workflows/pipeline-test.yml
name: Test Pipeline on Flow

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install flowbio pytest pandas

      - name: Run pipeline tests
        env:
          FLOW_USERNAME: ${{ secrets.FLOW_USERNAME }}
          FLOW_PASSWORD: ${{ secrets.FLOW_PASSWORD }}
        run: |
          python test_pipeline.py
```
```python
# test_pipeline.py
import flowbio
import os
import sys
import time


def test_pipeline():
    """Test the pipeline with test data."""
    # Initialize client
    client = flowbio.Client()
    client.login(
        os.environ['FLOW_USERNAME'],
        os.environ['FLOW_PASSWORD']
    )

    # Create a test project
    project = client.create_project(
        name=f"CI Test - {os.environ.get('GITHUB_SHA', 'local')[:8]}",
        description="Automated pipeline test"
    )

    try:
        # Upload test data
        test_samples = []
        test_data_dir = "test/data"
        for sample_file in os.listdir(test_data_dir):
            if sample_file.endswith("_R1.fastq.gz"):
                sample_name = sample_file.replace("_R1.fastq.gz", "")
                r1 = os.path.join(test_data_dir, sample_file)
                r2 = r1.replace("_R1", "_R2")
                sample = client.upload_sample(
                    name=sample_name,
                    read1=r1,
                    read2=r2,
                    project_id=project.id
                )
                test_samples.append(sample)

        # Run the pipeline
        execution = client.run_pipeline(
            pipeline="RNA-seq",
            samples=[s.id for s in test_samples],
            params={
                "skip_bigwig": True,  # Speed up the test
                "skip_stringtie": True
            }
        )

        # Wait for completion (with a timeout)
        start_time = time.time()
        timeout = 3600  # 1 hour
        while execution.status in ["pending", "running"]:
            if time.time() - start_time > timeout:
                raise TimeoutError("Pipeline execution timed out")
            time.sleep(30)
            execution.refresh()

        # Check results
        assert execution.status == "completed", f"Pipeline failed: {execution.error}"

        # Validate outputs
        expected_outputs = [
            "multiqc_report.html",
            "salmon_merged_gene_counts.csv",
            "deseq2_results.csv"
        ]
        actual_outputs = [o.filename for o in execution.outputs]
        for expected in expected_outputs:
            assert any(expected in output for output in actual_outputs), \
                f"Missing expected output: {expected}"

        print("✓ Pipeline test passed!")
        return True
    except Exception as e:
        print(f"✗ Pipeline test failed: {e}")
        return False
    finally:
        # Cleanup
        if 'project' in locals():
            project.delete()


if __name__ == "__main__":
    success = test_pipeline()
    sys.exit(0 if success else 1)
```
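The workflow above installs pytest but invokes the script directly. If you would rather get pytest's collection and reporting, a thin wrapper is enough; this is a sketch, with `test_pipeline_ci.py` as a hypothetical filename:

```python
# test_pipeline_ci.py - optional pytest wrapper around the same check
import test_pipeline  # the module defined above


def test_flow_pipeline():
    # Fail the CI job if the Flow pipeline run does not complete cleanly
    assert test_pipeline.test_pipeline(), "Flow pipeline test did not pass"
```

The final workflow step then becomes `pytest test_pipeline_ci.py` instead of `python test_pipeline.py`.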
Real-time Monitoring
Workflow: Pipeline Monitoring Dashboard
This workflow creates a real-time monitoring dashboard for running pipelines.
```python
import threading
import time
from datetime import datetime

import flowbio
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")


class PipelineMonitor:
    def __init__(self):
        self.client = flowbio.Client()
        self.client.login()
        self.active_executions = {}
        self.monitoring = False

    def start_monitoring(self):
        """Start monitoring executions in a background thread."""
        self.monitoring = True
        thread = threading.Thread(target=self._monitor_loop)
        thread.daemon = True
        thread.start()

    def _monitor_loop(self):
        """Main monitoring loop."""
        while self.monitoring:
            try:
                # Get active executions
                executions = self.client.get_executions(
                    status="running",
                    is_owned=True
                )

                # Update execution status
                for execution in executions:
                    self._update_execution(execution)

                # Check for completed executions
                self._check_completed()
            except Exception as e:
                print(f"Monitoring error: {e}")
            time.sleep(10)  # Update every 10 seconds

    def _update_execution(self, execution):
        """Update execution status and emit it to clients."""
        execution_data = {
            'id': execution.id,
            'name': execution.name,
            'pipeline': execution.pipeline.name,
            'status': execution.status,
            'progress': execution.progress,
            'started': execution.started.isoformat() if execution.started else None,
            'duration': self._calculate_duration(execution),
            'processes': self._get_process_status(execution)
        }
        self.active_executions[execution.id] = execution_data

        # Emit the update to connected clients
        socketio.emit('execution_update', execution_data)

    def _calculate_duration(self, execution):
        """Calculate the execution duration."""
        if execution.started:
            end_time = execution.finished or datetime.now()
            duration = end_time - execution.started
            return str(duration).split('.')[0]  # Drop microseconds
        return "0:00:00"

    def _get_process_status(self, execution):
        """Get the status of individual processes."""
        processes = []
        for process in execution.process_executions:
            processes.append({
                'name': process.name,
                'status': process.status,
                'duration': process.duration if hasattr(process, 'duration') else None
            })
        return processes

    def _check_completed(self):
        """Check for newly completed executions."""
        completed_ids = []
        for exec_id, exec_data in self.active_executions.items():
            # Re-fetch to check the current status
            execution = self.client.get_execution(exec_id)
            if execution.status in ["completed", "failed", "cancelled"]:
                completed_ids.append(exec_id)

                # Send a completion notification
                socketio.emit('execution_completed', {
                    'id': exec_id,
                    'status': execution.status,
                    'duration': self._calculate_duration(execution)
                })

        # Remove completed executions from the active set
        for exec_id in completed_ids:
            del self.active_executions[exec_id]


# Initialize the monitor
monitor = PipelineMonitor()


@app.route('/')
def index():
    """Dashboard page."""
    return render_template('dashboard.html')


@app.route('/api/executions')
def get_executions():
    """Get current executions."""
    return jsonify(list(monitor.active_executions.values()))


@socketio.on('connect')
def handle_connect():
    """Handle a client connection."""
    emit('connected', {'data': 'Connected to pipeline monitor'})

    # Send the current state
    for execution in monitor.active_executions.values():
        emit('execution_update', execution)


@socketio.on('start_execution')
def handle_start_execution(data):
    """Start a new pipeline execution."""
    try:
        execution = monitor.client.run_pipeline(
            pipeline=data['pipeline'],
            samples=data['samples'],
            params=data.get('params', {})
        )
        emit('execution_started', {
            'id': execution.id,
            'status': 'started'
        })
    except Exception as e:
        emit('error', {'message': str(e)})


if __name__ == '__main__':
    monitor.start_monitoring()
    socketio.run(app, debug=True, port=5000)
```
```html
<!-- templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Pipeline Monitor</title>
    <script src="https://cdn.socket.io/4.5.0/socket.io.min.js"></script>
    <style>
        .execution {
            border: 1px solid #ddd;
            padding: 10px;
            margin: 10px;
            border-radius: 5px;
        }
        .running { background-color: #e3f2fd; }
        .completed { background-color: #e8f5e9; }
        .failed { background-color: #ffebee; }
        .progress-bar {
            width: 100%;
            height: 20px;
            background-color: #f0f0f0;
            border-radius: 10px;
            overflow: hidden;
        }
        .progress-fill {
            height: 100%;
            background-color: #4caf50;
            transition: width 0.3s ease;
        }
    </style>
</head>
<body>
    <h1>Pipeline Execution Monitor</h1>
    <div id="executions"></div>

    <script>
        const socket = io();
        const executions = {};

        socket.on('connect', function() {
            console.log('Connected to monitor');
        });

        socket.on('execution_update', function(data) {
            executions[data.id] = data;
            updateDisplay();
        });

        socket.on('execution_completed', function(data) {
            if (executions[data.id]) {
                executions[data.id].status = data.status;
                updateDisplay();

                // Remove after 5 seconds
                setTimeout(() => {
                    delete executions[data.id];
                    updateDisplay();
                }, 5000);
            }
        });

        function updateDisplay() {
            const container = document.getElementById('executions');
            container.innerHTML = '';
            Object.values(executions).forEach(exec => {
                const div = document.createElement('div');
                div.className = `execution ${exec.status}`;
                // Minimal card markup: name, pipeline, status, duration,
                // and a progress bar driven by the emitted progress value
                div.innerHTML = `
                    <h3>${exec.name} (${exec.pipeline})</h3>
                    <p>Status: ${exec.status} | Duration: ${exec.duration}</p>
                    <div class="progress-bar">
                        <div class="progress-fill" style="width: ${exec.progress || 0}%"></div>
                    </div>
                `;
                container.appendChild(div);
            });
        }
    </script>
</body>
</html>
```
Best Practices
Error Handling and Retries
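API calls occasionally fail for transient reasons such as rate limiting or brief server errors. Rather than letting a long-running integration die, wrap Flow operations in a retry decorator with exponential backoff, and re-authenticate on expired sessions: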
```python
import time
import logging
from functools import wraps

import flowbio


def with_retry(max_attempts=3, delay=60):
    """Decorator for retrying failed operations."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (flowbio.RateLimitError, flowbio.ServerError) as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        wait_time = delay * (2 ** attempt)  # Exponential backoff
                        logging.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                except flowbio.AuthenticationError:
                    # Re-authenticate and retry
                    if hasattr(args[0], 'client'):
                        args[0].client.login()
                        return func(*args, **kwargs)
                    raise
            raise last_exception
        return wrapper
    return decorator


class RobustFlowClient:
    def __init__(self):
        self.client = flowbio.Client()
        self.setup_logging()

    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('flow_integration.log'),
                logging.StreamHandler()
            ]
        )

    @with_retry(max_attempts=3)
    def upload_sample_with_retry(self, **kwargs):
        """Upload a sample with automatic retry."""
        return self.client.upload_sample(**kwargs)

    @with_retry(max_attempts=5, delay=120)
    def run_pipeline_with_retry(self, **kwargs):
        """Run a pipeline with extended retry."""
        return self.client.run_pipeline(**kwargs)
```
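The wrapped methods are used like their plain counterparts; a usage sketch, reusing the upload arguments from the earlier examples:

```python
# Usage
robust = RobustFlowClient()
robust.client.login()

sample = robust.upload_sample_with_retry(
    name="sample-1",
    read1="sample-1_R1.fastq.gz",
    read2="sample-1_R2.fastq.gz",
    project_id="project-123"
)
```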
Performance Optimization
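Sample uploads are I/O-bound, so a thread pool can push several at once, and processing work in batches with a brief pause between them helps stay within API rate limits: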
```python
import time
import concurrent.futures

import flowbio


class ParallelProcessor:
    def __init__(self, max_workers=4):
        self.client = flowbio.Client()
        self.client.login()
        self.max_workers = max_workers

    def parallel_upload(self, sample_list):
        """Upload multiple samples in parallel."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for sample_info in sample_list:
                future = executor.submit(
                    self.client.upload_sample,
                    **sample_info
                )
                futures.append((sample_info['name'], future))

            results = []
            for name, future in futures:
                try:
                    sample = future.result()
                    results.append(sample)
                    print(f"✓ Uploaded {name}")
                except Exception as e:
                    print(f"✗ Failed to upload {name}: {e}")
        return results

    def batch_process(self, operations, batch_size=10):
        """Process operations in batches."""
        results = []
        for i in range(0, len(operations), batch_size):
            batch = operations[i:i + batch_size]
            batch_results = self._process_batch(batch)
            results.extend(batch_results)

            # Brief pause between batches
            time.sleep(1)
        return results

    def _process_batch(self, batch):
        """Process one batch of operations (application-specific)."""
        # Assumes each operation is a zero-argument callable; replace with
        # whatever unit of work your integration needs.
        return [operation() for operation in batch]
```
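For example, to upload two samples concurrently, where each dict holds the keyword arguments `upload_sample` expects:

```python
# Usage
processor = ParallelProcessor(max_workers=4)
uploaded = processor.parallel_upload([
    {
        "name": "sample-1",
        "read1": "sample-1_R1.fastq.gz",
        "read2": "sample-1_R2.fastq.gz",
        "project_id": "project-123",
    },
    {
        "name": "sample-2",
        "read1": "sample-2_R1.fastq.gz",
        "read2": "sample-2_R2.fastq.gz",
        "project_id": "project-123",
    },
])
```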
Next Steps
- API Overview - Understanding Flow's API architecture
- REST API Reference - Complete API documentation
- Python Client Guide - Detailed client library usage
- Example Scripts - More integration examples