API Guides
Uploading Data
Flow provides multiple methods for uploading data, from simple single-file uploads to complex multiplexed sequencing data. This guide covers all upload scenarios with examples and best practices.
Upload Methods
1. Python Client (Recommended)
The flowbio Python client provides the easiest way to upload data with automatic chunking, retry logic, and progress tracking.
import flowbio
client = flowbio.Client()
client.login("username", "password")
# Upload any data file
data = client.upload_data(
"/path/to/file.fastq.gz",
progress=True,
retries=5
)
print(f"Uploaded: {data.id} - {data.filename}")
2. REST API
Direct REST API uploads for custom integrations:
curl -X POST https://api.flow.bio/upload \
-H "Authorization: Bearer <token>" \
-F "file=@data.fastq.gz" \
-F "filename=data.fastq.gz" \
-F "chunk=0" \
-F "total_chunks=1" \
-F "file_id=unique-id-123"
3. Web Interface
For interactive uploads, use the Flow web interface with drag-and-drop support.
Chunked Uploads
Large files should be uploaded in chunks for reliability and resumability.
Chunk Size Guidelines
- Default: 1MB chunks (good for most cases)
- Fast networks: 5-10MB chunks
- Slow/unstable networks: 500KB chunks
- Maximum chunk size: 50MB
Python Client Chunking
# Custom chunk size (5MB)
data = client.upload_data(
"/path/to/large_file.bam",
chunk_size=5_000_000,
progress=True
)
# Monitor progress
def progress_callback(bytes_uploaded, total_bytes):
percent = (bytes_uploaded / total_bytes) * 100
print(f"Progress: {percent:.1f}%")
data = client.upload_data(
"/path/to/file.fastq.gz",
progress_callback=progress_callback
)
REST API Chunking
import os
import requests
import hashlib
def upload_file_chunked(filepath, token, chunk_size=1_000_000):
    """Upload a file in fixed-size chunks via the REST endpoint."""
    file_size = os.path.getsize(filepath)
    total_chunks = (file_size + chunk_size - 1) // chunk_size
    # Stable identifier so the server can associate all chunks with the same upload
    file_id = hashlib.md5(filepath.encode()).hexdigest()
    with open(filepath, 'rb') as f:
        for chunk_num in range(total_chunks):
            chunk_data = f.read(chunk_size)
            response = requests.post(
                'https://api.flow.bio/upload',
                headers={'Authorization': f'Bearer {token}'},
                files={'file': (os.path.basename(filepath), chunk_data)},
                data={
                    'filename': os.path.basename(filepath),
                    'chunk': chunk_num,
                    'total_chunks': total_chunks,
                    'file_id': file_id
                }
            )
            if response.status_code != 200:
                raise Exception(f"Upload failed: {response.text}")
            print(f"Uploaded chunk {chunk_num + 1}/{total_chunks}")
    # The final response describes the fully assembled file
    return response.json()
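A minimal usage sketch of the function above; the token value and file path here are placeholders:
token = "<access-token>"
result = upload_file_chunked("/data/reads_R1.fastq.gz", token, chunk_size=5_000_000)
print(result)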
Sample Uploads
Upload sequencing data with sample metadata.
Single Sample
# Upload paired-end reads
sample = client.upload_sample(
name="Patient_001_RNA",
read1="/path/to/reads_R1.fastq.gz",
read2="/path/to/reads_R2.fastq.gz", # Optional for single-end
metadata={
"organism": "Homo sapiens",
"tissue": "liver",
"condition": "tumor",
"treatment": "untreated",
"sequencer": "Illumina NovaSeq",
"read_length": 150,
"strandedness": "reverse"
},
progress=True
)
print(f"Sample created: {sample.id}")
print(f"Files: {len(sample.data)} uploaded")
Batch Sample Upload
# Upload multiple samples efficiently
samples_to_upload = [
{
"name": "Sample_001",
"r1": "/data/Sample_001_R1.fastq.gz",
"r2": "/data/Sample_001_R2.fastq.gz",
"metadata": {"condition": "control"}
},
{
"name": "Sample_002",
"r1": "/data/Sample_002_R1.fastq.gz",
"r2": "/data/Sample_002_R2.fastq.gz",
"metadata": {"condition": "treatment"}
}
]
uploaded_samples = []
for sample_info in samples_to_upload:
sample = client.upload_sample(
name=sample_info["name"],
read1=sample_info["r1"],
read2=sample_info["r2"],
metadata=sample_info["metadata"],
progress=True
)
uploaded_samples.append(sample)
print(f"Uploaded {sample.name}")
# Create project with samples
project = client.create_project(
name="RNA-seq Experiment",
sample_ids=[s.id for s in uploaded_samples]
)
Multiplexed Data
Upload multiplexed sequencing runs with demultiplexing information.
Excel Annotation Format
Create an Excel file with sample information:
| Sample Name | Barcode | Organism | Category | Other Metadata |
| --- | --- | --- | --- | --- |
| Control_1 | ATCGAT | Human | RNA-seq | ... |
| Control_2 | GCATGC | Human | RNA-seq | ... |
| Treatment_1 | TGACTA | Human | RNA-seq | ... |
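If you prefer to build the sheet programmatically, here is a minimal sketch using pandas (pandas and openpyxl are assumed to be installed; the exact columns your template requires may differ from those shown above):
# Sketch: build a sample sheet matching the columns in the table above
import pandas as pd

samples = pd.DataFrame([
    {"Sample Name": "Control_1",   "Barcode": "ATCGAT", "Organism": "Human", "Category": "RNA-seq"},
    {"Sample Name": "Control_2",   "Barcode": "GCATGC", "Organism": "Human", "Category": "RNA-seq"},
    {"Sample Name": "Treatment_1", "Barcode": "TGACTA", "Organism": "Human", "Category": "RNA-seq"},
])
samples.to_excel("sample_sheet.xlsx", index=False)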
Upload Multiplexed Run
# Method 1: Using Python client
lane = client.upload_lane(
name="NovaSeq_Run_001",
annotation="/path/to/sample_sheet.xlsx",
multiplexed_file="/path/to/multiplexed.fastq.gz",
ignore_warnings=True, # Skip barcode validation warnings
progress=True
)
print(f"Lane created: {lane.id}")
print(f"Samples to be created: {len(lane.samples)}")
# Method 2: Using REST API
curl -X POST https://api.flow.bio/upload/multiplexed \
-H "Authorization: Bearer <token>" \
-F "name=NovaSeq_Run_001" \
-F "annotation=@sample_sheet.xlsx" \
-F "file=@multiplexed.fastq.gz"
Annotation Templates
Download templates for bulk sample annotation.
Get Template
# Download annotation template
template = client.get_annotation_template("sample")
with open("sample_template.xlsx", "wb") as f:
f.write(template)
# Or via REST API
curl -H "Authorization: Bearer <token>" \
https://api.flow.bio/annotation/sample \
-o sample_template.xlsx
Template Structure
The template includes:
- Required fields (Sample Name, Organism, Category)
- Optional metadata fields
- Validation rules
- Example data
- Field descriptions
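To inspect or pre-fill the downloaded template programmatically, a minimal sketch with pandas (assumes pandas and openpyxl are installed; the sheet layout is whatever the template actually contains):
import pandas as pd

template_df = pd.read_excel("sample_template.xlsx")
print(template_df.columns.tolist())  # field names (required and optional)
print(template_df.head())            # example rows shipped with the template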
Validation and Errors
Pre-upload Validation
# Validate before uploading
import gzip
import os

def validate_fastq(filepath):
    """Basic FASTQ validation: check that the first record starts with an @ header."""
    with gzip.open(filepath, 'rt') as f:
        for i, line in enumerate(f):
            if i >= 4:  # Only inspect the first record
                break
            if i == 0 and not line.startswith('@'):
                raise ValueError("Invalid FASTQ: missing @ header")
    return True

# Validate the file exists and is readable
filepath = "/path/to/file.fastq.gz"  # placeholder path; replace with your file
if not os.path.exists(filepath):
    raise FileNotFoundError(f"File not found: {filepath}")
if not os.access(filepath, os.R_OK):
    raise PermissionError(f"Cannot read file: {filepath}")

# Check the file size against the 50GB upload limit
file_size = os.path.getsize(filepath)
if file_size > 50_000_000_000:  # 50GB
    raise ValueError(f"File too large: {file_size} bytes")
Common Upload Errors
Authentication Errors
{
"error": "Authentication required",
"code": "UNAUTHENTICATED",
"status": 401
}
Solution: Refresh your access token or re-login.
File Too Large
{
"error": "File exceeds maximum size of 50GB",
"code": "PAYLOAD_TOO_LARGE",
"status": 413
}
Solution: Contact support for large file uploads or split the file.
Invalid File Format
{
"error": "Invalid file format. Expected: fastq, fastq.gz",
"code": "INVALID_FORMAT",
"status": 400
}
Solution: Ensure file extension and content match expected format.
Duplicate File
{
"error": "File already exists with same checksum",
"code": "DUPLICATE_FILE",
"data_id": 12345,
"status": 409
}
Solution: Use the existing file ID or force re-upload with different metadata.
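When calling the REST endpoint directly, these payloads can be mapped onto actions in client code. A sketch, assuming the response bodies follow the shapes shown above:
import requests

def handle_upload_response(response):
    """Map the error payloads shown above onto actions (illustrative sketch)."""
    if response.status_code == 200:
        return response.json()
    error = response.json()
    code = error.get("code")
    if code == "UNAUTHENTICATED":
        raise PermissionError("Token expired or missing; refresh the token and retry")
    if code == "DUPLICATE_FILE":
        # A file with the same checksum already exists; reuse its ID
        return {"id": error.get("data_id"), "duplicate": True}
    if code == "PAYLOAD_TOO_LARGE":
        raise ValueError("File exceeds the 50GB limit; split it or contact support")
    raise RuntimeError(f"Upload failed ({response.status_code}): {error.get('error')}")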
Resume Failed Uploads
The Python client automatically resumes failed uploads:
# Automatic retry with exponential backoff
data = client.upload_data(
"/path/to/file.fastq.gz",
retries=5, # Retry up to 5 times
retry_delay=1.0, # Initial delay in seconds
progress=True
)
# Manual resume from specific chunk
def resume_upload(filepath, file_id, start_chunk=0):
# Implementation continues from start_chunk
pass
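One way to flesh out the manual-resume stub above, reusing the chunked REST upload shown earlier. The added token parameter and the assumption that the server accepts re-sent chunks for the same file_id are not confirmed by this guide, so verify against the API reference:
import os
import requests

def resume_upload(filepath, file_id, token, start_chunk=0, chunk_size=1_000_000):
    """Re-send chunks from start_chunk onward (sketch; assumes start_chunk < total_chunks)."""
    file_size = os.path.getsize(filepath)
    total_chunks = (file_size + chunk_size - 1) // chunk_size
    with open(filepath, 'rb') as f:
        f.seek(start_chunk * chunk_size)  # skip the chunks that already succeeded
        for chunk_num in range(start_chunk, total_chunks):
            chunk_data = f.read(chunk_size)
            response = requests.post(
                'https://api.flow.bio/upload',
                headers={'Authorization': f'Bearer {token}'},
                files={'file': (os.path.basename(filepath), chunk_data)},
                data={
                    'filename': os.path.basename(filepath),
                    'chunk': chunk_num,
                    'total_chunks': total_chunks,
                    'file_id': file_id
                }
            )
            response.raise_for_status()
    return response.json()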
Performance Optimization
Parallel Uploads
import concurrent.futures
from pathlib import Path
def upload_file(client, filepath):
return client.upload_data(filepath, progress=False)
# Upload multiple files in parallel
files = list(Path("/data").glob("*.fastq.gz"))
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(upload_file, client, f) for f in files]
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Uploaded: {result.filename}")
Network Optimization
# Use compression for text files
import gzip
import shutil

def compress_and_upload(filepath):
    if not filepath.endswith('.gz'):
        compressed = filepath + '.gz'
        with open(filepath, 'rb') as f_in:
            with gzip.open(compressed, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)  # stream copy without loading the whole file
        filepath = compressed
    return client.upload_data(filepath)
Upload Lifecycle
┌─────────────┐
│ Initiate │
│ Upload │
└──────┬──────┘
│
▼
┌─────────────┐
│ Validate │──── Invalid ───► Error Response
│ File Type │
└──────┬──────┘
│ Valid
▼
┌─────────────┐
│ Upload │
│ Chunks │◄─── Retry on Failure
└──────┬──────┘
│
▼
┌─────────────┐
│ Verify │
│ Checksum │
└──────┬──────┘
│
▼
┌─────────────┐
│ Create │
│ Data Record │
└──────┬──────┘
│
▼
┌─────────────┐
│ Success │
│ Response │
└─────────────┘
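The checksum step compares the assembled file against what the client sent. A client-side sketch for computing an MD5 digest to use in that comparison (whether the server reports MD5 or a different digest is an assumption here; check the API reference):
import hashlib

def file_md5(filepath, block_size=8_388_608):
    """Compute an MD5 by streaming 8MB blocks, so large files never sit fully in memory."""
    digest = hashlib.md5()
    with open(filepath, 'rb') as f:
        for block in iter(lambda: f.read(block_size), b''):
            digest.update(block)
    return digest.hexdigest()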
Best Practices
1. Use Appropriate Chunk Sizes
def get_optimal_chunk_size(file_size, network_speed_mbps=100):
"""Calculate optimal chunk size based on file size and network speed"""
if file_size < 100_000_000: # < 100MB
return 1_000_000 # 1MB chunks
elif file_size < 1_000_000_000: # < 1GB
return 5_000_000 # 5MB chunks
else:
# Larger chunks for big files on fast networks
if network_speed_mbps > 1000:
return 50_000_000 # 50MB chunks
else:
return 10_000_000 # 10MB chunks
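For example, pairing the helper with an upload call (the path is a placeholder):
import os

filepath = "/path/to/large_file.bam"
data = client.upload_data(
    filepath,
    chunk_size=get_optimal_chunk_size(os.path.getsize(filepath)),
    progress=True
)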
2. Add Metadata During Upload
# Include all relevant metadata during upload
sample = client.upload_sample(
name="Sample_001",
read1=r1_path,
read2=r2_path,
metadata={
# Biological metadata
"organism": "Homo sapiens",
"tissue": "liver",
"cell_type": "hepatocyte",
"disease_state": "healthy",
# Technical metadata
"sequencing_platform": "Illumina NovaSeq 6000",
"library_prep": "TruSeq Stranded mRNA",
"read_length": 150,
"sequencing_depth": "30M reads",
# Experimental metadata
"experiment_date": "2024-01-15",
"technician": "J. Smith",
"batch": "Batch_2024_001"
}
)
3. Validate Before Upload
import os

def validate_upload(filepath, expected_format="fastq.gz"):
    """Comprehensive upload validation"""
# Check file exists
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
# Check file extension
if not filepath.endswith(expected_format):
raise ValueError(f"Expected {expected_format} file")
# Check file not empty
if os.path.getsize(filepath) == 0:
raise ValueError("File is empty")
# Check file readable
if not os.access(filepath, os.R_OK):
raise PermissionError("Cannot read file")
# Validate content (first few lines)
if expected_format == "fastq.gz":
import gzip
with gzip.open(filepath, 'rt') as f:
first_line = f.readline()
if not first_line.startswith('@'):
raise ValueError("Invalid FASTQ format")
return True
4. Handle Errors Gracefully
import time

def upload_with_retry(client, filepath, max_retries=3):
    """Upload with comprehensive error handling"""
for attempt in range(max_retries):
try:
# Validate before upload
validate_upload(filepath)
# Attempt upload
result = client.upload_data(
filepath,
progress=True,
retries=2 # Client-level retries
)
print(f"✓ Successfully uploaded: {result.filename}")
return result
except FileNotFoundError as e:
print(f"✗ File not found: {e}")
raise # Don't retry
except PermissionError as e:
print(f"✗ Permission denied: {e}")
raise # Don't retry
except Exception as e:
print(f"✗ Upload failed (attempt {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f" Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
raise
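Typical usage, with the path as a placeholder:
data = upload_with_retry(client, "/path/to/file.fastq.gz", max_retries=3)
print(f"Data ID: {data.id}")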
Next Steps
- Downloading Data - Retrieve uploaded files
- Python Client Guide - Advanced client usage
- API Reference - Complete endpoint documentation
- Sample Management - Working with samples