Flow Logo

API Reference

Error Handling Guide

Flow's APIs provide consistent, informative error responses to help you handle issues gracefully. This guide covers error formats, common error codes, and best practices for error handling.


Error Response Format

REST API Errors

REST API errors follow a consistent JSON structure:

{
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Invalid file format. Expected FASTQ or FASTQ.gz",
    "details": {
      "field": "file",
      "expected": ["fastq", "fastq.gz"],
      "received": "txt"
    },
    "request_id": "req_1234567890",
    "timestamp": "2024-01-15T10:30:00Z"
  }
}

GraphQL Errors

GraphQL errors appear in the errors array:

{
  "data": null,
  "errors": [
    {
      "message": "Sample not found",
      "extensions": {
        "code": "NOT_FOUND",
        "id": "12345",
        "timestamp": "2024-01-15T10:30:00Z"
      },
      "path": ["sample"],
      "locations": [{"line": 2, "column": 3}]
    }
  ]
}

HTTP Status Codes

Flow uses standard HTTP status codes to indicate the success or failure of requests:

Status CodeMeaningCommon Causes
200 OKSuccessRequest completed successfully
201 CreatedCreatedResource created successfully
204 No ContentSuccessRequest succeeded with no response body
400 Bad RequestClient ErrorInvalid request data or parameters
401 UnauthorizedAuthentication ErrorMissing or invalid authentication token
403 ForbiddenPermission ErrorInsufficient permissions for resource
404 Not FoundNot FoundResource doesn't exist
409 ConflictConflictResource already exists or state conflict
413 Payload Too LargeToo LargeUpload exceeds size limits
422 Unprocessable EntityValidation ErrorRequest understood but invalid
429 Too Many RequestsRate LimitedToo many requests in time window
500 Internal Server ErrorServer ErrorUnexpected server error
503 Service UnavailableUnavailableService temporarily down

Error Codes

Authentication Errors

# UNAUTHENTICATED - No valid authentication
{
    "code": "UNAUTHENTICATED",
    "message": "Authentication required",
    "status": 401
}

# INVALID_CREDENTIALS - Wrong username/password
{
    "code": "INVALID_CREDENTIALS", 
    "message": "Invalid username or password",
    "status": 401
}

# TOKEN_EXPIRED - Access token expired
{
    "code": "TOKEN_EXPIRED",
    "message": "Access token has expired",
    "status": 401
}

# TOKEN_INVALID - Malformed or invalid token
{
    "code": "TOKEN_INVALID",
    "message": "Invalid authentication token",
    "status": 401
}

Permission Errors

# FORBIDDEN - Insufficient permissions
{
    "code": "FORBIDDEN",
    "message": "You do not have permission to access this resource",
    "details": {
        "resource_type": "sample",
        "resource_id": 123,
        "required_permission": "edit",
        "user_permission": "read"
    },
    "status": 403
}

# NO_PIPELINE_PERMISSION - Cannot run pipelines
{
    "code": "NO_PIPELINE_PERMISSION",
    "message": "You do not have permission to run pipelines",
    "status": 403
}

# CANNOT_SHARE - Insufficient permission to share
{
    "code": "CANNOT_SHARE",
    "message": "You need share permission to share this resource",
    "status": 403
}

Validation Errors

# VALIDATION_ERROR - Generic validation failure
{
    "code": "VALIDATION_ERROR",
    "message": "Validation failed",
    "details": {
        "errors": [
            {"field": "name", "message": "Name is required"},
            {"field": "organism", "message": "Invalid organism ID"}
        ]
    },
    "status": 400
}

# INVALID_FORMAT - Wrong file/data format
{
    "code": "INVALID_FORMAT",
    "message": "Invalid file format",
    "details": {
        "expected": ["fastq", "fastq.gz"],
        "received": "pdf"
    },
    "status": 400
}

