You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用Python3导入CSV通讯录到Telegram遇FloodWaitError求无等待批量导入方案

解决Telethon导入通讯录触发FloodWaitError的问题

首先得明确:Telegram的FloodWaitError是官方的反滥用限流机制,单次批量导入超过50条通讯录刚好触发了这个阈值——完全“无需等待”完成导入是不现实的(强行绕过可能导致账号封禁),但我们可以通过合法的方式优化流程,让程序自动处理等待、高效完成全部导入。

下面是几个可行的解决方案:

1. 捕获异常自动等待(最安全可靠)

Telethon允许我们捕获FloodWaitError,并根据异常提示的时间自动等待后重试,这样不需要手动干预,程序就能继续完成剩余导入。修改你的代码加入异常处理逻辑即可:

from telethon import TelegramClient
from telethon.errors import FloodWaitError
from telethon.tl.functions.contacts import ImportContactsRequest
from telethon.tl.types import InputPhoneContact
import time
import csv

# 替换成你的Telegram API信息
api_id = YOUR_API_ID
api_hash = "YOUR_API_HASH"
phone_number = "YOUR_PHONE_NUMBER"

client = TelegramClient("contact_import_session", api_id, api_hash)

async def batch_import_contacts():
    with open("contacts.csv", "r", encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file)
        contact_list = list(csv_reader)
        total_contacts = len(contact_list)

        for index, contact in enumerate(contact_list, start=1):
            try:
                # 构造通讯录条目(根据你的CSV字段调整)
                input_contact = InputPhoneContact(
                    client_id=0,
                    phone=contact["phone"].strip(),
                    first_name=contact["name"].strip(),
                    last_name=""
                )
                # 导入单条(或者小批量,比如每次10条)
                await client(ImportContactsRequest([input_contact]))
                print(f"已导入 {index}/{total_contacts}: {contact['name']}")
                # 加入随机短延迟,降低触发限流的概率
                time.sleep(0.5)
            except FloodWaitError as e:
                print(f"触发限流,自动等待 {e.seconds} 秒...")
                time.sleep(e.seconds)
                # 重试当前条目
                await client(ImportContactsRequest([input_contact]))
                print(f"重试成功: {contact['name']} ({index}/{total_contacts})")

with client:
    client.loop.run_until_complete(batch_import_contacts())

2. 分小批次导入+主动延迟

与其等触发限流后被迫长时间等待,不如主动拆分批次,每导入一批(比如40条)就主动暂停几秒,这样能避免触发高时长的FloodWait。比如在循环中加入批次控制:

# 批量控制逻辑示例
batch_size = 40
for batch_start in range(0, total_contacts, batch_size):
    batch = contact_list[batch_start:batch_start+batch_size]
    for contact in batch:
        # 执行单条导入逻辑
        input_contact = InputPhoneContact(
            client_id=0,
            phone=contact["phone"].strip(),
            first_name=contact["name"].strip(),
            last_name=""
        )
        await client(ImportContactsRequest([input_contact]))
        print(f"已导入 {batch_start + index +1}/{total_contacts}: {contact['name']}")
        time.sleep(0.3)
    print(f"完成一批导入,暂停10秒...")
    time.sleep(10)

这种方式能把等待时间分散开,整体耗时可能比触发FloodWait后等待更短。

3. 多账号拆分导入(谨慎使用)

如果你的通讯录可以拆分到多个账号下(比如每个账号导入50条),可以创建多个Telethon会话分别处理不同批次的联系人。但要注意:Telegram禁止滥用多个账号,这种方式可能违反服务条款,有账号被封禁的风险,仅作为最后备选。


需要强调的是:不要尝试使用任何“绕过”限流的黑科技,Telegram的反滥用系统很严格,违规操作会导致账号永久封禁。最稳妥的方式还是方案1,让程序自动处理等待,合法完成导入。

内容的提问来源于stack exchange,提问作者Ameer

火山引擎 最新活动