You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何使用单个脚本运行多个独立的Python Telegram Bot机器人?

如何使用单个脚本运行多个独立的Python Telegram Bot机器人?

我来给你捋捋怎么用单个脚本同时运行多个完全独立的Telegram机器人,每个机器人有自己的token、命令和业务逻辑,互不干扰。

核心思路是把每个机器人的逻辑完全封装隔离,然后用asyncio来统一管理多个机器人的事件循环,避免单个机器人的阻塞影响其他机器人运行。下面给你两种实用的实现方案:

方案一:函数式封装,快速上手

这种写法简单直接,适合机器人数量不多、逻辑相对简单的场景:

首先导入所需模块:

from telegram.ext import Updater, CommandHandler
import asyncio

然后为每个机器人编写专属的命令回调函数:

# 机器人A的命令逻辑
def start_bot_a(update, context):
    update.message.reply_text("我是机器人A!命令列表:\n/start - 显示这条消息")

# 机器人B的命令逻辑
def start_bot_b(update, context):
    update.message.reply_text("我是机器人B!命令列表:\n/start - 显示这条消息\n/hello - 打招呼")

def hello_bot_b(update, context):
    update.message.reply_text("哈喽!我是机器人B~")

接着写一个通用的机器人运行函数,用来初始化并启动单个机器人:

async def run_single_bot(token, handlers):
    # 初始化当前机器人的Updater
    updater = Updater(token=token, use_context=True)
    dp = updater.dispatcher
    # 添加当前机器人的所有命令处理器
    for handler in handlers:
        dp.add_handler(handler)
    # 启动轮询
    updater.start_polling()
    # 保持机器人运行,直到收到停止信号
    await updater.idle()

最后在主函数里配置所有机器人,并用asyncio同时启动:

def main():
    # 配置机器人A:token + 对应的命令处理器
    bot_a_token = "你的机器人A的TOKEN"
    bot_a_handlers = [
        CommandHandler('start', start_bot_a)
    ]
    # 配置机器人B:token + 对应的命令处理器
    bot_b_token = "你的机器人B的TOKEN"
    bot_b_handlers = [
        CommandHandler('start', start_bot_b),
        CommandHandler('hello', hello_bot_b)
    ]

    # 创建asyncio事件循环,同时运行两个机器人
    loop = asyncio.get_event_loop()
    tasks = [
        loop.create_task(run_single_bot(bot_a_token, bot_a_handlers)),
        loop.create_task(run_single_bot(bot_b_token, bot_b_handlers))
    ]
    loop.run_until_complete(asyncio.gather(*tasks))

if __name__ == '__main__':
    main()

方案二:面向对象封装,扩展性更强

如果后续要加更多机器人,或者每个机器人的逻辑更复杂,用类封装会更清晰,维护起来也方便:

同样先导入模块:

from telegram.ext import Updater, CommandHandler
import asyncio

定义一个Telegram机器人的基类,用来封装通用的初始化和运行逻辑:

class TelegramBot:
    def __init__(self, token):
        # 初始化当前机器人的Updater和Dispatcher
        self.updater = Updater(token=token, use_context=True)
        self.dispatcher = self.updater.dispatcher

    # 给当前机器人添加命令的方法
    def add_command(self, command, callback):
        self.dispatcher.add_handler(CommandHandler(command, callback))

    # 运行当前机器人的方法
    async def run(self):
        self.updater.start_polling()
        await self.updater.idle()

然后还是编写每个机器人的专属命令函数:

def start_bot_a(update, context):
    update.message.reply_text("我是机器人A!命令列表:\n/start - 显示这条消息")

def start_bot_b(update, context):
    update.message.reply_text("我是机器人B!命令列表:\n/start - 显示这条消息\n/hello - 打招呼")

def hello_bot_b(update, context):
    update.message.reply_text("哈喽!我是机器人B~")

最后在主函数里实例化每个机器人,配置命令后统一启动:

async def main():
    # 实例化机器人A并配置命令
    bot_a = TelegramBot("你的机器人A的TOKEN")
    bot_a.add_command('start', start_bot_a)

    # 实例化机器人B并配置命令
    bot_b = TelegramBot("你的机器人B的TOKEN")
    bot_b.add_command('start', start_bot_b)
    bot_b.add_command('hello', hello_bot_b)

    # 用asyncio同时运行所有机器人
    await asyncio.gather(bot_a.run(), bot_b.run())

if __name__ == '__main__':
    asyncio.run(main())

注意事项

  • 每个机器人的token必须是Telegram官方提供的有效独立token,对应各自的机器人账号
  • 所有机器人的命令处理器、业务逻辑都是完全隔离的,因为每个机器人有自己独立的Dispatcher实例
  • 如果你的机器人有耗时的IO操作(比如调用外部API、读写大文件),一定要用异步方式处理,避免阻塞整个事件循环影响其他机器人

备注:内容来源于stack exchange,提问作者Bijan

火山引擎 最新活动