Flow Logo

API Guides

Downloading Data

Flow provides multiple methods for downloading data, from individual files to bulk exports. This guide covers all download scenarios including direct downloads, bulk operations, and programmatic access.


Download Methods

1. Direct Download

Download individual files with authentication:

# Download a data file
curl -H "Authorization: Bearer <token>" \
  https://api.flow.bio/downloads/<data_id>/<filename> \
  -o output.fastq.gz

# Download execution results
curl -H "Authorization: Bearer <token>" \
  https://api.flow.bio/executions/<execution_id>/multiqc_report.html \
  -o report.html

2. Python Client

The flowbio client provides convenient download methods:

import flowbio

client = flowbio.Client()
client.login("username", "password")

# Download data file
data = client.get_data(789)
client.download_file(data.download_url, "local_file.fastq.gz")

# Download with progress bar
client.download_file(
    data.download_url, 
    "local_file.fastq.gz",
    progress=True
)

3. Web Interface

Use the Flow web interface to browse and download files interactively.


Individual File Downloads

Download Data Files

# Get data information
data = client.get_data(789)
print(f"File: {data.filename}")
print(f"Size: {data.size:,} bytes")

# Download to local file
client.download_file(
    f"/downloads/{data.id}/{data.filename}",
    f"downloads/{data.filename}"
)

Download Execution Results

# Get execution details
execution = client.get_execution(456)

# List available files
for output in execution.outputs:
    print(f"- {output.filename} ({output.size:,} bytes)")

# Download specific output
report_url = f"/executions/{execution.id}/multiqc_report.html"
client.download_file(report_url, "results/multiqc_report.html")

Streaming Downloads

For large files, use streaming to avoid memory issues:

import requests

def download_streaming(url, output_path, token, chunk_size=8192):
    """Stream download large files"""
    headers = {'Authorization': f'Bearer {token}'}
    
    with requests.get(url, headers=headers, stream=True) as r:
        r.raise_for_status()
        
        # Get total size from headers
        total_size = int(r.headers.get('content-length', 0))
        
        with open(output_path, 'wb') as f:
            downloaded = 0
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                downloaded += len(chunk)
                
                # Progress callback
                if total_size > 0:
                    percent = (downloaded / total_size) * 100
                    print(f"\rProgress: {percent:.1f}%", end='')
        
        print("\nDownload complete!")

Bulk Downloads

For downloading multiple files, use Flow's bulk download system.

Create Bulk Download Job

# Request bulk download of multiple data files
job = client.create_bulk_download(
    data_ids=[789, 790, 791],
    name="my_dataset.zip"
)

print(f"Download job created: {job.id}")
print(f"Status: {job.status}")

Using REST API

# Create bulk download job
curl -X POST https://api.flow.bio/downloads \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "data_ids": [789, 790, 791],
    "name": "my_dataset.zip"
  }'

Monitor Download Status

# Check job status
status = client.get_download_status(job.id)

while status['status'] in ['pending', 'processing']:
    print(f"Status: {status['status']} ({status['progress']}%)")
    time.sleep(5)
    status = client.get_download_status(job.id)

if status['status'] == 'completed':
    print(f"Ready to download: {status['download_url']}")
    client.download_file(status['download_url'], 'dataset.zip')
else:
    print(f"Download failed: {status['error']}")

Download Job Lifecycle

┌─────────────┐
│   Request   │
│  Download   │
└──────┬──────┘


┌─────────────┐
│   Queued    │
│  (pending)  │
└──────┬──────┘


┌─────────────┐
│ Processing  │◄─── Creating ZIP archive
│             │
└──────┬──────┘


┌─────────────┐
│  Completed  │──── Ready for download
│             │
└─────────────┘


┌─────────────┐
│   Expired   │──── After 24 hours
└─────────────┘

Download Patterns

Download All Sample Data

def download_sample_data(client, sample_id, output_dir):
    """Download all data files for a sample"""
    
    # Get sample details
    sample = client.get_sample(sample_id)
    
    # Create output directory
    sample_dir = os.path.join(output_dir, sample.name)
    os.makedirs(sample_dir, exist_ok=True)
    
    # Download each data file
    for data in sample.data:
        output_path = os.path.join(sample_dir, data.filename)
        print(f"Downloading {data.filename}...")
        
        client.download_file(
            f"/downloads/{data.id}/{data.filename}",
            output_path,
            progress=True
        )
    
    print(f"✓ Downloaded {len(sample.data)} files for {sample.name}")

Download Execution Results

