Process directories of documents with the SDK or CLI. Both handle rate limiting and retries automatically.

SDK Batch Processing

Process multiple files with the Python SDK.
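A minimal synchronous sketch to start, assuming the SDK also exposes a DatalabClient whose convert() mirrors the AsyncDatalabClient used in the next section:
from pathlib import Path
from datalab_sdk import DatalabClient, ConvertOptions  # DatalabClient assumed to mirror the async client

def process_directory_sync(input_dir: str, output_dir: str):
    client = DatalabClient()
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    for pdf in Path(input_dir).glob("*.pdf"):
        # One document at a time: simple, but throughput is limited by request latency
        result = client.convert(str(pdf), options=ConvertOptions(mode="balanced"))
        output_path = Path(output_dir) / f"{pdf.stem}.md"
        output_path.write_text(result.markdown)
        print(f"Saved: {output_path}")

process_directory_sync("./documents/", "./output/")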

Async Batch Processing

For higher throughput, process files concurrently with the async client:
import asyncio
from pathlib import Path
from datalab_sdk import AsyncDatalabClient, ConvertOptions

async def process_directory(input_dir: str, output_dir: str):
    async with AsyncDatalabClient() as client:
        Path(output_dir).mkdir(parents=True, exist_ok=True)  # ensure the output directory exists
        pdf_files = list(Path(input_dir).glob("*.pdf"))

        # Process all files concurrently
        tasks = [
            client.convert(str(pdf), options=ConvertOptions(mode="balanced"))
            for pdf in pdf_files
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        for pdf, result in zip(pdf_files, results):
            if isinstance(result, Exception):
                print(f"Error processing {pdf.name}: {result}")
            else:
                output_path = Path(output_dir) / f"{pdf.stem}.md"
                output_path.write_text(result.markdown)
                print(f"Saved: {output_path}")

asyncio.run(process_directory("./documents/", "./output/"))

CLI Batch Processing

The CLI handles directory processing automatically:
# Convert all PDFs in a directory
datalab convert ./documents/ --output_dir ./output/

# Filter by extension
datalab convert ./documents/ --extensions pdf,docx

# Control concurrency
datalab convert ./documents/ --max_concurrent 10

# With processing options
datalab convert ./documents/ \
  --mode balanced \
  --format markdown \
  --output_dir ./output/
See CLI Reference for all options.

REST API Batch Processing

For raw API usage, implement parallel requests with retry handling:
import os
import time
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter, Retry

API_URL = "https://www.datalab.to/api/v1/marker"
API_KEY = os.getenv("DATALAB_API_KEY")

# Configure session with retries
session = requests.Session()
retries = Retry(
    total=20,
    backoff_factor=4,
    status_forcelist=[429],
    allowed_methods=["GET", "POST"],
    raise_on_status=False,
)
session.mount("https://", HTTPAdapter(max_retries=retries))


def convert_document(pdf_path: Path, output_format="markdown", mode="balanced"):
    """Convert a single document with polling."""
    headers = {"X-API-Key": API_KEY}

    # Submit request
    with open(pdf_path, "rb") as f:
        response = session.post(
            API_URL,
            files={"file": (pdf_path.name, f, "application/pdf")},
            data={"output_format": output_format, "mode": mode},
            headers=headers
        )

    response.raise_for_status()  # fail fast if the submission request was rejected
    data = response.json()
    check_url = data["request_check_url"]

    # Poll for completion (up to 300 attempts, 2 seconds apart)
    for _ in range(300):
        result = session.get(check_url, headers=headers).json()

        if result["status"] == "complete":
            return result
        elif result["status"] == "failed":
            raise Exception(f"Failed: {result.get('error')}")

        time.sleep(2)

    raise Exception(f"Timed out waiting for {pdf_path.name}")


def batch_convert(directory: str, max_workers: int = 5):
    """Process all PDFs in a directory."""
    doc_dir = Path(directory)
    pdfs = list(doc_dir.glob("*.pdf"))
    print(f"Found {len(pdfs)} PDFs")

    results = {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(convert_document, pdf): pdf.name
            for pdf in pdfs
        }

        for future in as_completed(futures):
            filename = futures[future]
            try:
                result = future.result()
                results[filename] = result
                print(f"Converted: {filename}")
            except Exception as e:
                print(f"Error processing {filename}: {e}")

    return results


# Usage
results = batch_convert("./documents/", max_workers=5)
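
To persist output incrementally instead of holding everything in memory (see tip 4 below), write each file as its future completes. A sketch reusing convert_document from above; it assumes the completed response carries the converted text under a "markdown" key when output_format is "markdown":
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_convert_to_disk(directory: str, output_dir: str, max_workers: int = 5):
    """Convert all PDFs in a directory, writing each result as soon as it finishes."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    pdfs = list(Path(directory).glob("*.pdf"))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(convert_document, pdf): pdf for pdf in pdfs}

        for future in as_completed(futures):
            pdf = futures[future]
            try:
                result = future.result()
                dest = out / f"{pdf.stem}.md"
                # Assumption: completed responses include the converted text in "markdown"
                dest.write_text(result.get("markdown", ""))
                print(f"Saved: {dest}")
            except Exception as e:
                print(f"Error processing {pdf.name}: {e}")

batch_convert_to_disk("./documents/", "./output/", max_workers=5)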

Rate Limits

  • Default limit: 200 requests per minute per account
  • The SDK and CLI handle rate limiting automatically
  • For the REST API, use retry logic with exponential backoff
  • Enterprise plans can request higher limits
See API Limits for details.

Tips

  1. Use async for high throughput - Async processing handles many concurrent requests efficiently
  2. Limit concurrency - Start with 5-10 concurrent requests and adjust based on your rate limits (see the semaphore sketch after this list)
  3. Handle failures gracefully - Use return_exceptions=True with asyncio.gather to continue processing on errors
  4. Save progress - Write results incrementally to avoid losing work on long batches
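
For the async SDK example, one way to apply tip 2 is an asyncio.Semaphore that caps in-flight requests. A minimal sketch reusing the AsyncDatalabClient API shown above (process_directory_bounded and convert_one are illustrative names):
import asyncio
from pathlib import Path
from datalab_sdk import AsyncDatalabClient, ConvertOptions

async def process_directory_bounded(input_dir: str, output_dir: str, max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)  # at most max_concurrent requests in flight

    async with AsyncDatalabClient() as client:
        async def convert_one(pdf: Path):
            async with semaphore:  # wait for a free slot before submitting
                return await client.convert(str(pdf), options=ConvertOptions(mode="balanced"))

        pdf_files = list(Path(input_dir).glob("*.pdf"))
        results = await asyncio.gather(*(convert_one(p) for p in pdf_files), return_exceptions=True)

    # Save converted markdown; failures were returned as exceptions by gather
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    for pdf, result in zip(pdf_files, results):
        if isinstance(result, Exception):
            print(f"Error processing {pdf.name}: {result}")
        else:
            (Path(output_dir) / f"{pdf.stem}.md").write_text(result.markdown)
            print(f"Saved: {pdf.stem}.md")

asyncio.run(process_directory_bounded("./documents/", "./output/", max_concurrent=10))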
