Flow Logo

API Reference

Python Client Guide

The flowbio Python library provides a high-level, Pythonic interface for interacting with the Flow platform. It handles authentication, file transfers, and API calls while abstracting away the complexity of direct API interaction.


Installation

PyPI Installation

pip install flowbio

Conda Installation

conda install -c bioconda flowbio

Development Installation

# Install from GitHub
pip install git+https://github.com/goodwright/flowbio.git

# Or clone and install locally
git clone https://github.com/goodwright/flowbio.git
cd flowbio
pip install -e .

Requirements

  • Python 3.7 or higher
  • requests >= 2.25.0
  • tqdm >= 4.60.0 (for progress bars)
  • python-dateutil >= 2.8.0

Quick Start

import flowbio

# Initialize and authenticate
client = flowbio.Client()
client.login("username", "password")

# Upload a sample
sample = client.upload_sample(
    name="Patient_001_RNA",
    read1="/path/to/reads_R1.fastq.gz",
    read2="/path/to/reads_R2.fastq.gz",
    metadata={
        "organism": "Homo sapiens",
        "tissue": "liver",
        "condition": "control"
    }
)

# Run a pipeline
execution = client.run_pipeline(
    pipeline="RNA-seq",
    version="3.19.0",
    samples=[sample.id],
    name="My RNA-seq Analysis"
)

# Monitor execution
execution.wait_until_complete(check_interval=30)
print(f"Pipeline completed with status: {execution.status}")

Client Configuration

Basic Configuration

# Default configuration
client = flowbio.Client()

# Custom API endpoint
client = flowbio.Client(
    api_url="https://flow.myinstitute.edu",
    timeout=120,  # Request timeout in seconds
    max_retries=5  # Number of retries for failed requests
)

# Configure for debugging
client = flowbio.Client(
    debug=True,  # Enable debug logging
    verify_ssl=False  # Disable SSL verification (dev only!)
)

Environment Variables

Configure the client using environment variables:

# API Configuration
export FLOW_API_URL="https://flow.myinstitute.edu"
export FLOW_API_TIMEOUT="120"

# Authentication
export FLOW_USERNAME="myusername"
export FLOW_PASSWORD="mypassword"

# Advanced Settings
export FLOW_CHUNK_SIZE="10485760"  # 10MB chunks
export FLOW_MAX_RETRIES="5"
export FLOW_DEBUG="true"

# Client automatically uses environment variables
client = flowbio.Client()
client.login()  # Uses FLOW_USERNAME and FLOW_PASSWORD

Configuration File

Create a configuration file at ~/.flowbio/config.json:

{
  "api_url": "https://api.flow.bio",
  "timeout": 120,
  "max_retries": 5,
  "chunk_size": 10485760,
  "progress_bars": true,
  "auto_refresh_token": true
}

Configuration Precedence

  1. Explicit parameters to Client()
  2. Environment variables
  3. Configuration file
  4. Default values

Authentication

Standard Login

client = flowbio.Client()
client.login("username", "password")

# Access user information
user = client.get_current_user()
print(f"Logged in as: {user.username}")
print(f"Email: {user.email}")
print(f"Groups: {', '.join(user.groups)}")

OIDC/SSO Login

# Login with OIDC token
client.oidc_login(id_token="<oidc_token>")

# Or configure for automatic OIDC
client = flowbio.Client(
    oidc_provider="https://sso.company.com",
    oidc_client_id="flow-client"
)

Token Persistence

# Save tokens securely
import keyring

# After login
keyring.set_password("flowbio", "access_token", client.access_token)
keyring.set_password("flowbio", "refresh_token", client.refresh_token)

# In a new session
client = flowbio.Client()
client.access_token = keyring.get_password("flowbio", "access_token")
client.refresh_token = keyring.get_password("flowbio", "refresh_token")

Session Management

# Create a context manager for automatic cleanup
from contextlib import contextmanager

@contextmanager
def flow_session(username, password):
    """Yield a logged-in Flow client and log it out when the block exits.

    A new ``flowbio.Client`` is created and ``login`` is called with the
    given credentials before the ``with`` body runs. ``logout`` is called
    in a ``finally`` clause, so it runs whether the body finishes normally
    or raises. If ``login`` itself raises, ``logout`` is not called.
    """
    session = flowbio.Client()
    session.login(username, password)
    try:
        yield session
    finally:
        session.logout()

