Resource APIs
Sample Management API
Flow's sample management system provides comprehensive tools for organizing, tracking, and analyzing biological samples. This guide covers all aspects of working with samples through the API.
Understanding Samples
Samples in Flow represent biological materials with associated metadata and data files.
Sample Structure
# A typical sample contains:
sample = {
"id": 123,
"name": "Patient_001_Tumor",
"organism": "Homo sapiens",
"sample_type": "RNA",
"source": "Tissue",
"purification_target": "Total RNA",
"condition": "Tumor",
"replicate_group": "Patient_001",
"project": {"id": 10, "name": "Cancer Study"},
"metadata": {
"age": 45,
"gender": "F",
"tissue": "lung",
"stage": "IIIA",
"treatment": "pre-treatment"
},
"data": [
{"id": 789, "filename": "reads_R1.fastq.gz"},
{"id": 790, "filename": "reads_R2.fastq.gz"}
],
"filesets": [
{"id": 45, "name": "Paired-end reads"}
]
}
Creating Samples
Basic Sample Creation
# Python client
sample = client.create_sample(
name="Sample_001",
organism="human", # or organism ID
sample_type="RNA" # or sample type ID
)
print(f"Created sample: {sample.id}")
REST API Sample Creation
curl -X POST https://api.flow.bio/samples/create \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"name": "Sample_001",
"organism": "human",
"sample_type": "RNA"
}'
GraphQL Sample Creation
mutation CreateSample($input: CreateSampleInput!) {
createSample(input: $input) {
id
name
organism {
id
name
}
sampleType {
id
name
}
created
}
}
# Variables
{
"input": {
"name": "Sample_001",
"organism": "9606",
"sampleType": "1",
"project": "10",
"metadata": {
"tissue": "liver",
"age": 45
}
}
}
Create with Full Metadata
# Comprehensive sample creation
sample = client.create_sample(
name="Patient_001_Baseline",
organism="human",
sample_type="RNA",
source="Blood",
purification_target="Total RNA",
condition="Baseline",
replicate_group="Patient_001",
project=project_id,
metadata={
# Clinical metadata
"patient_id": "P001",
"age": 52,
"gender": "M",
"diagnosis": "Type 2 Diabetes",
"bmi": 28.5,
# Collection metadata
"collection_date": "2024-01-15",
"collection_time": "08:00",
"collection_site": "Clinic A",
"technician": "J. Smith",
# Processing metadata
"extraction_method": "RNeasy Mini Kit",
"extraction_date": "2024-01-15",
"rna_concentration": "250 ng/µL",
"rin_score": 8.5,
"volume": "50 µL",
# Experimental metadata
"treatment_group": "control",
"timepoint": "0h",
"batch": "Batch_001"
}
)
Sample Upload Workflows
Upload with Sample Creation
# Create sample and upload data in one step
sample = client.upload_sample(
name="Sample_002",
read1="/path/to/reads_R1.fastq.gz",
read2="/path/to/reads_R2.fastq.gz",
organism="mouse",
sample_type="RNA",
metadata={
"strain": "C57BL/6",
"age_weeks": 8,
"tissue": "brain"
},
progress=True
)
print(f"Sample {sample.id} created with {len(sample.data)} files")
Batch Sample Upload
# Upload multiple samples efficiently
samples_to_upload = [
{
"name": "WT_Rep1",
"files": {"R1": "wt1_R1.fq.gz", "R2": "wt1_R2.fq.gz"},
"metadata": {"genotype": "WT", "replicate": 1}
},
{
"name": "WT_Rep2",
"files": {"R1": "wt2_R1.fq.gz", "R2": "wt2_R2.fq.gz"},
"metadata": {"genotype": "WT", "replicate": 2}
},
{
"name": "KO_Rep1",
"files": {"R1": "ko1_R1.fq.gz", "R2": "ko1_R2.fq.gz"},
"metadata": {"genotype": "KO", "replicate": 1}
}
]
created_samples = []
for sample_info in samples_to_upload:
sample = client.upload_sample(
name=sample_info["name"],
read1=sample_info["files"]["R1"],
read2=sample_info["files"]["R2"],
organism="mouse",
sample_type="RNA",
metadata=sample_info["metadata"]
)
created_samples.append(sample)
print(f"✓ Uploaded {sample.name}")
# Add to project
project.add_samples([s.id for s in created_samples])
Upload from Annotation File
import pandas as pd
# Read sample annotation
annotations = pd.read_excel("sample_annotations.xlsx")
# Create samples from annotation
for _, row in annotations.iterrows():
sample = client.create_sample(
name=row['Sample_Name'],
organism=row['Organism'],
sample_type=row['Sample_Type'],
condition=row['Condition'],
replicate_group=row['Replicate_Group'],
metadata={
col: row[col]
for col in annotations.columns
if col not in ['Sample_Name', 'Organism', 'Sample_Type']
}
)
# Upload associated files
if pd.notna(row['Read1_Path']):
client.upload_data(
row['Read1_Path'],
sample_id=sample.id,
read_number=1
)
if pd.notna(row['Read2_Path']):
client.upload_data(
row['Read2_Path'],
sample_id=sample.id,
read_number=2
)
Retrieving Samples
Get Single Sample
# Python client
sample = client.get_sample(123)
# Access sample properties
print(f"Name: {sample.name}")
print(f"Organism: {sample.organism.name}")
print(f"Type: {sample.sample_type.name}")
print(f"Data files: {len(sample.data)}")
print(f"Metadata: {sample.metadata}")
REST API Sample Retrieval
curl -H "Authorization: Bearer <token>" \
https://api.flow.bio/samples/123
GraphQL Sample Query
query GetSampleDetails($id: ID!) {
sample(id: $id) {
id
name
organism {
id
name
scientificName
}
sampleType {
id
name
}
source {
name
}
purificationTarget {
name
}
condition
replicateGroup
project {
id
name
}
metadata
created
modified
owner {
username
email
}
data {
edges {
node {
id
filename
size
dataType {
name
}
downloadUrl
}
}
}
filesets {
id
name
files {
id
filename
readNumber
}
}
permissions {
canEdit
canShare
canDelete
}
}
}
List Samples
# Get your samples
my_samples = client.get_samples(limit=50)
# Get shared samples
shared_samples = client.get_shared_samples()
# Get project samples
project_samples = project.get_samples()
# Filter samples
filtered_samples = client.get_samples(
organism="human",
sample_type="RNA",
created_after="2024-01-01"
)
Updating Samples
Update Sample Metadata
# Update basic properties
sample.update(
name="Updated_Sample_Name",
condition="Treatment",
replicate_group="Group_A"
)
# Update metadata
sample.update_metadata({
"treatment_duration": "24h",
"drug_concentration": "10µM",
"notes": "Sample showed good RNA quality"
})
# Merge metadata (preserves existing fields)
sample.merge_metadata({
"qc_status": "passed",
"qc_date": "2024-01-20"
})
REST API Sample Update
curl -X POST https://api.flow.bio/samples/123/update \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"condition": "Treatment",
"metadata": {
"treatment_duration": "24h",
"drug_concentration": "10µM"
}
}'
GraphQL Sample Update
mutation UpdateSample($id: ID!, $input: UpdateSampleInput!) {
updateSample(id: $id, input: $input) {
id
name
condition
metadata
modified
}
}
Batch Update Samples
# Update multiple samples
samples_to_update = [
{"id": 123, "metadata": {"batch": "B001", "qc": "passed"}},
{"id": 124, "metadata": {"batch": "B001", "qc": "passed"}},
{"id": 125, "metadata": {"batch": "B002", "qc": "failed"}}
]
for update in samples_to_update:
sample = client.get_sample(update["id"])
sample.update_metadata(update["metadata"])
print(f"Updated sample {sample.id}")
Sample Data Management
Add Data to Sample
# Add single file
data = client.upload_data(
"/path/to/analysis_results.bam",
sample_id=sample.id,
data_type="BAM"
)
# Add multiple files
for file_path in ["file1.vcf", "file2.bed", "file3.bigwig"]:
client.upload_data(file_path, sample_id=sample.id)
# Add with metadata
data = client.upload_data(
"/path/to/counts.tsv",
sample_id=sample.id,
metadata={
"pipeline": "RNA-seq",
"version": "3.14",
"normalization": "TPM"
}
)
Manage Filesets
# Create fileset for paired-end reads
fileset = sample.create_fileset(
name="Paired-end reads",
files=[
{"data_id": 789, "read_number": 1},
{"data_id": 790, "read_number": 2}
]
)
# Get sample filesets
for fileset in sample.filesets:
print(f"Fileset: {fileset.name}")
for file in fileset.files:
print(f" - {file.filename} (R{file.read_number})")
Remove Data from Sample
# Remove specific data file
sample.remove_data(data_id=789)
# Remove multiple files
sample.remove_data(data_ids=[789, 790, 791])
# Remove all data (careful!)
for data in sample.data:
sample.remove_data(data_id=data.id)
Sample Organization
Sample Groups and Replicates
# Organize samples by replicate groups
def organize_by_replicates(samples):
    """Group samples by their replicate group name.

    Samples with no replicate group are collected under "ungrouped".
    Returns a dict mapping group name -> list of samples, preserving
    input order within each group.
    """
    grouped = {}
    for s in samples:
        key = s.replicate_group or "ungrouped"
        grouped.setdefault(key, []).append(s)
    return grouped
