idempotency-handling

📁 aj-geddes/useful-ai-prompts 📅 Jan 21, 2026
56
总安装量
56
周安装量
#3890
全站排名
安装命令
npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill idempotency-handling

Agent 安装分布

claude-code 47
opencode 42
gemini-cli 39
antigravity 36
cursor 35

Skill 文档

Idempotency Handling

Overview

Implement idempotency to ensure operations produce the same result regardless of how many times they’re executed.

When to Use

  • Payment processing
  • API endpoints with retries
  • Webhooks and callbacks
  • Message queue consumers
  • Distributed transactions
  • Bank transfers
  • Order creation
  • Email sending
  • Resource creation

Implementation Examples

1. Express Idempotency Middleware

import express from 'express';
import Redis from 'ioredis';
import crypto from 'crypto';

interface IdempotentRequest {
  key: string;
  status: 'processing' | 'completed' | 'failed';
  response?: any;
  error?: string;
  createdAt: number;
  completedAt?: number;
}

class IdempotencyService {
  private redis: Redis;
  private ttl = 86400; // 24 hours

  constructor(redisUrl: string) {
    this.redis = new Redis(redisUrl);
  }

  async getRequest(key: string): Promise<IdempotentRequest | null> {
    const data = await this.redis.get(`idempotency:${key}`);
    return data ? JSON.parse(data) : null;
  }

  async setRequest(
    key: string,
    request: IdempotentRequest
  ): Promise<void> {
    await this.redis.setex(
      `idempotency:${key}`,
      this.ttl,
      JSON.stringify(request)
    );
  }

  async startProcessing(key: string): Promise<boolean> {
    const request: IdempotentRequest = {
      key,
      status: 'processing',
      createdAt: Date.now()
    };

    // Use SET NX to ensure only one request processes
    const result = await this.redis.set(
      `idempotency:${key}`,
      JSON.stringify(request),
      'EX',
      this.ttl,
      'NX'
    );

    return result === 'OK';
  }

  async completeRequest(
    key: string,
    response: any
  ): Promise<void> {
    const request: IdempotentRequest = {
      key,
      status: 'completed',
      response,
      createdAt: Date.now(),
      completedAt: Date.now()
    };

    await this.setRequest(key, request);
  }

  async failRequest(
    key: string,
    error: string
  ): Promise<void> {
    const request: IdempotentRequest = {
      key,
      status: 'failed',
      error,
      createdAt: Date.now(),
      completedAt: Date.now()
    };

    await this.setRequest(key, request);
  }
}

function idempotencyMiddleware(idempotency: IdempotencyService) {
  return async (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Only apply to POST, PUT, PATCH, DELETE
    if (!['POST', 'PUT', 'PATCH', 'DELETE'].includes(req.method)) {
      return next();
    }

    const idempotencyKey = req.headers['idempotency-key'] as string;

    if (!idempotencyKey) {
      return res.status(400).json({
        error: 'Idempotency-Key header required'
      });
    }

    // Check for existing request
    const existing = await idempotency.getRequest(idempotencyKey);

    if (existing) {
      if (existing.status === 'processing') {
        return res.status(409).json({
          error: 'Request already processing',
          message: 'Please wait and retry'
        });
      }

      if (existing.status === 'completed') {
        return res.status(200).json(existing.response);
      }

      if (existing.status === 'failed') {
        return res.status(500).json({
          error: 'Previous request failed',
          message: existing.error
        });
      }
    }

    // Start processing
    const canProcess = await idempotency.startProcessing(idempotencyKey);

    if (!canProcess) {
      return res.status(409).json({
        error: 'Request already processing'
      });
    }

    // Capture response
    const originalSend = res.json.bind(res);
    res.json = (body: any) => {
      // Save response for future requests
      idempotency.completeRequest(idempotencyKey, body).catch(console.error);
      return originalSend(body);
    };

    // Handle errors
    const originalNext = next;
    next = (err?: any) => {
      if (err) {
        idempotency.failRequest(idempotencyKey, err.message).catch(console.error);
      }
      return originalNext(err);
    };

    next();
  };
}

// Usage
const app = express();
const redis = new Redis('redis://localhost:6379');
const idempotency = new IdempotencyService('redis://localhost:6379');

app.use(express.json());
app.use(idempotencyMiddleware(idempotency));

app.post('/api/payments', async (req, res) => {
  const { amount, userId } = req.body;

  // Process payment
  const payment = await processPayment(amount, userId);

  res.json(payment);
});

async function processPayment(amount: number, userId: string) {
  // Payment processing logic
  return {
    id: crypto.randomUUID(),
    amount,
    userId,
    status: 'completed'
  };
}

app.listen(3000);

2. Database-Based Idempotency

import { Pool } from 'pg';

interface IdempotencyRecord {
  key: string;
  request_body: any;
  response_body?: any;
  status: string;
  error_message?: string;
  created_at: Date;
  completed_at?: Date;
}