# Use the session
with flow_session("username", "password") as client:
    samples = client.get_samples()
    # Session automatically closes

Sample Management

Upload Samples

# Single-end sequencing
sample = client.upload_sample(
    name="Control_Rep1",
    read1="/data/control_rep1.fastq.gz",
    metadata={
        "organism": "Mus musculus",
        "strain": "C57BL/6",
        "age": "8 weeks",
        "tissue": "brain"
    }
)

# Paired-end sequencing
sample = client.upload_sample(
    name="Treatment_Rep1",
    read1="/data/treatment_rep1_R1.fastq.gz",
    read2="/data/treatment_rep1_R2.fastq.gz",
    metadata={
        "organism": "Mus musculus",
        "treatment": "drug_A",
        "concentration": "10uM",
        "timepoint": "24h"
    },
    progress=True  # Show upload progress
)

Retrieve Samples

# Get all your samples
my_samples = client.get_samples()

# Get a specific sample
sample = client.get_sample(123)
print(f"Sample: {sample.name}")
print(f"Data files: {len(sample.data)}")
print(f"Metadata: {sample.metadata}")

# Filter samples
rna_samples = client.get_samples(
    organism="Homo sapiens",
    category="RNA-seq"
)

# Search samples
results = client.search_samples(
    query="cancer",
    organism="human",
    created_after="2024-01-01"
)

Update Samples

# Update sample metadata
sample.update(
    metadata={
        "treatment": "control",
        "batch": "batch_001",
        "notes": "High quality sample"
    }
)

# Add data to existing sample
sample.add_data("/path/to/additional_file.bam")

Share Samples

# Share with specific users
sample.share(
    users=["collaborator1", "collaborator2"],
    permission="read"  # read, write, or admin
)

# Share with groups
sample.share(
    groups=["research-team"],
    permission="write"
)

# Make public
sample.make_public()

Project Management

Create and Manage Projects

# Create a new project
project = client.create_project(
    name="Mouse Liver RNA-seq",
    description="Investigating metabolic changes in mouse liver",
    organism="Mus musculus",
    private=True
)

# Add samples to project
project.add_samples([sample1.id, sample2.id, sample3.id])

# Update project
project.update(
    description="Updated description",
    metadata={
        "funding": "NIH R01-12345",
        "pi": "Dr. Smith"
    }
)

Retrieve Projects

# Get your projects
my_projects = client.get_projects()

# Get public projects
public_projects = client.get_public_projects(
    organism="Homo sapiens",
    limit=50
)

# Search projects
cancer_projects = client.search_projects(
    "breast cancer",
    has_samples=True
)

Project Operations

# Get project details
project = client.get_project(10)
print(f"Project: {project.name}")
print(f"Samples: {project.sample_count}")
print(f"Created: {project.created}")

# List project samples
for sample in project.get_samples():
    print(f"- {sample.name}: {sample.organism}")

# Export project metadata
metadata_df = project.export_metadata()
metadata_df.to_csv("project_metadata.csv", index=False)

# Share project
project.share(groups=["lab-members"], permission="read")

Data Management

Upload Data Files

# Upload any data file
data = client.upload_data(
    "/path/to/results.vcf.gz",
    metadata={
        "file_type": "VCF",
        "variant_caller": "GATK",
        "reference": "GRCh38"
    },
    progress=True
)

# Upload with custom chunk size
data = client.upload_data(
    "/path/to/large_file.bam",
    chunk_size=50_000_000,  # 50MB chunks
    retries=5
)

Download Data

# Download a single file
data = client.get_data(789)
client.download_file(
    data.download_url,
    "/local/path/output.fastq.gz",
    progress=True
)

# Download all sample data
sample = client.get_sample(123)
for data_file in sample.data:
    client.download_file(
        data_file.download_url,
        f"/downloads/{data_file.filename}"
    )

Bulk Operations

# Bulk download
import time  # needed for the polling delay below

job = client.create_bulk_download(
    data_ids=[101, 102, 103, 104, 105],
    name="my_dataset.zip"
)

# Monitor download job: refresh the job, report its state, then pause
# before polling again so the API is not hit in a tight loop
while job.status in ["pending", "processing"]:
    job.refresh()
    print(f"Status: {job.status} ({job.progress}%)")
    time.sleep(5)

