Skip to content

Commit 92b7758

Browse files
authored
Expose root workflow execution (#805)
* exposing root info to workflow info and workflow execution info * added test * skip test in time-skipping env, added link for PR that needs to be in release * improve test helper, changes to test wf name, return types, and ids
1 parent bf747f1 commit 92b7758

File tree

7 files changed

+132
-22
lines changed

7 files changed

+132
-22
lines changed

temporalio/client.py

+12
Original file line numberDiff line numberDiff line change
@@ -2777,6 +2777,12 @@ class WorkflowExecution:
27772777
parent_run_id: Optional[str]
27782778
"""Run ID for the parent workflow if this was started as a child."""
27792779

2780+
root_id: Optional[str]
2781+
"""ID for the root workflow."""
2782+
2783+
root_run_id: Optional[str]
2784+
"""Run ID for the root workflow."""
2785+
27802786
raw_info: temporalio.api.workflow.v1.WorkflowExecutionInfo
27812787
"""Underlying protobuf info."""
27822788

@@ -2828,6 +2834,12 @@ def _from_raw_info(
28282834
parent_run_id=info.parent_execution.run_id
28292835
if info.HasField("parent_execution")
28302836
else None,
2837+
root_id=info.root_execution.workflow_id
2838+
if info.HasField("root_execution")
2839+
else None,
2840+
root_run_id=info.root_execution.run_id
2841+
if info.HasField("root_execution")
2842+
else None,
28312843
raw_info=info,
28322844
run_id=info.execution.run_id,
28332845
search_attributes=temporalio.converter.decode_search_attributes(

temporalio/worker/_workflow.py

+7
Original file line numberDiff line numberDiff line change
@@ -369,12 +369,18 @@ def _create_workflow_instance(
369369

370370
# Build info
371371
parent: Optional[temporalio.workflow.ParentInfo] = None
372+
root: Optional[temporalio.workflow.RootInfo] = None
372373
if init.HasField("parent_workflow_info"):
373374
parent = temporalio.workflow.ParentInfo(
374375
namespace=init.parent_workflow_info.namespace,
375376
run_id=init.parent_workflow_info.run_id,
376377
workflow_id=init.parent_workflow_info.workflow_id,
377378
)
379+
if init.HasField("root_workflow"):
380+
root = temporalio.workflow.RootInfo(
381+
run_id=init.root_workflow.run_id,
382+
workflow_id=init.root_workflow.workflow_id,
383+
)
378384
info = temporalio.workflow.Info(
379385
attempt=init.attempt,
380386
continued_run_id=init.continued_from_execution_run_id or None,
@@ -385,6 +391,7 @@ def _create_workflow_instance(
385391
headers=dict(init.headers),
386392
namespace=self._namespace,
387393
parent=parent,
394+
root=root,
388395
raw_memo=dict(init.memo.fields),
389396
retry_policy=temporalio.common.RetryPolicy.from_proto(init.retry_policy)
390397
if init.HasField("retry_policy")

temporalio/worker/workflow_sandbox/_runner.py

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
headers={},
3636
namespace="sandbox-validate-namespace",
3737
parent=None,
38+
root=None,
3839
raw_memo={},
3940
retry_policy=None,
4041
run_id="sandbox-validate-run_id",

temporalio/workflow.py

+9
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ class Info:
427427
headers: Mapping[str, temporalio.api.common.v1.Payload]
428428
namespace: str
429429
parent: Optional[ParentInfo]
430+
root: Optional[RootInfo]
430431
priority: temporalio.common.Priority
431432
"""The priority of this workflow execution. If not set, or this server predates priorities,
432433
then returns a default instance."""
@@ -518,6 +519,14 @@ class ParentInfo:
518519
workflow_id: str
519520

520521

522+
@dataclass(frozen=True)
523+
class RootInfo:
524+
"""Information about the root workflow."""
525+
526+
run_id: str
527+
workflow_id: str
528+
529+
521530
@dataclass(frozen=True)
522531
class UpdateInfo:
523532
"""Information about a workflow update."""

tests/helpers/__init__.py

+35-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import uuid
55
from contextlib import closing
66
from datetime import timedelta
7-
from typing import Awaitable, Callable, Optional, Sequence, Type, TypeVar
7+
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union
88

99
from temporalio.api.common.v1 import WorkflowExecution
1010
from temporalio.api.enums.v1 import IndexedValueType
@@ -19,7 +19,9 @@
1919
from temporalio.service import RPCError, RPCStatusCode
2020
from temporalio.worker import Worker, WorkflowRunner
2121
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
22-
from temporalio.workflow import UpdateMethodMultiParam
22+
from temporalio.workflow import (
23+
UpdateMethodMultiParam,
24+
)
2325

2426

2527
def new_worker(
@@ -150,3 +152,34 @@ async def admitted_update_task(
150152
lambda: workflow_update_exists(client, handle.id, id),
151153
)
152154
return update_task
155+
156+
157+
async def assert_workflow_exists_eventually(
158+
client: Client,
159+
workflow: Any,
160+
workflow_id: str,
161+
) -> WorkflowHandle:
162+
handle = None
163+
164+
async def check_workflow_exists() -> bool:
165+
nonlocal handle
166+
try:
167+
handle = client.get_workflow_handle_for(
168+
workflow,
169+
workflow_id=workflow_id,
170+
)
171+
await handle.describe()
172+
return True
173+
except RPCError as err:
174+
# Ignore not-found or failed precondition because child may
175+
# not have started yet
176+
if (
177+
err.status == RPCStatusCode.NOT_FOUND
178+
or err.status == RPCStatusCode.FAILED_PRECONDITION
179+
):
180+
return False
181+
raise
182+
183+
await assert_eq_eventually(True, check_workflow_exists)
184+
assert handle is not None
185+
return handle

tests/test_client.py

+2
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ async def test_describe(
367367
assert desc.status == WorkflowExecutionStatus.COMPLETED
368368
assert desc.task_queue == worker.task_queue
369369
assert desc.workflow_type == "kitchen_sink"
370+
assert desc.root_id == desc.id
371+
assert desc.root_run_id == desc.run_id
370372

371373

372374
async def test_query(client: Client, worker: ExternalWorker):

tests/worker/test_workflow.py

+66-20
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
from tests.helpers import (
113113
admitted_update_task,
114114
assert_eq_eventually,
115+
assert_workflow_exists_eventually,
115116
ensure_search_attributes_present,
116117
find_free_port,
117118
new_worker,
@@ -1126,26 +1127,9 @@ async def test_workflow_cancel_child_started(client: Client, use_execute: bool):
11261127
task_queue=worker.task_queue,
11271128
)
11281129

1129-
# Wait until child started
1130-
async def child_started() -> bool:
1131-
try:
1132-
return await handle.query(
1133-
CancelChildWorkflow.ready
1134-
) and await client.get_workflow_handle_for(
1135-
LongSleepWorkflow.run, # type: ignore[arg-type]
1136-
workflow_id=f"{handle.id}_child",
1137-
).query(LongSleepWorkflow.started)
1138-
except RPCError as err:
1139-
# Ignore not-found or failed precondition because child may
1140-
# not have started yet
1141-
if (
1142-
err.status == RPCStatusCode.NOT_FOUND
1143-
or err.status == RPCStatusCode.FAILED_PRECONDITION
1144-
):
1145-
return False
1146-
raise
1147-
1148-
await assert_eq_eventually(True, child_started)
1130+
await assert_workflow_exists_eventually(
1131+
client, LongSleepWorkflow.run, f"{handle.id}_child"
1132+
)
11491133
# Send cancel signal and wait on the handle
11501134
await handle.signal(CancelChildWorkflow.cancel_child)
11511135
with pytest.raises(WorkflowFailureError) as err:
@@ -7081,3 +7065,65 @@ async def test_workflow_priorities(client: Client, env: WorkflowEnvironment):
70817065
task_queue=worker.task_queue,
70827066
)
70837067
await handle.result()
7068+
7069+
7070+
@workflow.defn
7071+
class ExposeRootChildWorkflow:
7072+
def __init__(self) -> None:
7073+
self.blocked = True
7074+
7075+
@workflow.signal
7076+
def unblock(self) -> None:
7077+
self.blocked = False
7078+
7079+
@workflow.run
7080+
async def run(self) -> Optional[temporalio.workflow.RootInfo]:
7081+
await workflow.wait_condition(lambda: not self.blocked)
7082+
return workflow.info().root
7083+
7084+
7085+
@workflow.defn
7086+
class ExposeRootWorkflow:
7087+
@workflow.run
7088+
async def run(self, child_wf_id) -> Optional[temporalio.workflow.RootInfo]:
7089+
return await workflow.execute_child_workflow(
7090+
ExposeRootChildWorkflow.run, id=child_wf_id
7091+
)
7092+
7093+
7094+
async def test_expose_root_execution(client: Client, env: WorkflowEnvironment):
7095+
if env.supports_time_skipping:
7096+
pytest.skip(
7097+
"Java test server needs release with: https://github.com/temporalio/sdk-java/pull/2441"
7098+
)
7099+
async with new_worker(
7100+
client, ExposeRootWorkflow, ExposeRootChildWorkflow
7101+
) as worker:
7102+
parent_wf_id = f"workflow-{uuid.uuid4()}"
7103+
child_wf_id = parent_wf_id + "_child"
7104+
handle = await client.start_workflow(
7105+
ExposeRootWorkflow.run,
7106+
child_wf_id,
7107+
id=parent_wf_id,
7108+
task_queue=worker.task_queue,
7109+
)
7110+
7111+
await assert_workflow_exists_eventually(
7112+
client, ExposeRootChildWorkflow, child_wf_id
7113+
)
7114+
child_handle: WorkflowHandle = client.get_workflow_handle_for(
7115+
ExposeRootChildWorkflow.run, child_wf_id
7116+
)
7117+
child_desc = await child_handle.describe()
7118+
parent_desc = await handle.describe()
7119+
# Assert child root execution is the same as it's parent execution
7120+
assert child_desc.root_id == parent_desc.id
7121+
assert child_desc.root_run_id == parent_desc.run_id
7122+
# Unblock child
7123+
await child_handle.signal(ExposeRootChildWorkflow.unblock)
7124+
# Get the result (child info)
7125+
child_wf_info_root = await handle.result()
7126+
# Assert root execution in child info is same as it's parent execution
7127+
assert child_wf_info_root is not None
7128+
assert child_wf_info_root.workflow_id == parent_desc.id
7129+
assert child_wf_info_root.run_id == parent_desc.run_id

0 commit comments

Comments
 (0)