Skip to content

swiss-ai-hub-pipeline

The data-ingestion SDK for Swiss AI Hub — turn documents into RAG-ready vectors with Dagster.

PyPIPythonLicense


What is Swiss AI Hub?

Swiss AI Hub is an open-source, self-hosted AI platform for enterprises. One docker compose up starts ~30 integrated containers — LLM gateway (LiteLLM), vector search (Milvus), document parsing (MinerU), S3 storage (SeaweedFS), SSO (Keycloak), observability (Langfuse), a chat UI (Open-WebUI), and more. Agents answer questions over your organization's knowledge; this package is how that knowledge gets in.

What is this package?

swiss-ai-hub-pipeline is a Dagster-based SDK that ingests documents and produces the vectors RAG agents search. It implements a two-stage, asset-based pipeline:

  1. Source → data lake — monitor a source (SharePoint, OneDrive, Google Drive, S3, local/network shares — anything rclone supports) and sync changed files into the platform's S3 (SeaweedFS).
  2. Data lake → vector store — parse each file (MinerU OCR + structure), chunk it, embed it via the LLM gateway, and upsert the vectors into Milvus, with full lineage from every embedding back to its source document.

You compose a pipeline from one function, default_definitions(), which wires together all the assets, resources, IO managers, sensors, jobs, and schedules. It builds on swiss-ai-hub-core (installed automatically); RAG agents from swiss-ai-hub-agent query its output.

Should you use this package?

Probably not directly — most deployments use the pre-built pipeline images (default_rag_pipeline, shared_rag_pipeline), which ingest the platform's default buckets out of the box.

Use this PyPI package when you want a custom pipeline — connect a new data source, ingest into a different bucket, or tune parsing/chunking/embedding for your documents. It's an SDK for building your own ingestion as a Dagster code location.

Installation

bash
pip install swiss-ai-hub-pipeline
# or
uv add swiss-ai-hub-pipeline

Requires Python 3.13.


Quick start

A pipeline is a Dagster code location — a module that exposes a Definitions object. default_definitions() builds a complete one:

python
# my_pipeline/__init__.py
from swiss_ai_hub.pipeline import default_definitions

defs = default_definitions(
    datalake_container_name="my_docs",                  # S3 bucket (Dagster name: letters, digits, underscores)
    embedding_model_name="embedding/bge-m3",            # any embedding model on the LiteLLM gateway
    llm_model_name="text-generation/gemma-4-31B-it",    # for summaries / table & figure refinement
    with_summary_nodes=True,                            # hierarchical RAG summaries
)

Run it with the Dagster UI and materialize the assets:

bash
dagster dev -m my_pipeline      # opens http://localhost:3000

Drop a document into the my_docs bucket, click Materialize on the asset graph, and watch it flow: observe → documents (parse) → nodes (chunk + embed) → Milvus. A RAG agent pointed at that bucket can now answer questions over it.

To also pull from an external source, combine default_definitions() with a Stage-1 builder — e.g. default_rclone_to_datalake_definitions(...) for OneDrive/Google Drive/Dropbox, or default_sharepoint_to_datalake_definitions(...). The source templates (SharePoint, OneDrive, S3, Azure Blob, Google Drive, SFTP, local FS) are copy-paste starting points.


How it works

default_definitions() assembles a graph of Dagster assets connected by IO managers to the platform's stores:

StageAssetsBacked by
Source → data lakeobservable_*, data_lake_file, removed_data_lake_filesSeaweedFS (S3)
Data lake → vector storedocuments (parse), nodes (chunk + embed), summary_nodes, removed_documentsMinerU, LiteLLM, MongoDB, Milvus

Materialization is driven by eager automation, daily schedules, and a NATS sensor that fires when documents are uploaded through the API — so ingestion keeps up with changes without manual runs. Key default_definitions() knobs: with_summary_nodes, with_table_refinement, with_figure_descriptions, document_parser_loader_type (MinerU or Document Intelligence), and max_partitions.


Development

The dev stack runs the infrastructure a pipeline needs — SeaweedFS (S3), MongoDB, Milvus, MinerU, and the LiteLLM gateway — and exposes it on localhost:

bash
# 1. Start the platform infrastructure (from a Swiss AI Hub checkout)
docker compose --env-file .env -f infra/docker-compose.dev.yml up -d

# 2. Load the dev connection settings into your shell
set -a && source .env && set +a

# 3. Run your pipeline's Dagster UI against the stack
dagster dev -m my_pipeline       # http://localhost:3000

Materialize assets from the UI to parse, embed, and store real documents. dagster definitions validate -m my_pipeline loads the whole code location (every asset, resource, and IO manager) without running it — handy as a fast sanity check and in CI.

Settings are not auto-loaded from the environment. The SDK reads connection settings only when constructed, so make sure the variables above are exported in the process that runs Dagster (set -a && source .env && set +a).

Production

In production a pipeline runs as a Dagster code location: a gRPC server in a container that the platform's Dagster webserver and daemon connect to.

1. Containerize it as a gRPC code-location server:

dockerfile
FROM python:3.13-slim
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /app
COPY pyproject.toml uv.lock ./        # your project, depending on swiss-ai-hub-pipeline
RUN uv sync --frozen --no-dev
COPY . .

ENV PATH="/app/.venv/bin:$PATH" PYTHONUNBUFFERED=1
EXPOSE 4000
ENTRYPOINT ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_pipeline"]

2. Run it alongside the platform on the right networks — a pipeline reaches MinerU + LiteLLM (backend), MongoDB + Milvus + NATS (data), and SeaweedFS/S3 (storage):

yaml
# docker-compose.my-pipeline.yml — deployed alongside the platform
services:
  my-pipeline:
    image: registry.example.com/my-pipeline:1.0.0
    restart: always
    environment:
      MONGO_CONNECTION_STRING: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@ferretdb:27017/
      MILVUS_URL: http://milvus-standalone:19530
      S3_STORAGE_ENDPOINT: http://seaweedfs-s3:9000
      S3_STORAGE_ACCESS_KEY: ${S3_STORAGE_ACCESS_KEY}
      S3_STORAGE_SECRET_KEY: ${S3_STORAGE_SECRET_KEY}
      LITE_LLM_PROXY_BASE_URL: http://litellm:4000
      LITE_LLM_PROXY_API_KEY: ${LITELLM_MASTER_KEY}
      MINERU_API_BASE_URL: http://mineru-api:8000
      NATS_ENDPOINT: nats://nats:4222
      NATS_TOKEN: ${NATS_TOKEN}
    networks: [backend, data, storage]

networks:
  backend: { external: true }
  data: { external: true }
  storage: { external: true }

3. Register it in the platform's Dagster workspace so the webserver/daemon load it:

yaml
# workspace.yaml
load_from:
  - grpc_server:
      host: my-pipeline      # the service name above
      port: 4000
      location_name: my-pipeline
bash
docker compose -f docker-compose.my-pipeline.yml up -d

Reuse the platform's secrets (from its .env) for the ${…} values, and match the actual network names of your deployment. Your pipeline then shows up as a code location in the platform's Dagster UI, with its schedules and sensors running under the shared daemon.

Network reference. backend = LiteLLM, MinerU, OTEL. data = NATS, FerretDB, Milvus. storage = SeaweedFS/S3.


License

Apache-2.0 — see packages/pipeline/LICENSE. For the full per-package license matrix, see LICENSES.md.


Part of Swiss AI Hub. Built in Switzerland by bbv Software Services.

Built with ❤️ in Switzerland 🇨🇭