# Example usage
samples = client.get_samples(project=project_id)
groups = organize_by_replicates(samples)
for group_name, group_samples in groups.items():
print(f"\nReplicate Group: {group_name}")
for sample in group_samples:
print(f" - {sample.name} ({sample.condition})")
Sample Comparison
def compare_samples(sample1_id, sample2_id):
    """Fetch two samples and summarise how they differ.

    Returns a dict with the name pair, equality flags for organism,
    sample type and project, and a per-key diff of metadata values.
    """
    s1 = client.get_sample(sample1_id)
    s2 = client.get_sample(sample2_id)

    # Projects only count as "same" when both samples have one.
    same_project = bool(s1.project and s2.project and s1.project.id == s2.project.id)

    metadata_diff = {}
    for key in set(s1.metadata) | set(s2.metadata):
        left = s1.metadata.get(key)
        right = s2.metadata.get(key)
        if left != right:
            metadata_diff[key] = {"sample1": left, "sample2": right}

    return {
        "names": [s1.name, s2.name],
        "same_organism": s1.organism.id == s2.organism.id,
        "same_type": s1.sample_type.id == s2.sample_type.id,
        "same_project": same_project,
        "metadata_diff": metadata_diff,
    }
Sample QC and Validation
Add QC Metrics
# Add QC results to sample
sample.update_metadata({
"qc_metrics": {
"total_reads": 25_000_000,
"q30_percent": 92.5,
"gc_content": 48.3,
"duplication_rate": 0.12,
"adapter_content": 0.02,
"fastqc_status": "PASS"
}
})
# Create QC report
def generate_qc_report(sample):
    """Generate a plain-text QC report for a sample.

    Reads the "qc_metrics" dict from ``sample.metadata``; any missing
    metric is reported as "N/A".
    """
    qc = sample.metadata.get("qc_metrics", {})
    total_reads = qc.get("total_reads")
    # Bug fix: the original applied the ":," format spec directly to the
    # 'N/A' fallback string, raising ValueError whenever total_reads was
    # absent. Only apply the thousands separator to real numbers.
    total_str = f"{total_reads:,}" if isinstance(total_reads, (int, float)) else "N/A"
    report = f"""
QC Report for {sample.name}
========================
Total Reads: {total_str}
Q30 %: {qc.get('q30_percent', 'N/A')}%
GC Content: {qc.get('gc_content', 'N/A')}%
Duplication Rate: {qc.get('duplication_rate', 'N/A')}
Overall Status: {qc.get('fastqc_status', 'N/A')}
"""
    return report
