session-compression

📁 bobmatnyc/claude-mpm-skills 📅 Jan 23, 2026
48
总安装量
48
周安装量
#4446
全站排名
安装命令
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill session-compression

Agent 安装分布

claude-code 32
gemini-cli 29
opencode 27
antigravity 25
cursor 21

Skill 文档

AI Session Compression Techniques

Summary

Compress long AI conversations to fit context windows while preserving critical information.

Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.

Key Benefits:

  • Cost Reduction: 80-90% token cost savings through hierarchical memory
  • Performance: 2x faster responses with compressed context
  • Scalability: Handle conversations exceeding 1M tokens
  • Quality: Preserve critical information with <2% accuracy loss

When to Use

Use session compression when:

  • Multi-turn conversations approach context window limits (>50% capacity)
  • Long-running chat sessions (customer support, tutoring, code assistants)
  • Token costs become significant (high-volume applications)
  • Response latency increases due to large context
  • Managing conversation history across multiple sessions

Don’t use when:

  • Short conversations (<10 turns) fitting easily in context
  • Every detail must be preserved verbatim (legal, compliance)
  • Single-turn or stateless interactions
  • Context window usage is <30%

Ideal scenarios:

  • Chatbots with 50+ turn conversations
  • AI code assistants tracking long development sessions
  • Customer support with multi-session ticket history
  • Educational tutors with student progress tracking
  • Multi-day collaborative AI workflows

Quick Start

Basic Setup with LangChain

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic
from anthropic import Anthropic

# Initialize Claude client
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key"
)

# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding this
    return_messages=True
)

# Add conversation turns
memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."}
)

# Retrieve compressed context
context = memory.load_memory_variables({})

Progressive Compression Pattern

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:
    def __init__(self, thresholds=[0.70, 0.85, 0.95]):
        self.thresholds = thresholds
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})

        # Check if compression needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens

        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:  # 70% threshold: Light compression
            self._remove_redundant_messages()
        elif level == 2:  # 85% threshold: Medium compression
            self._summarize_old_messages(keep_recent=10)
        else:  # 95% threshold: Aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages."""
        # Implementation: Use semantic deduplication
        pass

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return

        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_summarize
        ])

        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Replace old messages with summary
        summary = {
            "role": "system",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent

# Usage
compressor = ProgressiveCompressor()

for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")

Using Anthropic Prompt Caching (90% Cost Reduction)

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ]
    },
    {
        "role": "assistant",
        "content": "Previous response..."
    },
    {
        "role": "user",
        "content": "New question"  # Not cached, changes frequently
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Cache hit reduces costs by 90% for cached content

Core Concepts

Context Windows and Token Limits

Context window: Maximum tokens an LLM can process in a single request (input + output).

Current limits (2025):

  • Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
  • GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
  • Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)

Token estimation:

  • English: ~4 characters per token
  • Code: ~3 characters per token
  • Rule of thumb: 1 token ≈ 0.75 words

Why compression matters:

  • Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
  • Latency: Larger contexts increase processing time
  • Quality: Excessive context can dilute attention on relevant information

Compression Ratios

Compression ratio = Original tokens / Compressed tokens

Industry benchmarks:

  • Extractive summarization: 2-3x
  • Abstractive summarization: 5-10x
  • Hierarchical summarization: 20x+
  • LLMLingua (prompt compression): 20x with 1.5% accuracy loss
  • KVzip (KV cache compression): 3-4x with 2x speed improvement

Target ratios by use case:

  • Customer support: 5-7x (preserve details)
  • General chat: 8-12x (balance quality/efficiency)
  • Code assistants: 3-5x (preserve technical accuracy)
  • Long documents: 15-20x (extract key insights)

Progressive Compression Thresholds

Industry standard pattern:

Context Usage    Action                     Technique
─────────────────────────────────────────────────────────
0-70%           No compression             Store verbatim
70-85%          Light compression          Remove redundancy
85-95%          Medium compression         Summarize old messages
95-100%         Aggressive compression     Hierarchical + RAG

