Workflows chain document processing steps into reusable pipelines. Define once, execute many times with different documents.

SDK Usage

from datalab_sdk import DatalabClient
from datalab_sdk.models import WorkflowStep, InputConfig

client = DatalabClient()

# Create a workflow
steps = [
    WorkflowStep(
        unique_name="parse",
        step_key="marker_parse",
        settings={"max_pages": 10},
        depends_on=[]
    ),
    WorkflowStep(
        unique_name="extract",
        step_key="marker_extract",
        settings={
            "page_schema": {
                "title": {"type": "string"},
                "total": {"type": "number"}
            }
        },
        depends_on=["parse"]
    )
]

workflow = client.create_workflow(name="Invoice Processor", steps=steps)

# Execute
input_config = InputConfig(file_urls=["https://example.com/invoice.pdf"])
execution = client.execute_workflow(workflow.id, input_config)

# Wait for results
result = client.get_execution_status(execution.id, max_polls=300)
print(result.status)
See SDK Workflows for complete documentation.

Key Concepts

Steps

A step is a single operation (parse, extract, segment). Each step has:
Field         Description
step_key      Operation type (marker_parse, marker_extract, etc.)
unique_name   Unique identifier within the workflow
settings      Step-specific configuration
depends_on    Steps that must complete first

Dependencies

Steps execute based on their depends_on array:
steps = [
    WorkflowStep(unique_name="parse", step_key="marker_parse", ...),
    WorkflowStep(unique_name="extract", depends_on=["parse"], ...),
    WorkflowStep(unique_name="segment", depends_on=["parse"], ...)
]
# extract and segment run in parallel after parse completes
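The scheduling model above can be sketched as a simple dependency pass: a step becomes runnable once everything in its depends_on list has completed. This is an illustration of the semantics, not the SDK's internal scheduler:

```python
def execution_waves(steps):
    """Group steps into waves: a step runs as soon as every entry in
    its depends_on list has completed in an earlier wave."""
    remaining = {name: set(deps) for name, deps in steps.items()}
    done, waves = set(), []
    while remaining:
        ready = [name for name, deps in remaining.items() if deps <= done]
        if not ready:
            raise ValueError("cycle in depends_on")
        waves.append(sorted(ready))
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves

steps = {"parse": [], "extract": ["parse"], "segment": ["parse"]}
print(execution_waves(steps))  # [['parse'], ['extract', 'segment']]
```

Steps in the same wave have no ordering between them, which is why extract and segment run in parallel.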

Available Step Types

Get the list programmatically:
step_types = client.get_step_types()
for st in step_types['step_types']:
    print(f"{st['step_type']}: {st['description']}")

marker_parse

Parse documents into markdown. Saves a checkpoint for downstream steps.
{
  "step_key": "marker_parse",
  "settings": {
    "max_pages": 10,
    "page_range": "0-5,10"
  }
}

marker_extract

Extract structured data using a schema. Requires marker_parse first.
{
  "step_key": "marker_extract",
  "settings": {
    "page_schema": {
      "invoice_number": {"type": "string"},
      "total": {"type": "number"}
    }
  },
  "depends_on": ["parse"]
}

marker_segment

Detect document boundaries. Requires marker_parse first.
{
  "step_key": "marker_segment",
  "settings": {
    "segmentation_schema": {
      "Introduction": "Opening section",
      "Methods": "Methodology description"
    }
  },
  "depends_on": ["parse"]
}

conditional

Route based on step outputs:
{
  "step_key": "conditional",
  "settings": {
    "conditions": [
      {"left": "{{parse.parse_quality_score}}", "operator": ">=", "right": 4.0}
    ],
    "logic": "AND",
    "routes": {
      "true": {"enable_steps": ["extract"]},
      "false": {"enable_steps": ["reparse"]}
    }
  }
}
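The routing semantics work out to: resolve each {{step.field}} template against prior step outputs, apply the comparison, then combine the results with the logic setting. The sketch below illustrates that evaluation order; it is not the service's actual evaluator, and the output structure is assumed for illustration:

```python
import operator

OPS = {">=": operator.ge, ">": operator.gt, "<=": operator.le,
       "<": operator.lt, "==": operator.eq, "!=": operator.ne}

def evaluate(conditions, logic, outputs):
    """Evaluate conditional-step conditions against prior step outputs.
    A value like '{{parse.parse_quality_score}}' is looked up as
    outputs['parse']['parse_quality_score']; literals pass through."""
    def resolve(value):
        if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
            step, field = value[2:-2].split(".", 1)
            return outputs[step][field]
        return value

    results = [OPS[c["operator"]](resolve(c["left"]), resolve(c["right"]))
               for c in conditions]
    return all(results) if logic == "AND" else any(results)

conds = [{"left": "{{parse.parse_quality_score}}", "operator": ">=", "right": 4.0}]
print(evaluate(conds, "AND", {"parse": {"parse_quality_score": 4.5}}))  # True
```

With the settings above, a result of True enables the extract step and False enables reparse.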

await_parse_quality

Wait for quality scoring before conditional routing:
{
  "step_key": "await_parse_quality",
  "settings": {
    "max_wait_seconds": 120,
    "poll_interval_seconds": 10
  }
}
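The two settings describe a bounded polling loop: check for a score every poll_interval_seconds, give up after max_wait_seconds. A minimal sketch of that loop, assuming a hypothetical get_score callable that returns None until the score is ready:

```python
import time

def await_quality(get_score, max_wait_seconds=120, poll_interval_seconds=10):
    """Poll get_score() until it returns a value or the wait budget runs out."""
    deadline = time.monotonic() + max_wait_seconds
    while time.monotonic() < deadline:
        score = get_score()
        if score is not None:
            return score
        time.sleep(poll_interval_seconds)
    raise TimeoutError("quality score not ready within max_wait_seconds")
```

Pairing this step with a conditional step lets the workflow branch on the score once it is available.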

api_request

Make external API calls:
{
  "step_key": "api_request",
  "settings": {
    "url": "https://your-webhook.com/notify",
    "method": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": {"status": "complete"}
  }
}

Multi-File Processing

Workflows process multiple files in parallel:
input_config = InputConfig(file_urls=[
    "https://example.com/invoice1.pdf",
    "https://example.com/invoice2.pdf",
    "https://example.com/invoice3.pdf"
])

execution = client.execute_workflow(workflow.id, input_config)
  • Files execute independently and in parallel
  • If one file fails, others continue
  • Results are organized by file

Execution Lifecycle

Status        Description
PENDING       Queued, not started
IN_PROGRESS   Steps running
COMPLETED     All steps finished
FAILED        Error occurred
Poll for status:
result = client.get_execution_status(
    execution.id,
    max_polls=300,
    download_results=True
)

if result.status == "COMPLETED":
    for step_name, output in result.steps.items():
        print(f"{step_name}: {output}")

Error Handling

Errors are isolated per file:
result = client.get_execution_status(execution.id)

for step_name, outputs in result.steps.items():
    for file_id, output in outputs.items():
        if "error" in output:
            print(f"Error in {step_name} for {file_id}: {output['error']}")
        else:
            print(f"Success: {output}")