Validate Sample Data
def validate_sample(sample):
    """Validate that a sample has the required fields, files and metadata.

    Returns ``{"valid": bool, "issues": [str, ...]}`` where ``issues``
    lists every problem found (empty when the sample is valid).
    """
    issues = []

    # Core identity fields.
    if not sample.name:
        issues.append("Missing sample name")
    if not sample.organism:
        issues.append("Missing organism")

    # At least one attached data file is required.
    if len(sample.data) == 0:
        issues.append("No data files attached")

    # Paired-end runs must have equal numbers of R1 and R2 files.
    r1_count = sum(1 for d in sample.data if d.metadata.get("read_number") == 1)
    r2_count = sum(1 for d in sample.data if d.metadata.get("read_number") == 2)
    if r1_count != r2_count:
        issues.append("Mismatched paired-end files")

    # Mandatory metadata keys.
    for field in ("collection_date", "tissue", "treatment"):
        if field not in sample.metadata:
            issues.append(f"Missing required metadata: {field}")

    return {
        "valid": not issues,
        "issues": issues
    }
# Validate all samples in project
for sample in project.get_samples():
validation = validate_sample(sample)
if not validation["valid"]:
print(f"Sample {sample.name} has issues:")
for issue in validation["issues"]:
print(f" - {issue}")
Sample Import/Export
Export Sample Metadata
# Export to CSV
def export_samples_to_csv(samples, filename="samples.csv"):
    """Export sample core fields plus all metadata keys to a CSV file.

    Metadata keys are collected across *all* samples so every row shares
    one header; samples missing a key leave that cell blank.
    """
    import csv

    # Union of metadata keys across all samples -> extra sorted columns.
    all_keys = set()
    for sample in samples:
        all_keys.update(sample.metadata.keys())

    fieldnames = [
        'id', 'name', 'organism', 'sample_type',
        'condition', 'replicate_group', 'project'
    ] + sorted(all_keys)

    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for sample in samples:
            row = {
                'id': sample.id,
                'name': sample.name,
                'organism': sample.organism.name,
                'sample_type': sample.sample_type.name,
                'condition': sample.condition,
                'replicate_group': sample.replicate_group,
                'project': sample.project.name if sample.project else ''
            }
            row.update(sample.metadata)
            writer.writerow(row)

    # Bug fix: the message previously printed a garbled literal
    # "(unknown)" instead of the target filename.
    print(f"Exported {len(samples)} samples to {filename}")
