로봇/ROS

ROS2 Minimal Tutorial - Action

with-RL 2023. 7. 14. 00:50

이 포스트는 이전 포스트에 이어서 ROS2의 Action에 대한 간단한 설명입니다.

이 포스트는 다음 과정을 완료한 후에 참고하시길 바랍니다.

1. Action

  • Action는 실행 시간이 오래 걸리는 명령을 요청하고 결과를 수신하기 위한 목적으로 만들어진 통신방법입니다.
  • Action은 Service와 비슷하지만 중간에 취소가 가능합니다.
  • Action Server는 명령을 수행하는 중간 과정에 해당하는 feedback과 처리 결과인 result를 보낼수 있습니다.
  • 1:1 통신 방식으로 요청을 보내고 중간 결과 및 처리 결과를 확인하는 통신 방식입니다.

(img src) https://docs.ros.org/en/foxy/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Actions/Understanding-ROS2-Actions.html

  • 첫 번째 터미널에서 아래 명령을 실행해서 'turtlesim.launch.py'를 실행합니다.
$ cd ~/Workspace/ros_ws
$ source install/setup.bash
$ ros2 launch ros_tutorial turtlesim.launch.py
  • 두 번째 터미널에서 아래 명령을 실행해서 '/turtle1/rotate_absolute' Action를 호출합니다. 아래 그림과 같이 Action를 호출하고 실행 결과를 수신한 것을 확인할 수 있습니다.
$ ros2 action send_goal /turtle1/rotate_absolute turtlesim/action/RotateAbsolute "{theta: 1.57}"

  • 이 명령을을 수신한 TurtleBot이 방향을 회전한 것을 확인할 수 있습니다.

  • 아래 명령을 실행해서 Action 목록을 확인합니다. 아래 그림과 같이 Action 목록을 확인할 수 있습니다. '-t' 옵션을 준 경우는 Action이 사용하는 메시지 type을 함께 확인할 수 있습니다.
$ ros2 action list -t

  • 아래 명령을 실행해서 '/turtle1/rotate_absolute' Action의 상세정보를 확인합니다.
$ ros2 action info /turtle1/rotate_absolute

  • 아래 명령을 실행해서 'turtlesim/action/RotateAbsolute' 메시지의 상세정보를 확인합니다.
    request: theta : 회전하고자 하는 목적 각도 입니다.
    ◦ response: delta : 실제 회전한 각도 입니다.
    ◦ feedback: remaining : 목적 각도까지 남은 각도 입니다.
$ ros2 interface show turtlesim/action/RotateAbsolute

2. Action - Client

  • TurtleBot을 회전하게 했던 '/turtle1/rotate_absolute' Action를 호출하는 Action Client를 python으로 구현해 보겠습니다.
  • 'src/ros_tutorial/package.xml' 파일의 의존성은 이전 과정에서 추가한 'turtlesim'과 동일하기 때문에 추가하지 않아도 됩니다.
  • 'src/ros_tutorial/setup.py'의 'console_scripts' 부분에 아래 내용을 추가합니다. 아래 그림과 같이 'turtlesim_rot_client'이라는 명령을 등록했습니다.
'turtlesim_rot_client = ros_tutorial.turtlesim_rot_client:main',

  • 'src/ros_tutorial/ros_tutorial/turtlesim_rot_client.py' 파일을 만들고 아래와 같이 편집합니다. TurtleBot $\pi$ (180°) 방향으로 회전하도록 'turtlesim/action/RotateAbsolute' 메시지를 보내고 결과를 수신하는 코드입니다.
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from turtlesim.action import RotateAbsolute


class TurtlesimRotateClient(Node):
    def __init__(self):
        super().__init__('turtlesim_rot_client')

        self.client = ActionClient(self, RotateAbsolute, '/turtle1/rotate_absolute')
        self.get_logger().info('action client started...')
    
    def send_goal(self, theta):
        goal_req = RotateAbsolute.Goal()
        goal_req.theta = theta

        if self.client.wait_for_server(10) is False:
            self.get_logger().info('service not available...')
            return
        
        goal_future = self.client.send_goal_async(goal_req, feedback_callback=self.feedback_callback)
        goal_future.add_done_callback(self.goal_callback)
    
    def goal_callback(self, future):
        self.goal_handle = future.result()

        if not self.goal_handle.accepted:
            self.get_logger().info('gole rejected...')
            return

        self.get_logger().info('gole accepted...')
        goal_future = self.goal_handle.get_result_async()
        goal_future.add_done_callback(self.result_callback)
        # cancel test
        # self.timer= self.create_timer(1.0, self.timer_callback)

    def feedback_callback(self, msg):
        feedback = msg.feedback
        self.get_logger().info(f'recv feedback: {feedback.remaining}')

    def result_callback(self, future):
        result_handle = future.result()
        res = result_handle.result
        self.get_logger().info(f'recv result: {res.delta}')

        self.destroy_node()
        rclpy.shutdown()
    
    def timer_callback(self):
        self.get_logger().info(f'canceling goal...')

        cancel_future = self.goal_handle.cancel_goal_async()
        cancel_future.add_done_callback(self.cancel_callback)
        # stop timer
        self.timer.cancel()
    
    def cancel_callback(self, future):
        result_handle = future.result()
        if len(result_handle.goals_canceling) > 0:
            self.get_logger().info('canceling goal success...')
        else:
            self.get_logger().info('canceling goal fail...')

        self.destroy_node()
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    client = TurtlesimRotateClient()
    client.send_goal(3.14)

    rclpy.spin(client)


