Skip to content

Commit 1657635

Browse files
committed
Adapt to latest Asio APIs (Asio 1.32 or Boost.Asio 1.88) (#477)
(cherry picked from commit f2f9f65)
1 parent 8b421e0 commit 1657635

31 files changed

+158
-210
lines changed

.github/workflows/ci-pr-validation.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ jobs:
142142
- name: Run unit tests
143143
run: RETRY_FAILED=3 CMAKE_BUILD_DIRECTORY=./build ./run-unit-tests.sh
144144

145+
- name: Build with Boost.Asio
146+
run: |
147+
cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
148+
cmake --build build-boost-asio -j8
149+
145150
- name: Build perf tools
146151
run: |
147152
cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON

CMakeLists.txt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@
1919

2020
cmake_minimum_required(VERSION 3.13)
2121

22-
option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
23-
2422
option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
2523
if (INTEGRATE_VCPKG)
26-
set(USE_ASIO ON)
24+
option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
2725
if (NOT CMAKE_TOOLCHAIN_FILE)
2826
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
2927
endif ()
28+
if (NOT USE_ASIO)
29+
list(APPEND VCPKG_MANIFEST_FEATURES "boost-asio")
30+
endif ()
31+
else ()
32+
option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
3033
endif ()
34+
message(STATUS "USE_ASIO: ${USE_ASIO}")
3135

3236
option(BUILD_TESTS "Build tests" ON)
3337
message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})

lib/AckGroupingTrackerEnabled.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
121121
this->flush();
122122
std::lock_guard<std::mutex> lock(this->mutexTimer_);
123123
if (this->timer_) {
124-
ASIO_ERROR ec;
125-
this->timer_->cancel(ec);
124+
cancelTimer(*this->timer_);
126125
}
127126
}
128127

@@ -172,7 +171,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
172171

173172
std::lock_guard<std::mutex> lock(this->mutexTimer_);
174173
this->timer_ = this->executor_->createDeadlineTimer();
175-
this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
174+
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
176175
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
177176
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
178177
auto self = weakSelf.lock();

lib/AsioTimer.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,12 @@
2929
#include "AsioDefines.h"
3030

3131
using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;
32+
33+
inline void cancelTimer(ASIO::steady_timer& timer) {
34+
try {
35+
timer.cancel();
36+
} catch (const ASIO_SYSTEM_ERROR& ignored) {
37+
// Most of the time the exception can be ignored unless the following logic depends on the fact that
38+
// the timer is cancelled.
39+
}
40+
}

lib/ClientConnection.cc

Lines changed: 43 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@
4141
#include "auth/InitialAuthData.h"
4242
#include "checksum/ChecksumProvider.h"
4343

44+
#ifdef USE_ASIO
45+
#include <asio/connect.hpp>
46+
#include <asio/ssl/host_name_verification.hpp>
47+
#else
48+
#include <boost/asio/connect.hpp>
49+
#include <boost/asio/ssl/host_name_verification.hpp>
50+
#endif
51+
4452
DECLARE_LOG_OBJECT()
4553

4654
using namespace ASIO::ip;
@@ -170,13 +178,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
170178
executor_(executor),
171179
resolver_(executor_->createTcpResolver()),
172180
socket_(executor_->createSocket()),
173-
#if defined(USE_ASIO) || BOOST_VERSION >= 107000
174181
strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
175-
#elif BOOST_VERSION >= 106600
176-
strand_(executor_->getIOService().get_executor()),
177-
#else
178-
strand_(executor_->getIOService()),
179-
#endif
180182
logicalAddress_(logicalAddress),
181183
physicalAddress_(physicalAddress),
182184
cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -266,7 +268,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
266268
if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
267269
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
268270
std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
269-
tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
271+
tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
270272
}
271273

