research
npx skills add https://github.com/sherifeldeeb/agentskills --skill research
Agent 安装分布
Skill 文档
Research Skill
Gather and synthesize information from web sources, APIs, and databases with support for structured data extraction and report generation.
Capabilities
- Web Research: Fetch and extract information from web pages
- API Integration: Query APIs for structured data (NVD, MITRE, etc.)
- Information Synthesis: Compile findings into structured reports
- Source Tracking: Maintain references and citations
- Data Extraction: Parse HTML, JSON, and XML content
- RSS/Atom Feeds: Monitor security feeds and news sources
Quick Start
import requests
from bs4 import BeautifulSoup
# Fetch web page content. The timeout keeps the call from hanging forever and
# raise_for_status() stops us from parsing an HTTP error page as if it were
# the advisory.
response = requests.get('https://example.com/security-advisory', timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Extract title and content. find() returns None when the tag is missing,
# so fall back to an empty string instead of raising AttributeError.
heading = soup.find('h1')
article = soup.find('article')
title = heading.text if heading else ''
content = article.text if article else ''
print(f"Title: {title}")
Usage
Web Page Content Extraction
Extract text and structured data from web pages.
Input: URL to fetch
Process:
- Send HTTP request
- Parse HTML response
- Extract relevant content
Example:
import requests
from bs4 import BeautifulSoup
from typing import Dict, Any, List
from urllib.parse import urljoin
def extract_page_content(url: str) -> Dict[str, Any]:
    """Fetch a web page and extract its title, description, text and links.

    Args:
        url: URL of the page to fetch; also used as the base for resolving
            relative links.

    Returns:
        A dict with the keys ``url``, ``title``, ``description``, ``text``
        (text fragments of the first ``<main>``, ``<article>`` or ``<body>``
        element, joined by newlines) and ``links`` (at most 20 absolute
        URLs). Missing pieces are returned as empty strings.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; ResearchBot/1.0)'
    }
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    # Remove script, style, nav and footer elements so they do not end up in
    # the extracted text. Links inside them are discarded as well.
    for element in soup(['script', 'style', 'nav', 'footer']):
        element.decompose()

    # Extract metadata
    title = soup.find('title')
    description = soup.find('meta', attrs={'name': 'description'})

    # Extract main content, preferring the most specific container present.
    main_content = soup.find('main') or soup.find('article') or soup.find('body')

    return {
        'url': url,
        'title': title.text.strip() if title else '',
        # A <meta name="description"> tag is not guaranteed to carry a
        # content attribute; .get() avoids the KeyError indexing would raise.
        'description': description.get('content', '') if description else '',
        'text': main_content.get_text(separator='\n', strip=True) if main_content else '',
        'links': [urljoin(url, a['href']) for a in soup.find_all('a', href=True)][:20]
    }
# Usage
content = extract_page_content('https://nvd.nist.gov/vuln/detail/CVE-2024-1234')
print(f"Title: {content['title']}")
print(f"Content length: {len(content['text'])} chars")
API Data Retrieval
Query security-related APIs for structured data.
Example – NVD CVE Lookup:
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Any
def search_nvd_cves(
    keyword: str = None,
    cvss_severity: str = None,
    days_back: int = 30
) -> List[Dict[str, Any]]:
    """
    Search NVD for CVEs matching criteria.

    Args:
        keyword: Search keyword
        cvss_severity: LOW, MEDIUM, HIGH, or CRITICAL
        days_back: Number of days to look back from today (UTC).
            NOTE(review): the NVD API is believed to reject publication
            date ranges longer than 120 days - confirm before passing
            larger values.

    Returns:
        List of CVE records, each a dict with the keys ``id``,
        ``description``, ``published``, ``cvss_score`` and ``references``.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    # Function-scope import: the surrounding imports only bring in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    params = {}
    if keyword:
        params['keywordSearch'] = keyword
    if cvss_severity:
        params['cvssV3Severity'] = cvss_severity

    # Date range in UTC. datetime.utcnow() is deprecated, so build an aware
    # timestamp instead; the formatted strings are unchanged.
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=days_back)
    params['pubStartDate'] = start_date.strftime('%Y-%m-%dT00:00:00.000')
    params['pubEndDate'] = end_date.strftime('%Y-%m-%dT23:59:59.999')

    response = requests.get(base_url, params=params, timeout=60)
    response.raise_for_status()
    data = response.json()

    cves = []
    for item in data.get('vulnerabilities', []):
        cve = item.get('cve', {})
        # ``descriptions`` may be missing or an empty list; either way fall
        # back to a single empty record so indexing cannot fail.
        descriptions = cve.get('descriptions') or [{}]
        # Prefer the English text when several languages are present.
        description = next(
            (d for d in descriptions if d.get('lang') == 'en'),
            descriptions[0]
        )
        cves.append({
            'id': cve.get('id'),
            'description': description.get('value', ''),
            'published': cve.get('published'),
            'cvss_score': extract_cvss_score(cve),
            'references': [ref.get('url') for ref in cve.get('references', [])]
        })
    return cves
