Skip to content

Commit e174b46

Browse files
Ke Wangmeta-codesync[bot]
authored andcommitted
Back out "perf: Do not request data size if there is only a single source as in persistent shuffle" (#15406)
Summary: Pull Request resolved: #15406 Original commit changeset: 4424ea18f7d4 Original Phabricator Diff: D85647156 Reviewed By: amitkdutta, xiaoxmeng Differential Revision: D86272441 fbshipit-source-id: 01aa02cf6e4354e69732f552e0bba45851eb8950
1 parent f25af0b commit e174b46

File tree

3 files changed

+5
-66
lines changed

3 files changed

+5
-66
lines changed

velox/exec/ExchangeClient.cpp

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ void ExchangeClient::request(std::vector<RequestSpec>&& requestSpecs) {
185185
RECORD_METRIC_VALUE(kMetricExchangeDataCount);
186186
}
187187

188-
bool pauseCurrentSource{false};
188+
bool pauseCurrentSource = false;
189189
std::vector<RequestSpec> requestSpecs;
190190
std::shared_ptr<ExchangeSource> currentSource = spec.source;
191191
{
@@ -232,9 +232,6 @@ ExchangeClient::pickSourcesToRequestLocked() {
232232
if (closed_) {
233233
return {};
234234
}
235-
if (sources_.size() == 1) {
236-
return pickupSingleSourceToRequestLocked();
237-
}
238235
std::vector<RequestSpec> requestSpecs;
239236
while (!emptySources_.empty()) {
240237
auto& source = emptySources_.front();
@@ -286,42 +283,6 @@ ExchangeClient::pickSourcesToRequestLocked() {
286283
return requestSpecs;
287284
}
288285

289-
std::vector<ExchangeClient::RequestSpec>
290-
ExchangeClient::pickupSingleSourceToRequestLocked() {
291-
VELOX_CHECK_EQ(sources_.size(), 1);
292-
VELOX_CHECK(!closed_);
293-
if (emptySources_.empty() && producingSources_.empty()) {
294-
return {};
295-
}
296-
297-
VELOX_CHECK_EQ(totalPendingBytes_, 0);
298-
VELOX_CHECK_LE(!!emptySources_.empty() + !!producingSources_.empty(), 1);
299-
const auto requestBytes = maxQueuedBytes_ - queue_->totalBytes();
300-
if (requestBytes <= 0) {
301-
return {};
302-
}
303-
std::vector<RequestSpec> requestSpecs;
304-
SCOPE_EXIT {
305-
totalPendingBytes_ += requestBytes;
306-
};
307-
if (!emptySources_.empty()) {
308-
VELOX_CHECK_EQ(emptySources_.size(), 1);
309-
auto& source = emptySources_.front();
310-
VELOX_CHECK(source->shouldRequestLocked());
311-
requestSpecs.push_back({std::move(source), requestBytes});
312-
emptySources_.pop();
313-
return requestSpecs;
314-
}
315-
316-
VELOX_CHECK_EQ(producingSources_.size(), 1);
317-
auto& source = producingSources_.front().source;
318-
VELOX_CHECK(source->shouldRequestLocked());
319-
VELOX_CHECK(!producingSources_.front().remainingBytes.empty());
320-
requestSpecs.push_back({std::move(source), requestBytes});
321-
producingSources_.pop();
322-
return requestSpecs;
323-
}
324-
325286
ExchangeClient::~ExchangeClient() {
326287
close();
327288
}

velox/exec/ExchangeClient.h

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -132,27 +132,8 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
132132
std::vector<int64_t> remainingBytes;
133133
};
134134

135-
// Selects exchange sources to request data from based on available queue
136-
// capacity. Handles multiple sources by first requesting data sizes from all
137-
// empty sources, then requesting actual data from producing sources based on
138-
// their remaining bytes and available capacity. May initiate out-of-band
139-
// transfers for large pages that exceed capacity to avoid deadlock
140-
// situations. For single source case, delegates to
141-
// pickupSingleSourceToRequestLocked which sets max request bytes based on
142-
// available queue space instead of reported remaining bytes from exchange
143-
// sources.
144135
std::vector<RequestSpec> pickSourcesToRequestLocked();
145136

146-
// Specialized single-source request picker for single-source exchange
147-
// clients. Sets the max request bytes based on available space in the queue
148-
// rather than the reported remaining bytes from exchange sources. The reason
149-
// is that single source has no other alternative so just fetch as much as
150-
// possible from that source. Returns a request spec for the single source
151-
// when there is available capacity in the queue and no pending requests. If
152-
// capacity is unavailable or requests are already pending, returns empty
153-
// vector.
154-
std::vector<RequestSpec> pickupSingleSourceToRequestLocked();
155-
156137
void request(std::vector<RequestSpec>&& requestSpecs);
157138

158139
// Handy for ad-hoc logging.

velox/exec/tests/ExchangeClientTest.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ TEST_P(ExchangeClientTest, acknowledge) {
589589
SCOPED_TESTVALUE_SET(
590590
"facebook::velox::exec::test::LocalExchangeSource::pause",
591591
std::function<void(void*)>(([&numberOfAcknowledgeRequests](void*) {
592-
++numberOfAcknowledgeRequests;
592+
numberOfAcknowledgeRequests++;
593593
})));
594594

595595
{
@@ -665,7 +665,7 @@ TEST_P(ExchangeClientTest, acknowledge) {
665665
int attempts = 100;
666666
bool outputBuffersEmpty;
667667
while (attempts > 0) {
668-
--attempts;
668+
attempts--;
669669
outputBuffersEmpty = bufferManager_->getUtilization(sourceTaskId) == 0;
670670
if (outputBuffersEmpty) {
671671
break;
@@ -677,7 +677,7 @@ TEST_P(ExchangeClientTest, acknowledge) {
677677
// The output buffer is empty now
678678
// Explicit acknowledge is not necessary as a blocking getDataSize is sent
679679
// right away
680-
EXPECT_EQ(numberOfAcknowledgeRequests, 3);
680+
EXPECT_EQ(numberOfAcknowledgeRequests, 2);
681681
#endif
682682
}
683683

@@ -985,10 +985,7 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
985985
testing::Values(
986986
VectorSerde::Kind::kPresto,
987987
VectorSerde::Kind::kCompactRow,
988-
VectorSerde::Kind::kUnsafeRow),
989-
[](const testing::TestParamInfo<VectorSerde::Kind>& info) {
990-
return fmt::format("{}", info.param);
991-
});
988+
VectorSerde::Kind::kUnsafeRow));
992989

993990
} // namespace
994991
} // namespace facebook::velox::exec

0 commit comments

Comments
 (0)