Sample Management API

Flow's sample management system provides tools for organizing, tracking, and analyzing biological samples. This guide covers creating, retrieving, updating, organizing, sharing, and deleting samples through the Python client, the REST API, and the GraphQL API.


Understanding Samples

Samples in Flow represent biological materials with associated metadata and data files.

Sample Structure

# A typical sample contains:
sample = {
    "id": 123,
    "name": "Patient_001_Tumor",
    "organism": "Homo sapiens",
    "sample_type": "RNA",
    "source": "Tissue",
    "purification_target": "Total RNA",
    "condition": "Tumor",
    "replicate_group": "Patient_001",
    "project": {"id": 10, "name": "Cancer Study"},
    "metadata": {
        "age": 45,
        "gender": "F",
        "tissue": "lung",
        "stage": "IIIA",
        "treatment": "pre-treatment"
    },
    "data": [
        {"id": 789, "filename": "reads_R1.fastq.gz"},
        {"id": 790, "filename": "reads_R2.fastq.gz"}
    ],
    "filesets": [
        {"id": 45, "name": "Paired-end reads"}
    ]
}

Creating Samples

Basic Sample Creation

# Python client
sample = client.create_sample(
    name="Sample_001",
    organism="human",  # or organism ID
    sample_type="RNA"  # or sample type ID
)

print(f"Created sample: {sample.id}")

REST API Sample Creation

curl -X POST https://api.flow.bio/samples/create \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Sample_001",
    "organism": "human",
    "sample_type": "RNA"
  }'
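
If you are calling the REST endpoint directly from Python rather than through the client library, the same request can be made with the requests package. This is a minimal sketch: the URL and payload mirror the curl call above, the token is a placeholder, and the response is assumed to be the sample record shown under Sample Structure.

import requests

token = "<token>"  # placeholder: your API token

response = requests.post(
    "https://api.flow.bio/samples/create",
    headers={
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    },
    json={
        "name": "Sample_001",
        "organism": "human",
        "sample_type": "RNA",
    },
)
response.raise_for_status()

# Assumes the endpoint returns the created sample record as JSON
created = response.json()
print(f"Created sample: {created['id']}")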

GraphQL Sample Creation

mutation CreateSample($input: CreateSampleInput!) {
  createSample(input: $input) {
    id
    name
    organism {
      id
      name
    }
    sampleType {
      id
      name
    }
    created
  }
}

# Variables
{
  "input": {
    "name": "Sample_001",
    "organism": "9606",
    "sampleType": "1",
    "project": "10",
    "metadata": {
      "tissue": "liver",
      "age": 45
    }
  }
}

Create with Full Metadata

# Comprehensive sample creation
sample = client.create_sample(
    name="Patient_001_Baseline",
    organism="human",
    sample_type="RNA",
    source="Blood",
    purification_target="Total RNA",
    condition="Baseline",
    replicate_group="Patient_001",
    project=project_id,
    metadata={
        # Clinical metadata
        "patient_id": "P001",
        "age": 52,
        "gender": "M",
        "diagnosis": "Type 2 Diabetes",
        "bmi": 28.5,
        
        # Collection metadata
        "collection_date": "2024-01-15",
        "collection_time": "08:00",
        "collection_site": "Clinic A",
        "technician": "J. Smith",
        
        # Processing metadata
        "extraction_method": "RNeasy Mini Kit",
        "extraction_date": "2024-01-15",
        "rna_concentration": "250 ng/µL",
        "rin_score": 8.5,
        "volume": "50 µL",
        
        # Experimental metadata
        "treatment_group": "control",
        "timepoint": "0h",
        "batch": "Batch_001"
    }
)

Sample Upload Workflows

Upload with Sample Creation

# Create sample and upload data in one step
sample = client.upload_sample(
    name="Sample_002",
    read1="/path/to/reads_R1.fastq.gz",
    read2="/path/to/reads_R2.fastq.gz",
    organism="mouse",
    sample_type="RNA",
    metadata={
        "strain": "C57BL/6",
        "age_weeks": 8,
        "tissue": "brain"
    },
    progress=True
)

print(f"Sample {sample.id} created with {len(sample.data)} files")

Batch Sample Upload

# Upload multiple samples efficiently
samples_to_upload = [
    {
        "name": "WT_Rep1",
        "files": {"R1": "wt1_R1.fq.gz", "R2": "wt1_R2.fq.gz"},
        "metadata": {"genotype": "WT", "replicate": 1}
    },
    {
        "name": "WT_Rep2", 
        "files": {"R1": "wt2_R1.fq.gz", "R2": "wt2_R2.fq.gz"},
        "metadata": {"genotype": "WT", "replicate": 2}
    },
    {
        "name": "KO_Rep1",
        "files": {"R1": "ko1_R1.fq.gz", "R2": "ko1_R2.fq.gz"},
        "metadata": {"genotype": "KO", "replicate": 1}
    }
]

created_samples = []
for sample_info in samples_to_upload:
    sample = client.upload_sample(
        name=sample_info["name"],
        read1=sample_info["files"]["R1"],
        read2=sample_info["files"]["R2"],
        organism="mouse",
        sample_type="RNA",
        metadata=sample_info["metadata"]
    )
    created_samples.append(sample)
    print(f"✓ Uploaded {sample.name}")

# Add to project
project.add_samples([s.id for s in created_samples])

Upload from Annotation File

import pandas as pd

# Read sample annotation
annotations = pd.read_excel("sample_annotations.xlsx")

# Create samples from annotation
for _, row in annotations.iterrows():
    sample = client.create_sample(
        name=row['Sample_Name'],
        organism=row['Organism'],
        sample_type=row['Sample_Type'],
        condition=row['Condition'],
        replicate_group=row['Replicate_Group'],
        metadata={
            col: row[col]
            for col in annotations.columns
            # Exclude structural columns so only true metadata is stored
            if col not in [
                'Sample_Name', 'Organism', 'Sample_Type', 'Condition',
                'Replicate_Group', 'Read1_Path', 'Read2_Path'
            ]
        }
    )
    
    # Upload associated files
    if pd.notna(row['Read1_Path']):
        client.upload_data(
            row['Read1_Path'],
            sample_id=sample.id,
            read_number=1
        )
    
    if pd.notna(row['Read2_Path']):
        client.upload_data(
            row['Read2_Path'],
            sample_id=sample.id,
            read_number=2
        )

Retrieving Samples

Get Single Sample

# Python client
sample = client.get_sample(123)

# Access sample properties
print(f"Name: {sample.name}")
print(f"Organism: {sample.organism.name}")
print(f"Type: {sample.sample_type.name}")
print(f"Data files: {len(sample.data)}")
print(f"Metadata: {sample.metadata}")

REST API Sample Retrieval

curl -H "Authorization: Bearer <token>" \
  https://api.flow.bio/samples/123

GraphQL Sample Query

query GetSampleDetails($id: ID!) {
  sample(id: $id) {
    id
    name
    organism {
      id
      name
      scientificName
    }
    sampleType {
      id
      name
    }
    source {
      name
    }
    purificationTarget {
      name
    }
    condition
    replicateGroup
    project {
      id
      name
    }
    metadata
    created
    modified
    owner {
      username
      email
    }
    data {
      edges {
        node {
          id
          filename
          size
          dataType {
            name
          }
          downloadUrl
        }
      }
    }
    filesets {
      id
      name
      files {
        id
        filename
        readNumber
      }
    }
    permissions {
      canEdit
      canShare
      canDelete
    }
  }
}

List Samples

# Get your samples
my_samples = client.get_samples(limit=50)

# Get shared samples
shared_samples = client.get_shared_samples()

# Get project samples
project_samples = project.get_samples()

# Filter samples
filtered_samples = client.get_samples(
    organism="human",
    sample_type="RNA",
    created_after="2024-01-01"
)
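
To get a quick overview of the samples a query returns, the key fields can be flattened into a pandas DataFrame. This is a small sketch that only uses attributes shown elsewhere in this guide (id, name, organism, sample_type, condition, replicate_group, data).

import pandas as pd

samples = client.get_samples(organism="human", sample_type="RNA")

overview = pd.DataFrame([
    {
        "id": s.id,
        "name": s.name,
        "organism": s.organism.name,
        "type": s.sample_type.name,
        "condition": s.condition,
        "replicate_group": s.replicate_group,
        "n_files": len(s.data),
    }
    for s in samples
])

print(overview.head())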

Updating Samples

Update Sample Metadata

# Update basic properties
sample.update(
    name="Updated_Sample_Name",
    condition="Treatment",
    replicate_group="Group_A"
)

# Update metadata
sample.update_metadata({
    "treatment_duration": "24h",
    "drug_concentration": "10µM",
    "notes": "Sample showed good RNA quality"
})

# Merge metadata (preserves existing fields)
sample.merge_metadata({
    "qc_status": "passed",
    "qc_date": "2024-01-20"
})

REST API Sample Update

curl -X POST https://api.flow.bio/samples/123/update \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "condition": "Treatment",
    "metadata": {
      "treatment_duration": "24h",
      "drug_concentration": "10µM"
    }
  }'

GraphQL Sample Update

mutation UpdateSample($id: ID!, $input: UpdateSampleInput!) {
  updateSample(id: $id, input: $input) {
    id
    name
    condition
    metadata
    modified
  }
}

Batch Update Samples

# Update multiple samples
samples_to_update = [
    {"id": 123, "metadata": {"batch": "B001", "qc": "passed"}},
    {"id": 124, "metadata": {"batch": "B001", "qc": "passed"}},
    {"id": 125, "metadata": {"batch": "B002", "qc": "failed"}}
]

for update in samples_to_update:
    sample = client.get_sample(update["id"])
    sample.update_metadata(update["metadata"])
    print(f"Updated sample {sample.id}")

Sample Data Management

Add Data to Sample

# Add single file
data = client.upload_data(
    "/path/to/analysis_results.bam",
    sample_id=sample.id,
    data_type="BAM"
)

# Add multiple files
for file_path in ["file1.vcf", "file2.bed", "file3.bigwig"]:
    client.upload_data(file_path, sample_id=sample.id)

# Add with metadata
data = client.upload_data(
    "/path/to/counts.tsv",
    sample_id=sample.id,
    metadata={
        "pipeline": "RNA-seq",
        "version": "3.14",
        "normalization": "TPM"
    }
)

Manage Filesets

# Create fileset for paired-end reads
fileset = sample.create_fileset(
    name="Paired-end reads",
    files=[
        {"data_id": 789, "read_number": 1},
        {"data_id": 790, "read_number": 2}
    ]
)

# Get sample filesets
for fileset in sample.filesets:
    print(f"Fileset: {fileset.name}")
    for file in fileset.files:
        print(f"  - {file.filename} (R{file.read_number})")

Remove Data from Sample

# Remove specific data file
sample.remove_data(data_id=789)

# Remove multiple files
sample.remove_data(data_ids=[789, 790, 791])

# Remove all data (careful: iterate over a copy while removing)
for data in list(sample.data):
    sample.remove_data(data_id=data.id)

Sample Organization

Sample Groups and Replicates

# Organize samples by replicate groups
def organize_by_replicates(samples):
    """Group samples by replicate group"""
    replicate_groups = {}
    
    for sample in samples:
        group = sample.replicate_group or "ungrouped"
        if group not in replicate_groups:
            replicate_groups[group] = []
        replicate_groups[group].append(sample)
    
    return replicate_groups

# Example usage
samples = client.get_samples(project=project_id)
groups = organize_by_replicates(samples)

for group_name, group_samples in groups.items():
    print(f"\nReplicate Group: {group_name}")
    for sample in group_samples:
        print(f"  - {sample.name} ({sample.condition})")

Sample Comparison

def compare_samples(sample1_id, sample2_id):
    """Compare two samples"""
    s1 = client.get_sample(sample1_id)
    s2 = client.get_sample(sample2_id)
    
    comparison = {
        "names": [s1.name, s2.name],
        "same_organism": s1.organism.id == s2.organism.id,
        "same_type": s1.sample_type.id == s2.sample_type.id,
        "same_project": s1.project.id == s2.project.id if s1.project and s2.project else False,
        "metadata_diff": {}
    }
    
    # Compare metadata
    all_keys = set(s1.metadata.keys()) | set(s2.metadata.keys())
    for key in all_keys:
        v1 = s1.metadata.get(key)
        v2 = s2.metadata.get(key)
        if v1 != v2:
            comparison["metadata_diff"][key] = {"sample1": v1, "sample2": v2}
    
    return comparison
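
Example usage, printing only the metadata fields that differ between two samples:

comparison = compare_samples(123, 124)

print(f"Comparing {comparison['names'][0]} vs {comparison['names'][1]}")
for key, values in comparison["metadata_diff"].items():
    print(f"  {key}: {values['sample1']} != {values['sample2']}")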

Sample QC and Validation

Add QC Metrics

# Add QC results to sample
sample.update_metadata({
    "qc_metrics": {
        "total_reads": 25_000_000,
        "q30_percent": 92.5,
        "gc_content": 48.3,
        "duplication_rate": 0.12,
        "adapter_content": 0.02,
        "fastqc_status": "PASS"
    }
})

# Create QC report
def generate_qc_report(sample):
    """Generate QC report for sample"""
    qc = sample.metadata.get("qc_metrics", {})

    # Format the read count only when present, so missing values stay "N/A"
    total_reads = qc.get("total_reads")
    total_reads = f"{total_reads:,}" if total_reads is not None else "N/A"

    report = f"""
    QC Report for {sample.name}
    ===========================
    Total Reads: {total_reads}
    Q30 %: {qc.get('q30_percent', 'N/A')}%
    GC Content: {qc.get('gc_content', 'N/A')}%
    Duplication Rate: {qc.get('duplication_rate', 'N/A')}
    Overall Status: {qc.get('fastqc_status', 'N/A')}
    """

    return report
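
Example usage, printing a report for every sample in a project (get_project and get_samples are used as shown elsewhere in this guide):

project = client.get_project(10)
for sample in project.get_samples():
    print(generate_qc_report(sample))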

Validate Sample Data

def validate_sample(sample):
    """Validate sample has required data"""
    issues = []
    
    # Check required fields
    if not sample.name:
        issues.append("Missing sample name")
    
    if not sample.organism:
        issues.append("Missing organism")
    
    # Check data files
    if len(sample.data) == 0:
        issues.append("No data files attached")
    
    # Check paired-end consistency
    read1_files = [d for d in sample.data if d.metadata.get("read_number") == 1]
    read2_files = [d for d in sample.data if d.metadata.get("read_number") == 2]
    
    if len(read1_files) != len(read2_files):
        issues.append("Mismatched paired-end files")
    
    # Check metadata
    required_metadata = ["collection_date", "tissue", "treatment"]
    for field in required_metadata:
        if field not in sample.metadata:
            issues.append(f"Missing required metadata: {field}")
    
    return {
        "valid": len(issues) == 0,
        "issues": issues
    }

# Validate all samples in project
for sample in project.get_samples():
    validation = validate_sample(sample)
    if not validation["valid"]:
        print(f"Sample {sample.name} has issues:")
        for issue in validation["issues"]:
            print(f"  - {issue}")

Sample Import/Export

Export Sample Metadata

# Export to CSV
def export_samples_to_csv(samples, filename="samples.csv"):
    import csv
    
    with open(filename, 'w', newline='') as f:
        # Collect all metadata keys
        all_keys = set()
        for sample in samples:
            all_keys.update(sample.metadata.keys())
        
        # Define columns
        fieldnames = [
            'id', 'name', 'organism', 'sample_type', 
            'condition', 'replicate_group', 'project'
        ] + sorted(all_keys)
        
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        
        for sample in samples:
            row = {
                'id': sample.id,
                'name': sample.name,
                'organism': sample.organism.name,
                'sample_type': sample.sample_type.name,
                'condition': sample.condition,
                'replicate_group': sample.replicate_group,
                'project': sample.project.name if sample.project else ''
            }
            row.update(sample.metadata)
            writer.writerow(row)
    
    print(f"Exported {len(samples)} samples to {filename}")

# Export to Excel with formatting
def export_samples_to_excel(samples, filename="samples.xlsx"):
    import pandas as pd
    
    data = []
    for sample in samples:
        row = {
            'ID': sample.id,
            'Name': sample.name,
            'Organism': sample.organism.name,
            'Type': sample.sample_type.name,
            'Condition': sample.condition,
            'Replicate Group': sample.replicate_group,
            'Project': sample.project.name if sample.project else '',
            'Created': sample.created,
            'Data Files': len(sample.data)
        }
        # Add metadata fields
        row.update({f"metadata.{k}": v for k, v in sample.metadata.items()})
        data.append(row)
    
    df = pd.DataFrame(data)
    
    # Write with formatting
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='Samples', index=False)
        
        # Format the worksheet
        workbook = writer.book
        worksheet = writer.sheets['Samples']
        
        # Add filters
        worksheet.autofilter(0, 0, len(df), len(df.columns) - 1)
        
        # Adjust column widths
        for i, col in enumerate(df.columns):
            max_len = max(df[col].astype(str).str.len().max(), len(col)) + 2
            worksheet.set_column(i, i, max_len)

Import Samples from File

def import_samples_from_excel(filename, project_id=None):
    """Import samples from Excel file"""
    import pandas as pd
    
    df = pd.read_excel(filename)
    created_samples = []
    
    for _, row in df.iterrows():
        # Extract metadata columns
        metadata = {}
        for col in df.columns:
            if col.startswith('metadata.'):
                metadata[col.replace('metadata.', '')] = row[col]
            elif col not in ['ID', 'Name', 'Organism', 'Type', 'Condition',
                             'Replicate Group', 'Project', 'Created', 'Data Files']:
                metadata[col] = row[col]
        
        # Create sample
        sample = client.create_sample(
            name=row['Name'],
            organism=row['Organism'],
            sample_type=row['Type'],
            condition=row.get('Condition'),
            replicate_group=row.get('Replicate Group'),
            project=project_id,
            metadata=metadata
        )
        
        created_samples.append(sample)
        print(f"Created sample: {sample.name}")
    
    return created_samples
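
Example usage, importing into an existing project (the filename here is a placeholder):

imported = import_samples_from_excel(
    "new_cohort_samples.xlsx",  # placeholder path
    project_id=10
)
print(f"Imported {len(imported)} samples")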

Sample Sharing and Permissions

Share Samples

# Share individual sample
sample.share(
    users=["collaborator@example.com"],
    groups=["research-team"],
    permission="read"
)

# Share multiple samples
sample_ids = [123, 124, 125]
client.bulk_share(
    resource_type="sample",
    resource_ids=sample_ids,
    users=["colleague@example.com"],
    permission="edit"
)

# Share all samples in project
project = client.get_project(10)
for sample in project.get_samples():
    sample.share(groups=["lab-members"], permission="read")

Transfer Ownership

# Transfer sample ownership
sample.transfer_ownership("new_owner@example.com")

# Batch transfer
samples_to_transfer = [123, 124, 125]
for sample_id in samples_to_transfer:
    sample = client.get_sample(sample_id)
    sample.transfer_ownership("new_owner@example.com")

Sample Deletion

Delete Samples

# Delete single sample
sample.delete()
# or
client.delete_sample(123)

# Batch delete
client.bulk_delete_samples([123, 124, 125])

# Delete with confirmation
def delete_sample_with_confirmation(sample_id):
    sample = client.get_sample(sample_id)
    
    print(f"Sample: {sample.name}")
    print(f"Data files: {len(sample.data)}")
    print(f"Used in executions: {len(sample.executions)}")
    
    confirm = input("Are you sure you want to delete? (yes/no): ")
    if confirm.lower() == "yes":
        sample.delete()
        print("Sample deleted")
    else:
        print("Deletion cancelled")

Sample Templates

Create Sample Templates

# Define sample templates for common experiments
SAMPLE_TEMPLATES = {
    "rna_seq_human": {
        "organism": "human",
        "sample_type": "RNA",
        "purification_target": "Total RNA",
        "metadata": {
            "library_prep": "TruSeq Stranded mRNA",
            "sequencer": "Illumina NovaSeq",
            "read_length": 150,
            "read_type": "paired-end"
        }
    },
    "chip_seq_mouse": {
        "organism": "mouse",
        "sample_type": "DNA",
        "purification_target": "Chromatin",
        "metadata": {
            "antibody": "",  # To be filled
            "cell_type": "",  # To be filled
            "crosslink_method": "1% formaldehyde",
            "fragmentation": "sonication"
        }
    }
}

# Use template to create sample
import copy

def create_sample_from_template(name, template_name, **overrides):
    # Deep-copy so metadata overrides do not mutate the shared template
    template = copy.deepcopy(SAMPLE_TEMPLATES[template_name])

    # Apply overrides
    for key, value in overrides.items():
        if key == "metadata":
            template["metadata"].update(value)
        else:
            template[key] = value

    return client.create_sample(name=name, **template)

# Example usage
sample = create_sample_from_template(
    name="Patient_001_Tumor",
    template_name="rna_seq_human",
    condition="Tumor",
    metadata={
        "tissue": "lung",
        "tumor_stage": "IIIA"
    }
)

Advanced Sample Queries

Complex Sample Filtering

# Find samples matching complex criteria
def find_samples_for_analysis(
    organism="human",
    sample_type="RNA",
    min_reads=10_000_000,
    required_metadata=None
):
    """Find samples suitable for analysis"""
    
    # Get all samples matching basic criteria
    samples = client.search_samples(
        organism=organism,
        sample_type=sample_type,
        has_data=True
    )
    
    suitable_samples = []
    
    for sample in samples:
        # Check read count
        total_reads = sample.metadata.get("qc_metrics", {}).get("total_reads", 0)
        if total_reads < min_reads:
            continue
        
        # Check required metadata
        if required_metadata:
            missing = False
            for key, value in required_metadata.items():
                if sample.metadata.get(key) != value:
                    missing = True
                    break
            if missing:
                continue
        
        suitable_samples.append(sample)
    
    return suitable_samples

# Find samples for differential expression
treatment_samples = find_samples_for_analysis(
    organism="human",
    sample_type="RNA",
    min_reads=20_000_000,
    required_metadata={"treatment": "drug_A"}
)

control_samples = find_samples_for_analysis(
    organism="human",
    sample_type="RNA",
    min_reads=20_000_000,
    required_metadata={"treatment": "control"}
)

Sample Lineage Tracking

def track_sample_lineage(sample_id):
    """Track sample through analysis pipeline"""
    sample = client.get_sample(sample_id)
    
    lineage = {
        "sample": sample,
        "executions": [],
        "derived_data": []
    }
    
    # Find all executions using this sample
    for execution in sample.executions:
        exec_info = {
            "id": execution.id,
            "pipeline": execution.pipeline.name,
            "status": execution.status,
            "outputs": []
        }
        
        # Track outputs
        for output in execution.outputs:
            exec_info["outputs"].append({
                "filename": output.filename,
                "size": output.size,
                "type": output.data_type
            })
        
        lineage["executions"].append(exec_info)
    
    return lineage
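
Example usage, printing the lineage as a simple tree:

lineage = track_sample_lineage(123)

print(f"Sample: {lineage['sample'].name}")
for execution in lineage["executions"]:
    print(f"  Execution {execution['id']}: {execution['pipeline']} ({execution['status']})")
    for output in execution["outputs"]:
        print(f"    - {output['filename']} ({output['type']})")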

Best Practices

1. Consistent Naming

# Use structured naming convention
def generate_sample_name(
    project_code,
    patient_id,
    tissue,
    timepoint,
    replicate
):
    """Generate consistent sample names"""
    return f"{project_code}_{patient_id}_{tissue}_{timepoint}_Rep{replicate}"

# Example: "PROJ001_P001_liver_0h_Rep1"

2. Comprehensive Metadata

# Define required metadata schema
REQUIRED_METADATA = {
    "clinical": ["age", "sex", "diagnosis"],
    "collection": ["date", "time", "site"],
    "processing": ["method", "date", "technician"],
    "qc": ["concentration", "quality_score"]
}

def validate_metadata(sample, study_type="clinical"):
    """Validate sample has required metadata"""
    required = REQUIRED_METADATA.get(study_type, [])
    missing = [field for field in required if field not in sample.metadata]
    return len(missing) == 0, missing
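
Example usage, reporting missing fields for every sample in a project:

project = client.get_project(10)
for sample in project.get_samples():
    ok, missing = validate_metadata(sample, study_type="clinical")
    if not ok:
        print(f"{sample.name} is missing: {', '.join(missing)}")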

3. Batch Operations

# Process samples in batches
def process_samples_batch(sample_ids, operation, batch_size=50):
    """Process samples in batches to avoid timeouts"""
    
    for i in range(0, len(sample_ids), batch_size):
        batch = sample_ids[i:i + batch_size]
        
        for sample_id in batch:
            try:
                sample = client.get_sample(sample_id)
                operation(sample)
            except Exception as e:
                print(f"Error processing sample {sample_id}: {e}")
