You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何从Flask Python应用将JSON数据传递给Discord机器人并在Discord频道发布

将Flask接收的JSON传递给Discord机器人并发布到指定频道

根据你的需求,我提供两种常见的实现方案,你可以根据自己的项目规模和偏好选择:


方案一:直接通过Discord Webhook发送(无需机器人参与)

这是最简单的方式,Flask接收JSON后直接调用Discord的Webhook接口发送消息,不需要和你的bot.py做交互,适合快速实现需求的场景。

步骤:

  1. 在Discord服务器中创建Webhook:
    进入服务器设置 → 集成 → Webhooks → 新建Webhook,选择目标频道,复制生成的Webhook URL。

  2. 修改Flask代码:

from flask import Flask, json, request
import requests

app = Flask(__name__)
# 替换成你自己的Discord Webhook URL
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/xxx/xxx"

@app.route('/', methods=['GET'])
def index():
    return 'working ...'

@app.route('/webhook', methods=['POST'])
def jiraJSON():
    if request.is_json:
        data = request.get_json()
        print(data)
        
        # 自定义消息格式,你可以根据JSON结构调整展示内容
        message_content = f"收到新的Jira事件:\n```json\n{json.dumps(data, indent=2)}\n```"
        
        # 发送到Discord
        requests.post(DISCORD_WEBHOOK_URL, json={"content": message_content})
        
        return 'json received', 200
    else:
        return 'Failed - Not a JSON', 400

if __name__ == '__main__':
    app.run()

方案二:让Discord机器人接收并发送消息(进程间通信)

如果你希望必须由你的bot.py来发送消息,需要实现Flask和机器人之间的通信。这里提供两种常见的通信方式:

方式A:将Flask和机器人运行在同一个异步事件循环中

使用Quart(Flask的异步版本)和Discord.py的异步特性,让两者共享一个消息队列,Flask将JSON放入队列,机器人从队列取出并发送。

步骤:

  1. 安装依赖:
pip install quart discord.py python-dotenv
  1. 合并或修改代码(这里以单文件为例,也可以拆分):
# combined.py
import os
import json
import discord
from dotenv import load_dotenv
from quart import Quart, request
import asyncio

load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
# 替换成你的目标频道ID(需要开启Discord开发者模式才能复制)
CHANNEL_ID = int(os.getenv('DISCORD_CHANNEL_ID'))

app = Quart(__name__)
client = discord.Client(intents=discord.Intents.default())
# 创建异步消息队列
message_queue = asyncio.Queue()

@client.event
async def on_ready():
    print(f'{client.user} 已连接到Discord!')
    # 启动队列监听任务
    asyncio.create_task(process_message_queue())

async def process_message_queue():
    while True:
        data = await message_queue.get()
        channel = client.get_channel(CHANNEL_ID)
        if channel:
            # 格式化消息内容
            msg = f"收到新的Payload:\n```json\n{json.dumps(data, indent=2)}\n```"
            await channel.send(msg)
        message_queue.task_done()

@app.route('/', methods=['GET'])
async def index():
    return 'working ...'

@app.route('/webhook', methods=['POST'])
async def jiraJSON():
    if request.is_json:
        data = await request.get_json()
        print(data)
        # 将数据放入队列
        await message_queue.put(data)
        return 'json received', 200
    else:
        return 'Failed - Not a JSON', 400

async def main():
    # 同时运行机器人和Quart应用
    bot_task = asyncio.create_task(client.start(TOKEN))
    app_task = asyncio.create_task(app.run_task(host='0.0.0.0', port=5000))
    await asyncio.gather(bot_task, app_task)

if __name__ == '__main__':
    asyncio.run(main())
  1. 注意事项:
    • 在Discord开发者门户中为你的机器人开启必要的Intents(至少开启"Server Members Intent"和"Message Content Intent")
    • 开启Discord开发者模式:设置 → 高级 → 开发者模式,右键目标频道复制ID

方式B:使用Redis作为消息中间件(分离Flask和机器人进程)

如果希望Flask和机器人各自独立运行,可以用Redis作为两者的通信桥梁,Flask发布消息到Redis频道,机器人订阅该频道并处理消息。

步骤:

  1. 安装依赖:
pip install redis flask discord.py python-dotenv
  1. 启动Redis服务(本地安装或使用云服务)

  2. 修改bot.py

# bot.py
import os
import json
import discord
from dotenv import load_dotenv
import redis
import asyncio

load_dotenv()
TOKEN = os.getenv('DISCORD_TOKEN')
CHANNEL_ID = int(os.getenv('DISCORD_CHANNEL_ID'))
# Redis连接地址,本地默认是redis://localhost:6379
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379')

client = discord.Client(intents=discord.Intents.default())
r = redis.Redis.from_url(REDIS_URL)

@client.event
async def on_ready():
    print(f'{client.user} 已连接到Discord!')
    asyncio.create_task(listen_redis_channel())

async def listen_redis_channel():
    pubsub = r.pubsub()
    pubsub.subscribe('flask_to_discord')  # 订阅指定频道
    while True:
        message = pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            data = json.loads(message['data'])
            channel = client.get_channel(CHANNEL_ID)
            if channel:
                msg = f"收到新的Payload:\n```json\n{json.dumps(data, indent=2)}\n```"
                await channel.send(msg)
        await asyncio.sleep(1)

client.run(TOKEN)
  1. 修改app.py
# app.py
from flask import Flask, json, request
import redis
import json

app = Flask(__name__)
REDIS_URL = 'redis://localhost:6379'
r = redis.Redis.from_url(REDIS_URL)

@app.route('/', methods=['GET'])
def index():
    return 'working ...'

@app.route('/webhook', methods=['POST'])
def jiraJSON():
    if request.is_json:
        data = request.get_json()
        print(data)
        # 发布消息到Redis频道
        r.publish('flask_to_discord', json.dumps(data))
        return 'json received', 200
    else:
        return 'Failed - Not a JSON', 400

if __name__ == '__main__':
    app.run()

内容的提问来源于stack exchange,提问作者Bogdan Adrian Velica

火山引擎 最新活动