티스토리 뷰

Computer Science/ROS

ROS

흔한 학생 2025. 2. 17. 17:32

ROS는 2007년 스탠포드 대학 AI LAB의 프로젝트에 사용된 인공지능 로봇의 개발을 위해 만든 Switchyard 에서 시작했다. 그리고 2008년 로봇 전문 기업 윌로우게러지가 이어받아 ROS라는 이름으로 개발하기 시작했다. 2010년 ROS Box Turtle이라는 이름으로 첫 ROS가 릴리즈되었고 새로운 버전이 이어지고 있다. 현재는 OSRF(Open Source Robotics Foundation)이 관리를 맡고 있으며 오픈 소스로 공개되어 있다. 

ROS1은 개발 당시 단일 로봇, 리눅스 환경, 실시간 제어 불가능, 안정된 네트워크 환경, 연구용 등과 같은 제한 사항이 있었다. 때문에 새로운 개발 환경이 요구되면서 ROS2가 개발되었다. ROS1과 비교하여 ROS2에는 다음과 같은 특징이 있다.

  • 멀티플랫폼 지원
  • DDS(Data Distribution Service) 채용
  • ROS 클라이언트 라이브러리
  • ament 빌드 시스템과 colcon
  • IDL 채용
  • python을 이용한 roslaunch
  • 하나의 프로세스에서 여러 노드 실행
  • 실시간 제어
  • 그래프 표현
  • 임베디드 시스템 지원 
  • 네임 서비스 

현재는 2024년에 출시한 Jazzy 버전이 가장 최신 버전이다.

 


- ROS topic, service, action의 개념/사용법

Topic은 비동기적 통신을 위한 메커니즘이다. Publisher(발행자)가 특정 msg 메시지형태의 메시지를 발행하면, Subscriber(구독자)가 이를 수신한다. 

  • 단방향 통신: Publisher → Subscriber로만 메시지가 전달됩니다.
  • 비동기적: Publisher와 Subscriber는 서로의 상태를 알 필요 없이 독립적으로 동작합니다.
  • 1:N 통신: 하나의 Topic에 여러 Publisher와 Subscriber가 연결될 수 있습니다.

주고받는 데이터 형태 또한 정의할 수 있는데 예를들어 `geometry_msgs/msgs/Twist` 형태이면 geometry_msgs 패키지의 msgs 분류의 Twist 형태인 것이다. 

Service는 동기적 통신을 위한 메커니즘이다. 서비스 클라이언트가 서버에 요청(Request)을 보내면, 서비스 서버가 응답(Response)을 반환한다. 주고받는 데이터는 msg 인터페이스의 변형인 srv 인터페이스라고 한다. 일회성 메시지이기에 네트워크에 부하가 적다.

  • 양방향 통신: 클라이언트 → 서버로 요청, 서버 → 클라이언트로 응답.
  • 동기적: 클라이언트는 서버의 응답을 기다립니다.
  • 1:1 통신: 일반적으로 하나의 클라이언트와 하나의 서버 간 통신.

Action은 장시간 비동기 작업을 위한 메커니즘이다. Goal(목표), Feedback(진행 상태), Result(결과)로 구성된다. action은 토픽의 혼합이라고 볼 수 있다. Action Client가 goal을 Action Server로 요청한다. 동시에 Server는 Client로 feedback을 보낸다. 마지막으로 goal을 마칠 때 Client로 result를 보낸다.

  • 비동기적 통신: 클라이언트는 서버의 작업 완료를 기다리지 않습니다.
  • Feedback 제공: 작업 진행 상태를 실시간으로 전달할 수 있습니다.
  • 복잡한 작업에 적합: 장시간 실행되는 작업(예: 경로 계획, 네비게이션)에 사용됩니다.

 

- custom topic message, service, action 만들기

 

새로운 패키지를 만들고 해당 패키지 폴더에 있는 package.xml 파일과 CMakeLists 파일을 수정해야했다.
xml 파일에는 아래 내용을 추가했다.

<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>

 

그 다음 CMake 파일을 수정했다.

find_package(catkin REQUIRED COMPONENTS
  roscpp
  rospy
  std_msgs
  message_generation
)

또 새로 만든 파이썬 파일을 실행할 수 있게 아래처럼 수정했다.

