Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ __pycache__/
.act-artifacts/
.secrets
actionlint
build-debug/
build-tsan/
19 changes: 12 additions & 7 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,18 @@ namespace phlex::experimental {
} else {
accessor a;
if (stores_.insert(a, store->index()->hash())) {
auto result = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
++product_count_[store->index()->layer_hash()];
products new_products;
new_products.add_all(output_, std::move(result));
a->second = std::make_shared<product_store>(
store->index(), this->full_name(), std::move(new_products));
try {
auto result = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
++product_count_[store->index()->layer_hash()];
products new_products;
new_products.add_all(output_, std::move(result));
a->second = std::make_shared<product_store>(
store->index(), this->full_name(), std::move(new_products));
} catch (...) {
stores_.erase(a);
throw;
}

message const new_msg{a->second, message_id};
stay_in_graph.try_put(new_msg);
Expand Down
25 changes: 4 additions & 21 deletions plugins/python/src/lifelinewrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,18 @@ static py_lifeline_t* ll_new(PyTypeObject* pytype, PyObject*, PyObject*)
{
py_lifeline_t* pyobj = (py_lifeline_t*)pytype->tp_alloc(pytype, 0);
if (!pyobj)
PyErr_Print();
return nullptr;
pyobj->m_view = nullptr;
new (&pyobj->m_source) std::shared_ptr<void>{};

return pyobj;
}

static int ll_traverse(py_lifeline_t* pyobj, visitproc visit, void* args)
{
if (pyobj->m_view)
visit(pyobj->m_view, args);
return 0;
}

static int ll_clear(py_lifeline_t* pyobj)
{
Py_CLEAR(pyobj->m_view);
return 0;
}

static void ll_dealloc(py_lifeline_t* pyobj)
{
// This type participates in GC; untrack before clearing references so the
// collector does not traverse a partially torn-down object during dealloc.
PyObject_GC_UnTrack(pyobj);
Py_CLEAR(pyobj->m_view);
typedef std::shared_ptr<void> generic_shared_t;
pyobj->m_source.~generic_shared_t();
// Use tp_free to pair with tp_alloc for GC-tracked Python objects.
Py_TYPE(pyobj)->tp_free((PyObject*)pyobj);
}

Expand All @@ -62,10 +45,10 @@ PyTypeObject phlex::experimental::PhlexLifeline_Type = {
0, // tp_getattro
0, // tp_setattro
0, // tp_as_buffer
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, // tp_flags
Py_TPFLAGS_DEFAULT, // tp_flags
(char*)"internal", // tp_doc
(traverseproc)ll_traverse, // tp_traverse
(inquiry)ll_clear, // tp_clear
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
Expand Down
58 changes: 48 additions & 10 deletions plugins/python/src/modulewrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ using phlex::product_query;

// TODO: the layer is currently hard-wired and should come from the product
// specification instead, but that doesn't exist in Python yet.
static std::string const LAYER = "job";
static std::string const LAYER = "event";

// Simple phlex module wrapper
// clang-format off
Expand Down Expand Up @@ -58,8 +58,16 @@ namespace {
static inline PyObject* lifeline_transform(intptr_t arg)
{
PyObject* pyobj = (PyObject*)arg;
if (pyobj && PyObject_TypeCheck(pyobj, &PhlexLifeline_Type)) {
return ((py_lifeline_t*)pyobj)->m_view;
if (!pyobj) {
throw std::runtime_error("lifeline_transform received null PyObject* argument");
}
if (PyObject_TypeCheck(pyobj, &PhlexLifeline_Type)) {
PyObject* view = ((py_lifeline_t*)pyobj)->m_view;
if (!view) {
throw std::runtime_error(
"PhlexLifeline has null m_view pointer - object may be uninitialized or corrupted");
}
return view;
}
return pyobj;
}
Expand Down Expand Up @@ -107,15 +115,23 @@ namespace {

PyGILRAII gil;

PyObject* result =
PyObject_CallFunctionObjArgs(m_callable, lifeline_transform(args)..., nullptr);
// Extract views from PhlexLifeline objects. The returned pointers are
// borrowed references; INCREF them to create temporary owned references
// that survive for the duration of the Python call.
PyObject* py_args[N] = {lifeline_transform(args)...};
for (auto* p : py_args)
Py_INCREF(p);

PyObject* result = call_with_array(py_args, std::make_index_sequence<N>{});

std::string error_msg;
if (!result) {
if (!msg_from_py_error(error_msg))
error_msg = "Unknown python error";
}

for (auto* p : py_args)
Py_DECREF(p);
decref_all(args...);

if (!error_msg.empty()) {
Expand All @@ -132,7 +148,14 @@ namespace {

PyGILRAII gil;

PyObject* result = PyObject_CallFunctionObjArgs(m_callable, (PyObject*)args..., nullptr);
// Extract views from PhlexLifeline objects. The returned pointers are
// borrowed references; INCREF them to create temporary owned references
// that survive for the duration of the Python call.
PyObject* py_args[N] = {lifeline_transform(args)...};
for (auto* p : py_args)
Py_INCREF(p);

PyObject* result = call_with_array(py_args, std::make_index_sequence<N>{});

std::string error_msg;
if (!result) {
Expand All @@ -141,6 +164,8 @@ namespace {
} else
Py_DECREF(result);

for (auto* p : py_args)
Py_DECREF(p);
decref_all(args...);

if (!error_msg.empty()) {
Expand All @@ -149,6 +174,12 @@ namespace {
}

private:
template <std::size_t... Is>
PyObject* call_with_array(PyObject* (&py_args)[N], std::index_sequence<Is...>)
{
return PyObject_CallFunctionObjArgs(m_callable, py_args[Is]..., nullptr);
}

template <typename... Args>
void decref_all(Args... args)
{
Expand Down Expand Up @@ -358,7 +389,7 @@ namespace {
PyGILRAII gil; \
\
if (!v) \
return (intptr_t)nullptr; \
throw std::runtime_error("null vector<" #cpptype "> passed to " #name "_to_py"); \
\
/* use a numpy view with the shared pointer tied up in a lifeline object (note: this */ \
/* is just a demonstrator; alternatives are still being considered) */ \
Expand All @@ -370,8 +401,12 @@ namespace {
(void*)(v->data()) /* raw buffer */ \
); \
\
if (!np_view) \
return (intptr_t)nullptr; \
if (!np_view) { \
std::string py_msg; \
msg_from_py_error(py_msg, true); \
throw std::runtime_error("failed to create numpy array in " #name "_to_py" + \
(py_msg.empty() ? std::string{} : ": " + py_msg)); \
} \
\
/* make the data read-only by not making it writable */ \
PyArray_CLEARFLAGS((PyArrayObject*)np_view, NPY_ARRAY_WRITEABLE); \
Expand All @@ -383,7 +418,10 @@ namespace {
(py_lifeline_t*)PhlexLifeline_Type.tp_new(&PhlexLifeline_Type, nullptr, nullptr); \
if (!pyll) { \
Py_DECREF(np_view); \
return (intptr_t)nullptr; \
std::string py_msg; \
msg_from_py_error(py_msg, true); \
throw std::runtime_error("failed to create lifeline in " #name "_to_py" + \
(py_msg.empty() ? std::string{} : ": " + py_msg)); \
} \
pyll->m_source = v; \
pyll->m_view = np_view; /* steals reference */ \
Expand Down
Loading
Loading