# Processing Engine Standards

## Overview
The processing engine is a Python FastAPI service responsible for scientific computations, MAST Portal integration, and FITS file handling.
## Architecture
```text
processing-engine/
├── main.py                            # FastAPI application entry point
├── requirements.txt                   # Python dependencies
└── app/
    ├── mast/                          # MAST Portal integration
    │   ├── mast_service.py            # astroquery.mast wrapper
    │   ├── routes.py                  # FastAPI endpoints
    │   ├── models.py                  # Pydantic models
    │   ├── chunked_downloader.py      # HTTP Range download support
    │   ├── s3_client.py               # Anonymous S3 client for STScI bucket
    │   ├── s3_resolver.py             # MAST product → S3 key resolution
    │   ├── s3_downloader.py           # S3 multipart download engine
    │   ├── download_state_manager.py  # Resume state persistence
    │   └── download_tracker.py        # Progress tracking
    ├── composite/                     # RGB & N-channel composites
    │   ├── routes.py                  # Composite FastAPI endpoints
    │   ├── models.py                  # Pydantic models
    │   └── color_mapping.py           # N-channel hue/RGB mapping engine
    ├── mosaic/                        # WCS mosaic generation
    │   ├── routes.py                  # Mosaic FastAPI endpoints
    │   ├── models.py                  # Pydantic models
    │   └── mosaic_engine.py           # Reproject-based WCS reprojection
    ├── analysis/                      # Region statistics
    │   ├── routes.py                  # Analysis FastAPI endpoints
    │   └── models.py                  # Pydantic models
    ├── discovery/                     # Featured targets & recipe engine
    │   ├── routes.py                  # Discovery FastAPI endpoints
    │   ├── models.py                  # Pydantic models
    │   └── recipe_engine.py           # Composite recipe suggestion engine
    ├── semantic/                      # Semantic search & embeddings
    │   ├── routes.py                  # Semantic search FastAPI endpoints
    │   ├── models.py                  # Pydantic models
    │   ├── embedding_service.py       # FITS metadata embedding generation
    │   └── text_builder.py            # Metadata-to-text conversion for embeddings
    ├── storage/                       # Storage abstraction layer
    │   ├── provider.py                # StorageProvider ABC
    │   ├── local_storage.py           # Local filesystem implementation
    │   ├── s3_storage.py              # S3-compatible implementation (boto3)
    │   ├── temp_cache.py              # LRU temp file cache for S3
    │   ├── factory.py                 # Provider factory (singleton)
    │   └── helpers.py                 # resolve_fits_path, validate_fits_file_size
    └── processing/                    # Scientific algorithms
        ├── analysis.py                # Analysis algorithms
        └── utils.py                   # FITS utilities
```
## Key Files
| File | Purpose |
| ------------------------------------ | ----------------------------------------------------- |
| `main.py` | FastAPI application entry, algorithm registration |
| `app/mast/mast_service.py` | MastService class wrapping astroquery.mast |
| `app/mast/routes.py` | MAST search, download, and import endpoints |
| `app/mast/chunked_downloader.py` | Async HTTP downloads with Range headers |
| `app/mast/s3_downloader.py` | S3 multipart download engine with progress |
| `app/mast/s3_resolver.py` | MAST `get_cloud_uris()` → S3 key resolution |
| `app/mast/download_state_manager.py` | JSON state files for resume capability |
| `app/mast/download_tracker.py` | Byte-level progress tracking |
| `app/composite/routes.py` | N-channel composite generation endpoints |
| `app/composite/color_mapping.py` | Hue/RGB color mapping engine |
| `app/mosaic/routes.py` | WCS mosaic generation endpoints (incl. observation mosaic) |
| `app/mosaic/mosaic_engine.py` | Reproject-based WCS reprojection + batched mosaicking |
| `app/analysis/routes.py` | Region statistics computation |
| `app/discovery/routes.py` | Featured targets and recipe suggestion endpoints |
| `app/discovery/recipe_engine.py` | Composite recipe ranking and suggestion engine |
| `app/semantic/routes.py` | Semantic search and re-index endpoints |
| `app/semantic/embedding_service.py` | FITS metadata embedding generation |
| `app/storage/provider.py` | StorageProvider ABC (local or S3) |
## MAST Module
### Search Endpoints
- `POST /mast/search/target` - Search by astronomical target name
- `POST /mast/search/coordinates` - Search by RA/Dec coordinates
- `POST /mast/search/observation` - Search by observation ID
- `POST /mast/search/program` - Search by program ID
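As a sketch of how the search endpoints validate their input, here is a hypothetical Pydantic request model for the coordinate search. The field names and bounds below are illustrative assumptions, not the actual schema in `app/mast/models.py`:

```python
# Hypothetical request model for POST /mast/search/coordinates.
# Field names and constraints are illustrative, not the real schema.
from pydantic import BaseModel, Field

class CoordinateSearchRequest(BaseModel):
    ra: float = Field(..., ge=0.0, lt=360.0, description="Right ascension in degrees")
    dec: float = Field(..., ge=-90.0, le=90.0, description="Declination in degrees")
    radius_arcmin: float = Field(3.0, gt=0.0, description="Search radius in arcminutes")

# In app/mast/routes.py this would be wired up roughly as:
# @router.post("/mast/search/coordinates")
# async def search_coordinates(req: CoordinateSearchRequest): ...
```

Because FastAPI derives request validation from the model, out-of-range coordinates are rejected with a 422 before any MAST query runs.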
### Download Endpoints
- `POST /mast/download/start-chunked` - Start download with progress (S3 preferred, HTTP fallback)
- `GET /mast/download/progress/{job_id}` - Get download progress
- `POST /mast/download/resume/{job_id}` - Resume interrupted download
- `POST /mast/download/pause/{job_id}` - Pause active download
- `GET /mast/download/resumable` - List resumable jobs
### S3 Download Path
When `downloadSource` is `"auto"` (the default) or `"s3"`, the engine resolves S3 keys via MAST's `get_cloud_uris()` API and downloads from `s3://stpubdata/jwst/public/` using anonymous access. If S3 resolution fails, it falls back to chunked HTTP download.
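To make the resolution step concrete, here is a minimal sketch that splits an `s3://` URI (the form returned by astroquery's `Observations.get_cloud_uris()`) into a bucket and key; the actual logic in `s3_resolver.py` may differ:

```python
# Sketch: split an s3:// URI like "s3://stpubdata/jwst/public/.../file.fits"
# into (bucket, key). Illustrative only; s3_resolver.py may differ.
from urllib.parse import urlparse

def parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split an s3:// URI into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")

# Anonymous access to the public STScI bucket would use an unsigned boto3
# client, along these lines:
# import boto3
# from botocore import UNSIGNED
# from botocore.config import Config
# s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
```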
### Chunked Download Features
- **HTTP Range Headers**: Downloads in 5MB chunks for reliability
- **Parallel Downloads**: 3 concurrent file downloads using asyncio
- **Progress Tracking**: Byte-level progress with speed (B/s) and ETA
- **State Persistence**: JSON files in `/app/data/mast/.download_state/`
- **Resume Capability**: Continue from last successful byte position
- **Retry Logic**: Exponential backoff (3 retries, 1s base delay)
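The Range-header layout above can be sketched as follows; the real `chunked_downloader.py` may compute ranges differently:

```python
# Sketch of how 5 MB Range requests could be laid out. Each (start, end)
# pair maps to a header of the form: Range: bytes=start-end (inclusive).
CHUNK_SIZE = 5 * 1024 * 1024  # 5MB, matching the configuration below

def chunk_ranges(total_bytes: int, chunk_size: int = CHUNK_SIZE) -> list[tuple[int, int]]:
    """Return inclusive (start, end) byte ranges covering total_bytes."""
    return [
        (start, min(start + chunk_size, total_bytes) - 1)
        for start in range(0, total_bytes, chunk_size)
    ]
```

Resuming a download then amounts to re-issuing only the ranges past the last byte recorded in the job's state file.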
### Configuration
```python
CHUNK_SIZE = 5 * 1024 * 1024 # 5MB chunks
MAX_CONCURRENT_FILES = 3 # Parallel file downloads
MAX_RETRIES = 3 # Retry failed chunks
RETRY_BASE_DELAY = 1.0 # Exponential backoff base
STATE_RETENTION_DAYS = 7 # Auto-cleanup old state
```
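The backoff schedule implied by `MAX_RETRIES` and `RETRY_BASE_DELAY` can be written out as below; the actual retry loop lives in `chunked_downloader.py` and may differ in detail:

```python
# Sketch of the exponential-backoff schedule implied by the constants above.
def backoff_delays(max_retries: int = 3, base_delay: float = 1.0) -> list[float]:
    """Delay (seconds) before each retry attempt: base_delay * 2**attempt."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]
```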
## Coding Standards
### Style Guide
- Follow PEP 8 style guide
- Use type hints for all functions
- Document complex algorithms with docstrings
- Use Pydantic models for request/response validation
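For illustration only, a small helper written to these conventions (PEP 8 naming, type hints on every parameter and return value, a docstring explaining the non-obvious part); it is not taken from the codebase:

```python
# Illustrative only: a helper following the style conventions above.
def robust_mean(values: list[float], clip_sigma: float = 3.0) -> float:
    """Mean after discarding points more than clip_sigma std-devs from the mean.

    A single pass of sigma clipping is enough to illustrate the docstring
    convention; real analysis code would typically iterate to convergence.
    """
    n = len(values)
    mean = sum(values) / n
    std = (sum((v - mean) ** 2 for v in values) / n) ** 0.5
    kept = [v for v in values if abs(v - mean) <= clip_sigma * std]
    return sum(kept) / len(kept) if kept else mean
```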
### Async Patterns
```python
import asyncio

import aiohttp

# Use async for I/O operations
async def download_file(url: str, path: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            ...  # Process response (e.g. stream chunks to `path`)

# Use asyncio.gather for parallel operations
async def download_batch() -> None:
    results = await asyncio.gather(
        download_file(url1, path1),
        download_file(url2, path2),
        download_file(url3, path3),
    )
```
### Error Handling
```python
# Use specific exception types
class DownloadError(Exception):
    pass

class ResumeError(Exception):
    pass

# Handle errors gracefully, preserving the original cause
try:
    await download_with_retry(url, path)
except aiohttp.ClientError as e:
    logger.error(f"Download failed: {e}")
    raise DownloadError(f"Failed to download {url}") from e
```
### Progress Reporting
```python
from typing import Optional

# Report progress via callback
def progress_callback(
    downloaded_bytes: int,
    total_bytes: int,
    speed_bytes_per_sec: float,
    eta_seconds: Optional[float],
) -> None:
    percent = (downloaded_bytes / total_bytes) * 100
    logger.info(f"Progress: {percent:.1f}% at {speed_bytes_per_sec / 1e6:.2f} MB/s")
```
## Dependencies
Key packages in `requirements.txt`:
```text
fastapi>=0.100.0 # Web framework
uvicorn>=0.24.0 # ASGI server
astroquery>=0.4.7 # MAST Portal queries
astropy>=5.3 # FITS file handling
numpy>=1.24.0 # Numerical operations
scipy>=1.11.0 # Scientific computing
aiohttp>=3.9.0 # Async HTTP client
aiofiles>=23.2.0 # Async file operations
pydantic>=2.0.0 # Data validation
```
## Testing
```bash
# Run tests
pytest
# Run with coverage
pytest --cov=app
# Run specific test file
pytest tests/test_mast.py
```
## Docker
The processing engine runs in Docker with storage configured via environment variables:
```yaml
processing-engine:
  build: ../processing-engine
  environment:
    - PYTHONUNBUFFERED=1
    - STORAGE_PROVIDER=local  # or "s3" for S3-compatible storage
  ports:
    - "8000:8000"
```

Files are accessed through the `StorageProvider` abstraction (`app/storage/`). With `STORAGE_PROVIDER=s3`, files are read and written via S3 (SeaweedFS for local development, AWS S3 for production); with `local`, files use the shared Docker volume at `/app/data/`.
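As a rough sketch of that abstraction, the ABC and its local implementation might look like the following; the method names are assumptions, not the actual interface in `app/storage/provider.py`:

```python
# Sketch of a StorageProvider ABC with a local-filesystem implementation.
# Method names and signatures are illustrative assumptions.
from abc import ABC, abstractmethod
from pathlib import Path

class StorageProvider(ABC):
    @abstractmethod
    def read_bytes(self, key: str) -> bytes: ...

    @abstractmethod
    def write_bytes(self, key: str, data: bytes) -> None: ...

class LocalStorage(StorageProvider):
    def __init__(self, root: str = "/app/data") -> None:
        self.root = Path(root)

    def read_bytes(self, key: str) -> bytes:
        return (self.root / key).read_bytes()

    def write_bytes(self, key: str, data: bytes) -> None:
        path = self.root / key
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)
```

An S3-backed implementation would satisfy the same interface with boto3 calls, which is what lets `factory.py` pick a provider from `STORAGE_PROVIDER` without callers changing.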