Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci-pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ jobs:
- name: Run unit tests
run: RETRY_FAILED=3 CMAKE_BUILD_DIRECTORY=./build ./run-unit-tests.sh

- name: Build with Boost.Asio
run: |
cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
cmake --build build-boost-asio -j8

- name: Build perf tools
run: |
cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON
Expand Down
10 changes: 7 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

cmake_minimum_required(VERSION 3.13)

option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)

option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
if (INTEGRATE_VCPKG)
set(USE_ASIO ON)
option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
if (NOT CMAKE_TOOLCHAIN_FILE)
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
endif ()
if (NOT USE_ASIO)
list(APPEND VCPKG_MANIFEST_FEATURES "boost-asio")
endif ()
else ()
option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
endif ()
message(STATUS "USE_ASIO: ${USE_ASIO}")

option(BUILD_TESTS "Build tests" ON)
message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
Expand Down
5 changes: 2 additions & 3 deletions lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
this->flush();
std::lock_guard<std::mutex> lock(this->mutexTimer_);
if (this->timer_) {
ASIO_ERROR ec;
this->timer_->cancel(ec);
cancelTimer(*this->timer_);
}
}

Expand Down Expand Up @@ -172,7 +171,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {

std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
auto self = weakSelf.lock();
Expand Down
9 changes: 9 additions & 0 deletions lib/AsioTimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,12 @@
#include "AsioDefines.h"

using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;

inline void cancelTimer(ASIO::steady_timer& timer) {
try {
timer.cancel();
} catch (const ASIO_SYSTEM_ERROR& ignored) {
// Most of the time the exception can be ignored unless the following logic depends on the fact that
// the timer is cancelled.
}
}
118 changes: 43 additions & 75 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"

#ifdef USE_ASIO
#include <asio/connect.hpp>
#include <asio/ssl/host_name_verification.hpp>
#else
#include <boost/asio/connect.hpp>
#include <boost/asio/ssl/host_name_verification.hpp>
#endif

DECLARE_LOG_OBJECT()

using namespace ASIO::ip;
Expand Down Expand Up @@ -170,13 +178,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
executor_(executor),
resolver_(executor_->createTcpResolver()),
socket_(executor_->createSocket()),
#if defined(USE_ASIO) || BOOST_VERSION >= 107000
strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
#elif BOOST_VERSION >= 106600
strand_(executor_->getIOService().get_executor()),
#else
strand_(executor_->getIOService()),
#endif
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
Expand Down Expand Up @@ -266,7 +268,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
}

LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
Expand Down Expand Up @@ -309,7 +311,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
if (keepAliveTimer_) {
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
Expand Down Expand Up @@ -354,7 +356,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
// If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
// Check if we have a timer still before we set the request timer to pop again.
if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
consumerStatsRequestTimer_->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
Expand Down Expand Up @@ -394,7 +396,7 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep
* if async_connect without any error, connected_ would be set to true
* at this point the connection is deemed valid to be used by clients of this class
*/
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
if (!err) {
std::stringstream cnxStringStream;
try {
Expand Down Expand Up @@ -479,38 +481,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
} else {
handleHandshake(ASIO_SUCCESS);
}
} else if (endpointIterator != tcp::resolver::iterator()) {
LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
// The connection failed. Try the next endpoint in the list.
ASIO_ERROR closeError;
socket_->close(closeError); // ignore the error of close
if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
++endpointIterator;
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
auto weakSelf = weak_from_this();
socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
if (self) {
self->handleTcpConnected(err, endpointIterator);
}
});
} else {
if (err == ASIO::error::operation_aborted) {
// TCP connect timeout, which is not retryable
close();
} else {
close(ResultRetryable);
}
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close(ResultRetryable);
if (err == ASIO::error::operation_aborted) {
close();
} else {
close(ResultRetryable);
}
}
}

Expand Down Expand Up @@ -603,18 +580,18 @@ void ClientConnection::tcpConnectAsync() {
}

LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));

auto weakSelf = weak_from_this();
resolver_->async_resolve(query,
[weakSelf](const ASIO_ERROR& err, const tcp::resolver::iterator& iterator) {
resolver_->async_resolve(service_url.host(), std::to_string(service_url.port()),
[weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
auto self = weakSelf.lock();
if (self) {
self->handleResolve(err, iterator);
self->handleResolve(err, results);
}
});
}

void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver::iterator& endpointIterator) {
void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) {
if (err) {
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
Expand All @@ -641,23 +618,13 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver:
}
ptr->connectTimeoutTask_->stop();
});

LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
<< " to " << endpointIterator->endpoint());
socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
if (self) {
self->handleTcpConnected(err, endpointIterator);
}
});
} else {
LOG_WARN(cnxString_ << "No IP address found");
close();
return;
}
ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
auto self = weakSelf.lock();
if (self) {
self->handleTcpConnected(err, endpoint);
}
});
}

void ClientConnection::readNextCommand() {
Expand Down Expand Up @@ -1061,7 +1028,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
LookupRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
Expand Down Expand Up @@ -1201,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf

PendingRequestData requestData;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
Expand Down Expand Up @@ -1256,7 +1223,7 @@ void ClientConnection::handleKeepAliveTimeout() {
// be zero And we do not attempt to dereference the pointer.
Lock lock(mutex_);
if (keepAliveTimer_) {
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
Expand Down Expand Up @@ -1318,12 +1285,12 @@ void ClientConnection::close(Result result, bool detach) {
numOfPendingLookupRequest_ = 0;

if (keepAliveTimer_) {
keepAliveTimer_->cancel();
cancelTimer(*keepAliveTimer_);
keepAliveTimer_.reset();
}

if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->cancel();
cancelTimer(*consumerStatsRequestTimer_);
consumerStatsRequestTimer_.reset();
}

Expand Down Expand Up @@ -1435,7 +1402,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
LastMessageIdRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
Expand Down Expand Up @@ -1483,7 +1450,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
lock.unlock();

auto weakSelf = weak_from_this();
timer->expires_from_now(operationsTimeout_);
timer->expires_after(operationsTimeout_);
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
Expand Down Expand Up @@ -1570,7 +1537,7 @@ void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
lock.unlock();

requestData.promise.setValue({});
requestData.timer->cancel();
cancelTimer(*requestData.timer);
}
}

Expand All @@ -1582,7 +1549,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id());
if (it != pendingLookupRequests_.end()) {
it->second.timer->cancel();
cancelTimer(*it->second.timer);

LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
Expand Down Expand Up @@ -1661,7 +1629,7 @@ void ClientConnection::handleLookupTopicRespose(
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
it->second.timer->cancel();
cancelTimer(*it->second.timer);
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
Expand Down Expand Up @@ -1739,7 +1707,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
requestData.timer->cancel();
cancelTimer(*requestData.timer);
}
}
}
Expand All @@ -1759,7 +1727,7 @@ void ClientConnection::handleError(const proto::CommandError& error) {
lock.unlock();

requestData.promise.setFailed(result);
requestData.timer->cancel();
cancelTimer(*requestData.timer);
} else {
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
Expand Down Expand Up @@ -2052,8 +2020,8 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
auto it = pendingRequests_.find(requestId);
if (it != pendingRequests_.end()) {
it->second.promise.setFailed(ResultDisconnected);
ASIO_ERROR ec;
it->second.timer->cancel(ec);
cancelTimer(*it->second.timer);

pendingRequests_.erase(it);
}
}
Expand Down
10 changes: 5 additions & 5 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
#include <cstdint>
#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
#include <asio/io_service.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ssl/stream.hpp>
#include <asio/strand.hpp>
#else
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
Expand Down Expand Up @@ -238,7 +238,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
* although not usable at this point, since this is just tcp connection
* Pulsar - Connect/Connected has yet to happen
*/
void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
void handleTcpConnected(const ASIO_ERROR& err, const ASIO::ip::tcp::endpoint& endpoint);

void handleHandshake(const ASIO_ERROR& err);

Expand All @@ -261,7 +261,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

void handlePulsarConnected(const proto::CommandConnected& cmdConnected);

void handleResolve(const ASIO_ERROR& err, const ASIO::ip::tcp::resolver::iterator& endpointIterator);
void handleResolve(ASIO_ERROR err, const ASIO::ip::tcp::resolver::results_type& results);

void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
Expand Down Expand Up @@ -325,7 +325,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
ASIO::strand<ASIO::io_service::executor_type> strand_;
ASIO::strand<ASIO::io_context::executor_type> strand_;

const std::string logicalAddress_;
/*
Expand Down
Loading
Loading