Skip to content

Commit ff311fc

Browse files
zhztheplayerzhejiangxiaomai
authored andcommitted
Support single thread execution on MemoryPool::reclaim (#362)
* Remove high usage callback * Executor not found when resuming a single-thread-execution task during memory reclaiming procedure * Use memory arbitrator to manage pool shrinking as well as growing * Allow using customized memory arbitrator when creating memory manager instance * Deadlock if throwing exception from MemoryManager::addRootPool(...) * Assertion failed, expected "!spillFinalized_" if reclaiming HashBuild operator during finishing * UT Fix: MemoryManagerTest.Ctor * Assertion failed, expected "spillPartitionEntry.second->numFiles() > 0" after HashBuild::reclaim() was called on a HashBuild that has no inputs
1 parent 56d55d2 commit ff311fc

19 files changed

+188
-121
lines changed

velox/common/memory/Memory.cpp

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,7 @@ constexpr folly::StringPiece kDefaultLeafName("__default_leaf__");
2828
MemoryManager::MemoryManager(const Options& options)
2929
: capacity_{options.capacity},
3030
allocator_{options.allocator->shared_from_this()},
31-
arbitrator_(MemoryArbitrator::create(MemoryArbitrator::Config{
32-
.kind = options.arbitratorConfig.kind,
33-
.capacity = capacity_,
34-
.initMemoryPoolCapacity =
35-
options.arbitratorConfig.initMemoryPoolCapacity,
36-
.minMemoryPoolCapacityTransferSize =
37-
options.arbitratorConfig.minMemoryPoolCapacityTransferSize,
38-
.retryArbitrationFailure =
39-
options.arbitratorConfig.retryArbitrationFailure})),
31+
arbitrator_{options.arbitratorFactory()},
4032
alignment_(std::max(MemoryAllocator::kMinAlignment, options.alignment)),
4133
checkUsageLeak_(options.checkUsageLeak),
4234
debugEnabled_(options.debugEnabled),
@@ -59,6 +51,9 @@ MemoryManager::MemoryManager(const Options& options)
5951
.debugEnabled = options.debugEnabled})} {
6052
VELOX_CHECK_NOT_NULL(allocator_);
6153
VELOX_USER_CHECK_GE(capacity_, 0);
54+
if (arbitrator_ != nullptr) {
55+
VELOX_CHECK_EQ(arbitrator_->capacity(), capacity_);
56+
}
6257
MemoryAllocator::alignmentCheck(0, alignment_);
6358
defaultRoot_->grow(defaultRoot_->maxCapacity());
6459
const size_t numSharedPools =
@@ -119,19 +114,22 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
119114
options.checkUsageLeak = checkUsageLeak_;
120115
options.debugEnabled = debugEnabled_;
121116

122-
folly::SharedMutex::WriteHolder guard{mutex_};
123-
if (pools_.find(poolName) != pools_.end()) {
124-
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
117+
std::shared_ptr<MemoryPool> pool;
118+
{
119+
folly::SharedMutex::WriteHolder guard{mutex_};
120+
if (pools_.find(poolName) != pools_.end()) {
121+
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
122+
}
123+
pool = std::make_shared<MemoryPoolImpl>(
124+
this,
125+
poolName,
126+
MemoryPool::Kind::kAggregate,
127+
nullptr,
128+
std::move(reclaimer),
129+
poolDestructionCb_,
130+
options);
131+
pools_.emplace(poolName, pool);
125132
}
126-
auto pool = std::make_shared<MemoryPoolImpl>(
127-
this,
128-
poolName,
129-
MemoryPool::Kind::kAggregate,
130-
nullptr,
131-
std::move(reclaimer),
132-
poolDestructionCb_,
133-
options);
134-
pools_.emplace(poolName, pool);
135133
VELOX_CHECK_EQ(pool->capacity(), 0);
136134
if (arbitrator_ != nullptr) {
137135
arbitrator_->reserveMemory(pool.get(), capacity);
@@ -154,6 +152,14 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
154152
return defaultRoot_->addLeafChild(poolName, threadSafe, nullptr);
155153
}
156154

155+
uint64_t MemoryManager::shrinkPool(MemoryPool* pool, uint64_t decrementBytes) {
156+
VELOX_CHECK_NOT_NULL(pool);
157+
if (arbitrator_ == nullptr) {
158+
return pool->shrink(decrementBytes);
159+
}
160+
return arbitrator_->releaseMemory(pool, decrementBytes);
161+
}
162+
157163
bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
158164
VELOX_CHECK_NOT_NULL(pool);
159165
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
@@ -168,14 +174,16 @@ bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
168174

169175
void MemoryManager::dropPool(MemoryPool* pool) {
170176
VELOX_CHECK_NOT_NULL(pool);
171-
folly::SharedMutex::WriteHolder guard{mutex_};
172-
auto it = pools_.find(pool->name());
173-
if (it == pools_.end()) {
174-
VELOX_FAIL("The dropped memory pool {} not found", pool->name());
177+
{
178+
folly::SharedMutex::WriteHolder guard{mutex_};
179+
auto it = pools_.find(pool->name());
180+
if (it == pools_.end()) {
181+
VELOX_FAIL("The dropped memory pool {} not found", pool->name());
182+
}
183+
pools_.erase(it);
175184
}
176-
pools_.erase(it);
177185
if (arbitrator_ != nullptr) {
178-
arbitrator_->releaseMemory(pool);
186+
arbitrator_->releaseMemory(pool, 0);
179187
}
180188
}
181189

velox/common/memory/Memory.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ class IMemoryManager {
8686
/// Specifies the backing memory allocator.
8787
MemoryAllocator* allocator{MemoryAllocator::getInstance()};
8888

89-
/// Specifies the memory arbitration config.
90-
MemoryArbitrator::Config arbitratorConfig{};
89+
/// Specifies the memory arbitrator
90+
std::function<std::unique_ptr<MemoryArbitrator>()> arbitratorFactory{
91+
[]() { return MemoryArbitrator::create({}); }};
9192
};
9293

9394
virtual ~IMemoryManager() = default;
@@ -118,6 +119,10 @@ class IMemoryManager {
118119
const std::string& name = "",
119120
bool threadSafe = true) = 0;
120121

122+
/// Invoked to shrink a memory pool's free capacity with up to
123+
/// 'decrementBytes'.
124+
virtual uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes) = 0;
125+
121126
/// Invoked to grows a memory pool's free capacity with at least
122127
/// 'incrementBytes'. The function returns true on success, otherwise false.
123128
virtual bool growPool(MemoryPool* pool, uint64_t incrementBytes) = 0;
@@ -196,6 +201,7 @@ class MemoryManager final : public IMemoryManager {
196201
const std::string& name = "",
197202
bool threadSafe = true) final;
198203

204+
uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes) final;
199205
bool growPool(MemoryPool* pool, uint64_t incrementBytes) final;
200206

201207
MemoryPool& deprecatedSharedLeafPool() final;

velox/common/memory/MemoryArbitrator.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ std::unique_ptr<MemoryArbitrator> MemoryArbitrator::create(
5050
}
5151
}
5252

