rust-dpdk
1
总安装量
1
周安装量
#46913
全站排名
安装命令
npx skills add https://github.com/huiali/rust-skills --skill rust-dpdk
Agent 安装分布
trae
1
qoder
1
trae-cn
1
claude-code
1
antigravity
1
Skill 文档
ç¨æ·æç½ç» (DPDK)
æ ¸å¿é®é¢
å¦ä½å®ç°ç¾ä¸çº§ PPS ç髿§è½ç½ç»æ°æ®å å¤çï¼
ä¼ ç»å æ ¸ç½ç»æ æå¤ªå¤ä¸ä¸æåæ¢åå åæ·è´å¼éã
DPDK vs å æ ¸ç½ç»æ
| ç¹æ§ | å æ ¸ç½ç»æ | DPDK |
|---|---|---|
| ä¸ä¸æåæ¢ | æ¯æ¬¡å é½åæ¢ | 轮询模å¼ï¼æ 忢 |
| å åæ·è´ | 夿¬¡æ·è´ | é¶æ·è´ |
| 䏿 | é¢ç¹ä¸æ | 轮询 (poll mode driver) |
| å»¶è¿ | è¾é« | å¾®ç§çº§ |
| ååé | ä¸çº§ PPS | ç¾ä¸çº§ PPS |
| CPU å©ç¨ç | è¾ä½ä½æå¼é | é«ä½é«æ |
æ ¸å¿ç»ä»¶
// DPDK æ ¸å¿ç»æ
struct DpdkContext {
memory_pool: Mempool, // å
åæ±
ports: Vec<Port>, // ç½å¡ç«¯å£
rx_queues: Vec<RxQueue>, // æ¥æ¶éå
tx_queues: Vec<TxQueue>, // åééå
cpu_cores: Vec<Core>, // CPU æ ¸å¿åé
}
struct Port {
port_id: u16,
mac_addr: [u8; 6],
link_speed: u32,
max_queues: u16,
}
struct Mempool {
name: String,
buffer_size: usize,
cache_size: usize,
total_buffers: u32,
}
å åæ± 管ç
// å建 DPDK å
åæ±
fn create_mempool() -> Result<Mempool, DpdkError> {
let mempool = unsafe {
rte_mempool_create(
b"packet_pool\0".as_ptr() as *const c_char,
NUM_BUFFERS as u32, // ç¼å²åºæ°é
BUFFER_SIZE as u16, // æ¯ä¸ªç¼å²åºå¤§å°
CACHE_SIZE as u32, // CPU ç¼å大å°
0, // ç§ææ°æ®å¤§å°
Some(rte_pktmbuf_pool_init), // åå§å彿°
std::ptr::null(), // åå§ååæ°
Some(rte_pktmbuf_init), // 对象åå§å彿°
std::ptr::null(), // å¯¹è±¡åæ°
rte_socket_id() as i32, // å
åæå¨ Socket
0, // æ å¿ä½
)
};
if mempool.is_null() {
Err(DpdkError::MempoolCreateFailed)
} else {
Ok(Mempool { inner: mempool })
}
}
// åé
ç¼å²åº
fn alloc_mbuf(mempool: &Mempool) -> Option<*mut rte_mbuf> {
unsafe {
let mbuf = rte_pktmbuf_alloc(mempool.inner);
if mbuf.is_null() {
None
} else {
Some(mbuf)
}
}
}
é¶æ·è´æ¥æ¶
// é¶æ·è´æ¥æ¶æ°æ®å
fn process_packets(
port_id: u16,
queue_id: u16,
bufs: &mut [*mut rte_mbuf; MAX_BURST_SIZE],
) -> usize {
let num_received = unsafe {
rte_eth_rx_burst(
port_id,
queue_id,
bufs.as_mut_ptr(),
MAX_BURST_SIZE as u16,
)
};
// ç´æ¥å¤ç mbufï¼ä¸éè¦æ·è´
for i in 0..num_received {
let mbuf = bufs[i];
// è®¿é®æ°æ®ï¼é¶æ·è´ï¼
let data_ptr = unsafe {
rte_pktmbuf_mtod(mbuf, *const u8)
};
let data_len = unsafe {
rte_pktmbuf_pkt_len(mbuf)
};
// å¤çæ°æ®å
process_packet(data_ptr, data_len);
// éæ¾ mbuf åå
åæ±
unsafe {
rte_pktmbuf_free(mbuf);
}
}
num_received
}
æ¹éåé
// æ¹éåéæ°æ®å
fn transmit_packets(
port_id: u16,
queue_id: u16,
packets: &[Packet],
) -> usize {
let mut mbufs: Vec<*mut rte_mbuf> = packets
.iter()
.map(|p| p.to_mbuf())
.collect();
let sent = unsafe {
rte_eth_tx_burst(
port_id,
queue_id,
mbufs.as_mut_ptr(),
mbufs.len() as u16,
)
};
// éæ¾æªåéç mbuf
for i in sent..mbufs.len() {
unsafe {
rte_pktmbuf_free(mbufs[i]);
}
}
sent
}
RSS è´è½½åè¡¡
// é
ç½® RSS (Receive Side Scaling)
fn configure_rss(port_id: u16) -> Result<(), DpdkError> {
let mut port_info: rte_eth_dev_info = unsafe { std::mem::zeroed() };
unsafe {
rte_eth_dev_info_get(port_id, &mut port_info);
}
// é
ç½® RSS åå¸
let mut rss_conf: rte_eth_rss_conf = unsafe { std::mem::zeroed() };
rss_conf.rss_key_len = 40;
rss_conf.rss_hf = RTE_ETH_RSS_TCP | RTE_ETH_RSS_UDP | RTE_ETH_RSS_IPV4;
unsafe {
let ret = rte_eth_dev_rss_hash_conf_update(
port_id,
&rss_conf,
);
if ret < 0 {
return Err(DpdkError::RssConfigFailed);
}
}
Ok(())
}
// æ ¹æ®åå¸å¼åé
éå
fn get_queue_by_hash(hash: u32, num_queues: u16) -> u16 {
// 使ç¨ç®åç忍¡åå
(hash % num_queues as u32) as u16
}
å¤éåé ç½®
// é
ç½®å¤éå
fn configure_multi_queue(port_id: u16, num_queues: u16) -> Result<(), DpdkError> {
let mut port_conf: rte_eth_conf = unsafe { std::mem::zeroed() };
port_conf.rxmode.split_hdr_size = 0;
port_conf.rxmode.mq_mode = rte_eth_mq_mode::ETH_MQ_RX_RSS;
port_conf.txmode.mq_mode = rte_eth_mq_mode::ETH_MQ_TX_NONE;
// é
ç½® RX éå
let mut rx_conf: rte_eth_rxconf = unsafe { std::mem::zeroed() };
rx_conf.rx_free_thresh = 32;
rx_conf.rx_drop_en = 0;
// é
ç½® TX éå
let mut tx_conf: rte_eth_txconf = unsafe { std::mem::zeroed() };
tx_conf.tx_free_thresh = 32;
// åé
RX éå
for queue in 0..num_queues {
unsafe {
let ret = rte_eth_rx_queue_setup(
port_id,
queue,
1024, // éåæ·±åº¦
rte_socket_id() as u32,
&rx_conf,
mempool.inner,
);
if ret < 0 {
return Err(DpdkError::QueueSetupFailed);
}
}
}
// åé
TX éå
for queue in 0..num_queues {
unsafe {
let ret = rte_eth_tx_queue_setup(
port_id,
queue,
1024,
rte_socket_id() as u32,
&tx_conf,
);
if ret < 0 {
return Err(DpdkError::QueueSetupFailed);
}
}
}
Ok(())
}
CPU äº²åæ§
use std::os::raw::c_int;
use std::thread;
fn set_cpu_affinity(core_id: u32) -> Result<(), DpdkError> {
let mut cpuset: cpu_set_t = unsafe { std::mem::zeroed() };
unsafe {
CPU_SET(core_id as usize, &mut cpuset);
let ret = pthread_setaffinity_np(
pthread_self(),
std::mem::size_of::<cpu_set_t>(),
&cpuset,
);
if ret != 0 {
return Err(DpdkError::AffinitySetFailed);
}
}
Ok(())
}
// 为æ¯ä¸ª RX éååé
ä¸ç¨æ ¸å¿
fn allocate_cores_for_queues(num_queues: u16) {
for queue in 0..num_queues {
thread::spawn(move || {
set_cpu_affinity(queue as u32).unwrap();
process_queue(queue);
});
}
}
æ§è½ä¼å
| ä¼åç¹ | æ¹æ³ |
|---|---|
| å åå¯¹é½ | ç¼åè¡å¯¹é½ (64 åè) |
| æ ééå | ä½¿ç¨ SPSC éå |
| æ¹å¤ç | æ¹éæ¶ååå°ç³»ç»è°ç¨ |
| CPU äº²åæ§ | æ ¸å¿ç»å®åå°ä¸ä¸æåæ¢ |
| Hugepages | 2MB/1GB 大页åå° TLB miss |
ä¸å ¶ä»æè½å ³è
rust-dpdk
â
ââ⺠rust-performance â æ§è½ä¼å
ââ⺠rust-embedded â no_std ç¯å¢
ââ⺠rust-concurrency â 并忍¡å