You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何从PHP-FPM向ReactPHP/Amp/Swoole等常驻Worker发送消息?

How to Communicate Between PHP-FPM and Long-Running Workers (ReactPHP/Amp/Swoole)

Great question! Let's break down a practical, transaction-safe approach to connect your PHP-FPM web endpoint with a long-running worker—no external message queues needed, just leveraging the frameworks you're considering and your existing database.

Core Approach: Database-First with Best-Effort Notifications

Since you need task submission tied to database transactions (no lost jobs, even if notifications fail), here's the ideal flow:

  1. PHP-FPM starts a database transaction.
  2. Insert the job into your Jobs table.
  3. Attempt to send a quick notification to the worker (this is best-effort—if it fails, the worker will catch up later).
  4. Commit the transaction.
  5. The worker either processes the job immediately via the notification, or periodically polls the Jobs table for unprocessed tasks to handle missed notifications.

This way, your transaction guarantees the job is persisted, and the socket notification just triggers immediate processing instead of waiting for the next poll.


Implementation Options

1. Custom Unix/TCP Socket Server (Works for All Frameworks)

Building a simple socket server is the most straightforward solution—all three frameworks have excellent support for async socket handling. Unix sockets are faster for local communication, so prefer those if your worker runs on the same server as PHP-FPM.

Example with Swoole (Simplest for PHP)

Swoole's built-in async server makes this trivial:

Worker Script (Swoole Unix Socket Server)

<?php
// Use a Unix socket for faster local communication
$server = new Swoole\Server('unix:/tmp/worker.sock', 0, SWOOLE_PROCESS, SWOOLE_SOCK_UNIX);

$server->on('Start', function ($server) {
    echo "Worker server started. Listening on unix:/tmp/worker.sock\n";
});

$server->on('Receive', function ($server, $fd, $fromId, $data) {
    $jobId = trim($data);
    echo "Received job ID: $jobId\n";
    // Process the job (fetch from DB, run task, mark as complete)
    processJob((int)$jobId);
    $server->send($fd, "ACK: $jobId");
});

// Add periodic polling for missed jobs (runs every 60 seconds)
$server->tick(60000, function () {
    echo "Scanning for missed pending jobs...\n";
    $pdo = new PDO('mysql:host=localhost;dbname=your_db', 'user', 'pass');
    $stmt = $pdo->query("SELECT id FROM Jobs WHERE status = 'pending'");
    while ($row = $stmt->fetch()) {
        processJob((int)$row['id']);
    }
});

$server->start();

function processJob(int $jobId) {
    // Your job processing logic here
    // Don't forget to update the job status in the DB when done!
    $pdo = new PDO('mysql:host=localhost;dbname=your_db', 'user', 'pass');
    $pdo->exec("UPDATE Jobs SET status = 'processed' WHERE id = $jobId");
}

PHP-FPM Notification Code

<?php
function notify_new_job_to_worker(int $jobId): bool {
    // Connect to the Unix socket
    $socket = fsockopen('unix:///tmp/worker.sock', -1, $errno, $errstr, 2);
    
    if (!$socket) {
        error_log("Failed to reach worker: $errstr ($errno)");
        return false;
    }
    
    fwrite($socket, (string)$jobId);
    $response = fgets($socket);
    fclose($socket);
    
    return strpos($response, "ACK: $jobId") !== false;
}

// In your web endpoint:
$pdo->beginTransaction();
try {
    $pdo->exec("INSERT INTO Jobs (status, ...) VALUES ('pending', ...)");
    $jobId = $pdo->lastInsertId();
    // Notify worker (best-effort—don't rollback if this fails!)
    notify_new_job_to_worker($jobId);
    $pdo->commit();
} catch (Exception $e) {
    $pdo->rollback();
    throw $e;
}

Example with ReactPHP

ReactPHP uses an event loop for async handling:

Worker Script

<?php
require __DIR__ . '/vendor/autoload.php';

$loop = React\EventLoop\Factory::create();
$socket = new React\Socket\Server('unix:/tmp/react-worker.sock', $loop);

$socket->on('connection', function (React\Socket\ConnectionInterface $conn) {
    $conn->on('data', function ($data) use ($conn) {
        $jobId = trim($data);
        echo "Received job ID: $jobId\n";
        processJob((int)$jobId);
        $conn->write("ACK: $jobId");
        $conn->close();
    });
});

// Add periodic polling
$loop->addPeriodicTimer(60, function () {
    echo "Scanning for missed jobs...\n";
    $pdo = new PDO('mysql:host=localhost;dbname=your_db', 'user', 'pass');
    $stmt = $pdo->query("SELECT id FROM Jobs WHERE status = 'pending'");
    while ($row = $stmt->fetch()) {
        processJob((int)$row['id']);
    }
});

echo "ReactPHP worker server started\n";
$loop->run();

function processJob(int $jobId) {
    // Job processing logic
}

Example with Amp

Amp's async socket API is similarly straightforward:

Worker Script

<?php
require __DIR__ . '/vendor/autoload.php';

Amp\Loop::run(function () {
    $server = Amp\Socket\Server::listen('unix:/tmp/amp-worker.sock');
    echo "Amp worker server started\n";
    
    // Handle incoming connections
    while ($client = yield $server->accept()) {
        $data = yield $client->read();
        $jobId = trim($data);
        echo "Received job ID: $jobId\n";
        processJob((int)$jobId);
        yield $client->write("ACK: $jobId");
        $client->close();
    }
    
    // Add periodic polling
    Amp\Loop::repeat(60000, function () {
        echo "Scanning for missed jobs...\n";
        $pdo = new PDO('mysql:host=localhost;dbname=your_db', 'user', 'pass');
        $stmt = $pdo->query("SELECT id FROM Jobs WHERE status = 'pending'");
        while ($row = $stmt->fetch()) {
            processJob((int)$row['id']);
        }
    });
});

function processJob(int $jobId) {
    // Job processing logic
}

2. Lightweight Message Serialization (Optional)

If you need to send more than just a job ID (e.g., task parameters), use JSON or MessagePack instead of plain text. All frameworks support this easily:

PHP-FPM Example with JSON

$message = json_encode(['job_id' => $jobId, 'priority' => 'high', 'payload' => ['key' => 'value']]);
fwrite($socket, $message);

Worker Example with JSON

$message = json_decode($data, true);
$jobId = $message['job_id'];
$priority = $message['priority'];

Critical Transaction Safety Notes

  • Never rollback the transaction if the notification fails: The job is already saved to the DB, and the worker will pick it up via polling. Rolling back would lose the job entirely.
  • Mark jobs as processed: Always update the Jobs table with a status (e.g., processed, failed) once the worker finishes, so you don't reprocess tasks.
  • Use short timeouts in PHP-FPM: Set a 1-2 second timeout when connecting to the worker socket to avoid hanging web requests if the worker is unresponsive.

内容的提问来源于stack exchange,提问作者BenMorel

火山引擎 最新活动