databricks-sdk-patterns

Total installs: 10 · Weekly installs: 10 · Site-wide rank: #29263

Install command:

```
npx skills add https://github.com/jeremylongshore/claude-code-plugins-plus-skills --skill databricks-sdk-patterns
```

Agent install distribution:

| Agent | Installs |
|---|---|
| opencode | 9 |
| gemini-cli | 9 |
| codex | 9 |
| mcpjam | 8 |
| openhands | 8 |
| zencoder | 8 |

Skill Documentation
Databricks SDK Patterns
Overview
Production-ready patterns for Databricks SDK usage in Python.
Prerequisites
- Completed databricks-install-authsetup
- Familiarity with async/await patterns
- Understanding of error handling best practices
Instructions
Step 1: Implement Singleton Pattern
```python
# src/databricks/client.py
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os


@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Get singleton WorkspaceClient instance."""
    return WorkspaceClient(profile=profile)


# Multi-workspace support
_clients: dict[str, WorkspaceClient] = {}


def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Get or create a client for a specific workspace."""
    if workspace_name not in _clients:
        config = get_workspace_config(workspace_name)
        _clients[workspace_name] = WorkspaceClient(
            host=config["host"],
            token=config["token"],
        )
    return _clients[workspace_name]


def get_workspace_config(name: str) -> dict:
    """Load workspace config from environment variables."""
    return {
        "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
        "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
    }
```
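A quick usage sketch. The "prod" workspace name (and its DATABRICKS_PROD_* environment variables) is a placeholder for illustration:

```python
# Default workspace, resolved via the SDK's standard auth chain
w = get_workspace_client()
print(w.current_user.me().user_name)

# Named workspace; assumes DATABRICKS_PROD_HOST and
# DATABRICKS_PROD_TOKEN are set in the environment
prod = get_client_for_workspace("prod")
assert prod is get_client_for_workspace("prod")  # cached, same instance
```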
Step 2: Add Error Handling Wrapper
```python
# src/databricks/errors.py
from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    PermissionDenied,
    ResourceAlreadyExists,
)
from dataclasses import dataclass
from functools import wraps
from typing import Generic, Optional, TypeVar
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations."""
    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.error is None

    def unwrap(self) -> T:
        if self.error:
            raise self.error
        return self.data


def safe_databricks_call(operation):
    """Decorator for safe Databricks API calls."""
    @wraps(operation)
    def wrapper(*args, **kwargs) -> Result:
        try:
            return Result(data=operation(*args, **kwargs))
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            logger.error(f"Databricks error: {e.error_code} - {e}")
            return Result(error=e)
    return wrapper
```
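A usage sketch applying the decorator; `get_workspace_client` comes from Step 1 and the cluster ID is a placeholder:

```python
w = get_workspace_client()  # from Step 1

@safe_databricks_call
def get_cluster(cluster_id: str):
    return w.clusters.get(cluster_id)

result = get_cluster("0000-000000-placeholder")  # hypothetical cluster ID
if result.is_success:
    print(result.unwrap().cluster_name)
else:
    print(f"Lookup failed: {result.error}")
```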
Step 3: Implement Retry Logic with Backoff
```python
# src/databricks/retry.py
import random
import time
from typing import Callable, TypeVar

from databricks.sdk.errors import (
    DatabricksError,
    TemporarilyUnavailable,
    TooManyRequests,
)

T = TypeVar("T")


def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute operation with exponential backoff retry."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError as e:
            # Don't retry client errors
            if e.error_code and e.error_code.startswith("4"):
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
    raise RuntimeError("Unreachable")
```
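A usage sketch; wrapping the listing in a lambda lets `with_retry` re-run the full call on each attempt:

```python
w = get_workspace_client()  # from Step 1

# Retries on TooManyRequests / TemporarilyUnavailable, then gives up
clusters = with_retry(lambda: list(w.clusters.list()), max_retries=3)
print(f"Found {len(clusters)} clusters")
```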
Step 4: Context Manager for Clusters
```python
# src/databricks/cluster.py
from contextlib import contextmanager

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State


@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager for cluster lifecycle."""
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)
        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)


@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create a temporary cluster that is deleted on exit."""
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        w.clusters.permanent_delete(cluster.cluster_id)
```
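A usage sketch for the ephemeral variant. The spark_version and node_type_id values are placeholders; pick real ones for your cloud (e.g. via w.clusters.spark_versions()):

```python
w = get_workspace_client()  # from Step 1

spec = {
    "cluster_name": "tmp-analysis",        # placeholder name
    "spark_version": "15.4.x-scala2.12",   # placeholder runtime version
    "node_type_id": "i3.xlarge",           # placeholder, cloud-specific
    "num_workers": 1,
    "autotermination_minutes": 30,
}

with ephemeral_cluster(w, spec) as cluster_id:
    print(f"Running work on {cluster_id}")
# The cluster is permanently deleted here, even if the body raised
```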
Step 5: Type-Safe Job Builders
```python
# src/databricks/jobs.py
from dataclasses import dataclass, field
from typing import Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    JobCluster,
    NotebookTask,
    PythonWheelTask,
    Task,
)


@dataclass
class JobBuilder:
    """Fluent builder for Databricks jobs."""
    name: str
    tasks: list[Task] = field(default_factory=list)
    job_clusters: list[JobCluster] = field(default_factory=list)
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Add a notebook task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            ),
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Add a Python wheel task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            ),
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job and return its job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id
```
```python
# Usage: assumes a job cluster with key "etl-cluster" has been appended to
# job_clusters, and that w is a WorkspaceClient (e.g. from Step 1)
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)
```
Output
- Type-safe client singleton
- Robust error handling with structured logging
- Automatic retry with exponential backoff
- Fluent job builder pattern
Error Handling
| Pattern | Use Case | Benefit |
|---|---|---|
| Result wrapper | All API calls | Type-safe error handling |
| Retry logic | Transient failures | Improves reliability |
| Context managers | Cluster lifecycle | Resource cleanup |
| Builders | Job creation | Type safety and fluency |
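These patterns compose. A sketch layering the Step 2 Result wrapper over the Step 3 retry helper, for a call that is both fallible and throttle-prone:

```python
@safe_databricks_call
def list_jobs_with_retry(w: WorkspaceClient):
    # Transient failures are retried; anything else becomes a Result error
    return with_retry(lambda: list(w.jobs.list()), max_retries=3)

result = list_jobs_with_retry(get_workspace_client())
if result.is_success:
    for job in result.unwrap():
        print(job.settings.name)
```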
Examples
Factory Pattern (Multi-Tenant)
```python
from functools import lru_cache

from databricks.sdk import WorkspaceClient


@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
    """Get the workspace client for a tenant."""
    config = get_tenant_config(tenant_id)
    return WorkspaceClient(
        host=config.workspace_url,
        token=config.token,
    )
```
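`get_tenant_config` is not defined by this skill; a minimal stand-in, assuming per-tenant credentials live in environment variables (swap in your real secret store):

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TenantConfig:
    workspace_url: str
    token: str


def get_tenant_config(tenant_id: str) -> TenantConfig:
    """Hypothetical loader keyed on DATABRICKS_<TENANT>_HOST/TOKEN."""
    key = tenant_id.upper()
    return TenantConfig(
        workspace_url=os.environ[f"DATABRICKS_{key}_HOST"],
        token=os.environ[f"DATABRICKS_{key}_TOKEN"],
    )
```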
Async Operations
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from databricks.sdk import WorkspaceClient


async def async_list_clusters(w: WorkspaceClient):
    """Async wrapper for the blocking cluster-listing call."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        return await loop.run_in_executor(
            executor,
            lambda: list(w.clusters.list()),
        )
```
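On Python 3.9+, asyncio.to_thread offers the same offloading without managing an executor. A usage sketch fanning out two blocking calls concurrently:

```python
async def main():
    w = get_workspace_client()  # from Step 1
    clusters, jobs = await asyncio.gather(
        async_list_clusters(w),
        asyncio.to_thread(lambda: list(w.jobs.list())),
    )
    print(f"{len(clusters)} clusters, {len(jobs)} jobs")

asyncio.run(main())
```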
Next Steps
Apply patterns in databricks-core-workflow-a for Delta Lake ETL.