jy-aiot-collector-developer
Install command
npx skills add https://github.com/feelingsray/ray-aillm-skills --skill jy-aiot-collector-developer
Skill documentation
JY-AIOT Collector Developer
Overview
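This skill quickly generates data collection module code for the Plugin framework of the jy-aiot mining product. Given a protocol definition document written as Markdown tables, it automatically generates a collection module that conforms to the Plugin framework specification, including:
- Communication layer code: HTTP server/client, FTP file reading, database client, MQTT subscription, Kafka subscription
- Data parser: converts raw data into the standard format abstracted by the framework
- Configuration templates: YAML/JSON configuration files that conform to the Plugin framework specification
- Integration entry point: can be integrated directly into the collection framework of the jy-aiot product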
Protocol Document Format
Required Fields
The protocol document must contain the following table structures:
## Communication Configuration
| Field | Type | Required | Description |
|------|------|------|------|
| protocol | string | Yes | Communication protocol: http_server/http_client/ftp/database/mqtt/kafka |
| host | string | Yes | Server address |
| port | int | Yes | Port number |
| path | string | No | Path (HTTP/FTP) |
| username | string | No | Username |
| password | string | No | Password |
| interval | int | No | Collection interval in seconds; default 60 |
## Data Point Definitions
| Field Name | Field ID | Data Type | Byte Offset | Byte Length | Scale Factor | Start Bit | Bit Length | Remarks |
|----------|----------|----------|----------|----------|----------|--------|--------|------|
| Temperature | temperature | float | 0 | 4 | 0.1 | - | - | Device temperature value |
| Pressure | pressure | int | 4 | 2 | 1 | - | - | Pipeline pressure |
## Data Parsing Rules
| Rule ID | Target Field | Source Data Expression | Conversion Formula | Unit | Remarks |
|--------|----------|--------------|----------|----------|------|
| rule_01 | temperature | raw[0:4] | bytes_to_float_le(value) | ℃ | Little-endian parsing |
| rule_02 | pressure | raw[4:6] | bytes_to_int_le(value) / 100 | MPa | Scaling conversion |
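The two conversion formulas in the rules table might be realized in Go roughly as follows. This is a sketch under assumptions: the names `bytesToFloatLE` and `bytesToIntLE` are hypothetical Go spellings of `bytes_to_float_le` and `bytes_to_int_le`, the 4-byte value is taken to be an IEEE 754 float32, and the 2-byte value is taken to be a signed 16-bit integer.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// bytesToFloatLE decodes 4 little-endian bytes as an IEEE 754 float32.
// Assumption: the document does not state the float width; 4 bytes suggests float32.
func bytesToFloatLE(b []byte) (float64, error) {
	if len(b) != 4 {
		return 0, fmt.Errorf("want 4 bytes, got %d", len(b))
	}
	return float64(math.Float32frombits(binary.LittleEndian.Uint32(b))), nil
}

// bytesToIntLE decodes 2 little-endian bytes as a signed 16-bit integer.
// Assumption: signedness is not stated in the document.
func bytesToIntLE(b []byte) (int64, error) {
	if len(b) != 2 {
		return 0, fmt.Errorf("want 2 bytes, got %d", len(b))
	}
	return int64(int16(binary.LittleEndian.Uint16(b))), nil
}

func main() {
	// Sample frame: 25.5 as a little-endian float32, then 250 as a little-endian int16.
	raw := []byte{0x00, 0x00, 0xCC, 0x41, 0xFA, 0x00}

	temperature, err := bytesToFloatLE(raw[0:4]) // rule_01
	if err != nil {
		panic(err)
	}
	rawPressure, err := bytesToIntLE(raw[4:6]) // rule_02, before scaling
	if err != nil {
		panic(err)
	}
	pressure := float64(rawPressure) / 100 // rule_02 divides by 100 to obtain MPa

	fmt.Println(temperature, pressure) // prints "25.5 2.5"
}
```

Checking the slice length up front keeps a short frame from causing a runtime panic inside `encoding/binary`.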
Supported Communication Protocols
| Protocol | Description | Use Case |
|---|---|---|
| http_server | HTTP server | Passively receive data pushed by devices |
| http_client | HTTP client | Actively poll a RESTful API |
| ftp | FTP file reading | Periodically pull files exported by devices |
| database | Database client | Read database tables directly |
| mqtt | MQTT subscription | Subscribe to MQTT topics to receive data |
| kafka | Kafka subscription | Subscribe to Kafka topics to receive data |
Supported Data Types
| Type ID | Go Type | Description | Example |
|---|---|---|---|
| int | int32/int64 | Signed integer | Sensor integer value |
| uint | uint32/uint64 | Unsigned integer | Counter value |
| float | float32/float64 | Floating-point number | Temperature, pressure |
| string | string | String | Device status text |
| bool | bool | Boolean | Switch state |
| bytes | []byte | Byte array | Raw binary data |
Workflow
1. Protocol Analysis
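Analyze the protocol document provided by the user and extract:
- Communication method (the protocol field)
- Connection parameters (host, port, credentials)
- Data point list (field name, ID, type, position)
- Parsing rules (conversion formula, unit)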
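The extraction step above starts from Markdown pipe tables, so one way to approach it is to turn each table into rows keyed by header cell. The sketch below is illustrative only: `parseTable`, `splitRow`, and `isSeparator` are hypothetical helpers, not part of the skill's scripts, and cells containing an escaped `|` are not handled.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTable converts Markdown pipe-table lines into one map per data row,
// keyed by the header cells. The separator row (|---|---|) is skipped.
func parseTable(lines []string) []map[string]string {
	var header []string
	var rows []map[string]string
	for _, line := range lines {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "|") {
			continue // not part of a table
		}
		cells := splitRow(line)
		switch {
		case header == nil:
			header = cells
		case isSeparator(cells):
			continue
		default:
			row := make(map[string]string, len(header))
			for i, name := range header {
				if i < len(cells) {
					row[name] = cells[i]
				}
			}
			rows = append(rows, row)
		}
	}
	return rows
}

// splitRow strips the outer pipes and trims whitespace around each cell.
func splitRow(line string) []string {
	parts := strings.Split(strings.Trim(line, "|"), "|")
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}
	return parts
}

// isSeparator reports whether every cell consists only of dashes, colons, or spaces.
func isSeparator(cells []string) bool {
	for _, c := range cells {
		if strings.Trim(c, "-: ") != "" {
			return false
		}
	}
	return true
}

func main() {
	doc := []string{
		"| Field | Value |",
		"|------|-----|",
		"| protocol | mqtt |",
		"| port | 1883 |",
	}
	for _, row := range parseTable(doc) {
		fmt.Printf("%s = %s\n", row["Field"], row["Value"])
	}
}
```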
2. Code Generation
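Generate the collection module code that corresponds to the protocol type:
collector_{protocol}/
├── config.yaml # Communication and collection configuration
├── collector.go # Main collector logic
├── parser.go # Data parser
├── metrics.go # Data model definitions
└── README.md # Integration notes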
3. Core Components
Collector Interface
All collection modules must implement the core interface of the Plugin framework:
type Collector interface {
	// Init initializes the collector.
	Init(config Config) error
	// Start begins collection.
	Start(ctx context.Context) error
	// Stop halts collection.
	Stop() error
	// GetMetrics returns the collected data.
	GetMetrics() []Metric
	// Health performs a health check.
	Health() HealthStatus
}
Metric Data Model
type Metric struct {
	DeviceID  string            `json:"device_id"` // Device identifier
	MetricID  string            `json:"metric_id"` // Metric identifier
	Timestamp time.Time         `json:"timestamp"` // Collection time
	Value     interface{}       `json:"value"`     // Value
	Quality   DataQuality       `json:"quality"`   // Data quality
	Tags      map[string]string `json:"tags"`      // Tags
	Fields    map[string]string `json:"fields"`    // Extension fields
}
type DataQuality int
const (
	QualityGood      DataQuality = 0 // Good
	QualityBad       DataQuality = 1 // Bad data
	QualityUncertain DataQuality = 2 // Uncertain
)
4. Protocol-Specific Implementations
HTTP Server Collector
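Used to passively receive data pushed by devices:
// Requires: context, errors, fmt, log, net/http, sync.
type HTTPServerCollector struct {
	config  HTTPServerConfig
	handler *http.Server
	parser  DataParser
	buffer  sync.Map
}
func (c *HTTPServerCollector) Start(ctx context.Context) error {
	c.handler = &http.Server{
		Addr:    fmt.Sprintf(":%d", c.config.Port),
		Handler: c.createRouter(),
	}
	go func() {
		// ListenAndServe always returns a non-nil error; ErrServerClosed only signals a clean shutdown.
		if err := c.handler.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Printf("http server collector: %v", err)
		}
	}()
	return nil
}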
MQTT Collector
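Used to subscribe to MQTT topics and receive real-time data:
// Requires: context, fmt, sync, and an MQTT client imported as mqtt
// (the calls below match the Eclipse Paho Go client, github.com/eclipse/paho.mqtt.golang).
type MQTTCollector struct {
	config MQTTConfig
	client mqtt.Client
	parser DataParser
	buffer sync.Map
}
func (c *MQTTCollector) Start(ctx context.Context) error {
	opts := mqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("tcp://%s:%d", c.config.Host, c.config.Port)).
		SetClientID(c.config.ClientID)
	c.client = mqtt.NewClient(opts)
	if token := c.client.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}
	// Wait for the subscription as well, so a rejected subscribe is not silently ignored.
	if token := c.client.Subscribe(c.config.Topic, 0, c.messageHandler); token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}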
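When the subscription uses a wildcard filter such as `sensors/+/temperature`, the message handler typically needs to recover the device ID from the concrete topic. The following is a simplified sketch of MQTT topic-filter matching; `matchTopic` is a hypothetical helper, and it deliberately ignores the special treatment of topics that begin with `$`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches filter. "+" matches exactly one
// level and a final "#" matches any remaining levels. The levels matched by
// "+" are returned in order.
func matchTopic(filter, topic string) ([]string, bool) {
	filterLevels := strings.Split(filter, "/")
	topicLevels := strings.Split(topic, "/")
	var captured []string
	for i, part := range filterLevels {
		if part == "#" {
			// "#" is only valid as the last level of a filter.
			return captured, i == len(filterLevels)-1
		}
		if i >= len(topicLevels) {
			return nil, false
		}
		switch {
		case part == "+":
			captured = append(captured, topicLevels[i])
		case part != topicLevels[i]:
			return nil, false
		}
	}
	return captured, len(filterLevels) == len(topicLevels)
}

func main() {
	ids, ok := matchTopic("sensors/+/temperature", "sensors/dev-42/temperature")
	fmt.Println(ids, ok) // prints "[dev-42] true"
}
```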
Database Collector
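Used to query a database directly:
// Requires: context, database/sql, fmt, time, plus a registered "postgres" driver
// (for example, a blank import of github.com/lib/pq).
type DatabaseCollector struct {
	config   DatabaseConfig
	db       *sql.DB
	parser   DataParser
	interval time.Duration
}
func (c *DatabaseCollector) Start(ctx context.Context) error {
	connStr := fmt.Sprintf("user=%s password=%s host=%s port=%d dbname=%s",
		c.config.Username, c.config.Password,
		c.config.Host, c.config.Port, c.config.Database)
	var err error
	c.db, err = sql.Open("postgres", connStr)
	if err != nil {
		return err
	}
	// sql.Open only validates its arguments; ping to confirm the database is reachable.
	if err := c.db.PingContext(ctx); err != nil {
		return err
	}
	ticker := time.NewTicker(c.interval)
	go c.pollLoop(ctx, ticker)
	return nil
}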
Generated Code Structure
Directory Layout
jy-aiot-collector-{protocol}/
├── config/
│   ├── collector.yaml # Main configuration
│   └── metrics.yaml # Metric definitions
├── internal/
│   ├── collector.go # Collector implementation
│   ├── parser.go # Data parser
│   ├── client.go # Protocol client
│   └── model.go # Data models
├── scripts/
│   ├── test_connection.sh # Connection test script
│   └── mock_server.py # Mock server (for testing)
├── deploy/
│   ├── docker-compose.yml # Deployment configuration
│   └── k8s-deployment.yaml # K8s deployment
├── go.mod
├── main.go
└── README.md
Configuration Template
# collector.yaml
collector:
  name: "temperature_sensor"
  protocol: "mqtt"
  enabled: true
  interval: 10 # seconds
mqtt:
  broker: "tcp://192.168.1.100:1883"
  topic: "sensors/+/temperature"
  client_id: "jy-aiot-collector-001"
  username: "collector"
  password: "${MQTT_PASSWORD}"
  qos: 1
  clean_session: true
metrics:
  - id: "temperature"
    name: "Device temperature"
    source: "payload.temperature"
    datatype: "float"
    unit: "℃"
    tags:
      device_type: "sensor"
parser:
  format: "json"
  timestamp_field: "timestamp"
  timestamp_format: "2006-01-02T15:04:05Z07:00"
output:
  buffer_size: 1000
  batch_size: 100
  flush_interval: 5
Parser Implementation
// internal/parser.go
type DataParser interface {
	Parse(raw []byte) ([]Metric, error)
	Validate(data interface{}) bool
	Transform(value interface{}, rule string) interface{}
}
type JSONParser struct {
	timestampField  string
	timestampFormat string
}
func (p *JSONParser) Parse(raw []byte) ([]Metric, error) {
	var data map[string]interface{}
	if err := json.Unmarshal(raw, &data); err != nil {
		return nil, err
	}
	metrics := make([]Metric, 0)
	// Parsing logic
	return metrics, nil
}
Integration Guide
1. Module Registration
Register the collection modules with the Plugin framework:
// plugin.go
func init() {
	collector.Register("mqtt", NewMQTTCollector)
	collector.Register("http_server", NewHTTPServerCollector)
	collector.Register("database", NewDatabaseCollector)
}
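The framework's `collector.Register` is not shown in this document. As an illustration of the pattern it suggests, here is a self-contained sketch of a protocol-keyed factory registry. Everything in it is an assumption: the `Factory` signature, the reduced `Collector` interface, the `New` and `Protocols` helpers, and the decision to panic on duplicate registration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Collector is reduced to a single method here; the real interface is larger.
type Collector interface{ Protocol() string }

// Factory builds a new collector instance.
type Factory func() Collector

var (
	registryMu sync.RWMutex
	registry   = map[string]Factory{}
)

// Register associates a protocol name with a factory. Registering the same
// protocol twice is treated as a programming error.
func Register(protocol string, factory Factory) {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, exists := registry[protocol]; exists {
		panic("collector: duplicate registration for " + protocol)
	}
	registry[protocol] = factory
}

// New builds a collector for the given protocol.
func New(protocol string) (Collector, error) {
	registryMu.RLock()
	factory, ok := registry[protocol]
	registryMu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("collector: unknown protocol %q (registered: %v)", protocol, Protocols())
	}
	return factory(), nil
}

// Protocols lists the registered protocol names in sorted order.
func Protocols() []string {
	registryMu.RLock()
	defer registryMu.RUnlock()
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

type mqttCollector struct{}

func (mqttCollector) Protocol() string { return "mqtt" }

func init() {
	Register("mqtt", func() Collector { return mqttCollector{} })
}

func main() {
	c, err := New("mqtt")
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Protocol())
	_, err = New("kafka")
	fmt.Println(err)
}
```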
2. Configuration Loading
Load and validate the configuration:
// config/config.go
type Config struct {
	Collector CollectorConfig `yaml:"collector"`
	MQTT      MQTTConfig      `yaml:"mqtt"`
	Metrics   []MetricConfig  `yaml:"metrics"`
	Parser    ParserConfig    `yaml:"parser"`
	Output    OutputConfig    `yaml:"output"`
}
func LoadConfig(path string) (*Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var config Config
	if err := yaml.Unmarshal(data, &config); err != nil {
		return nil, err
	}
	// Validate the configuration
	if err := config.Validate(); err != nil {
		return nil, err
	}
	return &config, nil
}
3. Health Check
Implement the health check interface:
func (c *MQTTCollector) Health() HealthStatus {
	status := HealthStatus{
		Status: "healthy",
		Checks: make(map[string]string),
	}
	// c.client is nil until Start has run, so guard against that as well.
	if c.client == nil || !c.client.IsConnected() {
		status.Status = "unhealthy"
		status.Checks["connection"] = "disconnected"
	}
	return status
}
Example Usage
User Request Example
User request:
Please generate an MQTT temperature sensor collection module based on the following protocol document:
## Communication Configuration
| Field | Value |
|------|-----|
| protocol | mqtt |
| host | 192.168.1.100 |
| port | 1883 |
| topic | sensors/+/temperature |
## Data Point Definitions
| Field Name | Field ID | Data Type |
|----------|----------|----------|
| Temperature value | temperature | float |
| Device ID | device_id | string |
Generated Response
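Generate the complete collection module code from the protocol document, including:
- collector.yaml – configuration file
- internal/collector.go – collector implementation
- internal/parser.go – JSON parser
- main.go – program entry point
- README.md – integration notes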
Resources
scripts/
- generate_collector.py – collection module code generation script
- test_protocol.py – protocol parsing test tool
- mock_device.py – simulated device data source
references/
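- COMMUNICATION.md – detailed configuration for each communication protocol
- PARSING.md – data parsing rules and conversion functions
- MODELS.md – data model and interface definitions
- EXAMPLES.md – complete collection module examples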
assets/
- templates/collector_template.go – collector code template
- templates/config_template.yaml – configuration file template
- schemas/metric.schema.json – JSON Schema for metric definitions