# Download when ready
if job.status == "completed":
    client.download_file(job.download_url, "dataset.zip")

Pipeline Execution

List Available Pipelines

# Get all pipelines
pipelines = client.get_pipelines()
for pipeline in pipelines:
    print(f"{pipeline.name}: {pipeline.description}")
    for version in pipeline.versions:
        print(f"  - v{version.version}")

# Get specific pipeline
rnaseq = client.get_pipeline("RNA-seq")
print(f"Latest version: {rnaseq.latest_version}")

Run Pipelines

# Run with default parameters
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=[sample1.id, sample2.id],
    name="My Analysis"
)

# Run with custom parameters
execution = client.run_pipeline(
    pipeline="RNA-seq",
    version="3.19.0",
    samples=sample_ids,
    name="Custom RNA-seq Run",
    params={
        "aligner": "star_salmon",
        "min_trimmed_reads": 10000,
        "skip_markduplicates": True,
        "deseq2_vst": True
    }
)

# Run with genome override
execution = client.run_pipeline(
    pipeline="RNA-seq",
    samples=sample_ids,
    genome="GRCm39",  # Mouse genome
    name="Mouse RNA-seq"
)

Monitor Executions

# Get execution status
execution = client.get_execution(456)
print(f"Status: {execution.status}")
print(f"Progress: {execution.progress}%")

# Wait for completion
execution.wait_until_complete(
    check_interval=60,  # Check every minute
    timeout=3600  # Timeout after 1 hour
)

# Get execution logs
for log in execution.get_logs():
    print(f"[{log.timestamp}] {log.process}: {log.message}")

# Download results
if execution.status == "completed":
    # Download specific outputs
    execution.download_output(
        "multiqc_report.html",
        "/results/multiqc_report.html"
    )
    
    # Download all results
    execution.download_all_outputs("/results/execution_456/")

Cancel Executions

# Cancel a running execution
execution.cancel()

# Batch cancel
executions = client.get_executions(status="running")
for execution in executions:
    if execution.name.startswith("test_"):
        execution.cancel()

Search and Discovery

# Search across all entities
results = client.search("BRCA1")
print(f"Samples: {len(results['samples'])}")
print(f"Projects: {len(results['projects'])}")
print(f"Data: {len(results['data'])}")

# Search with filters
samples = client.search_samples(
    query="cancer",
    organism="Homo sapiens",
    sample_type="RNA-seq",
    created_after="2024-01-01",
    created_before="2024-12-31",
    has_data=True
)

# Search executions
executions = client.search_executions(
    pipeline="RNA-seq",
    status="completed",
    created_by="username"
)

# Search data files
data_files = client.search_data(
    pattern="*.bam",
    min_size=1_000_000_000,  # Files > 1GB
    data_type="BAM"
)

Group Management

Working with Groups

# Get user's groups
my_groups = client.get_my_groups()
for group in my_groups:
    print(f"{group.name}: {group.member_count} members")

# Get group details
group = client.get_group("research-lab")
print(f"Group: {group.name}")
print(f"Description: {group.description}")
print(f"Your role: {group.user_role}")

# List group members
for member in group.get_members():
    print(f"- {member.username} ({member.role})")

Share with Groups

# Share resources with groups
sample.share(groups=["lab-team"], permission="write")
project.share(groups=["collaborators"], permission="read")
execution.share(groups=["analysis-team"], permission="read")

Error Handling

Exception Types

from flowbio.exceptions import (
    AuthenticationError,
    PermissionError,
    NotFoundError,
    ValidationError,
    UploadError,
    DownloadError,
    PipelineError
)

try:
    sample = client.get_sample(999999)
except NotFoundError:
    print("Sample not found")
except PermissionError:
    print("No permission to access sample")
except AuthenticationError:
    print("Authentication failed - please login again")

Retry Logic

from flowbio.utils import retry_on_error

@retry_on_error(max_attempts=3, delay=1.0)
def upload_with_retry(client, filepath):
    """Upload ``filepath`` via ``client.upload_data`` and return its result.

    The function is wrapped by ``retry_on_error`` configured with
    ``max_attempts=3`` and ``delay=1.0``.
    NOTE(review): presumably this means up to three attempts with a one
    second pause between them; confirm against ``flowbio.utils``.
    """
    return client.upload_data(filepath)

