You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用C语言动态创建ZeroMQ点对点(Peer-Peer)套接字?

这场景其实就是ZeroMQ里常见的「握手+动态端口分配」模式,我给你整理了完整的实现思路和C语言代码,直接就能跑起来:

实现核心逻辑

服务器需要两个核心部分:

  1. 一个「迎宾」套接字绑定8500端口,专门处理客户端的初次握手请求,分配专属通信端口
  2. 为每个成功握手的客户端创建独立的通信套接字和线程,保证主线程能持续监听新的握手请求
服务器端代码
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// 全局ZeroMQ上下文,所有线程共享
void *zmq_ctx;
// 初始分配端口,每次成功分配后自动递增
static int next_port = 8510;

// 单个客户端通信的线程处理函数
void *handle_client(void *arg) {
    int port = *(int*)arg;
    free(arg);

    // 创建专属的REP套接字用于点对点通信
    void *rep_socket = zmq_socket(zmq_ctx, ZMQ_REP);
    if (!rep_socket) {
        fprintf(stderr, "创建客户端套接字失败: %s\n", zmq_strerror(errno));
        return NULL;
    }

    // 绑定到分配的端口
    char bind_addr[64];
    snprintf(bind_addr, sizeof(bind_addr), "tcp://*:%d", port);
    if (zmq_bind(rep_socket, bind_addr) != 0) {
        fprintf(stderr, "绑定端口%d失败: %s\n", port, zmq_strerror(errno));
        zmq_close(rep_socket);
        return NULL;
    }
    printf("已为新客户端绑定端口%d\n", port);

    // 和客户端的通信循环
    while (1) {
        char buffer[256];
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        if (zmq_msg_recv(&msg, rep_socket, 0) == -1) {
            fprintf(stderr, "接收客户端消息失败: %s\n", zmq_strerror(errno));
            zmq_msg_close(&msg);
            break;
        }

        memcpy(buffer, zmq_msg_data(&msg), zmq_msg_size(&msg));
        buffer[zmq_msg_size(&msg)] = '\0';
        printf("从端口%d的客户端收到消息: %s\n", port, buffer);
        zmq_msg_close(&msg);

        // 回复客户端
        const char *reply = "消息已收到!";
        zmq_send(rep_socket, reply, strlen(reply), 0);

        // 客户端发送exit则结束通信
        if (strcmp(buffer, "exit") == 0) {
            printf("端口%d的客户端已断开连接\n", port);
            break;
        }
    }

    zmq_close(rep_socket);
    return NULL;
}