53+
uint64_t MemoryArbitrator::capacity() {
54+
return capacity_;
55+
}
56+
5357
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
5458
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
5559
}

velox/common/memory/MemoryArbitrator.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ class MemoryArbitrator {
105105
/// the memory arbitration on demand when actual memory allocation happens.
106106
virtual void reserveMemory(MemoryPool* pool, uint64_t bytes) = 0;
107107

108-
/// Invoked by the memory manager to return back all the reserved memory
109-
/// capacity of a destroying memory pool.
110-
virtual void releaseMemory(MemoryPool* pool) = 0;
108+
/// Invoked by the memory manager to return back the specified amount of
109+
/// reserved memory capacity of a destroying memory pool. If 0 is specified,
110+
/// release all reserve memory. Returns the actually released amount of bytes.
111+
virtual uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) = 0;
111112

112113
/// Invoked by the memory manager to grow a memory pool's capacity.
113114
/// 'pool' is the memory pool to request to grow. 'candidates' is a list
@@ -126,6 +127,7 @@ class MemoryArbitrator {
126127
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
127128
uint64_t targetBytes) = 0;
128129

130+
uint64_t capacity();
129131
/// The internal execution stats of the memory arbitrator.
130132
struct Stats {
131133
/// The number of arbitration requests.

velox/common/memory/MemoryPool.cpp

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -577,18 +577,6 @@ int64_t MemoryPoolImpl::capacity() const {
577577
return capacity_;
578578
}
579579

580-
bool MemoryPoolImpl::highUsage() {
581-
if (parent_ != nullptr) {
582-
return parent_->highUsage();
583-
}
584-
585-
if (highUsageCallback_ != nullptr) {
586-
return highUsageCallback_(*this);
587-
}
588-
589-
return false;
590-
}
591-
592580
std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
593581
std::shared_ptr<MemoryPool> parent,
594582
const std::string& name,
@@ -743,6 +731,16 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
743731
capacityToString(manager_->capacity()))));
744732
}
745733