272274
LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -309,7 +311,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
309311
// Only send keep-alive probes if the broker supports it
310312
keepAliveTimer_ = executor_->createDeadlineTimer();
311313
if (keepAliveTimer_) {
312-
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
314+
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
313315
auto weakSelf = weak_from_this();
314316
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
315317
auto self = weakSelf.lock();
@@ -354,7 +356,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
354356
// If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
355357
// Check if we have a timer still before we set the request timer to pop again.
356358
if (consumerStatsRequestTimer_) {
357-
consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
359+
consumerStatsRequestTimer_->expires_after(operationsTimeout_);
358360
auto weakSelf = weak_from_this();
359361
consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
360362
auto self = weakSelf.lock();
@@ -394,7 +396,7 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep
394396
* if async_connect without any error, connected_ would be set to true
395397
* at this point the connection is deemed valid to be used by clients of this class
396398
*/
397-
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
399+
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
398400
if (!err) {
399401
std::stringstream cnxStringStream;
400402
try {
@@ -479,38 +481,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
479481
} else {
480482
handleHandshake(ASIO_SUCCESS);
481483
}
482-
} else if (endpointIterator != tcp::resolver::iterator()) {
483-
LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
484-
// The connection failed. Try the next endpoint in the list.
485-
ASIO_ERROR closeError;
486-
socket_->close(closeError); // ignore the error of close
487-
if (closeError) {
488-
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
489-
}
490-
connectTimeoutTask_->stop();
491-
++endpointIterator;
492-
if (endpointIterator != tcp::resolver::iterator()) {
493-
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
494-
connectTimeoutTask_->start();
495-
tcp::endpoint endpoint = *endpointIterator;
496-
auto weakSelf = weak_from_this();
497-
socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
498-
auto self = weakSelf.lock();
499-
if (self) {
500-
self->handleTcpConnected(err, endpointIterator);
501-
}
502-
});
503-
} else {
504-
if (err == ASIO::error::operation_aborted) {
505-
// TCP connect timeout, which is not retryable
506-
close();
507-
} else {
508-
close(ResultRetryable);
509-
}
510-
}
511484
} else {
512485
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
513-
close(ResultRetryable);
486+
if (err == ASIO::error::operation_aborted) {
487+
close();
488+
} else {
489+
close(ResultRetryable);
490+
}
514491
}
515492
}
516493

@@ -603,18 +580,18 @@ void ClientConnection::tcpConnectAsync() {
603580
}
604581

605582
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
606-
tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
583+
607584
auto weakSelf = weak_from_this();
608-
resolver_->async_resolve(query,
609-
[weakSelf](const ASIO_ERROR& err, const tcp::resolver::iterator& iterator) {
585+
resolver_->async_resolve(service_url.host(), std::to_string(service_url.port()),
586+
[weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
610587
auto self = weakSelf.lock();
611588
if (self) {
612-
self->handleResolve(err, iterator);
589+
self->handleResolve(err, results);
613590
}
614591
});
615592
}
616593

617-
void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver::iterator& endpointIterator) {
594+
void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) {
618595
if (err) {
619596
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
620597
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
@@ -641,23 +618,13 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver:
641618
}
642619
ptr->connectTimeoutTask_->stop();
643620
});
644-
645-
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
646621
connectTimeoutTask_->start();
647-
if (endpointIterator != tcp::resolver::iterator()) {
648-
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
649-
<< " to " << endpointIterator->endpoint());
650-
socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
651-
auto self = weakSelf.lock();
652-
if (self) {
653-
self->handleTcpConnected(err, endpointIterator);
654-
}
655-
});
656-
} else {
657-
LOG_WARN(cnxString_ << "No IP address found");
658-
close();
659-
return;
660-
}
622+
ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
623+
auto self = weakSelf.lock();
624+
if (self) {
625+
self->handleTcpConnected(err, endpoint);
626+
}
627+
});
661628
}
662629

