Skip to content

Commit 076ed84

Browse files
committed
Save work on Azure support
1 parent e65673c commit 076ed84

File tree

7 files changed

+124
-71
lines changed

7 files changed

+124
-71
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ Some highlights:
6666

6767
## Running eRPC over DPDK on Microsoft Azure VMs
6868

69-
* eRPC works well on Azure VMs with accelerated networking. For now, eRPC
70-
supports only one RPC ID per machine on Azure.
69+
* eRPC works well on Azure VMs with accelerated networking.
7170

7271
* Configure two Ubuntu 18.04 VMs as below. Use the same resource group and
7372
availability zone for both VMs.

apps/small_rpc_tput/config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
--concurrency 60
55
--msg_size 32
66
--num_processes 2
7-
--num_threads 1
7+
--num_threads 4
88
--numa_0_ports 0
99
--numa_1_ports 1,3

scripts/autorun_app_file

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
latency
1+
small_rpc_tput

src/transport_impl/dpdk/dpdk_transport.cc

Lines changed: 85 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <iomanip>
44
#include <stdexcept>
55

6+
#include <rte_thash.h>
67
#include <rte_version.h>
78
#include <set>
89
#include "dpdk_transport.h"
@@ -21,6 +22,14 @@ static std::set<size_t> used_qp_ids[RTE_MAX_ETHPORTS];
2122
/// mempool_arr[i][j] is the mempool to use for port i, queue j
2223
rte_mempool *mempool_arr[RTE_MAX_ETHPORTS][DpdkTransport::kMaxQueuesPerPort];
2324

