API Reference
Python Client Guide
The flowbio Python library provides a high-level, Pythonic interface for interacting with the Flow platform. It handles authentication, file transfers, and API calls while abstracting away the complexity of direct API interaction.
Installation
PyPI Installation
pip install flowbio
Conda Installation
conda install -c bioconda flowbio
Development Installation
# Install from GitHub
pip install git+https://github.com/goodwright/flowbio.git
# Or clone and install locally
git clone https://github.com/goodwright/flowbio.git
cd flowbio
pip install -e .
Requirements
- Python 3.7 or higher
- requests >= 2.25.0
- tqdm >= 4.60.0 (for progress bars)
- python-dateutil >= 2.8.0
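A quick sanity check after installing is to import the package and its core dependencies; if any of the imports below fail, the installation is incomplete (this check is illustrative and not part of flowbio itself):
import flowbio
import requests   # HTTP layer
import tqdm       # progress bars
import dateutil   # python-dateutil installs as the dateutil module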
Quick Start
import flowbio
# Initialize and authenticate
client = flowbio.Client()
client.login("username", "password")
# Upload a sample
sample = client.upload_sample(
name="Patient_001_RNA",
read1="/path/to/reads_R1.fastq.gz",
read2="/path/to/reads_R2.fastq.gz",
metadata={
"organism": "Homo sapiens",
"tissue": "liver",
"condition": "control"
}
)
# Run a pipeline
execution = client.run_pipeline(
pipeline="RNA-seq",
version="3.19.0",
samples=[sample.id],
name="My RNA-seq Analysis"
)
# Monitor execution
execution.wait_until_complete(check_interval=30)
print(f"Pipeline completed with status: {execution.status}")
Client Configuration
Basic Configuration
# Default configuration
client = flowbio.Client()
# Custom API endpoint
client = flowbio.Client(
api_url="https://flow.myinstitute.edu",
timeout=120, # Request timeout in seconds
max_retries=5 # Number of retries for failed requests
)
# Configure for debugging
client = flowbio.Client(
debug=True, # Enable debug logging
verify_ssl=False # Disable SSL verification (dev only!)
)
Environment Variables
Configure the client using environment variables:
# API Configuration
export FLOW_API_URL="https://flow.myinstitute.edu"
export FLOW_API_TIMEOUT="120"
# Authentication
export FLOW_USERNAME="myusername"
export FLOW_PASSWORD="mypassword"
# Advanced Settings
export FLOW_CHUNK_SIZE="10485760" # 10MB chunks
export FLOW_MAX_RETRIES="5"
export FLOW_DEBUG="true"
# Client automatically uses environment variables
client = flowbio.Client()
client.login() # Uses FLOW_USERNAME and FLOW_PASSWORD
Configuration File
Create a configuration file at ~/.flowbio/config.json:
{
"api_url": "https://api.flow.bio",
"timeout": 120,
"max_retries": 5,
"chunk_size": 10485760,
"progress_bars": true,
"auto_refresh_token": true
}
Configuration Precedence
Settings are resolved in the following order, from highest to lowest precedence (see the example below):
- Explicit parameters to Client()
- Environment variables
- Configuration file
- Default values
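As an illustration of the precedence rules (a sketch; the resolution behaviour is taken from the list above, not verified against the implementation), an explicit keyword argument overrides the corresponding environment variable:
import os
import flowbio

os.environ["FLOW_API_TIMEOUT"] = "60"   # environment variable
client = flowbio.Client(timeout=120)    # explicit parameter wins: timeout is 120 seconds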
Authentication
Standard Login
client = flowbio.Client()
client.login("username", "password")
# Access user information
user = client.get_current_user()
print(f"Logged in as: {user.username}")
print(f"Email: {user.email}")
print(f"Groups: {', '.join(user.groups)}")
OIDC/SSO Login
# Login with OIDC token
client.oidc_login(id_token="<oidc_token>")
# Or configure for automatic OIDC
client = flowbio.Client(
oidc_provider="https://sso.company.com",
oidc_client_id="flow-client"
)
Token Persistence
# Save tokens securely
import keyring
# After login
keyring.set_password("flowbio", "access_token", client.access_token)
keyring.set_password("flowbio", "refresh_token", client.refresh_token)
# In a new session
client = flowbio.Client()
client.access_token = keyring.get_password("flowbio", "access_token")
client.refresh_token = keyring.get_password("flowbio", "refresh_token")
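Restored tokens may have expired, so it helps to verify them and fall back to a fresh login; a minimal sketch, assuming get_current_user raises AuthenticationError for an invalid token (see Error Handling below):
import keyring
from flowbio.exceptions import AuthenticationError

client = flowbio.Client()
client.access_token = keyring.get_password("flowbio", "access_token")
client.refresh_token = keyring.get_password("flowbio", "refresh_token")

try:
    user = client.get_current_user()   # cheap call to confirm the restored token still works
    print(f"Resumed session as {user.username}")
except AuthenticationError:
    client.login("username", "password")   # fall back to a fresh login
    keyring.set_password("flowbio", "access_token", client.access_token)
    keyring.set_password("flowbio", "refresh_token", client.refresh_token)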
Session Management
# Create a context manager for automatic cleanup
from contextlib import contextmanager
@contextmanager
def flow_session(username, password):
client = flowbio.Client()
client.login(username, password)
try:
yield client
finally:
client.logout()
# Use the session
with flow_session("username", "password") as client:
samples = client.get_samples()
# Session automatically closes
Sample Management
Upload Samples
# Single-end sequencing
sample = client.upload_sample(
name="Control_Rep1",
read1="/data/control_rep1.fastq.gz",
metadata={
"organism": "Mus musculus",
"strain": "C57BL/6",
"age": "8 weeks",
"tissue": "brain"
}
)
# Paired-end sequencing
sample = client.upload_sample(
name="Treatment_Rep1",
read1="/data/treatment_rep1_R1.fastq.gz",
read2="/data/treatment_rep1_R2.fastq.gz",
metadata={
"organism": "Mus musculus",
"treatment": "drug_A",
"concentration": "10uM",
"timepoint": "24h"
},
progress=True # Show upload progress
)
Retrieve Samples
# Get all your samples
my_samples = client.get_samples()
# Get a specific sample
sample = client.get_sample(123)
print(f"Sample: {sample.name}")
print(f"Data files: {len(sample.data)}")
print(f"Metadata: {sample.metadata}")
# Filter samples
rna_samples = client.get_samples(
organism="Homo sapiens",
category="RNA-seq"
)
# Search samples
results = client.search_samples(
query="cancer",
organism="human",
created_after="2024-01-01"
)
Update Samples
# Update sample metadata
sample.update(
metadata={
"treatment": "control",
"batch": "batch_001",
"notes": "High quality sample"
}
)
# Add data to existing sample
sample.add_data("/path/to/additional_file.bam")
Share Samples
# Share with specific users
sample.share(
users=["collaborator1", "collaborator2"],
permission="read" # read, write, or admin
)
# Share with groups
sample.share(
groups=["research-team"],
permission="write"
)
# Make public
sample.make_public()
Project Management
Create and Manage Projects
# Create a new project
project = client.create_project(
name="Mouse Liver RNA-seq",
description="Investigating metabolic changes in mouse liver",
organism="Mus musculus",
private=True
)
# Add samples to project
project.add_samples([sample1.id, sample2.id, sample3.id])
# Update project
project.update(
description="Updated description",
metadata={
"funding": "NIH R01-12345",
"pi": "Dr. Smith"
}
)
Retrieve Projects
# Get your projects
my_projects = client.get_projects()
# Get public projects
public_projects = client.get_public_projects(
organism="Homo sapiens",
limit=50
)
# Search projects
cancer_projects = client.search_projects(
"breast cancer",
has_samples=True
)
Project Operations
# Get project details
project = client.get_project(10)
print(f"Project: {project.name}")
print(f"Samples: {project.sample_count}")
print(f"Created: {project.created}")
# List project samples
for sample in project.get_samples():
print(f"- {sample.name}: {sample.organism}")
# Export project metadata
metadata_df = project.export_metadata()
metadata_df.to_csv("project_metadata.csv", index=False)
# Share project
project.share(groups=["lab-members"], permission="read")
Data Management
Upload Data Files
# Upload any data file
data = client.upload_data(
"/path/to/results.vcf.gz",
metadata={
"file_type": "VCF",
"variant_caller": "GATK",
"reference": "GRCh38"
},
progress=True
)
# Upload with custom chunk size
data = client.upload_data(
"/path/to/large_file.bam",
chunk_size=50_000_000, # 50MB chunks
retries=5
)
Download Data
# Download a single file
data = client.get_data(789)
client.download_file(
data.download_url,
"/local/path/output.fastq.gz",
progress=True
)
# Download all sample data
sample = client.get_sample(123)
for data_file in sample.data:
client.download_file(
data_file.download_url,
f"/downloads/{data_file.filename}"
)
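The same pattern extends to every sample in a project; a sketch combining the project and download calls shown above (the directory layout is illustrative):
import os

project = client.get_project(10)
for sample in project.get_samples():
    target_dir = f"/downloads/{project.name}/{sample.name}"
    os.makedirs(target_dir, exist_ok=True)
    for data_file in sample.data:
        client.download_file(
            data_file.download_url,
            os.path.join(target_dir, data_file.filename)
        )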
Bulk Operations
# Bulk download
job = client.create_bulk_download(
data_ids=[101, 102, 103, 104, 105],
name="my_dataset.zip"
)
import time

# Monitor download job
while job.status in ["pending", "processing"]:
    job.refresh()
    print(f"Status: {job.status} ({job.progress}%)")
    time.sleep(5)
# Download when ready
if job.status == "completed":
client.download_file(job.download_url, "dataset.zip")
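To avoid polling indefinitely if a job stalls, the loop above can be given a deadline; a sketch using only the job attributes shown above:
import time

def wait_for_bulk_download(job, timeout=1800, check_interval=5):
    """Poll a bulk download job until it leaves the pending/processing states or the deadline passes."""
    deadline = time.time() + timeout
    while job.status in ["pending", "processing"]:
        if time.time() > deadline:
            raise TimeoutError(f"Bulk download still {job.status} after {timeout} seconds")
        time.sleep(check_interval)
        job.refresh()
    return job

job = wait_for_bulk_download(job)
if job.status == "completed":
    client.download_file(job.download_url, "dataset.zip")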
Pipeline Execution
List Available Pipelines
# Get all pipelines
pipelines = client.get_pipelines()
for pipeline in pipelines:
print(f"{pipeline.name}: {pipeline.description}")
for version in pipeline.versions:
print(f" - v{version.version}")
# Get specific pipeline
rnaseq = client.get_pipeline("RNA-seq")
print(f"Latest version: {rnaseq.latest_version}")
Run Pipelines
# Run with default parameters
execution = client.run_pipeline(
pipeline="RNA-seq",
samples=[sample1.id, sample2.id],
name="My Analysis"
)
# Run with custom parameters
execution = client.run_pipeline(
pipeline="RNA-seq",
version="3.19.0",
samples=sample_ids,
name="Custom RNA-seq Run",
params={
"aligner": "star_salmon",
"min_trimmed_reads": 10000,
"skip_markduplicates": True,
"deseq2_vst": True
}
)
# Run with genome override
execution = client.run_pipeline(
pipeline="RNA-seq",
samples=sample_ids,
genome="GRCm39", # Mouse genome
name="Mouse RNA-seq"
)
Monitor Executions
# Get execution status
execution = client.get_execution(456)
print(f"Status: {execution.status}")
print(f"Progress: {execution.progress}%")
# Wait for completion
execution.wait_until_complete(
check_interval=60, # Check every minute
timeout=3600 # Timeout after 1 hour
)
# Get execution logs
for log in execution.get_logs():
print(f"[{log.timestamp}] {log.process}: {log.message}")
# Download results
if execution.status == "completed":
# Download specific outputs
execution.download_output(
"multiqc_report.html",
"/results/multiqc_report.html"
)
# Download all results
execution.download_all_outputs("/results/execution_456/")
Cancel Executions
# Cancel a running execution
execution.cancel()
# Batch cancel
executions = client.get_executions(status="running")
for execution in executions:
if execution.name.startswith("test_"):
execution.cancel()
Search and Discovery
Quick Search
# Search across all entities
results = client.search("BRCA1")
print(f"Samples: {len(results['samples'])}")
print(f"Projects: {len(results['projects'])}")
print(f"Data: {len(results['data'])}")
Advanced Search
# Search with filters
samples = client.search_samples(
query="cancer",
organism="Homo sapiens",
sample_type="RNA-seq",
created_after="2024-01-01",
created_before="2024-12-31",
has_data=True
)
# Search executions
executions = client.search_executions(
pipeline="RNA-seq",
status="completed",
created_by="username"
)
# Search data files
data_files = client.search_data(
pattern="*.bam",
min_size=1_000_000_000, # Files > 1GB
data_type="BAM"
)
Group Management
Working with Groups
# Get user's groups
my_groups = client.get_my_groups()
for group in my_groups:
print(f"{group.name}: {group.member_count} members")
# Get group details
group = client.get_group("research-lab")
print(f"Group: {group.name}")
print(f"Description: {group.description}")
print(f"Your role: {group.user_role}")
# List group members
for member in group.get_members():
print(f"- {member.username} ({member.role})")
Share with Groups
# Share resources with groups
sample.share(groups=["lab-team"], permission="write")
project.share(groups=["collaborators"], permission="read")
execution.share(groups=["analysis-team"], permission="read")
Error Handling
Exception Types
from flowbio.exceptions import (
AuthenticationError,
PermissionError,
NotFoundError,
ValidationError,
UploadError,
DownloadError,
PipelineError
)
try:
sample = client.get_sample(999999)
except NotFoundError:
print("Sample not found")
except PermissionError:
print("No permission to access sample")
except AuthenticationError:
print("Authentication failed - please login again")
Retry Logic
from flowbio.utils import retry_on_error
@retry_on_error(max_attempts=3, delay=1.0)
def upload_with_retry(client, filepath):
return client.upload_data(filepath)
# Use the wrapped function
data = upload_with_retry(client, "/path/to/file.fastq.gz")
Custom Error Handling
import os
from flowbio.exceptions import UploadError

class FlowBioWrapper:
    def __init__(self, client):
        self.client = client

    def safe_upload(self, filepath, **kwargs):
        """Upload with comprehensive error handling"""
        try:
            return self.client.upload_data(filepath, **kwargs)
        except UploadError as e:
            if "duplicate" in str(e).lower():
                print(f"File already exists: {filepath}")
                # Return existing file
                return self.client.search_data(
                    pattern=os.path.basename(filepath)
                )[0]
            elif "size" in str(e).lower():
                print(f"File too large: {filepath}")
                # Try with larger chunks
                kwargs['chunk_size'] = 50_000_000
                return self.client.upload_data(filepath, **kwargs)
            else:
                raise
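Used like the plain client, with uploads routed through the wrapper:
wrapper = FlowBioWrapper(client)
data = wrapper.safe_upload("/path/to/file.fastq.gz", progress=True)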
Advanced Usage
Async Operations
import asyncio
from flowbio.async_client import AsyncClient
async def async_upload_samples(sample_list):
async with AsyncClient() as client:
await client.login("username", "password")
tasks = []
for sample_info in sample_list:
task = client.upload_sample_async(**sample_info)
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
# Run async uploads
samples = asyncio.run(async_upload_samples(sample_data))
Batch Processing
from flowbio.batch import BatchProcessor
# Create batch processor
batch = BatchProcessor(client)
# Queue operations
for file_path in file_list:
batch.add_upload(file_path, metadata={"batch": "batch_001"})
# Execute batch with progress
results = batch.execute(max_workers=4, progress=True)
# Check results
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
Custom API Calls
# Make custom REST API calls
response = client.api_call(
method="GET",
endpoint="/custom/endpoint",
params={"filter": "value"}
)
# Custom GraphQL queries
query = """
query GetSampleDetails($id: ID!) {
sample(id: $id) {
id
name
metadata
data {
id
filename
size
}
}
}
"""
result = client.graphql_query(
query,
variables={"id": 123}
)
Plugin System
from flowbio.plugins import Plugin
class CustomPlugin(Plugin):
"""Custom plugin for specialized workflows"""
def on_upload_complete(self, data):
"""Called after successful upload"""
print(f"Upload complete: {data.filename}")
# Custom post-processing
def on_execution_complete(self, execution):
"""Called when pipeline finishes"""
if execution.status == "completed":
# Auto-download results
execution.download_output(
"results.zip",
f"/auto-downloads/{execution.id}.zip"
)
# Register plugin
client.register_plugin(CustomPlugin())
Performance Tips
1. Use Appropriate Chunk Sizes
import os

# Optimize chunk size based on file size and network
def get_optimal_chunk_size(file_size, network_speed_mbps=100):
    if file_size < 100_000_000:        # < 100MB
        return 1_000_000               # 1MB chunks
    elif network_speed_mbps > 1000:    # Gigabit
        return 50_000_000              # 50MB chunks
    else:
        return 10_000_000              # 10MB chunks

chunk_size = get_optimal_chunk_size(os.path.getsize(filepath))
data = client.upload_data(filepath, chunk_size=chunk_size)
2. Parallel Operations
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_upload(client, file_paths, max_workers=4):
    """Upload multiple files in parallel"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(client.upload_data, fp): fp
            for fp in file_paths
        }
        results = []
        for future in as_completed(futures):
            filepath = futures[future]
            try:
                data = future.result()
                results.append((filepath, data))
                print(f"✓ Uploaded: {os.path.basename(filepath)}")
            except Exception as e:
                print(f"✗ Failed: {filepath} - {e}")
        return results
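Called with the same client and a list of paths (paths are illustrative):
file_paths = ["/data/sample1.fastq.gz", "/data/sample2.fastq.gz"]
results = parallel_upload(client, file_paths, max_workers=4)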
3. Connection Pooling
# Reuse connections for multiple requests
client = flowbio.Client(
connection_pool_size=10,
max_retries=3
)
# The client will maintain a pool of connections
# for better performance with multiple requests
4. Caching
from functools import lru_cache
class CachedFlowClient:
def __init__(self, client):
self.client = client
@lru_cache(maxsize=100)
def get_sample_cached(self, sample_id):
"""Cache sample lookups"""
return self.client.get_sample(sample_id)
@lru_cache(maxsize=50)
def get_pipeline_cached(self, pipeline_name):
"""Cache pipeline information"""
return self.client.get_pipeline(pipeline_name)
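Repeated lookups then hit the in-memory cache instead of the API:
cached = CachedFlowClient(client)
sample = cached.get_sample_cached(123)   # first call goes to the API
sample = cached.get_sample_cached(123)   # second call is served from the cache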
Troubleshooting
Debug Mode
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
client = flowbio.Client(debug=True)
# All API calls will be logged
client.get_samples()
Common Issues
Authentication Failures:
try:
client.login("username", "password")
except AuthenticationError as e:
if "rate limit" in str(e):
print("Too many login attempts. Wait 5 minutes.")
elif "invalid" in str(e):
print("Check username and password")
Upload Failures:
# Verify file before upload
import os
filepath = "/path/to/file.fastq.gz"
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
if os.path.getsize(filepath) > 50_000_000_000: # 50GB
print("Warning: Large file. Consider using larger chunks.")
# Upload with verification
data = client.upload_data(filepath, verify=True)
Network Issues:
# Configure for unreliable networks
client = flowbio.Client(
timeout=300, # 5 minute timeout
max_retries=10,
retry_delay=5.0,
retry_backoff=2.0 # Exponential backoff
)
Next Steps
- API Overview - Understanding Flow's API architecture
- Upload Guide - Detailed upload strategies
- Download Guide - Efficient data retrieval
- REST API Reference - Direct API access
- Sample Workflows - Common usage patterns