REST API for asynchronous long-running tasks: a question about non-polling alternatives to polling

Non-Polling Alternatives for Your Flask-RESTPlus Batch Processing API

Let’s go through the non-polling options you can use to replace your current setup. Since your architecture team is pushing back on busy-wait polling, each alternative below is described in terms of your Flask-RESTPlus + Celery/RabbitMQ workflow:

1. Callback URLs (Webhooks)

This is the option you already know about, and it’s a perfect fit for internal API integrations. Here’s how to make it work for your flow:

  • Updated Workflow: When clients call the /run endpoint, they include a callback_url parameter (or you can pre-configure trusted endpoints in your system for internal services). Once the Celery worker finishes processing the XLSX to XML conversion, it sends a POST request directly to this callback URL with details like task status, file download URL, and any error messages.
  • Implementation Tips:
    • In your Celery task, after processing completes, use a library like requests to POST to the callback URL (send_to_dead_letter_queue is a placeholder for your own retry mechanism):
      import requests

      def process_xlsx_task(file_id, callback_url):
          # ... processing logic ...
          result = {"status": "completed", "download_url": f"/download/{file_id}"}
          try:
              response = requests.post(callback_url, json=result, timeout=10)
              # Treat 4xx/5xx responses as failures too, not only network errors
              response.raise_for_status()
          except requests.exceptions.RequestException as e:
              # Handle failure: send to a dead-letter queue for retries
              send_to_dead_letter_queue(callback_url, result, e)
      
    • Add retry logic for failed callbacks (e.g., using Celery’s built-in task retries via self.retry or the autoretry_for task option, or a separate dead-letter queue) to ensure internal services don’t miss updates.
    • Validate callback URLs (e.g., check against a whitelist of internal domains) and sign the payload with a secret key to prevent spoofing.
  • Why It’s Great: No client polling required. Your API proactively notifies consumers when the job is done, which directly addresses your architecture team’s feedback.
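
The validation and signing tips above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not a definitive implementation: the host names, the shared secret, and the function names are all hypothetical, and HMAC-SHA256 is one common choice for signing rather than something the original setup prescribes.

```python
import hashlib
import hmac
import json
from typing import Tuple
from urllib.parse import urlparse

# Hypothetical values for illustration only
ALLOWED_CALLBACK_HOSTS = {"reports.internal.example", "billing.internal.example"}
SHARED_SECRET = b"replace-with-a-real-secret"


def is_allowed_callback(url: str) -> bool:
    """Accept only http(s) URLs whose host is on the allowlist."""
    parsed = urlparse(url)
    return parsed.scheme in ("http", "https") and parsed.hostname in ALLOWED_CALLBACK_HOSTS


def sign_payload(payload: dict, secret: bytes = SHARED_SECRET) -> Tuple[bytes, str]:
    """Serialize the payload once and return (body, hex HMAC-SHA256 signature)."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return body, signature


def verify_signature(body: bytes, signature: str, secret: bytes = SHARED_SECRET) -> bool:
    """Receiver side: recompute the HMAC over the raw body and compare in constant time."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)
```

In the Celery task, the sender would check is_allowed_callback before posting, then send the exact bytes returned by sign_payload as the request body with the signature in a header of your choosing (the header name is up to you). The receiver must verify against the raw bytes it received, not a re-serialized copy.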

2. Server-Sent Events (SSE)

If you need real-time status updates (e.g., for a frontend dashboard or a service that wants progress updates), SSE is a lightweight, server-to-client push alternative:

  • Updated Workflow: After the client calls /run and gets a task ID, they open a persistent SSE connection to an endpoint like /stream/{task_id}. Your Celery worker pushes status updates (e.g., "Processing sheet 3 of 5", "Completed") to this stream as the job progresses. When done, it sends the download URL via the stream.
  • Implementation Tips:
    • Use the flask-sse extension to handle SSE connections. It requires Redis for pub/sub; since your Celery broker is RabbitMQ, that means running Redis as well unless you already have it (for example, as the Celery result backend).
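    • In your Celery task, publish events to a channel named after the ID the client subscribes with (file_id in this example). sse.publish needs a Flask application context, so wrap the calls when running inside a worker (app is assumed to be your Flask application object):
      from flask_sse import sse

      def process_xlsx_task(file_id):
          # app is your Flask application; the worker has no request context of its own
          with app.app_context():
              # ... initial processing ...
              sse.publish({"status": "started"}, channel=file_id)
              # ... process each sheet ...
              sse.publish({"status": "progress", "percent": 50}, channel=file_id)
              # ... completion ...
              sse.publish({"status": "completed", "download_url": f"/download/{file_id}"}, channel=file_id)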
      
    • Clients use the browser’s native EventSource API (or a library like eventsource for server-side consumers) to listen to the stream.
  • Why It’s Great: No polling, minimal client-side code, and supports incremental status updates. Perfect if consumers need visibility into the job’s progress.
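
To make the stream concrete, here is a small sketch of the text/event-stream wire format that an SSE endpoint sends for each update. flask-sse produces these frames for you; the function name and its parameters are hypothetical and only illustrate what travels over the connection.

```python
import json
from typing import Optional


def format_sse(data: dict, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Encode one event in the text/event-stream wire format."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # JSON-encode the payload so it always fits on a single data line
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"
```

A consumer reading the stream splits on blank lines and parses the data field as JSON, which is what the browser EventSource API hands to its message handlers as a string.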

3. WebSockets

For full bidirectional communication (if you need clients to send commands mid-processing, like canceling a job), WebSockets are the way to go:

  • Updated Workflow: Clients first establish a WebSocket connection to your API (e.g., ws://your-api/ws). When they send a /run request (either via the WebSocket or a separate POST endpoint), they include their WebSocket session ID. The Celery worker pushes status updates and the final download URL directly to their connected session.
  • Implementation Tips:
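    • Use flask-socketio to add WebSocket support to your Flask app. To emit from a Celery worker, configure the web server and the worker with the same message queue; Flask-SocketIO supports Redis and, through Kombu, RabbitMQ.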
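    • In your Celery task, emit messages to the specific client’s room. The context-bound emit function only works inside Socket.IO event handlers, so the worker creates its own SocketIO instance attached to the message queue your web server also uses (the URL below is a placeholder):
      from flask_socketio import SocketIO

      # No Flask app is passed: this instance only publishes to the message queue
      socketio = SocketIO(message_queue="redis://localhost:6379/0")

      def process_xlsx_task(file_id, session_id):
          # ... processing ...
          socketio.emit("status_update", {"status": "completed", "download_url": f"/download/{file_id}"}, room=session_id)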
      
    • Handle connection drops with reconnection logic on the client side to avoid missing updates.
  • Why It’s Great: Bidirectional communication lets you support more interactive workflows (like job cancellation) while eliminating polling.
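
For the reconnection tip above, a client written by hand usually waits a little longer after each failed attempt. The generator below is a minimal sketch of capped exponential backoff; the function name and default values are assumptions for illustration, and Socket.IO client libraries generally ship their own reconnection handling, so this matters mainly for custom clients.

```python
import random
from typing import Iterator, Optional


def backoff_delays(base: float = 1.0, factor: float = 2.0, cap: float = 30.0,
                   jitter: float = 0.0, max_attempts: Optional[int] = None) -> Iterator[float]:
    """Yield wait times in seconds between successive reconnection attempts."""
    delay = base
    attempt = 0
    while max_attempts is None or attempt < max_attempts:
        # Optional jitter spreads reconnects from many clients over time
        yield min(cap, delay) + random.uniform(0, jitter)
        # Grow the delay but never beyond the cap
        delay = min(cap, delay * factor)
        attempt += 1
```

A client loop would sleep for each yielded delay before trying to reconnect, and after reconnecting it should ask for the current task status once, since updates emitted while it was disconnected are not replayed automatically.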

4. Long Polling (Transition Option)

If you need a middle ground that’s better than short polling but easier to implement than SSE/WebSockets, long polling is a viable stopgap:

  • Updated Workflow: When a client checks the task status, your API holds the request open until the job completes or a timeout (e.g., 30 seconds) is hit. If the job finishes during the wait, it returns the result immediately; if not, it returns a "retry" status, and the client opens a new request.
  • Implementation Tips:
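    • Modify your status endpoint to wait for the Celery task with a timeout. In Flask-RESTPlus, api.route decorates Resource classes, and waiting on a result raises TimeoutError when the timeout expires, so both need handling:
      from celery.exceptions import TimeoutError
      from celery.result import AsyncResult
      from flask_restplus import Resource

      @api.route("/status/<task_id>")
      class TaskStatus(Resource):
          def get(self, task_id):
              result = AsyncResult(task_id)
              try:
                  # Block for up to 30 seconds; propagate=False avoids re-raising task errors
                  result.get(timeout=30, propagate=False)
              except TimeoutError:
                  return {"status": "pending", "retry_after": 30}
              if result.successful():
                  return {"status": result.status, "download_url": f"/download/{task_id}"}
              # Failed or revoked: report the state without a download URL
              return {"status": result.status}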
      
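  • Why It’s Great: Reduces the number of requests drastically compared to short polling, and your architecture team may accept it since it’s not "busy-waiting" with frequent, short requests. Keep in mind that each waiting request occupies a server worker for the duration of the wait, so size your worker pool accordingly.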
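
On the client side, long polling means re-issuing the request as soon as a "pending" response comes back. The loop below is a minimal sketch of that behavior; wait_for_result and fetch_status are hypothetical names, and fetch_status stands in for whatever call performs the HTTP request (for example, a function wrapping requests.get with a timeout slightly longer than the server-side wait).

```python
import time
from typing import Callable


def wait_for_result(fetch_status: Callable[[], dict], max_wait: float = 300.0) -> dict:
    """Re-issue the long-poll request until the task leaves 'pending' or max_wait passes."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        # Each call is expected to block server-side for up to the long-poll timeout
        response = fetch_status()
        if response.get("status") != "pending":
            return response
    raise TimeoutError("task did not finish within max_wait seconds")
```

Because the server holds each request open, the loop does not need its own sleep between attempts; the overall deadline only guards against a task that never completes.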

Recommendation for Your Use Case

Since you’re dealing with internal API calls, callback URLs (webhooks) are likely the best fit. Internal services can reliably expose callback endpoints, and you won’t have to manage persistent connections as you would with SSE or WebSockets. Webhooks are straightforward to implement with your existing Celery/RabbitMQ setup and remove client polling entirely, which is what your architecture team asked for.


The question in this post comes from Stack Exchange; question author: Pro