artifact-collection
npx skills add https://github.com/sherifeldeeb/agentskills --skill artifact-collection
Agent 安装分布
Skill 文档
Artifact Collection
Comprehensive artifact collection skill for gathering and preserving digital forensic evidence. Enables systematic collection of volatile and non-volatile artifacts from endpoints, maintaining chain of custody, and ensuring forensic integrity throughout the collection process.
Capabilities
- Volatile Data Collection: Capture RAM, running processes, network connections
- Disk Artifact Collection: Collect registry, event logs, browser data
- Log Collection: Gather system, application, and security logs
- Configuration Collection: Capture system configuration and state
- Evidence Packaging: Package artifacts with integrity verification
- Chain of Custody: Document and maintain evidence chain of custody
- Remote Collection: Collect artifacts from remote systems
- Triage Collection: Quick artifact collection for rapid response
- Selective Collection: Target specific artifact types
- Collection Verification: Verify collected artifact integrity
Quick Start
from artifact_collection import ArtifactCollector, WindowsCollector, ChainOfCustody
# Initialize collector
collector = WindowsCollector(output_dir="/evidence/case001/")
# Collect volatile artifacts
collector.collect_volatile()
# Collect disk artifacts
collector.collect_disk_artifacts()
# Generate chain of custody
coc = ChainOfCustody(collector)
coc.generate_report("/evidence/case001/chain_of_custody.pdf")
Usage
Task 1: Volatile Data Collection
Input: Target system (local or remote)
Process:
- Document system state
- Capture memory dump
- Collect running processes
- Capture network connections
- Preserve volatile artifacts
Output: Volatile artifacts with documentation
Example:
from artifact_collection import VolatileCollector
# Initialize collector
collector = VolatileCollector(
output_dir="/evidence/case001/volatile/",
case_id="CASE-2024-001",
examiner="John Doe"
)
# Collect memory dump
memory = collector.collect_memory()
print(f"Memory dump: {memory.path}")
print(f"Size: {memory.size_gb}GB")
print(f"Hash: {memory.sha256}")
print(f"Tool: {memory.acquisition_tool}")
# Collect running processes
processes = collector.collect_processes()
for proc in processes:
print(f"PID {proc.pid}: {proc.name}")
print(f" Path: {proc.exe_path}")
print(f" User: {proc.username}")
print(f" Command: {proc.command_line}")
print(f" Start: {proc.start_time}")
# Collect network connections
connections = collector.collect_network_connections()
for conn in connections:
print(f"{conn.local_addr}:{conn.local_port} -> "
f"{conn.remote_addr}:{conn.remote_port}")
print(f" PID: {conn.pid}")
print(f" State: {conn.state}")
print(f" Protocol: {conn.protocol}")
# Collect network interfaces
interfaces = collector.collect_network_interfaces()
for iface in interfaces:
print(f"Interface: {iface.name}")
print(f" IP: {iface.ip_address}")
print(f" MAC: {iface.mac_address}")
# Collect DNS cache
dns_cache = collector.collect_dns_cache()
# Collect ARP cache
arp_cache = collector.collect_arp_cache()
# Collect clipboard
clipboard = collector.collect_clipboard()
# Collect environment variables
env_vars = collector.collect_environment_variables()
# Generate collection report
collector.generate_report("/evidence/case001/volatile_report.html")
Task 2: Windows Artifact Collection
Input: Windows system
Process:
- Collect registry hives
- Collect event logs
- Collect prefetch files
- Collect browser artifacts
- Package with hashes
Output: Windows artifacts with documentation
Example:
from artifact_collection import WindowsCollector
# Initialize Windows collector
collector = WindowsCollector(
output_dir="/evidence/case001/windows/",
case_id="CASE-2024-001"
)
# Collect registry hives
registry = collector.collect_registry()
for hive in registry:
print(f"Registry: {hive.name}")
print(f" Path: {hive.source_path}")
print(f" Hash: {hive.sha256}")
# Collect event logs
event_logs = collector.collect_event_logs()
for log in event_logs:
print(f"Event Log: {log.name}")
print(f" Records: {log.record_count}")
print(f" Hash: {log.sha256}")
# Collect prefetch files
prefetch = collector.collect_prefetch()
print(f"Prefetch files: {len(prefetch)}")
# Collect Amcache
amcache = collector.collect_amcache()
# Collect SRUM database
srum = collector.collect_srum()
# Collect scheduled tasks
tasks = collector.collect_scheduled_tasks()
# Collect services
services = collector.collect_services()
# Collect startup items
startup = collector.collect_startup_items()
# Collect browser data
browsers = collector.collect_browser_artifacts()
for browser in browsers:
print(f"Browser: {browser.name}")
print(f" History: {browser.history_count}")
print(f" Downloads: {browser.download_count}")
# Collect USB history
usb = collector.collect_usb_history()
# Collect recent files
recent = collector.collect_recent_files()
# Collect Jump Lists
jumplists = collector.collect_jumplists()
# Generate collection manifest
collector.generate_manifest("/evidence/case001/windows_manifest.json")
Task 3: Linux Artifact Collection
Input: Linux system
Process:
- Collect system logs
- Collect user artifacts
- Collect configuration files
- Collect authentication data
- Package artifacts
Output: Linux artifacts with documentation
Example:
from artifact_collection import LinuxCollector
# Initialize Linux collector
collector = LinuxCollector(
output_dir="/evidence/case001/linux/",
case_id="CASE-2024-001"
)
# Collect system logs
logs = collector.collect_system_logs()
for log in logs:
print(f"Log: {log.name}")
print(f" Path: {log.path}")
print(f" Size: {log.size}")
# Collect auth logs
auth = collector.collect_auth_logs()
# Collect user home directories
homes = collector.collect_user_homes()
for home in homes:
print(f"User: {home.username}")
print(f" Bash history: {home.bash_history}")
print(f" SSH keys: {home.ssh_keys}")
# Collect cron jobs
cron = collector.collect_cron_jobs()
for job in cron:
print(f"Cron: {job.user} - {job.schedule}")
print(f" Command: {job.command}")
# Collect systemd units
systemd = collector.collect_systemd_units()
# Collect network configuration
network = collector.collect_network_config()
# Collect installed packages
packages = collector.collect_installed_packages()
# Collect SSH configuration
ssh = collector.collect_ssh_config()
# Collect web server logs (if present)
web_logs = collector.collect_web_logs()
# Collect Docker artifacts (if present)
docker = collector.collect_docker_artifacts()
# Generate collection report
collector.generate_report("/evidence/case001/linux_report.html")
Task 4: macOS Artifact Collection
Input: macOS system
Process:
- Collect system logs
- Collect user data
- Collect application artifacts
- Collect security data
- Package artifacts
Output: macOS artifacts with documentation
Example:
from artifact_collection import MacOSCollector
# Initialize macOS collector
collector = MacOSCollector(
output_dir="/evidence/case001/macos/",
case_id="CASE-2024-001"
)
# Collect unified logs
unified = collector.collect_unified_logs()
# Collect FSEvents
fsevents = collector.collect_fsevents()
# Collect user artifacts
users = collector.collect_user_artifacts()
for user in users:
print(f"User: {user.username}")
print(f" Recent items: {len(user.recent_items)}")
print(f" Downloads: {len(user.downloads)}")
# Collect Spotlight data
spotlight = collector.collect_spotlight()
# Collect Keychain data (metadata only)
keychain = collector.collect_keychain_metadata()
# Collect LaunchAgents/Daemons
launch_items = collector.collect_launch_items()
for item in launch_items:
print(f"Launch item: {item.name}")
print(f" Path: {item.path}")
print(f" Program: {item.program}")
# Collect quarantine events
quarantine = collector.collect_quarantine_events()
for q in quarantine:
print(f"Quarantine: {q.filename}")
print(f" URL: {q.origin_url}")
print(f" Date: {q.quarantine_date}")
# Collect Safari data
safari = collector.collect_safari_artifacts()
# Collect Terminal history
terminal = collector.collect_terminal_history()
# Collect installed applications
apps = collector.collect_installed_apps()
# Generate report
collector.generate_report("/evidence/case001/macos_report.html")
Task 5: Remote Artifact Collection
Input: Remote system credentials
Process:
- Establish secure connection
- Deploy collection agent
- Collect artifacts remotely
- Transfer with integrity check
- Document collection
Output: Remote artifacts with verification
Example:
from artifact_collection import RemoteCollector
# Initialize remote collector
collector = RemoteCollector(
target="192.168.1.100",
credentials={
"username": "admin",
"method": "key",
"key_path": "/path/to/key"
},
output_dir="/evidence/case001/remote/"
)
# Connect to remote system
connection = collector.connect()
print(f"Connected: {connection.hostname}")
print(f"OS: {connection.os_type}")
# Collect volatile data first
volatile = collector.collect_volatile()
print(f"Memory collected: {volatile.memory_path}")
print(f"Processes: {len(volatile.processes)}")
# Collect disk artifacts
disk = collector.collect_disk_artifacts(
artifact_types=["registry", "eventlogs", "browser"]
)
# Transfer artifacts securely
transfer = collector.transfer_artifacts()
for artifact in transfer:
print(f"Transferred: {artifact.name}")
print(f" Size: {artifact.size}")
print(f" Local hash: {artifact.local_hash}")
print(f" Remote hash: {artifact.remote_hash}")
print(f" Verified: {artifact.verified}")
# Disconnect
collector.disconnect()
# Generate collection report
collector.generate_report("/evidence/case001/remote_report.html")
Task 6: Triage Collection
Input: System requiring rapid assessment
Process:
- Quick system inventory
- Collect critical artifacts
- Identify IOCs
- Prioritize findings
- Generate triage report
Output: Triage results with priorities
Example:
from artifact_collection import TriageCollector
# Initialize triage collector
collector = TriageCollector(
output_dir="/evidence/triage/",
case_id="TRIAGE-001"
)
# Run quick triage
triage = collector.run_triage()
print(f"System: {triage.system_info.hostname}")
print(f"OS: {triage.system_info.os_version}")
print(f"Collection time: {triage.duration_seconds}s")
# Get alerts
for alert in triage.alerts:
print(f"ALERT: {alert.severity} - {alert.description}")
print(f" Evidence: {alert.evidence}")
# Get quick IOCs
for ioc in triage.iocs:
print(f"IOC: {ioc.type} - {ioc.value}")
print(f" Source: {ioc.source}")
# Get suspicious processes
for proc in triage.suspicious_processes:
print(f"Suspicious: {proc.name} (PID {proc.pid})")
print(f" Reason: {proc.reason}")
# Get suspicious connections
for conn in triage.suspicious_connections:
print(f"Connection: {conn.remote_addr}:{conn.remote_port}")
print(f" Process: {conn.process_name}")
print(f" Reason: {conn.reason}")
# Get persistence mechanisms
for persist in triage.persistence:
print(f"Persistence: {persist.type}")
print(f" Path: {persist.path}")
print(f" Suspicious: {persist.is_suspicious}")
# Generate triage report
collector.generate_triage_report("/evidence/triage/triage_report.html")
Task 7: Chain of Custody Management
Input: Collected artifacts
Process:
- Document evidence items
- Record handling events
- Verify integrity
- Generate custody log
- Produce legal documentation
Output: Chain of custody documentation
Example:
from artifact_collection import ChainOfCustody
# Initialize chain of custody
coc = ChainOfCustody(
case_id="CASE-2024-001",
case_name="Security Incident Investigation",
custodian="John Doe"
)
# Add evidence items
item1 = coc.add_evidence(
item_id="EVD-001",
description="Memory dump from workstation",
source_system="WORKSTATION01",
acquisition_method="WinPMEM",
acquisition_time="2024-01-15T10:30:00Z",
original_location="Physical RAM",
file_path="/evidence/case001/memory.raw",
hash_sha256="abc123..."
)
item2 = coc.add_evidence(
item_id="EVD-002",
description="Windows Event Logs",
source_system="WORKSTATION01",
acquisition_method="Robocopy",
acquisition_time="2024-01-15T10:45:00Z",
original_location="C:\\Windows\\System32\\winevt\\Logs\\",
file_path="/evidence/case001/eventlogs/",
hash_sha256="def456..."
)
# Record custody transfer
coc.record_transfer(
item_id="EVD-001",
from_custodian="John Doe",
to_custodian="Jane Smith",
transfer_time="2024-01-15T14:00:00Z",
reason="Transfer for analysis",
location="Forensics Lab"
)
# Record evidence access
coc.record_access(
item_id="EVD-001",
accessor="Jane Smith",
access_time="2024-01-15T14:30:00Z",
purpose="Memory analysis",
actions_performed="Parsed with Volatility"
)
# Verify evidence integrity
verification = coc.verify_all()
for item in verification:
print(f"Item: {item.item_id}")
print(f" Current hash: {item.current_hash}")
print(f" Original hash: {item.original_hash}")
print(f" Verified: {item.verified}")
# Generate chain of custody report
coc.generate_report("/evidence/case001/chain_of_custody.pdf")
# Export custody log
coc.export_log("/evidence/case001/custody_log.json")
Task 8: Evidence Packaging
Input: Collected artifacts
Process:
- Organize artifacts
- Calculate hashes
- Create evidence container
- Document contents
- Seal package
Output: Sealed evidence package
Example:
from artifact_collection import EvidencePackager
# Initialize packager
packager = EvidencePackager(
case_id="CASE-2024-001",
examiner="John Doe"
)
# Add artifacts to package
packager.add_directory("/evidence/case001/volatile/")
packager.add_directory("/evidence/case001/windows/")
packager.add_file("/evidence/case001/notes.txt")
# Set package metadata
packager.set_metadata(
case_name="Security Incident",
description="Forensic artifacts from WORKSTATION01",
collection_start="2024-01-15T10:00:00Z",
collection_end="2024-01-15T12:00:00Z",
source_system="WORKSTATION01"
)
# Create evidence package
package = packager.create_package(
output_path="/evidence/packages/CASE-2024-001.zip",
compress=True,
encrypt=True,
encryption_password="secure_password"
)
print(f"Package: {package.path}")
print(f"Size: {package.size_mb}MB")
print(f"Files: {package.file_count}")
print(f"SHA256: {package.sha256}")
# Generate manifest
manifest = packager.generate_manifest()
for item in manifest.items:
print(f"File: {item.relative_path}")
print(f" Size: {item.size}")
print(f" SHA256: {item.sha256}")
# Seal package (creates tamper-evident record)
seal = packager.seal_package()
print(f"Seal ID: {seal.seal_id}")
print(f"Seal time: {seal.timestamp}")
print(f"Seal hash: {seal.seal_hash}")
Task 9: Selective Collection
Input: Target system and artifact specification
Process:
- Parse collection specification
- Identify target artifacts
- Collect specified items
- Verify collection
- Document results
Output: Targeted artifact collection
Example:
from artifact_collection import SelectiveCollector
# Initialize selective collector
collector = SelectiveCollector(
output_dir="/evidence/selective/",
case_id="CASE-2024-001"
)
# Define collection specification
spec = {
"registry": ["HKLM\\SOFTWARE", "HKCU\\SOFTWARE"],
"event_logs": ["Security", "System", "Application"],
"directories": [
"C:\\Users\\*\\Downloads",
"C:\\Users\\*\\Documents"
],
"files": [
"C:\\Windows\\System32\\config\\SAM",
"C:\\Windows\\System32\\config\\SYSTEM"
],
"file_patterns": ["*.exe", "*.dll", "*.ps1"],
"date_range": {
"start": "2024-01-01",
"end": "2024-01-31"
}
}
# Collect based on specification
results = collector.collect(spec)
print(f"Items collected: {results.total_items}")
print(f"Size: {results.total_size_mb}MB")
print(f"Duration: {results.duration_seconds}s")
# Get collection details
for item in results.items:
print(f"Collected: {item.source_path}")
print(f" Destination: {item.dest_path}")
print(f" Size: {item.size}")
print(f" SHA256: {item.sha256}")
# Generate selective collection report
collector.generate_report("/evidence/selective/collection_report.html")
Task 10: Collection Verification
Input: Evidence collection directory
Process:
- Read collection manifest
- Verify file integrity
- Check for missing items
- Validate metadata
- Generate verification report
Output: Verification results
Example:
from artifact_collection import CollectionVerifier
# Initialize verifier
verifier = CollectionVerifier(
collection_path="/evidence/case001/",
manifest_path="/evidence/case001/manifest.json"
)
# Run full verification
verification = verifier.verify()
print(f"Verification result: {verification.status}")
print(f"Items verified: {verification.verified_count}")
print(f"Items failed: {verification.failed_count}")
print(f"Items missing: {verification.missing_count}")
# Get verification details
for item in verification.items:
print(f"Item: {item.path}")
print(f" Expected hash: {item.expected_hash}")
print(f" Actual hash: {item.actual_hash}")
print(f" Status: {item.status}")
if item.status != "verified":
print(f" Error: {item.error}")
# Check for integrity issues
issues = verifier.get_integrity_issues()
for issue in issues:
print(f"ISSUE: {issue.type}")
print(f" Item: {issue.item}")
print(f" Description: {issue.description}")
# Verify chain of custody
coc_verification = verifier.verify_chain_of_custody()
print(f"Chain of custody valid: {coc_verification.valid}")
# Generate verification report
verifier.generate_report("/evidence/case001/verification_report.pdf")
Configuration
Environment Variables
| Variable | Description | Required | Default |
|---|---|---|---|
EVIDENCE_OUTPUT |
Default output directory | No | ./evidence |
ACQUISITION_TOOL |
Memory acquisition tool | No | Auto-detect |
HASH_ALGORITHM |
Hash algorithm for integrity | No | SHA256 |
COMPRESS_ARTIFACTS |
Compress collected artifacts | No | true |
Options
| Option | Type | Description |
|---|---|---|
include_memory |
boolean | Include memory dump |
compress |
boolean | Compress artifacts |
encrypt |
boolean | Encrypt evidence package |
verify_collection |
boolean | Verify after collection |
parallel_collection |
boolean | Parallel artifact collection |
Examples
Example 1: Incident Response Collection
Scenario: Rapid artifact collection during active incident
from artifact_collection import IncidentResponseCollector
# Initialize IR collector
collector = IncidentResponseCollector(
case_id="IR-2024-001",
priority="high"
)
# Quick volatile collection
volatile = collector.collect_volatile()
# Critical artifacts only
critical = collector.collect_critical_artifacts()
# Generate IR report
collector.generate_ir_report("/evidence/ir_report.html")
Example 2: Legal Hold Collection
Scenario: Collecting artifacts for legal proceedings
from artifact_collection import LegalHoldCollector
# Initialize with legal requirements
collector = LegalHoldCollector(
case_id="LEGAL-2024-001",
legal_hold_id="LH-12345",
custodian="John Doe"
)
# Collect with full chain of custody
artifacts = collector.collect_all()
# Generate court-ready documentation
collector.generate_legal_package("/evidence/legal/")
Limitations
- Memory acquisition requires appropriate privileges
- Some artifacts may be locked by running processes
- Remote collection depends on network connectivity
- Encrypted files cannot be decrypted without keys
- Collection may impact system performance
- Storage space required for large collections
- Some artifacts may be volatile and change
Troubleshooting
Common Issue 1: Access Denied
Problem: Cannot access certain files Solution:
- Run with elevated privileges
- Use forensic boot media
- Deploy signed collection agent
Common Issue 2: Memory Acquisition Failure
Problem: Cannot capture memory Solution:
- Use alternative acquisition tool
- Check security software interference
- Verify driver compatibility
Common Issue 3: Incomplete Collection
Problem: Some artifacts missing Solution:
- Check for file locks
- Verify permissions
- Review collection logs
Related Skills
- memory-forensics: Analyze collected memory
- disk-forensics: Analyze collected disk artifacts
- timeline-forensics: Build timeline from artifacts
- log-forensics: Analyze collected logs
- incident-response: IR workflow