# MISSING_REQUIRED_FIELD - Required field not provided
{
    "code": "MISSING_REQUIRED_FIELD",
    "message": "Required field missing",
    "details": {
        "field": "sample_type",
        "message": "Sample type is required"
    },
    "status": 400
}

Resource Errors

# NOT_FOUND - Resource doesn't exist
{
    "code": "NOT_FOUND",
    "message": "Sample not found",
    "details": {
        "resource_type": "sample",
        "resource_id": 12345
    },
    "status": 404
}

# ALREADY_EXISTS - Duplicate resource
{
    "code": "ALREADY_EXISTS",
    "message": "Resource already exists",
    "details": {
        "resource_type": "project",
        "field": "name",
        "value": "My Project"
    },
    "status": 409
}

# RESOURCE_LOCKED - Resource is locked
{
    "code": "RESOURCE_LOCKED",
    "message": "Resource is locked and cannot be modified",
    "details": {
        "locked_by": "system",
        "reason": "Pipeline execution in progress"
    },
    "status": 409
}

File Operation Errors

# FILE_TOO_LARGE - Upload exceeds limits
{
    "code": "FILE_TOO_LARGE",
    "message": "File exceeds maximum size of 50GB",
    "details": {
        "max_size": 53687091200,
        "file_size": 64424509440
    },
    "status": 413
}

# UPLOAD_FAILED - Upload operation failed
{
    "code": "UPLOAD_FAILED",
    "message": "Failed to upload file",
    "details": {
        "reason": "Checksum mismatch",
        "expected_md5": "abc123",
        "received_md5": "def456"
    },
    "status": 400
}

# DOWNLOAD_EXPIRED - Download link expired
{
    "code": "DOWNLOAD_EXPIRED",
    "message": "Download link has expired",
    "details": {
        "expired_at": "2024-01-15T10:00:00Z"
    },
    "status": 410
}

Pipeline Errors

# PIPELINE_NOT_FOUND - Pipeline doesn't exist
{
    "code": "PIPELINE_NOT_FOUND",
    "message": "Pipeline 'Custom-Analysis' not found",
    "status": 404
}

# INCOMPATIBLE_SAMPLES - Samples not compatible
{
    "code": "INCOMPATIBLE_SAMPLES",
    "message": "Some samples are not compatible with this pipeline",
    "details": {
        "incompatible_samples": [123, 124],
        "reason": "Wrong organism"
    },
    "status": 400
}

# EXECUTION_FAILED - Pipeline execution failed
{
    "code": "EXECUTION_FAILED",
    "message": "Pipeline execution failed",
    "details": {
        "execution_id": 456,
        "failed_process": "FASTQC",
        "error": "Out of memory"
    },
    "status": 500
}

Error Handling Patterns

Python Client Error Handling

from flowbio.exceptions import (
    AuthenticationError,
    PermissionError,
    ValidationError,
    NotFoundError,
    RateLimitError,
    ServerError
)

def safe_api_call():
    """Example of comprehensive error handling"""
    
    try:
        sample = client.get_sample(12345)
        sample.share(users=["colleague"], permission="read")
        
    except AuthenticationError as e:
        # Handle authentication issues
        if e.code == "TOKEN_EXPIRED":
            client.refresh_token()
            # Retry the operation
        else:
            print(f"Authentication failed: {e.message}")
            
    except PermissionError as e:
        # Handle permission issues
        print(f"Permission denied: {e.message}")
        print(f"Required permission: {e.required_permission}")
        
    except ValidationError as e:
        # Handle validation errors
        print(f"Validation failed: {e.message}")
        for error in e.errors:
            print(f"  {error['field']}: {error['message']}")
            
    except NotFoundError as e:
        # Handle missing resources
        print(f"Resource not found: {e.resource_type} {e.resource_id}")
        
    except RateLimitError as e:
        # Handle rate limiting
        print(f"Rate limited. Retry after {e.retry_after} seconds")
        time.sleep(e.retry_after)
        
    except ServerError as e:
        # Handle server errors
        print(f"Server error: {e.message}")
        print(f"Request ID: {e.request_id}")
        
    except Exception as e:
        # Catch-all for unexpected errors
        print(f"Unexpected error: {e}")