734+
uint64_t MemoryPoolImpl::shrinkManaged(
735+
MemoryPool* requestor,
736+
uint64_t targetBytes) {
737+
if (parent_ != nullptr) {
738+
return parent_->shrinkManaged(requestor, targetBytes);
739+
}
740+
VELOX_CHECK_NULL(parent_);
741+
return manager_->shrinkPool(requestor, targetBytes);
742+
};
743+
746744
bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
747745
std::lock_guard<std::mutex> l(mutex_);
748746
return maybeIncrementReservationLocked(size);

velox/common/memory/MemoryPool.h

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ constexpr int64_t kMaxMemory = std::numeric_limits<int64_t>::max();
105105
/// be merged into memory pool object later.
106106
class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
107107
public:
108-
using HighUsageCallBack = std::function<bool(MemoryPool& Pool)>;
109108
/// Defines the kinds of a memory pool.
110109
enum class Kind {
111110
/// The leaf memory pool is used for memory allocation. User can allocate
@@ -312,14 +311,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
312311
/// 'capacity()' is fixed and set to 'maxCapacity()' on creation.
313312
virtual int64_t capacity() const = 0;
314313

315-
virtual bool highUsage() = 0;
316-
317-
virtual void setHighUsageCallback(HighUsageCallBack func) {
318-
VELOX_CHECK_NULL(
319-
parent_, "Only root memory pool allows to set high-usage callback");
320-
highUsageCallback_ = func;
321-
}
322-
323314
/// Returns the currently used memory in bytes of this memory pool.
324315
virtual int64_t currentBytes() const = 0;
325316

@@ -358,6 +349,12 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
358349
/// without actually freeing the used memory.
359350
virtual uint64_t freeBytes() const = 0;
360351

352+
/// Try shrinking up to the specified amount of free memory via memory
353+
/// manager.
354+
virtual uint64_t shrinkManaged(
355+
MemoryPool* requestor,
356+
uint64_t targetBytes = 0) = 0;
357+
361358
/// Invoked to free up to the specified amount of free memory by reducing
362359
/// this memory pool's capacity without actually freeing any used memory. The
363360
/// function returns the actually freed memory capacity in bytes. If
@@ -522,8 +519,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
522519
// visitChildren() cost as we don't have to upgrade the weak pointer and copy
523520
// out the upgraded shared pointers.git
524521
std::unordered_map<std::string, std::weak_ptr<MemoryPool>> children_;
525-
526-
HighUsageCallBack highUsageCallback_{};
527522
};
528523