Implementation guidelines:

  • 70% threshold: Remove duplicate/redundant messages, semantic deduplication
  • 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
  • 95% threshold: Multi-level hierarchical summarization + vector store archival
  • Emergency (100%): Drop least important messages, aggressive summarization

Compression Techniques

1. Summarization Techniques

1.1 Extractive Summarization

Selects key sentences/phrases without modification.

Pros: No hallucination, fast, deterministic Cons: Limited compression (2-3x), may feel disjointed Best for: Legal/compliance, short-term compression

from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract most important messages using TF-IDF scoring."""
    texts = [msg['content'] for msg in messages]

    # Calculate TF-IDF scores
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()

    # Select top messages
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])

    return [messages[i] for i in top_indices]

1.2 Abstractive Summarization

Uses LLMs to semantically condense conversation history.

Pros: Higher compression (5-10x), coherent, synthesizes information Cons: Risk of hallucination, higher cost, less deterministic Best for: General chat, customer support, multi-session continuity

from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join([
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    ])

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )

    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }

1.3 Hierarchical Summarization (Multi-Level)

Creates summaries of summaries in a tree structure.

Pros: Extreme compression (20x+), handles 1M+ token conversations Cons: Complex implementation, multiple LLM calls, information loss accumulates Best for: Long-running conversations, multi-session applications

Architecture:

Level 0 (Raw):    [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk):  [Summary1-2]  [Summary3-4]  [Summary5-6]  [Summary7-8]
Level 2 (Group):  [Summary1-4]              [Summary5-8]
Level 3 (Session): [Overall Session Summary]
from anthropic import Anthropic
from typing import List, Dict

class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)

        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]

        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])

        # Summarize first chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)

        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]

        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in messages
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )

        return {
            "role": "system",
            "content": f"[L{level+1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget."""
        context = []
        token_count = 0

        # Prioritize recent raw messages
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            context.insert(0, msg)
            token_count += msg_tokens

        # Add summaries from higher levels
        for level in range(1, len(self.levels)):
            for summary in self.levels[level]:
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                context.insert(0, summary)
                token_count += summary_tokens

        return context

Academic reference: “Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models” (arXiv:2308.15022)

1.4 Rolling Summarization (Continuous)

Continuously compresses conversation with sliding window.

Pros: Low latency, predictable token usage, simple Cons: Early details over-compressed, no information recovery Best for: Real-time chat, streaming conversations

from anthropic import Anthropic

class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)

        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]

        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")

        parts.append("\nNew messages:\n" + "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages_to_compress
        ]))

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )

        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            context.append({
                "role": "system",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context

2. Embedding-Based Approaches

2.1 RAG (Retrieval-Augmented Generation)

Store full conversation in vector database, retrieve only relevant chunks.

Pros: Extremely scalable, no information loss, high relevance Cons: Requires vector DB infrastructure, retrieval latency Best for: Knowledge bases, customer support with large history

from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )

        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory and vector store."""
        self.recent_messages.append(message)

        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive to vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0

        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4

        # 2. Retrieve relevant historical context
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )

            n_results = min(10, (max_tokens - token_count) // 100)
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break

                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        return context

Vector database options:

  • ChromaDB: Embedded, easy local development
  • Pinecone: Managed, 50ms p95 latency
  • Weaviate: Open-source, hybrid search
  • Qdrant: High performance, payload filtering

2.2 Vector Search and Clustering

Group similar messages into clusters, represent with centroids.

Pros: Reduces redundancy, identifies themes, multi-topic handling Cons: Requires sufficient data, may lose nuances Best for: Multi-topic conversations, meeting summaries

from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)

            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return representatives."""
        if len(self.messages) < self.n_clusters:
            return self.messages

        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)

        # Select message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]

            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })

        return compressed

2.3 Semantic Deduplication

Remove semantically similar messages that convey redundant information.

