# Pipelines

> Create, version, and run document processing pipelines using the Datalab SDK.

## Overview

Pipelines chain processors (convert, extract, segment, custom) into reusable, versioned configurations. See [Pipeline Overview](/docs/recipes/pipelines/pipeline-overview) for concepts.

## Basic Usage

```python theme={null}
from datalab_sdk import DatalabClient, PipelineProcessor

client = DatalabClient()

# Create a pipeline
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={
        "page_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "date": {"type": "string"}
            }
        }
    })
])

# Name the pipeline, then publish its current steps as a version
pipeline = client.save_pipeline(pipeline.pipeline_id, name="My Pipeline")
version = client.create_pipeline_version(pipeline.pipeline_id)

# Submit a document, then poll until the execution finishes
execution = client.run_pipeline(pipeline.pipeline_id, file_path="doc.pdf")
execution = client.get_pipeline_execution(execution.execution_id, max_polls=300)

# Fetch the output of the second step (the extract step)
result = client.get_step_result(execution.execution_id, step_index=1)
```

## Models

### PipelineProcessor

Defines a single processor in a pipeline.

```python theme={null}
from datalab_sdk import PipelineProcessor

step = PipelineProcessor(
    type="extract",                      # "convert", "extract", "segment", or "custom"
    settings={"page_schema": {...}},     # Step-specific configuration
    custom_processor_id="cp_abc123",     # Only relevant for "custom" steps
    eval_rubric_id=42,                   # Optional evaluation rubric
)
```

| Field                 | Type | Required | Description                                          |
| --------------------- | ---- | -------- | ---------------------------------------------------- |
| `type`                | str  | Yes      | `"convert"`, `"extract"`, `"segment"`, or `"custom"` |
| `settings`            | dict | Yes      | Step-specific configuration                          |
| `custom_processor_id` | str  | No       | Custom processor ID for `"custom"` steps             |
| `eval_rubric_id`      | int  | No       | Evaluation rubric to apply                           |

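The field rules in the table can be checked before a step is sent to the API. The following is a minimal sketch, not part of the SDK: `check_step` and `VALID_STEP_TYPES` are hypothetical names, and the rule that `custom_processor_id` must accompany `"custom"` steps (and only those) is an assumption drawn from the field description.

```python
from typing import Any, Optional

# Step types listed in the PipelineProcessor table.
VALID_STEP_TYPES = {"convert", "extract", "segment", "custom"}


def check_step(
    step_type: str,
    settings: Any,
    custom_processor_id: Optional[str] = None,
) -> list[str]:
    """Return a list of problems found in a step definition (empty if none)."""
    problems = []
    if step_type not in VALID_STEP_TYPES:
        problems.append(f"unknown step type: {step_type!r}")
    if not isinstance(settings, dict):
        problems.append("settings must be a dict")
    # Assumption: custom steps need a processor ID, other steps do not use one.
    if step_type == "custom" and not custom_processor_id:
        problems.append("custom steps need custom_processor_id")
    if step_type != "custom" and custom_processor_id:
        problems.append("custom_processor_id is only used by custom steps")
    return problems
```

Calling `check_step("convert", {"mode": "balanced"})` returns an empty list, while a `"custom"` step without an ID returns one problem string.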
### PipelineConfig

Returned by the pipeline management methods, such as `create_pipeline`, `save_pipeline`, `update_pipeline`, and `get_pipeline`.

| Field            | Type     | Description                                            |
| ---------------- | -------- | ------------------------------------------------------ |
| `pipeline_id`    | str      | Unique ID (`pl_XXXXX`)                                 |
| `steps`          | list     | Ordered list of step definitions                       |
| `name`           | str      | Pipeline name (set via `save_pipeline`)                |
| `is_saved`       | bool     | Whether pipeline has been saved                        |
| `archived`       | bool     | Whether pipeline is archived                           |
| `active_version` | int      | Current published version (`0` = no published version) |
| `created`        | datetime | Creation timestamp                                     |
| `updated`        | datetime | Last update timestamp                                  |

### PipelineVersion

Immutable snapshot of pipeline steps at a point in time.

| Field         | Type     | Description                |
| ------------- | -------- | -------------------------- |
| `version`     | int      | Version number             |
| `steps`       | list     | Steps at this version      |
| `description` | str      | Version description        |
| `created`     | datetime | When version was published |

### PipelineExecution

Returned by `run_pipeline` and `get_pipeline_execution`. Describes a single run of a pipeline.

| Field              | Type     | Description                                                          |
| ------------------ | -------- | -------------------------------------------------------------------- |
| `execution_id`     | str      | Unique ID (`pex_XXXXX`)                                              |
| `pipeline_id`      | str      | Pipeline that was executed                                           |
| `pipeline_version` | int      | Version used (`0` = draft)                                           |
| `status`           | str      | `pending`, `running`, `completed`, `completed_with_errors`, `failed` |
| `steps`            | list     | List of `PipelineExecutionStepResult`                                |
| `started_at`       | datetime | Execution start time                                                 |
| `completed_at`     | datetime | Execution end time                                                   |
| `created`          | datetime | When execution was created                                           |
| `config_snapshot`  | dict     | Frozen step configuration used                                       |
| `input_config`     | dict     | Input file details                                                   |
| `rate_breakdown`   | dict     | Billing breakdown                                                    |

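Callers usually need to know whether an execution has stopped changing and how long it took. The sketch below works only on the `status`, `started_at`, and `completed_at` fields from the table. Treating `completed`, `completed_with_errors`, and `failed` as the terminal statuses is an assumption based on their names, and both helper names are hypothetical.

```python
from datetime import datetime
from typing import Optional

# Assumption: these three statuses mean the execution will not change again.
TERMINAL_STATUSES = {"completed", "completed_with_errors", "failed"}


def is_finished(status: str) -> bool:
    """True once an execution has reached a terminal status."""
    return status in TERMINAL_STATUSES


def duration_seconds(
    started_at: Optional[datetime], completed_at: Optional[datetime]
) -> Optional[float]:
    """Elapsed run time in seconds, or None if either timestamp is missing."""
    if started_at is None or completed_at is None:
        return None
    return (completed_at - started_at).total_seconds()
```

For example, `is_finished(execution.status)` can guard a call to `get_step_result`, and `duration_seconds(execution.started_at, execution.completed_at)` gives the run time once both timestamps are set.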
### PipelineExecutionStepResult

Status of a single step within an execution.

| Field           | Type     | Description                                                          |
| --------------- | -------- | -------------------------------------------------------------------- |
| `step_index`    | int      | Position in pipeline                                                 |
| `step_type`     | str      | Step type                                                            |
| `status`        | str      | `pending`, `dispatched`, `running`, `completed`, `failed`, `skipped` |
| `result_url`    | str      | URL to fetch step result                                             |
| `checkpoint_id` | str      | Checkpoint passed to downstream steps                                |
| `started_at`    | datetime | Step start time                                                      |
| `finished_at`   | datetime | Step end time                                                        |
| `error_message` | str      | Error details if failed                                              |

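When an execution ends as `failed` or `completed_with_errors`, the per-step fields show which step went wrong. The following is a small sketch that reads only `step_index`, `step_type`, `status`, and `error_message`. The `failed_steps` helper and the stand-in objects (including the sample error text) are hypothetical and exist only so the example runs without the SDK.

```python
from types import SimpleNamespace


def failed_steps(execution) -> list[tuple[int, str, str]]:
    """Return (step_index, step_type, error_message) for each failed step."""
    return [
        (step.step_index, step.step_type, step.error_message or "")
        for step in execution.steps
        if step.status == "failed"
    ]


# Stand-in objects with the fields from the table, so the sketch runs offline.
demo_execution = SimpleNamespace(steps=[
    SimpleNamespace(step_index=0, step_type="convert",
                    status="completed", error_message=None),
    SimpleNamespace(step_index=1, step_type="extract",
                    status="failed", error_message="schema mismatch"),
])

for index, step_type, message in failed_steps(demo_execution):
    print(f"step {index} ({step_type}) failed: {message}")
```

With a real `PipelineExecution`, pass the object returned by `get_pipeline_execution` in place of `demo_execution`.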
## Pipeline Management

### Create

```python theme={null}
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}})
])
```

### Save

```python theme={null}
pipeline = client.save_pipeline(pipeline.pipeline_id, name="Invoice Parser")
```

### Update

Replaces the pipeline's steps. If the pipeline already has a published version, the change is stored as a draft:

```python theme={null}
pipeline = client.update_pipeline(pipeline.pipeline_id, steps=[
    PipelineProcessor(type="convert", settings={"mode": "accurate"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}})
])
```

### List

```python theme={null}
result = client.list_pipelines(
    saved_only=True,           # Return only saved pipelines (default: True)
    include_archived=False,    # Also return archived pipelines (default: False)
    limit=50,                  # Maximum number of pipelines to return
    offset=0                   # Number of pipelines to skip
)

for p in result["pipelines"]:
    print(f"{p.pipeline_id}: {p.name}")
```
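To walk through more pipelines than a single call returns, `limit` and `offset` can be advanced in a loop. The sketch below assumes that a page shorter than `limit` marks the end of the list, which this page does not confirm. `iter_pipelines` is a hypothetical helper, and `FakeClient` is an offline stand-in for `DatalabClient`.

```python
from typing import Any, Iterator


def iter_pipelines(client: Any, page_size: int = 50, **filters: Any) -> Iterator[Any]:
    """Yield every pipeline by paging through list_pipelines."""
    offset = 0
    while True:
        page = client.list_pipelines(
            limit=page_size, offset=offset, **filters
        )["pipelines"]
        yield from page
        # Assumption: a short or empty page means nothing is left.
        if len(page) < page_size:
            break
        offset += page_size


class FakeClient:
    """Offline stand-in that serves slices of an in-memory list."""

    def __init__(self, items: list) -> None:
        self._items = items

    def list_pipelines(self, limit: int, offset: int, **_: Any) -> dict:
        return {"pipelines": self._items[offset:offset + limit]}


names = list(iter_pipelines(FakeClient(["a", "b", "c", "d", "e"]), page_size=2))
```

With the real client, something like `iter_pipelines(client, saved_only=True)` would forward the filter to each `list_pipelines` call.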

### Get

```python theme={null}
pipeline = client.get_pipeline("pl_abc123")
```

### Archive / Unarchive

```python theme={null}
client.archive_pipeline("pl_abc123")
client.unarchive_pipeline("pl_abc123")
```

## Versioning

### Publish a Version

```python theme={null}
version = client.create_pipeline_version(
    "pl_abc123",
    description="Added line items extraction"
)
print(f"Published v{version.version}")
```

### List Versions

```python theme={null}
result = client.list_pipeline_versions("pl_abc123")
for v in result["versions"]:
    print(f"v{v.version}: {v.description}")
```

### Discard Draft

```python theme={null}
# Revert to active published version
pipeline = client.discard_pipeline_draft("pl_abc123")

# Revert to a specific version
pipeline = client.discard_pipeline_draft("pl_abc123", version=1)
```

### Get Rate

```python theme={null}
rate = client.get_pipeline_rate("pl_abc123")
print(f"{rate['rate_per_1000_pages_cents']} cents per 1000 pages")
```
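The rate is quoted per 1000 pages, so an estimate for a specific document scales it by the page count. This is a rough sketch that assumes billing is linear in pages, with no minimum charge or rounding. `estimate_cost_cents` is a hypothetical helper, not an SDK function.

```python
def estimate_cost_cents(rate_per_1000_pages_cents: float, pages: int) -> float:
    """Estimate the cost in cents of processing `pages` pages.

    Assumes cost scales linearly with page count.
    """
    if pages < 0:
        raise ValueError("pages must be non-negative")
    return rate_per_1000_pages_cents * pages / 1000
```

For instance, `estimate_cost_cents(rate["rate_per_1000_pages_cents"], 250)` estimates the cost of a 250-page document at the pipeline's current rate.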

## Execution

### Run

```python theme={null}
execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf",     # or file_url="https://..."
    page_range="0-10",
    output_format="json",
    skip_cache=False,
    run_evals=False,
    webhook_url="https://example.com/hook",
    version=2,                    # omit for active version
    max_polls=1,                  # polls after submission
    poll_interval=1,
)
```

| Parameter       | Type | Default  | Description                                                                                     |
| --------------- | ---- | -------- | ----------------------------------------------------------------------------------------------- |
| `pipeline_id`   | str  | Required | Pipeline to run                                                                                 |
| `file_path`     | str  | -        | Path to a local file (alternative to `file_url`)                                                |
| `file_url`      | str  | -        | URL of the document (alternative to `file_path`)                                                |
| `page_range`    | str  | -        | Pages to process, e.g. `"0-5,10"`                                                               |
| `output_format` | str  | -        | Override the output format                                                                      |
| `skip_cache`    | bool | `False`  | Skip cached results                                                                             |
| `run_evals`     | bool | `False`  | Run eval rubrics on steps                                                                       |
| `webhook_url`   | str  | -        | Webhook URL called on completion                                                                |
| `version`       | int  | -        | Version to run: omit for the active version, `0` for the draft, `N` for a specific version      |
| `max_polls`     | int  | `1`      | Number of polling attempts after submission                                                     |
| `poll_interval` | int  | `1`      | Seconds between polls                                                                           |

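A `page_range` string such as `"0-5,10"` combines single pages and ranges. The sketch below expands such a string into a list of page indices, which can help when checking what a range covers before a run. It assumes zero-based page numbers and inclusive range ends, neither of which this page states explicitly. `expand_page_range` is a hypothetical helper, not part of the SDK.

```python
def expand_page_range(page_range: str) -> list[int]:
    """Expand a string like "0-5,10" into a sorted list of page indices.

    Assumes ranges are inclusive on both ends.
    """
    pages: set[int] = set()
    for part in page_range.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate stray commas
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
            if end < start:
                raise ValueError(f"descending range: {part!r}")
            pages.update(range(start, end + 1))
        else:
            pages.add(int(part))
    return sorted(pages)
```

Under these assumptions, `expand_page_range("0-5,10")` covers seven pages: indices 0 through 5, plus 10.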
### Poll Execution

```python theme={null}
execution = client.get_pipeline_execution(
    "pex_abc123",
    max_polls=300,
    poll_interval=2
)
```
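The example above pairs `max_polls=300` with `poll_interval=2`. If the client waits `poll_interval` seconds between polls and request time is negligible (an assumption), that caps the wait at roughly 600 seconds. The sketch below goes the other way and derives `max_polls` from a desired timeout. `polls_for_timeout` is a hypothetical helper.

```python
import math


def polls_for_timeout(timeout_seconds: float, poll_interval: float = 1) -> int:
    """Number of polls needed to cover `timeout_seconds` of waiting.

    Assumes one wait of `poll_interval` seconds per poll; always returns
    at least 1 so the status is checked once.
    """
    if poll_interval <= 0:
        raise ValueError("poll_interval must be positive")
    return max(1, math.ceil(timeout_seconds / poll_interval))
```

A ten-minute budget at a two-second interval gives `polls_for_timeout(600, 2)`, which equals the `max_polls=300` used in the example.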

### List Executions

```python theme={null}
result = client.list_pipeline_executions("pl_abc123", limit=20)
for ex in result["executions"]:
    print(f"{ex.execution_id}: {ex.status}")
```

### Get Step Result

```python theme={null}
result = client.get_step_result("pex_abc123", step_index=1)
```
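To gather the output of every step rather than one, `get_step_result` can be called once per entry in `execution.steps`. In the sketch below, fetching only steps whose status is `completed` is an assumption about when results are available. `collect_step_results` is a hypothetical helper, and `FakeClient` with `demo_execution` are offline stand-ins for the SDK objects.

```python
from types import SimpleNamespace
from typing import Any


def collect_step_results(client: Any, execution: Any) -> dict[int, Any]:
    """Fetch the result of every completed step, keyed by step_index."""
    results: dict[int, Any] = {}
    for step in execution.steps:
        if step.status != "completed":
            continue  # assumption: other statuses have no result to fetch
        results[step.step_index] = client.get_step_result(
            execution.execution_id, step_index=step.step_index
        )
    return results


class FakeClient:
    """Offline stand-in for DatalabClient; returns a placeholder per step."""

    def get_step_result(self, execution_id: str, step_index: int) -> dict:
        return {"execution_id": execution_id, "step_index": step_index}


demo_execution = SimpleNamespace(
    execution_id="pex_abc123",
    steps=[
        SimpleNamespace(step_index=0, status="completed"),
        SimpleNamespace(step_index=1, status="failed"),
    ],
)
demo_results = collect_step_results(FakeClient(), demo_execution)
```

With the real client, pass the `PipelineExecution` returned by `get_pipeline_execution` once it has finished.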

## Async Usage

All pipeline methods are available on `AsyncDatalabClient`:

```python theme={null}
import asyncio
from datalab_sdk import AsyncDatalabClient, PipelineProcessor

async def run():
    async with AsyncDatalabClient() as client:
        pipeline = await client.create_pipeline(steps=[
            PipelineProcessor(type="convert", settings={"mode": "balanced"}),
            PipelineProcessor(type="extract", settings={"page_schema": {
                "type": "object",
                "properties": {"title": {"type": "string"}}
            }})
        ])

        pipeline = await client.save_pipeline(
            pipeline.pipeline_id, name="Async Pipeline"
        )

        execution = await client.run_pipeline(
            pipeline.pipeline_id, file_path="doc.pdf"
        )

        execution = await client.get_pipeline_execution(
            execution.execution_id, max_polls=300
        )

        result = await client.get_step_result(
            execution.execution_id, step_index=1
        )
        return result

result = asyncio.run(run())
```
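The async client also makes it possible to process several documents at once. The following sketch limits concurrency with a semaphore and uses only `run_pipeline` and `get_pipeline_execution` as shown above. It assumes that one client can be shared across concurrent tasks, which this page does not confirm. `run_many` is a hypothetical helper, and `FakeAsyncClient` is an offline stand-in for `AsyncDatalabClient`.

```python
import asyncio
from types import SimpleNamespace
from typing import Any


async def run_many(
    client: Any, pipeline_id: str, paths: list[str], concurrency: int = 4
) -> list[Any]:
    """Run one pipeline over several files, at most `concurrency` at a time."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(path: str) -> Any:
        async with semaphore:
            execution = await client.run_pipeline(pipeline_id, file_path=path)
            return await client.get_pipeline_execution(
                execution.execution_id, max_polls=300
            )

    # gather returns results in the same order as `paths`.
    return await asyncio.gather(*(run_one(path) for path in paths))


class FakeAsyncClient:
    """Offline stand-in that completes every execution immediately."""

    async def run_pipeline(self, pipeline_id: str, file_path: str) -> Any:
        return SimpleNamespace(execution_id=f"pex_{file_path}")

    async def get_pipeline_execution(self, execution_id: str, max_polls: int = 1) -> Any:
        return SimpleNamespace(execution_id=execution_id, status="completed")


executions = asyncio.run(
    run_many(FakeAsyncClient(), "pl_abc123", ["a.pdf", "b.pdf", "c.pdf"], concurrency=2)
)
```

With the real SDK, `run_many` would be awaited inside an `async with AsyncDatalabClient() as client:` block.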

## Next Steps

<CardGroup cols={2}>
  <Card title="Pipeline Overview" icon="sitemap" href="/docs/recipes/pipelines/pipeline-overview">
    Concepts, processor types, and when to use pipelines.
  </Card>

  <Card title="Create a Pipeline" icon="hammer" href="/docs/recipes/pipelines/create-pipeline">
    Step-by-step guide to building pipelines.
  </Card>

  <Card title="Pipeline Versioning" icon="code-branch" href="/docs/recipes/pipelines/pipeline-versioning">
    Manage drafts and publish versions.
  </Card>

  <Card title="Run a Pipeline" icon="play" href="/docs/recipes/pipelines/run-pipeline">
    Execution, overrides, and result retrieval.
  </Card>
</CardGroup>
