

Overview

Pipelines chain processors (convert, extract, segment, custom) into reusable, versioned configurations. See Pipeline Overview for concepts.

Basic Usage

```python
from datalab_sdk import DatalabClient, PipelineProcessor

client = DatalabClient()

# Create a pipeline
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={
        "page_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "date": {"type": "string"}
            }
        }
    })
])

# Save and publish
pipeline = client.save_pipeline(pipeline.pipeline_id, name="My Pipeline")
version = client.create_pipeline_version(pipeline.pipeline_id)

# Run, then poll until the execution finishes (up to 300 polls)
execution = client.run_pipeline(pipeline.pipeline_id, file_path="doc.pdf")
execution = client.get_pipeline_execution(execution.execution_id, max_polls=300)

# Get the result of the extract step (step indices are 0-based)
result = client.get_step_result(execution.execution_id, step_index=1)
```
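The extract step's page_schema in the example above is a JSON Schema object. When a schema has many flat fields, a small helper can generate it from a name-to-type mapping. This is a sketch under assumptions: make_page_schema is a hypothetical helper (not part of datalab_sdk), and it only covers flat objects whose properties have simple JSON Schema types.

```python
from typing import Dict


def make_page_schema(fields: Dict[str, str]) -> dict:
    """Build a flat JSON Schema object for an extract step's page_schema.

    Hypothetical helper: maps each field name to a JSON Schema type
    such as "string", "number", "integer", or "boolean".
    """
    return {
        "type": "object",
        "properties": {name: {"type": json_type} for name, json_type in fields.items()},
    }


# Same shape as the page_schema used in the Basic Usage example
schema = make_page_schema({"title": "string", "date": "string"})
print(schema)
```

Pass the result as settings={"page_schema": schema} on an extract PipelineProcessor. Nested objects or arrays still need to be written out by hand.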

Models

PipelineProcessor

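Defines a single processor (step) in a pipeline.

```python
from datalab_sdk import PipelineProcessor

# A built-in step: a type plus its step-specific settings
step = PipelineProcessor(
    type="extract",                      # Step type
    settings={"page_schema": {...}},     # Step-specific config ({...} = your JSON schema)
    eval_rubric_id=42,                   # Optional eval rubric
)

# A custom step also references the custom processor to run
custom_step = PipelineProcessor(
    type="custom",
    settings={},                         # Step-specific config
    custom_processor_id="cp_abc123",     # For custom steps
)
```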
| Field | Type | Required | Description |
|---|---|---|---|
| type | str | Yes | "convert", "extract", "segment", or "custom" |
| settings | dict | Yes | Step-specific configuration |
| custom_processor_id | str | No | Custom processor ID for "custom" steps |
| eval_rubric_id | int | No | Evaluation rubric to apply |
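The table's rules can be checked locally before a pipeline is created, which surfaces typos in step types early. This is a minimal sketch: check_step is a hypothetical helper that takes the same fields as PipelineProcessor, the service presumably performs its own validation, and treating custom_processor_id as mandatory for "custom" steps is an assumption based on the table's description.

```python
from typing import Optional

ALLOWED_STEP_TYPES = {"convert", "extract", "segment", "custom"}


def check_step(step_type: str, settings: dict,
               custom_processor_id: Optional[str] = None) -> None:
    """Raise ValueError if a step definition breaks the PipelineProcessor rules.

    Hypothetical local pre-check, not part of datalab_sdk.
    Requiring custom_processor_id for "custom" steps is an assumption.
    """
    if step_type not in ALLOWED_STEP_TYPES:
        raise ValueError(f"unknown step type {step_type!r}")
    if not isinstance(settings, dict):
        raise ValueError("settings must be a dict")
    if step_type == "custom" and not custom_processor_id:
        raise ValueError("custom steps need a custom_processor_id")


check_step("extract", {"page_schema": {"type": "object"}})      # passes
check_step("custom", {}, custom_processor_id="cp_abc123")      # passes
```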

PipelineConfig

Returned by pipeline CRUD methods.
| Field | Type | Description |
|---|---|---|
| pipeline_id | str | Unique ID (pl_XXXXX) |
| steps | list | Ordered list of step definitions |
| name | str | Pipeline name (set via save_pipeline) |
| is_saved | bool | Whether pipeline has been saved |
| archived | bool | Whether pipeline is archived |
| active_version | int | Current published version (0 = no published version) |
| created | datetime | Creation timestamp |
| updated | datetime | Last update timestamp |

PipelineVersion

Immutable snapshot of pipeline steps at a point in time.
| Field | Type | Description |
|---|---|---|
| version | int | Version number |
| steps | list | Steps at this version |
| description | str | Version description |
| created | datetime | When version was published |

PipelineExecution

Result from running a pipeline.
| Field | Type | Description |
|---|---|---|
| execution_id | str | Unique ID (pex_XXXXX) |
| pipeline_id | str | Pipeline that was executed |
| pipeline_version | int | Version used (0 = draft) |
| status | str | pending, running, completed, completed_with_errors, failed |
| steps | list | List of PipelineExecutionStepResult |
| started_at | datetime | Execution start time |
| completed_at | datetime | Execution end time |
| created | datetime | When execution was created |
| config_snapshot | dict | Frozen step configuration used |
| input_config | dict | Input file details |
| rate_breakdown | dict | Billing breakdown |
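When you handle a PipelineExecution yourself, it helps to separate "has it stopped?" from "did it fully succeed?". The sketch below assumes that pending and running are the only non-terminal statuses in the list above, and that only completed means every step succeeded; both are readings of the status names rather than documented guarantees, and the helper names are hypothetical.

```python
# Assumption: once an execution reaches one of these statuses it no longer changes.
TERMINAL_STATUSES = frozenset({"completed", "completed_with_errors", "failed"})


def is_finished(status: str) -> bool:
    """True once the execution has stopped, whether or not every step succeeded."""
    return status in TERMINAL_STATUSES


def fully_succeeded(status: str) -> bool:
    """True only when the whole run completed without step errors."""
    return status == "completed"


# With a real execution object: if is_finished(execution.status): ...
for status in ["running", "completed_with_errors", "completed"]:
    print(status, is_finished(status), fully_succeeded(status))
```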

PipelineExecutionStepResult

Status of a single step within an execution.
| Field | Type | Description |
|---|---|---|
| step_index | int | Position in pipeline |
| step_type | str | Step type |
| status | str | pending, dispatched, running, completed, failed, skipped |
| result_url | str | URL to fetch step result |
| checkpoint_id | str | Checkpoint passed to downstream steps |
| started_at | datetime | Step start time |
| finished_at | datetime | Step end time |
| error_message | str | Error details if failed |
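To report problems from a finished execution, you can collect the failed entries from execution.steps. In this sketch, StepStatus is a hypothetical stand-in carrying only the PipelineExecutionStepResult fields the helper reads; failed_steps uses nothing but those attribute names, so it should also accept real step results if they expose them as attributes (an assumption). The example values are made up.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class StepStatus:
    """Hypothetical stand-in with the PipelineExecutionStepResult fields used below."""
    step_index: int
    step_type: str
    status: str
    error_message: Optional[str] = None


def failed_steps(steps: Iterable[StepStatus]) -> List[str]:
    """Return one summary line per failed step, in pipeline order."""
    return [
        f"step {s.step_index} ({s.step_type}): {s.error_message or 'no error message'}"
        for s in sorted(steps, key=lambda s: s.step_index)
        if s.status == "failed"
    ]


steps = [
    StepStatus(0, "convert", "completed"),
    StepStatus(1, "extract", "failed", "example error"),
]
for line in failed_steps(steps):
    print(line)
```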

Pipeline Management

Create

```python
# {...} stands for your page_schema (a JSON Schema object)
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}})
])
```

Save

```python
pipeline = client.save_pipeline(pipeline.pipeline_id, name="Invoice Parser")
```

Update

If the pipeline already has a published version, updating its steps creates a draft; published versions are immutable and stay unchanged.

```python
pipeline = client.update_pipeline(pipeline.pipeline_id, steps=[
    PipelineProcessor(type="convert", settings={"mode": "accurate"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}})
])
```

List

```python
result = client.list_pipelines(
    saved_only=True,           # Only saved pipelines (default: True)
    include_archived=False,    # Include archived (default: False)
    limit=50,
    offset=0
)

for p in result["pipelines"]:
    print(f"{p.pipeline_id}: {p.name}")
```
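list_pipelines returns one page at a time via limit and offset, so listing everything means walking the pages. This is a sketch: iterate_pages is a hypothetical helper, and it assumes that a page shorter than limit means there are no more results (whether the response also carries a total count is not documented here). The SDK wiring is shown in comments so the block runs on its own.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iterate_pages(fetch_page: Callable[[int, int], List[T]], limit: int = 50) -> Iterator[T]:
    """Yield every item from a limit/offset API, one page at a time.

    fetch_page(limit, offset) must return the items for that page.
    Stops as soon as a page comes back shorter than `limit`.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        yield from page
        if len(page) < limit:
            return
        offset += limit


# Wiring it to the SDK (not executed here):
# fetch = lambda limit, offset: client.list_pipelines(limit=limit, offset=offset)["pipelines"]
# for p in iterate_pages(fetch):
#     print(f"{p.pipeline_id}: {p.name}")
```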

Get

```python
pipeline = client.get_pipeline("pl_abc123")
```

Archive / Unarchive

```python
client.archive_pipeline("pl_abc123")
client.unarchive_pipeline("pl_abc123")
```

Versioning

Publish a Version

```python
version = client.create_pipeline_version(
    "pl_abc123",
    description="Added line items extraction"
)
print(f"Published v{version.version}")
```

List Versions

```python
result = client.list_pipeline_versions("pl_abc123")
for v in result["versions"]:
    print(f"v{v.version}: {v.description}")
```

Discard Draft

```python
# Revert to active published version
pipeline = client.discard_pipeline_draft("pl_abc123")

# Revert to a specific version
pipeline = client.discard_pipeline_draft("pl_abc123", version=1)
```
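Before discarding a draft, you may want to see what it changed relative to a published version. Assuming pipeline.steps and a PipelineVersion's steps are lists of comparable step definitions (for example dicts), a hypothetical changed_steps helper can report the positions that differ. The SDK wiring in the comments uses only fields documented above; the step dicts are illustrative.

```python
from itertools import zip_longest
from typing import Any, List, Sequence, Tuple


def changed_steps(published: Sequence[Any], draft: Sequence[Any]) -> List[Tuple[int, Any, Any]]:
    """Return (index, published_step, draft_step) for every position that differs.

    A step that exists on only one side shows up as None on the other.
    """
    return [
        (index, old, new)
        for index, (old, new) in enumerate(zip_longest(published, draft))
        if old != new
    ]


# Wiring it to the SDK (not executed here):
# pipeline = client.get_pipeline("pl_abc123")
# versions = client.list_pipeline_versions("pl_abc123")["versions"]
# active = next(v for v in versions if v.version == pipeline.active_version)
# diff = changed_steps(active.steps, pipeline.steps)

published = [
    {"type": "convert", "settings": {"mode": "balanced"}},
    {"type": "extract", "settings": {"page_schema": {"type": "object"}}},
]
draft = [
    {"type": "convert", "settings": {"mode": "accurate"}},
    {"type": "extract", "settings": {"page_schema": {"type": "object"}}},
]
for index, old, new in changed_steps(published, draft):
    print(index, old["settings"], "->", new["settings"])
```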

Get Rate

```python
rate = client.get_pipeline_rate("pl_abc123")
print(f"{rate['rate_per_1000_pages_cents']} cents per 1000 pages")
```
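Assuming the rate scales linearly with page count, the cost of a run is rate × pages / 1000 cents. The amount billed for a specific execution is reported in its rate_breakdown, so treat this as an estimate only; estimate_cost_cents is a hypothetical helper and 300 is a made-up rate.

```python
def estimate_cost_cents(rate_per_1000_pages_cents: float, pages: int) -> float:
    """Estimate a run's cost in cents, assuming linear per-page pricing."""
    if pages < 0:
        raise ValueError("pages must be non-negative")
    return rate_per_1000_pages_cents * pages / 1000


# With a real rate: estimate_cost_cents(rate["rate_per_1000_pages_cents"], 250)
print(estimate_cost_cents(300, 250))  # 75.0
```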

Execution

Run

```python
execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf",     # or file_url="https://..."
    page_range="0-10",
    output_format="json",
    skip_cache=False,
    run_evals=False,
    webhook_url="https://example.com/hook",
    version=2,                    # omit for active version
    max_polls=1,                  # polls after submission
    poll_interval=1,
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| pipeline_id | str | Required | Pipeline to run |
| file_path | str | - | Local file path |
| file_url | str | - | URL to document |
| page_range | str | - | Pages to process ("0-5,10") |
| output_format | str | - | Override output format |
| skip_cache | bool | False | Skip cached results |
| run_evals | bool | False | Run eval rubrics on steps |
| webhook_url | str | - | Webhook URL for completion |
| version | int | - | Version to run: omit for the active version, 0 for the draft, N for a specific version |
| max_polls | int | 1 | Polling attempts |
| poll_interval | int | 1 | Seconds between polls |
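page_range takes a compact list such as "0-5,10". To check which pages a string selects before submitting it, a hypothetical expand_page_range helper can expand it. It assumes comma-separated entries, each either a single page or an inclusive start-end span; whether the service treats spans as inclusive is an assumption, so verify against a real run.

```python
from typing import List


def expand_page_range(page_range: str) -> List[int]:
    """Expand a page_range string such as "0-5,10" into sorted, de-duplicated pages.

    Hypothetical helper: assumes each entry is a single page or an inclusive span.
    """
    pages = set()
    for part in page_range.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
            if start > end:
                raise ValueError(f"descending range {part!r}")
            pages.update(range(start, end + 1))
        else:
            pages.add(int(part))
    return sorted(pages)


print(expand_page_range("0-5,10"))  # [0, 1, 2, 3, 4, 5, 10]
```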

Poll Execution

```python
execution = client.get_pipeline_execution(
    "pex_abc123",
    max_polls=300,
    poll_interval=2
)
```
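max_polls and poll_interval together bound how long get_pipeline_execution waits: with a pause of poll_interval seconds between attempts, the example above waits at most roughly 300 × 2 = 600 seconds plus request time. The generic loop below illustrates those semantics; it is a sketch, not the SDK's implementation, and what the SDK does when polling runs out is not covered here. The injectable sleep lets it run without real delays.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll(fetch: Callable[[], T], is_done: Callable[[T], bool], max_polls: int,
         poll_interval: float, sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fetch() up to max_polls times, sleeping poll_interval seconds between calls.

    Returns the first result for which is_done() is True, or the last result
    if the attempts run out (the caller must then check its status).
    """
    if max_polls < 1:
        raise ValueError("max_polls must be at least 1")
    result = fetch()
    for _ in range(max_polls - 1):
        if is_done(result):
            break
        sleep(poll_interval)
        result = fetch()
    return result
```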

List Executions

```python
result = client.list_pipeline_executions("pl_abc123", limit=20)
for ex in result["executions"]:
    print(f"{ex.execution_id}: {ex.status}")
```

Get Step Result

```python
result = client.get_step_result("pex_abc123", step_index=1)
```

Async Usage

All pipeline methods are also available on AsyncDatalabClient:

```python
import asyncio
from datalab_sdk import AsyncDatalabClient, PipelineProcessor

async def run():
    async with AsyncDatalabClient() as client:
        pipeline = await client.create_pipeline(steps=[
            PipelineProcessor(type="convert", settings={"mode": "balanced"}),
            PipelineProcessor(type="extract", settings={"page_schema": {
                "type": "object",
                "properties": {"title": {"type": "string"}}
            }})
        ])

        pipeline = await client.save_pipeline(
            pipeline.pipeline_id, name="Async Pipeline"
        )

        # Publish a version, as in Basic Usage, before running
        await client.create_pipeline_version(pipeline.pipeline_id)

        execution = await client.run_pipeline(
            pipeline.pipeline_id, file_path="doc.pdf"
        )

        execution = await client.get_pipeline_execution(
            execution.execution_id, max_polls=300
        )

        result = await client.get_step_result(
            execution.execution_id, step_index=1
        )
        return result

result = asyncio.run(run())
```
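Because the client is async, several documents can be processed concurrently. The sketch below caps how many run at once with asyncio.Semaphore and keeps results in input order via asyncio.gather; run_bounded is a hypothetical helper, and the limit of 4 is an arbitrary example rather than a documented service limit. The SDK wiring is shown in comments so the block runs on its own.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_bounded(worker: Callable[[T], Awaitable[R]], items: List[T],
                      limit: int = 4) -> List[R]:
    """Run worker(item) for every item, with at most `limit` in flight at once.

    Results are returned in the same order as `items`.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        async with semaphore:
            return await worker(item)

    return list(await asyncio.gather(*(guarded(item) for item in items)))


# Wiring it to the SDK inside `async with AsyncDatalabClient() as client:` (not executed here):
# async def process(path):
#     execution = await client.run_pipeline("pl_abc123", file_path=path)
#     return await client.get_pipeline_execution(execution.execution_id, max_polls=300)
# executions = await run_bounded(process, ["a.pdf", "b.pdf", "c.pdf"], limit=4)
```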

Next Steps

- Pipeline Overview: concepts, processor types, and when to use pipelines.
- Create a Pipeline: step-by-step guide to building pipelines.
- Pipeline Versioning: manage drafts and publish versions.
- Run a Pipeline: execution, overrides, and result retrieval.