Pros: Reduces redundancy without losing unique content Cons: Requires threshold tuning, O(n²) complexity Best for: FAQ systems, repetitive conversations

from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages

        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)

        # Mark unique messages
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break

            if is_unique:
                keep_indices.append(i)

        return [messages[i] for i in keep_indices]

3. Token-Efficient Strategies

3.1 Message Prioritization

Assign importance scores and retain only high-priority content.

Pros: Retains most important information, flexible criteria Cons: Scoring is heuristic-based, may break flow Best for: Mixed-importance conversations, filtering noise

import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []

        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))

        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))

        # Recency score (linear decay)
        scores.append(index / max(total - 1, 1))

        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]

        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order

        return [msg for msg, score, idx in top_messages]

3.2 Delta Compression

Store only changes between consecutive messages.

Pros: Highly efficient for incremental changes Cons: Reconstruction overhead, not suitable for all content Best for: Code assistants with incremental edits

import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return

        # Find most similar previous message
        last_msg = self.base_messages[-1]

        if last_msg['role'] == message['role']:
            # Calculate delta
            diff = list(difflib.unified_diff(
                last_msg['content'].splitlines(),
                message['content'].splitlines(),
                lineterm=''
            ))

            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta if compression achieved
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return

        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct full conversation from bases + deltas."""
        messages = self.base_messages.copy()

        for delta_info in self.deltas:
            base_content = messages[delta_info['base_index']]['content']
            # Apply diff to reconstruct (simplified)
            reconstructed = base_content  # Full implementation would apply diff
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })

        return messages

4. LangChain Memory Types

4.1 ConversationSummaryMemory

Automatically summarizes conversation as it progresses.

from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryMemory(llm=llm)

# Add conversation
memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"}
)

# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])

Pros: Automatic summarization, simple API Cons: Every turn triggers LLM call Best for: Medium conversations (20-50 turns)

4.2 ConversationSummaryBufferMemory

Hybrid: Recent messages verbatim, older summarized.

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True
)

# Add conversation
for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"}
    )

# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})

Pros: Best balance of detail and compression Cons: Requires token limit tuning Best for: Most production applications

4.3 ConversationTokenBufferMemory

Maintains fixed token budget, drops oldest when exceeded.

from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)

# Simple FIFO when token limit exceeded

Pros: Predictable token usage, simple Cons: Loses old information completely Best for: Real-time chat with strict limits

4.4 VectorStoreRetrieverMemory

Stores all messages in vector database, retrieves relevant ones.

from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Automatically retrieves most relevant context

Pros: Infinite conversation length, semantic retrieval Cons: Requires vector DB, retrieval overhead Best for: Long-running conversations, knowledge bases

5. Anthropic-Specific Patterns

5.1 Prompt Caching (90% Cost Reduction)

Cache static context to reduce token costs.

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]

# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']

    # Add cache control to last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

# Add new user message (not cached)
messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Subsequent calls with same cached context cost 90% less

Cache TTL: 5 minutes Savings: 90% cost reduction for cached tokens Limits: Max 4 cache breakpoints per request Best practices:

  • Cache conversation history, not current query
  • Update cache when context changes significantly
  • Combine with summarization for maximum efficiency

5.2 Extended Thinking for Compression Planning

Use extended thinking to plan optimal compression strategy.

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x

Recommend optimal strategy."""
    }]
)

# Access thinking process
thinking_content = [
    block for block in response.content
    if block.type == "thinking"
]

# Get compression recommendation
recommendation = response.content[-1].text

Production Patterns

Checkpointing and Persistence

Save compression state for recovery and resume.

import json
import pickle
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True

        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)

Resume Workflows

Continue conversations across sessions.

from anthropic import Anthropic
import json

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        # Add user message
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })

        # Build context (with compression)
        context = self._build_context()

        # Get response
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=context + [{
                'role': 'user',
                'content': user_message
            }]
        )

        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })

        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()

        # Save state
        self._save()

        return assistant_message

    def _build_context(self):
        """Build context with compression."""
        context = []

        # Add summary if exists
        if self.memory['summary']:
            context.append({
                'role': 'system',
                'content': f"[Previous conversation summary]\n{self.memory['summary']}"
            })

        # Add recent messages
        context.extend(self.memory['messages'][-10:])

        return context

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return

        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)

# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example")  # Remembers context

Hybrid Approaches (Best Practice)

Combine multiple techniques for optimal results.

from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    Combines:
    - Rolling summarization (short-term compression)
    - RAG retrieval (long-term memory)
    - Prompt caching (cost optimization)
    - Progressive compression (adaptive behavior)
    """

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Recent messages (verbatim)
        self.recent_messages = []
        self.recent_window = 10

        # Rolling summary
        self.rolling_summary = None

        # Vector store (long-term)
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(name="memory")
        self.message_counter = 0

        # Compression thresholds
        self.thresholds = {
            'light': 0.70,    # Start basic compression
            'medium': 0.85,   # Aggressive summarization
            'heavy': 0.95     # Emergency measures
        }

    def add_message(self, message: dict):
        """Add message with intelligent compression."""
        self.recent_messages.append(message)

        # Check compression needs
        usage_ratio = self._estimate_usage()

        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """Remove redundancy, archive to vector store."""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # Archive oldest to vector store
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """Generate rolling summary, aggressive archival."""
        if len(self.recent_messages) > self.recent_window:
            # Summarize older messages
            to_summarize = self.recent_messages[:-self.recent_window]

            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])

            if self.rolling_summary:
                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"

            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"Update summary:\n{summary_text}"
                }]
            )

            self.rolling_summary = response.content[0].text

            # Archive all summarized messages
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)

        self.recent_messages = self.recent_messages[-5:]

        # Compress summary further if needed
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for current query."""
        context = []
        token_count = 0

        # 1. Add rolling summary (if exists)
        if self.rolling_summary:
            summary_msg = {
                'role': 'system',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4

        # 2. Retrieve relevant historical context (RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )

            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break

                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        # 3. Add recent messages verbatim
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):
        """Estimate current context window usage."""
        total_tokens = 0

        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4

        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4

        return total_tokens / 200000  # Claude Sonnet context window

# Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")

memory = HybridMemorySystem(anthropic_client, openai_client)

# Add messages over time
for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })

# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)

# Use with Claude
response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)

Performance Benchmarks

Compression Efficiency

Technique Compression Ratio Quality Loss Latency Cost Impact
Extractive 2-3x <1% <10ms None
Abstractive 5-10x 2-5% 1-2s +$0.001/turn
Hierarchical 20x+ 5-8% 2-5s +$0.003/turn
LLMLingua 20x 1.5% 500ms None
RAG Variable <1% 100-300ms +$0.0005/turn
Prompt Caching N/A 0% 0ms -90%

Token Savings by Use Case

Customer Support (50-turn conversation):

  • No compression: ~8,000 tokens/request
  • Rolling summary: ~2,000 tokens/request (75% reduction)
  • Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)

Code Assistant (100-turn session):

  • No compression: ~25,000 tokens/request
  • Hierarchical: ~5,000 tokens/request (80% reduction)
  • Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)

Educational Tutor (multi-session):

  • No compression: Would exceed context window
  • RAG + summarization: ~3,000 tokens/request
  • Infinite session length enabled

Cost Analysis

Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)

1,000 conversations, 50 turns each:

  • No compression:

    • Avg 8K tokens/request × 50K requests = 400M tokens
    • Cost: $1,200
  • With rolling summarization:

    • Avg 2K tokens/request × 50K requests = 100M tokens
    • Summarization overhead: +10M tokens
    • Cost: $330 (72% savings)
  • With hybrid system + caching:

    • First turn: 2K tokens (no cache)
    • Subsequent: 200 tokens effective (90% cache hit)
    • Total: ~15M tokens effective
    • Cost: $45 (96% savings)

Tool Recommendations

Memory Management Tools

