Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ option(ENABLE_TSAN "Enable Thread Sanitizer" OFF)
option(ENABLE_ASAN "Enable Address Sanitizer" OFF)
option(PHLEX_USE_FORM "Enable experimental integration with FORM" OFF)
option(ENABLE_COVERAGE "Enable code coverage instrumentation" OFF)
option(ENABLE_CLANG_TIDY "Enable clang-tidy checks during build" OFF)

add_compile_options(
-Wall
Expand Down Expand Up @@ -163,22 +162,6 @@ if(ENABLE_COVERAGE)
endif()
endif()

# Configure clang-tidy integration
find_program(CLANG_TIDY_EXECUTABLE NAMES clang-tidy-20 clang-tidy)

if(ENABLE_CLANG_TIDY)
if(CLANG_TIDY_EXECUTABLE)
message(STATUS "Found clang-tidy: ${CLANG_TIDY_EXECUTABLE}")
set(
CMAKE_CXX_CLANG_TIDY
${CLANG_TIDY_EXECUTABLE}
--config-file=${CMAKE_SOURCE_DIR}/.clang-tidy
)
else()
message(WARNING "clang-tidy not found, disabling clang-tidy checks")
endif()
endif()

if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE RelWithDebInfo CACHE STRING "Build type" FORCE)
endif()
Expand All @@ -202,10 +185,4 @@ if(BUILD_TESTING)
endif()
endif()

# Report clang-tidy availability
if(CLANG_TIDY_EXECUTABLE)
message(STATUS "Clang-tidy available: ${CLANG_TIDY_EXECUTABLE}")
message(STATUS "Use -DCMAKE_CXX_CLANG_TIDY=clang-tidy to enable automatic checks during build")
endif()

