API Guides
Downloading Data
Flow provides multiple methods for downloading data, from individual files to bulk exports. This guide covers all download scenarios, including direct downloads, bulk operations, and programmatic access.
Download Methods
1. Direct Download
Download individual files with authentication:
# Download a data file
curl -H "Authorization: Bearer <token>" \
https://api.flow.bio/downloads/<data_id>/<filename> \
-o output.fastq.gz
# Download execution results
curl -H "Authorization: Bearer <token>" \
https://api.flow.bio/executions/<execution_id>/multiqc_report.html \
-o report.html
2. Python Client
The flowbio client provides convenient download methods:
import flowbio
client = flowbio.Client()
client.login("username", "password")
# Download data file
data = client.get_data(789)
client.download_file(data.download_url, "local_file.fastq.gz")
# Download with progress bar
client.download_file(
data.download_url,
"local_file.fastq.gz",
progress=True
)
3. Web Interface
Use the Flow web interface to browse and download files interactively.
Individual File Downloads
Download Data Files
# Get data information
data = client.get_data(789)
print(f"File: {data.filename}")
print(f"Size: {data.size:,} bytes")
# Download to local file
client.download_file(
f"/downloads/{data.id}/{data.filename}",
f"downloads/{data.filename}"
)
Download Execution Results
# Get execution details
execution = client.get_execution(456)
# List available files
for output in execution.outputs:
    print(f"- {output.filename} ({output.size:,} bytes)")
# Download specific output
report_url = f"/executions/{execution.id}/multiqc_report.html"
client.download_file(report_url, "results/multiqc_report.html")
Streaming Downloads
For large files, use streaming to avoid memory issues:
import requests
def download_streaming(url, output_path, token, chunk_size=8192):
    """Stream download large files"""
    headers = {'Authorization': f'Bearer {token}'}

    with requests.get(url, headers=headers, stream=True) as r:
        r.raise_for_status()

        # Get total size from headers
        total_size = int(r.headers.get('content-length', 0))

        with open(output_path, 'wb') as f:
            downloaded = 0
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                downloaded += len(chunk)

                # Progress callback
                if total_size > 0:
                    percent = (downloaded / total_size) * 100
                    print(f"\rProgress: {percent:.1f}%", end='')

    print("\nDownload complete!")
Bulk Downloads
For downloading multiple files, use Flow's bulk download system.
Create Bulk Download Job
# Request bulk download of multiple data files
job = client.create_bulk_download(
data_ids=[789, 790, 791],
name="my_dataset.zip"
)
print(f"Download job created: {job.id}")
print(f"Status: {job.status}")
Using REST API
# Create bulk download job
curl -X POST https://api.flow.bio/downloads \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"data_ids": [789, 790, 791],
"name": "my_dataset.zip"
}'
Monitor Download Status
import time

# Check job status
status = client.get_download_status(job.id)

while status['status'] in ['pending', 'processing']:
    print(f"Status: {status['status']} ({status['progress']}%)")
    time.sleep(5)
    status = client.get_download_status(job.id)

if status['status'] == 'completed':
    print(f"Ready to download: {status['download_url']}")
    client.download_file(status['download_url'], 'dataset.zip')
else:
    print(f"Download failed: {status['error']}")
Download Job Lifecycle
┌─────────────┐
│ Request │
│ Download │
└──────┬──────┘
│
▼
┌─────────────┐
│ Queued │
│ (pending) │
└──────┬──────┘
│
▼
┌─────────────┐
│ Processing │◄─── Creating ZIP archive
│ │
└──────┬──────┘
│
▼
┌─────────────┐
│ Completed │──── Ready for download
│ │
└─────────────┘
│
▼
┌─────────────┐
│ Expired │──── After 24 hours
└─────────────┘
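One way to drive this lifecycle from code is a small polling helper around get_download_status. This is a minimal sketch based on the states shown above; the timeout handling is an addition, not part of the documented API:
import time

def wait_for_bulk_download(client, job_id, poll_interval=5, timeout=3600):
    """Poll a bulk download job until it leaves the pending/processing states."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = client.get_download_status(job_id)
        if status['status'] not in ('pending', 'processing'):
            return status  # 'completed', 'failed', or 'expired'
        time.sleep(poll_interval)
    raise TimeoutError(f"Bulk download job {job_id} did not finish within {timeout}s")

# Usage
status = wait_for_bulk_download(client, job.id)
if status['status'] == 'completed':
    client.download_file(status['download_url'], 'dataset.zip')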
Download Patterns
Download All Sample Data
import os

def download_sample_data(client, sample_id, output_dir):
    """Download all data files for a sample"""
    # Get sample details
    sample = client.get_sample(sample_id)

    # Create output directory
    sample_dir = os.path.join(output_dir, sample.name)
    os.makedirs(sample_dir, exist_ok=True)

    # Download each data file
    for data in sample.data:
        output_path = os.path.join(sample_dir, data.filename)
        print(f"Downloading {data.filename}...")
        client.download_file(
            f"/downloads/{data.id}/{data.filename}",
            output_path,
            progress=True
        )

    print(f"✓ Downloaded {len(sample.data)} files for {sample.name}")
Download Execution Results
def download_execution_results(client, execution_id, output_dir):
    """Download all results from a pipeline execution"""
    execution = client.get_execution(execution_id)

    # Create directory structure
    exec_dir = os.path.join(output_dir, f"execution_{execution.id}")
    os.makedirs(exec_dir, exist_ok=True)

    # Common output files to download
    output_files = [
        "multiqc_report.html",
        "pipeline_report.html",
        "results/counts/all.gene_counts.tsv",
        "results/salmon/salmon.merged.gene_tpm.tsv"
    ]

    for filename in output_files:
        url = f"/executions/{execution.id}/{filename}"
        output_path = os.path.join(exec_dir, filename)

        # Create subdirectories if needed
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        try:
            client.download_file(url, output_path)
            print(f"✓ Downloaded {filename}")
        except Exception as e:
            print(f"✗ Failed to download {filename}: {e}")
Batch Download Projects
def download_project_data(client, project_id, output_dir):
    """Download all data from a project"""
    project = client.get_project(project_id)
    project_dir = os.path.join(output_dir, project.name)

    # Get all data IDs from project samples
    data_ids = []
    for sample in project.samples:
        data_ids.extend([d.id for d in sample.data])

    if len(data_ids) > 100:
        # Use bulk download for large projects
        print(f"Creating bulk download for {len(data_ids)} files...")
        job = client.create_bulk_download(
            data_ids=data_ids,
            name=f"{project.name}_data.zip"
        )
        # Wait for completion
        # ... (status monitoring code)
    else:
        # Download files individually
        for sample in project.samples:
            download_sample_data(client, sample.id, project_dir)
Performance Optimization
Parallel Downloads
import concurrent.futures
from functools import partial
def download_file_wrapper(client, file_info):
    """Wrapper for parallel download"""
    url, output_path = file_info
    client.download_file(url, output_path)
    return output_path

def parallel_download(client, file_list, max_workers=4):
    """Download multiple files in parallel"""
    download_func = partial(download_file_wrapper, client)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all downloads
        futures = [executor.submit(download_func, f) for f in file_list]

        # Process completed downloads
        for future in concurrent.futures.as_completed(futures):
            try:
                path = future.result()
                print(f"✓ Downloaded: {os.path.basename(path)}")
            except Exception as e:
                print(f"✗ Download failed: {e}")
Resume Interrupted Downloads
def download_with_resume(url, output_path, token, chunk_size=1024*1024):
    """Download with resume support"""
    headers = {'Authorization': f'Bearer {token}'}

    # Check if partial file exists
    if os.path.exists(output_path):
        resume_pos = os.path.getsize(output_path)
        headers['Range'] = f'bytes={resume_pos}-'
        mode = 'ab'
    else:
        resume_pos = 0
        mode = 'wb'

    response = requests.get(url, headers=headers, stream=True)

    # Check if server supports range requests
    if response.status_code == 206:  # Partial content
        print(f"Resuming download from byte {resume_pos}")
    elif response.status_code == 200:
        if resume_pos > 0:
            print("Server doesn't support resume, starting over")
        # Full download: discard any partial file and start from byte 0
        resume_pos = 0
        mode = 'wb'
    else:
        response.raise_for_status()

    # Download with progress
    total_size = int(response.headers.get('content-length', 0)) + resume_pos

    with open(output_path, mode) as f:
        downloaded = resume_pos
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                downloaded += len(chunk)

                # Show progress
                if total_size > 0:
                    percent = (downloaded / total_size) * 100
                    print(f"\rProgress: {percent:.1f}%", end='')

    print("\nDownload complete!")
Download Security
Secure Token Handling
import os
import keyring
import requests

class SecureDownloader:
    """Downloader with secure token storage"""

    def __init__(self, username):
        self.username = username
        self.token = None

    def get_token(self):
        """Get token from secure storage"""
        if not self.token:
            # Try keyring first
            self.token = keyring.get_password("flow-api", self.username)
            if not self.token:
                # Fall back to environment variable
                self.token = os.environ.get('FLOW_ACCESS_TOKEN')
        return self.token

    def download(self, url, output_path):
        """Download with secure authentication"""
        token = self.get_token()
        if not token:
            raise ValueError("No access token available")

        headers = {'Authorization': f'Bearer {token}'}
        response = requests.get(url, headers=headers, stream=True)
        response.raise_for_status()

        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
Verify Downloads
import hashlib
def verify_download(filepath, expected_md5):
    """Verify downloaded file integrity"""
    md5_hash = hashlib.md5()

    with open(filepath, 'rb') as f:
        # Read in chunks to handle large files
        for chunk in iter(lambda: f.read(4096), b""):
            md5_hash.update(chunk)

    calculated_md5 = md5_hash.hexdigest()

    if calculated_md5 == expected_md5:
        print(f"✓ Verification passed: {filepath}")
        return True
    else:
        print(f"✗ Verification failed: {filepath}")
        print(f"  Expected: {expected_md5}")
        print(f"  Got: {calculated_md5}")
        return False

# Download and verify
data = client.get_data(789)
client.download_file(data.download_url, "file.fastq.gz")

if verify_download("file.fastq.gz", data.md5):
    print("File is valid")
else:
    print("File is corrupted, re-downloading...")
Error Handling
Common Download Errors
Authentication Failed
{
"error": "Invalid or expired token",
"code": "UNAUTHENTICATED",
"status": 401
}
Solution: Refresh your access token or re-login.
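A minimal recovery sketch, assuming the client surfaces requests HTTP errors and exposes the refresh_token() method used in the robust handler below (url and output_path are placeholders):
import requests

try:
    client.download_file(url, output_path)
except requests.exceptions.HTTPError as e:
    if e.response.status_code == 401:
        # Token expired: refresh (or re-login) and retry once
        client.refresh_token()
        client.download_file(url, output_path)
    else:
        raise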
File Not Found
{
"error": "Data file not found",
"code": "NOT_FOUND",
"status": 404
}
Solution: Verify the file ID and your access permissions.
Permission Denied
{
"error": "You do not have permission to download this file",
"code": "FORBIDDEN",
"status": 403
}
Solution: Request access from the file owner or your group admin.
Download Expired
{
"error": "Download link has expired",
"code": "EXPIRED",
"status": 410
}
Solution: Request a new download link or create a new bulk download job.
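For bulk downloads, one way to recover is to create a fresh job for the same data IDs, reusing the calls from the bulk download section above. A sketch using the example IDs and archive name from earlier:
# The previous job's link returned 410; request a fresh archive for the same files
new_job = client.create_bulk_download(
    data_ids=[789, 790, 791],
    name="my_dataset.zip"
)

status = client.get_download_status(new_job.id)
while status['status'] in ('pending', 'processing'):
    time.sleep(5)
    status = client.get_download_status(new_job.id)

if status['status'] == 'completed':
    client.download_file(status['download_url'], 'my_dataset.zip')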
Robust Error Handling
def safe_download(client, url, output_path, max_retries=3):
    """Download with comprehensive error handling"""
    for attempt in range(max_retries):
        try:
            # Attempt download
            client.download_file(url, output_path, progress=True)

            # Verify file was created and has content
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"✓ Successfully downloaded: {output_path}")
                return True
            else:
                raise ValueError("Downloaded file is empty")

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                print("Authentication failed, refreshing token...")
                client.refresh_token()
            elif e.response.status_code == 404:
                print(f"File not found: {url}")
                return False
            else:
                print(f"HTTP error {e.response.status_code}: {e}")

        except requests.exceptions.ConnectionError:
            print(f"Connection error (attempt {attempt + 1}/{max_retries})")
            time.sleep(2 ** attempt)  # Exponential backoff

        except Exception as e:
            print(f"Download error: {e}")

        # Clean up partial file
        if os.path.exists(output_path) and os.path.getsize(output_path) == 0:
            os.remove(output_path)

    print(f"✗ Failed to download after {max_retries} attempts")
    return False
Best Practices
1. Check Available Space
import shutil
def check_disk_space(path, required_bytes):
    """Ensure sufficient disk space before download"""
    stat = shutil.disk_usage(path)
    available = stat.free

    if available < required_bytes * 1.1:  # 10% buffer
        raise IOError(
            f"Insufficient disk space. "
            f"Required: {required_bytes:,} bytes, "
            f"Available: {available:,} bytes"
        )
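A possible way to combine this check with the size reported by the API before downloading, using the size, download_url, and filename fields shown earlier in this guide:
data = client.get_data(789)
check_disk_space(".", data.size)
client.download_file(data.download_url, data.filename)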
2. Use Progress Indicators
from tqdm import tqdm
def download_with_progress(url, output_path, token):
    """Download with progress bar"""
    response = requests.get(
        url,
        headers={'Authorization': f'Bearer {token}'},
        stream=True
    )
    response.raise_for_status()

    total_size = int(response.headers.get('content-length', 0))

    with open(output_path, 'wb') as f:
        with tqdm(total=total_size, unit='B', unit_scale=True) as pbar:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
                pbar.update(len(chunk))
3. Organize Downloads
def organize_downloads(client, download_list, base_dir="downloads"):
    """Organize downloads into logical directory structure"""
    for item in download_list:
        # Create directory structure
        if item['type'] == 'sample':
            dir_path = os.path.join(base_dir, 'samples', item['name'])
        elif item['type'] == 'execution':
            dir_path = os.path.join(base_dir, 'executions', str(item['id']))
        else:
            dir_path = os.path.join(base_dir, 'data')

        os.makedirs(dir_path, exist_ok=True)

        # Download to organized location
        output_path = os.path.join(dir_path, item['filename'])
        client.download_file(item['url'], output_path)
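The download_list entries are assumed to be plain dictionaries with the keys read above; an illustrative (hypothetical) example:
download_list = [
    {
        'type': 'sample',
        'name': 'liver_rep1',                              # hypothetical sample name
        'filename': 'liver_rep1_R1.fastq.gz',
        'url': '/downloads/789/liver_rep1_R1.fastq.gz'
    },
    {
        'type': 'execution',
        'id': 456,
        'filename': 'multiqc_report.html',
        'url': '/executions/456/multiqc_report.html'
    }
]

organize_downloads(client, download_list)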
4. Log Downloads
import logging
import json
from datetime import datetime
def setup_download_logging():
    """Setup logging for download tracking"""
    logging.basicConfig(
        filename='downloads.log',
        level=logging.INFO,
        format='%(asctime)s - %(message)s'
    )
    return logging.getLogger('downloader')

def log_download(logger, file_info):
    """Log download details"""
    logger.info(json.dumps({
        'action': 'download',
        'file_id': file_info['id'],
        'filename': file_info['filename'],
        'size': file_info['size'],
        'timestamp': datetime.utcnow().isoformat()
    }))
Next Steps
- Uploading Data - Upload files to Flow
- Python Client Guide - Advanced client features
- Search API - Find data to download
- Permissions Guide - Understanding access control