diff --git a/ucm/transport/p2p/CMakeLists.txt b/ucm/transport/p2p/CMakeLists.txt index e69de29bb..02ff5f6b6 100644 --- a/ucm/transport/p2p/CMakeLists.txt +++ b/ucm/transport/p2p/CMakeLists.txt @@ -0,0 +1,3 @@ +if(RUNTIME_ENVIRONMENT STREQUAL "ascend") + add_subdirectory(ffts) +endif() diff --git a/ucm/transport/p2p/ffts/CMakeLists.txt b/ucm/transport/p2p/ffts/CMakeLists.txt new file mode 100644 index 000000000..7d54704db --- /dev/null +++ b/ucm/transport/p2p/ffts/CMakeLists.txt @@ -0,0 +1,120 @@ +# Copyright (c) 2026 Huawei Technologies Co., Ltd. + +option(BUILD_FFTS_BENCHMARKS "build FFTS transport benchmark executables." OFF) + +if(DEFINED ENV{ASCEND_HOME_PATH}) + set(Ascend_ROOT $ENV{ASCEND_HOME_PATH}) +elseif(DEFINED ENV{ASCEND_CUSTOM_PATH}) + set(Ascend_ROOT $ENV{ASCEND_CUSTOM_PATH}/latest) +else() + set(Ascend_ROOT /usr/local/Ascend/ascend-toolkit/latest) +endif() + +find_path(FFTS_ASCEND_INCLUDE_DIR + NAMES acl/acl.h + HINTS + ${Ascend_ROOT}/include + ${Ascend_ROOT}/aarch64-linux/include + ${Ascend_ROOT}/arm64-linux/include + NO_DEFAULT_PATH +) + +find_path(FFTS_RUNTIME_INCLUDE_DIR + NAMES runtime/rt_ffts_plus.h + HINTS + ${Ascend_ROOT}/pkg_inc/runtime + ${Ascend_ROOT}/aarch64-linux/pkg_inc/runtime + ${Ascend_ROOT}/arm64-linux/pkg_inc/runtime + NO_DEFAULT_PATH +) + +find_path(FFTS_TOOLCHAIN_INCLUDE_DIR + NAMES toolchain/prof_api.h + HINTS + ${Ascend_ROOT}/pkg_inc + ${Ascend_ROOT}/aarch64-linux/pkg_inc + ${Ascend_ROOT}/arm64-linux/pkg_inc + ${Ascend_ROOT}/include/experiment/msprof + ${Ascend_ROOT}/aarch64-linux/include/experiment/msprof + ${Ascend_ROOT}/arm64-linux/include/experiment/msprof + NO_DEFAULT_PATH +) + +find_path(FFTS_PROFILING_INCLUDE_DIR + NAMES prof_common.h + HINTS + ${Ascend_ROOT}/pkg_inc/profiling + ${Ascend_ROOT}/aarch64-linux/pkg_inc/profiling + ${Ascend_ROOT}/arm64-linux/pkg_inc/profiling + ${Ascend_ROOT}/include/experiment/msprof/toolchain + ${Ascend_ROOT}/aarch64-linux/include/experiment/msprof/toolchain + ${Ascend_ROOT}/arm64-linux/include/experiment/msprof/toolchain + NO_DEFAULT_PATH +) + +find_library(FFTS_ASCENDCL_LIBRARY + NAMES ascendcl + HINTS + ${Ascend_ROOT}/lib64 + ${Ascend_ROOT}/aarch64-linux/lib64 + ${Ascend_ROOT}/arm64-linux/lib64 + NO_DEFAULT_PATH +) + +find_library(FFTS_RUNTIME_LIBRARY + NAMES runtime + HINTS + ${Ascend_ROOT}/lib64 + ${Ascend_ROOT}/runtime/lib64 + ${Ascend_ROOT}/aarch64-linux/lib64 + ${Ascend_ROOT}/arm64-linux/lib64 + NO_DEFAULT_PATH +) + +if(NOT FFTS_ASCEND_INCLUDE_DIR OR NOT FFTS_RUNTIME_INCLUDE_DIR OR NOT FFTS_TOOLCHAIN_INCLUDE_DIR OR + NOT FFTS_PROFILING_INCLUDE_DIR OR NOT FFTS_ASCENDCL_LIBRARY OR NOT FFTS_RUNTIME_LIBRARY) + message(FATAL_ERROR "Ascend CANN headers/libraries required for ucm_transport_ffts were not found") +endif() + +find_package(Threads REQUIRED) + +add_library(ucm_transport_ffts STATIC + src/ffts_transport.cpp + src/ffts_engine.cpp +) + +target_compile_features(ucm_transport_ffts PUBLIC cxx_std_17) + +target_include_directories(ucm_transport_ffts + PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} +) + +target_include_directories(ucm_transport_ffts SYSTEM + PRIVATE + ${FFTS_ASCEND_INCLUDE_DIR} + ${FFTS_RUNTIME_INCLUDE_DIR} + ${FFTS_TOOLCHAIN_INCLUDE_DIR} + ${FFTS_PROFILING_INCLUDE_DIR} +) + +target_link_libraries(ucm_transport_ffts + PUBLIC + infra_status + PRIVATE + Threads::Threads + ${CMAKE_DL_LIBS} + m + ${FFTS_ASCENDCL_LIBRARY} + ${FFTS_RUNTIME_LIBRARY} +) + +if(BUILD_UNIT_TESTS) + add_subdirectory(test) +endif() + +if(BUILD_FFTS_BENCHMARKS) + add_subdirectory(bench) +endif() diff --git a/ucm/transport/p2p/ffts/bench/CMakeLists.txt b/ucm/transport/p2p/ffts/bench/CMakeLists.txt new file mode 100644 index 000000000..7d11534cd --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/CMakeLists.txt @@ -0,0 +1,16 @@ +# Copyright (c) 2026 Huawei Technologies Co., Ltd. + +add_executable(ucm_transport_ffts_bench + ffts_bench_options.cpp + ffts_bench_runtime.cpp + ffts_bench_runner.cpp + ffts_transport_bench.cpp +) + +target_link_libraries(ucm_transport_ffts_bench PRIVATE + ucm_transport_ffts +) + +target_include_directories(ucm_transport_ffts_bench SYSTEM PRIVATE + ${FFTS_ASCEND_INCLUDE_DIR} +) diff --git a/ucm/transport/p2p/ffts/bench/ffts_bench_options.cpp b/ucm/transport/p2p/ffts/bench/ffts_bench_options.cpp new file mode 100644 index 000000000..940a508e9 --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_bench_options.cpp @@ -0,0 +1,144 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ffts_bench_options.h" + +#include +#include +#include +#include +#include +#include + +namespace UC::Transport::Ffts::Bench { +namespace { +size_t ParseBytes(const std::string& value) +{ + if (value.empty()) { throw std::invalid_argument("empty size value"); } + + std::string number = value; + size_t multiplier = 1; + const auto suffix = static_cast(std::tolower(static_cast(value.back()))); + if (suffix == 'k' || suffix == 'm' || suffix == 'g') { + number = value.substr(0, value.size() - 1); + multiplier = suffix == 'k' ? kKiB : (suffix == 'm' ? kMiB : kGiB); + } + + size_t consumed = 0; + const auto parsed = std::stoull(number, &consumed, 0); + if (consumed != number.size()) { throw std::invalid_argument("invalid size value: " + value); } + if (parsed > std::numeric_limits::max() / multiplier) { + throw std::overflow_error("size value overflows: " + value); + } + return static_cast(parsed) * multiplier; +} + +size_t ParseCount(const std::string& value) +{ + size_t consumed = 0; + const auto parsed = std::stoull(value, &consumed, 0); + if (consumed != value.size()) { throw std::invalid_argument("invalid count value: " + value); } + return static_cast(parsed); +} + +std::vector ParseList(const std::string& value, bool parseBytes) +{ + std::vector result; + size_t begin = 0; + while (begin <= value.size()) { + const auto end = value.find(',', begin); + const auto token = value.substr(begin, end == std::string::npos ? std::string::npos : end - begin); + if (token.empty()) { throw std::invalid_argument("empty list item: " + value); } + result.push_back(parseBytes ? ParseBytes(token) : ParseCount(token)); + if (end == std::string::npos) { break; } + begin = end + 1; + } + return result; +} + +Scenario ParseScenario(const std::string& value) +{ + if (value == "all") { return Scenario::All; } + if (value == "single") { return Scenario::Single; } + if (value == "batch") { return Scenario::Batch; } + throw std::invalid_argument("invalid scenario: " + value); +} + +void PrintUsage(const char* program) +{ + std::cout << "Usage: " << program << " [options]\n" + << "Options:\n" + << " --device N Ascend device id, default 0\n" + << " --warmup N Warmup iterations per case, default 10\n" + << " --iters N Timed iterations per case, default 100\n" + << " --scenario NAME all, single, or batch; default all\n" + << " --min-bytes SIZE Minimum single-copy size, default 4K\n" + << " --max-bytes SIZE Maximum single-copy size, default 256M\n" + << " --batch-counts LIST Comma-separated batch counts, default 4,16,64,128\n" + << " --batch-chunks LIST Comma-separated chunk sizes, default 4K,16K,64K,256K,1M\n" + << " --help Show this help text\n"; +} +} // namespace + +Options ParseOptions(int argc, char** argv) +{ + Options options; + for (int i = 1; i < argc; ++i) { + const std::string arg = argv[i]; + auto requireValue = [&](const char* name) { + if (i + 1 >= argc) { throw std::invalid_argument(std::string(name) + " requires a value"); } + return std::string(argv[++i]); + }; + + if (arg == "--help") { + PrintUsage(argv[0]); + std::exit(0); + } else if (arg == "--device") { + options.deviceId = static_cast(ParseCount(requireValue("--device"))); + } else if (arg == "--warmup") { + options.warmup = ParseCount(requireValue("--warmup")); + } else if (arg == "--iters") { + options.iterations = ParseCount(requireValue("--iters")); + } else if (arg == "--scenario") { + options.scenario = ParseScenario(requireValue("--scenario")); + } else if (arg == "--min-bytes") { + options.minBytes = ParseBytes(requireValue("--min-bytes")); + } else if (arg == "--max-bytes") { + options.maxBytes = ParseBytes(requireValue("--max-bytes")); + } else if (arg == "--batch-counts") { + options.batchCounts = ParseList(requireValue("--batch-counts"), false); + } else if (arg == "--batch-chunks") { + options.batchChunkBytes = ParseList(requireValue("--batch-chunks"), true); + } else { + throw std::invalid_argument("unknown argument: " + arg); + } + } + + if (options.iterations == 0) { throw std::invalid_argument("--iters must be greater than 0"); } + if (options.minBytes == 0 || options.maxBytes == 0 || options.minBytes > options.maxBytes) { + throw std::invalid_argument("invalid min/max bytes"); + } + return options; +} + +} // namespace UC::Transport::Ffts::Bench diff --git a/ucm/transport/p2p/ffts/bench/ffts_bench_options.h b/ucm/transport/p2p/ffts/bench/ffts_bench_options.h new file mode 100644 index 000000000..33310b3c7 --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_bench_options.h @@ -0,0 +1,58 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_OPTIONS_H +#define UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_OPTIONS_H + +#include +#include +#include + +namespace UC::Transport::Ffts::Bench { + +constexpr size_t kKiB = 1024ULL; +constexpr size_t kMiB = 1024ULL * kKiB; +constexpr size_t kGiB = 1024ULL * kMiB; + +enum class Scenario { + All, + Single, + Batch, +}; + +struct Options { + int32_t deviceId{0}; + size_t warmup{10}; + size_t iterations{100}; + size_t minBytes{4ULL * kKiB}; + size_t maxBytes{256ULL * kMiB}; + Scenario scenario{Scenario::All}; + std::vector batchCounts{4, 16, 64, 128}; + std::vector batchChunkBytes{4ULL * kKiB, 16ULL * kKiB, 64ULL * kKiB, 256ULL * kKiB, 1ULL * kMiB}; +}; + +Options ParseOptions(int argc, char** argv); + +} // namespace UC::Transport::Ffts::Bench + +#endif // UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_OPTIONS_H diff --git a/ucm/transport/p2p/ffts/bench/ffts_bench_runner.cpp b/ucm/transport/p2p/ffts/bench/ffts_bench_runner.cpp new file mode 100644 index 000000000..c526f6626 --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_bench_runner.cpp @@ -0,0 +1,251 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ffts_bench_runner.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ffts_bench_runtime.h" +#include "ffts_transport.h" + +namespace UC::Transport::Ffts::Bench { +namespace { +constexpr size_t kOutputWidth = 14; + +std::string FormatBytes(size_t bytes) +{ + if (bytes % kGiB == 0) { return std::to_string(bytes / kGiB) + "G"; } + if (bytes % kMiB == 0) { return std::to_string(bytes / kMiB) + "M"; } + if (bytes % kKiB == 0) { return std::to_string(bytes / kKiB) + "K"; } + return std::to_string(bytes) + "B"; +} + +void FillPattern(std::vector& data) +{ + for (size_t i = 0; i < data.size(); ++i) { + data[i] = static_cast((i * 131ULL + 17ULL) & 0xFFU); + } +} + +void* Offset(void* ptr, size_t bytes) +{ + return static_cast(static_cast(ptr) + bytes); +} + +const void* Offset(const void* ptr, size_t bytes) +{ + return static_cast(static_cast(ptr) + bytes); +} + +std::vector BuildCopies(void* dst, const void* src, size_t chunkBytes, size_t count) +{ + std::vector copies; + copies.reserve(count); + for (size_t i = 0; i < count; ++i) { + const auto offset = i * chunkBytes; + copies.push_back({Offset(dst, offset), Offset(src, offset), chunkBytes}); + } + return copies; +} + +void PrepareBuffers(DeviceBuffer& src, DeviceBuffer& dst, size_t bytes) +{ + std::vector host(bytes); + FillPattern(host); + CheckAcl(aclrtMemcpy(src.Get(), src.Size(), host.data(), bytes, ACL_MEMCPY_HOST_TO_DEVICE), + "aclrtMemcpy H2D"); + CheckAcl(aclrtMemset(dst.Get(), dst.Size(), 0, bytes), "aclrtMemset"); +} + +void VerifyBuffer(DeviceBuffer& dst, size_t bytes) +{ + std::vector expected(bytes); + std::vector actual(bytes, 0); + FillPattern(expected); + CheckAcl(aclrtMemcpy(actual.data(), bytes, dst.Get(), bytes, ACL_MEMCPY_DEVICE_TO_HOST), + "aclrtMemcpy D2H"); + if (actual != expected) { throw std::runtime_error("copy verification failed"); } +} + +template +Samples Measure(size_t warmup, size_t iterations, Func&& func) +{ + for (size_t i = 0; i < warmup; ++i) { + func(); + } + + Samples samples; + for (size_t i = 0; i < iterations; ++i) { + const auto start = std::chrono::steady_clock::now(); + func(); + const auto end = std::chrono::steady_clock::now(); + const std::chrono::duration elapsed = end - start; + samples.Add(elapsed.count()); + } + return samples; +} + +void RunFftsCopy(UC::Transport::Ffts::FftsTransport& transport, void* dst, const void* src, size_t bytes) +{ + CheckStatus(transport.CopyAsync(dst, src, bytes), "FftsTransport::CopyAsync"); + CheckStatus(transport.Synchronize(), "FftsTransport::Synchronize"); +} + +void RunFftsBatch(UC::Transport::Ffts::FftsTransport& transport, + const std::vector& copies) +{ + CheckStatus(transport.Submit(copies), "FftsTransport::Submit"); + CheckStatus(transport.Synchronize(), "FftsTransport::Synchronize"); +} + +void RunAclCopy(aclrtStream stream, void* dst, const void* src, size_t bytes) +{ + CheckAcl(aclrtMemcpyAsync(dst, bytes, src, bytes, ACL_MEMCPY_DEVICE_TO_DEVICE, stream), + "aclrtMemcpyAsync D2D"); + CheckAcl(aclrtSynchronizeStream(stream), "aclrtSynchronizeStream"); +} + +void RunAclBatch(aclrtStream stream, const std::vector& copies) +{ + for (const auto& copy : copies) { + CheckAcl(aclrtMemcpyAsync(copy.dst, copy.size, copy.src, copy.size, ACL_MEMCPY_DEVICE_TO_DEVICE, stream), + "aclrtMemcpyAsync D2D"); + } + CheckAcl(aclrtSynchronizeStream(stream), "aclrtSynchronizeStream"); +} + +void PrintHeader() +{ + std::cout << std::left << std::setw(10) << "scenario" << std::setw(10) << "method" << std::right + << std::setw(kOutputWidth) << "bytes" << std::setw(kOutputWidth) << "chunk" + << std::setw(kOutputWidth) << "count" << std::setw(kOutputWidth) << "avg_us" + << std::setw(kOutputWidth) << "min_us" << std::setw(kOutputWidth) << "p50_us" + << std::setw(kOutputWidth) << "GB/s" << '\n'; +} + +void PrintResult(const std::string& scenario, const std::string& method, size_t bytes, size_t chunkBytes, + size_t count, const Samples& samples) +{ + const auto avgUs = samples.Average(); + const auto gbps = avgUs == 0.0 ? 0.0 : (static_cast(bytes) / (avgUs / 1'000'000.0)) / 1'000'000'000.0; + std::cout << std::left << std::setw(10) << scenario << std::setw(10) << method << std::right + << std::setw(kOutputWidth) << FormatBytes(bytes) << std::setw(kOutputWidth) << FormatBytes(chunkBytes) + << std::setw(kOutputWidth) << count << std::setw(kOutputWidth) << std::fixed + << std::setprecision(2) << avgUs << std::setw(kOutputWidth) << samples.Min() + << std::setw(kOutputWidth) << samples.Median() << std::setw(kOutputWidth) << gbps << '\n'; +} + +void RunSingleCase(UC::Transport::Ffts::FftsTransport& transport, aclrtStream stream, size_t bytes, + const Options& options) +{ + DeviceBuffer src(bytes); + DeviceBuffer dst(bytes); + PrepareBuffers(src, dst, bytes); + + RunFftsCopy(transport, dst.Get(), src.Get(), bytes); + VerifyBuffer(dst, bytes); + CheckAcl(aclrtMemset(dst.Get(), dst.Size(), 0, bytes), "aclrtMemset"); + + RunAclCopy(stream, dst.Get(), src.Get(), bytes); + VerifyBuffer(dst, bytes); + CheckAcl(aclrtMemset(dst.Get(), dst.Size(), 0, bytes), "aclrtMemset"); + + const auto ffts = Measure(options.warmup, options.iterations, [&]() { + RunFftsCopy(transport, dst.Get(), src.Get(), bytes); + }); + const auto acl = Measure(options.warmup, options.iterations, [&]() { + RunAclCopy(stream, dst.Get(), src.Get(), bytes); + }); + + PrintResult("single", "ffts", bytes, bytes, 1, ffts); + PrintResult("single", "acl_async", bytes, bytes, 1, acl); +} + +void RunBatchCase(UC::Transport::Ffts::FftsTransport& transport, aclrtStream stream, size_t chunkBytes, size_t count, + const Options& options) +{ + if (count > std::numeric_limits::max() / chunkBytes) { + throw std::overflow_error("batch bytes overflow"); + } + const auto bytes = chunkBytes * count; + DeviceBuffer src(bytes); + DeviceBuffer dst(bytes); + PrepareBuffers(src, dst, bytes); + const auto copies = BuildCopies(dst.Get(), src.Get(), chunkBytes, count); + + RunFftsBatch(transport, copies); + VerifyBuffer(dst, bytes); + CheckAcl(aclrtMemset(dst.Get(), dst.Size(), 0, bytes), "aclrtMemset"); + + RunAclBatch(stream, copies); + VerifyBuffer(dst, bytes); + CheckAcl(aclrtMemset(dst.Get(), dst.Size(), 0, bytes), "aclrtMemset"); + + const auto ffts = Measure(options.warmup, options.iterations, [&]() { + RunFftsBatch(transport, copies); + }); + const auto acl = Measure(options.warmup, options.iterations, [&]() { + RunAclBatch(stream, copies); + }); + + PrintResult("batch", "ffts", bytes, chunkBytes, count, ffts); + PrintResult("batch", "acl_async", bytes, chunkBytes, count, acl); +} +} // namespace + +void RunBenchmark(const Options& options) +{ + AclSession session; + session.Init(options.deviceId); + + AclStream stream; + UC::Transport::Ffts::FftsTransport transport; + CheckStatus(transport.Setup(options.deviceId), "FftsTransport::Setup"); + + PrintHeader(); + if (options.scenario == Scenario::All || options.scenario == Scenario::Single) { + for (size_t bytes = options.minBytes; bytes <= options.maxBytes; bytes *= 2) { + RunSingleCase(transport, stream.Get(), bytes, options); + if (bytes > options.maxBytes / 2) { break; } + } + } + + if (options.scenario == Scenario::All || options.scenario == Scenario::Batch) { + for (const auto chunkBytes : options.batchChunkBytes) { + for (const auto count : options.batchCounts) { + RunBatchCase(transport, stream.Get(), chunkBytes, count, options); + } + } + } +} + +} // namespace UC::Transport::Ffts::Bench diff --git a/ucm/transport/p2p/ffts/bench/ffts_bench_runner.h b/ucm/transport/p2p/ffts/bench/ffts_bench_runner.h new file mode 100644 index 000000000..31c2e243b --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_bench_runner.h @@ -0,0 +1,35 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_RUNNER_H +#define UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_RUNNER_H + +#include "ffts_bench_options.h" + +namespace UC::Transport::Ffts::Bench { + +void RunBenchmark(const Options& options); + +} // namespace UC::Transport::Ffts::Bench + +#endif // UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_RUNNER_H diff --git a/ucm/transport/p2p/ffts/bench/ffts_bench_runtime.cpp b/ucm/transport/p2p/ffts/bench/ffts_bench_runtime.cpp new file mode 100644 index 000000000..a6aa014a5 --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_bench_runtime.cpp @@ -0,0 +1,115 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ffts_bench_runtime.h" + +#include +#include +#include +#include + +namespace UC::Transport::Ffts::Bench { + +void Samples::Add(double value) { values_.push_back(value); } + +double Samples::Average() const +{ + if (values_.empty()) { return 0.0; } + return std::accumulate(values_.begin(), values_.end(), 0.0) / static_cast(values_.size()); +} + +double Samples::Min() const +{ + if (values_.empty()) { return 0.0; } + return *std::min_element(values_.begin(), values_.end()); +} + +double Samples::Median() const +{ + if (values_.empty()) { return 0.0; } + auto sorted = values_; + std::sort(sorted.begin(), sorted.end()); + const auto mid = sorted.size() / 2; + if (sorted.size() % 2 == 0) { return (sorted[mid - 1] + sorted[mid]) / 2.0; } + return sorted[mid]; +} + +AclSession::~AclSession() +{ + if (deviceSet_) { (void)aclrtResetDevice(deviceId_); } + if (initialized_) { (void)aclFinalize(); } +} + +void AclSession::Init(int32_t deviceId) +{ + CheckAcl(aclInit(nullptr), "aclInit"); + initialized_ = true; + + CheckAcl(aclrtSetDevice(deviceId), "aclrtSetDevice"); + deviceId_ = deviceId; + deviceSet_ = true; +} + +DeviceBuffer::DeviceBuffer(size_t bytes) : bytes_(bytes) +{ + CheckAcl(aclrtMalloc(&ptr_, bytes_, ACL_MEM_MALLOC_HUGE_FIRST), "aclrtMalloc"); +} + +DeviceBuffer::~DeviceBuffer() +{ + if (ptr_ != nullptr) { (void)aclrtFree(ptr_); } +} + +void* DeviceBuffer::Get() const { return ptr_; } + +size_t DeviceBuffer::Size() const { return bytes_; } + +AclStream::AclStream() +{ + CheckAcl(aclrtCreateStreamWithConfig(&stream_, 0, ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC), + "aclrtCreateStreamWithConfig"); +} + +AclStream::~AclStream() +{ + if (stream_ != nullptr) { + (void)aclrtSynchronizeStream(stream_); + (void)aclrtDestroyStream(stream_); + } +} + +aclrtStream AclStream::Get() const { return stream_; } + +void CheckAcl(aclError code, const char* call) +{ + if (code == ACL_SUCCESS) { return; } + throw std::runtime_error(std::string(call) + " failed: " + std::to_string(code)); +} + +void CheckStatus(const UC::Status& status, const char* call) +{ + if (status.Success()) { return; } + throw std::runtime_error(std::string(call) + " failed: " + status.ToString()); +} + +} // namespace UC::Transport::Ffts::Bench diff --git a/ucm/transport/p2p/ffts/bench/ffts_bench_runtime.h b/ucm/transport/p2p/ffts/bench/ffts_bench_runtime.h new file mode 100644 index 000000000..f7091845d --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_bench_runtime.h @@ -0,0 +1,99 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_RUNTIME_H +#define UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_RUNTIME_H + +#include +#include +#include + +#include + +#include "status/status.h" + +namespace UC::Transport::Ffts::Bench { + +class Samples { +public: + void Add(double value); + double Average() const; + double Min() const; + double Median() const; + +private: + std::vector values_; +}; + +class AclSession { +public: + AclSession() = default; + ~AclSession(); + + AclSession(const AclSession&) = delete; + AclSession& operator=(const AclSession&) = delete; + + void Init(int32_t deviceId); + +private: + int32_t deviceId_{0}; + bool initialized_{false}; + bool deviceSet_{false}; +}; + +class DeviceBuffer { +public: + explicit DeviceBuffer(size_t bytes); + ~DeviceBuffer(); + + DeviceBuffer(const DeviceBuffer&) = delete; + DeviceBuffer& operator=(const DeviceBuffer&) = delete; + + void* Get() const; + size_t Size() const; + +private: + void* ptr_{nullptr}; + size_t bytes_{0}; +}; + +class AclStream { +public: + AclStream(); + ~AclStream(); + + AclStream(const AclStream&) = delete; + AclStream& operator=(const AclStream&) = delete; + + aclrtStream Get() const; + +private: + aclrtStream stream_{nullptr}; +}; + +void CheckAcl(aclError code, const char* call); +void CheckStatus(const UC::Status& status, const char* call); + +} // namespace UC::Transport::Ffts::Bench + +#endif // UNIFIEDCACHE_TRANSPORT_FFTS_BENCH_RUNTIME_H diff --git a/ucm/transport/p2p/ffts/bench/ffts_transport_bench.cpp b/ucm/transport/p2p/ffts/bench/ffts_transport_bench.cpp new file mode 100644 index 000000000..b0614b23e --- /dev/null +++ b/ucm/transport/p2p/ffts/bench/ffts_transport_bench.cpp @@ -0,0 +1,40 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include +#include + +#include "ffts_bench_options.h" +#include "ffts_bench_runner.h" + +int main(int argc, char** argv) +{ + try { + const auto options = UC::Transport::Ffts::Bench::ParseOptions(argc, argv); + UC::Transport::Ffts::Bench::RunBenchmark(options); + return 0; + } catch (const std::exception& e) { + std::cerr << "ucm_transport_ffts_bench failed: " << e.what() << '\n'; + return 1; + } +} diff --git a/ucm/transport/p2p/ffts/detail/ffts_engine.h b/ucm/transport/p2p/ffts/detail/ffts_engine.h new file mode 100644 index 000000000..27062f11f --- /dev/null +++ b/ucm/transport/p2p/ffts/detail/ffts_engine.h @@ -0,0 +1,71 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_TRANSPORT_FFTS_DETAIL_FFTS_ENGINE_H +#define UNIFIEDCACHE_TRANSPORT_FFTS_DETAIL_FFTS_ENGINE_H + +#include +#include +#include + +#include +#include + +#include "status/status.h" +#include "ffts_transport.h" + +namespace UC::Transport::Ffts { + +class FftsEngine { +public: + FftsEngine(); + ~FftsEngine(); + + // Non-copyable and non-movable + FftsEngine(const FftsEngine&) = delete; + FftsEngine& operator=(const FftsEngine&) = delete; + FftsEngine(FftsEngine&&) = delete; + FftsEngine& operator=(FftsEngine&&) = delete; + + Status Setup(int32_t deviceId); + Status WaitEvent(void* event); + Status Submit(const CopyDesc* copies, size_t count); + Status Synchronize(); + +private: + using ContextBuffer = std::vector; + + Status EnsureReady() const; + Status SubmitChunk(const CopyDesc* copies, size_t count); + void KeepAlive(std::shared_ptr contexts); + void ClearCompletedGraphs(); + + int32_t deviceId_{-1}; + aclrtStream stream_{nullptr}; + bool ready_{false}; + std::vector> pendingContexts_; +}; + +} // namespace UC::Transport::Ffts + +#endif // UNIFIEDCACHE_TRANSPORT_FFTS_DETAIL_FFTS_ENGINE_H diff --git a/ucm/transport/p2p/ffts/include/ffts_transport.h b/ucm/transport/p2p/ffts/include/ffts_transport.h new file mode 100644 index 000000000..e05d282eb --- /dev/null +++ b/ucm/transport/p2p/ffts/include/ffts_transport.h @@ -0,0 +1,67 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_TRANSPORT_FFTS_TRANSPORT_H +#define UNIFIEDCACHE_TRANSPORT_FFTS_TRANSPORT_H + +#include +#include +#include +#include + +#include "status/status.h" + +namespace UC::Transport::Ffts { + +struct CopyDesc { + void* dst; + const void* src; + size_t size; +}; + +class FftsTransport { +public: + FftsTransport(); + ~FftsTransport(); + + FftsTransport(FftsTransport&&) noexcept; + FftsTransport& operator=(FftsTransport&&) noexcept; + + FftsTransport(const FftsTransport&) = delete; + FftsTransport& operator=(const FftsTransport&) = delete; + + Status Setup(int32_t deviceId); + Status WaitEvent(void* event); + Status CopyAsync(void* dst, const void* src, size_t size); + Status Submit(const CopyDesc* copies, size_t count); + Status Submit(const std::vector& copies); + Status Synchronize(); + +private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace UC::Transport::Ffts + +#endif // UNIFIEDCACHE_TRANSPORT_FFTS_TRANSPORT_H diff --git a/ucm/transport/p2p/ffts/src/ffts_engine.cpp b/ucm/transport/p2p/ffts/src/ffts_engine.cpp new file mode 100644 index 000000000..a0f632a8e --- /dev/null +++ b/ucm/transport/p2p/ffts/src/ffts_engine.cpp @@ -0,0 +1,274 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "detail/ffts_engine.h" +#include +#include +#include +#include +#include +#include + +namespace UC::Transport::Ffts { +// Visible within the current .cpp file +namespace { +constexpr size_t kMaxReadyContexts = 128; +constexpr uint16_t kFftsTimeout = std::numeric_limits::max(); +constexpr uint8_t kCommunicationSubType = 0x5A; +constexpr uint8_t kLabelMarker = 0x5A; + +Status AclStatus(aclError code, const char* call) +{ + if (code == ACL_SUCCESS) { return Status::OK(); } + std::ostringstream oss; + oss << call << " failed: " << code; + return Status{static_cast(code), oss.str()}; +} + +Status RtStatus(rtError_t code, const char* call) +{ + if (code == RT_ERROR_NONE) { return Status::OK(); } + std::ostringstream oss; + oss << call << " failed: " << code; + return Status{static_cast(code), oss.str()}; +} + +constexpr uint32_t Low32(uint64_t value) { return static_cast(value & 0xFFFFFFFFULL); } + +constexpr uint32_t High32(uint64_t value) { return static_cast((value >> 32U) & 0xFFFFFFFFULL); } + +uint32_t BuildSdmaMoveHeader() +{ + constexpr uint32_t kDataTypeFp32 = 7U; + constexpr uint32_t kSourceSubstreamValid = 1U << 9U; + constexpr uint32_t kDestinationSubstreamValid = 1U << 10U; + constexpr uint32_t kSourceNonSecure = 1U << 11U; + constexpr uint32_t kDestinationNonSecure = 1U << 12U; + return (kDataTypeFp32 << 4U) | kSourceSubstreamValid | kDestinationSubstreamValid | + kSourceNonSecure | kDestinationNonSecure; +} + +void FillSdmaContext(rtFftsPlusComCtx_t& storage, const CopyDesc& copy) +{ + static_assert(sizeof(rtFftsPlusSdmaCtx_t) <= sizeof(rtFftsPlusComCtx_t), + "FFTS SDMA context must fit in common context storage"); + static_assert(alignof(rtFftsPlusSdmaCtx_t) <= alignof(rtFftsPlusComCtx_t), + "FFTS SDMA context alignment must fit common context storage"); + static_assert(offsetof(rtFftsPlusComCtx_t, contextType) == offsetof(rtFftsPlusSdmaCtx_t, contextType), + "FFTS common and SDMA context tags must have the same offset"); + static_assert(std::is_same_v, + "FFTS common and SDMA context tags must have the same type"); + std::memset(&storage, 0, sizeof(storage)); + + // Ascend FFTS Plus defines rtFftsPlusComCtx_t as fixed-size storage for + // tagged concrete contexts. The checks above pin the storage and tag ABI. + auto& ctx = *reinterpret_cast(&storage); + ctx.contextType = RT_CTX_TYPE_SDMA; + ctx.threadDim = 1; + ctx.res3 = kLabelMarker; + ctx.sdmaSqeHeader = BuildSdmaMoveHeader(); + + const auto src = reinterpret_cast(copy.src); + const auto dst = reinterpret_cast(copy.dst); + ctx.sourceAddressBaseL = Low32(src); + ctx.sourceAddressBaseH = High32(src); + ctx.destinationAddressBaseL = Low32(dst); + ctx.destinationAddressBaseH = High32(dst); + + const auto bytes = static_cast(copy.size); + ctx.nonTailDataLength = bytes; + ctx.tailDataLength = bytes; +} + +rtFftsPlusSqe_t BuildSqe(uint16_t contextCount) +{ + rtFftsPlusSqe_t sqe{}; + sqe.fftsType = RT_FFTS_PLUS_TYPE; + sqe.totalContextNum = contextCount; + sqe.readyContextNum = contextCount; + sqe.preloadContextNum = contextCount; + sqe.timeout = kFftsTimeout; + sqe.subType = kCommunicationSubType; + return sqe; +} +} // namespace + +FftsEngine::FftsEngine() = default; + +FftsEngine::~FftsEngine() +{ + if (stream_ != nullptr) { + (void)aclrtSynchronizeStream(stream_); + pendingContexts_.clear(); + (void)aclrtDestroyStream(stream_); + stream_ = nullptr; + } +} + +/** + * @brief Initializes the engine for the given Ascend device and creates the ACL stream. + * + * @param deviceId Ascend device ID used by this engine. + * @return Status::OK() on success, otherwise an error status. + */ +Status FftsEngine::Setup(int32_t deviceId) +{ + if (ready_) { + if (deviceId_ == deviceId) { return Status::OK(); } + return Status::InvalidParam("FFTS transport is already setup with a different device"); + } + + auto status = AclStatus(aclrtSetDevice(deviceId), "aclrtSetDevice"); + if (status.Failure()) { return status; } + + status = AclStatus( + aclrtCreateStreamWithConfig(&stream_, 0, ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC), + "aclrtCreateStreamWithConfig"); + if (status.Failure()) { + stream_ = nullptr; + return status; + } + + deviceId_ = deviceId; + ready_ = true; + return Status::OK(); +} + +/** + * @brief Waits for an ACL event on the engine stream. + * + * @param event ACL event handle to wait for. nullptr is treated as no-op. + * @return Status::OK() on success, otherwise an error status. + */ +Status FftsEngine::WaitEvent(void* event) +{ + if (event == nullptr) { return Status::OK(); } + auto status = EnsureReady(); + if (status.Failure()) { return status; } + return AclStatus(aclrtStreamWaitEvent(stream_, static_cast(event)), + "aclrtStreamWaitEvent"); +} + +/** + * @brief Submits copy descriptors in chunks to the FFTS engine. + * + * @param copies Copy descriptor array to submit. + * @param count Number of descriptors in the array. + * @return Status::OK() on success, otherwise an error status. + */ +Status FftsEngine::Submit(const CopyDesc* copies, size_t count) +{ + auto status = EnsureReady(); + if (status.Failure()) { return status; } + if (count == 0) { return Status::OK(); } + if (copies == nullptr) { return Status::InvalidParam("FFTS copy list is null"); } + + size_t offset = 0; + while (offset < count) { + const auto chunk = std::min(kMaxReadyContexts, count - offset); + status = SubmitChunk(copies + offset, chunk); + if (status.Failure()) { return status; } + offset += chunk; + } + return Status::OK(); +} + +Status FftsEngine::Synchronize() +{ + auto status = EnsureReady(); + if (status.Failure()) { return status; } + status = AclStatus(aclrtSynchronizeStream(stream_), "aclrtSynchronizeStream"); + if (status.Success()) { ClearCompletedGraphs(); } + return status; +} + +Status FftsEngine::EnsureReady() const +{ + if (!ready_ || stream_ == nullptr) { return Status::Error("FFTS transport is not setup"); } + return Status::OK(); +} + +/** + * @brief Converts valid copy descriptors into SDMA contexts and submits them as one FFTS Plus task. + * + * @param copies Copy descriptors to submit. + * @param count Number of descriptors in this chunk. + * @return Status::OK() on success, otherwise an error status. + */ +Status FftsEngine::SubmitChunk(const CopyDesc* copies, size_t count) +{ + std::vector activeCopies; + + // Filter out no-op copies and validate the remaining descriptors. + activeCopies.reserve(count); + for (size_t i = 0; i < count; ++i) { + const auto& copy = copies[i]; + if (copy.size == 0 || copy.dst == copy.src) { continue; } + if (copy.dst == nullptr || copy.src == nullptr) { + return Status::InvalidParam("FFTS copy source and destination must not be null"); + } + // A single SDMA context stores the copy size as uint32_t. + if (copy.size > std::numeric_limits::max()) { + return Status::InvalidParam("FFTS copy size exceeds single SDMA context limit"); + } + activeCopies.push_back(copy); + } + + if (activeCopies.empty()) { return Status::OK(); } + + // Build one SDMA context for each valid copy descriptor. + auto contexts = std::make_shared(activeCopies.size()); + for (size_t i = 0; i < activeCopies.size(); ++i) { + FillSdmaContext((*contexts)[i], activeCopies[i]); + } + + // Build the FFTS Plus SQE using the number of generated contexts. + const auto contextCount = static_cast(contexts->size()); + auto sqe = BuildSqe(contextCount); + + // Describe the task to launch. The context buffer is stored in host memory. + rtFftsPlusTaskInfo_t task{}; + task.fftsPlusSqe = &sqe; + task.descBuf = contexts->data(); + task.descBufLen = sizeof(rtFftsPlusComCtx_t) * contexts->size(); + task.descAddrType = RT_FFTS_PLUS_CTX_DESC_ADDR_TYPE_HOST; + + // Launch the FFTS Plus task on the engine stream. + auto status = + RtStatus(rtFftsPlusTaskLaunchWithFlag(&task, stream_, 0), "rtFftsPlusTaskLaunchWithFlag"); + if (status.Failure()) { return status; } + + // Keep the context buffer alive until the stream is synchronized. + KeepAlive(std::move(contexts)); + return Status::OK(); +} + +void FftsEngine::KeepAlive(std::shared_ptr contexts) +{ + pendingContexts_.emplace_back(std::move(contexts)); +} + +void FftsEngine::ClearCompletedGraphs() { pendingContexts_.clear(); } + +} // namespace UC::Transport::Ffts diff --git a/ucm/transport/p2p/ffts/src/ffts_transport.cpp b/ucm/transport/p2p/ffts/src/ffts_transport.cpp new file mode 100644 index 000000000..c81983320 --- /dev/null +++ b/ucm/transport/p2p/ffts/src/ffts_transport.cpp @@ -0,0 +1,67 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "ffts_transport.h" + +#include + +#include "detail/ffts_engine.h" + +namespace UC::Transport::Ffts { + +class FftsTransport::Impl { +public: + Status Setup(int32_t deviceId) { return engine_.Setup(deviceId); } + Status WaitEvent(void* event) { return engine_.WaitEvent(event); } + Status Submit(const CopyDesc* copies, size_t count) { return engine_.Submit(copies, count); } + Status Synchronize() { return engine_.Synchronize(); } + +private: + FftsEngine engine_; +}; + +FftsTransport::FftsTransport() : impl_(std::make_unique()) {} + +FftsTransport::~FftsTransport() = default; + +FftsTransport::FftsTransport(FftsTransport&&) noexcept = default; + +FftsTransport& FftsTransport::operator=(FftsTransport&&) noexcept = default; + +Status FftsTransport::Setup(int32_t deviceId) { return impl_->Setup(deviceId); } + +Status FftsTransport::WaitEvent(void* event) { return impl_->WaitEvent(event); } + +Status FftsTransport::CopyAsync(void* dst, const void* src, size_t size) +{ + CopyDesc copy{dst, src, size}; + return Submit(©, 1); +} + +Status FftsTransport::Submit(const CopyDesc* copies, size_t count) { return impl_->Submit(copies, count); } + +Status FftsTransport::Submit(const std::vector& copies) { return Submit(copies.data(), copies.size()); } + +Status FftsTransport::Synchronize() { return impl_->Synchronize(); } + +} // namespace UC::Transport::Ffts diff --git a/ucm/transport/p2p/ffts/test/CMakeLists.txt b/ucm/transport/p2p/ffts/test/CMakeLists.txt new file mode 100644 index 000000000..e38d0ef43 --- /dev/null +++ b/ucm/transport/p2p/ffts/test/CMakeLists.txt @@ -0,0 +1,19 @@ +# Copyright (c) 2026 Huawei Technologies Co., Ltd. + +include(GoogleTest) + +add_executable(ucm_transport_ffts.test + ffts_transport_test.cpp +) + +target_link_libraries(ucm_transport_ffts.test PRIVATE + ucm_transport_ffts + gtest_main + gtest +) + +target_include_directories(ucm_transport_ffts.test SYSTEM PRIVATE + ${FFTS_ASCEND_INCLUDE_DIR} +) + +gtest_discover_tests(ucm_transport_ffts.test) diff --git a/ucm/transport/p2p/ffts/test/ffts_transport_test.cpp b/ucm/transport/p2p/ffts/test/ffts_transport_test.cpp new file mode 100644 index 000000000..0592c7c66 --- /dev/null +++ b/ucm/transport/p2p/ffts/test/ffts_transport_test.cpp @@ -0,0 +1,237 @@ +/** + * MIT License + * + * Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include +#include +#include + +#include +#include + +#include "ffts_transport.h" + +namespace { +constexpr int32_t kDeviceId = 0; + +class DeviceBuffer { +public: + explicit DeviceBuffer(size_t bytes) + { + if (aclrtMalloc(&ptr_, bytes, ACL_MEM_MALLOC_HUGE_FIRST) != ACL_SUCCESS) { ptr_ = nullptr; } + } + + ~DeviceBuffer() + { + if (ptr_ != nullptr) { (void)aclrtFree(ptr_); } + } + + DeviceBuffer(const DeviceBuffer&) = delete; + DeviceBuffer& operator=(const DeviceBuffer&) = delete; + + void* Get() const { return ptr_; } + +private: + void* ptr_{nullptr}; +}; + +// Test fixture for ACL-based transport tests. +// It initializes ACL and sets the target device before each test, +// then resets the device and finalizes ACL after each test. +class FftsTransportTest : public ::testing::Test { +protected: + void SetUp() override + { + auto ret = aclInit(nullptr); + if (ret != ACL_SUCCESS) { GTEST_SKIP() << "aclInit failed: " << ret; } + aclInited_ = true; + + ret = aclrtSetDevice(kDeviceId); + if (ret != ACL_SUCCESS) { GTEST_SKIP() << "aclrtSetDevice failed: " << ret; } + deviceSet_ = true; + } + + void TearDown() override + { + if (deviceSet_) { (void)aclrtResetDevice(kDeviceId); } + if (aclInited_) { (void)aclFinalize(); } + } + +private: + bool aclInited_{false}; + bool deviceSet_{false}; +}; + +void FillPattern(std::vector& data) +{ + for (size_t i = 0; i < data.size(); ++i) { data[i] = static_cast(i ^ 0x5A5A5A5AU); } +} + +void FillPipelinePattern(std::vector& data) +{ + for (size_t i = 0; i < data.size(); ++i) { + data[i] = static_cast((i * 1103515245ULL + 12345ULL) & 0xFFFFFFFFU); + } +} + +void* Offset(void* ptr, size_t bytes) +{ + return static_cast(static_cast(ptr) + bytes); +} + +std::vector BuildChunkCopies(void* dst, const void* src, size_t chunkBytes, + size_t chunks) +{ + std::vector copies; + copies.reserve(chunks); + for (size_t i = 0; i < chunks; ++i) { + const auto offset = chunkBytes * i; + copies.push_back({Offset(dst, offset), Offset(const_cast(src), offset), chunkBytes}); + } + return copies; +} +} // namespace + +TEST(FftsTransportInvalidTest, CopyBeforeSetupFails) +{ + UC::Transport::Ffts::FftsTransport transport; + EXPECT_TRUE(transport.CopyAsync(reinterpret_cast(0x1), reinterpret_cast(0x2), 1).Failure()); +} + +TEST_F(FftsTransportTest, SingleIoCopyMatchesSource) +{ + constexpr size_t bytes = 4096; + std::vector src(bytes / sizeof(uint32_t)); + std::vector dst(src.size(), 0); + FillPattern(src); + + DeviceBuffer devSrc(bytes); + DeviceBuffer devDst(bytes); + ASSERT_NE(devSrc.Get(), nullptr); + ASSERT_NE(devDst.Get(), nullptr); + + ASSERT_EQ(aclrtMemcpy(devSrc.Get(), bytes, src.data(), bytes, ACL_MEMCPY_HOST_TO_DEVICE), ACL_SUCCESS); + ASSERT_EQ(aclrtMemset(devDst.Get(), bytes, 0, bytes), ACL_SUCCESS); + + UC::Transport::Ffts::FftsTransport transport; + ASSERT_EQ(transport.Setup(kDeviceId), UC::Status::OK()); + ASSERT_EQ(transport.CopyAsync(devDst.Get(), devSrc.Get(), bytes), UC::Status::OK()); + ASSERT_EQ(transport.Synchronize(), UC::Status::OK()); + + ASSERT_EQ(aclrtMemcpy(dst.data(), bytes, devDst.Get(), bytes, ACL_MEMCPY_DEVICE_TO_HOST), ACL_SUCCESS); + EXPECT_EQ(dst, src); +} + +TEST_F(FftsTransportTest, SubmitCopiesMultipleDeviceRanges) +{ + constexpr size_t chunkBytes = 1024; + constexpr size_t chunks = 4; + constexpr size_t totalBytes = chunkBytes * chunks; + std::vector src(totalBytes / sizeof(uint32_t)); + std::vector dst(src.size(), 0); + FillPattern(src); + + DeviceBuffer devSrc(totalBytes); + DeviceBuffer devDst(totalBytes); + ASSERT_NE(devSrc.Get(), nullptr); + ASSERT_NE(devDst.Get(), nullptr); + + ASSERT_EQ(aclrtMemcpy(devSrc.Get(), totalBytes, src.data(), totalBytes, ACL_MEMCPY_HOST_TO_DEVICE), ACL_SUCCESS); + ASSERT_EQ(aclrtMemset(devDst.Get(), totalBytes, 0, totalBytes), ACL_SUCCESS); + + auto copies = BuildChunkCopies(devDst.Get(), devSrc.Get(), chunkBytes, chunks); + + UC::Transport::Ffts::FftsTransport transport; + ASSERT_EQ(transport.Setup(kDeviceId), UC::Status::OK()); + ASSERT_EQ(transport.Submit(copies), UC::Status::OK()); + ASSERT_EQ(transport.Synchronize(), UC::Status::OK()); + + ASSERT_EQ(aclrtMemcpy(dst.data(), totalBytes, devDst.Get(), totalBytes, ACL_MEMCPY_DEVICE_TO_HOST), ACL_SUCCESS); + EXPECT_EQ(dst, src); +} + +TEST_F(FftsTransportTest, SubmitSplitsLargeBatchAcrossReadyContextLimit) +{ + constexpr size_t chunkBytes = 64; + constexpr size_t chunks = 130; + constexpr size_t totalBytes = chunkBytes * chunks; + std::vector src(totalBytes / sizeof(uint32_t)); + std::vector dst(src.size(), 0); + FillPattern(src); + + DeviceBuffer devSrc(totalBytes); + DeviceBuffer devDst(totalBytes); + ASSERT_NE(devSrc.Get(), nullptr); + ASSERT_NE(devDst.Get(), nullptr); + + ASSERT_EQ(aclrtMemcpy(devSrc.Get(), totalBytes, src.data(), totalBytes, ACL_MEMCPY_HOST_TO_DEVICE), ACL_SUCCESS); + ASSERT_EQ(aclrtMemset(devDst.Get(), totalBytes, 0, totalBytes), ACL_SUCCESS); + + UC::Transport::Ffts::FftsTransport transport; + ASSERT_EQ(transport.Setup(kDeviceId), UC::Status::OK()); + ASSERT_EQ(transport.Submit(BuildChunkCopies(devDst.Get(), devSrc.Get(), chunkBytes, chunks)), UC::Status::OK()); + ASSERT_EQ(transport.Synchronize(), UC::Status::OK()); + + ASSERT_EQ(aclrtMemcpy(dst.data(), totalBytes, devDst.Get(), totalBytes, ACL_MEMCPY_DEVICE_TO_HOST), ACL_SUCCESS); + EXPECT_EQ(dst, src); +} + +TEST_F(FftsTransportTest, TwoStagePipelineCopyReachesDestination) +{ + constexpr size_t chunkBytes = 1024; + constexpr size_t chunks = 8; + constexpr size_t totalBytes = chunkBytes * chunks; + std::vector src(totalBytes / sizeof(uint32_t)); + std::vector dst(src.size(), 0); + FillPipelinePattern(src); + + DeviceBuffer devSrc(totalBytes); + DeviceBuffer devMid(totalBytes); + DeviceBuffer devDst(totalBytes); + ASSERT_NE(devSrc.Get(), nullptr); + ASSERT_NE(devMid.Get(), nullptr); + ASSERT_NE(devDst.Get(), nullptr); + + ASSERT_EQ(aclrtMemcpy(devSrc.Get(), totalBytes, src.data(), totalBytes, ACL_MEMCPY_HOST_TO_DEVICE), ACL_SUCCESS); + ASSERT_EQ(aclrtMemset(devMid.Get(), totalBytes, 0, totalBytes), ACL_SUCCESS); + ASSERT_EQ(aclrtMemset(devDst.Get(), totalBytes, 0, totalBytes), ACL_SUCCESS); + + UC::Transport::Ffts::FftsTransport transport; + ASSERT_EQ(transport.Setup(kDeviceId), UC::Status::OK()); + ASSERT_EQ(transport.Submit(BuildChunkCopies(devMid.Get(), devSrc.Get(), chunkBytes, chunks)), UC::Status::OK()); + ASSERT_EQ(transport.Submit(BuildChunkCopies(devDst.Get(), devMid.Get(), chunkBytes, chunks)), UC::Status::OK()); + ASSERT_EQ(transport.Synchronize(), UC::Status::OK()); + + ASSERT_EQ(aclrtMemcpy(dst.data(), totalBytes, devDst.Get(), totalBytes, ACL_MEMCPY_DEVICE_TO_HOST), ACL_SUCCESS); + EXPECT_EQ(dst, src); +} + +TEST_F(FftsTransportTest, InvalidAndNoOpCopies) +{ + UC::Transport::Ffts::FftsTransport transport; + ASSERT_EQ(transport.Setup(kDeviceId), UC::Status::OK()); + + EXPECT_EQ(transport.Submit(nullptr, 0), UC::Status::OK()); + EXPECT_TRUE(transport.Submit(nullptr, 1).Failure()); + EXPECT_EQ(transport.CopyAsync(nullptr, nullptr, 0), UC::Status::OK()); + EXPECT_TRUE(transport.CopyAsync(nullptr, reinterpret_cast(0x1), 1).Failure()); +}