
Overview

Pipelines chain processors (convert, extract, segment, custom) into reusable, versioned configurations. See Pipeline Overview for concepts.

Basic Usage

from datalab_sdk import DatalabClient, PipelineProcessor

client = DatalabClient()

# Create a pipeline
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={
        "page_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "date": {"type": "string"}
            }
        }
    })
])

# Save and publish
pipeline = client.save_pipeline(pipeline.pipeline_id, name="My Pipeline")
version = client.create_pipeline_version(pipeline.pipeline_id)

# Run
execution = client.run_pipeline(pipeline.pipeline_id, file_path="doc.pdf")
execution = client.get_pipeline_execution(execution.execution_id, max_polls=300)

# Get results
result = client.get_step_result(execution.execution_id, step_index=1)

Models

PipelineProcessor

Defines a single processor in a pipeline.

from datalab_sdk import PipelineProcessor

step = PipelineProcessor(
    type="extract",                      # Step type
    settings={"page_schema": {...}},     # Step-specific config
    custom_processor_id="cp_abc123",     # For custom steps
    eval_rubric_id=42,                   # Optional eval rubric
)

| Field | Type | Required | Description |
|---|---|---|---|
| type | str | Yes | "convert", "extract", "segment", or "custom" |
| settings | dict | Yes | Step-specific configuration |
| custom_processor_id | str | No | Custom processor ID for "custom" steps |
| eval_rubric_id | int | No | Evaluation rubric to apply |

PipelineConfig

Returned by pipeline CRUD methods.

| Field | Type | Description |
|---|---|---|
| pipeline_id | str | Unique ID (pl_XXXXX) |
| steps | list | Ordered list of step definitions |
| name | str | Pipeline name (set via save_pipeline) |
| is_saved | bool | Whether the pipeline has been saved |
| archived | bool | Whether the pipeline is archived |
| active_version | int | Current published version (0 = no published version) |
| created | datetime | Creation timestamp |
| updated | datetime | Last update timestamp |

PipelineVersion

Immutable snapshot of pipeline steps at a point in time.

| Field | Type | Description |
|---|---|---|
| version | int | Version number |
| steps | list | Steps at this version |
| description | str | Version description |
| created | datetime | When the version was published |

PipelineExecution

Result from running a pipeline.

| Field | Type | Description |
|---|---|---|
| execution_id | str | Unique ID (pex_XXXXX) |
| pipeline_id | str | Pipeline that was executed |
| pipeline_version | int | Version used (0 = draft) |
| status | str | pending, running, completed, completed_with_errors, failed |
| steps | list | List of PipelineExecutionStepResult |
| started_at | datetime | Execution start time |
| completed_at | datetime | Execution end time |
| created | datetime | When the execution was created |
| config_snapshot | dict | Frozen step configuration used |
| input_config | dict | Input file details |
| rate_breakdown | dict | Billing breakdown |

PipelineExecutionStepResult

Status of a single step within an execution.

| Field | Type | Description |
|---|---|---|
| step_index | int | Position in the pipeline |
| step_type | str | Step type |
| status | str | pending, dispatched, running, completed, failed, skipped |
| result_url | str | URL to fetch the step result |
| checkpoint_id | str | Checkpoint passed to downstream steps |
| started_at | datetime | Step start time |
| finished_at | datetime | Step end time |
| error_message | str | Error details if the step failed |
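
Since an execution's status and step fields are plain attributes, a finished execution can be scanned for failed steps using the field names above. A minimal sketch; `report_failures` and the dict-shaped inputs are illustrative, not part of the SDK:

```python
# Illustrative helper (not part of the SDK): list failed steps in a
# finished execution, using the documented status values and field names.
TERMINAL_STATUSES = {"completed", "completed_with_errors", "failed"}

def report_failures(execution: dict) -> list[str]:
    """Return one message per failed step in a finished execution."""
    if execution["status"] not in TERMINAL_STATUSES:
        raise ValueError("execution has not finished yet")
    return [
        f"step {s['step_index']} ({s['step_type']}): {s['error_message']}"
        for s in execution["steps"]
        if s["status"] == "failed"
    ]

execution = {
    "status": "completed_with_errors",
    "steps": [
        {"step_index": 0, "step_type": "convert", "status": "completed",
         "error_message": None},
        {"step_index": 1, "step_type": "extract", "status": "failed",
         "error_message": "schema validation error"},
    ],
}
print(report_failures(execution))
# ['step 1 (extract): schema validation error']
```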

Pipeline Management

Create

pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}})
])

Save

pipeline = client.save_pipeline(pipeline.pipeline_id, name="Invoice Parser")

Update

Creates a draft if a published version exists:
pipeline = client.update_pipeline(pipeline.pipeline_id, steps=[
    PipelineProcessor(type="convert", settings={"mode": "accurate"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}})
])

List

result = client.list_pipelines(
    saved_only=True,           # Only saved pipelines (default)
    include_archived=False,    # Include archived (default: False)
    limit=50,
    offset=0
)

for p in result["pipelines"]:
    print(f"{p.pipeline_id}: {p.name}")
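
list_pipelines returns one page at a time via limit/offset, so walking every saved pipeline means paging until a short page comes back. A sketch under that assumption; `fetch_all` is illustrative, with `fetch` standing in for a call like `lambda limit, offset: client.list_pipelines(limit=limit, offset=offset)["pipelines"]`:

```python
# Illustrative pagination sketch (not part of the SDK).
def fetch_all(fetch, limit=50):
    """Yield items page by page until a short page signals the end."""
    offset = 0
    while True:
        page = fetch(limit, offset)
        yield from page
        if len(page) < limit:  # last page reached
            return
        offset += limit

# Usage with a fake fetch over 120 in-memory items:
items = list(range(120))
fake_fetch = lambda limit, offset: items[offset:offset + limit]
assert list(fetch_all(fake_fetch, limit=50)) == items
```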

Get

pipeline = client.get_pipeline("pl_abc123")

Archive / Unarchive

client.archive_pipeline("pl_abc123")
client.unarchive_pipeline("pl_abc123")

Versioning

Publish a Version

version = client.create_pipeline_version(
    "pl_abc123",
    description="Added line items extraction"
)
print(f"Published v{version.version}")

List Versions

result = client.list_pipeline_versions("pl_abc123")
for v in result["versions"]:
    print(f"v{v.version}: {v.description}")

Discard Draft

# Revert to active published version
pipeline = client.discard_pipeline_draft("pl_abc123")

# Revert to a specific version
pipeline = client.discard_pipeline_draft("pl_abc123", version=1)

Get Rate

rate = client.get_pipeline_rate("pl_abc123")
print(f"{rate['rate_per_1000_pages_cents']} cents per 1000 pages")
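
The rate is expressed in cents per 1,000 pages, so an estimated cost for a document is simple proportion. `estimate_cost_cents` below is illustrative, not an SDK method, and ignores any minimums or rounding the billing system may apply:

```python
def estimate_cost_cents(pages: int, rate_per_1000_pages_cents: int) -> float:
    """Pro-rate the per-1000-pages rate across the page count."""
    return pages * rate_per_1000_pages_cents / 1000

# e.g. 250 pages at 400 cents per 1000 pages:
print(estimate_cost_cents(250, 400))  # 100.0
```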

Execution

Run

execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf",     # or file_url="https://..."
    page_range="0-10",
    output_format="json",
    skip_cache=False,
    run_evals=False,
    webhook_url="https://example.com/hook",
    version=2,                    # omit for active version
    max_polls=1,                  # polls after submission
    poll_interval=1,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| pipeline_id | str | Required | Pipeline to run |
| file_path | str | - | Local file path |
| file_url | str | - | URL to the document |
| page_range | str | - | Pages to process ("0-5,10") |
| output_format | str | - | Override the output format |
| skip_cache | bool | False | Skip cached results |
| run_evals | bool | False | Run eval rubrics on steps |
| webhook_url | str | - | Webhook URL for completion |
| version | int | - | Version to run (omit = active, 0 = draft, N = specific) |
| max_polls | int | 1 | Polling attempts |
| poll_interval | int | 1 | Seconds between polls |
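
page_range is a compact string of comma-separated pages and ranges ("0-5,10"). An illustrative parser for that format, assuming zero-indexed pages and inclusive ranges as the example suggests; `parse_page_range` is not part of the SDK:

```python
def parse_page_range(spec: str) -> list[int]:
    """Expand a page-range string like "0-5,10" into page indices."""
    pages = []
    for part in spec.split(","):
        if "-" in part:
            start, end = part.split("-")
            pages.extend(range(int(start), int(end) + 1))  # inclusive range
        else:
            pages.append(int(part))
    return pages

print(parse_page_range("0-5,10"))  # [0, 1, 2, 3, 4, 5, 10]
```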

Poll Execution

execution = client.get_pipeline_execution(
    "pex_abc123",
    max_polls=300,
    poll_interval=2
)

List Executions

result = client.list_pipeline_executions("pl_abc123", limit=20)
for ex in result["executions"]:
    print(f"{ex.execution_id}: {ex.status}")

Get Step Result

result = client.get_step_result("pex_abc123", step_index=1)

Async Usage

All pipeline methods are available on AsyncDatalabClient:

import asyncio
from datalab_sdk import AsyncDatalabClient, PipelineProcessor

async def run():
    async with AsyncDatalabClient() as client:
        pipeline = await client.create_pipeline(steps=[
            PipelineProcessor(type="convert", settings={"mode": "balanced"}),
            PipelineProcessor(type="extract", settings={"page_schema": {
                "type": "object",
                "properties": {"title": {"type": "string"}}
            }})
        ])

        pipeline = await client.save_pipeline(
            pipeline.pipeline_id, name="Async Pipeline"
        )

        execution = await client.run_pipeline(
            pipeline.pipeline_id, file_path="doc.pdf"
        )

        execution = await client.get_pipeline_execution(
            execution.execution_id, max_polls=300
        )

        result = await client.get_step_result(
            execution.execution_id, step_index=1
        )
        return result

result = asyncio.run(run())

Next Steps

Pipeline Overview

Concepts, processor types, and when to use pipelines.

Create a Pipeline

Step-by-step guide to building pipelines.

Pipeline Versioning

Manage drafts and publish versions.

Run a Pipeline

Execution, overrides, and result retrieval.