# Processing Engine Standards

## Overview

The processing engine is a Python FastAPI service responsible for scientific computations, MAST Portal integration, and FITS file handling.

## Architecture

```text
processing-engine/
├── main.py                      # FastAPI application entry point
├── requirements.txt             # Python dependencies
└── app/
    ├── mast/                    # MAST Portal integration
    │   ├── mast_service.py      # astroquery.mast wrapper
    │   ├── routes.py            # FastAPI endpoints
    │   ├── models.py            # Pydantic models
    │   ├── chunked_downloader.py    # HTTP Range download support
    │   ├── s3_client.py             # Anonymous S3 client for STScI bucket
    │   ├── s3_resolver.py           # MAST product → S3 key resolution
    │   ├── s3_downloader.py         # S3 multipart download engine
    │   ├── download_state_manager.py # Resume state persistence
    │   └── download_tracker.py      # Progress tracking
    ├── composite/               # RGB & N-channel composites
    │   ├── routes.py            # Composite FastAPI endpoints
    │   ├── models.py            # Pydantic models
    │   └── color_mapping.py     # N-channel hue/RGB mapping engine
    ├── mosaic/                  # WCS mosaic generation
    │   ├── routes.py            # Mosaic FastAPI endpoints
    │   ├── models.py            # Pydantic models
    │   └── mosaic_engine.py     # Reproject-based WCS reprojection
    ├── analysis/                # Region statistics
    │   ├── routes.py            # Analysis FastAPI endpoints
    │   └── models.py            # Pydantic models
    ├── discovery/               # Featured targets & recipe engine
    │   ├── routes.py            # Discovery FastAPI endpoints
    │   ├── models.py            # Pydantic models
    │   └── recipe_engine.py     # Composite recipe suggestion engine
    ├── semantic/                # Semantic search & embeddings
    │   ├── routes.py            # Semantic search FastAPI endpoints
    │   ├── models.py            # Pydantic models
    │   ├── embedding_service.py # FITS metadata embedding generation
    │   └── text_builder.py      # Metadata-to-text conversion for embeddings
    ├── storage/                 # Storage abstraction layer
    │   ├── provider.py          # StorageProvider ABC
    │   ├── local_storage.py     # Local filesystem implementation
    │   ├── s3_storage.py        # S3-compatible implementation (boto3)
    │   ├── temp_cache.py        # LRU temp file cache for S3
    │   ├── factory.py           # Provider factory (singleton)
    │   └── helpers.py           # resolve_fits_path, validate_fits_file_size
    └── processing/              # Scientific algorithms
        ├── analysis.py          # Analysis algorithms
        └── utils.py             # FITS utilities
```

## Key Files

| File                                 | Purpose                                               |
| ------------------------------------ | ----------------------------------------------------- |
| `main.py`                            | FastAPI application entry point, algorithm registration |
| `app/mast/mast_service.py`           | MastService class wrapping astroquery.mast            |
| `app/mast/routes.py`                 | MAST search, download, and import endpoints           |
| `app/mast/chunked_downloader.py`     | Async HTTP downloads with Range headers               |
| `app/mast/s3_downloader.py`          | S3 multipart download engine with progress            |
| `app/mast/s3_resolver.py`            | MAST `get_cloud_uris()` → S3 key resolution          |
| `app/mast/download_state_manager.py` | JSON state files for resume capability                |
| `app/mast/download_tracker.py`       | Byte-level progress tracking                          |
| `app/composite/routes.py`            | N-channel composite generation endpoints              |
| `app/composite/color_mapping.py`     | Hue/RGB color mapping engine                          |
| `app/mosaic/routes.py`               | WCS mosaic generation endpoints (incl. observation mosaic) |
| `app/mosaic/mosaic_engine.py`        | Reproject-based WCS reprojection + batched mosaicking |
| `app/analysis/routes.py`             | Region statistics computation                         |
| `app/discovery/routes.py`            | Featured targets and recipe suggestion endpoints      |
| `app/discovery/recipe_engine.py`     | Composite recipe ranking and suggestion engine        |
| `app/semantic/routes.py`             | Semantic search and re-index endpoints                |
| `app/semantic/embedding_service.py`  | FITS metadata embedding generation                    |
| `app/storage/provider.py`            | StorageProvider ABC (local or S3)                     |

## MAST Module

### Search Endpoints

- `POST /mast/search/target` - Search by astronomical target name
- `POST /mast/search/coordinates` - Search by RA/Dec coordinates
- `POST /mast/search/observation` - Search by observation ID
- `POST /mast/search/program` - Search by program ID

### Download Endpoints

- `POST /mast/download/start-chunked` - Start download with progress (S3 preferred, HTTP fallback)
- `GET /mast/download/progress/{job_id}` - Get download progress
- `POST /mast/download/resume/{job_id}` - Resume interrupted download
- `POST /mast/download/pause/{job_id}` - Pause active download
- `GET /mast/download/resumable` - List resumable jobs
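A client usually starts a job and then polls the progress endpoint until the job settles. The sketch below shows that loop under stated assumptions: only the endpoint path comes from this page, while the `status` field and its terminal values are hypothetical (check the real response model in `app/mast/models.py`). The HTTP call is injected as `get_json` so no particular client library is implied.

```python
import time
from typing import Any, Callable, Dict

# Hypothetical terminal states; the real progress model may use other names
TERMINAL_STATES = {"completed", "failed"}

JsonGetter = Callable[[str], Dict[str, Any]]


def wait_for_download(
    job_id: str,
    get_json: JsonGetter,
    poll_interval: float = 2.0,
    timeout: float = 3600.0,
) -> Dict[str, Any]:
    """Poll GET /mast/download/progress/{job_id} until a terminal status."""
    deadline = time.monotonic() + timeout
    while True:
        progress = get_json(f"/mast/download/progress/{job_id}")
        if progress.get("status") in TERMINAL_STATES:
            return progress
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Download {job_id} did not finish in {timeout}s")
        time.sleep(poll_interval)
```

A job that was paused or interrupted would instead be continued with `POST /mast/download/resume/{job_id}` before polling again.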

### S3 Download Path

When `downloadSource` is `"auto"` (the default) or `"s3"`, the engine resolves S3 keys via MAST's `get_cloud_uris()` API and downloads from `s3://stpubdata/jwst/public/` using anonymous access. If S3 resolution fails, it falls back to the HTTP chunked download.
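The S3-first, HTTP-fallback rule can be sketched as follows. This is an illustration, not the code in `s3_downloader.py`: the function names, the injected callables, and the assumption that any other `downloadSource` value (for example `"http"`) skips S3 entirely are hypothetical. The bucket, the `get_cloud_uris()` step, and anonymous access come from this page; the unsigned client uses standard boto3/botocore calls.

```python
import logging
from typing import Callable, Optional, Tuple
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


def parse_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def make_anonymous_s3_client():
    """Unsigned (anonymous) boto3 client for public buckets such as stpubdata."""
    import boto3  # imported lazily so this module loads without boto3
    from botocore import UNSIGNED
    from botocore.config import Config

    # stpubdata is served from us-east-1
    return boto3.client(
        "s3", region_name="us-east-1", config=Config(signature_version=UNSIGNED)
    )


def download_product(
    source: str,
    resolve_s3_uri: Callable[[], Optional[str]],
    download_s3: Callable[[str, str], None],
    download_http: Callable[[], None],
) -> str:
    """Try S3 when allowed, fall back to HTTP; return the path actually used."""
    if source in ("auto", "s3"):
        try:
            uri = resolve_s3_uri()  # e.g. a wrapper around get_cloud_uris()
        except Exception as exc:  # resolution failure -> HTTP fallback
            logger.warning("S3 resolution failed, using HTTP: %s", exc)
            uri = None
        if uri:
            bucket, key = parse_s3_uri(uri)
            download_s3(bucket, key)
            return "s3"
    download_http()
    return "http"
```

Wired to the anonymous client, `download_s3` could be `lambda bucket, key: client.download_file(bucket, key, local_path)`, where `local_path` is the destination file. Only resolution errors trigger the fallback here; errors during the S3 transfer itself propagate to the caller.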

### Chunked Download Features

- **HTTP Range Headers**: Downloads in 5MB chunks for reliability
- **Parallel Downloads**: 3 concurrent file downloads using asyncio
- **Progress Tracking**: Byte-level progress with speed (B/s) and ETA
- **State Persistence**: JSON files in `/app/data/mast/.download_state/`
- **Resume Capability**: Continue from last successful byte position
- **Retry Logic**: Exponential backoff (3 retries, 1s base delay)
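Range-based chunking and resume both reduce to computing byte spans. A minimal sketch, assuming the resume offset is the number of bytes already written to disk; `chunk_ranges` and `range_header` are hypothetical helpers, not functions from `chunked_downloader.py`. The header uses the standard inclusive `bytes=first-last` syntax from RFC 9110.

```python
from typing import Dict, Iterator, Tuple

CHUNK_SIZE = 5 * 1024 * 1024  # 5MB, the documented chunk size


def chunk_ranges(
    total_bytes: int, start: int = 0, chunk_size: int = CHUNK_SIZE
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (first, last) byte offsets from `start` to end of file.

    Passing the number of bytes already written as `start` resumes a
    download without re-fetching completed chunks.
    """
    pos = start
    while pos < total_bytes:
        last = min(pos + chunk_size, total_bytes) - 1
        yield pos, last
        pos = last + 1


def range_header(first: int, last: int) -> Dict[str, str]:
    """HTTP Range header for an inclusive byte span."""
    return {"Range": f"bytes={first}-{last}"}
```

For a 12 MiB file (12,582,912 bytes) this yields three requests: bytes 0 to 5,242,879, then 5,242,880 to 10,485,759, then 10,485,760 to 12,582,911.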

### Configuration

```python
CHUNK_SIZE = 5 * 1024 * 1024      # 5MB chunks
MAX_CONCURRENT_FILES = 3          # Parallel file downloads
MAX_RETRIES = 3                   # Retry failed chunks
RETRY_BASE_DELAY = 1.0            # Exponential backoff base
STATE_RETENTION_DAYS = 7          # Auto-cleanup old state
```
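The JSON state files and `STATE_RETENTION_DAYS` imply two operations: persisting state safely and pruning stale files. A minimal sketch under assumptions: one `{job_id}.json` file per job, writes made atomic with a temp file plus `os.replace`, and age judged by file modification time. None of these names are taken from `download_state_manager.py`.

```python
import json
import os
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, Optional

STATE_RETENTION_DAYS = 7


def save_state(state_dir: Path, job_id: str, state: Dict[str, Any]) -> Path:
    """Write job state atomically: temp file in the same dir, then os.replace."""
    state_dir.mkdir(parents=True, exist_ok=True)
    target = state_dir / f"{job_id}.json"
    fd, tmp = tempfile.mkstemp(dir=state_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
        os.replace(tmp, target)  # readers never see a half-written file
    except BaseException:
        os.unlink(tmp)
        raise
    return target


def load_state(state_dir: Path, job_id: str) -> Optional[Dict[str, Any]]:
    """Return the saved state for a job, or None if there is none."""
    path = state_dir / f"{job_id}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text())


def cleanup_old_states(
    state_dir: Path,
    retention_days: float = STATE_RETENTION_DAYS,
    now: Optional[float] = None,
) -> int:
    """Delete state files not modified within the retention window."""
    cutoff = (time.time() if now is None else now) - retention_days * 86400
    removed = 0
    for path in state_dir.glob("*.json"):
        if path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return removed
```

In the container, `state_dir` would be `Path("/app/data/mast/.download_state")`.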

## Coding Standards

### Style Guide

- Follow PEP 8 style guide
- Use type hints for all functions
- Document complex algorithms with docstrings
- Use Pydantic models for request/response validation
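As an illustration of the Pydantic rule, the sketch below validates a hypothetical chunked-download request body. Only the `downloadSource` field and its `"auto"`/`"s3"` values come from this page; the model name, the `obsIds` field, and the `"http"` option are assumptions. It uses Pydantic v2 APIs (`ConfigDict`, `model_validate`).

```python
from typing import List, Literal

from pydantic import BaseModel, ConfigDict, Field


class ChunkedDownloadRequest(BaseModel):
    """Hypothetical request body; the real models live in app/mast/models.py."""

    # Accept snake_case names in Python and camelCase aliases on the wire
    model_config = ConfigDict(populate_by_name=True)

    obs_ids: List[str] = Field(alias="obsIds", min_length=1)
    # "http" is an assumed third option; this page names "auto" and "s3"
    download_source: Literal["auto", "s3", "http"] = Field(
        default="auto", alias="downloadSource"
    )
```

When a model like this annotates a FastAPI route parameter, invalid bodies (an empty `obsIds` list, an unknown `downloadSource`) are rejected with a 422 response before the handler runs.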

### Async Patterns

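```python
import asyncio

import aiofiles
import aiohttp


# Use async for I/O operations
async def download_file(url: str, path: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            # Stream the body to disk in chunks instead of buffering it in memory
            async with aiofiles.open(path, "wb") as f:
                async for chunk in response.content.iter_chunked(64 * 1024):
                    await f.write(chunk)


# Use asyncio.gather for parallel operations (await is only valid inside a coroutine)
async def download_all(jobs: list[tuple[str, str]]) -> None:
    await asyncio.gather(*(download_file(url, path) for url, path in jobs))
```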
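`asyncio.gather` on its own starts every coroutine at once. One common way to honor a cap such as `MAX_CONCURRENT_FILES = 3` is to guard each job with an `asyncio.Semaphore`. A sketch under that assumption; the helper name `gather_limited` is hypothetical, and jobs are passed as zero-argument factories so no coroutine object is created before it is allowed to run.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

MAX_CONCURRENT_FILES = 3  # documented parallel file limit

T = TypeVar("T")


async def gather_limited(
    jobs: Iterable[Callable[[], Awaitable[T]]],
    limit: int = MAX_CONCURRENT_FILES,
) -> List[T]:
    """Run coroutine factories concurrently, at most `limit` at a time.

    Results come back in input order, as with plain asyncio.gather.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run(job: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:
            return await job()

    return await asyncio.gather(*(run(job) for job in jobs))
```

For example, `await gather_limited([functools.partial(download_file, url, path) for url, path in pairs])`, where `pairs` is a list of `(url, path)` tuples.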

### Error Handling

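```python
import logging

import aiohttp

logger = logging.getLogger(__name__)


# Use specific exception types
class DownloadError(Exception):
    """A file could not be downloaded."""


class ResumeError(Exception):
    """An interrupted download could not be resumed."""


# Handle errors gracefully: log, then re-raise as a domain error
async def download_or_raise(url: str, path: str) -> None:  # illustrative wrapper
    try:
        await download_with_retry(url, path)  # retrying download helper, not shown
    except aiohttp.ClientError as e:
        logger.error("Download failed: %s", e)
        # Chain the original exception so the traceback keeps the root cause
        raise DownloadError(f"Failed to download {url}") from e
```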
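The error-handling example delegates to a retrying helper that is not shown. A minimal sketch of such a helper, applying the documented `MAX_RETRIES` and `RETRY_BASE_DELAY` settings as exponential backoff (1s, 2s, 4s). The name `retry_with_backoff`, the default `retry_on` tuple, and the reading of "3 retries" as three retries after the first attempt are assumptions; in the engine, `retry_on` would more likely be `(aiohttp.ClientError, asyncio.TimeoutError)`.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Tuple, Type, TypeVar

logger = logging.getLogger(__name__)

MAX_RETRIES = 3         # documented retry count
RETRY_BASE_DELAY = 1.0  # documented backoff base, in seconds

T = TypeVar("T")


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    retries: int = MAX_RETRIES,
    base_delay: float = RETRY_BASE_DELAY,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await `op`, retrying after base_delay * 2**attempt seconds."""
    for attempt in range(retries + 1):
        try:
            return await op()
        except retry_on as exc:
            if attempt == retries:
                raise  # out of retries: the caller wraps it (e.g. DownloadError)
            delay = base_delay * 2 ** attempt
            logger.warning(
                "Attempt %d failed (%s); retrying in %.1fs", attempt + 1, exc, delay
            )
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")
```

Exceptions outside `retry_on` propagate immediately, so programming errors are not retried.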

### Progress Reporting

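```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


# Report progress via callback
def progress_callback(
    downloaded_bytes: int,
    total_bytes: int,
    speed_bytes_per_sec: float,
    eta_seconds: Optional[float],
) -> None:
    # Guard against an unknown or zero total size (e.g. missing Content-Length)
    percent = (downloaded_bytes / total_bytes) * 100 if total_bytes else 0.0
    eta = f"{eta_seconds:.0f}s" if eta_seconds is not None else "unknown"
    logger.info(
        "Progress: %.1f%% at %.2f MB/s, ETA %s",
        percent, speed_bytes_per_sec / 1e6, eta,
    )
```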
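The callback's `speed_bytes_per_sec` and `eta_seconds` arguments have to be derived from byte counts and elapsed time. A minimal sketch of one way to do it, using the average speed since the tracker started (a simplification; a real tracker might use a moving window). `ProgressTracker` and its `add` method are hypothetical, not the API of `download_tracker.py`; the clock is injectable so the arithmetic can be checked without waiting.

```python
import time
from typing import Callable, Optional, Tuple


class ProgressTracker:
    """Hypothetical tracker that turns byte counts into speed and ETA."""

    def __init__(
        self, total_bytes: int, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self.total_bytes = total_bytes
        self.downloaded_bytes = 0
        self._clock = clock
        self._start = clock()

    def add(self, nbytes: int) -> Tuple[int, int, float, Optional[float]]:
        """Record nbytes and return progress_callback-style arguments."""
        self.downloaded_bytes += nbytes
        elapsed = self._clock() - self._start
        speed = self.downloaded_bytes / elapsed if elapsed > 0 else 0.0
        remaining = self.total_bytes - self.downloaded_bytes
        eta = remaining / speed if speed > 0 else None
        return self.downloaded_bytes, self.total_bytes, speed, eta
```

Each received chunk could then be reported with `progress_callback(*tracker.add(len(chunk)))`. For instance, 4,000,000 bytes of a 10,000,000-byte file after 2 seconds gives 2,000,000 B/s and an ETA of 3 seconds.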

## Dependencies

Key packages in `requirements.txt`:

```text
fastapi>=0.100.0          # Web framework
uvicorn>=0.24.0           # ASGI server
astroquery>=0.4.7         # MAST Portal queries
astropy>=5.3              # FITS file handling
numpy>=1.24.0             # Numerical operations
scipy>=1.11.0             # Scientific computing
aiohttp>=3.9.0            # Async HTTP client
aiofiles>=23.2.0          # Async file operations
pydantic>=2.0.0           # Data validation
```

## Testing

```bash
# Run tests
pytest

# Run with coverage
pytest --cov=app

# Run specific test file
pytest tests/test_mast.py
```

## Docker

The processing engine runs in Docker with storage configured via environment variables:

```yaml
processing-engine:
  build: ../processing-engine
  environment:
    - PYTHONUNBUFFERED=1
    - STORAGE_PROVIDER=local    # or "s3" for S3-compatible storage
  ports:
    - "8000:8000"
```

Files are accessed through the `StorageProvider` abstraction (`app/storage/`). With `STORAGE_PROVIDER=s3`, files are read and written via S3 (SeaweedFS in local development, AWS S3 in production). With `STORAGE_PROVIDER=local`, files live on the shared Docker volume at `/app/data/`.
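The provider pattern can be sketched as an abstract base class plus a cached factory keyed on `STORAGE_PROVIDER`. This is a minimal illustration, not the interface in `app/storage/provider.py`: the method names `read_bytes`, `write_bytes`, and `exists`, and the path-traversal guard, are assumptions, and the S3 implementation is left out.

```python
import functools
import os
from abc import ABC, abstractmethod
from pathlib import Path


class StorageProvider(ABC):
    """Hypothetical minimal interface; see app/storage/provider.py for the real one."""

    @abstractmethod
    def read_bytes(self, key: str) -> bytes: ...

    @abstractmethod
    def write_bytes(self, key: str, data: bytes) -> None: ...

    @abstractmethod
    def exists(self, key: str) -> bool: ...


class LocalStorage(StorageProvider):
    """Stores keys as files under a root directory (e.g. /app/data/)."""

    def __init__(self, root: str) -> None:
        self.root = Path(root).resolve()

    def _path(self, key: str) -> Path:
        path = (self.root / key).resolve()
        if not path.is_relative_to(self.root):  # Python 3.9+
            raise ValueError(f"Key escapes storage root: {key!r}")
        return path

    def read_bytes(self, key: str) -> bytes:
        return self._path(key).read_bytes()

    def write_bytes(self, key: str, data: bytes) -> None:
        path = self._path(key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)

    def exists(self, key: str) -> bool:
        return self._path(key).exists()


@functools.lru_cache(maxsize=1)
def get_storage_provider() -> StorageProvider:
    """Process-wide singleton chosen by STORAGE_PROVIDER (default: local)."""
    kind = os.environ.get("STORAGE_PROVIDER", "local").lower()
    if kind == "local":
        return LocalStorage("/app/data")
    if kind == "s3":
        raise NotImplementedError("S3 provider (boto3) omitted from this sketch")
    raise ValueError(f"Unknown STORAGE_PROVIDER: {kind!r}")
```

Because `lru_cache` only memoizes successful calls, a misconfigured `STORAGE_PROVIDER` fails on every call rather than caching an error; tests can call `get_storage_provider.cache_clear()` after changing the variable.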