# Use the wrapped function
data = upload_with_retry(client, "/path/to/file.fastq.gz")

Custom Error Handling

class FlowBioWrapper:
    """Wrap a Flow client and recover from common upload failures.

    ``UploadError`` must be in scope; it is imported from
    ``flowbio.exceptions`` in the "Exception Types" example.
    """

    def __init__(self, client):
        self.client = client

    def safe_upload(self, filepath, **kwargs):
        """Upload with comprehensive error handling

        Args:
            filepath: Path of the file to upload.
            **kwargs: Forwarded unchanged to ``client.upload_data``.

        Returns:
            The result of ``client.upload_data``. When the upload error
            message mentions "duplicate", the first match returned by
            ``client.search_data`` for the file's base name is returned
            instead.

        Raises:
            UploadError: When the failure is neither a duplicate nor a
                size problem, when a reported duplicate cannot be found
                by ``search_data``, or when the retry with larger chunks
                fails too.
        """
        # Local import keeps this example self-contained.
        import os

        try:
            return self.client.upload_data(filepath, **kwargs)
        except UploadError as e:
            reason = str(e).lower()
            if "duplicate" in reason:
                print(f"File already exists: {filepath}")
                # Return existing file
                matches = self.client.search_data(
                    pattern=os.path.basename(filepath)
                )
                if not matches:
                    # Surface the original upload error rather than an
                    # unrelated IndexError from indexing an empty result.
                    raise
                return matches[0]
            if "size" in reason:
                print(f"File too large: {filepath}")
                # Try with larger chunks
                kwargs['chunk_size'] = 50_000_000
                return self.client.upload_data(filepath, **kwargs)
            raise

Advanced Usage

Async Operations

import asyncio
from flowbio.async_client import AsyncClient

async def async_upload_samples(sample_list):
    """Upload every entry of ``sample_list`` concurrently and return the results.

    Each entry is expanded as keyword arguments to
    ``client.upload_sample_async``. The resulting awaitables are passed to
    ``asyncio.gather``, whose result list is returned in the same order as
    ``sample_list``. The client is opened with ``async with`` and logged in
    before any upload is started.
    """
    async with AsyncClient() as client:
        await client.login("username", "password")

        pending = [
            client.upload_sample_async(**info) for info in sample_list
        ]
        return await asyncio.gather(*pending)

# Run async uploads
samples = asyncio.run(async_upload_samples(sample_data))

Batch Processing

from flowbio.batch import BatchProcessor

# Create batch processor
batch = BatchProcessor(client)

# Queue operations
for file_path in file_list:
    batch.add_upload(file_path, metadata={"batch": "batch_001"})

# Execute batch with progress
results = batch.execute(max_workers=4, progress=True)

# Check results
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]

Custom API Calls

# Make custom REST API calls
response = client.api_call(
    method="GET",
    endpoint="/custom/endpoint",
    params={"filter": "value"}
)

# Custom GraphQL queries
query = """
    query GetSampleDetails($id: ID!) {
        sample(id: $id) {
            id
            name
            metadata
            data {
                id
                filename
                size
            }
        }
    }
"""

result = client.graphql_query(
    query,
    variables={"id": 123}
)

Plugin System

from flowbio.plugins import Plugin

class CustomPlugin(Plugin):
    """Custom plugin for specialized workflows"""

    def on_upload_complete(self, data):
        """Called after successful upload; reports the uploaded filename."""
        print(f"Upload complete: {data.filename}")
        # Custom post-processing

    def on_execution_complete(self, execution):
        """Called when pipeline finishes; fetches results of completed runs."""
        if execution.status != "completed":
            return

        # Auto-download results
        destination = f"/auto-downloads/{execution.id}.zip"
        execution.download_output("results.zip", destination)

# Register plugin
client.register_plugin(CustomPlugin())

Performance Tips

1. Use Appropriate Chunk Sizes