# Export to Excel with formatting
def export_samples_to_excel(samples, filename="samples.xlsx"):
    """Export samples to a formatted Excel sheet.

    Core fields become fixed columns; each metadata key becomes a
    "metadata.<key>" column. The sheet gets autofilter dropdowns and
    columns sized to their widest cell. Requires the xlsxwriter engine.
    """
    import pandas as pd

    rows = []
    for sample in samples:
        record = {
            'ID': sample.id,
            'Name': sample.name,
            'Organism': sample.organism.name,
            'Type': sample.sample_type.name,
            'Condition': sample.condition,
            'Replicate Group': sample.replicate_group,
            'Project': sample.project.name if sample.project else '',
            'Created': sample.created,
            'Data Files': len(sample.data),
        }
        # Prefix metadata columns so they can be round-tripped on import.
        record.update({f"metadata.{k}": v for k, v in sample.metadata.items()})
        rows.append(record)

    df = pd.DataFrame(rows)

    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Samples', index=False)
        worksheet = writer.sheets['Samples']
        # Enable filter dropdowns over the full data range.
        # (Removed the unused `workbook = writer.book` local.)
        worksheet.autofilter(0, 0, len(df), len(df.columns) - 1)
        # Size each column to its widest cell (or header) plus padding.
        for i, col in enumerate(df.columns):
            max_len = max(df[col].astype(str).str.len().max(), len(col)) + 2
            worksheet.set_column(i, i, max_len)
Import Samples from File
def import_samples_from_excel(filename, project_id=None):
    """Create one sample per row of an Excel sheet.

    Columns prefixed "metadata." become metadata fields (prefix stripped);
    any other non-core column is copied into metadata verbatim.
    NOTE(review): columns such as ID/Project/Created written by the export
    helper are not excluded here, so round-tripping an exported file puts
    them into metadata — confirm this is intended.
    """
    import pandas as pd

    core_columns = {'Name', 'Organism', 'Type', 'Condition', 'Replicate Group'}
    df = pd.read_excel(filename)
    created_samples = []

    for _, row in df.iterrows():
        metadata = {}
        for column in df.columns:
            if column.startswith('metadata.'):
                metadata[column.replace('metadata.', '')] = row[column]
            elif column not in core_columns:
                metadata[column] = row[column]

        sample = client.create_sample(
            name=row['Name'],
            organism=row['Organism'],
            sample_type=row['Type'],
            condition=row.get('Condition'),
            replicate_group=row.get('Replicate Group'),
            project=project_id,
            metadata=metadata
        )
        created_samples.append(sample)
        print(f"Created sample: {sample.name}")

    return created_samples
