You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Roslibpy.Ros客户端二次连接失败问题求助

roslibpy.Ros客户端在容器化CI环境中无法重复连接,本地测试正常

我正在封装一个基于roslibpy的Ros客户端,但我本身不是机器人开发人员,可能对Ros客户端的基础细节有所欠缺。目前遇到了一个奇怪的连接问题,想请大家帮忙分析排查。

我的实现代码

1. RosBridge客户端封装类_PayloadRosBridgeClient

import roslibpy
class _PayloadRosBridgeClient():
    def __init__(self, hostname: str, port: int) -> None:
        self._hostname = hostname
        self._port = port
        self._ros = None
        self._timeout = 10
    def __del__(self):
        self._ros.terminate()
    def connect(self):
        if self._ros is None:
            self._ros = Ros(self._hostname, self._port)
            self._ros.run(self._timeout)
        else:
            self._ros.connect()
        time.sleep(1)
# 省略其他未展示方法

2. 调用Ros客户端的业务处理类_PayloadHandler_4_1

这个类同时集成了SSH客户端用于和物理设备交互:

class _PayloadHandler_4_1(PayloadHandlerInterface.PayloadHandlerInterface):
    def __init__(self, hostname: str, username: str, password: str, ssh_port: int, ros_bridge_port: int):
        self._lock = threading.Lock()
        self._ros_bridge_client = PayloadRosBridgeClient.make(hostname, ros_bridge_port)
        self._ssh_client = PayloadSSHClient.make(
            hostname, username, password, PathConfigurations.Paths(), ssh_port)
        self._username = username
        self._password = password
    def get_version(self) -> None:
        return PayloadVersions.Version_4_1
    def connect(self) -> bool:
        try:
            self._ros_bridge_client.connect()
            self._ssh_client.connect()
            return True
        except Exception:
            print("PayloadHandler_4_1: could not connect to ros or ssh service")
            return False
# 省略其他未展示方法

3. 单元测试类PayloadHandler_4_1_IntegrationTest

class PayloadHandler_4_1_IntegrationTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls._payload_address = socket.gethostbyname("container_name")
        # 省略其他初始化配置
        cls._payload_manager = PayloadHandler_4_1.make(
            cls._payload_address, cls._payload_username, cls._payload_password, cls._ssh_port, cls._ros_port)
    def setUp(self):
        PayloadHandler_4_1_IntegrationTest._payload_manager.connect()
    def tearDown(self):
        if PayloadHandler_4_1_IntegrationTest._payload_manager.is_connected():
            PayloadHandler_4_1_IntegrationTest._payload_manager.disconnect()
    def test_make(self):
        result = PayloadHandler_4_1_IntegrationTest._payload_manager
        self.assertIsInstance(result, PayloadHandler_4_1._PayloadHandler_4_1)
    def test_connect(self):
        handler = PayloadHandler_4_1_IntegrationTest._payload_manager
        self.assertTrue(handler.is_connected())

具体问题

测试环境是运行在Docker容器中的Ros实例,出现了两种不同的结果:

  • 本地测试正常:测试代码直接在本地机器运行,Ros实例在本地Docker容器中,所有测试用例都能顺利通过,重复连接也没有问题。
  • GitLab Runner环境失败:测试代码和Ros实例分别在两个独立的容器中,已经验证容器间网络互通(手动SSH连接正常,服务端口可访问),但单元测试执行时出现异常:
    • 首次测试(test_make方法)正常:setUp阶段连接成功,tearDown阶段断开连接;
    • 后续所有测试(比如test_connect)均失败:通过rostopic echo /connected_clients可以看到新客户端已经成功连接到Ros,但roslibpy的Ros客户端却返回"Could not connect to Ros"错误。

求助需求

希望大家能提供一些排查方向或者解决方案,比如:

  • roslibpy客户端重复连接的正确实现方式?我的封装代码里有没有明显的错误?
  • 容器化环境下RosBridge连接的特殊注意事项?
  • GitLab Runner容器环境中可能存在的网络或时序问题?

内容的提问来源于stack exchange,提问作者Iustinian Olaru

火山引擎 最新活动