from datalab_sdk import DatalabClient
from datalab_sdk.models import WorkflowStep, InputConfig
client = DatalabClient()

# Create workflow
# Schema for a single entry of the "line_items" array.
line_item_schema = {
    "type": "object",
    "properties": {
        "description": {"type": "string"},
        "quantity": {"type": "number"},
        "unit_price": {"type": "number"},
        "total": {"type": "number"},
    },
}

# Fields requested from each invoice; passed as the extract step's "page_schema".
invoice_schema = {
    "vendor_name": {"type": "string", "description": "Company name"},
    "invoice_number": {"type": "string", "description": "Invoice ID"},
    "invoice_date": {"type": "string", "description": "Date issued"},
    "total_amount": {"type": "number", "description": "Total due"},
    "line_items": {"type": "array", "items": line_item_schema},
}

# First step: "marker_parse" with max_pages=20 and no upstream dependencies.
parse_step = WorkflowStep(
    unique_name="parse",
    step_key="marker_parse",
    settings={"max_pages": 20},
    depends_on=[],
)

# Second step: "marker_extract", declared as depending on the "parse" step.
extract_step = WorkflowStep(
    unique_name="extract_data",
    step_key="marker_extract",
    settings={"page_schema": invoice_schema},
    depends_on=["parse"],
)

steps = [parse_step, extract_step]

workflow = client.create_workflow(name="Invoice Processor", steps=steps)
print(f"Created workflow {workflow.id}")
# Execute with multiple invoices
# All three URLs are submitted together in a single InputConfig.
invoice_urls = [
    "https://example.com/invoice1.pdf",
    "https://example.com/invoice2.pdf",
    "https://example.com/invoice3.pdf",
]
input_config = InputConfig(file_urls=invoice_urls)

execution = client.execute_workflow(workflow.id, input_config)
print(f"Started execution {execution.id}")
# Wait for completion
# NOTE(review): max_polls=300 presumably caps the number of status checks and
# download_results=True presumably fetches step outputs — confirm against the SDK.
result = client.get_execution_status(
    execution.id,
    max_polls=300,
    download_results=True,
)
print(f"Status: {result.status}")

# Only a "COMPLETED" execution has its step outputs printed; any other status
# has already been reported by the print above.
if result.status == "COMPLETED":
    # Process extracted data: print each step's outputs under its step name.
    for step_name, outputs in result.steps.items():
        print(f"\n{step_name} results:")
        print(outputs)