使用Python3导入CSV通讯录到Telegram遇FloodWaitError求无等待批量导入方案
解决Telethon导入通讯录触发FloodWaitError的问题
首先得明确:Telegram的FloodWaitError是官方的反滥用限流机制,单次批量导入超过50条通讯录刚好触发了这个阈值——完全“无需等待”完成导入是不现实的(强行绕过可能导致账号封禁),但我们可以通过合法的方式优化流程,让程序自动处理等待、高效完成全部导入。
下面是几个可行的解决方案:
1. 捕获异常自动等待(最安全可靠)
Telethon允许我们捕获FloodWaitError,并根据异常提示的时间自动等待后重试,这样不需要手动干预,程序就能继续完成剩余导入。修改你的代码加入异常处理逻辑即可:
from telethon import TelegramClient from telethon.errors import FloodWaitError from telethon.tl.functions.contacts import ImportContactsRequest from telethon.tl.types import InputPhoneContact import time import csv # 替换成你的Telegram API信息 api_id = YOUR_API_ID api_hash = "YOUR_API_HASH" phone_number = "YOUR_PHONE_NUMBER" client = TelegramClient("contact_import_session", api_id, api_hash) async def batch_import_contacts(): with open("contacts.csv", "r", encoding="utf-8") as csv_file: csv_reader = csv.DictReader(csv_file) contact_list = list(csv_reader) total_contacts = len(contact_list) for index, contact in enumerate(contact_list, start=1): try: # 构造通讯录条目(根据你的CSV字段调整) input_contact = InputPhoneContact( client_id=0, phone=contact["phone"].strip(), first_name=contact["name"].strip(), last_name="" ) # 导入单条(或者小批量,比如每次10条) await client(ImportContactsRequest([input_contact])) print(f"已导入 {index}/{total_contacts}: {contact['name']}") # 加入随机短延迟,降低触发限流的概率 time.sleep(0.5) except FloodWaitError as e: print(f"触发限流,自动等待 {e.seconds} 秒...") time.sleep(e.seconds) # 重试当前条目 await client(ImportContactsRequest([input_contact])) print(f"重试成功: {contact['name']} ({index}/{total_contacts})") with client: client.loop.run_until_complete(batch_import_contacts())
2. 分小批次导入+主动延迟
与其等触发限流后被迫长时间等待,不如主动拆分批次,每导入一批(比如40条)就主动暂停几秒,这样能避免触发高时长的FloodWait。比如在循环中加入批次控制:
# 批量控制逻辑示例 batch_size = 40 for batch_start in range(0, total_contacts, batch_size): batch = contact_list[batch_start:batch_start+batch_size] for contact in batch: # 执行单条导入逻辑 input_contact = InputPhoneContact( client_id=0, phone=contact["phone"].strip(), first_name=contact["name"].strip(), last_name="" ) await client(ImportContactsRequest([input_contact])) print(f"已导入 {batch_start + index +1}/{total_contacts}: {contact['name']}") time.sleep(0.3) print(f"完成一批导入,暂停10秒...") time.sleep(10)
这种方式能把等待时间分散开,整体耗时可能比触发FloodWait后等待更短。
3. 多账号拆分导入(谨慎使用)
如果你的通讯录可以拆分到多个账号下(比如每个账号导入50条),可以创建多个Telethon会话分别处理不同批次的联系人。但要注意:Telegram禁止滥用多个账号,这种方式可能违反服务条款,有账号被封禁的风险,仅作为最后备选。
需要强调的是:不要尝试使用任何“绕过”限流的黑科技,Telegram的反滥用系统很严格,违规操作会导致账号永久封禁。最稳妥的方式还是方案1,让程序自动处理等待,合法完成导入。
内容的提问来源于stack exchange,提问作者Ameer




