Implementing Streaming Data Processing with Ray
Last updated: 2025.08.12 11:58:29 | First published: 2025.08.12 11:58:29

Background

In an e-commerce platform's real-time recommendation system, the pipeline must consume users' real-time behavior data (page views, add-to-cart events, and so on), invoke personalized recommendation models to generate product recommendation lists, and push the results to the frontend through Kafka for display, achieving sub-second response from behavior to model computation to live recommendation. In intelligent customer-service scenarios, the system processes users' text input or speech transcriptions in real time and calls intent-recognition and question-answering models to generate replies, supporting instant interaction. In credit-approval scenarios, the system processes applicants' dynamic data such as credit reports and assets in real time and outputs credit-limit assessments to support real-time lending decisions.
All of these scenarios require streaming, real-time data processing, along with heterogeneous scheduling of CPU/GPU resources to match different workloads. Ray's distributed task scheduling can dynamically allocate GPU/CPU resources and supports both model parallelism and data parallelism, effectively addressing inference-latency bottlenecks; Kafka offers high throughput (up to millions of messages per second) and persistence, ensuring that data is neither lost nor blocked under peak traffic and meeting the stability requirements of real-time scenarios.
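
As a minimal illustration of this heterogeneous scheduling (the function, class, and resource amounts below are hypothetical, assume a cluster that actually has GPUs, and are not part of the sample pipeline later on this page), each Ray task or actor declares the CPU/GPU resources it needs and the scheduler places it accordingly:

import ray

ray.init()

# Lightweight CPU preprocessing task: reserves half a CPU core.
@ray.remote(num_cpus=0.5)
def preprocess(event: dict) -> dict:
    event["normalized"] = True
    return event

# GPU-backed inference actor: reserves one GPU per replica.
@ray.remote(num_gpus=1)
class RecommendationModel:
    def predict(self, event: dict) -> dict:
        # Placeholder for real GPU model inference.
        event["recommendations"] = ["item_1", "item_2"]
        return event

model = RecommendationModel.remote()
processed = ray.get(preprocess.remote({"user_id": 42}))
result = ray.get(model.predict.remote(processed))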

Processing Flow

Ray Actor Flow

(Figure: Ray Actor processing flow)

Ray Data Flow

(Figure: Ray Data processing flow)

Sample Code

Implementation Based on Ray Actor

import ray
from ray.util.queue import Queue, Empty
import time
import logging
import json
from typing import List, Dict
import uuid

def setup_logger(name, level=logging.INFO):
    """配置并返回一个logger实例。"""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid adding duplicate handlers
        logger.setLevel(level)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger

@ray.remote(num_cpus=0.5, max_concurrency=2)  # max_concurrency lets stop() run while start() is looping
class KafkaReaderActor:
    def __init__(self, kafka_config: Dict, topic: str, partition: int, output_queue: Queue, flow_control_threshold: int, batch_size: int = 100, batch_timeout: float = 1.0):
        from confluent_kafka import Consumer, TopicPartition
        self.kafka_config = kafka_config
        self.topic = topic
        self.partition = partition
        self.output_queue = output_queue
        self.flow_control_threshold = flow_control_threshold
        self.consumer = Consumer(kafka_config)
        self.running = False
        # Assign a specific partition instead of subscribing to the whole topic
        self.consumer.assign([TopicPartition(topic, partition)])
        self.logger = setup_logger(f"KafkaReaderActor-p{self.partition}")
        self.logger.info(f"KafkaReaderActor initialized for topic {topic}, partition {partition}")
        # Metrics
        self.messages_read = 0
        self.start_time = 0
        self.last_report_time = 0
        self.report_interval = 10  # seconds
        # Batch config
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout

    def _report_metrics(self):
        current_time = time.time()
        if current_time - self.last_report_time > self.report_interval:
            total_time = current_time - self.start_time
            speed = self.messages_read / total_time if total_time > 0 else 0
            self.logger.info(
                f"Reader Partition-{self.partition} Metrics: "
                f"Total read: {self.messages_read}, "
                f"Avg speed: {speed:.2f} msg/s"
            )
            self.last_report_time = current_time

    def start(self):
        self.running = True
        self.start_time = time.time()
        self.last_report_time = self.start_time
        self.logger.info(f"KafkaReaderActor started for topic {self.topic}, partition {self.partition}")
        while self.running:
            # Flow control: check queue size before polling
            while self.running and self.output_queue.size() >= self.flow_control_threshold:
                self.logger.warning(
                    f"Reader for partition {self.partition} paused, output queue size "
                    f"({self.output_queue.size()}) reached threshold ({self.flow_control_threshold}). Waiting..."
                )
                time.sleep(1)

            if not self.running:
                break

            # Consume messages in batches
            msgs = self.consumer.consume(num_messages=self.batch_size, timeout=self.batch_timeout)
            if not msgs:
                # Timed out without receiving any messages; keep polling
                continue

            for msg in msgs:
                if msg.error():
                    self.logger.error(f"Consumer error: {msg.error()}")
                    continue
                try:
                    # Put the message into the output queue
                    data = json.loads(msg.value().decode('utf-8'))
                    self.output_queue.put(data)
                    self.messages_read += 1
                    self.logger.debug(f"Successfully read and queued message: {data}")
                except json.JSONDecodeError:
                    self.logger.error(f"Failed to decode JSON message: {msg.value()}")

            self._report_metrics()

    def get_partition(self):
        return self.partition

    def stop(self):
        self.running = False
        self.consumer.close()
        self.logger.info(f"KafkaReaderActor stopped for topic {self.topic}, partition {self.partition}")

@ray.remote(num_cpus=1, max_concurrency=2)  # max_concurrency lets stop() run while start() is looping
class DataProcessorActor:
    def __init__(self, input_queue: Queue, output_queue: Queue, process_function=None):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.process_function = process_function or self.default_process
        self.running = False
        self.actor_id = str(uuid.uuid4())[:8]
        self.logger = setup_logger(f"DataProcessorActor-{self.actor_id}")
        self.logger.info(f"DataProcessorActor {self.actor_id} initialized")
        # Metrics
        self.items_processed = 0
        self.start_time = 0
        self.last_report_time = 0
        self.report_interval = 10  # seconds

    def default_process(self, data: Dict) -> Dict:
        # Default processing function; can be overridden by a subclass or a custom function.
        # Simple example here: add a processing timestamp.
        data['processed_at'] = time.time()
        data['status'] = 'processed'
        return data

    def _report_metrics(self):
        current_time = time.time()
        if current_time - self.last_report_time > self.report_interval:
            total_time = current_time - self.start_time
            speed = self.items_processed / total_time if total_time > 0 else 0
            self.logger.info(
                f"Processor-{self.actor_id} Metrics: "
                f"Total processed: {self.items_processed}, "
                f"Avg speed: {speed:.2f} items/s"
            )
            self.last_report_time = current_time

    def start(self):
        self.running = True
        self.start_time = time.time()
        self.last_report_time = self.start_time
        self.logger.info(f"DataProcessorActor {self.actor_id} started")
        while self.running:
            try:
                data = self.input_queue.get(timeout=1.0)
                processed_data = self.process_function(data)
                self.output_queue.put(processed_data)
                self.items_processed += 1
                self.logger.debug(f"Processed data: {processed_data}")
            except Empty:
                # An empty queue is normal; keep waiting
                self._report_metrics()
                continue
            except Exception as e:
                if not self.running:
                    break
                self.logger.error(f"Error processing data: {str(e)}")
            
            self._report_metrics()

    def stop(self):
        self.running = False
        self.logger.info(f"DataProcessorActor {self.actor_id} stopped")

@ray.remote(num_cpus=0.5, max_concurrency=2)  # max_concurrency lets stop() run while start() is looping
class KafkaWriterActor:
    def __init__(self, kafka_config: Dict, topic: str, input_queue: Queue):
        from confluent_kafka import Producer
        self.kafka_config = kafka_config
        self.topic = topic
        self.input_queue = input_queue
        self.producer = Producer(kafka_config)
        self.running = False
        self.actor_id = str(uuid.uuid4())[:8]
        self.logger = setup_logger(f"KafkaWriterActor-{self.actor_id}")
        self.logger.info(f"KafkaWriterActor {self.actor_id} initialized for topic {topic}")
        # Metrics
        self.messages_written = 0
        self.start_time = 0
        self.last_report_time = 0
        self.report_interval = 10  # seconds

    def delivery_report(self, err, msg):
        if err is not None:
            self.logger.error(f"Message delivery failed: {err}")
        else:
            self.logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}]")

    def _report_metrics(self):
        current_time = time.time()
        if current_time - self.last_report_time > self.report_interval:
            total_time = current_time - self.start_time
            speed = self.messages_written / total_time if total_time > 0 else 0
            self.logger.info(
                f"Writer-{self.actor_id} Metrics: "
                f"Total written: {self.messages_written}, "
                f"Avg speed: {speed:.2f} msg/s"
            )
            self.last_report_time = current_time

    def start(self):
        self.running = True
        self.start_time = time.time()
        self.last_report_time = self.start_time
        self.logger.info(f"KafkaWriterActor {self.actor_id} started for topic {self.topic}")
        while self.running:
            try:
                data = self.input_queue.get(timeout=1.0)
                self.producer.produce(
                    self.topic,
                    key=data.get('key'),
                    value=json.dumps(data).encode('utf-8'),
                    on_delivery=self.delivery_report
                )
                self.messages_written += 1
                self.producer.poll(0)
                self.logger.debug(f"Sent data to Kafka: {data}")
            except Empty:
                # An empty queue is normal; keep waiting
                self._report_metrics()
                continue
            except Exception as e:
                if not self.running:
                    break
                self.logger.error(f"Error writing to Kafka: {str(e)}", exc_info=True)
            
            self._report_metrics()
        # Make sure all remaining messages are flushed
        self.producer.flush()

    def stop(self):
        self.running = False
        self.logger.info(f"KafkaWriterActor {self.actor_id} stopped for topic {self.topic}")

@ray.remote(num_cpus=0.5, max_concurrency=2)  # max_concurrency lets stop() run while start() is looping
class ManagerActor:
    def __init__(
        self,
        reader_actors: List[ray.actor.ActorHandle],
        processor_actors: List[ray.actor.ActorHandle],
        writer_actors: List[ray.actor.ActorHandle],
        reader_queue: Queue,
        processor_queue: Queue,
        kafka_config: Dict,
        output_topic: str,
        scale_config: Dict = None
    ):
        self.reader_actors = reader_actors
        self.processor_actors = processor_actors
        self.writer_actors = writer_actors
        self.reader_queue = reader_queue
        self.processor_queue = processor_queue
        
        # KafkaReaderActor is no longer scaled dynamically by ManagerActor, so that logic has been removed
        
        # Save the parameters needed to create new actors
        self.kafka_config = kafka_config
        self.output_topic = output_topic
        
        # Scaling config: queue thresholds plus min/max instance counts
        self.scale_config = scale_config or {
            'processor': {'min': 1, 'max': 10, 'high_threshold': 200, 'low_threshold': 20},
            'writer': {'min': 1, 'max': 5, 'high_threshold': 100, 'low_threshold': 10}
        }
        
        self.running = False
        self.check_interval = 5  # check interval in seconds
        self.logger = setup_logger("ManagerActor")
        self.logger.info("ManagerActor initialized")

    def get_queue_size(self, queue: Queue) -> int:
        try:
            return queue.size()
        except Exception as e:
            self.logger.error(f"Error getting queue size: {str(e)}")
            return 0

    def scale_actors(self, actor_type: str):
        if actor_type == 'processor':
            actors = self.processor_actors
            queue = self.reader_queue
            config = self.scale_config['processor']
        elif actor_type == 'writer':
            actors = self.writer_actors
            queue = self.processor_queue
            config = self.scale_config['writer']
        else:
            self.logger.error(f"Unknown actor type: {actor_type}")
            return

        queue_size = self.get_queue_size(queue)
        current_count = len(actors)

        # Scale up when the queue backlog exceeds the high threshold
        if queue_size > config['high_threshold'] and current_count < config['max']:
            new_count = min(current_count + 1, config['max'])
            self.logger.info(f"Scaling up {actor_type} actors from {current_count} to {new_count} (queue size: {queue_size})")
            # Create new actor instances and add them to the list
            for _ in range(new_count - current_count):
                if actor_type == 'processor':
                    new_actor = DataProcessorActor.remote(self.reader_queue, self.processor_queue)
                    new_actor.start.remote()
                elif actor_type == 'writer':
                    new_actor = KafkaWriterActor.remote(self.kafka_config, self.output_topic, self.processor_queue)
                    new_actor.start.remote()
                actors.append(new_actor)

        # Scale down when the queue drops below the low threshold
        elif queue_size < config['low_threshold'] and current_count > config['min']:
            new_count = max(current_count - 1, config['min'])
            self.logger.info(f"Scaling down {actor_type} actors from {current_count} to {new_count} (queue size: {queue_size})")
            # Stop the surplus actors and remove them from the list
            actors_to_stop = actors[new_count:]
            for actor in actors_to_stop:
                try:
                    ray.get(actor.stop.remote())
                except Exception as e:
                    self.logger.error(f"Failed to stop {actor_type} actor: {str(e)}", exc_info=True)
            actors[:] = actors[:new_count]

    def start(self):
        self.running = True
        self.logger.info("ManagerActor started")
        while self.running:
            # Check and adjust each actor type
            self.scale_actors('processor')
            self.scale_actors('writer')
            
            # Log current concurrency and queue status
            reader_queue_size = self.get_queue_size(self.reader_queue)
            processor_queue_size = self.get_queue_size(self.processor_queue)
            self.logger.info(
                f"Status - Concurrency: [Readers: {len(self.reader_actors)}, Processors: {len(self.processor_actors)}, Writers: {len(self.writer_actors)}] | "
                f"Queue Sizes: [ReaderQ: {reader_queue_size}, ProcessorQ: {processor_queue_size}]"
            )
            
            time.sleep(self.check_interval)

    def stop(self):
        self.running = False
        self.logger.info("ManagerActor stopped")
        # Stop all actors
        for actor in self.reader_actors + self.processor_actors + self.writer_actors:
            ray.get(actor.stop.remote())

def main():
    logger = setup_logger("main")
    try:
        # Initialize Ray
        ray.init(ignore_reinit_error=True)
        logger.info("Ray initialized")
    except Exception as e:
        logger.error(f"Failed to initialize Ray: {str(e)}", exc_info=True)
        return

    # Kafka configuration
    kafka_config = {
        'bootstrap.servers': 'kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': 'xxxx',
        'sasl.password': 'xxxx',
        'group.id': 'ray-streaming-group',
        'auto.offset.reset': 'earliest',
    }
    input_topic = 'input_topic'
    output_topic = 'output_topic'

    # Create Ray queues
    reader_queue = Queue(maxsize=1000)
    processor_queue = Queue(maxsize=1000)

    # Fetch partition info for the Kafka topic
    from confluent_kafka.admin import AdminClient
    admin_client = AdminClient(kafka_config)
    metadata = admin_client.list_topics(topic=input_topic)
    total_partitions = len(metadata.topics[input_topic].partitions)
    logger.info(f"Discovered {total_partitions} partitions for topic {input_topic}")

    # Flow-control threshold for the reader output queue
    flow_control_threshold = 800
    # Batch-read configuration
    reader_batch_size = 100
    reader_batch_timeout = 12

    # Create an initial reader actor for each partition
    readers = []
    for partition in range(total_partitions):
        reader = KafkaReaderActor.remote(
            kafka_config,
            input_topic,
            partition,
            reader_queue,
            flow_control_threshold,
            batch_size=reader_batch_size,
            batch_timeout=reader_batch_timeout
        )
        readers.append(reader)
        logger.info(f"Created initial KafkaReaderActor for partition {partition}")

    # Create the other initial actors
    processor = DataProcessorActor.remote(reader_queue, processor_queue)
    writer = KafkaWriterActor.remote(kafka_config, output_topic, processor_queue)

    # Create the manager actor
    manager = ManagerActor.remote(
        readers, [processor], [writer],
        reader_queue, processor_queue, kafka_config, output_topic
    )

    # Start all actors. Their start() methods loop until stop() is called,
    # so fire the calls without blocking on the results.
    for r in readers:
        r.start.remote()
    processor.start.remote()
    writer.start.remote()
    manager.start.remote()

    # Keep the main process alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Received shutdown signal")
        # Make sure all actors have stopped
        ray.get(manager.stop.remote())
        logger.info("All actors stopped successfully")
        ray.shutdown()

if __name__ == "__main__":
    main()
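
The DataProcessorActor above accepts an arbitrary process_function, which is where business logic such as model inference would normally be plugged in. The sketch below is a hypothetical example (the score_user_event function and the fields it adds are illustrative only) of replacing the default timestamp logic with a custom function:

import json
import time

def score_user_event(data: dict) -> dict:
    """Hypothetical processing function: attach a dummy model score to each event."""
    # A real deployment would call a recommendation or risk model here.
    data['score'] = (len(json.dumps(data)) % 100) / 100.0
    data['processed_at'] = time.time()
    data['status'] = 'processed'
    return data

# Plug it into the pipeline instead of the default function:
# processor = DataProcessorActor.remote(reader_queue, processor_queue, process_function=score_user_event)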

Implementation Based on Ray Data

import ray 

ray.init(logging_config=ray.LoggingConfig(log_level="INFO"))

consumer_config = {
    "bootstrap.servers": 'kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594',
    "security.protocol": "SASL_PLAINTEXT",  # 'SASL_SSL' or 'SASL_PLAINTEXT'
    "sasl.mechanism": "PLAIN",
    "sasl.username": "xxx",
    "sasl.password": "xxx",
    "auto.offset.reset": 'earliest',  // 从最早的消息开始读取(可选:'latest')
    "group.id": 'ray_consumer_group1',        // 消费者组ID(不同组独立消费)
}


ds = ray.data.read_kafka(
    bootstrap_servers="kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594",
    topic='dujl_test',
    consumer_config=consumer_config
)



producer_config = {
    "bootstrap.servers": 'kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594',
    "security.protocol": "SASL_PLAINTEXT",  // 'SASL_SSL' or 'SASL_PLAINTEXT'
    "sasl.mechanism": "PLAIN",
    "sasl.username": "xxxx",
    "sasl.password": "xxxx",
}

def process_batch(batch):
    # Example: assume the original JSON contains "name" and "age" fields;
    # new fields can be added here or existing fields modified.
    return batch

ds.map_batches(process_batch, batch_size=200).write_kafka(
    bootstrap_servers="kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594",
    topic='dujl_test_ray_write',
    producer_config=producer_config
)
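
For model inference in the Ray Data flow, map_batches can also run a stateful, GPU-backed callable class. The following is a minimal sketch under stated assumptions: the RecommendModel class and its dummy model are hypothetical, the records are assumed to contain a "name" column, the cluster is assumed to have GPUs, and the concurrency argument requires a recent Ray version (older versions use compute=ray.data.ActorPoolStrategy instead):

import numpy as np

class RecommendModel:
    def __init__(self):
        # Placeholder for loading a real model onto the GPU once per replica.
        self.model = lambda names: [f"rec_for_{n}" for n in names]

    def __call__(self, batch):
        # With the default "numpy" batch format, batch is a dict of column -> ndarray.
        batch["recommendations"] = np.array(self.model(batch["name"]))
        return batch

ds_scored = ds.map_batches(
    RecommendModel,
    batch_size=64,
    concurrency=2,   # number of actor replicas running the model
    num_gpus=1,      # GPUs reserved per replica
)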


Submitting Jobs with Serverless Ray

Jobs can currently be submitted in two ways, through the console UI or via Python. For detailed steps, see the Serverless Ray User Guide.
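
As a general illustration of the Python route (the endpoint address, script name, and runtime environment below are placeholders; the exact Serverless Ray submission parameters are described in the guide above), the open-source Ray Job Submission SDK follows this pattern:

from ray.job_submission import JobSubmissionClient

# Placeholder address: replace with the cluster's job-submission endpoint.
client = JobSubmissionClient("http://<ray-dashboard-or-serverless-endpoint>:8265")

job_id = client.submit_job(
    # Entrypoint script containing the streaming pipeline above (name is illustrative).
    entrypoint="python ray_kafka_streaming.py",
    runtime_env={
        "working_dir": ".",
        "pip": ["confluent-kafka"],
    },
)
print(f"Submitted job: {job_id}")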