25+
/// Key used for RSS hashing
26+
static constexpr uint8_t default_rss_key[] = {
27+
0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2, 0x41, 0x67,
28+
0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0, 0xd0, 0xca, 0x2b, 0xcb,
29+
0xae, 0x7b, 0x30, 0xb4, 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30,
30+
0xf2, 0x0c, 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
31+
};
32+
2433
// Initialize the protection domain, queue pair, and memory registration and
2534
// deregistration functions. RECVs will be initialized later when the hugepage
2635
// allocator is provided.
@@ -116,42 +125,19 @@ void DpdkTransport::setup_phy_port() {
116125
rte_eth_conf eth_conf;
117126
memset(&eth_conf, 0, sizeof(eth_conf));
118127

119-
eth_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
120-
eth_conf.rxmode.max_rx_pkt_len = RTE_ETHER_MAX_LEN;
121-
eth_conf.rxmode.offloads = 0;
128+
eth_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
129+
eth_conf.rx_adv_conf.rss_conf.rss_key =
130+
const_cast<uint8_t *>(default_rss_key);
131+
eth_conf.rx_adv_conf.rss_conf.rss_key_len = 40;
132+
eth_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_UDP;
122133

123134
eth_conf.txmode.mq_mode = ETH_MQ_TX_NONE;
124135
eth_conf.txmode.offloads = kOffloads;
125136

126-
eth_conf.fdir_conf.mode = RTE_FDIR_MODE_PERFECT;
127-
eth_conf.fdir_conf.pballoc = RTE_FDIR_PBALLOC_64K;
128-
eth_conf.fdir_conf.status = RTE_FDIR_NO_REPORT_STATUS;
129-
eth_conf.fdir_conf.mask.dst_port_mask = 0xffff;
130-
eth_conf.fdir_conf.drop_queue = 0;
131-
132137
int ret = rte_eth_dev_configure(phy_port, kMaxQueuesPerPort,
133138
kMaxQueuesPerPort, &eth_conf);
134139
rt_assert(ret == 0, "Ethdev configuration error: ", strerror(-1 * ret));
135140

136-
// Set flow director fields if flow director is supported. It's OK if the
137-
// FILTER_SET command fails (e.g., on ConnectX-4 NICs).
138-
if (kInstallFlowRules &&
139-
rte_eth_dev_filter_supported(phy_port, RTE_ETH_FILTER_FDIR) == 0) {
140-
struct rte_eth_fdir_filter_info fi;
141-
memset(&fi, 0, sizeof(fi));
142-
fi.info_type = RTE_ETH_FDIR_FILTER_INPUT_SET_SELECT;
143-
fi.info.input_set_conf.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_UDP;
144-
fi.info.input_set_conf.inset_size = 2;
145-
fi.info.input_set_conf.field[0] = RTE_ETH_INPUT_SET_L3_DST_IP4;
146-
fi.info.input_set_conf.field[1] = RTE_ETH_INPUT_SET_L4_UDP_DST_PORT;
147-
fi.info.input_set_conf.op = RTE_ETH_INPUT_SET_SELECT;
148-
ret = rte_eth_dev_filter_ctrl(phy_port, RTE_ETH_FILTER_FDIR,
149-
RTE_ETH_FILTER_SET, &fi);
150-
if (ret != 0) {
151-
ERPC_WARN("Failed to set flow director fields. Could be survivable...\n");
152-
}
153-
}
154-
155141
// Set up all RX and TX queues and start the device. This can't be done later
156142
// on a per-thread basis since we must start the device to use any queue.
157143
// Once the device is started, more queues cannot be added without stopping
@@ -190,11 +176,6 @@ void DpdkTransport::setup_phy_port() {
190176
ret = rte_eth_tx_queue_setup(phy_port, i, kNumTxRingDesc, numa_node,
191177
&eth_tx_conf);
192178
rt_assert(ret == 0, "Failed to setup TX queue: " + std::to_string(i));
193-
194-
if (kInstallFlowRules) {
195-
install_flow_rule(phy_port, i, get_port_ipv4_addr(phy_port),
196-
udp_port_for_queue(phy_port, i));
197-
}
198179
}
199180

200181
rte_eth_dev_start(phy_port);
@@ -225,6 +206,13 @@ void DpdkTransport::resolve_phy_port() {
225206

226207
resolve.ipv4_addr = get_port_ipv4_addr(phy_port);
227208

209+
// Resolve RSS indirection table size
210+
struct rte_eth_dev_info dev_info;
211+
rte_eth_dev_info_get(phy_port, &dev_info);
212+
reta_size = dev_info.reta_size;
213+
rt_assert(reta_size >= kMaxQueuesPerPort,
214+
"Too few entries in NIC RSS indirection table");
215+
228216
// Resolve bandwidth
229217
struct rte_eth_link link;
230218
rte_eth_link_get(static_cast<uint8_t>(phy_port), &link);
@@ -244,10 +232,12 @@ void DpdkTransport::resolve_phy_port() {
244232
resolve.bandwidth = 10.0 * (1000 * 1000 * 1000) / 8.0;
245233
}
246234

247-
ERPC_INFO("Resolved port %u: MAC %s, IPv4 %s, bandwidth %.1f Gbps\n",
248-
phy_port, mac_to_string(resolve.mac_addr).c_str(),
249-
ipv4_to_string(htonl(resolve.ipv4_addr)).c_str(),
250-
resolve.bandwidth * 8.0 / (1000 * 1000 * 1000));
235+
ERPC_INFO(
236+
"Resolved port %u: MAC %s, IPv4 %s, RETA size %zu entries, bandwidth "
237+
"%.1f Gbps\n",
238+
phy_port, mac_to_string(resolve.mac_addr).c_str(),
239+
ipv4_to_string(htonl(resolve.ipv4_addr)).c_str(), reta_size,
240+
resolve.bandwidth * 8.0 / (1000 * 1000 * 1000));
251241
}
252242

253243
void DpdkTransport::fill_local_routing_info(RoutingInfo *routing_info) const {
@@ -256,17 +246,71 @@ void DpdkTransport::fill_local_routing_info(RoutingInfo *routing_info) const {
256246
memcpy(ri->mac, resolve.mac_addr, 6);
257247
ri->ipv4_addr = resolve.ipv4_addr;
258248
ri->udp_port = rx_flow_udp_port;
249+
ri->rxq_id = qp_id;
250+
ri->reta_size = reta_size;
251+
}
252+
253+
/**
254+
* @brief Return a source UDP port for which the RSS target queue at the remote
255+
* receiver for the ntuple {src_ip, dst_ip, src_port, dst_port} will be
256+
* remote_queue_id.
257+
*
258+
* All ntuple arguments and return value is in host-byte order
259+
*
260+
* @param remote_queue_id The remote NIC RX queue to target
261+
* @param remote_reta_size The number of entries in the remote NIC's RSS
262+
* indirection table
263+
* @param src_ip This NIC's IPv4 address
264+
* @param dst_ip The remote NIC's IPv4 address
265+
* @param dst_port The UDP port the remote endpoint is listening on
266+
* @return The source UDP port this endpoint should use for targeting
267+
*/
268+
static uint16_t get_udp_src_port_for_target_queue(size_t remote_queue_id,
269+
size_t remote_reta_size,
270+
uint32_t src_ip,
271+
uint32_t dst_ip,
272+
uint16_t dst_port) {
273+
uint16_t src_port = kBaseEthUDPPort;
274+
for (; src_port < UINT16_MAX; src_port++) {
275+
union rte_thash_tuple tuple;
276+
tuple.v4.src_addr = src_ip;
277+
tuple.v4.dst_addr = dst_ip;
278+
tuple.v4.sport = src_port;
279+
tuple.v4.dport = dst_port;
280+
uint32_t rss_l3l4 = rte_softrss(reinterpret_cast<uint32_t *>(&tuple),
281+
RTE_THASH_V4_L4_LEN, default_rss_key);
282+
283+
size_t target_queue =
284+
(rss_l3l4 % remote_reta_size) % DpdkTransport::kMaxQueuesPerPort;
285+
if (target_queue == remote_queue_id) break;
286+
}
287+
288+
if (src_port == UINT16_MAX) {
289+
ERPC_ERROR(
290+
"Failed to find src port that targets remote queue %zu. "
291+
"Remote RETA size = %zu.\n",
292+
remote_queue_id, remote_reta_size);
293+
rt_assert(false);
294+
}
295+
296+
return src_port;
259297
}
260298

261299
// Generate most fields of the L2--L4 headers now to avoid recomputation.
262300
bool DpdkTransport::resolve_remote_routing_info(
263301
RoutingInfo *routing_info) const {
264302
auto *ri = reinterpret_cast<eth_routing_info_t *>(routing_info);
303+
304+
// Save/use info from routing_info before we overwrite it
265305
uint8_t remote_mac[6];
266306
memcpy(remote_mac, ri->mac, 6);
267-
uint32_t remote_ipv4_addr = ri->ipv4_addr;
268-
uint16_t remote_udp_port = ri->udp_port;
307+
const uint32_t remote_ipv4_addr = ri->ipv4_addr;
308+
const uint16_t remote_udp_port = ri->udp_port;
309+
const uint16_t udp_src_port = get_udp_src_port_for_target_queue(
310+
ri->rxq_id, ri->reta_size, resolve.ipv4_addr, remote_ipv4_addr,
311+
remote_udp_port);
269312

313+
// Overwrite routing_info by constructing the packet header in place
270314
static_assert(kMaxRoutingInfoSize >= kInetHdrsTotSize, "");
271315

272316
auto *eth_hdr = reinterpret_cast<eth_hdr_t *>(ri);
@@ -276,7 +320,8 @@ bool DpdkTransport::resolve_remote_routing_info(
276320
gen_ipv4_header(ipv4_hdr, resolve.ipv4_addr, remote_ipv4_addr, 0);
277321

278322
auto *udp_hdr = reinterpret_cast<udp_hdr_t *>(&ipv4_hdr[1]);
279-
gen_udp_header(udp_hdr, rx_flow_udp_port, remote_udp_port, 0);
323+
gen_udp_header(udp_hdr, udp_src_port, remote_udp_port, 0);
324+
280325
return true;
281326
}
282327

src/transport_impl/dpdk/dpdk_transport.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,11 @@ class DpdkTransport : public Transport {
2525
// Transport-specific constants
2626
static constexpr TransportType kTransportType = TransportType::kDPDK;
2727
static constexpr size_t kMTU = 1024;
28-
static constexpr size_t kMaxQueuesPerPort = kIsAzure ? 1 : 16;
28+
static constexpr size_t kMaxQueuesPerPort = 16;
2929

3030
static constexpr size_t kNumTxRingDesc = 128;
3131
static constexpr size_t kPostlist = 32;
3232

33-
// If true, install flow steering rules into the NIC
34-
static constexpr bool kInstallFlowRules = kIsAzure ? false : true;
35-
3633
// The PMD may inline internally, but this class doesn't do it
3734
static constexpr size_t kMaxInline = 0;
3835

@@ -204,8 +201,11 @@ class DpdkTransport : public Transport {
204201

205202
size_t rx_ring_head = 0, rx_ring_tail = 0;
206203

207-
uint16_t rx_flow_udp_port = 0;
208-
size_t qp_id = SIZE_MAX; ///< The RX/TX queue pair for this Transport
204+
uint16_t rx_flow_udp_port = 0; ///< The UDP port this transport listens on
205+
size_t qp_id = SIZE_MAX; ///< The RX/TX queue pair for this Transport
206+
207+
///< Number of entries in this NIC's RSS indirection table
208+
size_t reta_size = 0;
209209

210210
// We don't use DPDK's lcore threads, so a shared mempool with per-lcore
211211
// cache won't work. Instead, we use per-thread pools with zero cached mbufs.

src/transport_impl/dpdk/dpdk_transport_datapath.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#ifdef ERPC_DPDK
22

3+
#include <rte_thash.h>
34
#include "dpdk_transport.h"
45
#include "util/huge_alloc.h"
56

@@ -19,8 +20,8 @@ static void format_pkthdr(pkthdr_t *pkthdr,
1920
memset(&eth_hdr->dst_mac, 0, sizeof(eth_hdr->dst_mac));
2021
}
2122

22-
// On most bare-metal clusters, a zero IP checksum works fine.
23-
// But on Azure VMs we need a valid checksum.
23+
// On most bare-metal clusters, a zero IP checksum works fine. But on Azure
24+
// VMs we need a valid checksum.
2425
ipv4_hdr_t *ipv4_hdr = pkthdr->get_ipv4_hdr();
2526
ipv4_hdr->tot_len = htons(pkt_size - sizeof(eth_hdr_t));
2627
ipv4_hdr->check = get_ipv4_checksum(ipv4_hdr);

src/transport_impl/eth_common.h

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,25 +55,6 @@ static std::string ipv4_to_string(uint32_t ipv4_addr) {
5555
return str;
5656
}
5757

58-
/// eRPC session endpoint routing info for Ethernet-based transports. The MAC
59-
/// address is in the byte order retrived from the driver. The IPv4 address and
60-
/// UDP port are in host-byte order.
61-
struct eth_routing_info_t {
62-
uint8_t mac[6];
63-
uint32_t ipv4_addr;
64-
uint16_t udp_port;
65-
66-
std::string to_string() {
67-
std::ostringstream ret;
68-
ret << "[MAC " << mac_to_string(mac) << ", IP " << ipv4_to_string(ipv4_addr)
69-
<< ", UDP port " << std::to_string(udp_port) << "]";
70-
71-
return std::string(ret.str());
72-
}
73-
// This must be smaller than Transport::kMaxRoutingInfoSize, but a static
74-
// assert here causes a circular dependency.
75-
};
76-
7758
struct eth_hdr_t {
7859
uint8_t dst_mac[6];
7960
uint8_t src_mac[6];
@@ -136,6 +117,33 @@ static constexpr size_t kInetHdrsTotSize =
136117
sizeof(eth_hdr_t) + sizeof(ipv4_hdr_t) + sizeof(udp_hdr_t);
137118
static_assert(kInetHdrsTotSize == 42, "");
138119

120+
/// eRPC session endpoint routing info for Ethernet-based transports. The MAC
121+
/// address is in the byte order retrived from the driver. The IPv4 address and
122+
/// UDP port are in host-byte order.
123+
struct eth_routing_info_t {
124+
uint8_t mac[6];
125+
uint32_t ipv4_addr; // The IPv4 address for this endpoint
126+
uint16_t udp_port; // The UDP port this endpoint listens on
127+
uint16_t rxq_id = UINT16_MAX; // The NIC RX queue ID this endpoint listens on
128+
129+
// Number of entries in this endpoint's NIC RSS indirection table
130+
uint16_t reta_size = UINT16_MAX;
131+
132+
std::string to_string() const {
133+
std::ostringstream ret;
134+
ret << "[MAC " << mac_to_string(mac) << ", IP " << ipv4_to_string(ipv4_addr)
135+
<< ", UDP port " << std::to_string(udp_port) << ", RQ queue ID "
136+
<< (rxq_id == UINT16_MAX ? " N/A " : std::to_string(rxq_id))
137+
<< ", RETA size "
138+
<< ((reta_size == UINT16_MAX) ? " N/A" : std::to_string(reta_size))
139+
<< "]";
140+
141+
return std::string(ret.str());
142+
}
143+
// This must be smaller than Transport::kMaxRoutingInfoSize, but a static
144+
// assert here causes a circular dependency.
145+
};
146+
139147
static std::string frame_header_to_string(uint8_t* buf) {
140148
auto* eth_hdr = reinterpret_cast<eth_hdr_t*>(buf);
141149
auto* ipv4_hdr = reinterpret_cast<ipv4_hdr_t*>(&eth_hdr[1]);

0 commit comments

Comments
 (0)