diff --git a/lib/base/io-future.hpp b/lib/base/io-future.hpp new file mode 100644 index 0000000000..43a9ddd7c4 --- /dev/null +++ b/lib/base/io-future.hpp @@ -0,0 +1,212 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#pragma once + +#include "base/application.hpp" +#include "base/shared-object.hpp" +#include "base/threadpool.hpp" +#include +#include +#include +#include + +namespace boost::asio::detail { + +struct fixed_throw_tag +{}; + +/** + * Fixes the issue where operations crash the program that can throw exceptions but don't. + * + * The issues is that in the orginal version of this specialization, the exception_ptr is + * never checked against nullptr, but only the pointer to the exception_ptr, which is likely + * a mistake. + */ +template +class spawn_handler : public spawn_handler_base +{ +public: + using return_type = T; + + struct result_type + { + std::exception_ptr ex_; + return_type* value_; + }; + + spawn_handler(const basic_yield_context& yield, result_type& result) + : spawn_handler_base(yield), result_(result) + { + } + + void operator()(std::exception_ptr ex, T value) + { + result_.ex_ = ex; + result_.value_ = &value; + this->resume(); + } + + static return_type on_resume(result_type& result) + { + if (result.ex_) { + rethrow_exception(result.ex_); + } + return BOOST_ASIO_MOVE_CAST(return_type)(*result.value_); + } + +private: + result_type& result_; +}; + +} // namespace boost::asio::detail + +namespace icinga { + +template +class AsioPromise; + +/** + * Implements a generic, asynchronously awaitable future. + * + * This allows to queue an CPU-intensive action on another thread without blocking any + * IO-threads and pass back the result via the @c AsioPromise. + * + * Similar to @c std::future, this is single-use only. Once a value has been set by the + * @c AsioPromise, the job is done. + */ +template +class AsioFuture : public SharedObject +{ + template + friend class AsioPromise; + +public: + DECLARE_PTR_TYPEDEFS(AsioFuture); + + /** + * Returns the value held in the future, or waits for the promise to complete. + * + * If an exception has been stored in the future via AsioPromise::SetException(), it will be + * thrown by this function. Simply passing `yc[ec]` as a token will not change this, even if + * the exception that would be thrown is a @c boost::asio::system::system_error. + */ + template + auto Get(CompletionToken&& token) + { + using Signature = void(std::exception_ptr, ValueType, boost::asio::detail::fixed_throw_tag); + + return boost::asio::async_initiate( + [this](auto&& handler) { InitOperation(std::forward(handler)); }, + std::forward(token) + ); + } + + // TODO: Add WaitFor and WaitUntil + +private: + template + void CallHandler(Handler&& handler) + { + if (std::holds_alternative(m_Value)) { + std::forward(handler)(nullptr, std::get(m_Value)); + } else { + std::forward(handler)(std::get(m_Value), {}); + } + } + + template + void InitOperation(Handler&& handler) + { + auto handlerPtr = std::make_shared>(std::forward(handler)); + + auto handlerWrapper = [handler = handlerPtr, future = AsioFuture::Ptr{this}]() { + if (std::holds_alternative(future->m_Value)) { + (*handler)({}, std::get(future->m_Value)); + } else { + (*handler)(std::get(future->m_Value), {}); + } + }; + + std::unique_lock lock(m_Mutex); + + if (!std::holds_alternative(m_Value)) { + boost::asio::post(boost::asio::get_associated_executor(handler), handlerWrapper); + return; + } + + auto work = boost::asio::make_work_guard(handler); + m_Callback = [handler = std::move(handlerWrapper), work = std::move(work)]() mutable { + boost::asio::dispatch(work.get_executor(), handler); + work.reset(); + }; + } + + std::mutex m_Mutex; + std::variant m_Value; + std::function m_Callback; +}; + +/** + * A promise type that can be passed to any other thread or coroutine. + */ +template +class AsioPromise +{ +public: + AsioPromise() : m_Future(new AsioFuture) {} + + template + void SetValue(ForwardingType&& value) const + { + std::unique_lock lock{m_Future->m_Mutex}; + + if (!std::holds_alternative(m_Future->m_Value)) { + BOOST_THROW_EXCEPTION(std::future_error{std::future_errc::promise_already_satisfied}); + } + + m_Future->m_Value = std::forward(value); + if (m_Future->m_Callback) { + m_Future->m_Callback(); + } + } + + template + void SetException(ExceptionType&& ex) const + { + std::unique_lock lock{m_Future->m_Mutex}; + + if (!std::holds_alternative(m_Future->m_Value)) { + BOOST_THROW_EXCEPTION(std::future_error{std::future_errc::promise_already_satisfied}); + } + + m_Future->m_Value = std::make_exception_ptr(std::forward(ex)); + if (m_Future->m_Callback) { + m_Future->m_Callback(); + } + } + + auto GetFuture() const { return m_Future; } + +private: + typename AsioFuture::Ptr m_Future; +}; + +template +auto QueueAsioFutureCallback(Callback&& cb) +{ + AsioPromise promise; + auto future = promise.GetFuture(); + Application::GetTP().Post( + [cb = std::forward(cb), promise = std::move(promise)]() { + try { + promise.SetValue(cb()); + } catch (const std::exception&) { + promise.SetException(std::current_exception()); + } + }, + {} + ); + return future; +}; + +} // namespace icinga diff --git a/lib/remote/configpackageshandler.cpp b/lib/remote/configpackageshandler.cpp index 7e0c7b02c5..c3acbcfeaa 100644 --- a/lib/remote/configpackageshandler.cpp +++ b/lib/remote/configpackageshandler.cpp @@ -6,6 +6,7 @@ #include "remote/httputility.hpp" #include "remote/filterutility.hpp" #include "base/exception.hpp" +#include "base/io-future.hpp" using namespace icinga; @@ -20,23 +21,25 @@ bool ConfigPackagesHandler::HandleRequest( { namespace http = boost::beast::http; - auto url = request.Url(); - auto user = request.User(); - auto params = request.Params(); - - if (url->GetPath().size() > 4) - return false; - - if (request.method() == http::verb::get) - HandleGet(request, response); - else if (request.method() == http::verb::post) - HandlePost(request, response); - else if (request.method() == http::verb::delete_) - HandleDelete(request, response); - else - return false; - - return true; + return QueueAsioFutureCallback([&]() { + auto url = request.Url(); + auto user = request.User(); + auto params = request.Params(); + + if (url->GetPath().size() > 4) + return false; + + if (request.method() == http::verb::get) + HandleGet(request, response); + else if (request.method() == http::verb::post) + HandlePost(request, response); + else if (request.method() == http::verb::delete_) + HandleDelete(request, response); + else + return false; + + return true; + })->Get(yc); } void ConfigPackagesHandler::HandleGet(const HttpRequest& request, HttpResponse& response) diff --git a/lib/remote/configstageshandler.cpp b/lib/remote/configstageshandler.cpp index b08270e56e..b4a9ebc946 100644 --- a/lib/remote/configstageshandler.cpp +++ b/lib/remote/configstageshandler.cpp @@ -8,6 +8,7 @@ #include "base/application.hpp" #include "base/defer.hpp" #include "base/exception.hpp" +#include "base/io-future.hpp" using namespace icinga; @@ -27,23 +28,27 @@ bool ConfigStagesHandler::HandleRequest( { namespace http = boost::beast::http; - auto url = request.Url(); - auto user = request.User(); - auto params = request.Params(); + auto future = QueueAsioFutureCallback([&]() { + auto url = request.Url(); + auto user = request.User(); + auto params = request.Params(); - if (url->GetPath().size() > 5) - return false; + if (url->GetPath().size() > 5) + return false; - if (request.method() == http::verb::get) - HandleGet(request, response); - else if (request.method() == http::verb::post) - HandlePost(request, response); - else if (request.method() == http::verb::delete_) - HandleDelete(request, response); - else - return false; + if (request.method() == http::verb::get) + HandleGet(request, response); + else if (request.method() == http::verb::post) + HandlePost(request, response); + else if (request.method() == http::verb::delete_) + HandleDelete(request, response); + else + return false; + + return true; + }); - return true; + return future->Get(yc); } void ConfigStagesHandler::HandleGet(const HttpRequest& request, HttpResponse& response) diff --git a/lib/remote/createobjecthandler.cpp b/lib/remote/createobjecthandler.cpp index beff9c9870..57897240f1 100644 --- a/lib/remote/createobjecthandler.cpp +++ b/lib/remote/createobjecthandler.cpp @@ -9,17 +9,17 @@ #include "remote/apiaction.hpp" #include "remote/zone.hpp" #include "base/configtype.hpp" +#include "base/io-future.hpp" #include using namespace icinga; REGISTER_URLHANDLER("/v1/objects", CreateObjectHandler); -bool CreateObjectHandler::HandleRequest( +static bool HandleRequestImpl( const WaitGroup::Ptr& waitGroup, const HttpRequest& request, - HttpResponse& response, - boost::asio::yield_context& yc + HttpResponse& response ) { namespace http = boost::beast::http; @@ -162,3 +162,15 @@ bool CreateObjectHandler::HandleRequest( return true; } + +bool CreateObjectHandler::HandleRequest( + const WaitGroup::Ptr& waitGroup, + const HttpRequest& request, + HttpResponse& response, + boost::asio::yield_context& yc +) +{ + return QueueAsioFutureCallback([&](){ + return HandleRequestImpl(waitGroup, request, response); + })->Get(yc); +} diff --git a/test/base-io-engine.cpp b/test/base-io-engine.cpp index 3a251b1b42..1f8db084be 100644 --- a/test/base-io-engine.cpp +++ b/test/base-io-engine.cpp @@ -6,6 +6,7 @@ #include #include #include +#include "base/io-future.hpp" using namespace icinga; @@ -156,4 +157,116 @@ BOOST_AUTO_TEST_CASE(timeout_due_scope) BOOST_CHECK_EQUAL(called, 0); } +BOOST_AUTO_TEST_CASE(future_early_value) +{ + boost::asio::io_context io; + + AsioPromise promise; + promise.SetValue(true); + + IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) { + auto val = future->Get(yc); + BOOST_REQUIRE(val); + }); + + io.run(); +} + +BOOST_AUTO_TEST_CASE(future_value) +{ + boost::asio::io_context io; + + AsioPromise promise; + std::atomic_bool before = false; + std::atomic_bool after = false; + + IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) { + before = true; + auto val = future->Get(yc); + BOOST_REQUIRE(val); + after = true; + }); + + std::thread testThread + { + [&io]() { + io.run(); + }}; + + while (!before) { + Utility::Sleep(0.01); + } + + BOOST_REQUIRE(before); + BOOST_REQUIRE(!after); + + promise.SetValue(true); + + while (!after) { + Utility::Sleep(0.01); + } + + BOOST_REQUIRE(before); + BOOST_REQUIRE(after); + + testThread.join(); +} + +BOOST_AUTO_TEST_CASE(future_early_exception) +{ + boost::asio::io_context io; + + AsioPromise promise; + promise.SetException(std::runtime_error{"test"}); + + IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) { + BOOST_REQUIRE_EXCEPTION(future->Get(yc), std::runtime_error, [](const std::exception& ex) -> bool { + return std::string_view{ex.what()} == "test"; + }); + }); + + io.run(); +} + +BOOST_AUTO_TEST_CASE(future_exception) +{ + boost::asio::io_context io; + + AsioPromise promise; + std::atomic_bool before = false; + std::atomic_bool after = false; + + IoEngine::SpawnCoroutine(io, [&, future = promise.GetFuture()](boost::asio::yield_context yc) { + before = true; + BOOST_REQUIRE_EXCEPTION(future->Get(yc), std::runtime_error, [](const std::exception& ex) -> bool { + return std::string_view{ex.what()} == "test"; + }); + after = true; + }); + + std::thread testThread + { + [&io]() { + io.run(); + }}; + + while (!before) { + Utility::Sleep(0.01); + } + + BOOST_REQUIRE(before); + BOOST_REQUIRE(!after); + + promise.SetException(std::runtime_error{"test"}); + + while (!after) { + Utility::Sleep(0.01); + } + + BOOST_REQUIRE(before); + BOOST_REQUIRE(after); + + testThread.join(); +} + BOOST_AUTO_TEST_SUITE_END()