def extract_cvss_score(cve: dict) -> float:
    """Extract the CVSS v3 base score from a CVE record.

    Looks at ``metrics.cvssMetricV31`` first and falls back to
    ``metrics.cvssMetricV30`` when the v3.1 list is missing *or empty*.

    Args:
        cve: A CVE dict as found under the ``cve`` key of an NVD result.

    Returns:
        The ``baseScore`` of the first metric entry as a float, or ``0.0``
        when no v3 score is available.
    """
    metrics = cve.get('metrics') or {}
    # ``or`` rather than a nested .get() default: an empty v3.1 list must
    # not hide a populated v3.0 list.
    cvss_v3 = metrics.get('cvssMetricV31') or metrics.get('cvssMetricV30') or []
    if not cvss_v3:
        return 0.0
    score = (cvss_v3[0].get('cvssData') or {}).get('baseScore')
    return float(score) if score is not None else 0.0
# Usage
critical_cves = search_nvd_cves(keyword='apache', cvss_severity='CRITICAL', days_back=90)
for cve in critical_cves[:5]:
print(f"{cve['id']} (CVSS: {cve['cvss_score']})")
print(f" {cve['description'][:100]}...")
RSS/Atom Feed Monitoring
Monitor security news and advisory feeds.
Example:
import feedparser
from datetime import datetime
from typing import List, Dict, Any
# Feed URLs keyed by a short source name. aggregate_security_news() uses this
# mapping when the caller does not supply one, and copies the key into each
# returned entry as 'feed_name'.
SECURITY_FEEDS = {
'us_cert': 'https://www.cisa.gov/uscert/ncas/alerts.xml',
'nist': 'https://nvd.nist.gov/feeds/xml/cve/misc/nvd-rss.xml',
'krebs': 'https://krebsonsecurity.com/feed/',
'schneier': 'https://www.schneier.com/feed/atom/',
'threatpost': 'https://threatpost.com/feed/'
}
def fetch_feed(feed_url: str, limit: int = 10) -> List[Dict[str, Any]]:
    """Parse the feed at *feed_url* and return its first *limit* entries.

    Each entry is reduced to a dict with the keys ``title``, ``link``,
    ``summary`` (cut to 500 characters), ``published`` and ``source`` (the
    feed's own title). Fields the feed does not provide become ``''``.
    """
    parsed = feedparser.parse(feed_url)
    return [
        {
            'title': item.get('title', ''),
            'link': item.get('link', ''),
            'summary': item.get('summary', '')[:500],
            'published': item.get('published', ''),
            'source': parsed.feed.get('title', ''),
        }
        for item in parsed.entries[:limit]
    ]
