
API Workflows

This guide demonstrates common integration patterns and workflows using Flow's APIs. These examples show how to combine multiple API operations to accomplish real-world tasks.
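
All examples assume an authenticated flowbio client. A minimal setup sketch (assuming credentials are stored in environment variables; adjust to your authentication method):

import flowbio
import os

# Authenticate once; the workflow classes below follow the same pattern
client = flowbio.Client()
client.login(os.environ["FLOW_USERNAME"], os.environ["FLOW_PASSWORD"])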


Automated Sample Processing

Workflow: Process Samples from Sequencing Facility

This workflow automatically processes new samples as they arrive from a sequencing facility.

import flowbio
import json
import os
import time
from pathlib import Path

class SequencingProcessor:
    def __init__(self, watch_dir, project_id):
        self.client = flowbio.Client()
        self.client.login(os.environ['FLOW_USERNAME'], os.environ['FLOW_PASSWORD'])
        self.watch_dir = Path(watch_dir)
        self.project_id = project_id
        self.processed_files = set()
    
    def find_sample_pairs(self):
        """Find R1/R2 pairs in watch directory."""
        r1_files = list(self.watch_dir.glob("*_R1_*.fastq.gz"))
        pairs = []
        
        for r1 in r1_files:
            r2 = Path(str(r1).replace("_R1_", "_R2_"))
            if r2.exists() and r1 not in self.processed_files:
                sample_name = r1.stem.split("_R1_")[0]
                pairs.append((sample_name, r1, r2))
        
        return pairs
    
    def process_new_samples(self):
        """Upload new samples and start analysis."""
        pairs = self.find_sample_pairs()
        
        if not pairs:
            return
        
        uploaded_samples = []
        for sample_name, r1, r2 in pairs:
            print(f"Uploading {sample_name}...")
            
            # Extract metadata from filename or sidecar file
            metadata = self.parse_metadata(sample_name)
            
            # Upload sample
            sample = self.client.upload_sample(
                name=sample_name,
                read1=str(r1),
                read2=str(r2),
                project_id=self.project_id,
                metadata=metadata,
                progress=True
            )
            
            uploaded_samples.append(sample)
            self.processed_files.add(r1)
            
            # Archive processed files
            self.archive_files(r1, r2)
        
        if uploaded_samples:
            # Run pipeline on new samples
            self.run_analysis(uploaded_samples)
    
    def parse_metadata(self, sample_name):
        """Parse metadata from filename or metadata file."""
        metadata = {}
        
        # Look for metadata sidecar file
        metadata_file = self.watch_dir / f"{sample_name}_metadata.json"
        if metadata_file.exists():
            with open(metadata_file) as f:
                metadata = json.load(f)
        else:
            # Parse from filename convention
            # Example: PatientID_Tissue_TimePoint_Rep1_R1_001.fastq.gz
            parts = sample_name.split("_")
            if len(parts) >= 4:
                metadata = {
                    "patient_id": parts[0],
                    "tissue": parts[1],
                    "time_point": parts[2],
                    "replicate": parts[3]
                }
        
        return metadata
    
    def archive_files(self, r1, r2):
        """Move processed files to archive directory."""
        archive_dir = self.watch_dir / "archived"
        archive_dir.mkdir(exist_ok=True)
        
        r1.rename(archive_dir / r1.name)
        r2.rename(archive_dir / r2.name)
    
    def run_analysis(self, samples):
        """Start RNA-seq pipeline on uploaded samples."""
        print(f"Starting analysis for {len(samples)} samples...")
        
        execution = self.client.run_pipeline(
            pipeline="RNA-seq",
            samples=[s.id for s in samples],
            params={
                "aligner": "star_salmon",
                "skip_qc": False
            }
        )
        
        print(f"Pipeline started: {execution.id}")
        self.monitor_execution(execution)
    
    def monitor_execution(self, execution):
        """Monitor pipeline execution and notify on completion."""
        while execution.status in ["pending", "running"]:
            time.sleep(60)
            execution.refresh()
        
        if execution.status == "completed":
            self.notify_completion(execution)
        else:
            self.notify_failure(execution)
    
    def notify_completion(self, execution):
        """Send notification when pipeline completes."""
        # Send email, Slack message, etc.
        print(f"Pipeline {execution.id} completed successfully!")
    
    def notify_failure(self, execution):
        """Send notification when pipeline fails."""
        print(f"Pipeline {execution.id} failed: {execution.error}")
    
    def run(self, check_interval=300):
        """Run the processor continuously."""
        print(f"Watching {self.watch_dir} for new samples...")
        
        while True:
            try:
                self.process_new_samples()
            except Exception as e:
                print(f"Error processing samples: {e}")
            
            time.sleep(check_interval)

# Usage
processor = SequencingProcessor(
    watch_dir="/data/sequencing/incoming",
    project_id="project-123"
)
processor.run()
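
For reference, parse_metadata above first looks for a JSON sidecar file named <sample_name>_metadata.json in the watch directory. A hypothetical sidecar for a sample named SAMPLE1 (field names mirror the filename-convention fallback):

# SAMPLE1_metadata.json
{
    "patient_id": "P001",
    "tissue": "liver",
    "time_point": "24h",
    "replicate": "Rep1"
}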

Batch Quality Control

Workflow: QC Analysis Across Multiple Projects

This workflow runs quality control analysis across multiple projects and generates a summary report.

import flowbio
import json
import time
import pandas as pd
from datetime import datetime, timedelta

class BatchQCAnalyzer:
    def __init__(self):
        self.client = flowbio.Client()
        self.client.login()
    
    def analyze_recent_samples(self, days=7):
        """Run QC on samples created in the last N days."""
        # Get recent samples
        cutoff_date = datetime.now() - timedelta(days=days)
        
        samples = self.client.search_samples(
            created_after=cutoff_date,
            sample_type="RNA-seq",
            is_owned=True
        )
        
        # Group by project
        projects = {}
        for sample in samples:
            project_id = sample.project.id if sample.project else "no_project"
            if project_id not in projects:
                projects[project_id] = []
            projects[project_id].append(sample)
        
        # Run QC for each project
        results = []
        for project_id, project_samples in projects.items():
            if len(project_samples) >= 3:  # Minimum samples for QC
                result = self.run_project_qc(project_id, project_samples)
                results.append(result)
        
        # Generate summary report
        self.generate_qc_report(results)
        
        return results
    
    def run_project_qc(self, project_id, samples):
        """Run QC pipeline on project samples."""
        print(f"Running QC for project {project_id} ({len(samples)} samples)")
        
        # Run FastQC + MultiQC pipeline
        execution = self.client.run_pipeline(
            pipeline="QC-Analysis",
            samples=[s.id for s in samples],
            params={
                "skip_fastqc": False,
                "skip_multiqc": False,
                "extra_multiqc_config": "--cl_config 'max_table_rows: 500'"
            }
        )
        
        # Wait for completion
        while execution.status in ["pending", "running"]:
            time.sleep(30)
            execution.refresh()
        
        if execution.status == "completed":
            return self.extract_qc_metrics(execution, project_id)
        else:
            return {"project_id": project_id, "status": "failed", "error": execution.error}
    
    def extract_qc_metrics(self, execution, project_id):
        """Extract QC metrics from execution results."""
        metrics = {
            "project_id": project_id,
            "execution_id": execution.id,
            "status": "completed",
            "samples_analyzed": len(execution.samples),
            "metrics": {}
        }
        
        # Download MultiQC data
        try:
            multiqc_data = execution.get_output("multiqc_data.json")
            data = json.loads(multiqc_data.read())
            
            # Extract key metrics
            metrics["metrics"] = {
                "avg_read_count": self.calculate_avg_metric(data, "total_reads"),
                "avg_gc_content": self.calculate_avg_metric(data, "gc_percent"),
                "avg_duplication": self.calculate_avg_metric(data, "duplication_rate"),
                "samples_passed_qc": self.count_passing_samples(data)
            }
        except Exception as e:
            metrics["error"] = str(e)
        
        return metrics
    
    def calculate_avg_metric(self, data, metric_name):
        """Calculate average for a metric across samples."""
        values = []
        for sample_data in data.get("report_general_stats_data", []):
            if metric_name in sample_data:
                values.append(sample_data[metric_name])
        
        return sum(values) / len(values) if values else 0
    
    def count_passing_samples(self, data):
        """Count samples passing QC thresholds."""
        passing = 0
        for sample_data in data.get("report_general_stats_data", []):
            if (sample_data.get("total_reads", 0) > 1000000 and
                sample_data.get("gc_percent", 0) > 30 and
                sample_data.get("gc_percent", 0) < 70):
                passing += 1
        return passing
    
    def generate_qc_report(self, results):
        """Generate summary QC report."""
        # Create summary DataFrame
        summary_data = []
        for result in results:
            if result["status"] == "completed":
                summary_data.append({
                    "Project": result["project_id"],
                    "Samples": result["samples_analyzed"],
                    "Avg Reads (M)": result["metrics"]["avg_read_count"] / 1e6,
                    "Avg GC%": result["metrics"]["avg_gc_content"],
                    "Avg Dup%": result["metrics"]["avg_duplication"],
                    "Passed QC": result["metrics"]["samples_passed_qc"]
                })
        
        df = pd.DataFrame(summary_data)
        
        # Generate report
        report = f"""
# Batch QC Report
Generated: {datetime.now().strftime("%Y-%m-%d %H:%M")}

## Summary Statistics
- Projects analyzed: {len(results)}
- Total samples: {df['Samples'].sum()}
- Average read count: {df['Avg Reads (M)'].mean():.1f}M
- Samples passing QC: {df['Passed QC'].sum()} / {df['Samples'].sum()}

## Project Details
{df.to_markdown(index=False)}

## Recommendations
"""
        
        # Add recommendations based on metrics
        for _, row in df.iterrows():
            if row['Avg Reads (M)'] < 20:
                report += f"\n- Project {row['Project']}: Low sequencing depth, consider resequencing"
            if row['Passed QC'] / row['Samples'] < 0.8:
                report += f"\n- Project {row['Project']}: High QC failure rate, investigate sample quality"
        
        # Save report
        with open(f"qc_report_{datetime.now().strftime('%Y%m%d')}.md", "w") as f:
            f.write(report)
        
        print("QC report generated successfully!")

# Usage
analyzer = BatchQCAnalyzer()
analyzer.analyze_recent_samples(days=7)
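
analyze_recent_samples returns the per-project result dictionaries, so failures can also be handled programmatically; for example:

results = analyzer.analyze_recent_samples(days=7)
for result in results:
    if result["status"] == "failed":
        print(f"QC failed for {result['project_id']}: {result.get('error')}")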

Data Integration

Workflow: Integrate Flow with LIMS

This workflow shows how to integrate Flow with a Laboratory Information Management System (LIMS).

import flowbio
import json
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text

class LIMSIntegration:
    def __init__(self, lims_db_url):
        self.flow_client = flowbio.Client()
        self.flow_client.login()
        self.lims_engine = create_engine(lims_db_url)
    
    def sync_samples_from_lims(self):
        """Sync new samples from LIMS to Flow."""
        # Query LIMS for unsynced samples
        query = """
        SELECT 
            sample_id,
            sample_name,
            project_code,
            organism,
            sample_type,
            metadata
        FROM samples
        WHERE flow_id IS NULL
        AND status = 'sequenced'
        """
        
        lims_samples = pd.read_sql(query, self.lims_engine)
        
        for _, sample in lims_samples.iterrows():
            try:
                # Create or find project in Flow
                flow_project = self.get_or_create_project(sample['project_code'])
                
                # Upload sample to Flow
                flow_sample = self.create_flow_sample(sample, flow_project.id)
                
                # Update LIMS with Flow ID
                self.update_lims_sample(sample['sample_id'], flow_sample.id)
                
            except Exception as e:
                print(f"Error syncing sample {sample['sample_id']}: {e}")
    
    def get_or_create_project(self, project_code):
        """Get existing project or create new one."""
        # Search for existing project
        projects = self.flow_client.search_projects(name=project_code)
        
        if projects:
            return projects[0]
        else:
            # Get project details from LIMS
            # Use a bound parameter rather than string interpolation
            project_info = pd.read_sql(
                text("SELECT * FROM projects WHERE code = :code"),
                self.lims_engine,
                params={"code": project_code}
            ).iloc[0]
            
            # Create project in Flow
            return self.flow_client.create_project(
                name=project_code,
                description=project_info['description']
            )
    
    def create_flow_sample(self, lims_sample, project_id):
        """Create sample in Flow from LIMS data."""
        # Parse metadata
        metadata = json.loads(lims_sample['metadata']) if lims_sample['metadata'] else {}
        
        # Find data files in storage
        r1_path = f"/data/sequencing/{lims_sample['sample_id']}_R1.fastq.gz"
        r2_path = f"/data/sequencing/{lims_sample['sample_id']}_R2.fastq.gz"
        
        # Upload to Flow
        flow_sample = self.flow_client.upload_sample(
            name=lims_sample['sample_name'],
            read1=r1_path,
            read2=r2_path,
            project_id=project_id,
            metadata={
                **metadata,
                "lims_id": lims_sample['sample_id'],
                "organism": lims_sample['organism'],
                "sample_type": lims_sample['sample_type']
            }
        )
        
        return flow_sample
    
    def update_lims_sample(self, lims_id, flow_id):
        """Update LIMS with Flow sample ID."""
        # Bound parameters prevent SQL injection; engine.begin() commits on success
        update_query = text("""
        UPDATE samples
        SET flow_id = :flow_id,
            flow_sync_date = NOW()
        WHERE sample_id = :lims_id
        """)
        
        with self.lims_engine.begin() as conn:
            conn.execute(update_query, {"flow_id": flow_id, "lims_id": lims_id})
    
    def sync_results_to_lims(self):
        """Sync analysis results from Flow back to LIMS."""
        # Get completed executions
        executions = self.flow_client.get_executions(
            status="completed",
            created_after=datetime.now() - timedelta(days=1)
        )
        
        for execution in executions:
            # Get sample LIMS IDs from metadata
            for sample in execution.samples:
                if 'lims_id' in sample.metadata:
                    self.update_lims_with_results(
                        sample.metadata['lims_id'],
                        execution
                    )
    
    def update_lims_with_results(self, lims_id, execution):
        """Update LIMS with analysis results."""
        # Extract key metrics from execution
        metrics = self.extract_execution_metrics(execution)
        
        # Update LIMS using bound parameters
        update_query = text("""
        UPDATE samples
        SET
            analysis_status = 'completed',
            analysis_date = NOW(),
            qc_status = :qc_status,
            total_reads = :total_reads,
            mapped_reads = :mapped_reads,
            analysis_url = :analysis_url
        WHERE sample_id = :lims_id
        """)
        
        with self.lims_engine.begin() as conn:
            conn.execute(update_query, {
                "qc_status": metrics['qc_status'],
                "total_reads": metrics['total_reads'],
                "mapped_reads": metrics['mapped_reads'],
                "analysis_url": f"https://flow.bio/executions/{execution.id}",
                "lims_id": lims_id
            })
    
    def extract_execution_metrics(self, execution):
        """Extract key metrics from execution results."""
        metrics = {
            'qc_status': 'passed',
            'total_reads': 0,
            'mapped_reads': 0
        }
        
        try:
            # Get MultiQC summary
            multiqc_data = execution.get_output("multiqc_data.json")
            data = json.loads(multiqc_data.read())
            
            # Extract metrics
            general_stats = data.get('report_general_stats_data', [{}])[0]
            metrics['total_reads'] = general_stats.get('total_sequences', 0)
            metrics['mapped_reads'] = general_stats.get('mapped_reads', 0)
            
            # Determine QC status
            if metrics['total_reads'] < 1000000:
                metrics['qc_status'] = 'failed'
            elif metrics['mapped_reads'] / metrics['total_reads'] < 0.7:
                metrics['qc_status'] = 'warning'
                
        except Exception as e:
            print(f"Error extracting metrics: {e}")
        
        return metrics
    
    def run_sync(self):
        """Run full synchronization."""
        print("Syncing samples from LIMS to Flow...")
        self.sync_samples_from_lims()
        
        print("Syncing results from Flow to LIMS...")
        self.sync_results_to_lims()
        
        print("Synchronization complete!")

# Usage
integration = LIMSIntegration("postgresql://user:pass@lims-db/lims")
integration.run_sync()
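
To keep the two systems in step, run_sync can be scheduled externally (e.g. cron) or wrapped in a simple loop; a minimal sketch (the hourly interval is an assumption, tune it to your LIMS load):

import time

while True:
    integration.run_sync()
    time.sleep(3600)  # re-sync every hour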

Comparative Analysis

Workflow: Compare Multiple Experiments

This workflow compares results across multiple experiments to identify consistent patterns.

import flowbio
import pandas as pd
import numpy as np
from scipy import stats

class ComparativeAnalyzer:
    def __init__(self):
        self.client = flowbio.Client()
        self.client.login()
    
    def compare_experiments(self, experiment_ids):
        """Compare gene expression across multiple experiments."""
        results = {}
        
        # Get executions for each experiment
        for exp_id in experiment_ids:
            execution = self.client.get_execution(exp_id)
            
            if execution.status == "completed":
                # Download gene expression data
                counts = self.get_expression_data(execution)
                results[exp_id] = counts
        
        # Perform comparative analysis
        comparison = self.analyze_experiments(results)
        
        # Generate report
        self.generate_comparison_report(comparison)
        
        return comparison
    
    def get_expression_data(self, execution):
        """Extract gene expression data from execution."""
        # Download DESeq2 results
        deseq_results = execution.get_output("deseq2_results.csv")
        df = pd.read_csv(deseq_results.get_stream())
        
        # Extract relevant columns
        return df[['gene_id', 'gene_name', 'log2FoldChange', 'padj']]
    
    def analyze_experiments(self, results):
        """Perform comparative analysis across experiments."""
        # Combine all results
        all_genes = set()
        for exp_data in results.values():
            all_genes.update(exp_data['gene_id'])
        
        # Create comparison matrix
        comparison_data = []
        
        for gene in all_genes:
            gene_data = {'gene_id': gene}
            
            # Get fold changes across experiments
            fold_changes = []
            for exp_id, exp_data in results.items():
                gene_row = exp_data[exp_data['gene_id'] == gene]
                if not gene_row.empty:
                    fc = gene_row.iloc[0]['log2FoldChange']
                    padj = gene_row.iloc[0]['padj']
                    gene_data[f'{exp_id}_log2FC'] = fc
                    gene_data[f'{exp_id}_padj'] = padj
                    
                    if padj < 0.05:  # Significant
                        fold_changes.append(fc)
            
            # Calculate consistency metrics
            if len(fold_changes) >= 2:
                gene_data['consistency_score'] = 1 - stats.variation(fold_changes)
                gene_data['mean_log2FC'] = np.mean(fold_changes)
                gene_data['experiments_significant'] = len(fold_changes)
            
            comparison_data.append(gene_data)
        
        comparison_df = pd.DataFrame(comparison_data)
        
        # Identify consistently changed genes
        consistent_genes = comparison_df[
            (comparison_df['experiments_significant'] >= len(results) * 0.7) &
            (comparison_df['consistency_score'] > 0.8)
        ].sort_values('mean_log2FC', key=abs, ascending=False)
        
        return {
            'full_comparison': comparison_df,
            'consistent_genes': consistent_genes,
            'experiment_count': len(results)
        }
    
    def generate_comparison_report(self, comparison):
        """Generate comparative analysis report."""
        consistent = comparison['consistent_genes']
        
        report = f"""
# Comparative Analysis Report

## Summary
- Experiments compared: {comparison['experiment_count']}
- Total genes analyzed: {len(comparison['full_comparison'])}
- Consistently changed genes: {len(consistent)}

## Top Consistently Upregulated Genes
{consistent[consistent['mean_log2FC'] > 0].head(20)[['gene_id', 'mean_log2FC', 'consistency_score']].to_markdown()}

## Top Consistently Downregulated Genes
{consistent[consistent['mean_log2FC'] < 0].head(20)[['gene_id', 'mean_log2FC', 'consistency_score']].to_markdown()}

## Pathway Analysis
"""
        
        # Perform enrichment analysis on consistent genes
        enrichment = self.perform_enrichment(consistent['gene_id'].tolist())
        report += enrichment
        
        # Save results
        with open("comparative_analysis_report.md", "w") as f:
            f.write(report)
        
        # Save detailed results
        comparison['full_comparison'].to_csv("full_comparison_results.csv", index=False)
        consistent.to_csv("consistent_genes.csv", index=False)
        
        print("Comparative analysis complete!")
    
    def perform_enrichment(self, gene_list):
        """Perform pathway enrichment analysis."""
        # This would typically use an enrichment API or library
        # Placeholder for demonstration
        return """
### Enriched Pathways
1. Cell cycle regulation (p < 0.001)
2. DNA repair (p < 0.01)
3. Apoptosis (p < 0.01)
"""

# Usage
analyzer = ComparativeAnalyzer()
analyzer.compare_experiments([
    "execution-123",
    "execution-456", 
    "execution-789"
])
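
The consistency score computed above is one minus the coefficient of variation (standard deviation divided by mean) of the significant log2 fold changes, so values near 1 indicate experiments that agree closely. A quick check of the arithmetic with hypothetical values:

from scipy import stats

fold_changes = [1.2, 1.4, 1.3]  # hypothetical log2FC values from three experiments
score = 1 - stats.variation(fold_changes)  # variation() is std / mean
print(round(score, 3))  # 0.937: highly consistent

Note that the score is only meaningful when the fold changes share a sign; a mean near zero inflates the coefficient of variation.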

Automation with CI/CD

Workflow: GitHub Actions Integration

This example shows how to integrate Flow with GitHub Actions for automated testing of bioinformatics pipelines.

# .github/workflows/pipeline-test.yml
name: Test Pipeline on Flow

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test-pipeline:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        pip install flowbio pytest pandas
    
    - name: Run pipeline tests
      env:
        FLOW_USERNAME: ${{ secrets.FLOW_USERNAME }}
        FLOW_PASSWORD: ${{ secrets.FLOW_PASSWORD }}
      run: |
        python test_pipeline.py

# test_pipeline.py
import flowbio
import os
import sys
import time

def test_pipeline():
    """Test pipeline with test data."""
    # Initialize client
    client = flowbio.Client()
    client.login(
        os.environ['FLOW_USERNAME'],
        os.environ['FLOW_PASSWORD']
    )
    
    # Create test project
    project = client.create_project(
        name=f"CI Test - {os.environ.get('GITHUB_SHA', 'local')[:8]}",
        description="Automated pipeline test"
    )
    
    try:
        # Upload test data
        test_samples = []
        test_data_dir = "test/data"
        
        for sample_file in os.listdir(test_data_dir):
            if sample_file.endswith("_R1.fastq.gz"):
                sample_name = sample_file.replace("_R1.fastq.gz", "")
                r1 = os.path.join(test_data_dir, sample_file)
                r2 = r1.replace("_R1", "_R2")
                
                sample = client.upload_sample(
                    name=sample_name,
                    read1=r1,
                    read2=r2,
                    project_id=project.id
                )
                test_samples.append(sample)
        
        # Run pipeline
        execution = client.run_pipeline(
            pipeline="RNA-seq",
            samples=[s.id for s in test_samples],
            params={
                "skip_bigwig": True,  # Speed up test
                "skip_stringtie": True
            }
        )
        
        # Wait for completion (with timeout)
        start_time = time.time()
        timeout = 3600  # 1 hour
        
        while execution.status in ["pending", "running"]:
            if time.time() - start_time > timeout:
                raise TimeoutError("Pipeline execution timed out")
            
            time.sleep(30)
            execution.refresh()
        
        # Check results
        assert execution.status == "completed", f"Pipeline failed: {execution.error}"
        
        # Validate outputs
        expected_outputs = [
            "multiqc_report.html",
            "salmon_merged_gene_counts.csv",
            "deseq2_results.csv"
        ]
        
        actual_outputs = [o.filename for o in execution.outputs]
        
        for expected in expected_outputs:
            assert any(expected in output for output in actual_outputs), \
                f"Missing expected output: {expected}"
        
        print("✓ Pipeline test passed!")
        return True
        
    except Exception as e:
        print(f"✗ Pipeline test failed: {e}")
        return False
        
    finally:
        # Cleanup
        if 'project' in locals():
            project.delete()

if __name__ == "__main__":
    success = test_pipeline()
    sys.exit(0 if success else 1)

Real-time Monitoring

Workflow: Pipeline Monitoring Dashboard

This workflow creates a real-time monitoring dashboard for running pipelines.

import flowbio
import threading
import time
from datetime import datetime
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit

app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")

class PipelineMonitor:
    def __init__(self):
        self.client = flowbio.Client()
        self.client.login()
        self.active_executions = {}
        self.monitoring = False
    
    def start_monitoring(self):
        """Start monitoring executions."""
        self.monitoring = True
        thread = threading.Thread(target=self._monitor_loop)
        thread.daemon = True
        thread.start()
    
    def _monitor_loop(self):
        """Main monitoring loop."""
        while self.monitoring:
            try:
                # Get active executions
                executions = self.client.get_executions(
                    status="running",
                    is_owned=True
                )
                
                # Update execution status
                for execution in executions:
                    self._update_execution(execution)
                
                # Check for completed executions
                self._check_completed()
                
            except Exception as e:
                print(f"Monitoring error: {e}")
            
            time.sleep(10)  # Update every 10 seconds
    
    def _update_execution(self, execution):
        """Update execution status and emit to clients."""
        execution_data = {
            'id': execution.id,
            'name': execution.name,
            'pipeline': execution.pipeline.name,
            'status': execution.status,
            'progress': execution.progress,
            'started': execution.started.isoformat() if execution.started else None,
            'duration': self._calculate_duration(execution),
            'processes': self._get_process_status(execution)
        }
        
        self.active_executions[execution.id] = execution_data
        
        # Emit update to connected clients
        socketio.emit('execution_update', execution_data)
    
    def _calculate_duration(self, execution):
        """Calculate execution duration."""
        if execution.started:
            end_time = execution.finished or datetime.now()
            duration = end_time - execution.started
            return str(duration).split('.')[0]  # Remove microseconds
        return "0:00:00"
    
    def _get_process_status(self, execution):
        """Get status of individual processes."""
        processes = []
        
        for process in execution.process_executions:
            processes.append({
                'name': process.name,
                'status': process.status,
                'duration': process.duration if hasattr(process, 'duration') else None
            })
        
        return processes
    
    def _check_completed(self):
        """Check for newly completed executions."""
        completed_ids = []
        
        for exec_id, exec_data in self.active_executions.items():
            # Re-fetch to check current status
            execution = self.client.get_execution(exec_id)
            
            if execution.status in ["completed", "failed", "cancelled"]:
                completed_ids.append(exec_id)
                
                # Send completion notification
                socketio.emit('execution_completed', {
                    'id': exec_id,
                    'status': execution.status,
                    'duration': self._calculate_duration(execution)
                })
        
        # Remove completed from active
        for exec_id in completed_ids:
            del self.active_executions[exec_id]

# Initialize monitor
monitor = PipelineMonitor()

@app.route('/')
def index():
    """Dashboard page."""
    return render_template('dashboard.html')

@app.route('/api/executions')
def get_executions():
    """Get current executions."""
    return jsonify(list(monitor.active_executions.values()))

@socketio.on('connect')
def handle_connect():
    """Handle client connection."""
    emit('connected', {'data': 'Connected to pipeline monitor'})
    
    # Send current state
    for execution in monitor.active_executions.values():
        emit('execution_update', execution)

@socketio.on('start_execution')
def handle_start_execution(data):
    """Start a new pipeline execution."""
    try:
        execution = monitor.client.run_pipeline(
            pipeline=data['pipeline'],
            samples=data['samples'],
            params=data.get('params', {})
        )
        
        emit('execution_started', {
            'id': execution.id,
            'status': 'started'
        })
        
    except Exception as e:
        emit('error', {'message': str(e)})

if __name__ == '__main__':
    monitor.start_monitoring()
    socketio.run(app, debug=True, port=5000)

<!-- templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Pipeline Monitor</title>
    <script src="https://cdn.socket.io/4.5.0/socket.io.min.js"></script>
    <style>
        .execution {
            border: 1px solid #ddd;
            padding: 10px;
            margin: 10px;
            border-radius: 5px;
        }
        .running { background-color: #e3f2fd; }
        .completed { background-color: #e8f5e9; }
        .failed { background-color: #ffebee; }
        .progress-bar {
            width: 100%;
            height: 20px;
            background-color: #f0f0f0;
            border-radius: 10px;
            overflow: hidden;
        }
        .progress-fill {
            height: 100%;
            background-color: #4caf50;
            transition: width 0.3s ease;
        }
    </style>
</head>
<body>
    <h1>Pipeline Execution Monitor</h1>
    
    <div id="executions"></div>
    
    <script>
        const socket = io();
        const executions = {};
        
        socket.on('connect', function() {
            console.log('Connected to monitor');
        });
        
        socket.on('execution_update', function(data) {
            executions[data.id] = data;
            updateDisplay();
        });
        
        socket.on('execution_completed', function(data) {
            if (executions[data.id]) {
                executions[data.id].status = data.status;
                updateDisplay();
                
                // Remove after 5 seconds
                setTimeout(() => {
                    delete executions[data.id];
                    updateDisplay();
                }, 5000);
            }
        });
        
        function updateDisplay() {
            const container = document.getElementById('executions');
            container.innerHTML = '';
            
            Object.values(executions).forEach(exec => {
                const div = document.createElement('div');
                div.className = `execution ${exec.status}`;
                div.innerHTML = `
                    <h3>${exec.name}</h3>
                    <p>Pipeline: ${exec.pipeline}</p>
                    <p>Status: ${exec.status}</p>
                    <p>Duration: ${exec.duration}</p>
                    <div class="progress-bar">
                        <div class="progress-fill" style="width: ${exec.progress}%"></div>
                    </div>
                    <p>${exec.progress}% complete</p>
                `;
                container.appendChild(div);
            });
        }
    </script>
</body>
</html>

Best Practices

Error Handling and Retries

import flowbio
from functools import wraps
import time
import logging

def with_retry(max_attempts=3, delay=60):
    """Decorator for retrying failed operations."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except (flowbio.RateLimitError, flowbio.ServerError) as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        wait_time = delay * (2 ** attempt)  # Exponential backoff
                        logging.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                except flowbio.AuthenticationError:
                    # Re-authenticate and retry once; assumes the decorated
                    # function is a method on an object with a .client attribute
                    if hasattr(args[0], 'client'):
                        args[0].client.login()
                        return func(*args, **kwargs)
                    raise
            
            raise last_exception
        return wrapper
    return decorator

class RobustFlowClient:
    def __init__(self):
        self.client = flowbio.Client()
        self.setup_logging()
    
    def setup_logging(self):
        """Configure logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('flow_integration.log'),
                logging.StreamHandler()
            ]
        )
    
    @with_retry(max_attempts=3)
    def upload_sample_with_retry(self, **kwargs):
        """Upload sample with automatic retry."""
        return self.client.upload_sample(**kwargs)
    
    @with_retry(max_attempts=5, delay=120)
    def run_pipeline_with_retry(self, **kwargs):
        """Run pipeline with extended retry."""
        return self.client.run_pipeline(**kwargs)
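
A usage sketch for the robust client (argument values are placeholders):

robust = RobustFlowClient()
robust.client.login()

sample = robust.upload_sample_with_retry(
    name="sample-1",
    read1="sample1_R1.fastq.gz",
    read2="sample1_R2.fastq.gz",
    project_id="project-123"
)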

Performance Optimization

import concurrent.futures
import time
import flowbio

class ParallelProcessor:
    def __init__(self, max_workers=4):
        self.client = flowbio.Client()
        self.client.login()
        self.max_workers = max_workers
    
    def parallel_upload(self, sample_list):
        """Upload multiple samples in parallel."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            
            for sample_info in sample_list:
                future = executor.submit(
                    self.client.upload_sample,
                    **sample_info
                )
                futures.append((sample_info['name'], future))
            
            results = []
            for name, future in futures:
                try:
                    sample = future.result()
                    results.append(sample)
                    print(f"✓ Uploaded {name}")
                except Exception as e:
                    print(f"✗ Failed to upload {name}: {e}")
            
            return results
    
    def batch_process(self, operations, batch_size=10):
        """Process operations in batches.
        
        Delegates each batch to self._process_batch, which callers should
        implement for their operation type.
        """
        results = []
        
        for i in range(0, len(operations), batch_size):
            batch = operations[i:i + batch_size]
            batch_results = self._process_batch(batch)
            results.extend(batch_results)
            
            # Brief pause between batches
            time.sleep(1)
        
        return results
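
A usage sketch, passing the same keyword arguments that upload_sample expects (file paths are placeholders):

processor = ParallelProcessor(max_workers=4)
samples = processor.parallel_upload([
    {"name": "sample-1", "read1": "s1_R1.fastq.gz", "read2": "s1_R2.fastq.gz", "project_id": "project-123"},
    {"name": "sample-2", "read1": "s2_R1.fastq.gz", "read2": "s2_R2.fastq.gz", "project_id": "project-123"},
])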