catkin_install_python(PROGRAMS
  src/talker.py
  DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

 

파이썬 파일은 수정해도 빌드를 하지 않아도 되지만 C++파일은 수정하면 빌드를 다시해야한다. 

빌드는 워크스페이스 경로에 위치한 다음 catkin_make로 빌드를 한다. 

topic 실습

가장 기초적인 turtlesim을 publisher를 이용해 움직여보려고 한다.

rostopoic list 로 토픽을 확인할 수 있었고 /turtle1/cmd_vel 값을 날려주려고 한다.

Publisher는 rospy에 정의되어 있고 파라미터로는 토픽의 이름, 메시지의 타입, queue_size가 들어가야한다. 

#!/usr/bin/env python	# 파이썬을 쓴다면 반드시 달아주자
#-*- coding:utf-8 -*-	# 한글 주석을 달기 위해 사용한다.

import rospy				# ROS 라이브러리
from geometry_msgs.msg import Twist	# 패키지의 메시지 파일

def main():
    # 퍼블리시 노드 초기화
    ## 노드 이름 talker
    rospy.init_node('talker', anonymous=True)
    
    # 퍼블리셔 변수
    ## 퍼블리시 토픽 이름 chatter, 메시지 타입 test_msg
    pub = rospy.Publisher('/turtle1/cmd_vel', Twist, queue_size=10)
    
    # 10헤르츠마다 반복(변수=rate)
    rate = rospy.Rate(10) # 10hz

    twist = Twist()	# 메시지 변수 선언
    twist.linear.x = 0.2
    twist.angular.z = 0.2

    # 중단되거나 사용자가 강제종료(ctrl+C) 전까지 계속 실행
    while not rospy.is_shutdown():
        
        # 메시지를 퍼블리시
        pub.publish(twist)
        
        # 정해둔 주기(hz)만큼 일시중단
        rate.sleep()

if __name__ == '__main__':
    try:
        main()
    except rospy.ROSInterruptException:
        pass

 

service 실습

마찬가지로 turtlesim에서 가능한 service를 다뤄볼 것이다.
rosservice list로 확인가능하고 teleport_absolute 를 이용할 것이다. 
rosservice info /turtle1/teleport_absolute 명령어를 입력하면 데이터타입을 알 수 있다.

teleport_absolute로 turtle의 위치를 이동시킬 것이다.

서비스 클라이언트는 rospy에 정의된 ServiceProxy를 이용해 만들 수 있다. 
파라미터로는 서비스 이름과 타입이 들어가야한다.

#!/usr/bin/env python

import rospy
from turtlesim.srv import TeleportAbsolute

def teleport_turtle(x, y, theta):
    rospy.wait_for_service('/turtle1/teleport_absolute')  # 서비스가 실행될 때까지 대기
    try:
        teleport = rospy.ServiceProxy('/turtle1/teleport_absolute', TeleportAbsolute)  # 서비스 클라이언트 생성
        teleport(x, y, theta)  # 서비스 호출 (x, y, 회전각)
        rospy.loginfo("Turtle moved to x: %.2f, y: %.2f, theta: %.2f" % (x, y, theta))
    except rospy.ServiceException as e:
        rospy.logerr("Service call failed: %s" % e)

if __name__ == '__main__':
    rospy.init_node('turtle_teleporter')
    teleport_turtle(5.0, 5.0, 0.0)  # 거북이를 (5,5) 위치로 이동

 

action 실습

ROS의 기본 패키지를 이용해 action server 가 피보나치 수열을 반환하는 예제를 살펴보려고 한다.
터미널에서 요청을 보내면  서버에서 나오는 feedback과 result를 확인할 수 있다.

action 메시지의 구성은 예를 들어 다음과 같이 Goal, Result, Feedback 순서대로 쓸 수 있다.

float32 x
float32 y
---
bool success
---
float32 progress

 

action server 는 actionlib에 정의된 SimpleActionServer 을 이용해 만들 수 있고 action client는 SimpleActionClient 를 이용할 수 있다. 
action 서버 객체에 들어가는 파라미터는 action 이름, 타입, 실행할 콜백 함수, auto_start 가 있다.

#! /usr/bin/env python
import rospy
 
import actionlib
 
from actionlib_tutorials.msg import FibonacciFeedback, FibonacciResult, FibonacciAction
 
class FibonacciClass(object):
 
  # create messages that are used to publish feedback/result
  _feedback = FibonacciFeedback()
  _result   = FibonacciResult()
 
  def __init__(self):
    # creates the action server
    self._as = actionlib.SimpleActionServer("fibonacci_as", FibonacciAction, self.goal_callback, False)
    self._as.start()
 
  def goal_callback(self, goal):
    # this callback is called when the action server is called.
    # this is the function that computes the Fibonacci sequence
    # and returns the sequence to the node that called the action server
 
    # helper variables
    r = rospy.Rate(1)
    success = True
 
    # append the seeds for the fibonacci sequence
    self._feedback.sequence = []
    self._feedback.sequence.append(0)
    self._feedback.sequence.append(1)
 
    # publish info to the console for the user
    rospy.loginfo('"fibonacci_as": Executing, creating fibonacci sequence of order %i with seeds %i, %i' % ( goal.order, self._feedback.sequence[0], self._feedback.sequence[1]))
 
    # starts calculating the Fibonacci sequence
    fibonacciOrder = goal.order
    for i in xrange(1, fibonacciOrder):
 
      # check that preempt (cancelation) has not been requested by the action client
      if self._as.is_preempt_requested():
        rospy.loginfo('The goal has been cancelled/preempted')
        # the following line, sets the client in preempted state (goal cancelled)
        self._as.set_preempted()
        success = False
        # we end the calculation of the Fibonacci sequence
        break
 
      # builds the next feedback msg to be sent
      self._feedback.sequence.append(self._feedback.sequence[i] + self._feedback.sequence[i-1])
      # publish the feedback
      self._as.publish_feedback(self._feedback)
      # the sequence is computed at 1 Hz frequency
      r.sleep()
 
    # at this point, either the goal has been achieved (success==true)
    # or the client preempted the goal (success==false)
    # If success, then we publish the final result
    # If not success, we do not publish anything in the result
    if success:
      self._result.sequence = self._feedback.sequence
      rospy.loginfo('Succeeded calculating the Fibonacci of order %i' % fibonacciOrder )
      self._as.set_succeeded(self._result)
 
if __name__ == '__main__':
  rospy.init_node('fibonacci')
  FibonacciClass()
  rospy.spin()

 rostopic echo /fibonacci_as/feedback 명령어를 실행하고 

터미널에서 요청을 보내면 

rostopic pub /fibonacci_as/goal actionlib_tutorials/FibonacciActionGoal "header:
  seq: 0
  stamp:
    secs: 0
    nsecs: 0
  frame_id: ''
goal_id:
  stamp:
    secs: 0
    nsecs: 0
  id: ''
goal:
  order: 10"

피드백을 확인하는 터미널에 이와 같이 출력됨을 확인할 수 있다.

- Gazebo를 이용한 turtlebot 제어(키보드)

turtlebot3_teleop_key 를 이용하면 제어할 수 있다.

export TURTLEBOT3_MODEL=waffle
roslaunch turtlebot3_gazebo turtlebot3_world.launch
roslaunch turtlebot3_teleop turtlebot3_teleop_key.launch

원본 코드는 다음과 같다. 

더보기
#!/usr/bin/env python

# Copyright (c) 2011, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of the Willow Garage, Inc. nor the names of its
#      contributors may be used to endorse or promote products derived from
#       this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import rospy
from geometry_msgs.msg import Twist
import sys, select, os
# OS가 윈도우인지 구분하여 모듈 임포트
if os.name == 'nt':
  import msvcrt
else:
  import tty, termios

BURGER_MAX_LIN_VEL = 0.22
BURGER_MAX_ANG_VEL = 2.84

WAFFLE_MAX_LIN_VEL = 0.26
WAFFLE_MAX_ANG_VEL = 1.82

LIN_VEL_STEP_SIZE = 0.01
ANG_VEL_STEP_SIZE = 0.1

msg = """
Control Your TurtleBot3!
---------------------------
Moving around:
        w
   a    s    d
        x
w/x : increase/decrease linear velocity (Burger : ~ 0.22, Waffle and Waffle Pi : ~ 0.26)
a/d : increase/decrease angular velocity (Burger : ~ 2.84, Waffle and Waffle Pi : ~ 1.82)
space key, s : force stop
CTRL-C to quit
"""

e = """
Communications Failed
"""

# stdin에서 글자를 읽어옵니다.
# release 되는 키를 확인할 수 없습니다.
def getKey():
    # 윈도우
    if os.name == 'nt':
      return msvcrt.getch()
    # 유닉스 계열
    tty.setraw(sys.stdin.fileno())
    rlist, _, _ = select.select([sys.stdin], [], [], 0.1)
    if rlist:
        key = sys.stdin.read(1)
    else:
        key = ''
    # 입력 버퍼 정상화? 초기화?
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
    return key

# 현재 속도 출력
def vels(target_linear_vel, target_angular_vel):
    return "currently:\tlinear vel %s\t angular vel %s " % (target_linear_vel,target_angular_vel)

# 속도를 완만하게 증가시킵니다.
def makeSimpleProfile(output, input, slop):
    if input > output:
        output = min( input, output + slop )
    elif input < output:
        output = max( input, output - slop )
    else:
        output = input

    return output

# 일정 범위 clamp 함수
# checkLinearLimitVelocity/checkAngularLimitVelocity에 사용되어
# 속도가 일정 범위를 넘어가지 못하게 합니다.
def constrain(input, low, high):
    if input < low:
      input = low
    elif input > high:
      input = high
    else:
      input = input

    return input

# 병진 속도 범위 체크 함수
def checkLinearLimitVelocity(vel):
    if turtlebot3_model == "burger":
      vel = constrain(vel, -BURGER_MAX_LIN_VEL, BURGER_MAX_LIN_VEL)
    elif turtlebot3_model == "waffle" or turtlebot3_model == "waffle_pi":
      vel = constrain(vel, -WAFFLE_MAX_LIN_VEL, WAFFLE_MAX_LIN_VEL)
    else:
      vel = constrain(vel, -BURGER_MAX_LIN_VEL, BURGER_MAX_LIN_VEL)

    return vel

# 회전 속도 범위 체크 함수
def checkAngularLimitVelocity(vel):
    if turtlebot3_model == "burger":
      vel = constrain(vel, -BURGER_MAX_ANG_VEL, BURGER_MAX_ANG_VEL)
    elif turtlebot3_model == "waffle" or turtlebot3_model == "waffle_pi":
      vel = constrain(vel, -WAFFLE_MAX_ANG_VEL, WAFFLE_MAX_ANG_VEL)
    else:
      vel = constrain(vel, -BURGER_MAX_ANG_VEL, BURGER_MAX_ANG_VEL)

    return vel

if __name__=="__main__":
    # 윈도우가 아닐 경우 표준 입력 버퍼의 초기값을 저장합니다.
    if os.name != 'nt':
        settings = termios.tcgetattr(sys.stdin)

    rospy.init_node('turtlebot3_teleop')
    # 토픽 이름이 cmd_vel로 고정된 모습입니다.
    pub = rospy.Publisher('cmd_vel', Twist, queue_size=10)

    turtlebot3_model = rospy.get_param("model", "burger")

    status = 0
    target_linear_vel   = 0.0
    target_angular_vel  = 0.0
    control_linear_vel  = 0.0
    control_angular_vel = 0.0

    try:
        print(msg)
        while(1):
            # 키 입력 하나를 받아옵니다.
            key = getKey()
            # if/elif/else 구문까지 사용하여 멀티 키 입력을 받을 수 없습니다.
            # 입력에 따라 속도를 증가/감소시키고 출력합니다.
            # 상태를 1 증가시킵니다.
            if key == 'w' :
                target_linear_vel = checkLinearLimitVelocity(target_linear_vel + LIN_VEL_STEP_SIZE)
                status = status + 1
                print(vels(target_linear_vel,target_angular_vel))
            elif key == 'x' :
                target_linear_vel = checkLinearLimitVelocity(target_linear_vel - LIN_VEL_STEP_SIZE)
                status = status + 1
                print(vels(target_linear_vel,target_angular_vel))
            elif key == 'a' :
                target_angular_vel = checkAngularLimitVelocity(target_angular_vel + ANG_VEL_STEP_SIZE)
                status = status + 1
                print(vels(target_linear_vel,target_angular_vel))
            elif key == 'd' :
                target_angular_vel = checkAngularLimitVelocity(target_angular_vel - ANG_VEL_STEP_SIZE)
                status = status + 1
                print(vels(target_linear_vel,target_angular_vel))
            # 스페이스바나 s가 입력될 때 속도를 0으로 설정합니다.
            elif key == ' ' or key == 's' :
                target_linear_vel   = 0.0
                control_linear_vel  = 0.0
                target_angular_vel  = 0.0
                control_angular_vel = 0.0
                print(vels(target_linear_vel, target_angular_vel))
            # 그 외 입력 처리
            else:
                # ctrl + c를 처리합니다.
                if (key == '\x03'):
                    break

            if status == 20 :
                print(msg)
                status = 0

            twist = Twist()

            # 속도를 완만하게 증가시킨 뒤, 이를 토대로 메시지를 만듭니다.
            control_linear_vel = makeSimpleProfile(control_linear_vel, target_linear_vel, (LIN_VEL_STEP_SIZE/2.0))
            twist.linear.x = control_linear_vel; twist.linear.y = 0.0; twist.linear.z = 0.0

            control_angular_vel = makeSimpleProfile(control_angular_vel, target_angular_vel, (ANG_VEL_STEP_SIZE/2.0))
            twist.angular.x = 0.0; twist.angular.y = 0.0; twist.angular.z = control_angular_vel

            # /cmd_vel 메시지가 퍼블리시 됩니다.
            pub.publish(twist)

    except:
        print(e)

    finally:
        # ctrl + c를 누르면 터틀봇이 더이상 이동하지 않도록 속도를 0으로 만들어줍니다.
        twist = Twist()
        twist.linear.x = 0.0; twist.linear.y = 0.0; twist.linear.z = 0.0
        twist.angular.x = 0.0; twist.angular.y = 0.0; twist.angular.z = 0.0
        pub.publish(twist)

    # 윈도우가 아닐 경우, 표준 입력 버퍼를 원상태로 회복합니다.
    if os.name != 'nt':
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)

 

- Gazebo에서 카메라 LiDAR 데이터 취득하여 Rviz, RQT 에서 시각화

 

반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함