def aggregate_security_news(feeds: dict = None, limit_per_feed: int = 5) -> List[Dict]:
    """Aggregate news from multiple security feeds.

    Args:
        feeds: Mapping of feed name to feed URL. ``None`` selects
            ``SECURITY_FEEDS``; an empty mapping yields an empty result.
        limit_per_feed: Maximum number of entries taken from each feed.

    Returns:
        The entries of all feeds in mapping order, each tagged with the
        name of its feed under the ``feed_name`` key.
    """
    # Only fall back to the defaults when nothing was passed. ``feeds or ...``
    # would also replace an explicitly empty mapping.
    if feeds is None:
        feeds = SECURITY_FEEDS

    all_entries = []
    for name, url in feeds.items():
        try:
            entries = fetch_feed(url, limit_per_feed)
        except Exception as e:
            # Best-effort aggregation: one failing feed must not abort the
            # others, so report it and move on.
            print(f"Error fetching {name}: {e}")
            continue
        for entry in entries:
            entry['feed_name'] = name
        all_entries.extend(entries)
    return all_entries
# Usage
news = aggregate_security_news(limit_per_feed=3)
for item in news[:10]:
print(f"[{item['feed_name']}] {item['title']}")
print(f" {item['link']}\n")
MITRE ATT&CK Research
Query MITRE ATT&CK framework data.
Example:
import requests
from typing import Dict, List, Any, Optional
# Base URL for raw files of the mitre/cti GitHub repository (master branch).
# get_attack_techniques() appends '<platform>-attack/<platform>-attack.json'.
ATTACK_BASE_URL = "https://raw.githubusercontent.com/mitre/cti/master"
def get_attack_techniques(
    tactic: Optional[str] = None,
    platform: str = 'enterprise',
    include_deprecated: bool = False
) -> List[Dict[str, Any]]:
    """
    Get ATT&CK techniques, optionally filtered by tactic.

    Args:
        tactic: Tactic name (e.g., 'initial-access', 'execution')
        platform: 'enterprise', 'mobile', or 'ics'. Selects which
            ``<platform>-attack.json`` bundle is downloaded.
        include_deprecated: When True, also return techniques flagged as
            ``revoked`` or ``x_mitre_deprecated`` in the bundle. They are
            skipped by default because they no longer describe current
            ATT&CK content.

    Returns:
        List of technique information: dicts with the keys ``id``,
        ``name``, ``description`` (first 500 characters), ``tactics``,
        ``platforms`` and ``detection`` (first 300 characters).

    Raises:
        requests.HTTPError: If the bundle cannot be downloaded, e.g. for an
            unknown ``platform``.
    """
    url = f"{ATTACK_BASE_URL}/{platform}-attack/{platform}-attack.json"
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    data = response.json()

    # NOTE(review): older mobile/ICS bundles are believed to use a
    # domain-specific source_name for the ATT&CK ID - confirm against the
    # current data; accepting all three is harmless either way.
    id_sources = ('mitre-attack', 'mitre-mobile-attack', 'mitre-ics-attack')

    techniques = []
    for obj in data.get('objects', []):
        if obj.get('type') != 'attack-pattern':
            continue
        # Superseded techniques stay in the bundle and are only flagged.
        if not include_deprecated and (
            obj.get('revoked') or obj.get('x_mitre_deprecated')
        ):
            continue
        # Computed once; used for both the filter and the result.
        tactics = [p.get('phase_name') for p in obj.get('kill_chain_phases', [])]
        # Filter by tactic if specified
        if tactic and tactic not in tactics:
            continue
        technique_id = next(
            (
                ref.get('external_id', '')
                for ref in obj.get('external_references', [])
                if ref.get('source_name') in id_sources
            ),
            ''
        )
        techniques.append({
            'id': technique_id,
            'name': obj.get('name'),
            'description': obj.get('description', '')[:500],
            'tactics': tactics,
            'platforms': obj.get('x_mitre_platforms', []),
            'detection': obj.get('x_mitre_detection', '')[:300]
        })
    return techniques