# Optimize chunk size based on file size and network
def get_optimal_chunk_size(file_size, network_speed_mbps=100):
    """Pick an upload chunk size in bytes from file size and link speed.

    Args:
        file_size: Size of the file in bytes.
        network_speed_mbps: Link speed in Mbps; defaults to 100.

    Returns:
        1_000_000 for files under 100_000_000 bytes regardless of speed;
        otherwise 50_000_000 when the speed is strictly greater than 1000
        Mbps, else 10_000_000. A link of exactly 1000 Mbps therefore gets
        the 10_000_000 tier.
    """
    is_small_file = file_size < 100_000_000  # < 100MB
    if is_small_file:
        return 1_000_000  # 1MB chunks

    # Larger files: only links faster than 1000 Mbps get the 50MB tier
    return 50_000_000 if network_speed_mbps > 1000 else 10_000_000

chunk_size = get_optimal_chunk_size(os.path.getsize(filepath))
data = client.upload_data(filepath, chunk_size=chunk_size)

2. Parallel Operations

from concurrent.futures import ThreadPoolExecutor

def parallel_upload(client, file_paths, max_workers=4):
    """Upload multiple files in parallel

    Args:
        client: Object exposing ``upload_data(filepath)``.
        file_paths: Iterable of paths to upload.
        max_workers: Size of the thread pool; defaults to 4.

    Returns:
        A list of ``(filepath, data)`` tuples for the uploads that
        succeeded, in completion order (not input order). Failed uploads
        are reported on stdout and left out of the list.
    """
    # Local imports: the surrounding example only imports
    # ThreadPoolExecutor, so ``concurrent.futures.as_completed`` and
    # ``os`` would otherwise be undefined names.
    import os
    from concurrent.futures import as_completed

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its path so results can be labelled
        futures = {
            executor.submit(client.upload_data, fp): fp
            for fp in file_paths
        }

        results = []
        for future in as_completed(futures):
            filepath = futures[future]
            try:
                data = future.result()
                results.append((filepath, data))
                print(f"✓ Uploaded: {os.path.basename(filepath)}")
            except Exception as e:
                # One failed upload must not abort the rest of the batch
                print(f"✗ Failed: {filepath} - {e}")

    return results

3. Connection Pooling

# Reuse connections for multiple requests
client = flowbio.Client(
    connection_pool_size=10,
    max_retries=3
)

# The client will maintain a pool of connections
# for better performance with multiple requests

4. Caching

from functools import lru_cache

class CachedFlowClient:
    """Caching front-end for repeated sample and pipeline lookups.

    Each instance owns its own bounded LRU caches (100 samples, 50
    pipelines). The caches wrap the bound methods of the ``client`` passed
    to ``__init__``, so replacing ``self.client`` afterwards does not
    redirect cached lookups; create a new wrapper instead.
    """

    def __init__(self, client):
        self.client = client
        # Build the caches per instance instead of decorating the methods:
        # a method-level lru_cache lives on the class, keys on ``self`` and
        # so keeps every wrapper (and its client) alive until evicted.
        self._sample_cache = lru_cache(maxsize=100)(client.get_sample)
        self._pipeline_cache = lru_cache(maxsize=50)(client.get_pipeline)

    def get_sample_cached(self, sample_id):
        """Cache sample lookups"""
        return self._sample_cache(sample_id)

    def get_pipeline_cached(self, pipeline_name):
        """Cache pipeline information"""
        return self._pipeline_cache(pipeline_name)

Troubleshooting

Debug Mode

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

client = flowbio.Client(debug=True)

# All API calls will be logged
client.get_samples()

Common Issues

Authentication Failures:

try:
    client.login("username", "password")
except AuthenticationError as e:
    if "rate limit" in str(e):
        print("Too many login attempts. Wait 5 minutes.")
    elif "invalid" in str(e):
        print("Check username and password")

Upload Failures:

# Verify file before upload
import os

filepath = "/path/to/file.fastq.gz"
if not os.path.exists(filepath):
    raise FileNotFoundError(f"File not found: {filepath}")

if os.path.getsize(filepath) > 50_000_000_000:  # 50GB
    print("Warning: Large file. Consider using larger chunks.")
    
# Upload with verification
data = client.upload_data(filepath, verify=True)

Network Issues:

# Configure for unreliable networks
client = flowbio.Client(
    timeout=300,  # 5 minute timeout
    max_retries=10,
    retry_delay=5.0,
    retry_backoff=2.0  # Exponential backoff
)

Next Steps

Previous
GraphQL API