Skip to main content
Process directories of documents with the SDK or CLI. Both handle rate limiting and retries automatically.

SDK Batch Processing

Process multiple files using Python’s async capabilities:

Async Batch Processing

For higher throughput:
import asyncio
from pathlib import Path
from datalab_sdk import AsyncDatalabClient, ConvertOptions

async def process_directory(input_dir: str, output_dir: str):
    async with AsyncDatalabClient() as client:
        pdf_files = list(Path(input_dir).glob("*.pdf"))

        # Process all files concurrently
        tasks = [
            client.convert(str(pdf), options=ConvertOptions(mode="balanced"))
            for pdf in pdf_files
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        for pdf, result in zip(pdf_files, results):
            if isinstance(result, Exception):
                print(f"Error processing {pdf.name}: {result}")
            else:
                output_path = Path(output_dir) / f"{pdf.stem}.md"
                output_path.write_text(result.markdown)
                print(f"Saved: {output_path}")

asyncio.run(process_directory("./documents/", "./output/"))

CLI Batch Processing

The CLI handles directory processing automatically:
# Convert all PDFs in a directory
datalab convert ./documents/ --output_dir ./output/

# Filter by extension
datalab convert ./documents/ --extensions pdf,docx

# Control concurrency
datalab convert ./documents/ --max_concurrent 10

# With processing options
datalab convert ./documents/ \
  --mode balanced \
  --format markdown \
  --output_dir ./output/
See CLI Reference for all options.

REST API Batch Processing

For raw API usage, implement parallel requests with retry handling:
import os
import time
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter, Retry

API_URL = "https://www.datalab.to/api/v1/convert"
API_KEY = os.getenv("DATALAB_API_KEY")

# Configure session with retries
session = requests.Session()
retries = Retry(
    total=20,
    backoff_factor=4,
    status_forcelist=[429],
    allowed_methods=["GET", "POST"],
    raise_on_status=False,
)
session.mount("https://", HTTPAdapter(max_retries=retries))


def convert_document(pdf_path: Path, output_format="markdown", mode="balanced"):
    """Convert a single document with polling."""
    headers = {"X-API-Key": API_KEY}

    # Submit request
    with open(pdf_path, "rb") as f:
        response = session.post(
            API_URL,
            files={"file": (pdf_path.name, f, "application/pdf")},
            data={"output_format": output_format, "mode": mode},
            headers=headers
        )

    data = response.json()
    check_url = data["request_check_url"]

    # Poll for completion
    for _ in range(300):
        result = session.get(check_url, headers=headers).json()

        if result["status"] == "complete":
            return result
        elif result["status"] == "failed":
            raise Exception(f"Failed: {result.get('error')}")

        time.sleep(2)

    raise Exception("Timeout")


def batch_convert(directory: str, max_workers: int = 5):
    """Process all PDFs in a directory."""
    doc_dir = Path(directory)
    pdfs = list(doc_dir.glob("*.pdf"))
    print(f"Found {len(pdfs)} PDFs")

    results = {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(convert_document, pdf): pdf.name
            for pdf in pdfs
        }

        for future in as_completed(futures):
            filename = futures[future]
            try:
                result = future.result()
                results[filename] = result
                print(f"Converted: {filename}")
            except Exception as e:
                print(f"Error processing {filename}: {e}")

    return results


# Usage
results = batch_convert("./documents/", max_workers=5)

Rate Limits

  • Request rate limit: 400 requests per minute per account (429 on exceed)
  • Concurrent request limit: 400 concurrent requests (429 on exceed)
  • Page concurrency limit: 5,000 pages in flight across all requests — this is enforced during processing, not at submission. Results return with success: false if exceeded. Always check the success field when polling for results.
  • The SDK and CLI handle request rate limiting and retries automatically
  • For higher limits, contact support@datalab.to
See API Limits for details.

Tips

  1. Use async for high throughput - Async processing handles many concurrent requests efficiently
  2. Limit concurrency - Start with 5-10 concurrent requests and adjust based on your rate limits
  3. Handle failures gracefully - Use return_exceptions=True with asyncio.gather to continue processing on errors
  4. Save progress - Write results incrementally to avoid losing work on long batches

Next Steps

Document Conversion

Learn more about Marker’s conversion API and output formats.

API Limits

Understand rate limits and how to optimize throughput.

Workflows

Chain batch processing into multi-step document workflows.

Webhooks

Get notified when batch conversions complete via webhooks.