
Commit 2a1c9a3

moto-meta and mthrok authored
Move the implementation detail of iterate_in_subprocess to development note (#1076)
Differential Revision: D86097638
Co-authored-by: Moto Hira <[email protected]>
1 parent 7445c81 commit 2a1c9a3

5 files changed: +104 -99 lines changed

docs/source/index.rst

Lines changed: 3 additions & 3 deletions
@@ -23,15 +23,15 @@ Please use the following BibTex for citing our project if you find it useful.
 .. code-block:: text
 
    @misc{hira2025scalableperformantdataloading,
-         title={Scalable and Performant Data Loading},
+         title={Scalable and Performant Data Loading},
          author={Moto Hira and Christian Puhrsch and Valentin Andrei and Roman Malinovskyy and Gael Le Lan and Abhinandan Krishnan and Joseph Cummings and Miguel Martin and Gokul Gunasekaran and Yuta Inoue and Alex J Turner and Raghuraman Krishnamoorthi},
          year={2025},
          eprint={2504.20067},
          archivePrefix={arXiv},
          primaryClass={cs.DC},
-         url={https://arxiv.org/abs/2504.20067},
+         url={https://arxiv.org/abs/2504.20067},
    }
-
+
 .. toctree::
    :hidden:
 

docs/source/notes/index.rst

Lines changed: 1 addition & 0 deletions
@@ -4,3 +4,4 @@ Development Notes
 .. toctree::
 
    pipeline_impl
+   remote_iterable_protocol

docs/source/notes/remote_iterable_protocol.rst

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+Remote Iterable Protocol
+========================
+
+.. py:currentmodule:: spdl.pipeline._iter_utils
+
+Manipulting an iterable object in a remote location (subprocess or subinterpreter) requires
+somewhat elaborated state control.
+The following section go over the implementation detail.
+
+Worker State
+------------
+
+The iterable object is manipulated in the worker process.
+The worker process has three states, ``Initialization``, ``Stand By`` and ``Iteration``.
+
+The Initialization state performs global initialization and create the iterable object.
+When the Initialization completes, the worker transition to ``Stand By`` mode,
+where it waits for a command from the parent process.
+
+The command can be ``START_ITERATION`` or ``ABORT``.
+When the ``START_ITERATION`` is received, the worker process transition to the Iteration mode.
+
+In the Iteration mode, the worker creates an iterator object from the iterable, then executes it.
+The resulting data are put in the queue, which the parent process is watching.
+
+The following diagram illustrates worker's state transition in simplified manner.
+Detailed diagram alongside the actual implementation is found in :py:func:`_execute_iterable`.
+
+.. mermaid::
+
+   stateDiagram-v2
+       state Parent {
+           p1: Start Iteration
+           p2: Iterate on the result
+           state pf <<fork>>
+           state pj <<join>>
+
+           [*] --> p1
+           p1 --> pf
+           pf --> pj: Wait for worker process
+           pj --> p2
+           p2 --> [*]
+       }
+
+       state Worker {
+           state wf <<fork>>
+           w0: Initialization
+           w1: Stand By
+           w2: Iteration
+
+           [*]--> w0
+           w0 --> w1: Success
+           w0 --> [*]: Fail
+           w1 --> wf: Iteration started
+           wf --> w2
+           w2 --> w1: Iteration completed
+
+           w1 --> [*]: Abort
+           w2 --> [*]: Fail / Abort
+       }
+       pf --> w1: Issue START_ITERATION command
+       wf --> pj: Notify ITERATION_STARTED
+       w2 --> p2: Results passed via queue
+
+Helper functions and data structures
+-------------------------------------
+
+The follosing functions and data structures are used to implement
+the :py:func:`~spdl.pipeline.iterate_in_subprocess` function.
+They are not public interface, but the logic is sufficiently elaborated,
+and it is helpful to have them in the documentation, so they are listed here.
+
+.. autoclass:: _Cmd
+   :noindex:
+   :members:
+
+.. autoclass:: _Status
+   :noindex:
+   :members:
+
+.. autofunction:: _enter_iteration_mode()
+   :noindex:
+
+.. autofunction:: _execute_iterable()
+   :noindex:
+
+.. autofunction:: _drain()
+   :noindex:
+
+.. autofunction:: _iterate_results()
+   :noindex:
+
+.. autoclass:: _SubprocessIterable()
+   :noindex:
+   :members: __iter__
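
Editor's sketch (not part of the commit): the state transitions described in the note above can be illustrated with a small, self-contained example. This is a minimal sketch under stated assumptions, not SPDL's implementation. It runs the worker in a thread rather than a subprocess or subinterpreter so that it stays runnable as-is, and the names `Cmd`, `Status`, `worker`, `iterate` and `run_demo`, as well as every status other than `ITERATION_STARTED`, are hypothetical.

```python
import enum
import queue
import threading
from collections.abc import Callable, Iterable, Iterator
from typing import Any


class Cmd(enum.IntEnum):
    """Parent -> worker commands (names mirror the note; values are arbitrary)."""

    ABORT = enum.auto()
    START_ITERATION = enum.auto()


class Status(enum.IntEnum):
    """Worker -> parent messages. Only ITERATION_STARTED appears in the note;
    the other members are hypothetical."""

    INIT_FAILED = enum.auto()
    ITERATION_STARTED = enum.auto()
    ITEM = enum.auto()
    ITERATION_FINISHED = enum.auto()
    ITERATION_FAILED = enum.auto()


def worker(
    fn: Callable[[], Iterable[Any]], cmd_q: queue.Queue, data_q: queue.Queue
) -> None:
    # Initialization: create the iterable; exit if that fails.
    try:
        iterable = fn()
    except Exception as err:
        data_q.put((Status.INIT_FAILED, repr(err)))
        return
    while True:
        # Stand By: block until the parent issues a command.
        if cmd_q.get() != Cmd.START_ITERATION:
            return  # ABORT (or anything unexpected) ends the worker
        # Iteration: notify the parent, then stream results through the queue.
        data_q.put((Status.ITERATION_STARTED, None))
        try:
            for item in iterable:  # creates a fresh iterator from the iterable
                if not cmd_q.empty():
                    return  # a command during iteration is treated as ABORT here
                data_q.put((Status.ITEM, item))
        except Exception as err:
            data_q.put((Status.ITERATION_FAILED, repr(err)))
            return
        data_q.put((Status.ITERATION_FINISHED, None))  # back to Stand By


def iterate(cmd_q: queue.Queue, data_q: queue.Queue) -> Iterator[Any]:
    """Parent side: start one iteration and yield results from the queue."""
    cmd_q.put(Cmd.START_ITERATION)
    status, payload = data_q.get(timeout=5)
    if status != Status.ITERATION_STARTED:
        raise RuntimeError(f"worker did not start iterating: {status!r} {payload}")
    while True:
        status, payload = data_q.get(timeout=5)
        if status == Status.ITEM:
            yield payload
        elif status == Status.ITERATION_FINISHED:
            return
        else:
            raise RuntimeError(f"iteration failed: {status!r} {payload}")


def run_demo() -> list[list[int]]:
    cmd_q: queue.Queue = queue.Queue()
    data_q: queue.Queue = queue.Queue()
    thread = threading.Thread(
        target=worker, args=(lambda: range(3), cmd_q, data_q), daemon=True
    )
    thread.start()
    first = list(iterate(cmd_q, data_q))   # Stand By -> Iteration -> Stand By
    second = list(iterate(cmd_q, data_q))  # the same iterable is iterated again
    cmd_q.put(Cmd.ABORT)                   # Stand By -> exit
    thread.join(timeout=5)
    return [first, second]
```

Calling `run_demo()` iterates the same iterable twice through one worker, which is the reason the note separates the Stand By and Iteration states: the iterable is created once during Initialization, and each `START_ITERATION` command produces a fresh iterator from it.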

src/spdl/pipeline/_iter_utils/_common.py

Lines changed: 4 additions & 4 deletions
@@ -63,7 +63,7 @@ def empty(self) -> bool:
 class _Cmd(enum.IntEnum):
     """_Cmd()
     Command issued from the parent process to the worker process in
-    :py:func:`iterate_in_subprocess`.
+    :py:func:`~spdl.pipeline.iterate_in_subprocess`.
     """
 
     ABORT = enum.auto()
@@ -94,7 +94,7 @@ class _Cmd(enum.IntEnum):
 # Final status of the iterator
 class _Status(enum.IntEnum):
     """_Status()
-    Status reported by the worker process in :py:func:`iterate_in_subprocess`.
+    Status reported by the worker process in :py:func:`~spdl.pipeline.iterate_in_subprocess`.
     """
 
     UNEXPECTED_CMD_RECIEVED = enum.auto()
@@ -155,7 +155,7 @@ class _Msg(Generic[T]):
 def _drain(q: _Queue[Any]) -> None:
     """Drain a queue by removing all items.
 
-    Works with both multiprocessing.Queue and concurrent.interpreters.Queue.
+    Works with both :py:func:`multiprocessing.Queue` and :py:func:`concurrent.interpreters.Queue`.
     """
     while True:
         try:
@@ -170,7 +170,7 @@ def _execute_iterable(
     fn: Callable[[], Iterable[T]],
     initializers: Sequence[Callable[[], None]] | None,
 ) -> None:
-    """Worker implementation for :py:func:`iterate_in_subprocess`.
+    """Worker implementation for :py:func:`~spdl.pipeline.iterate_in_subprocess`.
 
     The following diagram illustrates the state transition with more details.
 
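
Editor's sketch (not part of the commit): the `_drain` hunk above is cut off after `try:`, so the body of the loop is not visible in this diff. The usual way to empty a queue without blocking looks roughly like the following. This is a minimal sketch that assumes the standard-library `queue.Queue`; the function name `drain` and its return value are hypothetical and are not taken from SPDL.

```python
import queue
from typing import Any


def drain(q: "queue.Queue[Any]") -> int:
    """Remove every item currently in the queue and return how many were removed."""
    removed = 0
    while True:
        try:
            q.get_nowait()  # non-blocking; raises queue.Empty when nothing is left
        except queue.Empty:
            return removed
        removed += 1
```

`multiprocessing.Queue.get_nowait` raises the same `queue.Empty` exception, so the loop shape carries over to that queue type. Whether other queue types signal emptiness the same way would need to be checked against their documentation.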

src/spdl/pipeline/_iter_utils/_subprocess.py

Lines changed: 1 addition & 92 deletions
@@ -131,98 +131,7 @@ def iterate_in_subprocess(
        - :py:func:`run_pipeline_in_subprocess` for runinng a :py:class:`Pipeline` in
          a subprocess
        - :ref:`parallelism-performance` for the context in which this function was created.
-
-
-    Implementation Detail
-    ---------------------
-
-    .. py:currentmodule:: spdl.pipeline._iter_utils
-
-    Manipulting an iterable object in a subprocess requires somewhat elaborated state
-    control.
-    The following section go over the implementation detail.
-
-    **Wroker State**
-
-    The iterable object is manipulated in the worker process.
-    The worker process has three states, "Initialization", "Stand By" and "Iteration".
-    The Initialization state performs global initialization and create the iterable object.
-    When the Initialization completes, the worker transition to Stand By mode, where
-    it waits for a command from the parent process. The command can be "START_ITERATION"
-    or "ABORT".
-    When the "START_ITERATION" is received, the worker process transition to the
-    Iteration mode. In the Iteration mode, the worker creates an iterator object from
-    the iterable, then executes it.
-    The resulting data are put in the queue, which the parent process is watching.
-
-    The following diagram illustrates worker's state transition in simplified manner.
-    Detailed diagram alongside the actual implementation is found in
-    :py:func:`_execute_iterable`.
-
-    .. mermaid::
-
-       stateDiagram-v2
-           state Parent {
-               p1: Start Iteration
-               p2: Iterate on the result
-               state pf <<fork>>
-               state pj <<join>>
-
-               [*] --> p1
-               p1 --> pf
-               pf --> pj: Wait for worker process
-               pj --> p2
-               p2 --> [*]
-           }
-
-           state Worker {
-               state wf <<fork>>
-               w0: Initialization
-               w1: Stand By
-               w2: Iteration
-
-               [*]--> w0
-               w0 --> w1: Success
-               w0 --> [*]: Fail
-               w1 --> wf: Iteration started
-               wf --> w2
-               w2 --> w1: Iteration completed
-
-               w1 --> [*]: Abort
-               w2 --> [*]: Fail / Abort
-           }
-           pf --> w1: Issue START_ITERATION command
-           wf --> pj: Notify ITERATION_STARTED
-           w2 --> p2: Results passed via queue
-
-    Helper functions and data structures
-    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-    The follosing functions and data structures are used to implement the
-    :py:func:`iterate_in_subprocess` function.
-    They are not public interface, but the logic is sufficiently elaborated,
-    and it is helpful to have them in the documentation, so they are listed here.
-
-    .. autoclass:: _Cmd
-       :noindex:
-       :members:
-
-    .. autoclass:: _Status
-       :noindex:
-       :members:
-
-    .. autofunction:: _execute_iterable()
-       :noindex:
-
-    .. autofunction:: _enter_iteration_mode()
-       :noindex:
-
-    .. autofunction:: _iterate_results()
-       :noindex:
-
-    .. autoclass:: _SubprocessIterable()
-       :noindex:
-       :members: __iter__
+       - :doc:`../notes/remote_iterable_protocol` for implementation details
     """
     initializers = (
         None
