In an e-commerce platform's real-time recommendation system, the pipeline must consume real-time user behavior data such as page views and add-to-cart events, invoke personalized recommendation models to generate product recommendation lists, and push the results to the frontend through Kafka, achieving sub-second response across the chain of "behavior occurs, model computes, recommendation takes effect". In intelligent customer service, the system processes user text input or speech transcriptions in real time and invokes intent-recognition and question-answering models to generate replies, supporting instant interaction. In credit approval, the system processes an applicant's dynamic data such as credit records and assets in real time and outputs a credit-limit assessment to support real-time lending decisions.
All of these scenarios require streaming, real-time data processing combined with heterogeneous scheduling of CPU/GPU resources to match different workloads. Ray's distributed task scheduling can dynamically allocate GPU/CPU resources and supports both model parallelism and data parallelism, effectively addressing inference-latency bottlenecks; Kafka provides high throughput (up to millions of messages per second) and persistence, ensuring that data is neither lost nor blocked under peak traffic and meeting the stability requirements of real-time scenarios.
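To make the heterogeneous scheduling concrete, the minimal sketch below shows how a CPU-only preprocessing task and a GPU-backed inference actor can coexist in one Ray application simply by declaring resource requirements. It is an illustration only and not part of the pipeline described later: the preprocess function, the RecommendationModel actor, and its fake predictions are hypothetical placeholders, and the GPU actor will only schedule on a cluster that actually has a free GPU.

import ray

ray.init()

# CPU-only preprocessing task; a fractional CPU request lets many lightweight
# tasks share one core.
@ray.remote(num_cpus=0.5)
def preprocess(event: dict) -> dict:
    event["normalized"] = True
    return event

# GPU-backed inference actor; Ray places it on a node with a free GPU.
# The model and its output are placeholders for a real recommendation model.
@ray.remote(num_gpus=1)
class RecommendationModel:
    def predict(self, event: dict) -> list:
        return ["item_1", "item_2"]

model = RecommendationModel.remote()
# Object references can be passed directly; Ray resolves them before predict() runs.
recs = model.predict.remote(preprocess.remote({"user_id": 1, "action": "click"}))
print(ray.get(recs))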
import ray
from ray.util.queue import Queue, Empty
import time
import logging
import json
from typing import List, Dict
import uuid


def setup_logger(name, level=logging.INFO):
    """Configure and return a logger instance."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # Avoid adding duplicate handlers
        logger.setLevel(level)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


# max_concurrency=2 lets stop() be delivered while start() is looping;
# otherwise the stop call would queue forever behind the running start().
@ray.remote(num_cpus=0.5, max_concurrency=2)
class KafkaReaderActor:
    def __init__(self, kafka_config: Dict, topic: str, partition: int, output_queue: Queue,
                 flow_control_threshold: int, batch_size: int = 100, batch_timeout: float = 1.0):
        from confluent_kafka import Consumer, TopicPartition
        self.kafka_config = kafka_config
        self.topic = topic
        self.partition = partition
        self.output_queue = output_queue
        self.flow_control_threshold = flow_control_threshold
        self.consumer = Consumer(kafka_config)
        self.running = False
        # Assign a specific partition instead of subscribing to the whole topic
        self.consumer.assign([TopicPartition(topic, partition)])
        self.logger = setup_logger(f"KafkaReaderActor-p{self.partition}")
        self.logger.info(f"KafkaReaderActor initialized for topic {topic}, partition {partition}")
        # Metrics
        self.messages_read = 0
        self.start_time = 0
        self.last_report_time = 0
        self.report_interval = 10  # seconds
        # Batch config
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout

    def _report_metrics(self):
        current_time = time.time()
        if current_time - self.last_report_time > self.report_interval:
            total_time = current_time - self.start_time
            speed = self.messages_read / total_time if total_time > 0 else 0
            self.logger.info(
                f"Reader Partition-{self.partition} Metrics: "
                f"Total read: {self.messages_read}, "
                f"Avg speed: {speed:.2f} msg/s"
            )
            self.last_report_time = current_time

    def start(self):
        self.running = True
        self.start_time = time.time()
        self.last_report_time = self.start_time
        self.logger.info(f"KafkaReaderActor started for topic {self.topic}, partition {self.partition}")
        while self.running:
            # Flow control: check queue size before polling
            while self.running and self.output_queue.size() >= self.flow_control_threshold:
                self.logger.warning(
                    f"Reader for partition {self.partition} paused, output queue size "
                    f"({self.output_queue.size()}) reached threshold ({self.flow_control_threshold}). Waiting..."
                )
                time.sleep(1)
            if not self.running:
                break
            # Consume messages in batches
            msgs = self.consumer.consume(num_messages=self.batch_size, timeout=self.batch_timeout)
            if not msgs:
                # Timed out without receiving any messages; keep looping
                continue
            for msg in msgs:
                if msg.error():
                    self.logger.error(f"Consumer error: {msg.error()}")
                    continue
                try:
                    # Put the decoded message into the output queue
                    data = json.loads(msg.value().decode('utf-8'))
                    self.output_queue.put(data)
                    self.messages_read += 1
                    self.logger.debug(f"Successfully read and queued message: {data}")
                except json.JSONDecodeError:
                    self.logger.error(f"Failed to decode JSON message: {msg.value()}")
            self._report_metrics()
        # Close the consumer here rather than in stop(), so it is not closed
        # from another thread while consume() is still in flight.
        self.consumer.close()

    def get_partition(self):
        return self.partition

    def stop(self):
        self.running = False
        self.logger.info(f"KafkaReaderActor stopped for topic {self.topic}, partition {self.partition}")


@ray.remote(num_cpus=1, max_concurrency=2)
class DataProcessorActor:
    def __init__(self, input_queue: Queue, output_queue: Queue, process_function=None):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.process_function = process_function or self.default_process
        self.running = False
        self.actor_id = str(uuid.uuid4())[:8]
        self.logger = setup_logger(f"DataProcessorActor-{self.actor_id}")
        self.logger.info(f"DataProcessorActor {self.actor_id} initialized")
        # Metrics
        self.items_processed = 0
        self.start_time = 0
        self.last_report_time = 0
        self.report_interval = 10  # seconds

    def default_process(self, data: Dict) -> Dict:
        # Default processing function; can be overridden by a subclass or a custom function.
        # Simple example here: add a processing timestamp.
        data['processed_at'] = time.time()
        data['status'] = 'processed'
        return data

    def _report_metrics(self):
        current_time = time.time()
        if current_time - self.last_report_time > self.report_interval:
            total_time = current_time - self.start_time
            speed = self.items_processed / total_time if total_time > 0 else 0
            self.logger.info(
                f"Processor-{self.actor_id} Metrics: "
                f"Total processed: {self.items_processed}, "
                f"Avg speed: {speed:.2f} items/s"
            )
            self.last_report_time = current_time

    def start(self):
        self.running = True
        self.start_time = time.time()
        self.last_report_time = self.start_time
        self.logger.info(f"DataProcessorActor {self.actor_id} started")
        while self.running:
            try:
                data = self.input_queue.get(timeout=1.0)
                processed_data = self.process_function(data)
                self.output_queue.put(processed_data)
                self.items_processed += 1
                self.logger.debug(f"Processed data: {processed_data}")
            except Empty:
                # An empty queue is normal; keep waiting
                self._report_metrics()
                continue
            except Exception as e:
                if not self.running:
                    break
                self.logger.error(f"Error processing data: {str(e)}")
            self._report_metrics()

    def stop(self):
        self.running = False
        self.logger.info(f"DataProcessorActor {self.actor_id} stopped")


@ray.remote(num_cpus=0.5, max_concurrency=2)
class KafkaWriterActor:
    def __init__(self, kafka_config: Dict, topic: str, input_queue: Queue):
        from confluent_kafka import Producer
        self.kafka_config = kafka_config
        self.topic = topic
        self.input_queue = input_queue
        self.producer = Producer(kafka_config)
        self.running = False
        self.actor_id = str(uuid.uuid4())[:8]
        self.logger = setup_logger(f"KafkaWriterActor-{self.actor_id}")
        self.logger.info(f"KafkaWriterActor {self.actor_id} initialized for topic {topic}")
        # Metrics
        self.messages_written = 0
        self.start_time = 0
        self.last_report_time = 0
        self.report_interval = 10  # seconds

    def delivery_report(self, err, msg):
        if err is not None:
            self.logger.error(f"Message delivery failed: {err}")
        else:
            self.logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}]")

    def _report_metrics(self):
        current_time = time.time()
        if current_time - self.last_report_time > self.report_interval:
            total_time = current_time - self.start_time
            speed = self.messages_written / total_time if total_time > 0 else 0
            self.logger.info(
                f"Writer-{self.actor_id} Metrics: "
                f"Total written: {self.messages_written}, "
                f"Avg speed: {speed:.2f} msg/s"
            )
            self.last_report_time = current_time

    def start(self):
        self.running = True
        self.start_time = time.time()
        self.last_report_time = self.start_time
        self.logger.info(f"KafkaWriterActor {self.actor_id} started for topic {self.topic}")
        while self.running:
            try:
                data = self.input_queue.get(timeout=1.0)
                self.producer.produce(
                    self.topic,
                    key=data.get('key'),
                    value=json.dumps(data).encode('utf-8'),
                    on_delivery=self.delivery_report
                )
                self.messages_written += 1
                self.producer.poll(0)
                self.logger.debug(f"Sent data to Kafka: {data}")
            except Empty:
                # An empty queue is normal; keep waiting
                self._report_metrics()
                continue
            except Exception as e:
                if not self.running:
                    break
                self.logger.error(f"Error writing to Kafka: {str(e)}", exc_info=True)
            self._report_metrics()
        # Make sure all remaining messages are delivered
        self.producer.flush()

    def stop(self):
        self.running = False
        self.logger.info(f"KafkaWriterActor {self.actor_id} stopped for topic {self.topic}")


@ray.remote(num_cpus=0.5, max_concurrency=2)
class ManagerActor:
    def __init__(
        self,
        reader_actors: List[ray.actor.ActorHandle],
        processor_actors: List[ray.actor.ActorHandle],
        writer_actors: List[ray.actor.ActorHandle],
        reader_queue: Queue,
        processor_queue: Queue,
        kafka_config: Dict,
        output_topic: str,
        scale_config: Dict = None
    ):
        self.reader_actors = reader_actors
        self.processor_actors = processor_actors
        self.writer_actors = writer_actors
        self.reader_queue = reader_queue
        self.processor_queue = processor_queue
        # KafkaReaderActor instances are no longer scaled dynamically by ManagerActor
        # (there is one per partition), so no reader-scaling logic is kept here.
        # Keep the parameters needed to create new actors.
        self.kafka_config = kafka_config
        self.output_topic = output_topic
        # Scaling config: thresholds plus max/min instance counts
        self.scale_config = scale_config or {
            'processor': {'min': 1, 'max': 10, 'high_threshold': 200, 'low_threshold': 20},
            'writer': {'min': 1, 'max': 5, 'high_threshold': 100, 'low_threshold': 10}
        }
        self.running = False
        self.check_interval = 5  # check interval in seconds
        self.logger = setup_logger("ManagerActor")
        self.logger.info("ManagerActor initialized")

    def get_queue_size(self, queue: Queue) -> int:
        try:
            return queue.size()
        except Exception as e:
            self.logger.error(f"Error getting queue size: {str(e)}")
            return 0

    def scale_actors(self, actor_type: str):
        if actor_type == 'processor':
            actors = self.processor_actors
            queue = self.reader_queue
            config = self.scale_config['processor']
        elif actor_type == 'writer':
            actors = self.writer_actors
            queue = self.processor_queue
            config = self.scale_config['writer']
        else:
            self.logger.error(f"Unknown actor type: {actor_type}")
            return

        queue_size = self.get_queue_size(queue)
        current_count = len(actors)

        # Scale up
        if queue_size > config['high_threshold'] and current_count < config['max']:
            new_count = min(current_count + 1, config['max'])
            self.logger.info(
                f"Scaling up {actor_type} actors from {current_count} to {new_count} (queue size: {queue_size})"
            )
            # Create new actor instances and add them to the list
            for _ in range(new_count - current_count):
                if actor_type == 'processor':
                    new_actor = DataProcessorActor.remote(self.reader_queue, self.processor_queue)
                    new_actor.start.remote()
                elif actor_type == 'writer':
                    new_actor = KafkaWriterActor.remote(self.kafka_config, self.output_topic, self.processor_queue)
                    new_actor.start.remote()
                actors.append(new_actor)
        # Scale down
        elif queue_size < config['low_threshold'] and current_count > config['min']:
            new_count = max(current_count - 1, config['min'])
            self.logger.info(
                f"Scaling down {actor_type} actors from {current_count} to {new_count} (queue size: {queue_size})"
            )
            # Stop the surplus actors and remove them from the list
            actors_to_stop = actors[new_count:]
            for actor in actors_to_stop:
                try:
                    ray.get(actor.stop.remote())
                except Exception as e:
                    self.logger.error(f"Failed to stop {actor_type} actor: {str(e)}", exc_info=True)
            actors[:] = actors[:new_count]

    def start(self):
        self.running = True
        self.logger.info("ManagerActor started")
        while self.running:
            # Check and adjust each actor type
            self.scale_actors('processor')
            self.scale_actors('writer')
            # Log current concurrency and queue status
            reader_queue_size = self.get_queue_size(self.reader_queue)
            processor_queue_size = self.get_queue_size(self.processor_queue)
            self.logger.info(
                f"Status - Concurrency: [Readers: {len(self.reader_actors)}, "
                f"Processors: {len(self.processor_actors)}, Writers: {len(self.writer_actors)}] | "
                f"Queue Sizes: [ReaderQ: {reader_queue_size}, ProcessorQ: {processor_queue_size}]"
            )
            time.sleep(self.check_interval)

    def stop(self):
        self.running = False
        self.logger.info("ManagerActor stopped")
        # Stop all managed actors
        for actor in self.reader_actors + self.processor_actors + self.writer_actors:
            ray.get(actor.stop.remote())


def main():
    logger = setup_logger("main")
    try:
        # Initialize Ray
        ray.init(ignore_reinit_error=True)
        logger.info("Ray initialized")
    except Exception as e:
        logger.error(f"Failed to initialize Ray: {str(e)}", exc_info=True)
        return

    # Kafka configuration
    kafka_config = {
        'bootstrap.servers': 'kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': 'xxxx',
        'sasl.password': 'xxxx',
        'group.id': 'ray-streaming-group',
        'auto.offset.reset': 'earliest',
    }
    input_topic = 'input_topic'
    output_topic = 'output_topic'

    # Create Ray queues
    reader_queue = Queue(maxsize=1000)
    processor_queue = Queue(maxsize=1000)

    # Discover the partition layout of the input topic
    from confluent_kafka.admin import AdminClient
    admin_client = AdminClient(kafka_config)
    metadata = admin_client.list_topics(topic=input_topic)
    total_partitions = len(metadata.topics[input_topic].partitions)
    logger.info(f"Discovered {total_partitions} partitions for topic {input_topic}")

    # Flow-control threshold
    flow_control_threshold = 800
    # Batch-read configuration
    reader_batch_size = 100
    reader_batch_timeout = 12

    # Create one initial ReaderActor per partition
    readers = []
    for partition in range(total_partitions):
        reader = KafkaReaderActor.remote(
            kafka_config, input_topic, partition, reader_queue, flow_control_threshold,
            batch_size=reader_batch_size, batch_timeout=reader_batch_timeout
        )
        readers.append(reader)
        logger.info(f"Created initial KafkaReaderActor for partition {partition}")

    # Create the other initial actors
    processor = DataProcessorActor.remote(reader_queue, processor_queue)
    writer = KafkaWriterActor.remote(kafka_config, output_topic, processor_queue)

    # Create the manager actor
    manager = ManagerActor.remote(
        readers, [processor], [writer],
        reader_queue, processor_queue,
        kafka_config, output_topic
    )

    # Start all actors. start() runs an infinite loop inside each actor, so the
    # calls are fired without ray.get(); blocking here would keep the shutdown
    # logic below from ever running.
    for actor in [*readers, processor, writer, manager]:
        actor.start.remote()

    # Keep the driver alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Received shutdown signal")

    # Make sure all actors are stopped
    ray.get(manager.stop.remote())
    logger.info("All actors stopped successfully")
    ray.shutdown()


if __name__ == "__main__":
    main()
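The process_function hook of DataProcessorActor is where business logic such as the recommendation or risk models from the scenarios above would plug in. A minimal sketch, reusing names from the script above, with a hypothetical score_event function standing in for real model inference:

def score_event(data: dict) -> dict:
    # Hypothetical stand-in for calling a recommendation or risk model.
    data["recommendations"] = ["item_1", "item_2", "item_3"]
    data["processed_at"] = time.time()
    data["status"] = "processed"
    return data

# In main(), pass the custom function instead of relying on default_process();
# the rest of the pipeline stays unchanged.
processor = DataProcessorActor.remote(reader_queue, processor_queue, process_function=score_event)

Note that processors created later by ManagerActor during scale-up would still use default_process unless the same function is also threaded through scale_actors.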
import ray

ray.init(logging_config=ray.LoggingConfig(log_level="INFO"))

consumer_config = {
    "bootstrap.servers": 'kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594',
    "security.protocol": "SASL_PLAINTEXT",  # 'SASL_SSL' or 'SASL_PLAINTEXT'
    "sasl.mechanism": "PLAIN",
    "sasl.username": "xxx",
    "sasl.password": "xxx",
    "auto.offset.reset": 'earliest',  # Start from the earliest messages (alternative: 'latest')
    "group.id": 'ray_consumer_group1',  # Consumer group ID (different groups consume independently)
}

ds = ray.data.read_kafka(
    bootstrap_servers="kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594",
    topic='dujl_test',
    consumer_config=consumer_config
)

producer_config = {
    "bootstrap.servers": 'kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594',
    "security.protocol": "SASL_PLAINTEXT",  # 'SASL_SSL' or 'SASL_PLAINTEXT'
    "sasl.mechanism": "PLAIN",
    "sasl.username": "xxxx",
    "sasl.password": "xxxx",
}

def map_batches(record):
    # Example: assume the original JSON contains "name" and "age" fields.
    # New fields can be added here, or existing fields modified.
    return record

ds.map_batches(map_batches, batch_size=200).write_kafka(
    bootstrap_servers="kafka-xxx.kafka.volces.com:9592,kafka-xxx.kafka.volces.com:9593,kafka-xxx.kafka.volces.com:9594",
    topic='dujl_test_ray_write',
    producer_config=producer_config
)
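The map_batches function above returns each batch unchanged. As a slightly more concrete illustration, the sketch below adds a derived column; it assumes, hypothetically, that each batch arrives as a dict of column arrays containing an "age" field parsed from the Kafka messages, and should be adapted to the actual schema produced by read_kafka. It continues from the dataset ds created above.

import numpy as np

def add_age_group(batch: dict) -> dict:
    # Assumes an "age" column exists in the batch; adjust to the real schema.
    batch["age_group"] = np.where(np.asarray(batch["age"]) >= 18, "adult", "minor")
    return batch

processed = ds.map_batches(add_age_group, batch_size=200)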
Jobs can currently be submitted in two ways, through the console UI or with Python; for detailed instructions, see the Serverless Ray User Guide.
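For the Python path, open-source Ray ships a Job Submission SDK; the sketch below assumes the Serverless Ray endpoint is compatible with it, and the endpoint address, script name, and dependency list are placeholders. The Serverless Ray User Guide remains the authoritative reference for submission parameters.

from ray.job_submission import JobSubmissionClient

# Placeholder address: use the job-server / dashboard endpoint shown in the
# Serverless Ray console.
client = JobSubmissionClient("http://<serverless-ray-endpoint>:8265")

job_id = client.submit_job(
    entrypoint="python ray_kafka_pipeline.py",  # the pipeline script above (placeholder file name)
    runtime_env={"pip": ["confluent-kafka"]},   # extra Python dependencies
)
print(client.get_job_status(job_id))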