529524
std::ostream& operator<<(std::ostream& out, MemoryPool::Kind kind);
@@ -571,8 +566,6 @@ class MemoryPoolImpl : public MemoryPool {
571566

572567
int64_t capacity() const override;
573568

574-
bool highUsage() override;
575-
576569
int64_t currentBytes() const override {
577570
std::lock_guard<std::mutex> l(mutex_);
578571
return currentBytesLocked();
@@ -611,6 +604,8 @@ class MemoryPoolImpl : public MemoryPool {
611604

612605
uint64_t reclaim(uint64_t targetBytes) override;
613606

607+
uint64_t shrinkManaged(MemoryPool* requestor, uint64_t targetBytes) override;
608+
614609
uint64_t shrink(uint64_t targetBytes = 0) override;
615610

616611
uint64_t grow(uint64_t bytes) noexcept override;

velox/common/memory/SharedArbitrator.cpp

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) {
124124
pool->grow(reserveBytes);
125125
}
126126

127-
void SharedArbitrator::releaseMemory(MemoryPool* pool) {
127+
uint64_t SharedArbitrator::releaseMemory(MemoryPool* pool, uint64_t bytes) {
128128
std::lock_guard<std::mutex> l(mutex_);
129-
const uint64_t freedBytes = pool->shrink(0);
129+
const uint64_t freedBytes = pool->shrink(bytes);
130130
incrementFreeCapacityLocked(freedBytes);
131+
return freedBytes;
131132
}
132133

133134
std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats(
@@ -246,54 +247,59 @@ bool SharedArbitrator::arbitrateMemory(
246247
uint64_t targetBytes) {
247248
VELOX_CHECK(!requestor->aborted());
248249

249-
const uint64_t growTarget = std::min(
250-
maxGrowBytes(*requestor),
251-
std::max(minMemoryPoolCapacityTransferSize_, targetBytes));
252-
uint64_t freedBytes = decrementFreeCapacity(growTarget);
253-
if (freedBytes >= targetBytes) {
254-
requestor->grow(freedBytes);
255-
return true;
256-
}
257-
VELOX_CHECK_LT(freedBytes, growTarget);
250+
const uint64_t growTarget =
251+
std::max(minMemoryPoolCapacityTransferSize_, targetBytes);
252+
uint64_t unusedFreedBytes = decrementFreeCapacity(growTarget);
258253

259254
auto freeGuard = folly::makeGuard([&]() {
260255
// Returns the unused freed memory capacity back to the arbitrator.
261-
if (freedBytes > 0) {
262-
incrementFreeCapacity(freedBytes);
256+
if (unusedFreedBytes > 0) {
257+
incrementFreeCapacity(unusedFreedBytes);
263258
}
264259
});
265260

266-
freedBytes +=
267-
reclaimFreeMemoryFromCandidates(candidates, growTarget - freedBytes);
268-
if (freedBytes >= targetBytes) {
269-
const uint64_t bytesToGrow = std::min(growTarget, freedBytes);
270-
requestor->grow(bytesToGrow);
271-
freedBytes -= bytesToGrow;
261+
if (unusedFreedBytes >= targetBytes) {
262+
requestor->grow(unusedFreedBytes);
263+
unusedFreedBytes = 0;
264+
return true;
265+
}
266+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
267+
268+
reclaimFreeMemoryFromCandidates(candidates, growTarget - unusedFreedBytes);
269+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
270+
if (unusedFreedBytes >= targetBytes) {
271+
requestor->grow(unusedFreedBytes);
272+
unusedFreedBytes = 0;
272273
return true;
273274
}
274275

275-
VELOX_CHECK_LT(freedBytes, growTarget);
276-
freedBytes += reclaimUsedMemoryFromCandidates(
277-
requestor, candidates, growTarget - freedBytes);
276+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
277+
reclaimUsedMemoryFromCandidates(
278+
requestor, candidates, growTarget - unusedFreedBytes);
279+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
278280
if (requestor->aborted()) {
279281
++numFailures_;
280282
VELOX_MEM_POOL_ABORTED(requestor);
281283
}
282284

283285
VELOX_CHECK(!requestor->aborted());
284286

285-
if (freedBytes < targetBytes) {
287+
if (unusedFreedBytes < targetBytes) {
286288
VELOX_MEM_LOG(WARNING)
287289
<< "Failed to arbitrate sufficient memory for memory pool "
288290
<< requestor->name() << ", request " << succinctBytes(targetBytes)
289-
<< ", only " << succinctBytes(freedBytes)
291+
<< ", only " << succinctBytes(unusedFreedBytes)
290292
<< " has been freed, Arbitrator state: " << toString();
291293
return false;
292294
}
293295

294-
const uint64_t bytesToGrow = std::min(freedBytes, growTarget);
295-
requestor->grow(bytesToGrow);
296-
freedBytes -= bytesToGrow;
296+
if (unusedFreedBytes > growTarget) {
297+
requestor->grow(growTarget);
298+
unusedFreedBytes -= growTarget;
299+
return true;
300+
}
301+
requestor->grow(unusedFreedBytes);
302+
unusedFreedBytes = 0;
297303
return true;
298304
}
299305

@@ -314,7 +320,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
314320
if (bytesToShrink <= 0) {
315321
break;
316322
}
317-
freedBytes += candidate.pool->shrink(bytesToShrink);
323+
uint64_t shrunk = candidate.pool->shrink(bytesToShrink);
324+
incrementFreeCapacity(shrunk);
325+
freedBytes += shrunk;
318326
if (freedBytes >= targetBytes) {
319327
break;
320328
}
@@ -354,6 +362,7 @@ uint64_t SharedArbitrator::reclaim(
354362
uint64_t freedBytes{0};
355363
try {
356364
freedBytes = pool->shrink(targetBytes);
365+
incrementFreeCapacity(freedBytes);
357366
if (freedBytes < targetBytes) {
358367
pool->reclaim(targetBytes - freedBytes);
359368
}
@@ -363,7 +372,7 @@ uint64_t SharedArbitrator::reclaim(
363372
abort(pool);
364373
// Free up all the free capacity from the aborted pool as the associated
365374
// query has failed at this point.
366-
pool->shrink();
375+
incrementFreeCapacity(pool->shrink());
367376
}
368377
const uint64_t newCapacity = pool->capacity();
369378
VELOX_CHECK_GE(oldCapacity, newCapacity);

velox/common/memory/SharedArbitrator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class SharedArbitrator : public MemoryArbitrator {
3333

3434
void reserveMemory(MemoryPool* pool, uint64_t /*unused*/) final;
3535

36-
void releaseMemory(MemoryPool* pool) final;
36+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) final;
3737

3838
bool growMemory(
3939
MemoryPool* pool,

0 commit comments

Comments
 (0)