if __name__ == '__main__':
    main()

  • 첫 번째 터미널에서 아래 명령을 실행해서 컴파일, 환경설정 그리고 'turtlesim.launch.py'를 실행합니다.
$ cd ~/Workspace/ros_ws
$ colcon build --symlink-install
$ source install/setup.bash
$ ros2 launch ros_tutorial turtlesim.launch.py
  • 두 번째 터미널에서 아래 명령을 실행해서 'turtlesim_rot_client'을 실행합니다. 아래 그림과 같이 Action Client가 실행되고 feedbak을 수신하다가 최종적으로 result를 수신하는 과정을 확인할 수 있습니다.
$ cd ~/Workspace/ros_ws
$ source install/setup.bash
$ ros2 run ros_tutorial turtlesim_rot_client

  • TurtleBot이 $\pi$ 방향으로 회전한 것을 확인할 수 있습니다.

3. Action - Server

  • '/turtle1/rotate_absolute' Action Server를 python으로 구현해 보겠습니다.
  • 'src/ros_tutorial/setup.py' 파일의 'console_scripts' 부분에 아래 내용을 추가합니다.
'turtlesim_rot_server = ros_tutorial.turtlesim_rot_server:main',

  • 'src/ros_tutorial/ros_tutorial/turtlesim_rot_server.py' 파일을 만들고 아래와같이 편집합니다.
#!/usr/bin/env python3

import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from turtlesim.action import RotateAbsolute

class TurtlesimRotateServer(Node):
    def __init__(self):
        super().__init__('turtlesim_rot_server')

        self.server = ActionServer(self, RotateAbsolute,
                                   '/turtle1/rotate_absolute',
                                   callback_group=ReentrantCallbackGroup(),
                                   execute_callback=self.execute_callback,
                                   goal_callback=self.goal_callback,
                                   cancel_callback=self.cancel_callback)
        self.get_logger().info('action server started...')
    
   
    def goal_callback(self, goal_request):
        self.get_logger().info(f'recv goal request: {goal_request}')
        return GoalResponse.ACCEPT
    
    def cancel_callback(self, cancel_request):
        self.get_logger().info(f'recv cancel request: {cancel_request}')
        return CancelResponse.ACCEPT


    async def execute_callback(self, goal_handle):
        feedback = RotateAbsolute.Feedback()
        feedback.remaining = 10.0
        for i in range(10):
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                self.get_logger().info('action canceled...')
                return RotateAbsolute.Result()

            feedback.remaining -= 1
            goal_handle.publish_feedback(feedback)
            time.sleep(1)

        goal_handle.succeed()
        self.get_logger().info('action succeed...')

        res = RotateAbsolute.Result()
        res.delta = 0.0
        return res


def main(args=None):
    rclpy.init(args=args)

    server = TurtlesimRotateServer()
    executor = MultiThreadedExecutor()
    rclpy.spin(server, executor=executor)

    server.destroy()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

  • 첫 번째 터미널에서 아래 명령을 실행해서 컴파일, 환경설정 그리고 'turtlesim_rot_server'를 실행합니다.
$ cd ~/Workspace/ros_ws
$ colcon build --symlink-install
$ source install/setup.bash
$ ros2 run ros_tutorial turtlesim_rot_server
  • 두 번째 터미널에서 아래 명령을 실행해서 'turtlesim_rot_client'을 실행합니다. 'turtlesim_rot_client'가 10번의 feedback을 받고 정상 응답을 받은 후 종료된 것을 확인할 수 있습니다.
$ cd ~/Workspace/ros_ws
$ source install/setup.bash
$ ros2 run ros_tutorial turtlesim_rot_client

  • 'turtlesim_rot_server'가 'turtlesim_rot_client'의 명령을 받아서 정상적으로 처리한 것을 확인할 수 있습니다.