databricks-performance-tuning
Total installs: 9
Weekly installs: 2
Site rank: #31431
Install command
npx skills add https://github.com/jeremylongshore/claude-code-plugins-plus --skill databricks-performance-tuning
Install distribution by agent
- claude-code: 2
- mcpjam: 1
- openhands: 1
- zencoder: 1
- crush: 1
Skill Documentation
Databricks Performance Tuning
Overview
Optimize Databricks cluster, Spark, and Delta Lake performance.
Prerequisites
- Access to cluster configuration
- Understanding of workload characteristics
- Query history access
Instructions
Step 1: Cluster Sizing
```python
# Cluster sizing calculator
def recommend_cluster_size(
    data_size_gb: float,
    complexity: str = "medium",  # low, medium, high
    parallelism_need: str = "standard",  # standard, high
) -> dict:
    """
    Recommend cluster configuration based on workload.

    Args:
        data_size_gb: Estimated data size to process
        complexity: Query/transform complexity
        parallelism_need: Required parallelism level

    Returns:
        Recommended cluster configuration
    """
    # Memory per executor (standard DS3_v2 = 14GB)
    memory_per_worker = 14

    # Base calculation: assume each worker processes roughly 2x its memory in data
    workers_by_data = max(1, int(data_size_gb / memory_per_worker / 2))

    # Adjust for complexity
    complexity_multiplier = {"low": 1, "medium": 1.5, "high": 2.5}
    workers = int(workers_by_data * complexity_multiplier.get(complexity, 1.5))

    # Adjust for parallelism
    if parallelism_need == "high":
        workers = max(workers, 8)

    return {
        "node_type_id": "Standard_DS3_v2",
        "num_workers": workers,
        "autoscale": {
            "min_workers": max(1, workers // 2),
            "max_workers": workers * 2,
        },
        "spark_conf": {
            "spark.sql.shuffle.partitions": str(workers * 4),
            "spark.default.parallelism": str(workers * 4),
        },
    }
```
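A quick way to sanity-check the heuristic is to run it for a few representative workloads. The sizes below are illustrative only, not benchmarks:

```python
# Illustrative sizing runs (sizes are examples, not real workloads)
for size_gb, complexity in [(50, "low"), (500, "medium"), (2000, "high")]:
    cfg = recommend_cluster_size(size_gb, complexity=complexity)
    print(
        f"{size_gb} GB / {complexity}: {cfg['num_workers']} workers, "
        f"autoscale {cfg['autoscale']['min_workers']}-{cfg['autoscale']['max_workers']}"
    )
```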
Step 2: Spark Configuration Optimization
```python
# Optimized Spark configurations by workload type
spark_configs = {
    "etl_batch": {
        # Memory and parallelism
        "spark.sql.shuffle.partitions": "200",
        "spark.default.parallelism": "200",
        "spark.sql.files.maxPartitionBytes": "134217728",  # 128MB
        # Delta Lake optimizations
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
        # Adaptive query execution
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
    },
    "ml_training": {
        # Memory for ML workloads
        "spark.driver.memory": "16g",
        "spark.executor.memory": "16g",
        "spark.memory.fraction": "0.8",
        "spark.memory.storageFraction": "0.3",
        # Serialization for ML
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryoserializer.buffer.max": "1024m",
    },
    "streaming": {
        # Streaming configurations
        "spark.sql.streaming.schemaInference": "true",
        "spark.sql.streaming.checkpointLocation": "/mnt/checkpoints",
        "spark.databricks.delta.autoCompact.minNumFiles": "10",
        # Micro-batch tuning
        "spark.sql.streaming.forEachBatch.enabled": "true",
    },
    "interactive": {
        # Fast startup
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]",
        # Caching
        "spark.sql.inMemoryColumnarStorage.compressed": "true",
        "spark.sql.inMemoryColumnarStorage.batchSize": "10000",
    },
}
```
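Session-scoped settings (the `spark.sql.*` and Delta keys) can be applied at runtime in a notebook, while memory, master, and cluster-profile keys belong in the cluster's Spark config. A minimal sketch, assuming `spark` is the active session and `spark_configs` is the dictionary above:

```python
# Apply the session-scoped settings from one profile at runtime.
# Cluster-scoped keys (spark.driver.memory, spark.master, cluster.profile, ...)
# cannot be changed on a running session and are skipped with a message.
def apply_spark_profile(spark, profile: str) -> None:
    for key, value in spark_configs[profile].items():
        try:
            spark.conf.set(key, value)
        except Exception as exc:  # static configs raise here
            print(f"Skipping {key}: {exc}")

apply_spark_profile(spark, "etl_batch")
```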
Step 3: Delta Lake Optimization
```python
# delta_optimization.py
from pyspark.sql import SparkSession


def optimize_delta_table(
    spark: SparkSession,
    table_name: str,
    z_order_columns: list[str] | None = None,
    vacuum_hours: int = 168,  # 7 days
) -> dict:
    """
    Optimize Delta table for query performance.

    Args:
        spark: SparkSession
        table_name: Fully qualified table name
        z_order_columns: Columns for Z-ordering (max 4)
        vacuum_hours: Retention period for vacuum

    Returns:
        Optimization results
    """
    results = {}

    # 1. Run OPTIMIZE with Z-ordering
    if z_order_columns:
        z_order_clause = ", ".join(z_order_columns[:4])  # Max 4 columns
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({z_order_clause})")
        results["z_order"] = z_order_columns
    else:
        spark.sql(f"OPTIMIZE {table_name}")
        results["optimized"] = True

    # 2. Analyze table statistics
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    results["statistics_computed"] = True

    # 3. Vacuum old files
    spark.sql(f"VACUUM {table_name} RETAIN {vacuum_hours} HOURS")
    results["vacuumed"] = True

    # 4. Get table metrics
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    results["metrics"] = {
        "num_files": detail.numFiles,
        "size_bytes": detail.sizeInBytes,
        "partitions": detail.partitionColumns,
    }
    return results


def enable_liquid_clustering(
    spark: SparkSession,
    table_name: str,
    cluster_columns: list[str],
) -> None:
    """
    Enable Liquid Clustering for automatic data layout optimization.

    Liquid Clustering replaces traditional partitioning and Z-ordering
    with automatic, incremental clustering.
    """
    columns = ", ".join(cluster_columns)
    spark.sql(f"""
        ALTER TABLE {table_name}
        CLUSTER BY ({columns})
    """)


def enable_predictive_optimization(
    spark: SparkSession,
    table_name: str,
) -> None:
    """Set Delta table properties that complement Databricks Predictive
    Optimization (deletion vectors, 100MB target file size). Predictive
    Optimization itself is enabled at the catalog or schema level."""
    spark.sql(f"""
        ALTER TABLE {table_name}
        SET TBLPROPERTIES (
            'delta.enableDeletionVectors' = 'true',
            'delta.targetFileSize' = '104857600'
        )
    """)
```
Step 4: Query Performance Analysis
```sql
-- Find slow queries from query history
SELECT
  statement_id,
  statement_text,
  total_duration_ms / 1000 AS seconds,
  produced_rows,
  read_bytes,
  start_time
FROM system.query.history
WHERE total_duration_ms > 60000  -- > 60 seconds
  AND start_time > current_timestamp() - INTERVAL 24 HOURS
ORDER BY total_duration_ms DESC
LIMIT 20;

-- Analyze the query plan
EXPLAIN FORMATTED
SELECT * FROM main.silver.orders
WHERE order_date > '2024-01-01'
  AND region = 'US';

-- Find the heaviest scans over the last 7 days, aggregated by query text
SELECT
  statement_text,
  SUM(read_bytes) / 1024 / 1024 / 1024 AS gb_read,
  SUM(produced_rows) AS total_rows,
  COUNT(*) AS query_count
FROM system.query.history
WHERE start_time > current_timestamp() - INTERVAL 7 DAYS
GROUP BY statement_text
ORDER BY gb_read DESC
LIMIT 20;
```
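The same history table can be queried from a notebook and turned into a small monitoring job. A minimal sketch, assuming system tables are enabled in the workspace and `spark` is the active session:

```python
# Pull the slow-query report into a DataFrame for alerting or dashboards.
slow_queries = spark.sql("""
    SELECT statement_id, statement_text, total_duration_ms / 1000 AS seconds
    FROM system.query.history
    WHERE total_duration_ms > 60000
      AND start_time > current_timestamp() - INTERVAL 24 HOURS
    ORDER BY total_duration_ms DESC
    LIMIT 20
""")
slow_queries.show(truncate=False)
```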
Step 5: Caching Strategy
```python
# Intelligent caching for repeated queries
from pyspark import StorageLevel
from pyspark.sql import DataFrame, SparkSession


class CacheManager:
    """Manage Spark DataFrame caching."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self._cache_registry = {}

    def cache_table(
        self,
        table_name: str,
        cache_level: str = "MEMORY_AND_DISK",
    ) -> DataFrame:
        """Cache table with specified storage level."""
        if table_name in self._cache_registry:
            return self._cache_registry[table_name]

        df = self.spark.table(table_name)
        if cache_level == "MEMORY_ONLY":
            df.persist(StorageLevel.MEMORY_ONLY)
        elif cache_level == "MEMORY_AND_DISK":
            df.persist(StorageLevel.MEMORY_AND_DISK)
        elif cache_level == "DISK_ONLY":
            df.persist(StorageLevel.DISK_ONLY)

        # Trigger caching
        df.count()
        self._cache_registry[table_name] = df
        return df

    def uncache_all(self):
        """Clear all cached DataFrames."""
        for df in self._cache_registry.values():
            df.unpersist()
        self._cache_registry.clear()
        self.spark.catalog.clearCache()


# Delta Cache (automatic). Enable in cluster config:
# "spark.databricks.io.cache.enabled": "true"
# "spark.databricks.io.cache.maxDiskUsage": "50g"
```
Step 6: Join Optimization
```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast


def optimize_join(
    df_large: DataFrame,
    df_small: DataFrame,
    join_key: str,
    small_table_threshold_mb: int = 100,
) -> DataFrame:
    """
    Optimize join based on table sizes.

    Uses broadcast join for small tables,
    sort-merge join for large tables.
    """
    # Estimate small table size (~100 bytes per row, rough heuristic)
    small_size_mb = df_small.count() * 100 / 1024 / 1024

    if small_size_mb < small_table_threshold_mb:
        # Broadcast join (small table fits in memory)
        return df_large.join(broadcast(df_small), join_key)
    else:
        # Sort-merge join (default for two large tables)
        return df_large.join(df_small, join_key, "inner")


# Bucketed tables for frequent joins
def create_bucketed_table(
    spark: SparkSession,
    df: DataFrame,
    table_name: str,
    bucket_columns: list[str],
    num_buckets: int = 100,
):
    """Create bucketed table for join optimization."""
    (
        df.write
        .bucketBy(num_buckets, *bucket_columns)
        .sortBy(*bucket_columns)
        .saveAsTable(table_name)
    )
```
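Bucketed joins only avoid the shuffle when both sides are bucketed on the join key with the same bucket count, and not every catalog supports bucketed tables (Unity Catalog managed tables do not). Treat the following as a sketch with placeholder table names:

```python
# Bucket both sides of a frequently joined pair on the same key and bucket count.
orders = spark.table("main.silver.orders")        # placeholder tables
customers = spark.table("main.silver.customers")

create_bucketed_table(spark, orders, "hive_metastore.gold.orders_bucketed", ["customer_id"])
create_bucketed_table(spark, customers, "hive_metastore.gold.customers_bucketed", ["customer_id"])

# Joining the bucketed tables on customer_id lets Spark skip the shuffle on both sides.
joined = spark.table("hive_metastore.gold.orders_bucketed").join(
    spark.table("hive_metastore.gold.customers_bucketed"), "customer_id"
)
```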
Output
- Optimized cluster configuration
- Tuned Spark settings
- Optimized Delta tables
- Improved query performance
Error Handling
| Issue | Cause | Solution |
|---|---|---|
| OOM errors | Insufficient memory | Increase executor memory or reduce partition size |
| Skewed data | Uneven distribution | Use salting (sketch below) or AQE skew handling |
| Slow joins | Large shuffle | Use broadcast for small tables |
| Too many files | Small files problem | Run OPTIMIZE regularly |
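The skewed-data row mentions salting as a manual fallback when AQE's skew-join handling is not enough. Here is a minimal sketch of key salting, where `df_large`, `df_small`, and `customer_id` are illustrative names rather than anything defined in this skill:

```python
from pyspark.sql import functions as F

SALT_BUCKETS = 16  # tune to the observed degree of skew

# Spread rows of the large (skewed) side across SALT_BUCKETS salt values...
df_large_salted = df_large.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# ...and replicate every row of the small side once per salt value.
df_small_salted = df_small.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(SALT_BUCKETS)]))
)

# Join on the original key plus the salt, then drop the helper column.
joined = df_large_salted.join(df_small_salted, ["customer_id", "salt"]).drop("salt")
```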
Examples
Performance Benchmark
```python
import time


def benchmark_query(spark, query: str, runs: int = 3) -> dict:
    """Benchmark query execution time."""
    times = []
    for _ in range(runs):
        spark.catalog.clearCache()
        start = time.time()
        spark.sql(query).collect()
        times.append(time.time() - start)
    return {
        "min": min(times),
        "max": max(times),
        "avg": sum(times) / len(times),
        "runs": runs,
    }
```
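A common use is to benchmark the same query before and after table maintenance to confirm it actually paid off. The query and table below are placeholders:

```python
query = """
    SELECT region, COUNT(*) AS orders
    FROM main.silver.orders
    WHERE order_date > '2024-01-01'
    GROUP BY region
"""

before = benchmark_query(spark, query)
spark.sql("OPTIMIZE main.silver.orders ZORDER BY (order_date, region)")
after = benchmark_query(spark, query)

print(f"avg before: {before['avg']:.1f}s, avg after: {after['avg']:.1f}s")
```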
Resources
Next Steps
For cost optimization, see databricks-cost-tuning.