def download_execution_results(client, execution_id, output_dir):
    """Download all results from a pipeline execution"""
    
    execution = client.get_execution(execution_id)
    
    # Create directory structure
    exec_dir = os.path.join(output_dir, f"execution_{execution.id}")
    os.makedirs(exec_dir, exist_ok=True)
    
    # Common output files to download
    output_files = [
        "multiqc_report.html",
        "pipeline_report.html",
        "results/counts/all.gene_counts.tsv",
        "results/salmon/salmon.merged.gene_tpm.tsv"
    ]
    
    for filename in output_files:
        url = f"/executions/{execution.id}/{filename}"
        output_path = os.path.join(exec_dir, filename)
        
        # Create subdirectories if needed
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        try:
            client.download_file(url, output_path)
            print(f"✓ Downloaded {filename}")
        except Exception as e:
            print(f"✗ Failed to download {filename}: {e}")

Batch Download Projects

def download_project_data(client, project_id, output_dir):
    """Download all data from a project"""
    
    project = client.get_project(project_id)
    project_dir = os.path.join(output_dir, project.name)
    
    # Get all data IDs from project samples
    data_ids = []
    for sample in project.samples:
        data_ids.extend([d.id for d in sample.data])
    
    if len(data_ids) > 100:
        # Use bulk download for large projects
        print(f"Creating bulk download for {len(data_ids)} files...")
        
        job = client.create_bulk_download(
            data_ids=data_ids,
            name=f"{project.name}_data.zip"
        )
        
        # Wait for completion
        # ... (status monitoring code)
        
    else:
        # Download files individually
        for sample in project.samples:
            download_sample_data(client, sample.id, project_dir)

Performance Optimization

Parallel Downloads

import concurrent.futures
from functools import partial

def download_file_wrapper(client, file_info):
    """Wrapper for parallel download"""
    url, output_path = file_info
    client.download_file(url, output_path)
    return output_path

def parallel_download(client, file_list, max_workers=4):
    """Download multiple files in parallel"""
    
    download_func = partial(download_file_wrapper, client)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all downloads
        futures = [executor.submit(download_func, f) for f in file_list]
        
        # Process completed downloads
        for future in concurrent.futures.as_completed(futures):
            try:
                path = future.result()
                print(f"✓ Downloaded: {os.path.basename(path)}")
            except Exception as e:
                print(f"✗ Download failed: {e}")

Resume Interrupted Downloads

def download_with_resume(url, output_path, token, chunk_size=1024*1024):
    """Download with resume support"""
    
    headers = {'Authorization': f'Bearer {token}'}
    
    # Check if partial file exists
    if os.path.exists(output_path):
        resume_pos = os.path.getsize(output_path)
        headers['Range'] = f'bytes={resume_pos}-'
        mode = 'ab'
    else:
        resume_pos = 0
        mode = 'wb'
    
    response = requests.get(url, headers=headers, stream=True)
    
    # Check if server supports range requests
    if response.status_code == 206:  # Partial content
        print(f"Resuming download from byte {resume_pos}")
    elif response.status_code == 200:
        if resume_pos > 0:
            print("Server doesn't support resume, starting over")
            mode = 'wb'
    else:
        response.raise_for_status()
    
    # Download with progress
    total_size = int(response.headers.get('content-length', 0)) + resume_pos
    
    with open(output_path, mode) as f:
        downloaded = resume_pos
        
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                downloaded += len(chunk)
                
                # Show progress
                percent = (downloaded / total_size) * 100
                print(f"\rProgress: {percent:.1f}%", end='')
    
    print("\nDownload complete!")

Download Security

Secure Token Handling

import os
import keyring

class SecureDownloader:
    """Downloader with secure token storage"""
    
    def __init__(self, username):
        self.username = username
        self.token = None
    
    def get_token(self):
        """Get token from secure storage"""
        if not self.token:
            # Try keyring first
            self.token = keyring.get_password("flow-api", self.username)
            
            if not self.token:
                # Fall back to environment variable
                self.token = os.environ.get('FLOW_ACCESS_TOKEN')
        
        return self.token
    
    def download(self, url, output_path):
        """Download with secure authentication"""
        token = self.get_token()
        if not token:
            raise ValueError("No access token available")
        
        headers = {'Authorization': f'Bearer {token}'}
        
        response = requests.get(url, headers=headers, stream=True)
        response.raise_for_status()
        
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

Verify Downloads

import hashlib

