Roslibpy.Ros客户端二次连接失败问题求助
roslibpy.Ros客户端在容器化CI环境中无法重复连接,本地测试正常
我正在封装一个基于roslibpy的Ros客户端,但我本身不是机器人开发人员,可能对Ros客户端的基础细节有所欠缺。目前遇到了一个奇怪的连接问题,想请大家帮忙分析排查。
我的实现代码
1. RosBridge客户端封装类_PayloadRosBridgeClient
import roslibpy class _PayloadRosBridgeClient(): def __init__(self, hostname: str, port: int) -> None: self._hostname = hostname self._port = port self._ros = None self._timeout = 10 def __del__(self): self._ros.terminate() def connect(self): if self._ros is None: self._ros = Ros(self._hostname, self._port) self._ros.run(self._timeout) else: self._ros.connect() time.sleep(1) # 省略其他未展示方法
2. 调用Ros客户端的业务处理类_PayloadHandler_4_1
这个类同时集成了SSH客户端用于和物理设备交互:
class _PayloadHandler_4_1(PayloadHandlerInterface.PayloadHandlerInterface): def __init__(self, hostname: str, username: str, password: str, ssh_port: int, ros_bridge_port: int): self._lock = threading.Lock() self._ros_bridge_client = PayloadRosBridgeClient.make(hostname, ros_bridge_port) self._ssh_client = PayloadSSHClient.make( hostname, username, password, PathConfigurations.Paths(), ssh_port) self._username = username self._password = password def get_version(self) -> None: return PayloadVersions.Version_4_1 def connect(self) -> bool: try: self._ros_bridge_client.connect() self._ssh_client.connect() return True except Exception: print("PayloadHandler_4_1: could not connect to ros or ssh service") return False # 省略其他未展示方法
3. 单元测试类PayloadHandler_4_1_IntegrationTest
class PayloadHandler_4_1_IntegrationTest(unittest.TestCase): @classmethod def setUpClass(cls): cls._payload_address = socket.gethostbyname("container_name") # 省略其他初始化配置 cls._payload_manager = PayloadHandler_4_1.make( cls._payload_address, cls._payload_username, cls._payload_password, cls._ssh_port, cls._ros_port) def setUp(self): PayloadHandler_4_1_IntegrationTest._payload_manager.connect() def tearDown(self): if PayloadHandler_4_1_IntegrationTest._payload_manager.is_connected(): PayloadHandler_4_1_IntegrationTest._payload_manager.disconnect() def test_make(self): result = PayloadHandler_4_1_IntegrationTest._payload_manager self.assertIsInstance(result, PayloadHandler_4_1._PayloadHandler_4_1) def test_connect(self): handler = PayloadHandler_4_1_IntegrationTest._payload_manager self.assertTrue(handler.is_connected())
具体问题
测试环境是运行在Docker容器中的Ros实例,出现了两种不同的结果:
- 本地测试正常:测试代码直接在本地机器运行,Ros实例在本地Docker容器中,所有测试用例都能顺利通过,重复连接也没有问题。
- GitLab Runner环境失败:测试代码和Ros实例分别在两个独立的容器中,已经验证容器间网络互通(手动SSH连接正常,服务端口可访问),但单元测试执行时出现异常:
- 首次测试(
test_make方法)正常:setUp阶段连接成功,tearDown阶段断开连接; - 后续所有测试(比如
test_connect)均失败:通过rostopic echo /connected_clients可以看到新客户端已经成功连接到Ros,但roslibpy的Ros客户端却返回"Could not connect to Ros"错误。
- 首次测试(
求助需求
希望大家能提供一些排查方向或者解决方案,比如:
- roslibpy客户端重复连接的正确实现方式?我的封装代码里有没有明显的错误?
- 容器化环境下RosBridge连接的特殊注意事项?
- GitLab Runner容器环境中可能存在的网络或时序问题?
内容的提问来源于stack exchange,提问作者Iustinian Olaru




