Skip to content

Commit 8fd8568

Browse files
committed
Revert "task: stop wrapping tasks with unique_ptr"
This reverts commit 33406cf. It introduces memory leaks: Direct leak of 24 byte(s) in 1 object(s) allocated from: #0 0x7fb773b389d7 in operator new(unsigned long) (/lib64/libasan.so.5+0x10f9d7) ceph#1 0x108f0d4 in seastar::reactor::poller::~poller() ../src/core/reactor.cc:2879 ceph#2 0x11c1e59 in std::experimental::fundamentals_v1::_Optional_base<seastar::reactor::poller, true>::~_Optional_base() /usr/include/c++/9/experimental/optional:288 ceph#3 0x118f2d7 in std::experimental::fundamentals_v1::optional<seastar::reactor::poller>::~optional() /usr/include/c++/9/experimental/optional:491 ceph#4 0x108c5a5 in seastar::reactor::run() ../src/core/reactor.cc:2587 ceph#5 0xf1a822 in seastar::app_template::run_deprecated(int, char**, std::function<void ()>&&) ../src/core/app-template.cc:199 ceph#6 0xf1885d in seastar::app_template::run(int, char**, std::function<seastar::future<int> ()>&&) ../src/core/app-template.cc:115 ceph#7 0xeb2735 in operator() ../src/testing/test_runner.cc:72 ceph#8 0xebb342 in _M_invoke /usr/include/c++/9/bits/std_function.h:300 ceph#9 0xf3d8b0 in std::function<void ()>::operator()() const /usr/include/c++/9/bits/std_function.h:690 ceph#10 0x1034c72 in seastar::posix_thread::start_routine(void*) ../src/core/posix.cc:52 ceph#11 0x7fb7738804e1 in start_thread /usr/src/debug/glibc-2.30-13-g919af705ee/nptl/pthread_create.c:479 Reported-by: Rafael Avila de Espindola <[email protected]>
1 parent 33406cf commit 8fd8568

File tree

7 files changed

+86
-93
lines changed

7 files changed

+86
-93
lines changed

include/seastar/core/do_with.hh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,13 @@ public:
9090
template<typename T, typename F>
9191
inline
9292
auto do_with(T&& rvalue, F&& f) {
93-
auto task = new internal::do_with_state<T, std::result_of_t<F(T&)>>(std::forward<T>(rvalue));
93+
auto task = std::make_unique<internal::do_with_state<T, std::result_of_t<F(T&)>>>(std::forward<T>(rvalue));
9494
auto fut = f(task->data());
9595
if (fut.available()) {
9696
return fut;
9797
}
9898
auto ret = task->get_future();
99-
internal::set_callback(fut, task);
99+
internal::set_callback(fut, std::move(task));
100100
return ret;
101101
}
102102

@@ -148,13 +148,13 @@ do_with(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more) {
148148
auto&& just_func = std::move(std::get<nr>(std::move(all)));
149149
using value_tuple = std::remove_reference_t<decltype(just_values)>;
150150
using ret_type = decltype(apply(just_func, just_values));
151-
auto task = new internal::do_with_state<value_tuple, ret_type>(std::move(just_values));
151+
auto task = std::make_unique<internal::do_with_state<value_tuple, ret_type>>(std::move(just_values));
152152
auto fut = apply(just_func, task->data());
153153
if (fut.available()) {
154154
return fut;
155155
}
156156
auto ret = task->get_future();
157-
internal::set_callback(fut, task);
157+
internal::set_callback(fut, std::move(task));
158158
return ret;
159159
}
160160

