Skip to main content

Overview

Workflows let you chain together document processing steps into reusable pipelines. Define a workflow once, then execute it with different files.

Create a Workflow

from datalab_sdk import DatalabClient
from datalab_sdk.models import WorkflowStep

client = DatalabClient()

# Parsing stage: the entry point of the pipeline (no upstream dependencies)
parse_step = WorkflowStep(
    unique_name="parse",
    step_key="marker_parse",
    settings={"max_pages": 10},
    depends_on=[],
)

# Fields the extraction stage should return
invoice_schema = {
    "invoice_number": {"type": "string"},
    "total": {"type": "number"},
}

# Extraction stage: declared to run after "parse" via depends_on
extract_step = WorkflowStep(
    unique_name="extract",
    step_key="marker_extract",
    settings={"page_schema": invoice_schema},
    depends_on=["parse"],
)

steps = [parse_step, extract_step]

# Register the workflow; the returned object carries its ID
workflow = client.create_workflow(name="Invoice Extraction", steps=steps)

print(f"Created workflow: {workflow.id}")

WorkflowStep

Each step in a workflow is defined with:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `unique_name` | str | Yes | Unique identifier for this step |
| `step_key` | str | Yes | Step type (see available types below) |
| `settings` | dict | Yes | Step-specific configuration |
| `depends_on` | list | Yes | List of step names that must complete first |

Available Step Types

Get the list of available step types:
# Fetch the catalogue of step types from the API
step_types = client.get_step_types()

# Each entry is a mapping with a type key and a description
for step_info in step_types['step_types']:
    type_key = step_info['step_type']
    summary = step_info['description']
    print(f"{type_key}: {summary}")
Common step types:
| Step Type | Description |
| --- | --- |
| `marker_parse` | Parse documents to markdown |
| `marker_extract` | Extract structured data with a schema |
| `marker_segment` | Segment documents by section |
| `conditional` | Route based on conditions |
| `await_parse_quality` | Wait for quality score |
| `api_request` | Make external API calls |

Execute a Workflow

from datalab_sdk.models import InputConfig

# Files to process, given as direct URLs
invoice_urls = [
    "https://example.com/invoice1.pdf",
    "https://example.com/invoice2.pdf",
]
input_config = InputConfig(file_urls=invoice_urls)

# Start a run of the workflow created earlier
execution = client.execute_workflow(
    input_config=input_config,
    workflow_id=workflow.id,
)

print(f"Execution ID: {execution.id}")
# NOTE(review): the status table on this page lists IN_PROGRESS, not
# "processing" -- confirm which value execute_workflow actually returns.
print(f"Status: {execution.status}")  # e.g. "processing"

InputConfig Options

| Field | Type | Description |
| --- | --- | --- |
| `file_urls` | list | Direct URLs to files (including `datalab://file-xxx`) |
| `bucket` | str | S3/R2 bucket name |
| `prefix` | str | Bucket prefix path |
| `pattern` | str | File pattern (e.g., `"*.pdf"`) |
| `storage_type` | str | `"s3"` or `"r2"` |

Using Uploaded Files

Use files from Datalab storage:
# Push the local files to Datalab storage before running the workflow
files = client.upload_files(["invoice1.pdf", "invoice2.pdf"])

# Every uploaded file exposes a reference that is passed where a URL would go
uploaded_refs = [uploaded.reference for uploaded in files]
input_config = InputConfig(file_urls=uploaded_refs)

execution = client.execute_workflow(
    input_config=input_config,
    workflow_id=workflow.id,
)

Check Execution Status

Poll for workflow completion:
# Single check: one status lookup, no polling arguments
status = client.get_execution_status(execution.id)
print(f"Status: {status.status}")

# Poll until complete
status = client.get_execution_status(
    execution_id=execution.id,
    max_polls=300,        # Maximum polling attempts
    poll_interval=2,      # Seconds between polls
    download_results=True # Download results from presigned URLs
)

if status.status == "COMPLETED":
    print("Workflow completed!")
    print(f"Steps: {status.steps}")
elif status.status == "FAILED":
    print(f"Failed: {status.error}")
else:
    # Polling is capped by max_polls, so the returned status may be neither
    # COMPLETED nor FAILED (e.g. IN_PROGRESS). Report it instead of staying
    # silent. NOTE(review): confirm how the SDK signals an exhausted poll limit.
    print(f"Workflow not finished: {status.status}")

Execution Status Values

| Status | Description |
| --- | --- |
| `IN_PROGRESS` | Workflow is running |
| `COMPLETED` | All steps finished successfully |
| `FAILED` | One or more steps failed |

Accessing Results