# Usage
initial_access = get_attack_techniques(tactic='initial-access')
for technique in initial_access[:5]:
print(f"{technique['id']}: {technique['name']}")
print(f" Platforms: {', '.join(technique['platforms'])}")
Research Report Generation
Compile research findings into structured reports.
Example:
from datetime import datetime
from typing import List, Dict, Any
def generate_research_report(
    topic: str,
    findings: List[Dict[str, Any]],
    sources: List[str],
    generated_at: datetime = None
) -> str:
    """Generate a markdown research report.

    Args:
        topic: Subject of the report, used in the heading and summary.
        findings: Finding dicts. ``title`` and ``summary`` are always
            rendered (``title`` defaults to 'Untitled'); ``details`` and
            ``source`` are rendered only when present and non-empty.
        sources: Source references, rendered as a numbered list.
        generated_at: Timestamp shown in the header. Defaults to the
            current local time; pass a fixed value for reproducible output.

    Returns:
        The report as a single markdown string.
    """
    if generated_at is None:
        generated_at = datetime.now()

    # Header and executive summary
    report = [
        f"# Research Report: {topic}",
        f"\n**Generated:** {generated_at.strftime('%Y-%m-%d %H:%M')}",
        f"**Sources Consulted:** {len(sources)}",
        "",
        "## Executive Summary",
        f"\nThis report compiles {len(findings)} findings related to {topic}.",
        "",
        "## Key Findings",
    ]

    # Findings
    for i, finding in enumerate(findings, 1):
        report.append(f"\n### Finding {i}: {finding.get('title', 'Untitled')}")
        report.append(f"\n{finding.get('summary', '')}")
        if finding.get('details'):
            report.append(f"\n**Details:** {finding['details']}")
        if finding.get('source'):
            report.append(f"\n*Source: {finding['source']}*")

    # Sources
    report.append("\n## Sources")
    report.extend(f"{i}. {source}" for i, source in enumerate(sources, 1))
    return '\n'.join(report)
# Usage
findings = [
{
'title': 'Critical RCE in Apache Struts',
'summary': 'A remote code execution vulnerability allows attackers to execute arbitrary commands.',
'details': 'CVE-2024-XXXX affects versions 2.0.0 through 2.5.30',
'source': 'NVD'
},
{
'title': 'Active Exploitation Reported',
'summary': 'CISA has added this vulnerability to KEV catalog.',
'source': 'CISA Alerts'
}
]
sources = [
'https://nvd.nist.gov/vuln/detail/CVE-2024-XXXX',
'https://www.cisa.gov/known-exploited-vulnerabilities-catalog'
]
report = generate_research_report('Apache Struts Vulnerability', findings, sources)
print(report)
Caching and Rate Limiting
Implement responsible data collection practices.
Example:
import requests
import time
import hashlib
import json
from pathlib import Path
from functools import wraps
from typing import Callable, Any
class ResearchCache:
    """Simple file-based cache for research data.

    Every entry lives in its own ``<md5-of-url>.json`` file inside
    ``cache_dir`` and stores the write time, the URL and the content.
    Content must be JSON-serializable.
    """

    def __init__(self, cache_dir: str = '.research_cache', ttl_hours: int = 24):
        """Create the cache directory if needed and set the entry lifetime.

        Args:
            cache_dir: Directory holding the cache files.
            ttl_hours: Age in hours after which an entry is ignored.
        """
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested path such as 'var/cache/research' works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl_seconds = ttl_hours * 3600

    def _get_cache_key(self, url: str) -> str:
        """Return the file-name stem used for *url*."""
        # MD5 only derives a fixed-length file name here; it is not used
        # for anything security-related.
        return hashlib.md5(url.encode()).hexdigest()

    def _cache_file(self, url: str) -> Path:
        """Return the path of the cache file belonging to *url*."""
        return self.cache_dir / f"{self._get_cache_key(url)}.json"

    def get(self, url: str) -> Any:
        """Return the cached content for *url*, or None.

        None is returned when there is no entry, the entry is older than
        the TTL, or the cache file cannot be read or parsed.
        """
        try:
            data = json.loads(self._cache_file(url).read_text(encoding='utf-8'))
            is_fresh = time.time() - data['timestamp'] < self.ttl_seconds
            content = data['content']
        except (OSError, ValueError, KeyError, TypeError):
            # Missing, unreadable, truncated or malformed entry: treat it as
            # a cache miss rather than failing the caller.
            return None
        return content if is_fresh else None

    def set(self, url: str, content: Any):
        """Store *content* for *url*, replacing any previous entry."""
        payload = json.dumps({
            'timestamp': time.time(),
            'url': url,
            'content': content
        })
        self._cache_file(url).write_text(payload, encoding='utf-8')
