A Python SDK for building serverless AI inference workers on the Cozy Creator platform.
## Installation

```bash
uv add gen-worker
```

With PyTorch support:

```bash
uv add gen-worker[torch]
```

## Quick Start

```python
import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: ActionContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")
```

## Features

- Function discovery - Automatic detection of `@worker_function`-decorated functions
- Schema generation - Input/output schemas extracted from msgspec types
- Model injection - Dependency injection for ML models with caching
- Streaming output - Support for incremental/streaming responses
- Progress reporting - Built-in progress events via `ActionContext` (see the sketch after this list)
- File handling - Upload/download assets via the Cozy hub file API
- Model caching - LRU cache with VRAM/disk management and cache-aware routing
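The progress API itself is not documented elsewhere in this README, so the following is only a sketch: `ctx.report_progress(...)` is an assumed method name and signature, while `ctx.is_canceled()` is the cancellation check shown in the streaming example further down.

```python
import msgspec
from gen_worker import ActionContext, worker_function

class JobInput(msgspec.Struct):
    steps: int

class JobOutput(msgspec.Struct):
    completed: int

@worker_function()
def long_job(ctx: ActionContext, payload: JobInput) -> JobOutput:
    for i in range(payload.steps):
        # Hypothetical call -- check the ActionContext API for the real name/signature.
        ctx.report_progress(i / payload.steps)
        if ctx.is_canceled():
            raise InterruptedError("canceled")
    return JobOutput(completed=payload.steps)
```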
## Usage

```python
import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str

@worker_function()
def my_function(ctx: ActionContext, payload: Input) -> Output:
    return Output(result=f"Processed: {payload.prompt}")
```

## Streaming Output

```python
from typing import Iterator

class Delta(msgspec.Struct):
    chunk: str

@worker_function()
def stream(ctx: ActionContext, payload: Input) -> Iterator[Delta]:
    for word in payload.prompt.split():
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        yield Delta(chunk=word)
```

## Model Injection

```python
from typing import Annotated

from gen_worker.injection import ModelArtifacts, ModelRef, ModelRefSource as Src

@worker_function()
def generate(
    ctx: ActionContext,
    artifacts: Annotated[ModelArtifacts, ModelRef(Src.DEPLOYMENT, "my-model")],
    payload: Input,
) -> Output:
    model_path = artifacts.root_dir
    # Load and use model...
    return Output(result="done")
```

## File Handling

```python
@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
    # Save bytes and get asset reference
    asset = ctx.save_bytes("output.png", image_bytes)
    return Output(result=asset.ref)
```

## Configuration (`pyproject.toml`)

```toml
[tool.cozy]
deployment = "my-worker"
[tool.cozy.models]
# Model refs (phase 1):
# - Cozy Hub snapshot (default): org/repo[:tag] or org/repo@sha256:<digest>
# - Hugging Face repo: hf:org/repo[@revision] (requires gen-worker)
sdxl = "cozy:stabilityai/sdxl:latest"
qwen_image = "hf:Qwen/Qwen2.5-VL-7B-Instruct@main"
[tool.cozy.build]
gpu = true
```
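The names declared under `[tool.cozy.models]` are presumably what injection annotations refer to; the sketch below assumes that mapping (the exact relationship between config keys and `ModelRef` arguments is not spelled out here):

```python
from typing import Annotated

import msgspec
from gen_worker import ActionContext, worker_function
from gen_worker.injection import ModelArtifacts, ModelRef, ModelRefSource as Src

class Txt2ImgInput(msgspec.Struct):
    prompt: str

class Txt2ImgOutput(msgspec.Struct):
    result: str

@worker_function()
def txt2img(
    ctx: ActionContext,
    # Assumption: "sdxl" is the key declared under [tool.cozy.models] above.
    artifacts: Annotated[ModelArtifacts, ModelRef(Src.DEPLOYMENT, "sdxl")],
    payload: Txt2ImgInput,
) -> Txt2ImgOutput:
    model_dir = artifacts.root_dir  # local path to the resolved snapshot
    return Txt2ImgOutput(result=str(model_dir))
```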
## Environment Variables

| Variable | Default | Description |
|---|---|---|
| `SCHEDULER_ADDR` | - | Primary scheduler address |
| `SCHEDULER_ADDRS` | - | Comma-separated seed addresses for leader discovery |
| `WORKER_JWT` | - | Auth token (fallback if `AUTH_TOKEN` is not set) |
| `SCHEDULER_JWKS_URL` | - | JWKS URL for JWT verification |
| `WORKER_MAX_CONCURRENCY` | - | Max concurrent task executions |
| `WORKER_MAX_INPUT_BYTES` | - | Max input payload size |
| `WORKER_MAX_OUTPUT_BYTES` | - | Max output payload size |
| `WORKER_MAX_UPLOAD_BYTES` | - | Max file upload size |
| `WORKER_MAX_VRAM_GB` | Auto | Maximum VRAM for models |
| `WORKER_VRAM_SAFETY_MARGIN_GB` | 3.5 | Reserved VRAM for working memory |
| `WORKER_MODEL_CACHE_DIR` | `/tmp/model_cache` | Disk cache directory |
| `WORKER_MAX_CONCURRENT_DOWNLOADS` | 2 | Max parallel model downloads |
| `COZY_HUB_URL` | - | Cozy hub base URL |
| `COZY_HUB_TOKEN` | - | Cozy hub bearer token |
| `HF_TOKEN` | - | Hugging Face token (for private `hf:` refs) |
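One plausible reading of how the two VRAM settings interact is sketched below; the worker's actual accounting may differ, and both the detection logic and the subtraction are assumptions.

```python
import os

import torch

def vram_budget_gb() -> float:
    """Sketch only: derive a model-cache VRAM budget from the variables above."""
    margin = float(os.environ.get("WORKER_VRAM_SAFETY_MARGIN_GB", "3.5"))
    configured = os.environ.get("WORKER_MAX_VRAM_GB")
    if configured is not None:
        total = float(configured)
    else:
        # "Auto": detect total VRAM on the first CUDA device.
        total = torch.cuda.get_device_properties(0).total_memory / 1024**3
    return max(total - margin, 0.0)
```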
## Hugging Face Download Filtering

By default, `hf:` model refs do not download the full repo. The worker uses `huggingface_hub.snapshot_download(allow_patterns=...)` to avoid pulling huge legacy weights.

Defaults:

- Download only what a diffusers pipeline needs (derived from `model_index.json`).
- Skip `safety_checker` and `feature_extractor` by default.
- Download only reduced-precision safetensors weights (`fp16`/`bf16`); never download `.ckpt` or `.bin` by default.
- For sharded safetensors, also download the `*.safetensors.index.json` and the referenced shard files.

Overrides:

- `COZY_HF_COMPONENTS="unet,vae,text_encoder,tokenizer,scheduler"`: hard-override the component list.
- `COZY_HF_INCLUDE_OPTIONAL_COMPONENTS=1`: include components like `safety_checker`/`feature_extractor` if present.
- `COZY_HF_WEIGHT_PRECISIONS="fp16,bf16"`: change which weight suffixes are accepted (add `fp32` only if you really need it).
- `COZY_HF_ALLOW_ROOT_JSON=1`: allow additional small root `*.json` files (some repos need extra root config).
- `COZY_HF_FULL_REPO_DOWNLOAD=1`: disable filtering and download the entire repo (not recommended; can be tens of GB).
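For illustration, a filtered download roughly equivalent to the defaults above might look like the sketch below. The patterns are hand-written approximations; the worker derives the real list from the repo's `model_index.json`, and the repo id is just an example.

```python
from huggingface_hub import snapshot_download

# Sketch: hand-written allow patterns approximating the default filtering.
local_dir = snapshot_download(
    repo_id="stabilityai/stable-diffusion-xl-base-1.0",  # example diffusers repo
    allow_patterns=[
        "model_index.json",
        "*/config.json",
        "*/*.fp16.safetensors",        # reduced-precision weights only
        "*/*.safetensors.index.json",  # shard indexes for sharded weights
        "tokenizer*/*",
        "scheduler/*",
    ],
)
print(local_dir)  # local path to the filtered snapshot
```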
## Resumable Downloads

Cozy snapshot/object file downloads are written to `*.part` and then atomically renamed on success. If a `*.part` file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
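The resume pattern described above, sketched with `requests`; this is illustrative only, not gen-worker's actual download code.

```python
import os

import requests

def download_with_resume(url: str, dest: str) -> None:
    """Sketch of the *.part + HTTP Range resume pattern."""
    part = dest + ".part"
    offset = os.path.getsize(part) if os.path.exists(part) else 0
    headers = {"Range": f"bytes={offset}-"} if offset else {}
    with requests.get(url, headers=headers, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        if offset and resp.status_code != 206:
            offset = 0  # server ignored Range: fall back to a full re-download
        with open(part, "ab" if offset else "wb") as f:
            for chunk in resp.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    os.replace(part, dest)  # atomic rename on success
```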
## Project Structure

```
my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── __init__.py
```
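Worker functions live in the module itself; assuming discovery works by importing the package and collecting `@worker_function` definitions (which is what the `python -m gen_worker.discover` step in the Dockerfile below suggests), a minimal `src/my_module/__init__.py` could be:

```python
# src/my_module/__init__.py
import msgspec

from gen_worker import ActionContext, worker_function

class EchoInput(msgspec.Struct):
    prompt: str

class EchoOutput(msgspec.Struct):
    result: str

@worker_function()
def echo(ctx: ActionContext, payload: EchoInput) -> EchoOutput:
    # Picked up by function discovery and listed in .cozy/manifest.json.
    return EchoOutput(result=payload.prompt)
```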
## Dockerfile

```dockerfile
ARG BASE_IMAGE=cozycreator/gen-runtime:cuda12.8-torch2.9
FROM ${BASE_IMAGE}

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

WORKDIR /app
COPY . /app
RUN if [ -f uv.lock ]; then uv sync --frozen --no-dev; else uv sync --no-dev; fi
RUN mkdir -p .cozy && python -m gen_worker.discover > .cozy/manifest.json

ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```

```bash
# Build
docker build -t my-worker .

# Run
docker run -e SCHEDULER_ADDR=orchestrator:8080 my-worker
```

## Base Images

| Image | GPU | CUDA | PyTorch |
|---|---|---|---|
| `cozycreator/gen-runtime:cpu-torch2.9` | No | - | 2.9 |
| `cozycreator/gen-runtime:cuda12.6-torch2.9` | Yes | 12.6 | 2.9 |
| `cozycreator/gen-runtime:cuda12.8-torch2.9` | Yes | 12.8 | 2.9 |
| `cozycreator/gen-runtime:cuda13-torch2.9` | Yes | 13.0 | 2.9 |
## Model Cache States

Workers report model availability for cache-aware job routing:
| State | Location | Latency |
|---|---|---|
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |
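A rough sketch of how these states can map onto cache lookups; `ModelCache.is_in_vram` appears in the API example further below, while the disk check here is a hypothetical stand-in:

```python
from gen_worker.model_cache import ModelCache

def model_state(cache: ModelCache, model_id: str, on_disk: bool) -> str:
    """Sketch: classify a model into the Hot/Warm/Cold states above.
    `on_disk` is a hypothetical stand-in for a disk-cache lookup."""
    if cache.is_in_vram(model_id):
        return "hot"   # already in VRAM: served instantly
    if on_disk:
        return "warm"  # cached on disk: seconds to load into VRAM
    return "cold"      # must be downloaded first: minutes
```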
## Dev Testing (Mock Orchestrator)
For local end-to-end tests without standing up `gen-orchestrator`, you can run a mock orchestrator gRPC server and point a worker at it. This exercises the real worker gRPC protocol (ConnectWorker stream + TaskExecutionRequest/Result).
Start mock orchestrator (listens on port 8080 and runs a single function call):
```bash
python -m gen_worker.testing.mock_orchestrator --listen 0.0.0.0:8080 --run hello --payload-json '{"name":"world"}'
```

Then start your worker container pointing `SCHEDULER_ADDR` to the host:

```bash
docker run --rm -e SCHEDULER_ADDR=host.docker.internal:8080 <your-worker-image>
```
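For the `--run hello` call above to resolve, the worker image needs a matching function. A minimal sketch; the struct and field names are assumptions inferred from the `--payload-json` value:

```python
import msgspec

from gen_worker import ActionContext, worker_function

class HelloInput(msgspec.Struct):
    name: str

class HelloOutput(msgspec.Struct):
    greeting: str

@worker_function()
def hello(ctx: ActionContext, payload: HelloInput) -> HelloOutput:
    # Invoked by the mock orchestrator with {"name": "world"}.
    return HelloOutput(greeting=f"Hello, {payload.name}!")
```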
## Model Cache API

```python
from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)  # pipeline: the loaded model object
cache.is_in_vram("model-a")   # True
cache.get_vram_models()       # ["model-a"]
```
## Error Handling
```python
from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry
    try:
        result = call_external_api()
    except TimeoutError:
        raise RetryableError("API timeout")  # Will be retried
    return Output(result=result)
```

## Development

```bash
# Install dev dependencies
uv sync --extra dev
# Run tests
uv run pytest
# Type checking
uv run mypy src/gen_worker
# Build
uv build
```

## Regenerating Protobufs

Requires `gen-orchestrator` as a sibling repo:

```bash
uv sync --extra dev
python -m grpc_tools.protoc -I../gen-orchestrator/proto --python_out=src/gen_worker/pb --grpc_python_out=src/gen_worker/pb ../gen-orchestrator/proto/*.proto
```

## License

MIT