Skip to content

Commit e078023

Browse files
peterenescu authored and meta-codesync[bot] committed
feat: Add fuzzer reference runner VeloxQueryRunner (#14747)
Summary: Pull Request resolved: #14747 Add a new fuzzer reference query runner: VeloxQueryRunner. This new runner will support plan serialization and communication to thrift service LocalRunnerService and accept serialized batch results. Extending the expression fuzzer to support this new reference query runner will allow Velox-Velox expression evaluation comparison. Reviewed By: kagamiori Differential Revision: D81618821 fbshipit-source-id: a00cdaa9199863da90c00d416cf49bc11c8dcec5
1 parent 6df585d commit e078023

File tree

4 files changed

+332
-1
lines changed

4 files changed

+332
-1
lines changed

velox/exec/fuzzer/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ add_library(
2727
PrestoSql.cpp
2828
)
2929

30+
# TODO Add VeloxQueryRunner to velox_fuzzer_util to support in
31+
# ExpressionFuzzerTest. More information can be found here:
32+
# https://github.com/facebookincubator/velox/issues/15414
33+
if(VELOX_ENABLE_REMOTE_FUNCTIONS)
34+
target_sources(velox_fuzzer_util PRIVATE VeloxQueryRunner.cpp)
35+
target_link_libraries(velox_fuzzer_util FBThrift::thriftcpp2)
36+
endif()
37+
3038
target_link_libraries(
3139
velox_fuzzer_util
3240
velox_common_fuzzer_util

velox/exec/fuzzer/ReferenceQueryRunner.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class ReferenceQueryRunner {
5555
enum class RunnerType {
5656
kPrestoQueryRunner,
5757
kDuckQueryRunner,
58-
kSparkQueryRunner
58+
kSparkQueryRunner,
59+
kVeloxQueryRunner
5960
};
6061

6162
// @param aggregatePool Used to allocate memory needed for vectors produced
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/exec/fuzzer/VeloxQueryRunner.h"
18+
19+
#include <cpr/cpr.h> // @manual
20+
#include <fmt/format.h>
21+
#include <folly/Uri.h>
22+
#include <folly/json.h>
23+
#include <re2/re2.h>
24+
#include <thrift/lib/cpp2/async/RocketClientChannel.h>
25+
#include "velox/core/PlanNode.h"
26+
#include "velox/exec/fuzzer/if/gen-cpp2/LocalRunnerService.h"
27+
#include "velox/exec/tests/utils/QueryAssertions.h"
28+
#include "velox/functions/prestosql/types/BingTileType.h"
29+
#include "velox/functions/prestosql/types/GeometryType.h"
30+
#include "velox/functions/prestosql/types/IPAddressType.h"
31+
#include "velox/functions/prestosql/types/IPPrefixType.h"
32+
#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h"
33+
#include "velox/functions/prestosql/types/UuidType.h"
34+
#include "velox/serializers/PrestoSerializer.h"
35+
#include "velox/type/parser/TypeParser.h"
36+
37+
using namespace facebook::velox::runner;
38+
39+
namespace facebook::velox::exec::test {
40+
41+
namespace {
42+
43+
// Builds the RowType of 'batch' from the column names and column type strings
// it carries.
//
// The type strings are produced by LocalRunnerService in the form
// SOME_COMPLEX_TYPE<TYPE_1,TYPE_2,...>, and for ROW each field type follows the
// field name and a colon, e.g. ROW<f0:TYPE_1,f1:TYPE_2>. parseType() expects
// parentheses instead of angle brackets and a space instead of the colon:
//   MAP<VARCHAR,TIMESTAMP>   -> MAP(VARCHAR,TIMESTAMP)
//   ROW<f0:TYPE_1,f1:TYPE_2> -> ROW(f0 TYPE_1,f1 TYPE_2)
// Without this rewrite parseType() fails and the fuzzer run aborts.
//
// 'batch' is taken by const reference: a Batch also holds the serialized
// payload, which would otherwise be copied for every batch just to read the
// schema.
RowTypePtr parseBatchRowType(const Batch& batch) {
  const auto& columnNames = *batch.columnNames();
  std::vector<std::string> names(columnNames.begin(), columnNames.end());

  const auto& columnTypes = *batch.columnTypes();
  std::vector<TypePtr> types;
  types.reserve(columnTypes.size());

  for (const auto& typeString : columnTypes) {
    // Rewrite all three delimiters in a single pass over a private copy.
    std::string normalized = typeString;
    for (char& c : normalized) {
      switch (c) {
        case '<':
          c = '(';
          break;
        case '>':
          c = ')';
          break;
        case ':':
          c = ' ';
          break;
        default:
          break;
      }
    }
    types.push_back(parseType(normalized));
  }

  return ROW(std::move(names), std::move(types));
}
73+
74+
std::vector<RowVectorPtr> deserializeBatches(
75+
const std::vector<Batch>& resultBatches,
76+
memory::MemoryPool* pool) {
77+
std::vector<RowVectorPtr> queryResults;
78+
79+
auto serde = std::make_unique<serializer::presto::PrestoVectorSerde>();
80+
serializer::presto::PrestoVectorSerde::PrestoOptions options;
81+
82+
for (const auto& batch : resultBatches) {
83+
VELOX_CHECK(
84+
apache::thrift::is_non_optional_field_set_manually_or_by_serializer(
85+
batch.serializedData_ref()));
86+
VELOX_CHECK(!batch.serializedData()->empty());
87+
88+
// Deserialize binary data.
89+
const auto& serializedData = *batch.serializedData();
90+
ByteRange byteRange{
91+
reinterpret_cast<uint8_t*>(const_cast<char*>(serializedData.data())),
92+
static_cast<int32_t>(serializedData.length()),
93+
0};
94+
auto byteStream = std::make_unique<BufferInputStream>(
95+
std::vector<ByteRange>{{byteRange}});
96+
97+
RowVectorPtr rowVector;
98+
serde->deserialize(
99+
byteStream.get(),
100+
pool,
101+
parseBatchRowType(batch),
102+
&rowVector,
103+
0,
104+
&options);
105+
106+
VELOX_CHECK_NOT_NULL(rowVector);
107+
queryResults.push_back(rowVector);
108+
}
109+
110+
return queryResults;
111+
}
112+
113+
std::shared_ptr<apache::thrift::Client<LocalRunnerService>> createThriftClient(
114+
const std::string& host,
115+
int port,
116+
std::chrono::milliseconds timeout,
117+
folly::EventBase* evb) {
118+
folly::SocketAddress addr(host, port);
119+
auto socket = folly::AsyncSocket::newSocket(evb, addr, timeout.count());
120+
auto channel =
121+
apache::thrift::RocketClientChannel::newChannel(std::move(socket));
122+
return std::make_shared<apache::thrift::Client<LocalRunnerService>>(
123+
std::move(channel));
124+
}
125+
} // namespace
126+
127+
// Creates a runner that sends plans to the LocalRunnerService at 'serviceUri'.
// Result vectors are allocated from a leaf pool created under 'aggregatePool'.
// The host and port of the service are extracted from 'serviceUri' once, here.
VeloxQueryRunner::VeloxQueryRunner(
    memory::MemoryPool* aggregatePool,
    std::string serviceUri,
    std::chrono::milliseconds timeout)
    : ReferenceQueryRunner(aggregatePool),
      serviceUri_(std::move(serviceUri)),
      timeout_(timeout) {
  pool_ = aggregatePool->addLeafChild("leaf");

  const folly::Uri uri(serviceUri_);
  // hostname() strips the square brackets from IPv6 literals ("[::1]" ->
  // "::1"), which is the form address resolution expects.
  thriftHost_ = uri.hostname();
  // folly::Uri reports port 0 when the URI does not specify one. Keep the
  // default port of thriftPort_ in that case instead of overwriting it with 0.
  if (uri.port() != 0) {
    thriftPort_ = uri.port();
  }
}
140+
141+
const std::vector<TypePtr>& VeloxQueryRunner::supportedScalarTypes() const {
142+
static const std::vector<TypePtr> kScalarTypes{
143+
BOOLEAN(),
144+
TINYINT(),
145+
SMALLINT(),
146+
INTEGER(),
147+
BIGINT(),
148+
REAL(),
149+
DOUBLE(),
150+
VARCHAR(),
151+
VARBINARY(),
152+
TIMESTAMP(),
153+
TIMESTAMP_WITH_TIME_ZONE(),
154+
IPADDRESS(),
155+
UUID(),
156+
// https://github.com/facebookincubator/velox/issues/15379 (IPPREFIX)
157+
// https://github.com/facebookincubator/velox/issues/15380 (Non-orderable
158+
// custom types such as HYPERLOGLOG, JSON, BINGTILE, GEOMETRY, etc.)
159+
};
160+
return kScalarTypes;
161+
}
162+
163+
const std::unordered_map<std::string, DataSpec>&
164+
VeloxQueryRunner::aggregationFunctionDataSpecs() const {
165+
static const std::unordered_map<std::string, DataSpec>
166+
kAggregationFunctionDataSpecs{};
167+
return kAggregationFunctionDataSpecs;
168+
}
169+
170+
std::optional<std::string> VeloxQueryRunner::toSql(
171+
const core::PlanNodePtr& /*plan*/) {
172+
// We don't need to convert to SQL for VeloxQueryRunner
173+
// as we're sending the serialized plan directly
174+
VELOX_FAIL("VeloxQueryRunner does not support SQL conversion");
175+
}
176+
177+
bool VeloxQueryRunner::isConstantExprSupported(
178+
const core::TypedExprPtr& /*expr*/) {
179+
// Since we're using Velox directly, we support all constant expressions
180+
return true;
181+
}
182+
183+
bool VeloxQueryRunner::isSupported(
184+
const exec::FunctionSignature& /*signature*/) {
185+
// Since we're using Velox directly, we support all function signatures
186+
return true;
187+
}
188+
189+
std::vector<RowVectorPtr> VeloxQueryRunner::execute(
190+
const std::string& /*sql*/) {
191+
VELOX_FAIL("VeloxQueryRunner does not support SQL execution");
192+
}
193+
194+
std::vector<RowVectorPtr> VeloxQueryRunner::execute(
195+
const std::string& /*sql*/,
196+
const std::string& /*sessionProperty*/) {
197+
VELOX_FAIL("VeloxQueryRunner does not support SQL execution");
198+
}
199+
200+
// Executes 'plan' on the remote LocalRunnerService and returns the
// materialized result rows.
//
// The plan is serialized to JSON, sent in an ExecutePlanRequest over a freshly
// created Thrift client, and the serialized batches of the response are
// deserialized into vectors allocated from 'pool_'.
//
// @return {rows, kSuccess} if the service reports success, otherwise
// {std::nullopt, kReferenceQueryFail}.
// Fails with a VeloxException if the Thrift call itself throws.
std::pair<
    std::optional<std::multiset<std::vector<velox::variant>>>,
    ReferenceQueryErrorCode>
VeloxQueryRunner::execute(const core::PlanNodePtr& plan) {
  constexpr int kDefaultNumWorkers = 4;
  constexpr int kDefaultNumDrivers = 2;

  // Build the request. The serialized plan can be large, so it is assigned
  // straight into the request instead of being copied from a local.
  ExecutePlanRequest request;
  request.serializedPlan() = serializePlan(plan);
  // NOTE(review): rand() does not appear to be seeded anywhere in this file,
  // so separate processes presumably generate the same id sequence - confirm
  // whether the service requires query ids to be unique.
  request.queryId() = fmt::format("velox_local_query_runner_{}", rand());
  request.numWorkers() = kDefaultNumWorkers;
  request.numDrivers() = kDefaultNumDrivers;

  auto client =
      createThriftClient(thriftHost_, thriftPort_, timeout_, &eventBase_);

  // Send the request.
  ExecutePlanResponse response;
  try {
    client->sync_execute(response, request);
  } catch (const std::exception& e) {
    VELOX_FAIL("Thrift request failed: {}", e.what());
  }

  // Handle the response.
  if (!*response.success()) {
    LOG(INFO) << "Reference eval failed.";
    return std::make_pair(
        std::nullopt, ReferenceQueryErrorCode::kReferenceQueryFail);
  }

  LOG(INFO) << "Reference eval succeeded.";
  return std::make_pair(
      exec::test::materialize(
          deserializeBatches(*response.results(), pool_.get())),
      ReferenceQueryErrorCode::kSuccess);
}
238+
239+
// Returns the JSON text of 'plan', obtained by serializing the plan node to a
// folly::dynamic and rendering that with folly::toJson.
std::string VeloxQueryRunner::serializePlan(const core::PlanNodePtr& plan) {
  return folly::toJson(plan->serialize());
}
244+
245+
} // namespace facebook::velox::exec::test
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <folly/io/async/EventBaseThread.h>
19+
#include "velox/exec/fuzzer/ReferenceQueryRunner.h"
20+
#include "velox/vector/ComplexVector.h"
21+
22+
namespace facebook::velox::exec::test {
23+
24+
class VeloxQueryRunner : public ReferenceQueryRunner {
25+
public:
26+
/// @param serviceUri Thrift URI of the LocalRunnerService.
27+
/// @param timeout Timeout in milliseconds of a request.
28+
VeloxQueryRunner(
29+
memory::MemoryPool* aggregatePool,
30+
std::string serviceUri,
31+
std::chrono::milliseconds timeout);
32+
33+
RunnerType runnerType() const override {
34+
return RunnerType::kVeloxQueryRunner;
35+
}
36+
37+
const std::vector<TypePtr>& supportedScalarTypes() const override;
38+
39+
const std::unordered_map<std::string, DataSpec>&
40+
aggregationFunctionDataSpecs() const override;
41+
42+
std::optional<std::string> toSql(const core::PlanNodePtr& plan) override;
43+
44+
bool isConstantExprSupported(const core::TypedExprPtr& expr) override;
45+
46+
bool isSupported(const exec::FunctionSignature& signature) override;
47+
48+
std::pair<
49+
std::optional<std::multiset<std::vector<velox::variant>>>,
50+
ReferenceQueryErrorCode>
51+
execute(const core::PlanNodePtr& plan) override;
52+
53+
bool supportsVeloxVectorResults() const override {
54+
return true;
55+
}
56+
57+
std::vector<RowVectorPtr> execute(const std::string& sql) override;
58+
59+
std::vector<RowVectorPtr> execute(
60+
const std::string& sql,
61+
const std::string& sessionProperty) override;
62+
63+
private:
64+
// Serializes the plan node to JSON string
65+
std::string serializePlan(const core::PlanNodePtr& plan);
66+
67+
std::string serviceUri_;
68+
std::chrono::milliseconds timeout_;
69+
folly::EventBase eventBase_;
70+
std::shared_ptr<memory::MemoryPool> pool_;
71+
72+
// Thrift-specific members
73+
std::string thriftHost_;
74+
int thriftPort_{9091};
75+
};
76+
77+
} // namespace facebook::velox::exec::test

0 commit comments

Comments
 (0)