Overview
Pipelines chain processors (convert, extract, segment, custom) into reusable, versioned configurations. See Pipeline Overview for concepts.
Basic Usage
from datalab_sdk import DatalabClient, PipelineProcessor

client = DatalabClient()

# Create a pipeline: convert the document, then extract fields from it
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={
        "page_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "date": {"type": "string"},
            },
        }
    }),
])

# Save and publish
pipeline = client.save_pipeline(pipeline.pipeline_id, name="My Pipeline")
version = client.create_pipeline_version(pipeline.pipeline_id)

# Run, then poll the execution (up to 300 polls)
execution = client.run_pipeline(pipeline.pipeline_id, file_path="doc.pdf")
execution = client.get_pipeline_execution(execution.execution_id, max_polls=300)

# Get results of the extract step (step_index=1 is the second step)
result = client.get_step_result(execution.execution_id, step_index=1)
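Judging by the example above, step_index appears to be the zero-based position of a step in the pipeline's steps list, which is why the extract step is read with step_index=1. A minimal sketch of looking the index up by step type instead of hard-coding it, assuming you keep the step types in the same order you passed them to create_pipeline; find_step_index is a hypothetical helper, not part of the SDK:

```python
def find_step_index(step_types, wanted):
    """Return the zero-based index of the first step whose type is `wanted`.

    `step_types` is the ordered list of step types, e.g. ["convert", "extract"].
    Hypothetical helper; not part of datalab_sdk.
    """
    try:
        return step_types.index(wanted)
    except ValueError:
        raise ValueError(f"no {wanted!r} step in pipeline") from None


# The Basic Usage pipeline: a convert step followed by an extract step.
extract_index = find_step_index(["convert", "extract"], "extract")
print(extract_index)  # 1
```

The result can then be passed as step_index to get_step_result, so reordering or inserting steps does not silently point you at the wrong output.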
Models
PipelineProcessor
Defines a single processor in a pipeline.
from datalab_sdk import PipelineProcessor

step = PipelineProcessor(
    type="extract",                   # Step type
    settings={"page_schema": {...}},  # Step-specific config
    custom_processor_id="cp_abc123",  # Only used by "custom" steps
    eval_rubric_id=42,                # Optional eval rubric
)
| Field | Type | Required | Description |
|---|---|---|---|
| type | str | Yes | "convert", "extract", "segment", or "custom" |
| settings | dict | Yes | Step-specific configuration |
| custom_processor_id | str | No | Custom processor ID for "custom" steps |
| eval_rubric_id | int | No | Evaluation rubric to apply |
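The field rules above can be checked locally before a pipeline is created. This is a minimal sketch of a hypothetical pre-flight check (check_step is not an SDK function), assuming a "custom" step is unusable without a custom_processor_id; the server remains the authority on what is valid:

```python
VALID_STEP_TYPES = {"convert", "extract", "segment", "custom"}


def check_step(step_type, settings, custom_processor_id=None):
    """Return a list of problems with a step definition (empty if none found).

    Hypothetical helper mirroring the PipelineProcessor field table.
    """
    problems = []
    if step_type not in VALID_STEP_TYPES:
        problems.append(f"unknown step type {step_type!r}")
    if not isinstance(settings, dict):
        problems.append("settings must be a dict")
    # Assumption: custom steps need the ID of the processor to run.
    if step_type == "custom" and not custom_processor_id:
        problems.append("custom steps need a custom_processor_id")
    return problems


print(check_step("extract", {"page_schema": {"type": "object"}}))  # []
print(check_step("custom", {}))  # ['custom steps need a custom_processor_id']
```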
PipelineConfig
Returned by pipeline CRUD methods.
| Field | Type | Description |
|---|---|---|
| pipeline_id | str | Unique ID (pl_XXXXX) |
| steps | list | Ordered list of step definitions |
| name | str | Pipeline name (set via save_pipeline) |
| is_saved | bool | Whether the pipeline has been saved |
| archived | bool | Whether the pipeline is archived |
| active_version | int | Current published version (0 = no published version) |
| created | datetime | Creation timestamp |
| updated | datetime | Last update timestamp |
PipelineVersion
Immutable snapshot of pipeline steps at a point in time.
| Field | Type | Description |
|---|---|---|
| version | int | Version number |
| steps | list | Steps at this version |
| description | str | Version description |
| created | datetime | When the version was published |
PipelineExecution
Result of running a pipeline, returned by run_pipeline and get_pipeline_execution.
| Field | Type | Description |
|---|---|---|
| execution_id | str | Unique ID (pex_XXXXX) |
| pipeline_id | str | Pipeline that was executed |
| pipeline_version | int | Version used (0 = draft) |
| status | str | pending, running, completed, completed_with_errors, or failed |
| steps | list | List of PipelineExecutionStepResult |
| started_at | datetime | Execution start time |
| completed_at | datetime | Execution end time |
| created | datetime | When the execution was created |
| config_snapshot | dict | Frozen step configuration used |
| input_config | dict | Input file details |
| rate_breakdown | dict | Billing breakdown |
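When polling, it helps to separate in-progress statuses from final ones. A minimal sketch, assuming (from the status names alone) that pending and running are in progress and the other three are final; note that completed_with_errors still finishes the run, so per-step results are worth inspecting. These helpers are hypothetical, not SDK functions:

```python
# Assumed split of the execution statuses listed in the table.
IN_PROGRESS_STATUSES = {"pending", "running"}
FINAL_STATUSES = {"completed", "completed_with_errors", "failed"}


def is_finished(status):
    """Return True if the execution has reached an (assumed) final status."""
    return status in FINAL_STATUSES


def needs_step_review(status):
    """Return True if at least one step may have failed."""
    return status in {"completed_with_errors", "failed"}


print(is_finished("running"))                      # False
print(needs_step_review("completed_with_errors"))  # True
```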
PipelineExecutionStepResult
Status of a single step within an execution.
| Field | Type | Description |
|---|---|---|
| step_index | int | Position in the pipeline |
| step_type | str | Step type |
| status | str | pending, dispatched, running, completed, failed, or skipped |
| result_url | str | URL to fetch the step result |
| checkpoint_id | str | Checkpoint passed to downstream steps |
| started_at | datetime | Step start time |
| finished_at | datetime | Step end time |
| error_message | str | Error details if the step failed |
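A common follow-up to a completed_with_errors or failed execution is listing which steps failed and why. This sketch uses a local StepResult dataclass as a stand-in for PipelineExecutionStepResult, carrying only the fields from the table that it reads; since failed_steps (a hypothetical helper) only accesses attributes, it should also work on the SDK's step objects in execution.steps, though that is an assumption:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepResult:
    """Stand-in for PipelineExecutionStepResult with only the fields used here."""
    step_index: int
    step_type: str
    status: str
    error_message: Optional[str] = None


def failed_steps(steps):
    """Return (step_index, step_type, error_message) for every failed step."""
    return [
        (s.step_index, s.step_type, s.error_message)
        for s in steps
        if s.status == "failed"
    ]


steps = [
    StepResult(0, "convert", "completed"),
    StepResult(1, "extract", "failed", "example error message"),
]
print(failed_steps(steps))  # [(1, 'extract', 'example error message')]
```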
Pipeline Management
Create
pipeline = client.create_pipeline(steps=[
    PipelineProcessor(type="convert", settings={"mode": "balanced"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}}),
])
Save
pipeline = client.save_pipeline(pipeline.pipeline_id, name="Invoice Parser")
Update
Replaces the pipeline's steps. If the pipeline already has a published version, the change is stored as a draft and the published version is left unchanged:
pipeline = client.update_pipeline(pipeline.pipeline_id, steps=[
    PipelineProcessor(type="convert", settings={"mode": "accurate"}),
    PipelineProcessor(type="extract", settings={"page_schema": {...}}),
])
List
result = client.list_pipelines(
    saved_only=True,          # Only saved pipelines (default: True)
    include_archived=False,   # Include archived pipelines (default: False)
    limit=50,
    offset=0,
)
for p in result["pipelines"]:
    print(f"{p.pipeline_id}: {p.name}")
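The limit and offset parameters suggest standard offset pagination. A minimal sketch of walking every page, assuming that a page shorter than limit is the last one (the response may also carry a total count, which this sketch does not rely on); iter_all is a hypothetical helper that takes any fetch_page(limit, offset) callable so it can be tested without network access:

```python
def iter_all(fetch_page, key, limit=50):
    """Yield every item across pages using limit/offset pagination.

    `fetch_page(limit, offset)` should return a dict like the one
    list_pipelines returns; `key` names the list inside it ("pipelines").
    Assumes a page shorter than `limit` is the last page.
    """
    offset = 0
    while True:
        page = fetch_page(limit, offset)[key]
        yield from page
        if len(page) < limit:
            return
        offset += limit


# With the SDK client (not executed here):
# for p in iter_all(
#     lambda limit, offset: client.list_pipelines(limit=limit, offset=offset),
#     "pipelines",
# ):
#     print(f"{p.pipeline_id}: {p.name}")
```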
Get
pipeline = client.get_pipeline("pl_abc123")
Archive / Unarchive
client.archive_pipeline("pl_abc123")
client.unarchive_pipeline("pl_abc123")
Versioning
Publish a Version
version = client.create_pipeline_version(
    "pl_abc123",
    description="Added line items extraction",
)
print(f"Published v{version.version}")
List Versions
result = client.list_pipeline_versions("pl_abc123")
for v in result["versions"]:
    print(f"v{v.version}: {v.description}")
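To pin a run or a draft rollback to the newest published version, you can pick the highest version number from the list rather than relying on the order the API returns. A minimal sketch using a namedtuple stand-in for PipelineVersion; latest_version is a hypothetical helper:

```python
from collections import namedtuple

# Stand-in for PipelineVersion with only the fields used here.
Version = namedtuple("Version", ["version", "description"])


def latest_version(versions):
    """Return the highest-numbered version, or None if the list is empty."""
    return max(versions, key=lambda v: v.version, default=None)


versions = [
    Version(2, "Added line items extraction"),
    Version(1, "Initial release"),
]
newest = latest_version(versions)
print(f"v{newest.version}: {newest.description}")  # v2: Added line items extraction
```

With the SDK, the input would be result["versions"], and newest.version could be passed as version= to run_pipeline or discard_pipeline_draft.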
Discard Draft
# Revert the draft to the active published version
pipeline = client.discard_pipeline_draft("pl_abc123")
# Revert the draft to a specific version
pipeline = client.discard_pipeline_draft("pl_abc123", version=1)
Get Rate
rate = client.get_pipeline_rate("pl_abc123")
print(f"{rate['rate_per_1000_pages_cents']} cents per 1000 pages")
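The rate is quoted per 1000 pages, so a rough cost estimate is the rate times the page count divided by 1000. A minimal sketch, assuming billing scales linearly with page count; the execution's rate_breakdown is the authoritative figure, and the 450-cent rate below is purely illustrative:

```python
def estimate_cost_cents(rate_per_1000_pages_cents, pages):
    """Estimate the cost in cents of processing `pages` pages.

    Assumes linear per-page billing at the rate from get_pipeline_rate.
    """
    return rate_per_1000_pages_cents * pages / 1000


cents = estimate_cost_cents(450, 2000)  # illustrative rate, not a real price
print(f"{cents:.0f} cents (${cents / 100:.2f})")  # 900 cents ($9.00)
```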
Execution
Run
execution = client.run_pipeline(
    "pl_abc123",
    file_path="document.pdf",  # or file_url="https://..."
    page_range="0-10",
    output_format="json",
    skip_cache=False,
    run_evals=False,
    webhook_url="https://example.com/hook",
    version=2,                 # omit for the active version
    max_polls=1,               # polls after submission
    poll_interval=1,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| pipeline_id | str | Required | Pipeline to run |
| file_path | str | - | Local file path |
| file_url | str | - | URL to document |
| page_range | str | - | Pages to process ("0-5,10") |
| output_format | str | - | Override output format |
| skip_cache | bool | False | Skip cached results |
| run_evals | bool | False | Run eval rubrics on steps |
| webhook_url | str | - | Webhook URL called on completion |
| version | int | - | Version to run (omit = active, 0 = draft, N = specific version) |
| max_polls | int | 1 | Polling attempts |
| poll_interval | int | 1 | Seconds between polls |
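Expanding a page_range string into individual pages is handy for estimating cost or checking a range before submitting it. A minimal sketch, assuming zero-based page numbers and inclusive ranges (so "0-5" covers six pages); neither assumption is confirmed by this page, and expand_page_range is a hypothetical helper:

```python
def expand_page_range(page_range):
    """Expand a page_range string such as "0-5,10" into a sorted list of pages.

    Assumes zero-based page numbers and inclusive "start-end" ranges.
    """
    pages = set()
    for part in page_range.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, end = (int(x) for x in part.split("-", 1))
            if end < start:
                raise ValueError(f"descending range {part!r}")
            pages.update(range(start, end + 1))
        else:
            pages.add(int(part))
    return sorted(pages)


print(expand_page_range("0-5,10"))  # [0, 1, 2, 3, 4, 5, 10]
```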
Poll Execution
execution = client.get_pipeline_execution(
    "pex_abc123",
    max_polls=300,
    poll_interval=2,
)
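With max_polls=300 and poll_interval=2, the call above can wait on the order of 300 polls at 2-second intervals, roughly 600 seconds. It is safer to check execution.status afterwards than to assume the run finished. The sketch below shows the general max_polls / poll_interval pattern; it is not the SDK's own implementation, and it assumes completed, completed_with_errors, and failed are the final statuses. The sleep function is injectable so the logic can be tested without waiting:

```python
import time


def poll_until_done(fetch_status, max_polls=300, poll_interval=2, sleep=time.sleep):
    """Call fetch_status() until it returns a final status or polls run out.

    Generic sketch of the max_polls / poll_interval pattern.
    Returns the last status seen.
    """
    final = {"completed", "completed_with_errors", "failed"}
    status = None
    for attempt in range(max_polls):
        status = fetch_status()
        if status in final:
            break
        if attempt < max_polls - 1:  # no need to sleep after the last poll
            sleep(poll_interval)
    return status


# Demo: a fake status sequence, recording waits instead of sleeping.
statuses = iter(["pending", "running", "running", "completed"])
waits = []
final = poll_until_done(lambda: next(statuses), max_polls=10, poll_interval=2, sleep=waits.append)
print(final, sum(waits))  # completed 6
```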
List Executions
result = client.list_pipeline_executions("pl_abc123", limit=20)
for ex in result["executions"]:
    print(f"{ex.execution_id}: {ex.status}")
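A quick health check over recent runs is to tally executions by status. A minimal sketch using collections.Counter and a namedtuple stand-in for PipelineExecution; status_counts is a hypothetical helper, and with the SDK its input would be result["executions"]:

```python
from collections import Counter, namedtuple

# Stand-in for PipelineExecution with only the fields used here.
Execution = namedtuple("Execution", ["execution_id", "status"])


def status_counts(executions):
    """Tally executions by status, e.g. to spot a run of failures."""
    return Counter(ex.status for ex in executions)


executions = [
    Execution("pex_1", "completed"),
    Execution("pex_2", "failed"),
    Execution("pex_3", "completed"),
]
print(status_counts(executions))  # Counter({'completed': 2, 'failed': 1})
```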
Get Step Result
result = client.get_step_result("pex_abc123", step_index=1)
Async Usage
All pipeline methods are also available on AsyncDatalabClient as coroutines, so each call must be awaited:
import asyncio

from datalab_sdk import AsyncDatalabClient, PipelineProcessor


async def run():
    async with AsyncDatalabClient() as client:
        pipeline = await client.create_pipeline(steps=[
            PipelineProcessor(type="convert", settings={"mode": "balanced"}),
            PipelineProcessor(type="extract", settings={"page_schema": {
                "type": "object",
                "properties": {"title": {"type": "string"}},
            }}),
        ])
        pipeline = await client.save_pipeline(
            pipeline.pipeline_id, name="Async Pipeline"
        )
        execution = await client.run_pipeline(
            pipeline.pipeline_id, file_path="doc.pdf"
        )
        execution = await client.get_pipeline_execution(
            execution.execution_id, max_polls=300
        )
        result = await client.get_step_result(
            execution.execution_id, step_index=1
        )
        return result


result = asyncio.run(run())
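The async client makes it natural to process several documents at once. A minimal sketch of bounded concurrency with asyncio.Semaphore and asyncio.gather; run_many is a hypothetical helper that accepts any coroutine function, and the concurrency limit of 4 is an arbitrary choice, not a documented rate limit. A demo coroutine stands in for real pipeline calls:

```python
import asyncio


async def run_many(run_one, paths, max_concurrency=4):
    """Run run_one(path) for every path, at most max_concurrency at a time.

    With the SDK, run_one might wrap run_pipeline + get_pipeline_execution:
        async def run_one(path):
            ex = await client.run_pipeline("pl_abc123", file_path=path)
            return await client.get_pipeline_execution(ex.execution_id, max_polls=300)
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def limited(path):
        async with semaphore:
            return await run_one(path)

    # gather returns results in the same order as the input paths.
    return await asyncio.gather(*(limited(p) for p in paths))


# Demo with a fake coroutine in place of real pipeline calls.
async def fake_run(path):
    await asyncio.sleep(0)
    return f"done:{path}"


results = asyncio.run(run_many(fake_run, ["a.pdf", "b.pdf", "c.pdf"], max_concurrency=2))
print(results)  # ['done:a.pdf', 'done:b.pdf', 'done:c.pdf']
```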
Next Steps
Pipeline Overview: Concepts, processor types, and when to use pipelines.
Create a Pipeline: Step-by-step guide to building pipelines.
Pipeline Versioning: Manage drafts and publish versions.
Run a Pipeline: Execution, overrides, and result retrieval.