REST API Error Handling

import requests

def make_api_request(endpoint, method="GET", data=None):
    """Make API request with error handling"""
    
    url = f"https://api.flow.bio{endpoint}"
    headers = {"Authorization": f"Bearer {token}"}
    
    try:
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=data
        )
        
        # Check for HTTP errors
        response.raise_for_status()
        
        return response.json()
        
    except requests.exceptions.HTTPError as e:
        # Parse error response
        error_data = e.response.json().get("error", {})
        
        # Handle specific error codes
        if e.response.status_code == 401:
            handle_auth_error(error_data)
        elif e.response.status_code == 403:
            handle_permission_error(error_data)
        elif e.response.status_code == 404:
            handle_not_found_error(error_data)
        elif e.response.status_code == 429:
            handle_rate_limit(e.response.headers)
        else:
            print(f"API Error: {error_data.get('message', 'Unknown error')}")
            
    except requests.exceptions.ConnectionError:
        print("Connection error. Check your network.")
    except requests.exceptions.Timeout:
        print("Request timed out. Try again.")

GraphQL Error Handling

async function graphqlRequest(query, variables) {
  try {
    const response = await fetch('https://api.flow.bio/graphql', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${token}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ query, variables })
    });

    const result = await response.json();

    // Check for GraphQL errors
    if (result.errors) {
      handleGraphQLErrors(result.errors);
      return null;
    }

    return result.data;

  } catch (error) {
    console.error('Network error:', error);
    throw error;
  }
}

function handleGraphQLErrors(errors) {
  errors.forEach(error => {
    const code = error.extensions?.code;
    
    switch (code) {
      case 'UNAUTHENTICATED':
        console.error('Authentication required');
        // Redirect to login
        break;
        
      case 'NOT_FOUND':
        console.error(`Resource not found: ${error.message}`);
        break;
        
      case 'VALIDATION_ERROR':
        console.error('Validation errors:', error.extensions.details);
        break;
        
      default:
        console.error(`GraphQL error: ${error.message}`);
    }
  });
}

Retry Strategies

Exponential Backoff

import time
import random

def exponential_backoff_retry(func, max_retries=5, initial_delay=1):
    """Retry with exponential backoff"""
    
    for attempt in range(max_retries):
        try:
            return func()
            
        except RateLimitError as e:
            # Use server-provided retry delay if available
            delay = e.retry_after or (initial_delay * (2 ** attempt))
            
        except (ServerError, ConnectionError) as e:
            # Exponential backoff with jitter
            delay = initial_delay * (2 ** attempt)
            delay += random.uniform(0, delay * 0.1)  # Add 10% jitter
            
            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt
                
        print(f"Attempt {attempt + 1} failed. Retrying in {delay:.1f}s...")
        time.sleep(delay)
    
    raise Exception(f"Failed after {max_retries} attempts")

Idempotent Requests

def make_idempotent_request(operation, idempotency_key=None):
    """Make request with idempotency key"""
    
    if not idempotency_key:
        idempotency_key = str(uuid.uuid4())
    
    headers = {
        "Authorization": f"Bearer {token}",
        "Idempotency-Key": idempotency_key
    }
    
    try:
        response = requests.post(
            "https://api.flow.bio/samples/create",
            headers=headers,
            json=operation
        )
        
        if response.status_code == 409:
            # Request already processed
            print(f"Request {idempotency_key} already processed")
            return response.json()["result"]
            
        response.raise_for_status()
        return response.json()
        
    except Exception as e:
        # Can safely retry with same idempotency key
        raise

Rate Limiting

Understanding Rate Limits

