databricks-sdk-patterns

📁 jeremylongshore/claude-code-plugins-plus-skills 📅 14 days ago
Total installs: 10
Weekly installs: 10
Site-wide rank: #29263

Install command
npx skills add https://github.com/jeremylongshore/claude-code-plugins-plus-skills --skill databricks-sdk-patterns

Agent Install Distribution

opencode 9
gemini-cli 9
codex 9
mcpjam 8
openhands 8
zencoder 8

Skill Documentation

Databricks SDK Patterns

Overview

Production-ready patterns for Databricks SDK usage in Python.

Prerequisites

  • Completed databricks-install-auth setup
  • Familiarity with async/await patterns
  • Understanding of error handling best practices

Instructions

Step 1: Implement Singleton Pattern

# src/databricks/client.py
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os

@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Get singleton WorkspaceClient instance."""
    return WorkspaceClient(profile=profile)

# Multi-workspace support
_clients: dict[str, WorkspaceClient] = {}

def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Get or create client for specific workspace."""
    if workspace_name not in _clients:
        config = get_workspace_config(workspace_name)
        _clients[workspace_name] = WorkspaceClient(
            host=config["host"],
            token=config["token"]
        )
    return _clients[workspace_name]

def get_workspace_config(name: str) -> dict:
    """Load workspace config from environment."""
    return {
        "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
        "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
    }
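
A quick usage sketch; the "prod" workspace name is a placeholder, and get_client_for_workspace expects DATABRICKS_PROD_HOST and DATABRICKS_PROD_TOKEN to be exported:

# Default-profile client; repeated calls return the same cached instance
w = get_workspace_client()
assert w is get_workspace_client()

# Named workspace resolved from env vars (hypothetical "prod" workspace)
prod = get_client_for_workspace("prod")
print(prod.current_user.me().user_name)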

Step 2: Add Error Handling Wrapper

# src/databricks/errors.py
from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    PermissionDenied,
    ResourceAlreadyExists,
)
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
import functools
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")

@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations."""
    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.error is None

    def unwrap(self) -> T:
        if self.error:
            raise self.error
        return self.data

def safe_databricks_call(operation):
    """Decorator for safe Databricks API calls."""
    @functools.wraps(operation)
    def wrapper(*args, **kwargs) -> Result:
        try:
            result = operation(*args, **kwargs)
            return Result(data=result)
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            logger.error(f"Databricks error: {e.error_code} - {e.message}")
            return Result(error=e)
    return wrapper
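
A minimal usage sketch, borrowing the Step 1 singleton; the cluster ID is a placeholder:

# Example usage (cluster ID below is a placeholder)
w = get_workspace_client()

@safe_databricks_call
def get_cluster_state(cluster_id: str):
    return w.clusters.get(cluster_id).state

result = get_cluster_state("0000-000000-example0")
if result.is_success:
    print(result.unwrap())
else:
    print(f"Lookup failed: {result.error}")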

Step 3: Implement Retry Logic with Backoff

# src/databricks/retry.py
import logging
import random
import time
from typing import Callable, TypeVar
from databricks.sdk.errors import (
    DatabricksError,
    TemporarilyUnavailable,
    TooManyRequests,
)

logger = logging.getLogger(__name__)

T = TypeVar("T")

def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute operation with exponential backoff retry."""
    import random

    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError:
            # DatabricksError.error_code is a symbolic string (e.g.
            # "INVALID_PARAMETER_VALUE"), not a numeric HTTP status, so a
            # startswith("4") check never matches; treat anything that is
            # not explicitly transient as non-retryable.
            raise
    raise RuntimeError("Unreachable")
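
A usage sketch, again using the Step 1 singleton:

# Example: retry a listing call that may hit rate limits
w = get_workspace_client()
clusters = with_retry(lambda: list(w.clusters.list()), max_retries=3)
print(f"Found {len(clusters)} clusters")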

Step 4: Context Manager for Clusters

# src/databricks/cluster.py
from contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State

@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager for cluster lifecycle."""
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)

        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)

@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create temporary cluster that auto-deletes."""
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        w.clusters.permanent_delete(cluster.cluster_id)
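
Usage sketches for both managers, assuming w from Step 1; the cluster ID, Spark version, and node type are placeholder values, not ones this skill prescribes:

# Reuse an existing cluster (placeholder ID)
with managed_cluster(w, "0000-000000-example0") as cluster_id:
    print(f"Cluster {cluster_id} is running")

# Spin up a throwaway cluster (placeholder spec values)
spec = {
    "cluster_name": "tmp-etl-check",
    "spark_version": "15.4.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 1,
}
with ephemeral_cluster(w, spec) as cluster_id:
    print(f"Ephemeral cluster: {cluster_id}")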

Step 5: Type-Safe Job Builders

# src/databricks/jobs.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task, NotebookTask, PythonWheelTask, JobCluster,
)
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class JobBuilder:
    """Fluent builder for Databricks jobs."""
    name: str
    tasks: list[Task] = field(default_factory=list)
    job_clusters: list[JobCluster] = field(default_factory=list)
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Add a notebook task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            )
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Add a Python wheel task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            )
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job and return job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id

# Usage (w is the Step 1 singleton client)
w = get_workspace_client()
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)
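
Both tasks above reference job_cluster_key "etl-cluster", which must be defined in the builder's job_clusters list before create() is called, otherwise the API rejects the job. A sketch of defining it; the Spark version and node type are placeholders:

from databricks.sdk.service import compute

builder = JobBuilder("etl-pipeline")
builder.job_clusters.append(JobCluster(
    job_cluster_key="etl-cluster",
    new_cluster=compute.ClusterSpec(
        spark_version="15.4.x-scala2.12",  # placeholder runtime version
        node_type_id="i3.xlarge",          # placeholder node type
        num_workers=2,
    ),
))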

Output

  • Type-safe client singleton
  • Robust error handling with structured logging
  • Automatic retry with exponential backoff
  • Fluent job builder pattern

Pattern Summary

Pattern           Use Case            Benefit
Result wrapper    All API calls       Type-safe error handling
Retry logic       Transient failures  Improves reliability
Context managers  Cluster lifecycle   Resource cleanup
Builders          Job creation        Type safety and fluency

Examples

Factory Pattern (Multi-Tenant)

from functools import lru_cache

@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
    """Get workspace client for tenant."""
    config = get_tenant_config(tenant_id)
    return WorkspaceClient(
        host=config.workspace_url,
        token=config.token,
    )
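
get_tenant_config is left undefined above; a minimal hypothetical stand-in backed by environment variables might look like this (swap in your secret store in practice):

import os
from dataclasses import dataclass

@dataclass(frozen=True)
class TenantConfig:
    workspace_url: str
    token: str

def get_tenant_config(tenant_id: str) -> TenantConfig:
    """Hypothetical loader; reads per-tenant env vars."""
    key = tenant_id.upper()
    return TenantConfig(
        workspace_url=os.environ[f"DATABRICKS_{key}_HOST"],
        token=os.environ[f"DATABRICKS_{key}_TOKEN"],
    )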

Async Operations

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def async_list_clusters(w: WorkspaceClient):
    """Async wrapper for cluster listing."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        clusters = await loop.run_in_executor(
            executor,
            lambda: list(w.clusters.list())
        )
    return clusters
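
Fanning out across workspaces then becomes an asyncio.gather; the "dev" and "prod" names below are placeholders that reuse the Step 1 multi-workspace helper:

async def main():
    # Hypothetical workspace names; see get_client_for_workspace in Step 1
    dev = get_client_for_workspace("dev")
    prod = get_client_for_workspace("prod")
    dev_clusters, prod_clusters = await asyncio.gather(
        async_list_clusters(dev),
        async_list_clusters(prod),
    )
    print(f"dev: {len(dev_clusters)}, prod: {len(prod_clusters)}")

asyncio.run(main())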

Next Steps

Apply these patterns in databricks-core-workflow-a for Delta Lake ETL.