session-compression
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill session-compression
AI Session Compression Techniques
Summary
Compress long AI conversations to fit context windows while preserving critical information.
Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.
Key Benefits:
- Cost Reduction: 80-90% token cost savings through hierarchical memory
- Performance: 2x faster responses with compressed context
- Scalability: Handle conversations exceeding 1M tokens
- Quality: Preserve critical information with <2% accuracy loss
When to Use
Use session compression when:
- Multi-turn conversations approach context window limits (>50% capacity)
- Long-running chat sessions (customer support, tutoring, code assistants)
- Token costs become significant (high-volume applications)
- Response latency increases due to large context
- Managing conversation history across multiple sessions
Don’t use when:
- Short conversations (<10 turns) fitting easily in context
- Every detail must be preserved verbatim (legal, compliance)
- Single-turn or stateless interactions
- Context window usage is <30%
Ideal scenarios:
- Chatbots with 50+ turn conversations
- AI code assistants tracking long development sessions
- Customer support with multi-session ticket history
- Educational tutors with student progress tracking
- Multi-day collaborative AI workflows
Quick Start
Basic Setup with LangChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
from anthropic import Anthropic
# Initialize Claude client
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
api_key="your-api-key"
)
# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000, # Summarize when exceeding this
return_messages=True
)
# Add conversation turns
memory.save_context(
{"input": "What's session compression?"},
{"output": "Session compression reduces conversation token usage..."}
)
# Retrieve compressed context
context = memory.load_memory_variables({})
Progressive Compression Pattern
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
class ProgressiveCompressor:
def __init__(self, thresholds=[0.70, 0.85, 0.95]):
self.thresholds = thresholds
self.messages = []
self.max_tokens = 200000 # Claude context window
def add_message(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
# Check if compression needed
current_usage = self._estimate_tokens()
usage_ratio = current_usage / self.max_tokens
if usage_ratio >= self.thresholds[0]:
self._compress(level=self._get_compression_level(usage_ratio))
def _estimate_tokens(self):
return sum(len(m["content"]) // 4 for m in self.messages)
def _get_compression_level(self, ratio):
for i, threshold in enumerate(self.thresholds):
if ratio < threshold:
return i
return len(self.thresholds)
def _compress(self, level: int):
"""Apply compression based on severity level."""
if level == 1: # 70% threshold: Light compression
self._remove_redundant_messages()
elif level == 2: # 85% threshold: Medium compression
self._summarize_old_messages(keep_recent=10)
else: # 95% threshold: Aggressive compression
self._summarize_old_messages(keep_recent=5)
def _remove_redundant_messages(self):
"""Remove duplicate or low-value messages."""
# Implementation: Use semantic deduplication
pass
def _summarize_old_messages(self, keep_recent: int):
"""Summarize older messages, keep recent ones verbatim."""
if len(self.messages) <= keep_recent:
return
# Messages to summarize
to_summarize = self.messages[:-keep_recent]
recent = self.messages[-keep_recent:]
# Generate summary
conversation_text = "\n\n".join([
f"{m['role'].upper()}: {m['content']}"
for m in to_summarize
])
response = client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Summarize this conversation:\n\n{conversation_text}"
}]
)
# Replace old messages with summary
summary = {
"role": "system",
"content": f"[Summary]\n{response.content[0].text}"
}
self.messages = [summary] + recent
# Usage
compressor = ProgressiveCompressor()
for i in range(100):
compressor.add_message("user", f"Message {i}")
compressor.add_message("assistant", f"Response {i}")
Using Anthropic Prompt Caching (90% Cost Reduction)
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
# Build context with cache control
messages = [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Long conversation context here...",
"cache_control": {"type": "ephemeral"} # Cache this
}
]
},
{
"role": "assistant",
"content": "Previous response..."
},
{
"role": "user",
"content": "New question" # Not cached, changes frequently
}
]
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=messages
)
# Cache hit reduces costs by 90% for cached content
Core Concepts
Context Windows and Token Limits
Context window: Maximum tokens an LLM can process in a single request (input + output).
Current limits (2025):
- Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
- GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
- Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)
Token estimation:
- English: ~4 characters per token
- Code: ~3 characters per token
- Rule of thumb: 1 token ≈ 0.75 words (a minimal estimator is sketched below)
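A minimal sketch of these heuristics (character-based only; the provider's token-counting endpoint is more accurate when available):

def estimate_tokens(text: str, is_code: bool = False) -> int:
    """Rough estimate: ~4 characters/token for English, ~3 for code."""
    chars_per_token = 3 if is_code else 4
    return max(1, len(text) // chars_per_token)

def estimate_conversation_tokens(messages: list) -> int:
    """Sum estimates over a list of {'role': ..., 'content': ...} messages."""
    return sum(estimate_tokens(m["content"]) for m in messages)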
Why compression matters:
- Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
- Latency: Larger contexts increase processing time
- Quality: Excessive context can dilute attention on relevant information
Compression Ratios
Compression ratio = Original tokens / Compressed tokens
Industry benchmarks:
- Extractive summarization: 2-3x
- Abstractive summarization: 5-10x
- Hierarchical summarization: 20x+
- LLMLingua (prompt compression): 20x with 1.5% accuracy loss
- KVzip (KV cache compression): 3-4x with 2x speed improvement
Target ratios by use case:
- Customer support: 5-7x (preserve details)
- General chat: 8-12x (balance quality/efficiency)
- Code assistants: 3-5x (preserve technical accuracy)
- Long documents: 15-20x (extract key insights)
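The ratio definition and target table above can be checked with a small helper. This is a sketch only; the use-case keys and approximate midpoint targets simply mirror the list above.

# Approximate midpoints of the target ranges listed above
TARGET_RATIOS = {
    "customer_support": 6,   # 5-7x
    "general_chat": 10,      # 8-12x
    "code_assistant": 4,     # 3-5x
    "long_documents": 18,    # 15-20x
}

def compression_ratio(original_tokens: int, compressed_tokens: int) -> float:
    """Compression ratio = original tokens / compressed tokens."""
    return original_tokens / max(compressed_tokens, 1)

def meets_target(original_tokens: int, compressed_tokens: int, use_case: str) -> bool:
    """Check whether the achieved ratio reaches the target for a use case."""
    return compression_ratio(original_tokens, compressed_tokens) >= TARGET_RATIOS[use_case]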
Progressive Compression Thresholds
Industry standard pattern:
| Context Usage | Action | Technique |
|---|---|---|
| 0-70% | No compression | Store verbatim |
| 70-85% | Light compression | Remove redundancy |
| 85-95% | Medium compression | Summarize old messages |
| 95-100% | Aggressive compression | Hierarchical + RAG |
Implementation guidelines:
- 70% threshold: Remove duplicate/redundant messages, semantic deduplication
- 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
- 95% threshold: Multi-level hierarchical summarization + vector store archival
- Emergency (100%): Drop least important messages, aggressive summarization
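These guidelines condense into a small dispatcher. The strings returned here are only labels for the table above; a real implementation would invoke the techniques described in the next section.

def compression_action(usage_ratio: float) -> str:
    """Map context-window usage (0-1) to the recommended action."""
    if usage_ratio < 0.70:
        return "store verbatim"
    if usage_ratio < 0.85:
        return "light: semantic deduplication / remove redundancy"
    if usage_ratio < 0.95:
        return "medium: summarize messages older than ~20 turns, keep recent 10-15"
    return "aggressive: hierarchical summarization + vector store archival"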
Compression Techniques
1. Summarization Techniques
1.1 Extractive Summarization
Selects key sentences/phrases without modification.
Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
def extractive_compress(messages: list, compression_ratio: float = 0.3):
"""Extract most important messages using TF-IDF scoring."""
texts = [msg['content'] for msg in messages]
# Calculate TF-IDF scores
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(texts)
scores = np.array(tfidf_matrix.sum(axis=1)).flatten()
# Select top messages
n_keep = max(1, int(len(messages) * compression_ratio))
top_indices = sorted(np.argsort(scores)[-n_keep:])
return [messages[i] for i in top_indices]
1.2 Abstractive Summarization
Uses LLMs to semantically condense conversation history.
Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity
from anthropic import Anthropic
def abstractive_compress(messages: list, client: Anthropic):
"""Generate semantic summary using Claude."""
conversation_text = "\n\n".join([
f"{msg['role'].upper()}: {msg['content']}"
for msg in messages
])
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
messages=[{
"role": "user",
"content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items
Conversation:
{conversation_text}
Summary (aim for 1/5 the original length):"""
}]
)
return {
"role": "assistant",
"content": f"[Summary]\n{response.content[0].text}"
}
1.3 Hierarchical Summarization (Multi-Level)
Creates summaries of summaries in a tree structure.
Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications
Architecture:
Level 0 (Raw): [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk): [Summary1-2] [Summary3-4] [Summary5-6] [Summary7-8]
Level 2 (Group): [Summary1-4] [Summary5-8]
Level 3 (Session): [Overall Session Summary]
from anthropic import Anthropic
from typing import List, Dict
class HierarchicalMemory:
def __init__(self, client: Anthropic, chunk_size: int = 10):
self.client = client
self.chunk_size = chunk_size
self.levels: List[List[Dict]] = [[]] # Level 0 = raw messages
def add_message(self, message: Dict):
"""Add message and trigger summarization if needed."""
self.levels[0].append(message)
if len(self.levels[0]) >= self.chunk_size * 2:
self._summarize_level(0)
def _summarize_level(self, level: int):
"""Summarize a level into the next higher level."""
messages = self.levels[level]
# Ensure next level exists
while len(self.levels) <= level + 1:
self.levels.append([])
# Summarize first chunk
chunk = messages[:self.chunk_size]
summary = self._generate_summary(chunk, level)
# Move to next level
self.levels[level + 1].append(summary)
self.levels[level] = messages[self.chunk_size:]
# Recursively check if next level needs summarization
if len(self.levels[level + 1]) >= self.chunk_size * 2:
self._summarize_level(level + 1)
def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
"""Generate summary for a chunk."""
conversation_text = "\n\n".join([
f"{msg['role'].upper()}: {msg['content']}"
for msg in messages
])
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=300,
messages=[{
"role": "user",
"content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
}]
)
return {
"role": "system",
"content": f"[L{level+1} Summary] {response.content[0].text}",
"level": level + 1
}
def get_context(self, max_tokens: int = 4000) -> List[Dict]:
"""Retrieve context within token budget."""
context = []
token_count = 0
# Prioritize recent raw messages
for msg in reversed(self.levels[0]):
msg_tokens = len(msg['content']) // 4
if token_count + msg_tokens > max_tokens * 0.6:
break
context.insert(0, msg)
token_count += msg_tokens
# Add summaries from higher levels
for level in range(1, len(self.levels)):
for summary in self.levels[level]:
summary_tokens = len(summary['content']) // 4
if token_count + summary_tokens > max_tokens:
break
context.insert(0, summary)
token_count += summary_tokens
return context
Academic reference: “Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models” (arXiv:2308.15022)
1.4 Rolling Summarization (Continuous)
Continuously compresses conversation with sliding window.
Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations
from anthropic import Anthropic
class RollingMemory:
def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
self.client = client
self.window_size = window_size
self.compress_threshold = compress_threshold
self.rolling_summary = None
self.recent_messages = []
def add_message(self, message: dict):
self.recent_messages.append(message)
if len(self.recent_messages) >= self.compress_threshold:
self._compress()
def _compress(self):
"""Compress older messages into rolling summary."""
messages_to_compress = self.recent_messages[:-self.window_size]
parts = []
if self.rolling_summary:
parts.append(f"Existing summary:\n{self.rolling_summary}")
parts.append("\nNew messages:\n" + "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in messages_to_compress
]))
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=400,
messages=[{
"role": "user",
"content": "\n".join(parts) + "\n\nUpdate the summary:"
}]
)
self.rolling_summary = response.content[0].text
self.recent_messages = self.recent_messages[-self.window_size:]
def get_context(self):
context = []
if self.rolling_summary:
context.append({
"role": "system",
"content": f"[Summary]\n{self.rolling_summary}"
})
context.extend(self.recent_messages)
return context
2. Embedding-Based Approaches
2.1 RAG (Retrieval-Augmented Generation)
Store full conversation in vector database, retrieve only relevant chunks.
Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history
from anthropic import Anthropic
from openai import OpenAI
import chromadb
class RAGMemory:
def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
self.anthropic = anthropic_client
self.openai = openai_client
# Initialize vector store
self.chroma = chromadb.Client()
self.collection = self.chroma.create_collection(
name="conversation",
metadata={"hnsw:space": "cosine"}
)
self.recent_messages = []
self.recent_window = 5
self.message_counter = 0
def add_message(self, message: dict):
"""Add to recent memory and vector store."""
self.recent_messages.append(message)
if len(self.recent_messages) > self.recent_window:
old_msg = self.recent_messages.pop(0)
self._store_in_vectordb(old_msg)
def _store_in_vectordb(self, message: dict):
"""Archive to vector database."""
# Generate embedding
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=message['content']
)
self.collection.add(
embeddings=[response.data[0].embedding],
documents=[message['content']],
metadatas=[{"role": message['role']}],
ids=[f"msg_{self.message_counter}"]
)
self.message_counter += 1
def retrieve_context(self, query: str, max_tokens: int = 4000):
"""Retrieve relevant context using RAG."""
context = []
token_count = 0
# 1. Recent messages (short-term memory)
for msg in self.recent_messages:
context.append(msg)
token_count += len(msg['content']) // 4
# 2. Retrieve relevant historical context
if token_count < max_tokens:
query_embedding = self.openai.embeddings.create(
model="text-embedding-3-small",
input=query
)
            n_results = max(1, min(10, (max_tokens - token_count) // 100))  # always request at least one result
results = self.collection.query(
query_embeddings=[query_embedding.data[0].embedding],
n_results=n_results
)
for i, doc in enumerate(results['documents'][0]):
if token_count + len(doc) // 4 > max_tokens:
break
metadata = results['metadatas'][0][i]
context.insert(0, {
"role": metadata['role'],
"content": f"[Retrieved] {doc}"
})
token_count += len(doc) // 4
return context
Vector database options:
- ChromaDB: Embedded, easy local development
- Pinecone: Managed, 50ms p95 latency
- Weaviate: Open-source, hybrid search
- Qdrant: High performance, payload filtering
2.2 Vector Search and Clustering
Group similar messages into clusters, represent with centroids.
Pros: Reduces redundancy, identifies themes, multi-topic handling
Cons: Requires sufficient data, may lose nuances
Best for: Multi-topic conversations, meeting summaries
from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np
class ClusteredMemory:
def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
self.client = openai_client
self.n_clusters = n_clusters
self.messages = []
self.embeddings = []
def add_messages(self, messages: list):
for msg in messages:
self.messages.append(msg)
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=msg['content']
)
self.embeddings.append(response.data[0].embedding)
def compress_by_clustering(self):
"""Cluster messages and return representatives."""
if len(self.messages) < self.n_clusters:
return self.messages
embeddings_array = np.array(self.embeddings)
kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
labels = kmeans.fit_predict(embeddings_array)
# Select message closest to each centroid
compressed = []
for cluster_id in range(self.n_clusters):
cluster_indices = np.where(labels == cluster_id)[0]
centroid = kmeans.cluster_centers_[cluster_id]
cluster_embeddings = embeddings_array[cluster_indices]
distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
closest_idx = cluster_indices[np.argmin(distances)]
compressed.append({
**self.messages[closest_idx],
"cluster_id": int(cluster_id),
"cluster_size": len(cluster_indices)
})
return compressed
2.3 Semantic Deduplication
Remove semantically similar messages that convey redundant information.
Pros: Reduces redundancy without losing unique content
Cons: Requires threshold tuning, O(n²) complexity
Best for: FAQ systems, repetitive conversations
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
class SemanticDeduplicator:
def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
self.client = openai_client
self.threshold = similarity_threshold
def deduplicate(self, messages: list):
"""Remove semantically similar messages."""
if len(messages) <= 1:
return messages
# Generate embeddings
embeddings = []
for msg in messages:
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=msg['content']
)
embeddings.append(response.data[0].embedding)
embeddings_array = np.array(embeddings)
similarity_matrix = cosine_similarity(embeddings_array)
# Mark unique messages
keep_indices = []
for i in range(len(messages)):
is_unique = True
for j in keep_indices:
if similarity_matrix[i][j] > self.threshold:
is_unique = False
break
if is_unique:
keep_indices.append(i)
return [messages[i] for i in keep_indices]
3. Token-Efficient Strategies
3.1 Message Prioritization
Assign importance scores and retain only high-priority content.
Pros: Retains most important information, flexible criteria
Cons: Scoring is heuristic-based, may break flow
Best for: Mixed-importance conversations, filtering noise
import re
class MessagePrioritizer:
def score_message(self, msg: dict, index: int, total: int) -> float:
"""Calculate composite importance score."""
scores = []
# Length score (longer = more info)
scores.append(min(len(msg['content']) / 500, 1.0))
# Question score
if msg['role'] == 'user':
scores.append(min(msg['content'].count('?') * 0.5, 1.0))
# Entity score (capitalized words)
entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
scores.append(min(entities / 10, 1.0))
# Recency score (linear decay)
scores.append(index / max(total - 1, 1))
# Role score
scores.append(0.6 if msg['role'] == 'user' else 0.4)
return sum(scores) / len(scores)
def prioritize(self, messages: list, target_count: int):
"""Select top N messages by priority."""
scored = [
(msg, self.score_message(msg, i, len(messages)), i)
for i, msg in enumerate(messages)
]
scored.sort(key=lambda x: x[1], reverse=True)
top_messages = scored[:target_count]
top_messages.sort(key=lambda x: x[2]) # Restore chronological order
return [msg for msg, score, idx in top_messages]
3.2 Delta Compression
Store only changes between consecutive messages.
Pros: Highly efficient for incremental changes
Cons: Reconstruction overhead, not suitable for all content
Best for: Code assistants with incremental edits
import difflib
class DeltaCompressor:
def __init__(self):
self.base_messages = []
self.deltas = []
def add_message(self, message: dict):
if not self.base_messages:
self.base_messages.append(message)
return
# Find most similar previous message
last_msg = self.base_messages[-1]
if last_msg['role'] == message['role']:
# Calculate delta
diff = list(difflib.unified_diff(
last_msg['content'].splitlines(),
message['content'].splitlines(),
lineterm=''
))
if len('\n'.join(diff)) < len(message['content']) * 0.7:
# Store as delta if compression achieved
self.deltas.append({
'base_index': len(self.base_messages) - 1,
'delta': diff,
'role': message['role']
})
return
# Store as new base message
self.base_messages.append(message)
def reconstruct(self):
"""Reconstruct full conversation from bases + deltas."""
messages = self.base_messages.copy()
for delta_info in self.deltas:
base_content = messages[delta_info['base_index']]['content']
# Apply diff to reconstruct (simplified)
reconstructed = base_content # Full implementation would apply diff
messages.append({
'role': delta_info['role'],
'content': reconstructed
})
return messages
4. LangChain Memory Types
4.1 ConversationSummaryMemory
Automatically summarizes conversation as it progresses.
from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryMemory(llm=llm)
# Add conversation
memory.save_context(
{"input": "Hi, I'm working on a Python project"},
{"output": "Great! How can I help with your Python project?"}
)
# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])
Pros: Automatic summarization, simple API
Cons: Every turn triggers LLM call
Best for: Medium conversations (20-50 turns)
4.2 ConversationSummaryBufferMemory
Hybrid: Recent messages verbatim, older summarized.
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000, # Summarize when exceeding
return_messages=True
)
# Add conversation
for i in range(50):
memory.save_context(
{"input": f"Question {i}"},
{"output": f"Answer {i}"}
)
# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})
Pros: Best balance of detail and compression
Cons: Requires token limit tuning
Best for: Most production applications
4.3 ConversationTokenBufferMemory
Maintains fixed token budget, drops oldest when exceeded.
from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationTokenBufferMemory(
llm=llm,
max_token_limit=2000
)
# Simple FIFO when token limit exceeded
Pros: Predictable token usage, simple
Cons: Loses old information completely
Best for: Real-time chat with strict limits
4.4 VectorStoreRetrieverMemory
Stores all messages in vector database, retrieves relevant ones.
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
# Automatically retrieves most relevant context
Pros: Infinite conversation length, semantic retrieval
Cons: Requires vector DB, retrieval overhead
Best for: Long-running conversations, knowledge bases
5. Anthropic-Specific Patterns
5.1 Prompt Caching (90% Cost Reduction)
Cache static context to reduce token costs.
from anthropic import Anthropic
client = Anthropic(api_key="your-api-key")
# Long conversation context
conversation_history = [
{"role": "user", "content": "Message 1"},
{"role": "assistant", "content": "Response 1"},
# ... many more messages
]
# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
content = msg['content']
# Add cache control to last context message
if i == len(conversation_history) - 2:
messages.append({
"role": msg['role'],
"content": [
{
"type": "text",
"text": content,
"cache_control": {"type": "ephemeral"}
}
]
})
else:
messages.append(msg)
# Add new user message (not cached)
messages.append(conversation_history[-1])
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=messages
)
# Subsequent calls with same cached context cost 90% less
Cache TTL: 5 minutes
Savings: 90% cost reduction for cached tokens
Limits: Max 4 cache breakpoints per request
Best practices:
- Cache conversation history, not current query
- Update cache when context changes significantly
- Combine with summarization for maximum efficiency
5.2 Extended Thinking for Compression Planning
Use extended thinking to plan optimal compression strategy.
from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Illustrative placeholders for the values referenced in the prompt below
conversation_text = "..."                   # conversation history to analyze
current_tokens = 150_000                    # estimated current context size
target_tokens = 20_000                      # desired context size
compression_ratio = current_tokens / target_tokens
response = client.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=[{
"role": "user",
"content": f"""Analyze this conversation and recommend compression:
{conversation_text}
Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x
Recommend optimal strategy."""
}]
)
# Access thinking process
thinking_content = [
block for block in response.content
if block.type == "thinking"
]
# Get compression recommendation
recommendation = response.content[-1].text
Production Patterns
Checkpointing and Persistence
Save compression state for recovery and resume.
import json
import time
from pathlib import Path
class PersistentMemory:
def __init__(self, checkpoint_dir: str = "./checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
self.memory = []
self.summary = None
def save_checkpoint(self, session_id: str):
"""Save current memory state."""
checkpoint = {
'messages': self.memory,
'summary': self.summary,
'timestamp': time.time()
}
checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
with open(checkpoint_file, 'w') as f:
json.dump(checkpoint, f, indent=2)
def load_checkpoint(self, session_id: str):
"""Load memory state from checkpoint."""
checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
if checkpoint_file.exists():
with open(checkpoint_file, 'r') as f:
checkpoint = json.load(f)
self.memory = checkpoint['messages']
self.summary = checkpoint.get('summary')
return True
return False
def auto_checkpoint(self, session_id: str, interval: int = 10):
"""Automatically save every N messages."""
if len(self.memory) % interval == 0:
self.save_checkpoint(session_id)
Resume Workflows
Continue conversations across sessions.
from anthropic import Anthropic
import json
import time
class ResumableConversation:
def __init__(self, client: Anthropic, session_id: str):
self.client = client
self.session_id = session_id
self.memory = self._load_or_create()
def _load_or_create(self):
"""Load existing session or create new."""
try:
with open(f'sessions/{self.session_id}.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
return {
'messages': [],
'summary': None,
'created_at': time.time()
}
def add_turn(self, user_message: str):
"""Add user message and get response."""
# Add user message
self.memory['messages'].append({
'role': 'user',
'content': user_message
})
# Build context (with compression)
context = self._build_context()
# Get response
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=context + [{
'role': 'user',
'content': user_message
}]
)
# Save response
assistant_message = response.content[0].text
self.memory['messages'].append({
'role': 'assistant',
'content': assistant_message
})
# Compress if needed
if len(self.memory['messages']) > 20:
self._compress()
# Save state
self._save()
return assistant_message
def _build_context(self):
"""Build context with compression."""
context = []
# Add summary if exists
if self.memory['summary']:
context.append({
'role': 'system',
'content': f"[Previous conversation summary]\n{self.memory['summary']}"
})
# Add recent messages
context.extend(self.memory['messages'][-10:])
return context
def _compress(self):
"""Compress older messages."""
if len(self.memory['messages']) < 15:
return
# Messages to summarize
to_summarize = self.memory['messages'][:-10]
# Generate summary
conversation_text = "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in to_summarize
])
response = self.client.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=500,
messages=[{
'role': 'user',
'content': f"Summarize this conversation:\n\n{conversation_text}"
}]
)
# Update memory
self.memory['summary'] = response.content[0].text
self.memory['messages'] = self.memory['messages'][-10:]
def _save(self):
"""Save session to disk."""
with open(f'sessions/{self.session_id}.json', 'w') as f:
json.dump(self.memory, f, indent=2)
# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")
# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example") # Remembers context
Hybrid Approaches (Best Practice)
Combine multiple techniques for optimal results.
from anthropic import Anthropic
from openai import OpenAI
import chromadb
class HybridMemorySystem:
"""
Combines:
- Rolling summarization (short-term compression)
- RAG retrieval (long-term memory)
- Prompt caching (cost optimization)
- Progressive compression (adaptive behavior)
"""
def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
self.anthropic = anthropic_client
self.openai = openai_client
# Recent messages (verbatim)
self.recent_messages = []
self.recent_window = 10
# Rolling summary
self.rolling_summary = None
# Vector store (long-term)
self.chroma = chromadb.Client()
self.collection = self.chroma.create_collection(name="memory")
self.message_counter = 0
# Compression thresholds
self.thresholds = {
'light': 0.70, # Start basic compression
'medium': 0.85, # Aggressive summarization
'heavy': 0.95 # Emergency measures
}
def add_message(self, message: dict):
"""Add message with intelligent compression."""
self.recent_messages.append(message)
# Check compression needs
usage_ratio = self._estimate_usage()
if usage_ratio >= self.thresholds['heavy']:
self._emergency_compress()
elif usage_ratio >= self.thresholds['medium']:
self._medium_compress()
elif usage_ratio >= self.thresholds['light']:
self._light_compress()
def _light_compress(self):
"""Remove redundancy, archive to vector store."""
if len(self.recent_messages) > self.recent_window * 1.5:
# Archive oldest to vector store
to_archive = self.recent_messages[:5]
for msg in to_archive:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[5:]
def _medium_compress(self):
"""Generate rolling summary, aggressive archival."""
if len(self.recent_messages) > self.recent_window:
# Summarize older messages
to_summarize = self.recent_messages[:-self.recent_window]
summary_text = "\n\n".join([
f"{msg['role']}: {msg['content']}"
for msg in to_summarize
])
if self.rolling_summary:
summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"
response = self.anthropic.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=400,
messages=[{
'role': 'user',
'content': f"Update summary:\n{summary_text}"
}]
)
self.rolling_summary = response.content[0].text
# Archive all summarized messages
for msg in to_summarize:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[-self.recent_window:]
def _emergency_compress(self):
"""Extreme compression for near-limit situations."""
# Keep only 5 most recent messages
to_archive = self.recent_messages[:-5]
for msg in to_archive:
self._archive_to_vectorstore(msg)
self.recent_messages = self.recent_messages[-5:]
# Compress summary further if needed
if self.rolling_summary and len(self.rolling_summary) > 1000:
response = self.anthropic.messages.create(
model="claude-3-5-haiku-20241022",
max_tokens=200,
messages=[{
'role': 'user',
'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
}]
)
self.rolling_summary = response.content[0].text
def _archive_to_vectorstore(self, message: dict):
"""Store in vector database for retrieval."""
embedding_response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=message['content']
)
self.collection.add(
embeddings=[embedding_response.data[0].embedding],
documents=[message['content']],
metadatas=[{'role': message['role']}],
ids=[f"msg_{self.message_counter}"]
)
self.message_counter += 1
def get_context(self, current_query: str, max_tokens: int = 8000):
"""Build optimal context for current query."""
context = []
token_count = 0
# 1. Add rolling summary (if exists)
if self.rolling_summary:
summary_msg = {
'role': 'system',
'content': [
{
'type': 'text',
'text': f"[Conversation Summary]\n{self.rolling_summary}",
'cache_control': {'type': 'ephemeral'} # Cache it
}
]
}
context.append(summary_msg)
token_count += len(self.rolling_summary) // 4
# 2. Retrieve relevant historical context (RAG)
if token_count < max_tokens * 0.3:
query_embedding = self.openai.embeddings.create(
model="text-embedding-3-small",
input=current_query
)
results = self.collection.query(
query_embeddings=[query_embedding.data[0].embedding],
n_results=5
)
for i, doc in enumerate(results['documents'][0]):
if token_count + len(doc) // 4 > max_tokens * 0.3:
break
metadata = results['metadatas'][0][i]
context.append({
'role': metadata['role'],
'content': f"[Retrieved] {doc}"
})
token_count += len(doc) // 4
# 3. Add recent messages verbatim
for msg in self.recent_messages:
if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
break
context.append(msg)
token_count += len(msg['content']) // 4
return context
def _estimate_usage(self):
"""Estimate current context window usage."""
total_tokens = 0
if self.rolling_summary:
total_tokens += len(self.rolling_summary) // 4
for msg in self.recent_messages:
total_tokens += len(msg['content']) // 4
return total_tokens / 200000 # Claude Sonnet context window
# Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")
memory = HybridMemorySystem(anthropic_client, openai_client)
# Add messages over time
for i in range(1000):
memory.add_message({
'role': 'user' if i % 2 == 0 else 'assistant',
'content': f"Message {i} with some content..."
})
# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)
# Use with Claude
response = anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=context + [{
'role': 'user',
'content': current_query
}]
)
Performance Benchmarks
Compression Efficiency
| Technique | Compression Ratio | Quality Loss | Latency | Cost Impact |
|---|---|---|---|---|
| Extractive | 2-3x | <1% | <10ms | None |
| Abstractive | 5-10x | 2-5% | 1-2s | +$0.001/turn |
| Hierarchical | 20x+ | 5-8% | 2-5s | +$0.003/turn |
| LLMLingua | 20x | 1.5% | 500ms | None |
| RAG | Variable | <1% | 100-300ms | +$0.0005/turn |
| Prompt Caching | N/A | 0% | 0ms | -90% |
Token Savings by Use Case
Customer Support (50-turn conversation):
- No compression: ~8,000 tokens/request
- Rolling summary: ~2,000 tokens/request (75% reduction)
- Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)
Code Assistant (100-turn session):
- No compression: ~25,000 tokens/request
- Hierarchical: ~5,000 tokens/request (80% reduction)
- Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)
Educational Tutor (multi-session):
- No compression: Would exceed context window
- RAG + summarization: ~3,000 tokens/request
- Infinite session length enabled
Cost Analysis
Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)
1,000 conversations, 50 turns each:
- No compression:
  - Avg 8K tokens/request × 50K requests = 400M tokens
  - Cost: $1,200
- With rolling summarization:
  - Avg 2K tokens/request × 50K requests = 100M tokens
  - Summarization overhead: +10M tokens
  - Cost: $330 (72% savings)
- With hybrid system + caching:
  - First turn: 2K tokens (no cache)
  - Subsequent: 200 tokens effective (90% cache hit)
  - Total: ~15M tokens effective
  - Cost: $45 (96% savings)
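The arithmetic above can be reproduced directly. This sketch uses input-token pricing only ($3 per 1M tokens), matching the figures in the example.

INPUT_PRICE_PER_M = 3.00  # USD per 1M input tokens (Claude Sonnet, as above)

def cost_usd(total_tokens: float) -> float:
    """Cost of a given number of input tokens."""
    return total_tokens / 1_000_000 * INPUT_PRICE_PER_M

requests = 1_000 * 50  # 1,000 conversations x 50 turns = 50K requests

no_compression = cost_usd(8_000 * requests)           # 400M tokens -> $1,200
rolling_summary = cost_usd(2_000 * requests + 10e6)   # 110M tokens -> $330
hybrid_cached = cost_usd(15e6)                        # ~15M effective tokens -> $45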
Tool Recommendations
Memory Management Tools
Mem0 (Recommended for Production)
Best for: Hybrid memory systems with minimal code
from mem0 import MemoryClient
client = MemoryClient(api_key="your-mem0-key")
# Automatically handles compression, summarization, RAG
memory = client.create_memory(
user_id="user123",
messages=[
{"role": "user", "content": "I'm working on a Python project"},
{"role": "assistant", "content": "Great! What kind of project?"}
]
)
# Retrieve relevant context
context = client.get_memory(
user_id="user123",
query="What programming language am I using?"
)
Features:
- Automatic hierarchical summarization
- Built-in RAG retrieval
- Multi-user session management
- Analytics dashboard
Pricing: $0.40/1K memory operations
Zep
Best for: Low-latency production deployments
from zep_python import ZepClient
client = ZepClient(api_key="your-zep-key")
# Add to session
client.memory.add_memory(
session_id="session123",
messages=[
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"}
]
)
# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")
Features:
- <100ms retrieval latency
- Automatic fact extraction
- Entity recognition
- Session management
Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)
ChromaDB
Best for: Self-hosted vector storage
import chromadb
client = chromadb.Client()
collection = client.create_collection("conversations")
# Store embeddings
collection.add(
documents=["Message content"],
embeddings=[[0.1, 0.2, ...]],
ids=["msg1"]
)
# Retrieve
results = collection.query(
query_embeddings=[[0.1, 0.2, ...]],
n_results=5
)
Features:
- Fully open-source
- Embedded or client-server
- Fast local development
Pricing: Free (self-hosted)
LangChain
Best for: Rapid prototyping and experimentation
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)
Features:
- Multiple memory types
- Framework integration
- Extensive documentation
Pricing: Free (uses your LLM API costs)
Compression Libraries
LLMLingua
Best for: Extreme compression with minimal quality loss
from llmlingua import PromptCompressor
compressor = PromptCompressor()
compressed = compressor.compress_prompt(
context="Long conversation history...",
instruction="Current user query",
target_token=500
)
# Achieves 20x compression with 1.5% accuracy loss
Features:
- 20x compression ratios
- <2% quality degradation
- Fast inference (<500ms)
Pricing: Free (open-source)
Use Cases and Patterns
Chatbot (Customer Support)
Requirements:
- Multi-turn conversations (50-100 turns)
- Preserve customer context
- Fast response times
- Cost-efficient
Recommended approach:
- ConversationSummaryBufferMemory (LangChain)
- 70% threshold: Semantic deduplication
- 85% threshold: Rolling summarization
- Prompt caching for frequent patterns
Implementation:
from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-haiku-20241022")
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=2000,
return_messages=True
)
# Add customer conversation
for turn in customer_conversation:
memory.save_context(
{"input": turn['customer_message']},
{"output": turn['agent_response']}
)
# Retrieve compressed context
context = memory.load_memory_variables({})
Code Assistant
Requirements:
- Long development sessions (100+ turns)
- Preserve technical details
- Handle large code blocks
- Track incremental changes
Recommended approach:
- Hierarchical summarization for overall context
- RAG retrieval for specific code references
- Delta compression for iterative edits
- Prompt caching for system prompts
Implementation:
from anthropic import Anthropic
from openai import OpenAI

client = Anthropic(api_key="your-api-key")
openai_client = OpenAI(api_key="your-openai-key")
class CodeAssistantMemory:
def __init__(self):
self.hierarchy = HierarchicalMemory(client, chunk_size=15)
self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
self.deltas = DeltaCompressor()
def add_interaction(self, code_change: dict):
# Store in hierarchy
self.hierarchy.add_message({
'role': 'user',
'content': code_change['description']
})
# Store in RAG for retrieval
self.rag.add_message(code_change)
# Store as delta if incremental
if code_change.get('is_incremental'):
self.deltas.add_message(code_change)
def get_context(self, current_query: str):
# Combine hierarchical summary + RAG retrieval
summary_context = self.hierarchy.get_context(max_tokens=2000)
rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)
return summary_context + rag_context
Educational Tutor
Requirements:
- Multi-session tracking
- Student progress persistence
- Personalized context retrieval
- Long-term knowledge retention
Recommended approach:
- VectorStoreRetrieverMemory for multi-session
- Fact extraction for student knowledge
- Progressive compression across sessions
- Resumable conversations
Implementation:
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
class TutorMemory:
def __init__(self, student_id: str):
self.student_id = student_id
# Vector store for all sessions
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
collection_name=f"student_{student_id}",
embedding_function=embeddings
)
self.memory = VectorStoreRetrieverMemory(
retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
)
def add_lesson_content(self, lesson: dict):
"""Add lesson interaction to student memory."""
self.memory.save_context(
{"input": lesson['topic']},
{"output": lesson['explanation']}
)
def get_student_context(self, current_topic: str):
"""Retrieve relevant past lessons for current topic."""
return self.memory.load_memory_variables({
"prompt": current_topic
})
Best Practices
1. Choose the Right Technique for Your Use Case
- Short conversations (<20 turns): No compression needed
- Medium conversations (20-50 turns): ConversationSummaryBufferMemory
- Long conversations (50-100 turns): Hierarchical or rolling summarization
- Very long (100+ turns): Hybrid (RAG + summarization + caching)
- Multi-session: VectorStoreRetrieverMemory or Mem0
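This mapping can be captured in a small selector. The returned names refer to the memory types and techniques described earlier in this document; treat it as a heuristic sketch rather than a fixed rule.

def choose_technique(turn_count: int, multi_session: bool = False) -> str:
    """Pick a compression approach from estimated conversation length."""
    if multi_session:
        return "VectorStoreRetrieverMemory or Mem0"
    if turn_count < 20:
        return "no compression"
    if turn_count <= 50:
        return "ConversationSummaryBufferMemory"
    if turn_count <= 100:
        return "hierarchical or rolling summarization"
    return "hybrid (RAG + summarization + prompt caching)"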
2. Implement Progressive Compression
Don’t compress aggressively from the start. Use thresholds:
- 0-70%: Store verbatim
- 70-85%: Light compression (deduplication)
- 85-95%: Medium compression (summarization)
- 95-100%: Aggressive compression (hierarchical)
3. Combine Techniques
Single-technique approaches are suboptimal. Best production systems use:
- Rolling summarization (short-term)
- RAG retrieval (long-term)
- Prompt caching (cost optimization)
- Semantic deduplication (redundancy removal)
4. Monitor Quality Metrics
Track compression impact:
- Response relevance score
- Information retention rate
- User satisfaction metrics
- Token usage reduction
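A minimal tracker for these metrics might look like the sketch below. The relevance and retention scores are assumed to come from your own evaluation pipeline (human review, LLM-as-judge, or QA probes); only the token reduction is computed directly.

from dataclasses import dataclass, field

@dataclass
class CompressionMetrics:
    """Running quality metrics for one compression strategy (illustrative)."""
    original_tokens: int = 0
    compressed_tokens: int = 0
    relevance_scores: list = field(default_factory=list)   # from your evaluator
    retention_scores: list = field(default_factory=list)   # from your evaluator

    def record(self, original: int, compressed: int, relevance: float, retention: float):
        self.original_tokens += original
        self.compressed_tokens += compressed
        self.relevance_scores.append(relevance)
        self.retention_scores.append(retention)

    def report(self) -> dict:
        n = max(len(self.relevance_scores), 1)
        return {
            "token_reduction": 1 - self.compressed_tokens / max(self.original_tokens, 1),
            "avg_relevance": sum(self.relevance_scores) / n,
            "avg_retention": sum(self.retention_scores) / n,
        }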
5. Use Prompt Caching Strategically
Cache stable content:
- Conversation summaries
- System prompts
- Knowledge base context
- User profiles
Don’t cache frequently changing content:
- Current user query
- Real-time data
- Session-specific state
6. Implement Checkpointing
Save compression state for:
- Recovery from failures
- Multi-session continuity
- Analytics and debugging
- A/B testing different strategies
7. Tune Compression Parameters
Test and optimize:
- Summary token limits
- Compression thresholds
- Retrieval result counts
- Cache TTLs
- Chunk sizes for hierarchical
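These tunables can be collected into a single configuration object so they are easy to A/B test. The defaults below mirror the values used in this document's examples; they are starting points, not universally optimal settings.

from dataclasses import dataclass

@dataclass
class CompressionConfig:
    """Tunable compression parameters (defaults from this document's examples)."""
    summary_max_tokens: int = 500            # summary token budget
    thresholds: tuple = (0.70, 0.85, 0.95)   # light / medium / heavy compression
    retrieval_k: int = 5                     # retrieved chunks per query
    cache_ttl_seconds: int = 300             # prompt cache TTL (~5 minutes)
    hierarchy_chunk_size: int = 10           # messages per hierarchical chunk
    recent_window: int = 10                  # recent messages kept verbatim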
8. Handle Edge Cases
Plan for:
- Very long messages (split or compress individually)
- Code blocks (preserve formatting)
- Multi-language content
- Rapidly changing context
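For the first edge case, a hedged sketch of splitting an oversized message into token-budgeted chunks (paragraph-aware, using the ~4 characters/token heuristic; a single paragraph larger than the budget still becomes its own chunk):

def split_long_message(content: str, max_tokens: int = 2000) -> list:
    """Split a very long message into chunks under a rough token budget."""
    max_chars = max_tokens * 4  # ~4 characters per token
    chunks, current = [], ""
    for paragraph in content.split("\n\n"):
        if current and len(current) + len(paragraph) + 2 > max_chars:
            chunks.append(current)
            current = paragraph
        else:
            current = f"{current}\n\n{paragraph}" if current else paragraph
    if current:
        chunks.append(current)
    return chunks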
Troubleshooting
Problem: Summary loses critical information
Solutions:
- Lower compression ratio (less aggressive)
- Implement importance scoring to preserve key messages
- Use extractive summarization for critical sections
- Increase summary token budget
Problem: Retrieval returns irrelevant context
Solutions:
- Improve embedding model quality
- Add metadata filtering (timestamps, topics)
- Adjust similarity threshold
- Use hybrid search (semantic + keyword)
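A sketch of metadata filtering with ChromaDB, assuming messages were archived with metadata such as role and topic (the topic field is an assumption; the earlier examples store only the role):

def retrieve_filtered(collection, query_embedding, topic: str, max_distance: float = 0.4):
    """Query a ChromaDB collection restricted to one topic, dropping weak matches."""
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=5,
        where={"topic": topic},  # metadata filter
    )
    # Cosine distance: smaller is more similar; tune the cutoff per embedding model
    return [
        doc for doc, dist in zip(results["documents"][0], results["distances"][0])
        if dist < max_distance
    ]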
Problem: High latency from compression
Solutions:
- Compress asynchronously (background tasks)
- Use faster models for summarization (Haiku instead of Sonnet)
- Cache summaries more aggressively
- Reduce compression frequency
Problem: Conversations still exceeding context window
Solutions:
- Implement hierarchical compression
- Archive to vector database more aggressively
- Use more aggressive compression ratios
- Consider switching to model with larger context window
Problem: High costs despite compression
Solutions:
- Implement prompt caching
- Use cheaper models for summarization (Haiku)
- Batch summarization operations
- Reduce summarization frequency
Problem: Lost conversation continuity
Solutions:
- Increase recent message window
- Include summary in every request
- Use more descriptive summaries
- Implement session resumption with context injection
Advanced Topics
Streaming Compression
Compress in real-time as conversation progresses:
import asyncio

async def streaming_compress(conversation_stream):
    """Compress in the background while messages stream in."""
    compressor = ProgressiveCompressor()  # from the Quick Start example
    async for message in conversation_stream:
        compressor.add_message(message["role"], message["content"])
        # Offload heavy summarization so it does not block streaming
        # (should_compress/compress_async/get_context are assumed async-friendly
        # extensions of ProgressiveCompressor, not shown above)
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())
    return compressor.get_context()
Multi-User Session Management
Handle concurrent conversations with shared context:
import time

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        # Assumes each session records a last_activity timestamp on use
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]
        for user_id in inactive:
            self._archive_session(user_id)  # persistence hook (not shown here)
            del self.user_sessions[user_id]
Custom Importance Scoring
Train ML models to score message importance:
from transformers import pipeline
class MLImportanceScorer:
def __init__(self):
# Use pre-trained classifier or fine-tune on your data
self.classifier = pipeline(
"text-classification",
model="your-importance-model"
)
def score(self, message: dict) -> float:
"""Score message importance (0-1)."""
result = self.classifier(message['content'])
return result[0]['score']
Context Window Utilization Optimization
Maximize information density within token budget:
def optimize_context_allocation(max_tokens: int) -> dict:
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }
Future Directions
Emerging Techniques (2025+)
1. Infinite Attention Mechanisms
- Models with >10M token context windows (Gemini 1.5, future Claude)
- Reduces need for compression but doesn’t eliminate cost concerns
2. Learned Compression Models
- Neural networks trained to compress conversation optimally
- Maintain semantic meaning while minimizing tokens
- Examples: LLMLingua v2, PromptCompressor
3. Multimodal Session Compression
- Compress conversations with images, audio, video
- Maintain cross-modal context relationships
4. Federated Memory Systems
- Distributed compression across multiple memory stores
- Privacy-preserving compression for sensitive conversations
5. Adaptive Compression Strategies
- RL-based systems that learn optimal compression per user/domain
- Dynamic threshold adjustment based on conversation importance
References
Academic Papers
- “Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models” (arXiv:2308.15022)
- “LLMLingua: Compressing Prompts for Accelerated Inference” (arXiv:2310.05736)
- “Lost in the Middle: How Language Models Use Long Contexts” (arXiv:2307.03172)
Tools
- Mem0 – Managed memory service
- Zep – Fast memory layer
- LLMLingua – Prompt compression
- ChromaDB – Vector database
Last Updated: 2025-11-30
Version: 1.0.0
License: MIT