基于PyMongo对比MongoDB两集合生成第三集合的技术需求
我来帮你搞定这个用PyMongo对比两个MongoDB集合并生成第三个集合的需求,步骤清晰,代码直接能用:
解决方案:用PyMongo实现集合对比与统计
1. 先建立MongoDB连接
首先确保你已经安装了PyMongo(执行pip install pymongo即可),然后连接到你的数据库,获取三个集合的引用:
from pymongo import MongoClient from datetime import datetime # 根据你的实际MongoDB配置调整连接字符串 client = MongoClient('mongodb://localhost:27017/') db = client['your_database_name'] # 替换成你的数据库名称 # 获取三个目标集合 coll1 = db['collection1'] coll2 = db['collection2'] coll3 = db['collection3']
2. 定义时间戳范围
把你需要筛选的时间范围转换成MongoDB能识别的ISODate格式:
# 示例:筛选2018年1月3日全天的数据 start_time = datetime(2018, 1, 3, 0, 0, 0) end_time = datetime(2018, 1, 3, 23, 59, 59)
3. 预处理集合1的教授数据
先把集合1中时间范围内的教授数据按Prof_Name分组,整理每个教授的科目列表(把字符串转成集合,方便后续快速匹配):
prof_subjects_map = {} # 查询集合1中符合时间范围的所有文档 for doc in coll1.find({"timestamp": {"$gte": start_time, "$lte": end_time}}): prof_name = doc['Prof_Name'] # 清理科目字符串的空格并拆分成集合(自动去重) subjects = set(doc['subjects1'].replace(' ', '').split(',')) prof_subjects_map[prof_name] = subjects
4. 查询并匹配集合2的数据
查询集合2中时间范围内的所有记录,然后遍历每个教授,收集有科目交集的UUID:
# 查询集合2中符合时间范围的所有文档 coll2_records = coll2.find({"timestamp": {"$gte": start_time, "$lte": end_time}}) # 初始化每个教授的统计结果容器 prof_stats = {} for record in coll2_records: uuid = record['UUID'] # 处理集合2的科目字符串 sub2_set = set(record['subjects2'].replace(' ', '').split(',')) # 遍历每个教授,检查科目是否有交集 for prof_name, sub1_set in prof_subjects_map.items(): if sub1_set & sub2_set: # 存在交集则匹配成功 if prof_name not in prof_stats: # 初始化教授的统计字段 prof_stats[prof_name] = { "subjects_list": ', '.join(sub1_set), "UUID_list": [], "UUID-count": 0, "subject_count": len(sub1_set) } # 避免重复添加同一个UUID if uuid not in prof_stats[prof_name]['UUID_list']: prof_stats[prof_name]['UUID_list'].append(str(uuid)) prof_stats[prof_name]['UUID-count'] += 1
5. 将结果插入集合3
最后把整理好的统计数据插入到集合3中:
# 遍历统计结果,逐个插入集合3 for prof_name, stats_data in prof_stats.items(): # 把UUID列表转换成字符串格式 stats_data['Prof_name'] = prof_name stats_data['UUID_list'] = ', '.join(stats_data['UUID_list']) # 插入文档 coll3.insert_one(stats_data)
大数据量优化建议
如果你的集合数据量很大,上面的Python内存遍历可能效率不高,推荐用MongoDB的聚合管道把计算逻辑放在服务器端执行,减少客户端内存占用和数据传输:
pipeline = [ # 1. 过滤集合2的时间范围 {"$match": {"timestamp": {"$gte": start_time, "$lte": end_time}}}, # 2. 清理并拆分subjects2为数组 {"$addFields": { "subjects2_arr": {"$split": [{"$replaceAll": {"input": "$subjects2", "find": " ", "replacement": ""}}, ","]} }}, # 3. 关联集合1,匹配时间范围和科目交集的教授 {"$lookup": { "from": "collection1", "let": {"subs": "$subjects2_arr"}, "pipeline": [ {"$match": { "timestamp": {"$gte": start_time, "$lte": end_time}, "$expr": { "$gt": [ {"$size": {"$setIntersection": [ {"$split": [{"$replaceAll": {"input": "$subjects1", "find": " ", "replacement": ""}}, ","]}, "$$subs" ]}}, 0 ] } }}, {"$project": {"Prof_Name": 1, "_id": 0}} ], "as": "matched_profs" }}, # 4. 展开匹配到的教授列表 {"$unwind": "$matched_profs"}, # 5. 按教授分组,收集UUID并关联教授的科目信息 {"$group": { "_id": "$matched_profs.Prof_Name", "UUID_list": {"$addToSet": "$UUID"}, # 从集合1中获取教授的科目列表(这里需要确保集合1中每个教授只有一条记录,或调整逻辑取第一条) "subjects_list": {"$first": {"$arrayElemAt": [ {"$split": [{"$replaceAll": {"input": "$subjects1", "find": " ", "replacement": ""}}, ","]}, 0 ]}} }}, # 6. 计算统计字段并格式化UUID列表 {"$addFields": { "UUID-count": {"$size": "$UUID_list"}, "subject_count": {"$size": {"$split": [{"$replaceAll": {"input": "$subjects_list", "find": " ", "replacement": ""}}, ","]}}, "UUID_list": {"$reduce": { "input": "$UUID_list", "initialValue": "", "in": {"$concat": ["$$value", {"$cond": [{"$eq": ["$$value", ""]}, "", ", "]}, {"$toString": "$$this"}]} }} }}, # 7. 重命名字段并输出 {"$project": { "_id": 0, "Prof_name": "$_id", "subjects_list": 1, "UUID_list": 1, "UUID-count": 1, "subject_count": 1 }} ] # 执行聚合并批量插入集合3 agg_result = coll2.aggregate(pipeline) coll3.insert_many(agg_result)
内容的提问来源于stack exchange,提问作者Rachel