# Fetch the status with results downloaded, polling at most 300 times
status = client.get_execution_status(
    execution.id,
    download_results=True,
    max_polls=300,
)

# status.steps maps each step name to that step's result mapping
for name, payload in status.steps.items():
    print(f"\n{name}:")
    for label, key in (("Status", 'status'), ("Output", 'output')):
        print(f"  {label}: {payload.get(key)}")

List Workflows

# Retrieve the workflows returned by the client
workflows = client.list_workflows()

for wf in workflows:
    print(f"{wf.id}: {wf.name}")
    # Summarise the pipeline by the names of its steps
    step_names = [step.unique_name for step in wf.steps]
    print(f"  Steps: {step_names}")
    print(f"  Created: {wf.created}")

Get Workflow Details

# 42 is a placeholder; pass the ID of an existing workflow
workflow = client.get_workflow(workflow_id=42)

print(f"Name: {workflow.name}")
# Plain string: there is nothing to interpolate, so no f-prefix is needed
print("Steps:")
for step in workflow.steps:
    print(f"  - {step.unique_name} ({step.step_key})")
    print(f"    Depends on: {step.depends_on}")

Delete Workflow

# Delete by ID; the response is a mapping with a 'success' entry
result = client.delete_workflow(workflow_id=42)

deleted = result['success']
if deleted:
    print("Workflow deleted")

Example: Invoice Processing Pipeline

from datalab_sdk import DatalabClient
from datalab_sdk.models import WorkflowStep, InputConfig

client = DatalabClient()

# Shape of one entry in the invoice's line-item list
line_item_schema = {
    "type": "object",
    "properties": {
        "description": {"type": "string"},
        "quantity": {"type": "number"},
        "unit_price": {"type": "number"},
        "total": {"type": "number"},
    },
}

# Fields to extract from each invoice
invoice_schema = {
    "vendor_name": {"type": "string", "description": "Company name"},
    "invoice_number": {"type": "string", "description": "Invoice ID"},
    "invoice_date": {"type": "string", "description": "Date issued"},
    "total_amount": {"type": "number", "description": "Total due"},
    "line_items": {"type": "array", "items": line_item_schema},
}

# Two-stage pipeline: parse first, then extract (depends on "parse")
parse_step = WorkflowStep(
    unique_name="parse",
    step_key="marker_parse",
    settings={"max_pages": 20},
    depends_on=[],
)
extract_step = WorkflowStep(
    unique_name="extract_data",
    step_key="marker_extract",
    settings={"page_schema": invoice_schema},
    depends_on=["parse"],
)
steps = [parse_step, extract_step]

workflow = client.create_workflow(name="Invoice Processor", steps=steps)
print(f"Created workflow {workflow.id}")

# Run the workflow against several invoices at once
invoice_urls = [
    "https://example.com/invoice1.pdf",
    "https://example.com/invoice2.pdf",
    "https://example.com/invoice3.pdf",
]
input_config = InputConfig(file_urls=invoice_urls)

execution = client.execute_workflow(workflow.id, input_config)
print(f"Started execution {execution.id}")

# Poll (at most 300 times) and download the step results
result = client.get_execution_status(
    execution.id,
    max_polls=300,
    download_results=True,
)

print(f"Status: {result.status}")
succeeded = result.status == "COMPLETED"
if succeeded:
    # Dump the output of every step, keyed by step name
    for step_name, outputs in result.steps.items():
        print(f"\n{step_name} results:")
        print(outputs)

Async Usage

import asyncio
from datalab_sdk import AsyncDatalabClient
from datalab_sdk.models import WorkflowStep, InputConfig

async def run_workflow():
    """Create a single-step parse workflow, run it on one document, and return its status.

    The client is opened with ``async with``; every client call inside
    the block is awaited in sequence.

    Returns:
        Whatever ``client.get_execution_status`` returns for the started
        execution, polled with ``max_polls=300``.
    """
    async with AsyncDatalabClient() as client:
        # Define and register a workflow containing only a parse step
        parse_step = WorkflowStep(
            unique_name="parse",
            step_key="marker_parse",
            settings={},
            depends_on=[],
        )
        workflow = await client.create_workflow(
            name="Async Workflow",
            steps=[parse_step],
        )

        # Start an execution on a single remote document
        input_config = InputConfig(file_urls=["https://example.com/doc.pdf"])
        execution = await client.execute_workflow(workflow.id, input_config)

        # Poll for the execution status and hand it back to the caller
        return await client.get_execution_status(
            execution.id,
            max_polls=300,
        )

# Run the coroutine from synchronous code
result = asyncio.run(run_workflow())

Learn More

Try Datalab

Get started with our API in less than a minute. We include free credits.