Sample Sharing and Permissions
Share Samples
# Share individual sample
sample.share(
users=["collaborator@example.com"],
groups=["research-team"],
permission="read"
)
# Share multiple samples
sample_ids = [123, 124, 125]
client.bulk_share(
resource_type="sample",
resource_ids=sample_ids,
users=["colleague@example.com"],
permission="edit"
)
# Share all samples in project
project = client.get_project(10)
for sample in project.get_samples():
sample.share(groups=["lab-members"], permission="read")
Transfer Ownership
# Transfer sample ownership
sample.transfer_ownership("new_owner@example.com")
# Batch transfer
samples_to_transfer = [123, 124, 125]
for sample_id in samples_to_transfer:
sample = client.get_sample(sample_id)
sample.transfer_ownership("new_owner@example.com")
Sample Deletion
Delete Samples
# Delete single sample
sample.delete()
# or
client.delete_sample(123)
# Batch delete
client.bulk_delete_samples([123, 124, 125])
# Delete with confirmation
def delete_sample_with_confirmation(sample_id):
    """Interactively delete a sample after showing what would be lost.

    Prints the sample's name, file count and execution usage, then asks
    for a "yes" before deleting; anything else cancels.
    """
    sample = client.get_sample(sample_id)
    print(f"Sample: {sample.name}")
    print(f"Data files: {len(sample.data)}")
    print(f"Used in executions: {len(sample.executions)}")

    answer = input("Are you sure you want to delete? (yes/no): ")
    if answer.lower() != "yes":
        print("Deletion cancelled")
        return
    sample.delete()
    print("Sample deleted")
Sample Templates
Create Sample Templates
# Define sample templates for common experiments.
# NOTE: these nested dicts are shared module-level state — deep-copy a
# template (including its "metadata" dict) before mutating it.
SAMPLE_TEMPLATES = {
    # Standard human RNA-seq defaults.
    "rna_seq_human": {
        "organism": "human",
        "sample_type": "RNA",
        "purification_target": "Total RNA",
        "metadata": {
            "library_prep": "TruSeq Stranded mRNA",
            "sequencer": "Illumina NovaSeq",
            "read_length": 150,
            "read_type": "paired-end"
        }
    },
    # Mouse ChIP-seq defaults; antibody and cell type are experiment-specific.
    "chip_seq_mouse": {
        "organism": "mouse",
        "sample_type": "DNA",
        "purification_target": "Chromatin",
        "metadata": {
            "antibody": "",  # To be filled
            "cell_type": "",  # To be filled
            "crosslink_method": "1% formaldehyde",
            "fragmentation": "sonication"
        }
    }
}
# Use template to create sample
def create_sample_from_template(name, template_name, **overrides):
    """Create a sample from a predefined template, with optional overrides.

    ``metadata`` overrides are merged into the template's metadata; any
    other keyword replaces the template value outright.

    Raises KeyError if *template_name* is not in SAMPLE_TEMPLATES.
    """
    template = SAMPLE_TEMPLATES[template_name]
    # Bug fix: the original used a shallow .copy(), so updating
    # settings["metadata"] mutated the shared SAMPLE_TEMPLATES entry and
    # leaked override values into every later use of the template.
    # Copy nested dicts (e.g. "metadata") as well.
    settings = {
        key: (value.copy() if isinstance(value, dict) else value)
        for key, value in template.items()
    }
    for key, value in overrides.items():
        if key == "metadata":
            settings["metadata"].update(value)
        else:
            settings[key] = value
    return client.create_sample(name=name, **settings)