# Rate limit headers
{
    "X-RateLimit-Limit": "1000",      # Requests per hour
    "X-RateLimit-Remaining": "750",    # Remaining requests
    "X-RateLimit-Reset": "1642255200", # Unix timestamp when limit resets
    "Retry-After": "3600"              # Seconds until retry (when limited)
}

Handle Rate Limits

class RateLimitHandler:
    """Handle API rate limits gracefully"""
    
    def __init__(self):
        self.limit = None
        self.remaining = None
        self.reset_time = None
        
    def update_from_headers(self, headers):
        """Update rate limit info from response headers"""
        
        self.limit = int(headers.get("X-RateLimit-Limit", 0))
        self.remaining = int(headers.get("X-RateLimit-Remaining", 0))
        
        reset_timestamp = headers.get("X-RateLimit-Reset")
        if reset_timestamp:
            self.reset_time = datetime.fromtimestamp(int(reset_timestamp))
    
    def should_throttle(self, threshold=0.1):
        """Check if we should slow down requests"""
        
        if not self.limit or not self.remaining:
            return False
            
        # Throttle when below 10% of limit
        return self.remaining < (self.limit * threshold)
    
    def get_delay(self):
        """Calculate delay to avoid hitting limits"""
        
        if not self.reset_time:
            return 0
            
        time_until_reset = (self.reset_time - datetime.now()).seconds
        
        if self.remaining == 0:
            return time_until_reset
        
        # Spread remaining requests over time
        return time_until_reset / self.remaining

# Usage
rate_limiter = RateLimitHandler()

response = make_api_request("/samples")
rate_limiter.update_from_headers(response.headers)

if rate_limiter.should_throttle():
    delay = rate_limiter.get_delay()
    print(f"Throttling requests. Waiting {delay}s")
    time.sleep(delay)

Error Recovery

Automatic Recovery

class ResilientClient:
    """Client with automatic error recovery"""
    
    def __init__(self, base_client):
        self.client = base_client
        self.error_counts = {}
        
    def get_sample_with_recovery(self, sample_id):
        """Get sample with automatic recovery"""
        
        try:
            return self.client.get_sample(sample_id)
            
        except NotFoundError:
            # Try searching for sample
            results = self.client.search_samples(
                filter=f"id:{sample_id}"
            )
            if results:
                return results[0]
            raise
            
        except PermissionError:
            # Try getting through project
            projects = self.client.get_projects()
            for project in projects:
                samples = project.get_samples()
                for sample in samples:
                    if sample.id == sample_id:
                        return sample
            raise
            
        except ServerError as e:
            # Track server errors
            self.error_counts["server"] = self.error_counts.get("server", 0) + 1
            
            # Circuit breaker pattern
            if self.error_counts["server"] > 5:
                raise Exception("Too many server errors. Service may be down.")
            
            # Retry with backoff
            time.sleep(2 ** self.error_counts["server"])
            return self.get_sample_with_recovery(sample_id)

Transaction Rollback

class TransactionalOperation:
    """Execute operations with rollback on failure"""
    
    def __init__(self, client):
        self.client = client
        self.operations = []
        self.rollback_actions = []
        
    def add_operation(self, operation, rollback):
        """Add operation with rollback action"""
        self.operations.append(operation)
        self.rollback_actions.append(rollback)
        
    def execute(self):
        """Execute all operations with rollback on failure"""
        
        completed = []
        
        try:
            for i, operation in enumerate(self.operations):
                result = operation()
                completed.append((i, result))
                
        except Exception as e:
            print(f"Operation failed: {e}")
            print("Rolling back...")
            
            # Rollback in reverse order
            for i, _ in reversed(completed):
                try:
                    self.rollback_actions[i]()
                except Exception as rollback_error:
                    print(f"Rollback failed: {rollback_error}")
                    
            raise
            
        return completed

# Usage
transaction = TransactionalOperation(client)

