diff --git a/launch_ros/launch_ros/actions/lifecycle_node.py b/launch_ros/launch_ros/actions/lifecycle_node.py index d5c9ad89..af7e47e4 100644 --- a/launch_ros/launch_ros/actions/lifecycle_node.py +++ b/launch_ros/launch_ros/actions/lifecycle_node.py @@ -16,11 +16,16 @@ from typing import List from typing import Optional +from typing import Union import launch from launch import SomeSubstitutionsType from launch.action import Action +from launch.frontend import Entity +from launch.frontend import expose_action +from launch.frontend import Parser import launch.logging +from launch.utilities import type_utils import lifecycle_msgs.msg import lifecycle_msgs.srv @@ -31,6 +36,7 @@ from ..utilities import LifecycleEventManager +@expose_action('lifecycle_node') class LifecycleNode(Node): """Action that executes a ROS lifecycle node.""" @@ -39,7 +45,7 @@ def __init__( *, name: SomeSubstitutionsType, namespace: SomeSubstitutionsType, - autostart: bool = False, + autostart: Union[bool, SomeSubstitutionsType] = False, **kwargs ) -> None: """ @@ -73,14 +79,25 @@ def __init__( """ super().__init__(name=name, namespace=namespace, **kwargs) self.__logger = launch.logging.get_logger(__name__) - self.__autostart = autostart + self.__autostart = type_utils.normalize_typed_substitution(autostart, bool) self.__lifecycle_event_manager = None @property - def node_autostart(self): + def node_autostart(self) -> Union[bool, SomeSubstitutionsType]: """Getter for autostart.""" return self.__autostart + @classmethod + def parse(cls, entity: Entity, parser: Parser): + """Return `LifecycleNode` action and kwargs for constructing it.""" + _, kwargs = super().parse(entity, parser) + + autostart = entity.get_attr('autostart', data_type=bool, optional=True, can_be_str=True) + if autostart is not None: + kwargs['autostart'] = parser.parse_if_substitutions(autostart) + + return cls, kwargs + @property def is_lifecycle_node(self): return True @@ -103,7 +120,7 @@ def execute(self, context: launch.LaunchContext) -> Optional[List[Action]]: # If autostart is enabled, transition to the 'active' state. autostart_actions = None - if self.node_autostart: + if type_utils.perform_typed_substitution(context, self.node_autostart, bool): autostart_actions = [ LifecycleTransition( lifecycle_node_names=[self.node_name], diff --git a/test_launch_ros/test/test_launch_ros/frontend/test_lifecycle_node_frontend.py b/test_launch_ros/test/test_launch_ros/frontend/test_lifecycle_node_frontend.py new file mode 100644 index 00000000..0a5e0ce7 --- /dev/null +++ b/test_launch_ros/test/test_launch_ros/frontend/test_lifecycle_node_frontend.py @@ -0,0 +1,163 @@ +# Copyright 2025 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import io +import pathlib +import textwrap + +from launch import LaunchService +from launch.frontend import Parser +from launch.utilities import type_utils +from launch_ros.actions import LifecycleNode +from launch_ros.utilities import evaluate_parameters +import osrf_pycommon.process_utils + +yaml_params = str(pathlib.Path(__file__).parent / 'params.yaml') + +# Escape backslashes if any to keep them after parsing takes place +yaml_params = yaml_params.replace('\\', '\\\\') + + +def test_launch_frontend_xml(): + xml_file = textwrap.dedent( + r""" + + + + + + + + + + + + + + + + + """.format(yaml_params)) # noqa: E501 + + with io.StringIO(xml_file) as f: + check_launch_lifecycle_node(f) + + +def test_launch_frontend_yaml(): + yaml_file = textwrap.dedent( + r""" + launch: + - lifecycle_node: + pkg: lifecycle + exec: lifecycle_talker + output: screen + name: my_lc_talker + namespace: my_lc_ns + exec_name: my_lc_talker_process + ros_args: "--log-level info --log-file-name filename" + param: + - name: param1 + value: ads + - name: param_group1 + param: + - name: param_group2 + param: + - name: param2 + value: 2 + - name: param3 + value: [2, 5, 8] + - from: {} + env: + - name: var + value: '1' + remap: + - from: "foo" + to: "bar" + - from: "baz" + to: "foobar" + - lifecycle_node: + pkg: lifecycle + exec: lifecycle_talker + output: screen + name: my_lc_auto_talker + namespace: my_lc_auto_ns + exec_name: my_lc_talker_auto_process + autostart: True + """.format(yaml_params)) + + with io.StringIO(yaml_file) as f: + check_launch_lifecycle_node(f) + + +def check_launch_lifecycle_node(file): + root_entity, parser = Parser.load(file) + ld = parser.parse_description(root_entity) + ls = LaunchService() + ls.include_launch_description(ld) + + loop = osrf_pycommon.process_utils.get_loop() + launch_task = loop.create_task(ls.run_async()) + + lc_talker_node, lc_auto_talker_node = ld.describe_sub_entities() + + assert isinstance(lc_talker_node, LifecycleNode) + assert not type_utils.perform_typed_substitution( + ls.context, lc_talker_node.node_autostart, bool) + + assert isinstance(lc_auto_talker_node, LifecycleNode) + assert type_utils.perform_typed_substitution( + ls.context, lc_auto_talker_node.node_autostart, bool) + + evaluated_parameters = evaluate_parameters(ls.context, lc_talker_node._Node__parameters) + + assert len(evaluated_parameters) == 3 + assert isinstance(evaluated_parameters[0], dict) + assert isinstance(evaluated_parameters[1], dict) + assert isinstance(evaluated_parameters[2], pathlib.Path) + + assert len(evaluated_parameters[0]) == 1 + assert 'param1' in evaluated_parameters[0] + assert evaluated_parameters[0]['param1'] == 'ads' + + param_dict = evaluated_parameters[1] + assert len(param_dict) == 2 + assert 'param_group1.param_group2.param2' in param_dict + assert 'param_group1.param3' in param_dict + assert param_dict['param_group1.param_group2.param2'] == 2 + assert param_dict['param_group1.param3'] == [2, 5, 8] + + assert evaluated_parameters[2] == pathlib.PurePath(yaml_params) + + assert len(lc_auto_talker_node._Node__parameters) == 0 + + # Check remappings exist + remappings = lc_talker_node._Node__remappings + assert remappings is not None + assert len(remappings) == 2 + + assert len(lc_auto_talker_node._Node__remappings) == 0 + + timeout_sec = 5 + loop.run_until_complete(asyncio.sleep(timeout_sec)) + if not launch_task.done(): + loop.create_task(ls.shutdown()) + loop.run_until_complete(launch_task) + assert 0 == launch_task.result() + + talker_node_cmd_string = ' '.join(lc_talker_node.process_details['cmd']) + assert '--ros-args --log-level info --log-file-name filename' in talker_node_cmd_string + + assert lc_talker_node.node_name == '/my_lc_ns/my_lc_talker' + assert lc_auto_talker_node.node_name == '/my_lc_auto_ns/my_lc_auto_talker'