class DatabaseIdempotency {
  constructor(private db: Pool) {
    this.createTable();
  }

  private async createTable(): Promise<void> {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS idempotency_keys (
        key VARCHAR(255) PRIMARY KEY,
        request_body JSONB NOT NULL,
        response_body JSONB,
        status VARCHAR(50) NOT NULL,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        completed_at TIMESTAMP,
        expires_at TIMESTAMP NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_idempotency_expires
      ON idempotency_keys (expires_at);
    `);
  }

  async checkIdempotency(
    key: string,
    requestBody: any
  ): Promise<IdempotencyRecord | null> {
    const result = await this.db.query(
      'SELECT * FROM idempotency_keys WHERE key = $1',
      [key]
    );

    if (result.rows.length === 0) {
      return null;
    }

    const record = result.rows[0];

    // Check if request body matches
    if (JSON.stringify(record.request_body) !== JSON.stringify(requestBody)) {
      throw new Error('Request body mismatch for idempotency key');
    }

    return record;
  }

  async startProcessing(
    key: string,
    requestBody: any
  ): Promise<boolean> {
    try {
      const expiresAt = new Date(Date.now() + 86400 * 1000); // 24 hours

      await this.db.query(`
        INSERT INTO idempotency_keys (key, request_body, status, expires_at)
        VALUES ($1, $2, 'processing', $3)
      `, [key, requestBody, expiresAt]);

      return true;
    } catch (error: any) {
      if (error.code === '23505') { // Unique violation
        return false;
      }
      throw error;
    }
  }

  async completeRequest(
    key: string,
    responseBody: any
  ): Promise<void> {
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        response_body = $1,
        status = 'completed',
        completed_at = NOW()
      WHERE key = $2
    `, [responseBody, key]);
  }