# Add sample creation
sample = None
transaction.add_operation(
    lambda: client.create_sample(name="Test Sample", organism="human"),
    lambda: sample.delete() if sample else None
)

# Add project creation
project = None
transaction.add_operation(
    lambda: client.create_project(name="Test Project", organism="human"),
    lambda: project.delete() if project else None
)

# Execute with automatic rollback on failure
results = transaction.execute()

Error Logging

Structured Error Logging

import logging
import json

class StructuredErrorLogger:
    """Log errors with structured data"""
    
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        
    def log_api_error(self, error, context=None):
        """Log API error with context"""
        
        error_data = {
            "error_code": getattr(error, "code", "UNKNOWN"),
            "error_message": str(error),
            "error_type": type(error).__name__,
            "timestamp": datetime.now().isoformat()
        }
        
        if hasattr(error, "request_id"):
            error_data["request_id"] = error.request_id
            
        if hasattr(error, "details"):
            error_data["details"] = error.details
            
        if context:
            error_data["context"] = context
            
        self.logger.error(json.dumps(error_data))
        
        # Send to monitoring service
        self.send_to_monitoring(error_data)
        
    def send_to_monitoring(self, error_data):
        """Send error to monitoring service"""
        # Implement integration with monitoring service
        # e.g., Sentry, DataDog, CloudWatch
        pass

# Usage
logger = StructuredErrorLogger("flow-api")

try:
    sample = client.get_sample(12345)
except Exception as e:
    logger.log_api_error(e, context={
        "operation": "get_sample",
        "sample_id": 12345,
        "user": client.current_user
    })

Best Practices

1. Always Handle Specific Errors

# Bad - generic error handling
try:
    result = api_call()
except Exception:
    print("Something went wrong")

# Good - specific error handling
try:
    result = api_call()
except AuthenticationError:
    refresh_token_and_retry()
except PermissionError as e:
    request_access_from_owner(e.resource_id)
except ValidationError as e:
    fix_validation_issues(e.errors)
except Exception as e:
    log_unexpected_error(e)
    raise

2. Provide User-Friendly Messages

def user_friendly_error(error):
    """Convert API errors to user-friendly messages"""
    
    messages = {
        "UNAUTHENTICATED": "Please log in to continue",
        "TOKEN_EXPIRED": "Your session has expired. Please log in again",
        "FORBIDDEN": "You don't have permission to access this resource",
        "NOT_FOUND": "The requested item could not be found",
        "VALIDATION_ERROR": "Please check your input and try again",
        "RATE_LIMITED": "Too many requests. Please wait a moment",
        "SERVER_ERROR": "Something went wrong on our end. Please try again later"
    }
    
    return messages.get(error.code, "An unexpected error occurred")

3. Implement Circuit Breaker

class CircuitBreaker:
    """Prevent cascading failures"""
    
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        
    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker"""
        
        if self.state == "OPEN":
            if self._should_attempt_reset():
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN")
                
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
            
        except Exception as e:
            self._on_failure()
            raise
            
    def _should_attempt_reset(self):
        """Check if we should try to reset"""
        return (
            self.last_failure_time and
            time.time() - self.last_failure_time >= self.recovery_timeout
        )
        
    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = "CLOSED"
        
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

4. Log Errors for Debugging

def log_error_context(error, operation, **context):
    """Log error with full context for debugging"""
    
    import traceback
    
    error_info = {
        "timestamp": datetime.now().isoformat(),
        "operation": operation,
        "error_type": type(error).__name__,
        "error_message": str(error),
        "traceback": traceback.format_exc(),
        "context": context
    }
    
    # Log to file
    with open("error_log.json", "a") as f:
        json.dump(error_info, f)
        f.write("\n")
    
    # Also log to console in development
    if DEBUG:
        print(json.dumps(error_info, indent=2))

Next Steps

Previous
Python Client Guide