Mem0 (Recommended for Production)

Best for: Hybrid memory systems with minimal code

from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG
memory = client.create_memory(
    user_id="user123",
    messages=[
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ]
)

# Retrieve relevant context
context = client.get_memory(
    user_id="user123",
    query="What programming language am I using?"
)

Features:

  • Automatic hierarchical summarization
  • Built-in RAG retrieval
  • Multi-user session management
  • Analytics dashboard

Pricing: $0.40/1K memory operations

Zep

Best for: Low-latency production deployments**

from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)

# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")

Features:

  • <100ms retrieval latency
  • Automatic fact extraction
  • Entity recognition
  • Session management

Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)

ChromaDB

Best for: Self-hosted vector storage**

import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)

# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)

Features:

  • Fully open-source
  • Embedded or client-server
  • Fast local development

Pricing: Free (self-hosted)

LangChain

Best for: Rapid prototyping and experimentation**

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)

Features:

  • Multiple memory types
  • Framework integration
  • Extensive documentation

Pricing: Free (uses your LLM API costs)

Compression Libraries

LLMLingua

Best for: Extreme compression with minimal quality loss**

from llmlingua import PromptCompressor

compressor = PromptCompressor()

compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)

# Achieves 20x compression with 1.5% accuracy loss

Features:

  • 20x compression ratios
  • <2% quality degradation
  • Fast inference (<500ms)

Pricing: Free (open-source)


Use Cases and Patterns

Chatbot (Customer Support)

Requirements:

  • Multi-turn conversations (50-100 turns)
  • Preserve customer context
  • Fast response times
  • Cost-efficient

Recommended approach:

  • ConversationSummaryBufferMemory (LangChain)
  • 70% threshold: Semantic deduplication
  • 85% threshold: Rolling summarization
  • Prompt caching for frequent patterns

Implementation:

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

# Add customer conversation
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )

# Retrieve compressed context
context = memory.load_memory_variables({})

Code Assistant

Requirements:

  • Long development sessions (100+ turns)
  • Preserve technical details
  • Handle large code blocks
  • Track incremental changes

Recommended approach:

  • Hierarchical summarization for overall context
  • RAG retrieval for specific code references
  • Delta compression for iterative edits
  • Prompt caching for system prompts

Implementation:

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })

        # Store in RAG for retrieval
        self.rag.add_message(code_change)

        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)

        return summary_context + rag_context

Educational Tutor

Requirements:

  • Multi-session tracking
  • Student progress persistence
  • Personalized context retrieval
  • Long-term knowledge retention

Recommended approach:

  • VectorStoreRetrieverMemory for multi-session
  • Fact extraction for student knowledge
  • Progressive compression across sessions
  • Resumable conversations

Implementation:

from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id

        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )

        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })

Best Practices

1. Choose the Right Technique for Your Use Case

  • Short conversations (<20 turns): No compression needed
  • Medium conversations (20-50 turns): ConversationSummaryBufferMemory
  • Long conversations (50-100 turns): Hierarchical or rolling summarization
  • Very long (100+ turns): Hybrid (RAG + summarization + caching)
  • Multi-session: VectorStoreRetrieverMemory or Mem0

2. Implement Progressive Compression

Don’t compress aggressively from the start. Use thresholds:

  • 0-70%: Store verbatim
  • 70-85%: Light compression (deduplication)
  • 85-95%: Medium compression (summarization)
  • 95-100%: Aggressive compression (hierarchical)

3. Combine Techniques

Single-technique approaches are suboptimal. Best production systems use:

  • Rolling summarization (short-term)
  • RAG retrieval (long-term)
  • Prompt caching (cost optimization)
  • Semantic deduplication (redundancy removal)

4. Monitor Quality Metrics

Track compression impact:

  • Response relevance score
  • Information retention rate
  • User satisfaction metrics
  • Token usage reduction

5. Use Prompt Caching Strategically

Cache stable content:

  • Conversation summaries
  • System prompts
  • Knowledge base context
  • User profiles

