Skip to content

Commit f6104ea

Browse files
committed
Save work on support
1 parent e65673c commit f6104ea

File tree

7 files changed

+137
-74
lines changed

7 files changed

+137
-74
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ Some highlights:
6666

6767
## Running eRPC over DPDK on Microsoft Azure VMs
6868

69-
* eRPC works well on Azure VMs with accelerated networking. For now, eRPC
70-
supports only one RPC ID per machine on Azure.
69+
* eRPC works well on Azure VMs with accelerated networking.
7170

7271
* Configure two Ubuntu 18.04 VMs as below. Use the same resource group and
7372
availability zone for both VMs.

apps/small_rpc_tput/config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
--concurrency 60
55
--msg_size 32
66
--num_processes 2
7-
--num_threads 1
7+
--num_threads 4
88
--numa_0_ports 0
99
--numa_1_ports 1,3

scripts/autorun_app_file

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
latency
1+
small_rpc_tput

src/transport_impl/dpdk/dpdk_transport.cc

Lines changed: 97 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <iomanip>
44
#include <stdexcept>
55

6+
#include <rte_thash.h>
67
#include <rte_version.h>
78
#include <set>
89
#include "dpdk_transport.h"
@@ -12,6 +13,7 @@
1213
namespace erpc {
1314

1415
constexpr size_t DpdkTransport::kMaxDataPerPkt;
16+
static_assert(sizeof(eth_routing_info_t) <= Transport::kMaxRoutingInfoSize, "");
1517

1618
static volatile bool port_initialized[RTE_MAX_ETHPORTS]; // Uses dpdk_lock
1719

@@ -21,6 +23,14 @@ static std::set<size_t> used_qp_ids[RTE_MAX_ETHPORTS];
2123
/// mempool_arr[i][j] is the mempool to use for port i, queue j
2224
rte_mempool *mempool_arr[RTE_MAX_ETHPORTS][DpdkTransport::kMaxQueuesPerPort];
2325

26+
/// Key used for RSS hashing
27+
static constexpr uint8_t default_rss_key[] = {
28+
0x2c, 0xc6, 0x81, 0xd1, 0x5b, 0xdb, 0xf4, 0xf7, 0xfc, 0xa2,
29+
0x83, 0x19, 0xdb, 0x1a, 0x3e, 0x94, 0x6b, 0x9e, 0x38, 0xd9,
30+
0x2c, 0x9c, 0x03, 0xd1, 0xad, 0x99, 0x44, 0xa7, 0xd9, 0x56,
31+
0x3d, 0x59, 0x06, 0x3c, 0x25, 0xf3, 0xfc, 0x1f, 0xdc, 0x2a,
32+
};
33+
2434
// Initialize the protection domain, queue pair, and memory registration and
2535
// deregistration functions. RECVs will be initialized later when the hugepage
2636
// allocator is provided.
@@ -116,42 +126,19 @@ void DpdkTransport::setup_phy_port() {
116126
rte_eth_conf eth_conf;
117127
memset(&eth_conf, 0, sizeof(eth_conf));
118128

119-
eth_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
120-
eth_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
121-
eth_conf.rxmode.offloads = 0;
129+
eth_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
130+
eth_conf.rx_adv_conf.rss_conf.rss_key =
131+
const_cast<uint8_t *>(default_rss_key);
132+
eth_conf.rx_adv_conf.rss_conf.rss_key_len = 40;
133+
eth_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;
122134

123135
eth_conf.txmode.mq_mode = ETH_MQ_TX_NONE;
124136
eth_conf.txmode.offloads = kOffloads;
125137

126-
eth_conf.fdir_conf.mode = RTE_FDIR_MODE_PERFECT;
127-
eth_conf.fdir_conf.pballoc = RTE_FDIR_PBALLOC_64K;
128-
eth_conf.fdir_conf.status = RTE_FDIR_NO_REPORT_STATUS;
129-
eth_conf.fdir_conf.mask.dst_port_mask = 0xffff;
130-
eth_conf.fdir_conf.drop_queue = 0;
131-
132138
int ret = rte_eth_dev_configure(phy_port, kMaxQueuesPerPort,
133139
kMaxQueuesPerPort, &eth_conf);
134140
rt_assert(ret == 0, "Ethdev configuration error: ", strerror(-1 * ret));
135141

136-
// Set flow director fields if flow director is supported. It's OK if the
137-
// FILTER_SET command fails (e.g., on ConnectX-4 NICs).
138-
if (kInstallFlowRules &&
139-
rte_eth_dev_filter_supported(phy_port, RTE_ETH_FILTER_FDIR) == 0) {
140-
struct rte_eth_fdir_filter_info fi;
141-
memset(&fi, 0, sizeof(fi));
142-
fi.info_type = RTE_ETH_FDIR_FILTER_INPUT_SET_SELECT;
143-
fi.info.input_set_conf.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_UDP;
144-
fi.info.input_set_conf.inset_size = 2;
145-
fi.info.input_set_conf.field[0] = RTE_ETH_INPUT_SET_L3_DST_IP4;
146-
fi.info.input_set_conf.field[1] = RTE_ETH_INPUT_SET_L4_UDP_DST_PORT;
147-
fi.info.input_set_conf.op = RTE_ETH_INPUT_SET_SELECT;
148-
ret = rte_eth_dev_filter_ctrl(phy_port, RTE_ETH_FILTER_FDIR,
149-
RTE_ETH_FILTER_SET, &fi);
150-
if (ret != 0) {
151-
ERPC_WARN("Failed to set flow director fields. Could be survivable...\n");
152-
}
153-
}
154-
155142
// Set up all RX and TX queues and start the device. This can't be done later
156143
// on a per-thread basis since we must start the device to use any queue.
157144
// Once the device is started, more queues cannot be added without stopping
@@ -190,11 +177,6 @@ void DpdkTransport::setup_phy_port() {
190177
ret = rte_eth_tx_queue_setup(phy_port, i, kNumTxRingDesc, numa_node,
191178
&eth_tx_conf);
192179
rt_assert(ret == 0, "Failed to setup TX queue: " + std::to_string(i));
193-
194-
if (kInstallFlowRules) {
195-
install_flow_rule(phy_port, i, get_port_ipv4_addr(phy_port),
196-
udp_port_for_queue(phy_port, i));
197-
}
198180
}
199181

200182
rte_eth_dev_start(phy_port);
@@ -225,6 +207,24 @@ void DpdkTransport::resolve_phy_port() {
225207

226208
resolve.ipv4_addr = get_port_ipv4_addr(phy_port);
227209

210+
// Resolve RSS indirection table size
211+
struct rte_eth_dev_info dev_info;
212+
rte_eth_dev_info_get(phy_port, &dev_info);
213+
214+
rt_assert(std::string(dev_info.driver_name) == "net_mlx4" or
215+
std::string(dev_info.driver_name) == "net_mlx5",
216+
"eRPC supports only mlx4 or mlx5 devices with DPDK");
217+
if (std::string(dev_info.driver_name) == "net_mlx4") {
218+
// MLX4 NICs report a reta size of zero, but they use 128 internally
219+
rt_assert(dev_info.reta_size == 0,
220+
"Unexpected RETA size for MLX4 NIC (expected zero)");
221+
resolve.reta_size = 128;
222+
} else {
223+
resolve.reta_size = dev_info.reta_size;
224+
rt_assert(resolve.reta_size >= kMaxQueuesPerPort,
225+
"Too few entries in NIC RSS indirection table");
226+
}
227+
228228
// Resolve bandwidth
229229
struct rte_eth_link link;
230230
rte_eth_link_get(static_cast<uint8_t>(phy_port), &link);
@@ -244,10 +244,12 @@ void DpdkTransport::resolve_phy_port() {
244244
resolve.bandwidth = 10.0 * (1000 * 1000 * 1000) / 8.0;
245245
}
246246

247-
ERPC_INFO("Resolved port %u: MAC %s, IPv4 %s, bandwidth %.1f Gbps\n",
248-
phy_port, mac_to_string(resolve.mac_addr).c_str(),
249-
ipv4_to_string(htonl(resolve.ipv4_addr)).c_str(),
250-
resolve.bandwidth * 8.0 / (1000 * 1000 * 1000));
247+
ERPC_INFO(
248+
"Resolved port %u: MAC %s, IPv4 %s, RETA size %zu entries, bandwidth "
249+
"%.1f Gbps\n",
250+
phy_port, mac_to_string(resolve.mac_addr).c_str(),
251+
ipv4_to_string(htonl(resolve.ipv4_addr)).c_str(), resolve.reta_size,
252+
resolve.bandwidth * 8.0 / (1000 * 1000 * 1000));
251253
}
252254

253255
void DpdkTransport::fill_local_routing_info(RoutingInfo *routing_info) const {
@@ -256,17 +258,71 @@ void DpdkTransport::fill_local_routing_info(RoutingInfo *routing_info) const {
256258
memcpy(ri->mac, resolve.mac_addr, 6);
257259
ri->ipv4_addr = resolve.ipv4_addr;
258260
ri->udp_port = rx_flow_udp_port;
261+
ri->rxq_id = qp_id;
262+
ri->reta_size = resolve.reta_size;
263+
}
264+
265+
/**
266+
* @brief Return a source UDP port for which the RSS target queue at the remote
267+
* receiver for the ntuple {src_ip, dst_ip, src_port, dst_port} will be
268+
* remote_queue_id.
269+
*
270+
* All ntuple arguments and the return value are in host-byte order
271+
*
272+
* @param remote_queue_id The remote NIC RX queue to target
273+
* @param remote_reta_size The number of entries in the remote NIC's RSS
274+
* indirection table
275+
* @param src_ip This NIC's IPv4 address
276+
* @param dst_ip The remote NIC's IPv4 address
277+
* @param dst_port The UDP port the remote endpoint is listening on
278+
* @return The source UDP port this endpoint should use for targeting
279+
*/
280+
static uint16_t get_udp_src_port_for_target_queue(size_t remote_queue_id,
281+
size_t remote_reta_size,
282+
uint32_t src_ip,
283+
uint32_t dst_ip,
284+
uint16_t dst_port) {
285+
uint16_t src_port = kBaseEthUDPPort;
286+
for (; src_port < UINT16_MAX; src_port++) {
287+
union rte_thash_tuple tuple;
288+
tuple.v4.src_addr = src_ip;
289+
tuple.v4.dst_addr = dst_ip;
290+
tuple.v4.sport = src_port;
291+
tuple.v4.dport = dst_port;
292+
uint32_t rss_l3l4 = rte_softrss(reinterpret_cast<uint32_t *>(&tuple),
293+
RTE_THASH_V4_L4_LEN, default_rss_key);
294+
295+
size_t target_queue =
296+
(rss_l3l4 % remote_reta_size) % DpdkTransport::kMaxQueuesPerPort;
297+
if (target_queue == remote_queue_id) break;
298+
}
299+
300+
if (src_port == UINT16_MAX) {
301+
ERPC_ERROR(
302+
"Failed to find src port that targets remote queue %zu. "
303+
"Remote RETA size = %zu.\n",
304+
remote_queue_id, remote_reta_size);
305+
rt_assert(false);
306+
}
307+
308+
return src_port;
259309
}
260310

261311
// Generate most fields of the L2--L4 headers now to avoid recomputation.
262312
bool DpdkTransport::resolve_remote_routing_info(
263313
RoutingInfo *routing_info) const {
264314
auto *ri = reinterpret_cast<eth_routing_info_t *>(routing_info);
315+
316+
// Save/use info from routing_info before we overwrite it
265317
uint8_t remote_mac[6];
266318
memcpy(remote_mac, ri->mac, 6);
267-
uint32_t remote_ipv4_addr = ri->ipv4_addr;
268-
uint16_t remote_udp_port = ri->udp_port;
319+
const uint32_t remote_ipv4_addr = ri->ipv4_addr;
320+
const uint16_t remote_udp_port = ri->udp_port;
321+
const uint16_t udp_src_port = get_udp_src_port_for_target_queue(
322+
ri->rxq_id, ri->reta_size, resolve.ipv4_addr, remote_ipv4_addr,
323+
remote_udp_port);
269324

325+
// Overwrite routing_info by constructing the packet header in place
270326
static_assert(kMaxRoutingInfoSize >= kInetHdrsTotSize, "");
271327

272328
auto *eth_hdr = reinterpret_cast<eth_hdr_t *>(ri);
@@ -276,7 +332,8 @@ bool DpdkTransport::resolve_remote_routing_info(
276332
gen_ipv4_header(ipv4_hdr, resolve.ipv4_addr, remote_ipv4_addr, 0);
277333

278334
auto *udp_hdr = reinterpret_cast<udp_hdr_t *>(&ipv4_hdr[1]);
279-
gen_udp_header(udp_hdr, rx_flow_udp_port, remote_udp_port, 0);
335+
gen_udp_header(udp_hdr, udp_src_port, remote_udp_port, 0);
336+
280337
return true;
281338
}
282339

src/transport_impl/dpdk/dpdk_transport.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,11 @@ class DpdkTransport : public Transport {
2525
// Transport-specific constants
2626
static constexpr TransportType kTransportType = TransportType::kDPDK;
2727
static constexpr size_t kMTU = 1024;
28-
static constexpr size_t kMaxQueuesPerPort = kIsAzure ? 1 : 16;
28+
static constexpr size_t kMaxQueuesPerPort = 16;
2929

3030
static constexpr size_t kNumTxRingDesc = 128;
3131
static constexpr size_t kPostlist = 32;
3232

33-
// If true, install flow steering rules into the NIC
34-
static constexpr bool kInstallFlowRules = kIsAzure ? false : true;
35-
3633
// The PMD may inline internally, but this class doesn't do it
3734
static constexpr size_t kMaxInline = 0;
3835

@@ -204,18 +201,19 @@ class DpdkTransport : public Transport {
204201

205202
size_t rx_ring_head = 0, rx_ring_tail = 0;
206203

207-
uint16_t rx_flow_udp_port = 0;
208-
size_t qp_id = SIZE_MAX; ///< The RX/TX queue pair for this Transport
204+
uint16_t rx_flow_udp_port = 0; ///< The UDP port this transport listens on
205+
size_t qp_id = SIZE_MAX; ///< The RX/TX queue pair for this Transport
209206

210207
// We don't use DPDK's lcore threads, so a shared mempool with per-lcore
211208
// cache won't work. Instead, we use per-thread pools with zero cached mbufs.
212209
rte_mempool *mempool;
213210

214211
/// Info resolved from \p phy_port, must be filled by constructor.
215212
struct {
216-
uint32_t ipv4_addr; ///< The port's IPv4 address in host-byte order
217-
uint8_t mac_addr[6]; ///< The port's MAC address
218-
size_t bandwidth = 0; ///< Link bandwidth in bytes per second
213+
uint32_t ipv4_addr; // The port's IPv4 address in host-byte order
214+
uint8_t mac_addr[6]; // The port's MAC address
215+
size_t bandwidth; // Link bandwidth in bytes per second
216+
size_t reta_size; // Number of entries in NIC RX indirection table
219217
} resolve;
220218
};
221219

src/transport_impl/dpdk/dpdk_transport_datapath.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#ifdef ERPC_DPDK
22

3+
#include <rte_thash.h>
34
#include "dpdk_transport.h"
45
#include "util/huge_alloc.h"
56

@@ -19,8 +20,8 @@ static void format_pkthdr(pkthdr_t *pkthdr,
1920
memset(&eth_hdr->dst_mac, 0, sizeof(eth_hdr->dst_mac));
2021
}
2122

22-
// On most bare-metal clusters, a zero IP checksum works fine.
23-
// But on Azure VMs we need a valid checksum.
23+
// On most bare-metal clusters, a zero IP checksum works fine. But on Azure
24+
// VMs we need a valid checksum.
2425
ipv4_hdr_t *ipv4_hdr = pkthdr->get_ipv4_hdr();
2526
ipv4_hdr->tot_len = htons(pkt_size - sizeof(eth_hdr_t));
2627
ipv4_hdr->check = get_ipv4_checksum(ipv4_hdr);

src/transport_impl/eth_common.h

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,6 @@ static std::string ipv4_to_string(uint32_t ipv4_addr) {
5555
return str;
5656
}
5757

58-
/// eRPC session endpoint routing info for Ethernet-based transports. The MAC
59-
/// address is in the byte order retrieved from the driver. The IPv4 address and
60-
/// UDP port are in host-byte order.
61-
struct eth_routing_info_t {
62-
uint8_t mac[6];
63-
uint32_t ipv4_addr;
64-
uint16_t udp_port;
65-
66-
std::string to_string() {
67-
std::ostringstream ret;
68-
ret << "[MAC " << mac_to_string(mac) << ", IP " << ipv4_to_string(ipv4_addr)
69-
<< ", UDP port " << std::to_string(udp_port) << "]";
70-
71-
return std::string(ret.str());
72-
}
73-
// This must be smaller than Transport::kMaxRoutingInfoSize, but a static
74-
// assert here causes a circular dependency.
75-
};
76-
7758
struct eth_hdr_t {
7859
uint8_t dst_mac[6];
7960
uint8_t src_mac[6];
@@ -136,6 +117,33 @@ static constexpr size_t kInetHdrsTotSize =
136117
sizeof(eth_hdr_t) + sizeof(ipv4_hdr_t) + sizeof(udp_hdr_t);
137118
static_assert(kInetHdrsTotSize == 42, "");
138119

120+
/// eRPC session endpoint routing info for Ethernet-based transports. The MAC
121+
/// address is in the byte order retrieved from the driver. The IPv4 address and
122+
/// UDP port are in host-byte order.
123+
struct eth_routing_info_t {
124+
uint8_t mac[6];
125+
uint32_t ipv4_addr; // The IPv4 address for this endpoint
126+
uint16_t udp_port; // The UDP port this endpoint listens on
127+
uint16_t rxq_id = UINT16_MAX; // The NIC RX queue ID this endpoint listens on
128+
129+
// Number of entries in this endpoint's NIC RSS indirection table
130+
uint16_t reta_size = UINT16_MAX;
131+
132+
std::string to_string() const {
133+
std::ostringstream ret;
134+
ret << "[MAC " << mac_to_string(mac) << ", IP " << ipv4_to_string(ipv4_addr)
135+
<< ", UDP port " << std::to_string(udp_port) << ", RQ queue ID "
136+
<< (rxq_id == UINT16_MAX ? " N/A " : std::to_string(rxq_id))
137+
<< ", RETA size "
138+
<< ((reta_size == UINT16_MAX) ? " N/A" : std::to_string(reta_size))
139+
<< "]";
140+
141+
return std::string(ret.str());
142+
}
143+
// This must be smaller than Transport::kMaxRoutingInfoSize, but a static
144+
// assert here causes a circular dependency.
145+
};
146+
139147
static std::string frame_header_to_string(uint8_t* buf) {
140148
auto* eth_hdr = reinterpret_cast<eth_hdr_t*>(buf);
141149
auto* ipv4_hdr = reinterpret_cast<ipv4_hdr_t*>(&eth_hdr[1]);

0 commit comments

Comments
 (0)