databricks-rate-limits

Install command
npx skills add https://github.com/jeremylongshore/claude-code-plugins-plus-skills --skill databricks-rate-limits


Skill Documentation

Databricks Rate Limits

Overview

Handle Databricks API rate limits gracefully with exponential backoff.

Prerequisites

  • Databricks SDK installed
  • Understanding of async/await patterns
  • Access to Databricks workspace

Instructions

Step 1: Understand Rate Limit Tiers

| API Category | Rate Limit | Notes |
| --- | --- | --- |
| Workspace APIs | 30 req/s | Clusters, jobs, notebooks |
| SQL Warehouse | 10 req/s | Statement execution |
| Unity Catalog | 100 req/s | Tables, grants, lineage |
| Jobs Runs | 1000 runs/hour | Per workspace |
| Clusters | 20 creates/hour | Per workspace |
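
These numbers are easiest to keep consistent if they live in one constants module that the limiters in Step 3 can read. A minimal sketch using the figures from the table above (published limits change, so verify them against your workspace's documentation; the module name is illustrative):

# src/databricks/limits.py (illustrative module)
# Per-category request rates from the table above, in requests/second
RATE_LIMITS = {
    "workspace": 30.0,       # clusters, jobs, notebooks
    "sql": 10.0,             # statement execution
    "unity_catalog": 100.0,  # tables, grants, lineage
}

# Hourly quotas, per workspace
QUOTAS_PER_HOUR = {
    "job_runs": 1000,
    "cluster_creates": 20,
}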

Step 2: Implement Exponential Backoff with Jitter

# src/databricks/rate_limit.py
import time
import random
from typing import Callable, TypeVar
from databricks.sdk.errors import (
    DatabricksError,
    InternalError,
    TooManyRequests,
    TemporarilyUnavailable,
    ResourceConflict,
)

T = TypeVar("T")

def with_exponential_backoff(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter_factor: float = 0.5,
) -> T:
    """
    Execute operation with exponential backoff on rate limits.

    Args:
        operation: Callable to execute
        max_retries: Maximum retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
        jitter_factor: Random jitter factor (0-1)

    Returns:
        Result of operation

    Raises:
        Original exception after max retries
    """
    retryable_errors = (TooManyRequests, TemporarilyUnavailable, ResourceConflict, InternalError)

    for attempt in range(max_retries + 1):
        try:
            return operation()
        except retryable_errors as e:
            if attempt == max_retries:
                raise

            # Calculate delay with exponential backoff
            exponential_delay = base_delay * (2 ** attempt)
            # Add jitter to prevent thundering herd
            jitter = random.uniform(0, jitter_factor * exponential_delay)
            delay = min(exponential_delay + jitter, max_delay)

            # Honor a server-provided Retry-After value if the error exposes one
            retry_after = getattr(e, "retry_after", None)
            if retry_after:
                delay = max(delay, float(retry_after))

            print(f"Rate limited (attempt {attempt + 1}/{max_retries + 1}). "
                  f"Retrying in {delay:.2f}s...")
            time.sleep(delay)

        except DatabricksError:
            # error_code is a string name such as "RESOURCE_DOES_NOT_EXIST",
            # not an HTTP status, so matching it against "4xx" never works.
            # Anything not caught above is treated as non-retryable.
            raise

    raise RuntimeError("Unreachable")
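
A minimal usage sketch: wrap any synchronous SDK call in a zero-argument callable. The workspace client here relies on default authentication.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Retries fire only on the retryable error types; other errors surface immediately
clusters = with_exponential_backoff(lambda: list(w.clusters.list()))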

Step 3: Implement Request Queue for Bulk Operations

# src/databricks/queue.py
import functools
import time
from threading import Lock
from typing import Callable, TypeVar

T = TypeVar("T")

class RateLimitedQueue:
    """Token bucket rate limiter for API calls."""

    def __init__(
        self,
        requests_per_second: float = 10.0,
        burst_size: int = 20,
    ):
        self.rate = requests_per_second
        self.burst_size = burst_size
        self.tokens = burst_size
        self.last_update = time.monotonic()
        self.lock = Lock()

    def _refill_tokens(self) -> None:
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(
            self.burst_size,
            self.tokens + elapsed * self.rate
        )
        self.last_update = now

    def acquire(self, tokens: int = 1) -> None:
        """Block until tokens are available.

        The lock is held while sleeping, so waiting callers are served
        one at a time; fine for a simple client-side limiter.
        """
        with self.lock:
            while True:
                self._refill_tokens()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Sleep just long enough for the deficit to refill
                wait_time = (tokens - self.tokens) / self.rate
                time.sleep(wait_time)

    def execute(self, operation: Callable[[], T]) -> T:
        """Execute operation with rate limiting."""
        self.acquire()
        return operation()


# Global rate limiters by API category, set below the documented limits
# to leave headroom for other clients sharing the workspace
rate_limiters = {
    "workspace": RateLimitedQueue(requests_per_second=25, burst_size=30),
    "sql": RateLimitedQueue(requests_per_second=8, burst_size=10),
    "unity_catalog": RateLimitedQueue(requests_per_second=80, burst_size=100),
}

def rate_limited(category: str = "workspace"):
    """Decorator for rate-limited API calls."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            limiter = rate_limiters.get(category, rate_limiters["workspace"])
            return limiter.execute(lambda: func(*args, **kwargs))
        return wrapper
    return decorator

# Usage
@rate_limited("workspace")
def list_clusters(w):
    return list(w.clusters.list())
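
The limiter and the Step 2 backoff helper compose well: the token bucket spaces requests out proactively, and backoff handles any 429s that slip through. A sketch for SQL statement execution (the statement and warehouse ID are placeholders):

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

def run_statement(statement: str, warehouse_id: str):
    # Proactive spacing, then reactive retry on rate-limit errors
    return rate_limiters["sql"].execute(
        lambda: with_exponential_backoff(
            lambda: w.statement_execution.execute_statement(
                statement=statement,
                warehouse_id=warehouse_id,
            )
        )
    )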

Step 4: Async Batch Processing

# src/databricks/batch.py
import asyncio
from typing import Any, Callable, List, TypeVar
from databricks.sdk import WorkspaceClient

# Backoff helper from Step 2 (same package)
from .rate_limit import with_exponential_backoff

T = TypeVar("T")

async def batch_api_calls(
    w: WorkspaceClient,
    items: List[Any],
    operation: Callable[[WorkspaceClient, Any], T],
    concurrency: int = 5,
    delay_between_batches: float = 1.0,
) -> List[T]:
    """
    Execute API calls in batches with rate limiting.

    Args:
        w: WorkspaceClient instance
        items: Items to process
        operation: Function to call for each item
        concurrency: Max concurrent requests
        delay_between_batches: Delay between batches

    Returns:
        List of results
    """
    results = []
    semaphore = asyncio.Semaphore(concurrency)

    async def process_item(item):
        async with semaphore:
            # Run in a worker thread since the SDK is synchronous
            return await asyncio.to_thread(
                with_exponential_backoff,
                lambda: operation(w, item),
            )

    # Process in batches
    batch_size = concurrency * 2
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[process_item(item) for item in batch],
            return_exceptions=True
        )
        results.extend(batch_results)

        # Delay between batches
        if i + batch_size < len(items):
            await asyncio.sleep(delay_between_batches)

    return results

# Usage
from databricks.sdk.service.jobs import JobSettings

async def update_all_job_tags(w: WorkspaceClient, job_ids: List[int], tags: dict):
    def update_job(w, job_id):
        # new_settings expects a JobSettings object, not a plain dict
        return w.jobs.update(job_id=job_id, new_settings=JobSettings(tags=tags))

    return await batch_api_calls(
        w, job_ids, update_job,
        concurrency=5,
        delay_between_batches=1.0,
    )
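
Because gather runs with return_exceptions=True, the returned list mixes successful results with exception objects; callers should split them. A sketch:

async def update_and_report(w: WorkspaceClient, job_ids: List[int], tags: dict):
    results = await update_all_job_tags(w, job_ids, tags)
    failures = [r for r in results if isinstance(r, Exception)]
    successes = [r for r in results if not isinstance(r, Exception)]
    print(f"{len(successes)} succeeded, {len(failures)} failed")
    return successes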

Step 5: Idempotency for Job Submissions

# src/databricks/idempotent.py
import hashlib
import json
import time
from typing import Optional
from databricks.sdk import WorkspaceClient

def generate_idempotency_token(
    job_id: int,
    params: dict,
    timestamp_bucket: int = 3600,  # 1 hour buckets
) -> str:
    """
    Generate deterministic idempotency token for job run.

    Uses timestamp buckets to allow retries within a time window
    while preventing duplicate runs across windows.
    """
    bucket = int(time.time()) // timestamp_bucket

    data = json.dumps({
        "job_id": job_id,
        "params": params,
        "bucket": bucket,
    }, sort_keys=True)

    return hashlib.sha256(data.encode()).hexdigest()[:32]

def idempotent_run_now(
    w: WorkspaceClient,
    job_id: int,
    notebook_params: Optional[dict] = None,
    idempotency_token: Optional[str] = None,
):
    """
    Submit a job run with an idempotency guarantee.

    The Jobs API deduplicates on idempotency_token: if a run was
    already started with the same token, the existing run is returned
    instead of a new one being created.
    """
    # Generate a token if not provided
    if not idempotency_token:
        idempotency_token = generate_idempotency_token(
            job_id,
            notebook_params or {},
        )

    # run_now accepts the token natively, so no manual scan of recent
    # runs is needed
    return w.jobs.run_now(
        job_id=job_id,
        notebook_params=notebook_params,
        idempotency_token=idempotency_token,
    )
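
A usage sketch (the job ID and parameters are placeholders): two calls with identical parameters inside the same one-hour bucket generate the same token, so the second call returns the run started by the first instead of launching a duplicate.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
params = {"date": "2024-01-15"}

first = idempotent_run_now(w, job_id=123, notebook_params=params)
# Same job, same params, same hour bucket -> same token -> no duplicate run
second = idempotent_run_now(w, job_id=123, notebook_params=params)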

Output

  • Reliable API calls with automatic retry
  • Rate-limited request queue
  • Async batch processing for bulk operations
  • Idempotent job submissions

Error Handling

| Scenario | Behavior | Configuration |
| --- | --- | --- |
| HTTP 429 | Exponential backoff | max_retries=5 |
| HTTP 503 | Retry with delay | base_delay=1.0 |
| Conflict (409) | Retry once | Check idempotency |
| Timeout | Retry with increased timeout | max_delay=60 |

Examples

Bulk Cluster Operations

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Rate-limited listing
@rate_limited("workspace")
def safe_list_clusters():
    return list(w.clusters.list())

# Bulk updates with queue
cluster_ids = [c.cluster_id for c in safe_list_clusters()]

def set_autotermination(w, cid):
    # clusters.edit replaces the full cluster spec, so carry over the
    # required fields from the current definition
    spec = w.clusters.get(cluster_id=cid)
    return w.clusters.edit(
        cluster_id=cid,
        cluster_name=spec.cluster_name,
        spark_version=spec.spark_version,
        node_type_id=spec.node_type_id,
        num_workers=spec.num_workers,
        autotermination_minutes=30,
    )

async def update_all_clusters():
    return await batch_api_calls(
        w,
        cluster_ids,
        set_autotermination,
        concurrency=3,
    )

# Run async
import asyncio
asyncio.run(update_all_clusters())

Monitor Rate Limit Usage

import time

class RateLimitMonitor:
    """Track rate limit usage across API calls."""

    def __init__(self):
        self.calls = []
        self.window_seconds = 60

    def record_call(self, category: str):
        now = time.time()
        self.calls.append({"time": now, "category": category})
        # Drop entries that have aged out of the window
        self.calls = [c for c in self.calls if now - c["time"] < self.window_seconds]

    def get_rate(self, category: str) -> float:
        """Average requests per second for a category over the window."""
        now = time.time()
        recent = [c for c in self.calls
                  if c["category"] == category and now - c["time"] < self.window_seconds]
        return len(recent) / self.window_seconds

    def should_throttle(self, category: str, limit: float) -> bool:
        # Start throttling at 80% of the limit to leave safety margin
        return self.get_rate(category) > limit * 0.8
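
A usage sketch wiring the monitor into a call site; the 30 req/s limit comes from the Step 1 table:

monitor = RateLimitMonitor()

def monitored_list_clusters(w):
    # Pause briefly when within 80% of the workspace limit
    if monitor.should_throttle("workspace", limit=30.0):
        time.sleep(1.0)
    monitor.record_call("workspace")
    return list(w.clusters.list())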

Resources

Next Steps

For security configuration, see databricks-security-basics.