Don’t cache frequently changing content:

  • Current user query
  • Real-time data
  • Session-specific state

6. Implement Checkpointing

Save compression state for:

  • Recovery from failures
  • Multi-session continuity
  • Analytics and debugging
  • A/B testing different strategies

7. Tune Compression Parameters

Test and optimize:

  • Summary token limits
  • Compression thresholds
  • Retrieval result counts
  • Cache TTLs
  • Chunk sizes for hierarchical

8. Handle Edge Cases

Plan for:

  • Very long messages (split or compress individually)
  • Code blocks (preserve formatting)
  • Multi-language content
  • Rapidly changing context

Troubleshooting

Problem: Summary loses critical information

Solutions:

  • Lower compression ratio (less aggressive)
  • Implement importance scoring to preserve key messages
  • Use extractive summarization for critical sections
  • Increase summary token budget

Problem: Retrieval returns irrelevant context

Solutions:

  • Improve embedding model quality
  • Add metadata filtering (timestamps, topics)
  • Adjust similarity threshold
  • Use hybrid search (semantic + keyword)

Problem: High latency from compression

Solutions:

  • Compress asynchronously (background tasks)
  • Use faster models for summarization (Haiku instead of Sonnet)
  • Cache summaries more aggressively
  • Reduce compression frequency

Problem: Conversations still exceeding context window

Solutions:

  • Implement hierarchical compression
  • Archive to vector database more aggressively
  • Use more aggressive compression ratios
  • Consider switching to model with larger context window

Problem: High costs despite compression

Solutions:

  • Implement prompt caching
  • Use cheaper models for summarization (Haiku)
  • Batch summarization operations
  • Reduce summarization frequency

Problem: Lost conversation continuity

Solutions:

  • Increase recent message window
  • Include summary in every request
  • Use more descriptive summaries
  • Implement session resumption with context injection

Advanced Topics

Streaming Compression

Compress in real-time as conversation progresses:

async def streaming_compress(messages: list):
    """Compress while streaming responses."""
    compressor = ProgressiveCompressor()

    async for message in conversation_stream:
        compressor.add_message(message)

        # Compression happens asynchronously
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())

    return compressor.get_context()

Multi-User Session Management

Handle concurrent conversations with shared context:

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]

        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]

Custom Importance Scoring

Train ML models to score message importance:

from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']

Context Window Utilization Optimization

Maximize information density within token budget:

def optimize_context_allocation(
    summary_tokens: int,
    recent_tokens: int,
    retrieval_tokens: int,
    max_tokens: int
):
    """
    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }

Future Directions

Emerging Techniques (2025+)

1. Infinite Attention Mechanisms

  • Models with >10M token context windows (Gemini 1.5, future Claude)
  • Reduces need for compression but doesn’t eliminate cost concerns

2. Learned Compression Models

  • Neural networks trained to compress conversation optimally
  • Maintain semantic meaning while minimizing tokens
  • Examples: LLMLingua v2, PromptCompressor

3. Multimodal Session Compression

  • Compress conversations with images, audio, video
  • Maintain cross-modal context relationships

4. Federated Memory Systems

  • Distributed compression across multiple memory stores
  • Privacy-preserving compression for sensitive conversations

5. Adaptive Compression Strategies

  • RL-based systems that learn optimal compression per user/domain
  • Dynamic threshold adjustment based on conversation importance

References

Academic Papers

  • “Recursively Summarizing Enables Long-Term Dialogue Memory” (arXiv:2308.15022)
  • “LLMLingua: Compressing Prompts for Accelerated Inference” (arXiv:2310.05736)
  • “Lost in the Middle: How Language Models Use Long Contexts” (arXiv:2307.03172)

Documentation

Tools

  • Mem0 – Managed memory service
  • Zep – Fast memory layer
  • LLMLingua – Prompt compression
  • ChromaDB – Vector database

Last Updated: 2025-11-30 Version: 1.0.0 License: MIT