int main() {
    // 初始化ZeroMQ上下文
    zmq_ctx = zmq_ctx_new();
    if (!zmq_ctx) {
        fprintf(stderr, "创建ZMQ上下文失败: %s\n", zmq_strerror(errno));
        return 1;
    }

    // 创建握手监听用的REP套接字
    void *listener_socket = zmq_socket(zmq_ctx, ZMQ_REP);
    if (!listener_socket) {
        fprintf(stderr, "创建监听套接字失败: %s\n", zmq_strerror(errno));
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }

    // 绑定到8500端口
    if (zmq_bind(listener_socket, "tcp://*:8500") != 0) {
        fprintf(stderr, "绑定8500端口失败: %s\n", zmq_strerror(errno));
        zmq_close(listener_socket);
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }
    printf("服务器已在8500端口监听握手请求...\n");

    while (1) {
        // 接收客户端的初始握手请求
        char buffer[256];
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        if (zmq_msg_recv(&msg, listener_socket, 0) == -1) {
            fprintf(stderr, "接收握手请求失败: %s\n", zmq_strerror(errno));
            zmq_msg_close(&msg);
            continue;
        }

        memcpy(buffer, zmq_msg_data(&msg), zmq_msg_size(&msg));
        buffer[zmq_msg_size(&msg)] = '\0';
        printf("收到握手请求: %s\n", buffer);
        zmq_msg_close(&msg);

        // 分配新端口并返回给客户端
        int *port_ptr = malloc(sizeof(int));
        *port_ptr = next_port++;
        char port_str[16];
        snprintf(port_str, sizeof(port_str), "%d", *port_ptr);
        zmq_send(listener_socket, port_str, strlen(port_str), 0);
        printf("已为客户端分配端口%d\n", *port_ptr);

        // 创建线程处理该客户端的通信
        pthread_t thread;
        if (pthread_create(&thread, NULL, handle_client, port_ptr) != 0) {
            fprintf(stderr, "创建客户端线程失败\n");
            free(port_ptr);
            continue;
        }

        // 分离线程,避免资源泄漏
        pthread_detach(thread);
    }

    // 以下代码实际运行中不会执行,需通过信号处理(如SIGINT)优雅退出
    zmq_close(listener_socket);
    zmq_ctx_destroy(zmq_ctx);
    return 0;
}
客户端代码
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main() {
    // 初始化ZeroMQ上下文
    void *zmq_ctx = zmq_ctx_new();
    if (!zmq_ctx) {
        fprintf(stderr, "创建ZMQ上下文失败: %s\n", zmq_strerror(errno));
        return 1;
    }

    // 第一步:连接服务器的8500握手端口
    void *handshake_socket = zmq_socket(zmq_ctx, ZMQ_REQ);
    if (!handshake_socket) {
        fprintf(stderr, "创建握手套接字失败: %s\n", zmq_strerror(errno));
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }

    if (zmq_connect(handshake_socket, "tcp://localhost:8500") != 0) {
        fprintf(stderr, "连接8500握手端口失败: %s\n", zmq_strerror(errno));
        zmq_close(handshake_socket);
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }

    // 发送握手消息
    const char *handshake_msg = "请求分配通信端口";
    zmq_send(handshake_socket, handshake_msg, strlen(handshake_msg), 0);
    printf("已向服务器发送握手请求\n");

    // 接收服务器分配的新端口
    char port_str[16];
    zmq_msg_t msg;
    zmq_msg_init(&msg);
    if (zmq_msg_recv(&msg, handshake_socket, 0) == -1) {
        fprintf(stderr, "接收分配端口失败: %s\n", zmq_strerror(errno));
        zmq_msg_close(&msg);
        zmq_close(handshake_socket);
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }

    memcpy(port_str, zmq_msg_data(&msg), zmq_msg_size(&msg));
    port_str[zmq_msg_size(&msg)] = '\0';
    int new_port = atoi(port_str);
    printf("收到服务器分配的端口: %d\n", new_port);
    zmq_msg_close(&msg);

    // 关闭握手套接字
    zmq_close(handshake_socket);

    // 第二步:连接新分配的端口进行点对点通信
    void *comm_socket = zmq_socket(zmq_ctx, ZMQ_REQ);
    if (!comm_socket) {
        fprintf(stderr, "创建通信套接字失败: %s\n", zmq_strerror(errno));
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }

    char connect_addr[64];
    snprintf(connect_addr, sizeof(connect_addr), "tcp://localhost:%d", new_port);
    if (zmq_connect(comm_socket, connect_addr) != 0) {
        fprintf(stderr, "连接端口%d失败: %s\n", new_port, zmq_strerror(errno));
        zmq_close(comm_socket);
        zmq_ctx_destroy(zmq_ctx);
        return 1;
    }
    printf("已连接到服务器的%d端口\n", new_port);

    // 和服务器的通信循环
    while (1) {
        char input[256];
        printf("请输入消息(输入exit退出): ");
        fgets(input, sizeof(input), stdin);
        input[strcspn(input, "\n")] = '\0'; // 去除换行符

        zmq_send(comm_socket, input, strlen(input), 0);

        // 接收服务器回复
        zmq_msg_init(&msg);
        if (zmq_msg_recv(&msg, comm_socket, 0) == -1) {
            fprintf(stderr, "接收服务器回复失败: %s\n", zmq_strerror(errno));
            zmq_msg_close(&msg);
            break;
        }

        char reply[256];
        memcpy(reply, zmq_msg_data(&msg), zmq_msg_size(&msg));
        reply[zmq_msg_size(&msg)] = '\0';
        printf("服务器回复: %s\n", reply);
        zmq_msg_close(&msg);

        if (strcmp(input, "exit") == 0) {
            break;
        }
    }

    // 清理资源
    zmq_close(comm_socket);
    zmq_ctx_destroy(zmq_ctx);
    return 0;
}
关键注意事项
  • 端口分配优化:示例用了简单的递增策略,生产环境可以加入端口可用性检查(比如尝试绑定失败则自动递增加一)
  • 线程安全:ZeroMQ上下文是线程安全的,但单个套接字不能被多线程共享,所以每个客户端必须用独立的套接字
  • 通信模式选择:示例用REQ/REP适合简单请求响应场景,如果需要更灵活的双向通信,可以换成DEALER/ROUTER模式
  • 优雅退出:服务器可以添加信号处理(如捕获SIGINT),实现优雅关闭所有套接字和线程,避免资源泄漏

内容的提问来源于stack exchange,提问作者maxmaiz

火山引擎 最新活动