# Example usage
sample = create_sample_from_template(
name="Patient_001_Tumor",
template_name="rna_seq_human",
condition="Tumor",
metadata={
"tissue": "lung",
"tumor_stage": "IIIA"
}
)
Advanced Sample Queries
Complex Sample Filtering
# Find samples matching complex criteria
def find_samples_for_analysis(
    organism="human",
    sample_type="RNA",
    min_reads=10_000_000,
    required_metadata=None
):
    """Find samples suitable for analysis.

    Starts from samples matching organism/type that have data attached,
    then keeps only those whose QC read count reaches *min_reads* and
    whose metadata matches every key/value in *required_metadata*.
    """
    candidates = client.search_samples(
        organism=organism,
        sample_type=sample_type,
        has_data=True
    )

    def _qualifies(sample):
        # Samples with no recorded QC metrics count as zero reads.
        reads = sample.metadata.get("qc_metrics", {}).get("total_reads", 0)
        if reads < min_reads:
            return False
        if required_metadata:
            return all(
                sample.metadata.get(key) == value
                for key, value in required_metadata.items()
            )
        return True

    return [sample for sample in candidates if _qualifies(sample)]
# Find samples for differential expression
treatment_samples = find_samples_for_analysis(
organism="human",
sample_type="RNA",
min_reads=20_000_000,
required_metadata={"treatment": "drug_A"}
)
control_samples = find_samples_for_analysis(
organism="human",
sample_type="RNA",
min_reads=20_000_000,
required_metadata={"treatment": "control"}
)
Sample Lineage Tracking
def track_sample_lineage(sample_id):
    """Track a sample through the analysis pipeline.

    Returns a dict with the sample itself, a summary of every execution
    that used it (pipeline, status, output files), and a "derived_data"
    list (currently never populated here — reserved for callers).
    """
    sample = client.get_sample(sample_id)

    executions = []
    for execution in sample.executions:
        outputs = [
            {
                "filename": output.filename,
                "size": output.size,
                "type": output.data_type,
            }
            for output in execution.outputs
        ]
        executions.append({
            "id": execution.id,
            "pipeline": execution.pipeline.name,
            "status": execution.status,
            "outputs": outputs,
        })

    return {
        "sample": sample,
        "executions": executions,
        "derived_data": [],
    }
Best Practices
1. Consistent Naming
# Use structured naming convention
def generate_sample_name(
    project_code,
    patient_id,
    tissue,
    timepoint,
    replicate
):
    """Build a structured sample name.

    Format: <project>_<patient>_<tissue>_<timepoint>_Rep<replicate>,
    e.g. "PROJ001_P001_liver_0h_Rep1".
    """
    parts = (project_code, patient_id, tissue, timepoint, f"Rep{replicate}")
    return "_".join(str(part) for part in parts)
# Example: "PROJ001_P001_liver_0h_Rep1"
2. Comprehensive Metadata
# Define required metadata schema
# Metadata fields required for each study type.
REQUIRED_METADATA = {
    "clinical": ["age", "sex", "diagnosis"],
    "collection": ["date", "time", "site"],
    "processing": ["method", "date", "technician"],
    "qc": ["concentration", "quality_score"]
}

def validate_metadata(sample, study_type="clinical"):
    """Check a sample's metadata against the schema for *study_type*.

    Returns ``(is_valid, missing_fields)``. Unknown study types have no
    requirements and therefore always validate.
    """
    missing = [
        field
        for field in REQUIRED_METADATA.get(study_type, [])
        if field not in sample.metadata
    ]
    return not missing, missing
3. Batch Operations
# Process samples in batches
def process_samples_batch(sample_ids, operation, batch_size=50):
    """Apply *operation* to each sample, working in batches of *batch_size*.

    Batching keeps individual API bursts small enough to avoid timeouts.
    A failure on one sample is reported and skipped so a single bad
    record does not abort the whole run.
    """
    for start in range(0, len(sample_ids), batch_size):
        current_batch = sample_ids[start:start + batch_size]
        for sample_id in current_batch:
            try:
                operation(client.get_sample(sample_id))
            except Exception as e:
                print(f"Error processing sample {sample_id}: {e}")
Next Steps
- Projects API - Organizing samples into projects
- Upload Guide - Uploading sample data
- Pipelines API - Running analyses on samples
- Search API - Finding samples