class RateLimiter:
    """Simple rate limiter that spaces calls at least ``min_interval`` apart."""

    def __init__(self, requests_per_minute: int = 30):
        """Set the minimum interval between requests.

        Args:
            requests_per_minute: Maximum number of requests per minute.

        Raises:
            ValueError: If ``requests_per_minute`` is not positive.
        """
        if requests_per_minute <= 0:
            raise ValueError("requests_per_minute must be positive")
        self.min_interval = 60.0 / requests_per_minute
        # -inf means "no request made yet", so the first wait() never sleeps
        # regardless of the monotonic clock's starting value.
        self.last_request = float('-inf')

    def wait(self):
        """Block until ``min_interval`` seconds have passed since the last call."""
        # The monotonic clock is immune to wall-clock adjustments, which
        # could otherwise cause overly long sleeps or none at all.
        elapsed = time.monotonic() - self.last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request = time.monotonic()
# Usage
cache = ResearchCache()
rate_limiter = RateLimiter(requests_per_minute=10)
def fetch_with_cache(url: str) -> str:
    """Fetch URL with caching and rate limiting.

    Args:
        url: URL to fetch.

    Returns:
        The response body, served from the module-level ``cache`` when a
        fresh entry exists.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    cached = cache.get(url)
    # Compare against None: an empty body is a legitimate cached value and
    # must not trigger a new request every time.
    if cached is not None:
        return cached
    rate_limiter.wait()
    response = requests.get(url, timeout=30)
    # Fail loudly rather than caching an error page for the whole TTL.
    response.raise_for_status()
    content = response.text
    cache.set(url, content)
    return content
Configuration
Environment Variables
| Variable | Description | Required | Default |
|---|---|---|---|
| `RESEARCH_CACHE_DIR` | Cache directory | No | `.research_cache` |
| `RESEARCH_CACHE_TTL` | Cache TTL in hours | No | `24` |
| `RESEARCH_RATE_LIMIT` | Requests per minute | No | `30` |
| `NVD_API_KEY` | NVD API key for higher rate limits | No | None |
Script Options
| Option | Type | Description |
|---|---|---|
| `--query` | string | Search query |
| `--source` | string | Data source to query |
| `--output` | path | Output file path |
| `--format` | string | Output format (json, markdown, csv) |
Examples
Example 1: Vulnerability Research Report
Scenario: Generate a report on recent critical vulnerabilities.
import requests
from datetime import datetime
def research_critical_vulns(keyword: str, days: int = 30) -> dict:
    """Research critical vulnerabilities for a technology.

    Args:
        keyword: Keyword passed to the NVD keyword search.
        days: Number of days to look back from today (UTC).
            NOTE(review): the NVD API is believed to reject publication
            date ranges longer than 120 days - confirm before passing
            larger values.

    Returns:
        A dict with the keys ``keyword``, ``date_range``, ``cves`` (at most
        10 records with ``id``, ``description`` and ``published``) and
        ``sources``. If the lookup fails, an ``errors`` key holds the error
        message and ``cves`` contains whatever was collected so far.
    """
    # Function-scope import: the surrounding imports only bring in
    # ``datetime``.
    from datetime import timedelta, timezone

    results = {
        'keyword': keyword,
        'date_range': f'Last {days} days',
        'cves': [],
        'sources': []
    }

    # Query NVD. The date window makes the request match the 'date_range'
    # reported above; without it the search is not limited by date at all.
    nvd_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=days)
    params = {
        'keywordSearch': keyword,
        'cvssV3Severity': 'CRITICAL',
        'pubStartDate': start_date.strftime('%Y-%m-%dT00:00:00.000'),
        'pubEndDate': end_date.strftime('%Y-%m-%dT23:59:59.999')
    }
    try:
        response = requests.get(nvd_url, params=params, timeout=60)
        # Surface HTTP errors (rate limiting, bad parameters) as an error
        # entry instead of trying to parse the error body as JSON.
        response.raise_for_status()
        data = response.json()
        for item in data.get('vulnerabilities', [])[:10]:
            cve = item['cve']
            # Guard against a missing or empty descriptions list.
            descriptions = cve.get('descriptions') or [{}]
            results['cves'].append({
                'id': cve['id'],
                'description': descriptions[0].get('value', '')[:300],
                'published': cve['published']
            })
        results['sources'].append(nvd_url)
    except Exception as e:
        # Deliberately best-effort: report the failure in the result rather
        # than raising to the caller.
        results['errors'] = [str(e)]
    return results
# Usage
research = research_critical_vulns('Microsoft Exchange')
print(f"Found {len(research['cves'])} critical CVEs")
Example 2: Threat Actor Research
Scenario: Compile information about a threat actor.
def research_threat_actor(actor_name: str) -> dict:
    """Compile threat actor intelligence.

    Placeholder implementation: no lookup is performed. The returned
    skeleton carries the actor name plus empty ``aliases``, ``techniques``,
    ``campaigns`` and ``sources`` lists for a real implementation to fill.

    Args:
        actor_name: Name of the threat actor.

    Returns:
        The report skeleton described above.
    """
    report = {
        'name': actor_name,
        'aliases': [],
        'techniques': [],
        'campaigns': [],
        'sources': []
    }
    # This would query MITRE ATT&CK groups data (see
    # https://attack.mitre.org/groups/). In practice, you'd parse the
    # ATT&CK data and extract relevant information into the lists above.
    return report
Limitations
- Rate Limits: External APIs have rate limits that must be respected
- Data Freshness: Cached data may be stale
- Access Restrictions: Some sources require authentication or subscriptions
- Content Parsing: Dynamic/JavaScript-rendered pages may not extract properly
- Legal Compliance: Ensure compliance with robots.txt and terms of service
Troubleshooting
Request Blocked
Problem: Requests returning 403 or blocked status
Solution: Use proper headers and respect rate limits:
headers = {
'User-Agent': 'YourBot/1.0 (contact@example.com)',
'Accept': 'text/html,application/json'
}
Incomplete Content
Problem: Missing content from dynamic pages
Solution: For JavaScript-rendered content, consider using selenium or playwright.
API Rate Limit Exceeded
Problem: Getting 429 Too Many Requests
Solution: Implement exponential backoff:
import time

import requests

# ``url`` is the address being fetched. Retry on 429 with exponentially
# growing pauses (1s, 2s, 4s, 8s) between attempts.
max_attempts = 5
for attempt in range(max_attempts):
    response = requests.get(url, timeout=30)
    if response.status_code != 429:
        break
    # No point in sleeping after the final attempt.
    if attempt < max_attempts - 1:
        time.sleep(2 ** attempt)
else:
    # Every attempt was rate-limited: raise instead of silently continuing
    # with a 429 response.
    response.raise_for_status()
Related Skills
- threat-intelligence: CTI-specific research
- docx: Generate reports from research findings
- xlsx: Export research data to spreadsheets