include/seastar/core/future-util.hh

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ private:
119119
// Wait for one of the futures in _incomplete to complete, and then
120120
// decide what to do: wait for another one, or deliver _result if all
121121
// are complete.
122-
void wait_for_one() noexcept {
122+
void wait_for_one() {
123123
// Process from back to front, on the assumption that the front
124124
// futures are likely to complete earlier than the back futures.
125125
// If that's indeed the case, then the front futures will be
@@ -135,7 +135,7 @@ private:
135135

136136
// If there's an incompelete future, wait for it.
137137
if (!_incomplete.empty()) {
138-
internal::set_callback(_incomplete.back(), static_cast<continuation_base<>*>(this));
138+
internal::set_callback(_incomplete.back(), std::unique_ptr<continuation_base<>>(this));
139139
// This future's state will be collected in run_and_dispose(), so we can drop it.
140140
_incomplete.pop_back();
141141
return;
@@ -280,14 +280,13 @@ public:
280280
}
281281
future<> get_future() { return _promise.get_future(); }
282282
virtual void run_and_dispose() noexcept override {
283+
std::unique_ptr<repeater> zis{this};
283284
if (_state.failed()) {
284285
_promise.set_exception(std::move(_state).get_exception());
285-
delete this;
286286
return;
287287
} else {
288288
if (std::get<0>(_state.get()) == stop_iteration::yes) {
289289
_promise.set_value();
290-
delete this;
291290
return;
292291
}
293292
_state = {};
@@ -296,22 +295,20 @@ public:
296295
do {
297296
auto f = futurator::apply(_action);
298297
if (!f.available()) {
299-
internal::set_callback(f, this);
298+
internal::set_callback(f, std::move(zis));
300299
return;
301300
}
302301
if (f.get0() == stop_iteration::yes) {
303302
_promise.set_value();
304-
delete this;
305303
return;
306304
}
307305
} while (!need_preempt());
308306
} catch (...) {
309307
_promise.set_exception(std::current_exception());
310-
delete this;
311308
return;
312309
}
313310
_state.set(stop_iteration::no);
314-
schedule(this);
311+
schedule(std::move(zis));
315312
}
316313
};
317314

@@ -331,7 +328,7 @@ public:
331328
template<typename AsyncAction>
332329
GCC6_CONCEPT( requires seastar::ApplyReturns<AsyncAction, stop_iteration> || seastar::ApplyReturns<AsyncAction, future<stop_iteration>> )
333330
inline
334-
future<> repeat(AsyncAction action) noexcept {
331+
future<> repeat(AsyncAction action) {
335332
using futurator = futurize<std::result_of_t<AsyncAction()>>;
336333
static_assert(std::is_same<future<stop_iteration>, typename futurator::type>::value, "bad AsyncAction signature");
337334
try {
@@ -342,9 +339,9 @@ future<> repeat(AsyncAction action) noexcept {
342339
if (!f.available()) {
343340
return [&] () noexcept {
344341
memory::disable_failure_guard dfg;
345-
auto repeater = new internal::repeater<AsyncAction>(std::move(action));
342+
auto repeater = std::make_unique<internal::repeater<AsyncAction>>(std::move(action));
346343
auto ret = repeater->get_future();
347-
internal::set_callback(f, repeater);
344+
internal::set_callback(f, std::move(repeater));
348345
return ret;
349346
}();
350347
}
@@ -354,9 +351,9 @@ future<> repeat(AsyncAction action) noexcept {
354351
}
355352
} while (!need_preempt());
356353

357-
auto repeater = new internal::repeater<AsyncAction>(stop_iteration::no, std::move(action));
354+
auto repeater = std::make_unique<internal::repeater<AsyncAction>>(stop_iteration::no, std::move(action));
358355
auto ret = repeater->get_future();
359-
schedule(repeater);
356+
schedule(std::move(repeater));
360357
return ret;
361358
} catch (...) {
362359
return make_exception_future(std::current_exception());
@@ -400,15 +397,14 @@ public:
400397
}
401398
future<T> get_future() { return _promise.get_future(); }
402399
virtual void run_and_dispose() noexcept override {
400+
std::unique_ptr<repeat_until_value_state> zis{this};
403401
if (this->_state.failed()) {
404402
_promise.set_exception(std::move(this->_state).get_exception());
405-
delete this;
406403
return;
407404
} else {
408405
auto v = std::get<0>(std::move(this->_state).get());
409406
if (v) {
410407
_promise.set_value(std::move(*v));
411-
delete this;
412408
return;
413409
}
414410
this->_state = {};
@@ -417,23 +413,21 @@ public:
417413
do {
418414
auto f = futurator::apply(_action);
419415
if (!f.available()) {
420-
internal::set_callback(f, this);
416+
internal::set_callback(f, std::move(zis));
421417
return;
422418
}
423419
auto ret = f.get0();
424420
if (ret) {
425421
_promise.set_value(std::make_tuple(std::move(*ret)));
426-
delete this;
427422
return;
428423
}
429424
} while (!need_preempt());
430425
} catch (...) {
431426
_promise.set_exception(std::current_exception());
432-
delete this;
433427
return;
434428
}
435429
this->_state.set(compat::nullopt);
436-
schedule(this);
430+
schedule(std::move(zis));
437431
}
438432
};
439433

@@ -456,7 +450,7 @@ GCC6_CONCEPT( requires requires (AsyncAction aa) {
456450
futurize<std::result_of_t<AsyncAction()>>::apply(aa).get0().value();
457451
} )
458452
repeat_until_value_return_type<AsyncAction>
459-
repeat_until_value(AsyncAction action) noexcept {
453+
repeat_until_value(AsyncAction action) {
460454
using futurator = futurize<std::result_of_t<AsyncAction()>>;
461455
using type_helper = repeat_until_value_type_helper<typename futurator::type>;
462456
// the "T" in the documentation
@@ -468,9 +462,9 @@ repeat_until_value(AsyncAction action) noexcept {
468462
if (!f.available()) {
469463
return [&] () noexcept {
470464
memory::disable_failure_guard dfg;
471-
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(std::move(action));
465+
auto state = std::make_unique<internal::repeat_until_value_state<AsyncAction, value_type>>(std::move(action));
472466
auto ret = state->get_future();
473-
internal::set_callback(f, state);
467+
internal::set_callback(f, std::move(state));
474468
return ret;
475469
}();
476470
}
@@ -486,9 +480,9 @@ repeat_until_value(AsyncAction action) noexcept {
486480
} while (!need_preempt());
487481

488482
try {
489-
auto state = new internal::repeat_until_value_state<AsyncAction, value_type>(compat::nullopt, std::move(action));
483+
auto state = std::make_unique<internal::repeat_until_value_state<AsyncAction, value_type>>(compat::nullopt, std::move(action));
490484
auto f = state->get_future();
491-
schedule(state);
485+
schedule(std::move(state));
492486
return f;
493487
} catch (...) {
494488
return make_exception_future<value_type>(std::current_exception());
@@ -506,10 +500,10 @@ public:
506500
explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {}
507501
future<> get_future() { return _promise.get_future(); }
508502
virtual void run_and_dispose() noexcept override {
503+
std::unique_ptr<do_until_state> zis{this};
509504
if (_state.available()) {
510505
if (_state.failed()) {
511506
_promise.set_urgent_state(std::move(_state));
512-
delete this;
513507
return;
514508
}
515509
_state = {}; // allow next cycle to overrun state
@@ -518,26 +512,23 @@ public:
518512
do {
519513
if (_stop()) {
520514
_promise.set_value();
521-
delete this;
522515
return;
523516
}
524517
auto f = _action();
525518
if (!f.available()) {
526-
internal::set_callback(f, this);
519+
internal::set_callback(f, std::move(zis));
527520
return;
528521
}
529522
if (f.failed()) {
530523
f.forward_to(std::move(_promise));
531-
delete this;
532524
return;
533525
}
534526
} while (!need_preempt());
535527
} catch (...) {
536528
_promise.set_exception(std::current_exception());
537-
delete this;
538529
return;
539530
}
540-
schedule(this);
531+
schedule(std::move(zis));
541532
}
542533
};
543534

@@ -556,7 +547,7 @@ public:
556547
template<typename AsyncAction, typename StopCondition>
557548
GCC6_CONCEPT( requires seastar::ApplyReturns<StopCondition, bool> && seastar::ApplyReturns<AsyncAction, future<>> )
558549
inline
559-
future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
550+
future<> do_until(StopCondition stop_cond, AsyncAction action) {
560551
using namespace internal;
561552
using futurator = futurize<void>;
562553
do {
@@ -567,9 +558,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
567558
if (!f.available()) {
568559
return [&] () noexcept {
569560
memory::disable_failure_guard dfg;
570-
auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
561+
auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
571562
auto ret = task->get_future();
572-
internal::set_callback(f, task);
563+
internal::set_callback(f, std::move(task));
573564
return ret;
574565
}();
575566
}
@@ -578,9 +569,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept {
578569
}
579570
} while (!need_preempt());
580571

581-
auto task = new do_until_state<StopCondition, AsyncAction>(std::move(stop_cond), std::move(action));
572+
auto task = std::make_unique<do_until_state<StopCondition, AsyncAction>>(std::move(stop_cond), std::move(action));
582573
auto f = task->get_future();
583-
schedule(task);
574+
schedule(std::move(task));
584575
return f;
585576
}
586577

@@ -746,7 +737,7 @@ public:
746737
return true;
747738
} else {
748739
auto c = new (continuation) when_all_state_component(wasb, f);
749-
set_callback(*f, c);
740+
set_callback(*f, std::unique_ptr<when_all_state_component>(c));
750741
return false;
751742
}
752743
}

include/seastar/core/future.hh

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ template <typename... T>
495495
future<T...> make_exception_future(future_state_base&& state) noexcept;
496496

497497
template <typename... T, typename U>
498-
void set_callback(future<T...>& fut, U* callback) noexcept;
498+
void set_callback(future<T...>& fut, std::unique_ptr<U> callback);
499499

500500
class future_base;
501501

@@ -509,7 +509,7 @@ protected:
509509
// details.
510510
future_state_base* _state;
511511

512-
task* _task = nullptr;
512+
std::unique_ptr<task> _task;
513513

514514
promise_base(const promise_base&) = delete;
515515
promise_base(future_state_base* state) noexcept : _state(state) {}
@@ -607,19 +607,19 @@ public:
607607
#if SEASTAR_COROUTINES_TS
608608
void set_coroutine(future_state<T...>& state, task& coroutine) noexcept {
609609
_state = &state;
610-
_task = &coroutine;
610+
_task = std::unique_ptr<task>(&coroutine);
611611
}
612612
#endif
613613
private:
614614
template <typename Func>
615-
void schedule(Func&& func) noexcept {
616-
auto tws = new continuation<Func, T...>(std::move(func));
615+
void schedule(Func&& func) {
616+
auto tws = std::make_unique<continuation<Func, T...>>(std::move(func));
617617
_state = &tws->_state;
618-
_task = tws;
618+
_task = std::move(tws);
619619
}
620-
void schedule(continuation_base<T...>* callback) noexcept {
620+
void schedule(std::unique_ptr<continuation_base<T...>> callback) {
621621
_state = &callback->_state;
622-
_task = callback;
622+
_task = std::move(callback);
623623
}
624624

625625
template <typename... U>
@@ -968,12 +968,12 @@ private:
968968
return static_cast<internal::promise_base_with_type<T...>*>(future_base::detach_promise());
969969
}
970970
template <typename Func>
971-
void schedule(Func&& func) noexcept {
971+
void schedule(Func&& func) {
972972
if (_state.available() || !_promise) {
973973
if (__builtin_expect(!_state.available() && !_promise, false)) {
974974
_state.set_to_broken_promise();
975975
}
976-
::seastar::schedule(new continuation<Func, T...>(std::move(func), std::move(_state)));
976+
::seastar::schedule(std::make_unique<continuation<Func, T...>>(std::move(func), std::move(_state)));
977977
} else {
978978
assert(_promise);
979979
detach_promise()->schedule(std::move(func));
@@ -1095,7 +1095,7 @@ private:
10951095
auto thread = thread_impl::get();
10961096
assert(thread);
10971097
thread_wake_task wake_task{thread, this};
1098-
detach_promise()->schedule(static_cast<continuation_base<T...>*>(&wake_task));
1098+
detach_promise()->schedule(std::unique_ptr<continuation_base<T...>>(&wake_task));
10991099
thread_impl::switch_out(thread);
11001100
}
11011101

@@ -1443,13 +1443,13 @@ public:
14431443
}
14441444
#endif
14451445
private:
1446-
void set_callback(continuation_base<T...>* callback) noexcept {
1446+
void set_callback(std::unique_ptr<continuation_base<T...>> callback) {
14471447
if (_state.available()) {
14481448
callback->set_state(get_available_state_ref());
1449-
::seastar::schedule(callback);
1449+
::seastar::schedule(std::move(callback));
14501450
} else {
14511451
assert(_promise);
1452-
detach_promise()->schedule(callback);
1452+
detach_promise()->schedule(std::move(callback));
14531453
}
14541454

14551455
}
@@ -1470,7 +1470,7 @@ private:
14701470
template <typename... U>
14711471
friend future<U...> internal::make_exception_future(future_state_base&& state) noexcept;
14721472
template <typename... U, typename V>
1473-
friend void internal::set_callback(future<U...>&, V*) noexcept;
1473+
friend void internal::set_callback(future<U...>&, std::unique_ptr<V>);
14741474
/// \endcond
14751475
};
14761476

@@ -1717,10 +1717,10 @@ namespace internal {
17171717

17181718
template <typename... T, typename U>
17191719
inline
1720-
void set_callback(future<T...>& fut, U* callback) noexcept {
1720+
void set_callback(future<T...>& fut, std::unique_ptr<U> callback) {
17211721
// It would be better to use continuation_base<T...> for U, but
17221722
// then a derived class of continuation_base<T...> won't be matched
1723-
return fut.set_callback(callback);
1723+
return fut.set_callback(std::move(callback));
17241724
}
17251725

17261726
}

0 commit comments

Comments
 (0)