import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

import requests
from requests.adapters import HTTPAdapter, Retry
# Account credential, read from the environment; None when DATALAB_API_KEY is
# not set.
API_KEY = os.environ.get("DATALAB_API_KEY")
# Marker conversion endpoint that PDFs are submitted to.
API_URL = "https://www.datalab.to/api/v1/marker"

#
# Shared HTTP session with automatic retries. Tune the retry policy to your
# own usage: the default rate limit is 200 requests per minute per account
# (not per API key), so HTTP 429 responses are retried with backoff.
#
retries = Retry(
    total=20,
    backoff_factor=4,
    status_forcelist=[429],
    allowed_methods=["GET", "POST"],
    # Hand back the last response instead of raising once retries run out.
    raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retries)

session = requests.Session()
# The same retrying adapter serves both URL schemes.
session.mount("https://", adapter)
session.mount("http://", adapter)
def submit_and_poll_pdf_conversion(
    pdf_path: Path,
    output_format: Optional[str] = 'markdown',
    use_llm: Optional[bool] = True,
    max_polls: int = 300,
    poll_interval: float = 2,
):
    """Submit one PDF for conversion and poll until the job finishes.

    The file is POSTed to ``API_URL`` as multipart form data, then the
    ``request_check_url`` from the submit response is polled until the job
    reports ``complete`` or ``failed``, or ``max_polls`` checks have been made.

    Args:
        pdf_path: Path of the PDF file to upload.
        output_format: Requested output format; also the key under which the
            converted document is read from the completed poll response
            (the 'html', 'markdown', or 'json' field).
        use_llm: Value sent as the ``use_llm`` form field.
        max_polls: Maximum number of status checks before giving up.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The converted document taken from the completed response, or ``None``
        when the conversion failed or did not finish within ``max_polls``
        checks.

    Raises:
        requests.HTTPError: If the submit or a poll request returns an HTTP
            error status.
        KeyError: If the submit response has no ``request_check_url`` or the
            completed response has no ``output_format`` field.
    """
    # Built once in the outer scope: both the submit request and every poll
    # request must carry the API key.
    headers = {"X-Api-Key": API_KEY}

    def submit_request():
        #
        # Submit initial request. The POST happens inside the `with` block so
        # the file handle is still open while the upload is sent.
        #
        with open(pdf_path, 'rb') as f:
            form_data = {
                'file': (pdf_path.name, f, 'application/pdf'),
                "force_ocr": (None, False),
                "paginate": (None, False),
                'output_format': (None, output_format),
                "use_llm": (None, use_llm),
                "strip_existing_ocr": (None, False),
                "disable_image_extraction": (None, False)
            }
            return session.post(API_URL, headers=headers, files=form_data)

    # NOTE(review): `api_call_with_retry` is not defined in the visible part
    # of this file -- confirm it is defined or imported elsewhere. It is
    # called with the zero-argument submit callable and its return value is
    # used as the response.
    response = api_call_with_retry(submit_request)
    response.raise_for_status()
    data = response.json()

    #
    # Poll for completion
    #
    check_url = data["request_check_url"]
    for _ in range(max_polls):
        response = session.get(check_url, headers=headers)
        # Surface HTTP errors explicitly instead of failing later on a
        # payload that has no 'status' field.
        response.raise_for_status()
        check_result = response.json()
        status = check_result.get("status")

        if status == 'complete':
            #
            # Processing is finished: return immediately so the job is not
            # polled again and the caller actually receives the document.
            #
            return check_result[output_format]

        if status == "failed":
            print("Failed to convert, uh oh...")
            return None

        print(f"Waiting {poll_interval} more seconds to re-check conversion status")
        time.sleep(poll_interval)

    print(f"Gave up on {pdf_path.name} after {max_polls} status checks")
    return None
def batch_convert_pdfs(
    document_directory: str,
    max_workers: int = 3
) -> dict:
    """Convert every ``*.pdf`` directly inside a directory, in parallel.

    Each PDF is handed to ``submit_and_poll_pdf_conversion`` on a thread pool.
    A failure on one file is printed and does not stop the others.

    Args:
        document_directory: Directory to scan (non-recursively) for PDFs.
        max_workers: Maximum number of conversions running at once.

    Returns:
        A dict mapping each successfully processed file name to whatever
        ``submit_and_poll_pdf_conversion`` returned for it. Files whose
        conversion raised are omitted.

    Raises:
        FileNotFoundError: If ``document_directory`` does not exist.
    """
    doc_dir = Path(document_directory)
    if not doc_dir.exists():
        print("Couldn't find your directory, exiting early...")
        raise FileNotFoundError(f"Couldn't find {document_directory}")

    # Collect all PDF files
    docs_to_process = list(doc_dir.glob("*.pdf"))
    print(f"Found {len(docs_to_process)} PDFs to convert...")

    results = {}
    #
    # Process multiple files at once, up to `max_workers`
    #
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(submit_and_poll_pdf_conversion, pdf_path): pdf_path.name
            for pdf_path in docs_to_process
        }
        for future in as_completed(future_to_file):
            filename = future_to_file[future]
            try:
                # Keep the per-file outcome instead of discarding it.
                results[filename] = future.result()
            except Exception as e:
                # Deliberately broad: this is the per-file boundary, and one
                # bad document must not abort the rest of the batch.
                print(f"✗ Error processing {filename}: {e}")
    return results