import requests
import time
# Credentials sent with every Datalab API call below. Replace the placeholder
# with a real key before running.
API_KEY = "YOUR_API_KEY"
headers = {"X-API-Key": API_KEY}

# Step 1: Request upload URL
# The API answers with a file id, an upload URL for the raw bytes, and a
# reference string that is later passed to the workflow as its input.
upload_request = {
    "filename": "invoice.pdf",
    "content_type": "application/pdf",
}
response = requests.post(
    "https://www.datalab.to/api/v1/files/upload",
    json=upload_request,
    headers=headers,
    timeout=30,  # requests waits forever unless a timeout is given
)
# Surface HTTP errors (bad key, quota, ...) here rather than as a confusing
# JSON decode error or KeyError on the lines below.
response.raise_for_status()
upload_data = response.json()
file_id = upload_data["file_id"]
upload_url = upload_data["upload_url"]
file_reference = upload_data["reference"]
# Step 2: Upload file to R2
# The open file object is handed to requests directly so the body is streamed
# from disk instead of being read into memory first.
with open("invoice.pdf", "rb") as f:
    upload_response = requests.put(
        upload_url,
        data=f,
        headers={"Content-Type": "application/pdf"},
        timeout=120,  # generous limit for the upload; avoids hanging forever
    )

# Anything other than 200 from the upload URL is treated as a failed upload.
if upload_response.status_code != 200:
    raise Exception(f"Upload failed: {upload_response.status_code}")
# Step 3: Confirm upload
confirm_response = requests.get(
    f"https://www.datalab.to/api/v1/files/{file_id}/confirm",
    headers=headers,
    timeout=30,  # requests waits forever unless a timeout is given
)
# Report HTTP-level failures directly instead of failing while parsing JSON.
confirm_response.raise_for_status()
confirm_data = confirm_response.json()

# A missing "success" key is treated the same as an explicit failure so the
# script raises the intended error rather than a bare KeyError.
if not confirm_data.get("success"):
    raise Exception("Upload confirmation failed")

print(f"File uploaded successfully: {file_reference}")
# Step 4: Use file in workflow
# Starts workflow 123 with the uploaded file's reference as its single input.
workflow_response = requests.post(
    "https://www.datalab.to/api/v1/workflows/123/execute",
    json={
        "input_config": {
            "type": "single_file",
            "file_url": file_reference,
        },
    },
    headers=headers,
    timeout=30,  # requests waits forever unless a timeout is given
)
# Stop on HTTP errors before trying to read the execution id from the body.
workflow_response.raise_for_status()
execution_data = workflow_response.json()
execution_id = execution_data["execution_id"]
# Step 5: Poll for workflow completion
# Checks the execution status every 5 seconds, at most max_polls times
# (300 polls * 5 s is roughly 25 minutes of waiting).
max_polls = 300
for _ in range(max_polls):
    time.sleep(5)
    status_response = requests.get(
        f"https://www.datalab.to/api/v1/workflows/executions/{execution_id}",
        headers=headers,
        timeout=30,  # requests waits forever unless a timeout is given
    )
    # Report HTTP-level failures directly instead of failing on the JSON body.
    status_response.raise_for_status()
    status_data = status_response.json()
    status = status_data.get("status")

    if status == "COMPLETED":
        print("Workflow completed successfully!")
        print(status_data["steps"])
        break
    if status == "FAILED":
        print("Workflow failed")
        break
    # Any other status means the workflow is still in progress: keep polling.
else:
    # The loop ran out of attempts without reaching a terminal status; say so
    # instead of ending the script silently.
    print(f"Workflow did not finish after {max_polls} polls; giving up")