cet_cmake_config()
71 changes: 37 additions & 34 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@
class fold_node : public declared_fold, private count_stores {
using all_parameter_types = typename AlgorithmBits::input_parameter_types;
using input_parameter_types = skip_first_type<all_parameter_types>; // Skip fold object
static constexpr auto N = std::tuple_size_v<input_parameter_types>;
using R = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;
static constexpr auto num_inputs = std::tuple_size_v<input_parameter_types>;
using r = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;

static constexpr std::size_t M = 1; // hard-coded for now
static constexpr std::size_t num_outputs = 1; // hard-coded for now
using function_t = typename AlgorithmBits::bound_type;

public:
Expand All @@ -74,7 +74,7 @@
std::string partition) :
declared_fold{std::move(name), std::move(predicates), std::move(product_labels)},
initializer_{std::move(initializer)},
output_{to_product_specifications(full_name(), std::move(output), make_type_ids<R>())},
output_{to_product_specifications(full_name(), std::move(output), make_type_ids<r>())},
partition_{std::move(partition)},
flush_receiver_{g,
tbb::flow::unlimited,
Expand All @@ -88,32 +88,33 @@
emit_and_evict_if_done(index);
return {};
}},
join_{make_join_or_none<N>(
join_{make_join_or_none<num_inputs>(
g, full_name(), layers())}, // FIXME: This should change to include result product!
fold_{
g, concurrency, [this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& index = msg.store->index();
fold_{g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<num_inputs> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& index = msg.store->index();

auto fold_index = index->parent(partition_);
if (not fold_index) {
return;
}
auto fold_index = index->parent(partition_);
if (not fold_index) {
return;
}

auto const& index_hash_for_counter = fold_index->hash();
auto const& index_hash_for_counter = fold_index->hash();

call(ft, messages, std::make_index_sequence<N>{});
++calls_;
call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;

counter_for(index_hash_for_counter).increment(index->layer_hash());
counter_for(index_hash_for_counter).increment(index->layer_hash());

emit_and_evict_if_done(fold_index);
}}
emit_and_evict_if_done(fold_index);
}}
{
if constexpr (N > 1ull) {
if constexpr (num_inputs > 1ull) {
make_edge(join_, fold_);
}
}
Expand All @@ -123,7 +124,7 @@
{
if (auto counter = done_with(fold_index->hash())) {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(parent);
commit(parent);
++product_count_;
tbb::flow::output_port<0>(fold_).try_put(
{.store = parent, .id = counter->original_message_id()});
Expand All @@ -132,20 +133,22 @@

tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label, fold_);
return receiver_for<num_inputs>(join_, input(), product_label, fold_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, fold_);
return input_ports<num_inputs>(join_, fold_);

Check warning on line 141 in phlex/core/declared_fold.hpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_fold.hpp#L141

Added line #L141 was not covered by tests
}

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& output_port() override { return tbb::flow::output_port<0>(fold_); }
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
void call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
auto const parent_index = most_derived(messages).store->index()->parent(partition_);

Expand All @@ -160,7 +163,7 @@
.first;
}

if constexpr (N == 1ull) {
if constexpr (num_inputs == 1ull) {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
Expand All @@ -174,11 +177,11 @@
template <size_t... Is>
auto initialized_object(InitTuple&& tuple, std::index_sequence<Is...>) const
{
return std::unique_ptr<R>{
new R{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
return std::unique_ptr<r>{
new r{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
}

auto commit_(product_store_ptr& store)
auto commit(product_store_ptr& store)
{
auto& result = results_.at(store->index()->hash());
if constexpr (requires { send(*result); }) {
Expand All @@ -196,9 +199,9 @@
product_specifications output_;
std::string partition_;
tbb::flow::function_node<flush_message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, message_tuple<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<R>> results_;
join_or_none_t<num_inputs> join_;
tbb::flow::multifunction_node<messages_t<num_inputs>, message_tuple<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<r>> results_;
std::atomic<std::size_t> calls_;
std::atomic<std::size_t> product_count_;
};
Expand Down
28 changes: 15 additions & 13 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ namespace phlex::experimental {

template <typename AlgorithmBits>
class observer_node : public declared_observer {
using InputArgs = typename AlgorithmBits::input_parameter_types;
using input_args = typename AlgorithmBits::input_parameter_types;
using function_t = typename AlgorithmBits::bound_type;
static constexpr auto N = AlgorithmBits::number_inputs;
static constexpr auto num_inputs = AlgorithmBits::number_inputs;

public:
static constexpr auto number_output_products = 0;
Expand All @@ -60,36 +60,38 @@ namespace phlex::experimental {
AlgorithmBits alg,
product_queries input_products) :
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
join_{make_join_or_none<N>(g, full_name(), layers())},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
observer_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<N> const& messages) -> oneapi::tbb::flow::continue_msg {
call(ft, messages, std::make_index_sequence<N>{});
messages_t<num_inputs> const& messages) -> oneapi::tbb::flow::continue_msg {
call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;
return {};
}}
{
if constexpr (N > 1ull) {
if constexpr (num_inputs > 1ull) {
make_edge(join_, observer_);
}
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label, observer_);
return receiver_for<num_inputs>(join_, input(), product_label, observer_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, observer_);
return input_ports<num_inputs>(join_, observer_);
}

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
void call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
if constexpr (N == 1ull) {
if constexpr (num_inputs == 1ull) {
std::invoke(ft, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
Expand All @@ -99,9 +101,9 @@ namespace phlex::experimental {
named_index_ports index_ports() final { return join_.index_ports(); }
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>> observer_;
input_retriever_types<input_args> input_{input_arguments<input_args>()};
join_or_none_t<num_inputs> join_;
tbb::flow::function_node<messages_t<num_inputs>> observer_;
std::atomic<std::size_t> calls_;
};
}
Expand Down
46 changes: 24 additions & 22 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ namespace phlex::experimental {

template <typename AlgorithmBits>
class predicate_node : public declared_predicate {
using InputArgs = typename AlgorithmBits::input_parameter_types;
using input_args = typename AlgorithmBits::input_parameter_types;
using function_t = typename AlgorithmBits::bound_type;
static constexpr auto N = AlgorithmBits::number_inputs;
static constexpr auto num_inputs = AlgorithmBits::number_inputs;

public:
static constexpr auto number_output_products = 0ull;
Expand All @@ -64,40 +64,42 @@ namespace phlex::experimental {
AlgorithmBits alg,
product_queries input_products) :
declared_predicate{std::move(name), std::move(predicates), std::move(input_products)},
join_{make_join_or_none<N>(g, full_name(), layers())},
predicate_{
g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<N> const& messages) -> predicate_result {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);

bool const rc = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
return {message_id, rc};
}}
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
predicate_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<num_inputs> const& messages) -> predicate_result {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);

bool const rc = call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;
return {message_id, rc};
}}
{
if constexpr (N > 1ull) {
if constexpr (num_inputs > 1ull) {
make_edge(join_, predicate_);
}
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label, predicate_);
return receiver_for<num_inputs>(join_, input(), product_label, predicate_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, predicate_);
return input_ports<num_inputs>(join_, predicate_);
}
tbb::flow::sender<predicate_result>& sender() override { return predicate_; }

template <std::size_t... Is>
bool call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
bool call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
if constexpr (N == 1ull) {
if constexpr (num_inputs == 1ull) {
return std::invoke(ft, std::get<Is>(input_).retrieve(messages)...);
} else {
return std::invoke(ft, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
Expand All @@ -107,9 +109,9 @@ namespace phlex::experimental {
named_index_ports index_ports() final { return join_.index_ports(); }
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>, predicate_result> predicate_;
input_retriever_types<input_args> input_{input_arguments<input_args>()};
join_or_none_t<num_inputs> join_;
tbb::flow::function_node<messages_t<num_inputs>, predicate_result> predicate_;
std::atomic<std::size_t> calls_;
};

Expand Down
Loading
Loading