Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 29 additions & 23 deletions hphp/runtime/ext/asio/asio-blockable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,29 +166,35 @@ c_WaitableWaitHandle* AsioBlockable::getWaitHandle() const {
}

void AsioBlockableChain::unblock() {
while (auto cur = m_lastParent) {
m_lastParent = cur->getPrevParent();
cur->updatePrevParent(nullptr);
// the onUnblocked handler may free cur
switch (cur->getKind()) {
case Kind::AsyncFunctionWaitHandleNode:
getAsyncFunctionWaitHandleNode(cur)->onUnblocked();
break;
case Kind::AsyncGeneratorWaitHandle:
getAsyncGeneratorWaitHandle(cur)->onUnblocked();
break;
case Kind::AwaitAllWaitHandleNode:
getAwaitAllWaitHandleNode(cur)->onUnblocked();
break;
case Kind::ConcurrentWaitHandleNode:
getConcurrentWaitHandleNode(cur)->onUnblocked();
break;
case Kind::ConditionWaitHandle:
getConditionWaitHandle(cur)->onUnblocked();
break;
case Kind::PriorityBridgeWaitHandle:
getPriorityBridgeWaitHandle(cur)->onUnblocked();
break;
std::vector<AsioBlockableChain> worklist = { *this };
while (!worklist.empty()) {
auto const lastParent = worklist.back().m_lastParent;
worklist.pop_back();

for (AsioBlockable* cur = lastParent, *next; cur; cur = next) {
next = cur->getPrevParent();
cur->updatePrevParent(nullptr);
// the onUnblocked handler may free cur
switch (cur->getKind()) {
case Kind::AsyncFunctionWaitHandleNode:
getAsyncFunctionWaitHandleNode(cur)->onUnblocked();
break;
case Kind::AsyncGeneratorWaitHandle:
getAsyncGeneratorWaitHandle(cur)->onUnblocked();
break;
case Kind::AwaitAllWaitHandleNode:
getAwaitAllWaitHandleNode(cur)->onUnblocked(worklist);
break;
case Kind::ConcurrentWaitHandleNode:
getConcurrentWaitHandleNode(cur)->onUnblocked(worklist);
break;
case Kind::ConditionWaitHandle:
getConditionWaitHandle(cur)->onUnblocked(worklist);
break;
case Kind::PriorityBridgeWaitHandle:
getPriorityBridgeWaitHandle(cur)->onUnblocked(worklist);
break;
}
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions hphp/runtime/ext/asio/ext_await-all-wait-handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void c_AwaitAllWaitHandle::initialize(ContextStateIndex ctxStateIdx) {
}
}

void c_AwaitAllWaitHandle::onUnblocked(uint32_t idx) {
void c_AwaitAllWaitHandle::onUnblocked(uint32_t idx, std::vector<AsioBlockableChain>& worklist) {
assertx(idx <= m_unfinished);
assertx(getState() == STATE_BLOCKED);

Expand All @@ -166,25 +166,25 @@ void c_AwaitAllWaitHandle::onUnblocked(uint32_t idx) {
} catch (const Object& cycle_exception) {
assertx(cycle_exception->instanceof(SystemLib::getThrowableClass()));
throwable_recompute_backtrace_from_wh(cycle_exception.get(), this);
markAsFailed(cycle_exception);
markAsFailed(cycle_exception, worklist);
}
return;
}
}
// All children finished.
markAsFinished();
markAsFinished(worklist);
}
}

void c_AwaitAllWaitHandle::markAsFinished() {
void c_AwaitAllWaitHandle::markAsFinished(std::vector<AsioBlockableChain>& worklist) {
auto parentChain = getParentChain();
setState(STATE_SUCCEEDED);
tvWriteNull(m_resultOrException);
parentChain.unblock();
worklist.emplace_back(std::move(parentChain));
decRefObj(this);
}

void c_AwaitAllWaitHandle::markAsFailed(const Object& exception) {
void c_AwaitAllWaitHandle::markAsFailed(const Object& exception, std::vector<AsioBlockableChain>& worklist) {
for (uint32_t idx = 0; idx < m_cap; idx++) {
auto const child = m_children[idx].m_child;
if (!child->isFinished()) {
Expand All @@ -196,7 +196,7 @@ void c_AwaitAllWaitHandle::markAsFailed(const Object& exception) {
auto parentChain = getParentChain();
setState(STATE_FAILED);
tvWriteObject(exception.get(), &m_resultOrException);
parentChain.unblock();
worklist.emplace_back(std::move(parentChain));
decRefObj(this);
}

Expand Down
37 changes: 29 additions & 8 deletions hphp/runtime/ext/asio/ext_await-all-wait-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <vector>

#include "hphp/runtime/base/type-array.h"
#include "hphp/runtime/base/type-object.h"
#include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h"
Expand All @@ -37,9 +39,28 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle,

static void instanceDtor(ObjectData* obj, const Class*) {
auto wh = wait_handle<c_AwaitAllWaitHandle>(obj);
auto const sz = wh->heapSize();
wh->~c_AwaitAllWaitHandle();
tl_heap->objFree(obj, sz);

std::vector<c_AwaitAllWaitHandle*> queue = {wh};
for (std::size_t i = 0; i < queue.size(); i++) {
auto cur = queue[i];
for (int32_t j = 0; j < cur->m_cap; j++) {
auto cur_child = cur->m_children[j].m_child;
assertx(isFailed() || cur_child->isFinished());

if (cur_child->getKind() == Kind::AwaitAll) {
if (cur_child->decReleaseCheck()) {
queue.push_back(cur_child->asAwaitAll());
}
} else {
decRefObj(cur_child);
}
}
}

for (auto& cur : queue) {
auto const sz = cur->heapSize();
tl_heap->objFree(cur, sz);
}
}

explicit c_AwaitAllWaitHandle(unsigned cap = 0)
Expand Down Expand Up @@ -79,8 +100,8 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle,
return getChildIdx() == getWaitHandle()->m_unfinished;
}

void onUnblocked() {
getWaitHandle()->onUnblocked(getChildIdx());
void onUnblocked(std::vector<AsioBlockableChain>& worklist) {
getWaitHandle()->onUnblocked(getChildIdx(), worklist);
}

AsioBlockable m_blockable;
Expand All @@ -93,7 +114,7 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle,
}

String getName();
void onUnblocked(uint32_t idx);
void onUnblocked(uint32_t idx, std::vector<AsioBlockableChain>& worklist);
c_WaitableWaitHandle* getChild();
template<typename T> void forEachChild(T fn);

Expand All @@ -108,8 +129,8 @@ struct c_AwaitAllWaitHandle final : c_WaitableWaitHandle,
static Object Create(Iter iter);
static req::ptr<c_AwaitAllWaitHandle> Alloc(int32_t cnt);
void initialize(ContextStateIndex ctxStateIdx);
void markAsFinished(void);
void markAsFailed(const Object& exception);
void markAsFinished(std::vector<AsioBlockableChain>& worklist);
void markAsFailed(const Object& exception, std::vector<AsioBlockableChain>& worklist);
void setState(uint8_t state) { setKindState(Kind::AwaitAll, state); }

// Construct an AAWH from an array-like without making layout assumptions.
Expand Down
14 changes: 7 additions & 7 deletions hphp/runtime/ext/asio/ext_concurrent-wait-handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void c_ConcurrentWaitHandle::initialize(ContextStateIndex ctxStateIdx) {
}
}

void c_ConcurrentWaitHandle::onUnblocked(uint32_t idx) {
void c_ConcurrentWaitHandle::onUnblocked(uint32_t idx, std::vector<AsioBlockableChain>& worklist) {
assertx(idx <= m_unfinished);
assertx(getState() == STATE_BLOCKED);

Expand All @@ -118,25 +118,25 @@ void c_ConcurrentWaitHandle::onUnblocked(uint32_t idx) {
} catch (const Object& cycle_exception) {
assertx(cycle_exception->instanceof(SystemLib::getThrowableClass()));
throwable_recompute_backtrace_from_wh(cycle_exception.get(), this);
markAsFailed(cycle_exception);
markAsFailed(cycle_exception, worklist);
}
return;
}
}
// All children finished.
markAsFinished();
markAsFinished(worklist);
}
}

void c_ConcurrentWaitHandle::markAsFinished() {
void c_ConcurrentWaitHandle::markAsFinished(std::vector<AsioBlockableChain>& worklist) {
auto parentChain = getParentChain();
setState(STATE_SUCCEEDED);
tvWriteNull(m_resultOrException);
parentChain.unblock();
worklist.emplace_back(std::move(parentChain));
decRefObj(this);
}

void c_ConcurrentWaitHandle::markAsFailed(const Object& exception) {
void c_ConcurrentWaitHandle::markAsFailed(const Object& exception, std::vector<AsioBlockableChain>& worklist) {
for (uint32_t idx = 0; idx < m_cap; idx++) {
auto const child = m_children[idx].m_child;
if (!child->isFinished()) {
Expand All @@ -148,7 +148,7 @@ void c_ConcurrentWaitHandle::markAsFailed(const Object& exception) {
auto parentChain = getParentChain();
setState(STATE_FAILED);
tvWriteObject(exception.get(), &m_resultOrException);
parentChain.unblock();
worklist.emplace_back(std::move(parentChain));
decRefObj(this);
}

Expand Down
37 changes: 29 additions & 8 deletions hphp/runtime/ext/asio/ext_concurrent-wait-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <vector>

#include "hphp/runtime/base/type-array.h"
#include "hphp/runtime/base/type-object.h"
#include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h"
Expand All @@ -37,9 +39,28 @@ struct c_ConcurrentWaitHandle final :
using SystemLib::ClassLoader<"HH\\ConcurrentWaitHandle">::className;
static void instanceDtor(ObjectData* obj, const Class*) {
auto wh = wait_handle<c_ConcurrentWaitHandle>(obj);
auto const sz = wh->heapSize();
wh->~c_ConcurrentWaitHandle();
tl_heap->objFree(obj, sz);

std::vector<c_ConcurrentWaitHandle*> queue = {wh};
for (std::size_t i = 0; i < queue.size(); i++) {
auto cur = queue[i];
for (int32_t j = 0; j < cur->m_cap; j++) {
auto cur_child = cur->m_children[j].m_child;
assertx(isFailed() || cur_child->isFinished());

if (cur_child->getKind() == Kind::Concurrent) {
if (cur_child->decReleaseCheck()) {
queue.push_back(cur_child->asConcurrent());
}
} else {
decRefObj(cur_child);
}
}
}

for (auto& cur : queue) {
auto const sz = cur->heapSize();
tl_heap->objFree(cur, sz);
}
}

explicit c_ConcurrentWaitHandle(unsigned cap = 0)
Expand Down Expand Up @@ -86,8 +107,8 @@ struct c_ConcurrentWaitHandle final :
return getChildIdx() == getWaitHandle()->m_unfinished;
}

void onUnblocked() {
getWaitHandle()->onUnblocked(getChildIdx());
void onUnblocked(std::vector<AsioBlockableChain>& worklist) {
getWaitHandle()->onUnblocked(getChildIdx(), worklist);
}

AsioBlockable m_blockable;
Expand All @@ -100,7 +121,7 @@ struct c_ConcurrentWaitHandle final :
}

String getName();
void onUnblocked(uint32_t idx);
void onUnblocked(uint32_t idx, std::vector<AsioBlockableChain>& worklist);
c_WaitableWaitHandle* getChild();
template<typename T> void forEachChild(T fn);

Expand All @@ -113,8 +134,8 @@ struct c_ConcurrentWaitHandle final :
private:
static req::ptr<c_ConcurrentWaitHandle> Alloc(int32_t cnt);
void initialize(ContextStateIndex ctxStateIdx);
void markAsFinished(void);
void markAsFailed(const Object& exception);
void markAsFinished(std::vector<AsioBlockableChain>& worklist);
void markAsFailed(const Object& exception, std::vector<AsioBlockableChain>& worklist);
void setState(uint8_t state) { setKindState(Kind::Concurrent, state); }

private:
Expand Down
4 changes: 2 additions & 2 deletions hphp/runtime/ext/asio/ext_condition-wait-handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void c_ConditionWaitHandle::initialize(c_WaitableWaitHandle* child) {
}
}

void c_ConditionWaitHandle::onUnblocked() {
void c_ConditionWaitHandle::onUnblocked(std::vector<AsioBlockableChain>& worklist) {
decRefObj(m_child);
m_child = nullptr;

Expand All @@ -142,7 +142,7 @@ void c_ConditionWaitHandle::onUnblocked() {
make_tv<KindOfObject>(getNotNotifiedException().detach()),
m_resultOrException
);
parentChain.unblock();
worklist.emplace_back(std::move(parentChain));
decRefObj(this);
}

Expand Down
4 changes: 3 additions & 1 deletion hphp/runtime/ext/asio/ext_condition-wait-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifndef incl_HPHP_EXT_ASIO_CONDITION_WAIT_HANDLE_H_
#define incl_HPHP_EXT_ASIO_CONDITION_WAIT_HANDLE_H_

#include <vector>

#include "hphp/runtime/base/vanilla-dict.h"
#include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h"
#include "hphp/runtime/ext/extension.h"
Expand Down Expand Up @@ -48,7 +50,7 @@ struct c_ConditionWaitHandle final :
}

String getName();
void onUnblocked();
void onUnblocked(std::vector<AsioBlockableChain>& worklist);
c_WaitableWaitHandle* getChild();

static const int8_t STATE_BLOCKED = 2;
Expand Down
4 changes: 2 additions & 2 deletions hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void c_PriorityBridgeWaitHandle::initialize(c_WaitableWaitHandle* child) {
}
}

void c_PriorityBridgeWaitHandle::onUnblocked() {
void c_PriorityBridgeWaitHandle::onUnblocked(std::vector<AsioBlockableChain>& worklist) {
auto parentChain = getParentChain();

// Propagate the child's result.
Expand All @@ -90,7 +90,7 @@ void c_PriorityBridgeWaitHandle::onUnblocked() {
decRefObj(m_child);
m_child = nullptr;

parentChain.unblock();
worklist.emplace_back(std::move(parentChain));
decRefObj(this);
}

Expand Down
4 changes: 3 additions & 1 deletion hphp/runtime/ext/asio/ext_priority-bridge-wait-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <vector>

#include "hphp/runtime/ext/asio/ext_waitable-wait-handle.h"
#include "hphp/runtime/ext/asio/ext_resumable-wait-handle.h"

Expand Down Expand Up @@ -49,7 +51,7 @@ struct c_PriorityBridgeWaitHandle final
}

String getName();
void onUnblocked();
void onUnblocked(std::vector<AsioBlockableChain>& worklist);
c_WaitableWaitHandle* getChild();

// Prioritize the child of this PBWH. Will lift the child into the object's
Expand Down
Loading