def verify_download(filepath, expected_md5):
    """Verify downloaded file integrity"""
    
    md5_hash = hashlib.md5()
    
    with open(filepath, 'rb') as f:
        # Read in chunks to handle large files
        for chunk in iter(lambda: f.read(4096), b""):
            md5_hash.update(chunk)
    
    calculated_md5 = md5_hash.hexdigest()
    
    if calculated_md5 == expected_md5:
        print(f"✓ Verification passed: {filepath}")
        return True
    else:
        print(f"✗ Verification failed: {filepath}")
        print(f"  Expected: {expected_md5}")
        print(f"  Got:      {calculated_md5}")
        return False

# Download and verify
data = client.get_data(789)
client.download_file(data.download_url, "file.fastq.gz")

if verify_download("file.fastq.gz", data.md5):
    print("File is valid")
else:
    print("File is corrupted, re-downloading...")

Error Handling

Common Download Errors

Authentication Failed

{
  "error": "Invalid or expired token",
  "code": "UNAUTHENTICATED",
  "status": 401
}

Solution: Refresh your access token or re-login.

File Not Found

{
  "error": "Data file not found",
  "code": "NOT_FOUND",
  "status": 404
}

Solution: Verify the file ID and your access permissions.

Permission Denied

{
  "error": "You do not have permission to download this file",
  "code": "FORBIDDEN",
  "status": 403
}

Solution: Request access from the file owner or your group admin.

Download Expired

{
  "error": "Download link has expired",
  "code": "EXPIRED",
  "status": 410
}

Solution: Request a new download link or create a new bulk download job.

Robust Error Handling

def safe_download(client, url, output_path, max_retries=3):
    """Download with comprehensive error handling"""
    
    for attempt in range(max_retries):
        try:
            # Attempt download
            client.download_file(url, output_path, progress=True)
            
            # Verify file was created and has content
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"✓ Successfully downloaded: {output_path}")
                return True
            else:
                raise ValueError("Downloaded file is empty")
                
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                print("Authentication failed, refreshing token...")
                client.refresh_token()
            elif e.response.status_code == 404:
                print(f"File not found: {url}")
                return False
            else:
                print(f"HTTP error {e.response.status_code}: {e}")
                
        except requests.exceptions.ConnectionError:
            print(f"Connection error (attempt {attempt + 1}/{max_retries})")
            time.sleep(2 ** attempt)  # Exponential backoff
            
        except Exception as e:
            print(f"Download error: {e}")
            
        # Clean up partial file
        if os.path.exists(output_path) and os.path.getsize(output_path) == 0:
            os.remove(output_path)
    
    print(f"✗ Failed to download after {max_retries} attempts")
    return False

Best Practices

1. Check Available Space

import shutil

def check_disk_space(path, required_bytes):
    """Ensure sufficient disk space before download"""
    
    stat = shutil.disk_usage(path)
    available = stat.free
    
    if available < required_bytes * 1.1:  # 10% buffer
        raise IOError(
            f"Insufficient disk space. "
            f"Required: {required_bytes:,} bytes, "
            f"Available: {available:,} bytes"
        )

2. Use Progress Indicators

from tqdm import tqdm

def download_with_progress(url, output_path, token):
    """Download with progress bar"""
    
    response = requests.get(
        url,
        headers={'Authorization': f'Bearer {token}'},
        stream=True
    )
    response.raise_for_status()
    
    total_size = int(response.headers.get('content-length', 0))
    
    with open(output_path, 'wb') as f:
        with tqdm(total=total_size, unit='B', unit_scale=True) as pbar:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
                pbar.update(len(chunk))

3. Organize Downloads

def organize_downloads(client, download_list, base_dir="downloads"):
    """Organize downloads into logical directory structure"""
    
    for item in download_list:
        # Create directory structure
        if item['type'] == 'sample':
            dir_path = os.path.join(base_dir, 'samples', item['name'])
        elif item['type'] == 'execution':
            dir_path = os.path.join(base_dir, 'executions', str(item['id']))
        else:
            dir_path = os.path.join(base_dir, 'data')
        
        os.makedirs(dir_path, exist_ok=True)
        
        # Download to organized location
        output_path = os.path.join(dir_path, item['filename'])
        client.download_file(item['url'], output_path)

4. Log Downloads

import logging
import json
from datetime import datetime

def setup_download_logging():
    """Setup logging for download tracking"""
    
    logging.basicConfig(
        filename='downloads.log',
        level=logging.INFO,
        format='%(asctime)s - %(message)s'
    )
    
    return logging.getLogger('downloader')

def log_download(logger, file_info):
    """Log download details"""
    
    logger.info(json.dumps({
        'action': 'download',
        'file_id': file_info['id'],
        'filename': file_info['filename'],
        'size': file_info['size'],
        'timestamp': datetime.utcnow().isoformat()
    }))

Next Steps

Previous
Uploading Data