Before you begin, make sure you have:
  1. A Datalab account with an API key (new accounts include $5 in free credits)
  2. Python 3.10+ installed
  3. The Datalab SDK: pip install datalab-python-sdk
  4. Your DATALAB_API_KEY environment variable set

Basic Execution

Run a pipeline on a document:
from datalab_sdk import DatalabClient

client = DatalabClient()

execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf"
)

# Poll until complete
execution = client.get_pipeline_execution(
    execution.execution_id,
    max_polls=300,
    poll_interval=2
)

print(f"Status: {execution.status}")
You can also pass a URL instead of a file:
execution = client.run_pipeline(
    "pl_abc123",
    file_url="https://example.com/document.pdf"
)

Version Selection

The version parameter controls which pipeline configuration runs:
| Value | Behavior |
|---|---|
| Omitted / `None` | Runs the active published version. If no version is published, runs the draft. |
| `0` | Explicitly runs the draft (current unpublished edits). |
| `1`, `2`, … | Runs a specific published version. |
# Run active published version (recommended for production)
execution = client.run_pipeline("pl_abc123", file_path="doc.pdf")

# Run draft for testing
execution = client.run_pipeline("pl_abc123", file_path="doc.pdf", version=0)

# Pin to specific version
execution = client.run_pipeline("pl_abc123", file_path="doc.pdf", version=2)
If you omit version and no version has been published, the draft runs. Publish a version before using a pipeline in production to avoid running unfinished drafts.
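The resolution rules above can be modeled as a small pure function. This is an illustrative sketch, not the server's actual logic, and it assumes the latest published version is the active one:

```python
from typing import Optional

def resolve_version(requested: Optional[int], published: list[int]) -> str:
    """Model of the version-resolution rules (illustrative only)."""
    if requested is None:
        # Active published version, falling back to the draft if none exists
        return f"published v{max(published)}" if published else "draft"
    if requested == 0:
        return "draft"  # explicit draft
    return f"published v{requested}"  # pinned version
```

For example, `resolve_version(None, [])` returns `"draft"`, which is why the note above recommends publishing before production use.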

Run-Level Overrides

Override pipeline behavior per execution without changing the pipeline configuration:
execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf",
    page_range="0-5",          # Process specific pages
    output_format="json",      # Override output format
    skip_cache=True,           # Force reprocessing (skip cached results)
    run_evals=True,            # Run evaluation rubrics defined on steps
    webhook_url="https://example.com/webhook",  # Notify on completion
    version=2,                 # Pin to version 2
)

Override Reference

| Parameter | Type | Default | Description |
|---|---|---|---|
| `file_path` | str | - | Local file to process (mutually exclusive with `file_url`) |
| `file_url` | str | - | URL of the document to process (mutually exclusive with `file_path`) |
| `page_range` | str | - | Pages to process (e.g., `"0-5,10"`, 0-indexed) |
| `output_format` | str | - | Override output format: `markdown`, `html`, `json`, `chunks` |
| `skip_cache` | bool | `False` | Skip cached results, reprocess from scratch |
| `run_evals` | bool | `False` | Run evaluation rubrics configured on steps |
| `webhook_url` | str | - | URL to POST to when the execution completes |
| `version` | int | - | Pipeline version to run (see above) |
| `max_polls` | int | 1 | Polling attempts after submission |
| `poll_interval` | int | 1 | Seconds between polls |
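To make the `page_range` syntax concrete, here is a sketch of how a string like `"0-5,10"` expands into explicit 0-indexed pages, assuming ranges are inclusive on both ends:

```python
def parse_page_range(spec: str) -> list[int]:
    """Expand a page_range string like "0-5,10" into 0-indexed page numbers.

    Assumes comma-separated parts, where "a-b" is an inclusive range.
    """
    pages: list[int] = []
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            start, end = (int(p) for p in part.split("-"))
            pages.extend(range(start, end + 1))  # inclusive upper bound
        else:
            pages.append(int(part))
    return pages
```

So `"0-5,10"` selects pages 0 through 5 plus page 10, seven pages in total.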

Execution Status

Poll for status using get_pipeline_execution():
execution = client.get_pipeline_execution(
    execution.execution_id,
    max_polls=300,      # Keep polling until complete
    poll_interval=2     # Check every 2 seconds
)

print(f"Status: {execution.status}")
print(f"Version: {execution.pipeline_version}")
print(f"Started: {execution.started_at}")
print(f"Completed: {execution.completed_at}")
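The `max_polls` / `poll_interval` behavior can be pictured as a simple loop. A minimal sketch, where `fetch` stands in for any callable that returns an object with a `.status` attribute (the real SDK handles this internally):

```python
import time

TERMINAL = frozenset({"completed", "completed_with_errors", "failed"})

def poll_until_terminal(fetch, max_polls=300, poll_interval=2.0):
    """Re-fetch an execution until it reaches a terminal status or the
    polling budget is exhausted (illustrative model, not the SDK's code)."""
    execution = fetch()
    for _ in range(max_polls):
        if execution.status in TERMINAL:
            break
        time.sleep(poll_interval)
        execution = fetch()
    return execution
```

Note that the loop returns the last-seen execution either way, so callers should still check `execution.status` on return.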

Status Values

| Status | Description |
|---|---|
| `pending` | Queued, not yet started |
| `running` | Processors are executing |
| `completed` | All steps finished successfully |
| `completed_with_errors` | Some steps completed, some failed |
| `failed` | Execution failed |

Per-Processor Tracking

Each processor in the execution reports its own status:
for step in execution.steps:
    print(f"Step {step.step_index} ({step.step_type}): {step.status}")
    if step.error_message:
        print(f"  Error: {step.error_message}")
    if step.result_url:
        print("  Result available")
Step status values: pending, dispatched, running, completed, failed, skipped.
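When an execution has many steps, tallying statuses gives an at-a-glance summary. A small helper sketch that accepts either step objects with a `.status` attribute or plain dicts:

```python
from collections import Counter

def summarize_steps(steps) -> dict[str, int]:
    """Count steps by status, e.g. {"completed": 2, "failed": 1}."""
    return dict(Counter(
        s.status if hasattr(s, "status") else s["status"] for s in steps
    ))
```

This pairs naturally with the `completed_with_errors` status: a nonzero `failed` count tells you which runs need the per-step error loop above.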

Retrieve Processor Results

Fetch the output of a specific processor:
# Get result for step at index 1 (e.g., extract step)
result = client.get_step_result(execution.execution_id, step_index=1)
print(result)
Or via the REST API:
curl https://www.datalab.to/api/v1/pipelines/executions/EXECUTION_ID/steps/1/result \
  -H "X-API-Key: $DATALAB_API_KEY"
Results are deleted from Datalab servers one hour after processing completes. Retrieve your results promptly.

Webhooks

Get notified when a pipeline execution completes instead of polling:
execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf",
    webhook_url="https://your-server.com/pipeline-webhook"
)
Datalab sends a POST request to your webhook URL when the execution reaches a terminal status. See Webhooks for payload details.
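On the receiving side, your endpoint parses the POST body and reacts only to terminal statuses. A minimal sketch of the handler logic; the payload field names used here (`execution_id`, `status`) are assumptions for illustration, so check the Webhooks docs for the real schema:

```python
import json

TERMINAL = {"completed", "completed_with_errors", "failed"}

def handle_webhook(body: bytes) -> str:
    """Parse a completion-webhook body and describe what to do with it.

    Field names are hypothetical; consult the Webhooks payload docs.
    """
    payload = json.loads(body)
    status = payload.get("status", "unknown")
    if status in TERMINAL:
        return f"execution {payload.get('execution_id')} finished: {status}"
    return "ignored"
```

In a real server this function would sit behind an HTTP endpoint that returns 200 quickly and does any result fetching asynchronously, since results expire an hour after processing.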

List Executions

View recent executions for a pipeline:
result = client.list_pipeline_executions("pl_abc123", limit=20)

for ex in result["executions"]:
    print(f"{ex.execution_id}: {ex.status} (v{ex.pipeline_version})")

Billing

Pipeline execution is billed per page, with rates additive across processors. Each processor type has its own per-page rate. Check a pipeline’s rate before running:
rate = client.get_pipeline_rate("pl_abc123")
print(f"Rate per 1000 pages: {rate['rate_per_1000_pages_cents']} cents")
print(f"Breakdown: {rate['rate_breakdown']}")
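Since the rate is quoted per 1,000 pages, estimating the cost of a run is simple arithmetic. A sketch (the function name is ours, not part of the SDK):

```python
def estimate_cost_cents(pages: int, rate_per_1000_pages_cents: int) -> float:
    """Estimate one run's cost from a per-1,000-pages rate in cents."""
    return pages * rate_per_1000_pages_cents / 1000
```

For example, a 250-page document through a pipeline rated at 400 cents per 1,000 pages costs about 100 cents. Because rates are additive across processors, adding a step raises the per-page rate for every document you run.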

End-to-End Example

Create a pipeline, publish it, and run it in production:
from datalab_sdk import DatalabClient, PipelineProcessor

client = DatalabClient()

# 1. Create and save
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={
        "page_schema": {
            "type": "object",
            "properties": {
                "vendor": {"type": "string", "description": "Vendor name"},
                "amount": {"type": "number", "description": "Total amount"},
                "date": {"type": "string", "description": "Invoice date"}
            }
        }
    })
])
pipeline = client.save_pipeline(pipeline.pipeline_id, name="Invoice Parser")

# 2. Test the draft
test_exec = client.run_pipeline(
    pipeline.pipeline_id,
    file_path="test_invoice.pdf",
    version=0
)
test_exec = client.get_pipeline_execution(test_exec.execution_id, max_polls=300)
test_result = client.get_step_result(test_exec.execution_id, step_index=1)
print(f"Test result: {test_result}")

# 3. Publish
version = client.create_pipeline_version(
    pipeline.pipeline_id,
    description="Initial release — balanced mode, basic fields"
)

# 4. Run in production (pinned to version)
execution = client.run_pipeline(
    pipeline.pipeline_id,
    file_path="real_invoice.pdf",
    version=version.version
)
execution = client.get_pipeline_execution(execution.execution_id, max_polls=300)

if execution.status == "completed":
    result = client.get_step_result(execution.execution_id, step_index=1)
    print(f"Extracted: {result}")
else:
    for step in execution.steps:
        if step.error_message:
            print(f"Step {step.step_index} failed: {step.error_message}")

Next Steps

- Pipeline Overview: processor types, composition rules, and when to use pipelines.
- Pipeline Versioning: manage drafts, versions, and production pinning.
- Webhooks: configure webhook notifications for pipeline executions.
- SDK Reference: full SDK reference for all pipeline methods.