663630
void ClientConnection::readNextCommand() {
@@ -1061,7 +1028,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
10611028
LookupRequestData requestData;
10621029
requestData.promise = promise;
10631030
requestData.timer = executor_->createDeadlineTimer();
1064-
requestData.timer->expires_from_now(operationsTimeout_);
1031+
requestData.timer->expires_after(operationsTimeout_);
10651032
auto weakSelf = weak_from_this();
10661033
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
10671034
auto self = weakSelf.lock();
@@ -1201,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
12011168

12021169
PendingRequestData requestData;
12031170
requestData.timer = executor_->createDeadlineTimer();
1204-
requestData.timer->expires_from_now(operationsTimeout_);
1171+
requestData.timer->expires_after(operationsTimeout_);
12051172
auto weakSelf = weak_from_this();
12061173
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
12071174
auto self = weakSelf.lock();
@@ -1256,7 +1223,7 @@ void ClientConnection::handleKeepAliveTimeout() {
12561223
// be zero And we do not attempt to dereference the pointer.
12571224
Lock lock(mutex_);
12581225
if (keepAliveTimer_) {
1259-
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
1226+
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
12601227
auto weakSelf = weak_from_this();
12611228
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
12621229
auto self = weakSelf.lock();
@@ -1318,12 +1285,12 @@ void ClientConnection::close(Result result, bool detach) {
13181285
numOfPendingLookupRequest_ = 0;
13191286

13201287
if (keepAliveTimer_) {
1321-
keepAliveTimer_->cancel();
1288+
cancelTimer(*keepAliveTimer_);
13221289
keepAliveTimer_.reset();
13231290
}
13241291

13251292
if (consumerStatsRequestTimer_) {
1326-
consumerStatsRequestTimer_->cancel();
1293+
cancelTimer(*consumerStatsRequestTimer_);
13271294
consumerStatsRequestTimer_.reset();
13281295
}
13291296

@@ -1435,7 +1402,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
14351402
LastMessageIdRequestData requestData;
14361403
requestData.promise = promise;
14371404
requestData.timer = executor_->createDeadlineTimer();
1438-
requestData.timer->expires_from_now(operationsTimeout_);
1405+
requestData.timer->expires_after(operationsTimeout_);
14391406
auto weakSelf = weak_from_this();
14401407
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
14411408
auto self = weakSelf.lock();
@@ -1483,7 +1450,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
14831450
lock.unlock();
14841451

14851452
auto weakSelf = weak_from_this();
1486-
timer->expires_from_now(operationsTimeout_);
1453+
timer->expires_after(operationsTimeout_);
14871454
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
14881455
auto self = weakSelf.lock();
14891456
if (!self) {
@@ -1570,7 +1537,7 @@ void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
15701537
lock.unlock();
15711538

15721539
requestData.promise.setValue({});
1573-
requestData.timer->cancel();
1540+
cancelTimer(*requestData.timer);
15741541
}
15751542
}
15761543

@@ -1582,7 +1549,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
15821549
Lock lock(mutex_);
15831550
auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id());
15841551
if (it != pendingLookupRequests_.end()) {
1585-
it->second.timer->cancel();
1552+
cancelTimer(*it->second.timer);
1553+
15861554
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
15871555
pendingLookupRequests_.erase(it);
15881556
numOfPendingLookupRequest_--;
@@ -1661,7 +1629,7 @@ void ClientConnection::handleLookupTopicRespose(
16611629
Lock lock(mutex_);
16621630
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
16631631
if (it != pendingLookupRequests_.end()) {
1664-
it->second.timer->cancel();
1632+
cancelTimer(*it->second.timer);
16651633
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
16661634
pendingLookupRequests_.erase(it);
16671635
numOfPendingLookupRequest_--;
@@ -1739,7 +1707,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
17391707
data.topicEpoch = boost::none;
17401708
}
17411709
requestData.promise.setValue(data);
1742-
requestData.timer->cancel();
1710+
cancelTimer(*requestData.timer);
17431711
}
17441712
}
17451713
}
@@ -1759,7 +1727,7 @@ void ClientConnection::handleError(const proto::CommandError& error) {
17591727
lock.unlock();
17601728

17611729
requestData.promise.setFailed(result);
1762-
requestData.timer->cancel();
1730+
cancelTimer(*requestData.timer);
17631731
} else {
17641732
PendingGetLastMessageIdRequestsMap::iterator it =
17651733
pendingGetLastMessageIdRequests_.find(error.request_id());
@@ -2052,8 +2020,8 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
20522020
auto it = pendingRequests_.find(requestId);
20532021
if (it != pendingRequests_.end()) {
20542022
it->second.promise.setFailed(ResultDisconnected);
2055-
ASIO_ERROR ec;
2056-
it->second.timer->cancel(ec);
2023+
cancelTimer(*it->second.timer);
2024+
20572025
pendingRequests_.erase(it);
20582026
}
20592027
}

lib/ClientConnection.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
#include <cstdint>
2727
#ifdef USE_ASIO
2828
#include <asio/bind_executor.hpp>
29-
#include <asio/io_service.hpp>
29+
#include <asio/io_context.hpp>
3030
#include <asio/ip/tcp.hpp>
3131
#include <asio/ssl/stream.hpp>
3232
#include <asio/strand.hpp>
3333
#else
3434
#include <boost/asio/bind_executor.hpp>
35-
#include <boost/asio/io_service.hpp>
35+
#include <boost/asio/io_context.hpp>
3636
#include <boost/asio/ip/tcp.hpp>
3737
#include <boost/asio/ssl/stream.hpp>
3838
#include <boost/asio/strand.hpp>
@@ -238,7 +238,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
238238
* although not usable at this point, since this is just tcp connection
239239
* Pulsar - Connect/Connected has yet to happen
240240
*/
241-
void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
241+
void handleTcpConnected(const ASIO_ERROR& err, const ASIO::ip::tcp::endpoint& endpoint);
242242

243243
void handleHandshake(const ASIO_ERROR& err);
244244

@@ -261,7 +261,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
261261

262262
void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
263263

264-
void handleResolve(const ASIO_ERROR& err, const ASIO::ip::tcp::resolver::iterator& endpointIterator);
264+
void handleResolve(ASIO_ERROR err, const ASIO::ip::tcp::resolver::results_type& results);
265265

266266
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
267267
void handleSendPair(const ASIO_ERROR& err);
@@ -325,7 +325,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
325325
*/
326326
SocketPtr socket_;
327327
TlsSocketPtr tlsSocket_;
328-
ASIO::strand<ASIO::io_service::executor_type> strand_;
328+
ASIO::strand<ASIO::io_context::executor_type> strand_;
329329

330330
const std::string logicalAddress_;
331331
/*

0 commit comments

Comments
 (0)