  async failRequest(
    key: string,
    errorMessage: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE idempotency_keys
      SET
        error_message = $1,
        status = 'failed',
        completed_at = NOW()
      WHERE key = $2
    `, [errorMessage, key]);
  }

  async cleanup(): Promise<number> {
    const result = await this.db.query(`
      DELETE FROM idempotency_keys
      WHERE expires_at < NOW()
    `);

    return result.rowCount || 0;
  }
}

3. Stripe-Style Idempotency

import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import psycopg2

class IdempotencyManager:
    def __init__(self, db_connection):
        self.db = db_connection
        self.ttl_days = 1

    def process_request(
        self,
        idempotency_key: str,
        request_data: Dict[str, Any],
        process_fn: callable
    ) -> Dict[str, Any]:
        """
        Process request with idempotency guarantee.

        Args:
            idempotency_key: Unique key for this request
            request_data: Request payload
            process_fn: Function to process the request

        Returns:
            Response data
        """
        # Check for existing request
        existing = self.get_existing_request(
            idempotency_key,
            request_data
        )

        if existing:
            if existing['status'] == 'processing':
                raise ConflictError('Request already processing')

            if existing['status'] == 'completed':
                return existing['response']

            if existing['status'] == 'failed':
                raise ProcessingError(existing['error'])

        # Start processing
        if not self.start_processing(idempotency_key, request_data):
            raise ConflictError('Request already processing')

        try:
            # Process request
            result = process_fn(request_data)

            # Store result
            self.complete_request(idempotency_key, result)

            return result

        except Exception as e:
            # Store error
            self.fail_request(idempotency_key, str(e))
            raise

    def get_existing_request(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get existing idempotent request."""
        cursor = self.db.cursor()

        cursor.execute("""
            SELECT status, response, error, request_hash
            FROM idempotency_requests
            WHERE idempotency_key = %s
            AND created_at > %s
        """, (key, datetime.now() - timedelta(days=self.ttl_days)))

        row = cursor.fetchone()
        cursor.close()

        if not row:
            return None

        # Verify request data matches
        request_hash = self.hash_request(request_data)
        if row[3] != request_hash:
            raise ValueError(
                'Request data does not match idempotency key'
            )

        return {
            'status': row[0],
            'response': row[1],
            'error': row[2]
        }

    def start_processing(
        self,
        key: str,
        request_data: Dict[str, Any]
    ) -> bool:
        """Mark request as processing."""
        cursor = self.db.cursor()
        request_hash = self.hash_request(request_data)

        try:
            cursor.execute("""
                INSERT INTO idempotency_requests
                (idempotency_key, request_hash, status, created_at)
                VALUES (%s, %s, 'processing', NOW())
            """, (key, request_hash))

            self.db.commit()
            cursor.close()
            return True

        except psycopg2.IntegrityError:
            self.db.rollback()
            cursor.close()
            return False

    def complete_request(
        self,
        key: str,
        response: Dict[str, Any]
    ):
        """Mark request as completed."""
        cursor = self.db.cursor()

        cursor.execute("""
            UPDATE idempotency_requests
            SET
                status = 'completed',
                response = %s,
                completed_at = NOW()
            WHERE idempotency_key = %s
        """, (json.dumps(response), key))

        self.db.commit()
        cursor.close()

    def fail_request(self, key: str, error: str):
        """Mark request as failed."""
        cursor = self.db.cursor()

        cursor.execute("""
            UPDATE idempotency_requests
            SET
                status = 'failed',
                error = %s,
                completed_at = NOW()
            WHERE idempotency_key = %s
        """, (error, key))

        self.db.commit()
        cursor.close()

    def hash_request(self, data: Dict[str, Any]) -> str:
        """Create hash of request data."""
        json_str = json.dumps(data, sort_keys=True)
        return hashlib.sha256(json_str.encode()).hexdigest()


class ConflictError(Exception):
    pass


class ProcessingError(Exception):
    pass


# Usage
def process_payment(data):
    # Process payment logic
    return {
        'payment_id': 'pay_123',
        'amount': data['amount'],
        'status': 'completed'
    }

# In your API handler
idempotency = IdempotencyManager(db_connection)

try:
    result = idempotency.process_request(
        idempotency_key='key_abc123',
        request_data={'amount': 100, 'currency': 'USD'},
        process_fn=process_payment
    )
    print(result)
except ConflictError as e:
    print(f"Conflict: {e}")
except ProcessingError as e:
    print(f"Processing error: {e}")

4. Message Queue Idempotency

interface Message {
  id: string;
  data: any;
  timestamp: number;
}

class IdempotentMessageProcessor {
  private processedMessages = new Set<string>();
  private db: Pool;

  constructor(db: Pool) {
    this.db = db;
    this.loadProcessedMessages();
  }

  private async loadProcessedMessages(): Promise<void> {
    // Load recent processed message IDs
    const result = await this.db.query(`
      SELECT message_id
      FROM processed_messages
      WHERE processed_at > NOW() - INTERVAL '24 hours'
    `);

    result.rows.forEach(row => {
      this.processedMessages.add(row.message_id);
    });
  }

  async processMessage(message: Message): Promise<void> {
    // Check if already processed
    if (this.processedMessages.has(message.id)) {
      console.log(`Message ${message.id} already processed, skipping`);
      return;
    }

    // Mark as processing (atomic operation)
    const wasInserted = await this.markAsProcessing(message.id);

    if (!wasInserted) {
      console.log(`Message ${message.id} already being processed`);
      return;
    }

    try {
      // Process message
      await this.handleMessage(message);

      // Mark as completed
      await this.markAsCompleted(message.id);

      this.processedMessages.add(message.id);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);
      await this.markAsFailed(message.id, (error as Error).message);
      throw error;
    }
  }

  private async markAsProcessing(messageId: string): Promise<boolean> {
    try {
      await this.db.query(`
        INSERT INTO processed_messages (message_id, status, processed_at)
        VALUES ($1, 'processing', NOW())
      `, [messageId]);

      return true;
    } catch (error: any) {
      if (error.code === '23505') {
        return false;
      }
      throw error;
    }
  }

  private async markAsCompleted(messageId: string): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'completed', completed_at = NOW()
      WHERE message_id = $1
    `, [messageId]);
  }

  private async markAsFailed(
    messageId: string,
    error: string
  ): Promise<void> {
    await this.db.query(`
      UPDATE processed_messages
      SET status = 'failed', error = $2, completed_at = NOW()
      WHERE message_id = $1
    `, [messageId, error]);
  }

  private async handleMessage(message: Message): Promise<void> {
    // Actual message processing logic
    console.log('Processing message:', message);
  }
}

Best Practices

✅ DO

  • Require idempotency keys for mutations
  • Store request and response together
  • Set appropriate TTL for idempotency records
  • Validate request body matches stored request
  • Handle concurrent requests gracefully
  • Return same response for duplicate requests
  • Clean up old idempotency records
  • Use database constraints for atomicity

❌ DON’T

  • Apply idempotency to GET requests
  • Store idempotency data forever
  • Skip validation of request body
  • Use non-unique idempotency keys
  • Process same request concurrently
  • Change response for duplicate requests

Schema Design

CREATE TABLE idempotency_keys (
  key VARCHAR(255) PRIMARY KEY,
  request_hash VARCHAR(64) NOT NULL,
  request_body JSONB NOT NULL,
  response_body JSONB,
  status VARCHAR(20) NOT NULL CHECK (status IN ('processing', 'completed', 'failed')),
  error_message TEXT,
  created_at TIMESTAMP DEFAULT NOW(),
  completed_at TIMESTAMP,
  expires_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_idempotency_expires ON idempotency_keys (expires_at);
CREATE INDEX idx_idempotency_status ON idempotency_keys (status);

Resources