MongoDB聚合获取各股票代码最新报价问题(PyMongo)及层级实现
解决PyMongo聚合获取每个股票最新报价的问题
我来帮你排查当前的问题,同时也会给你演示单层级和两层级分组获取最后一条数据的实现方法。
一、当前查询只返回一行的原因
你遇到的问题大概率是部分目标股票代码(ticker)没有匹配到$match条件中的数据(比如2020年10月6日15点的报价记录不存在)。MongoDB的$group只会对有匹配数据的分组返回结果,如果要确保即使没有数据也返回对应ticker的条目,我会在后面给出解决方案。
二、单层级分组的正确实现(获取有数据ticker的最新报价)
首先优化你的聚合逻辑,简化匹配条件并调整排序方式,确保每个ticker的最新报价被正确获取:
tickerList = ['AAPL', 'MSFT', 'AMZN', 'NFLX'] docs = dbCollectionQuotes.aggregate([ # 简化匹配条件:MongoDB多条件默认是逻辑与,无需显式写$and {'$match': { 'timestampYear': 2020, 'timestampMonth': 10, 'timestampDay': 6, 'timestampHour': 15, 'ticker': {'$in': tickerList } }}, # 先按ticker分组,再按时间降序排列,这样每个组的第一条就是最新数据 {'$sort': { 'ticker': 1, 'timestampIsoDateTime': -1 }}, {'$group': { '_id': '$ticker', 'lastId': {'$first': '$_id'}, # 降序下$first等价于升序下的$last,可读性更好 'minuteClose': {'$first': '$minuteClose'}, 'todaysChangePerc': {'$first': '$todaysChangePerc'}, 'timestampIsoDateTime': {'$first': '$timestampIsoDateTime'} }}, # 可选:重新整理字段,把_id转为ticker,让结果更直观 {'$project': { '_id': 0, 'ticker': '$_id', 'lastId': 1, 'minuteClose': 1, 'todaysChangePerc': 1, 'timestampIsoDateTime': 1 }} ])
强制返回所有目标ticker(含无数据的情况)
如果希望不管ticker有没有匹配数据,都返回4行(无数据的字段设为null),可以用$documents生成临时集合,结合$lookup实现:
tickerList = ['AAPL', 'MSFT', 'AMZN', 'NFLX'] docs = dbCollectionQuotes.aggregate([ # 生成包含所有目标ticker的临时集合 {'$documents': [{'ticker': t} for t in tickerList]}, # 关联报价集合,子查询获取每个ticker的最新数据 {'$lookup': { 'from': 'collectionQuotes', # 替换为你的实际集合名 'let': {'targetTicker': '$ticker'}, 'pipeline': [ {'$match': { '$expr': {'$eq': ['$ticker', '$$targetTicker']}, 'timestampYear': 2020, 'timestampMonth': 10, 'timestampDay': 6, 'timestampHour': 15 }}, {'$sort': {'timestampIsoDateTime': -1}}, {'$limit': 1} # 只取最新的一条 ], 'as': 'latestQuote' }}, # 展开报价数据,保留无数据的ticker {'$unwind': { 'path': '$latestQuote', 'preserveNullAndEmptyArrays': True }}, # 整理输出字段 {'$project': { 'ticker': 1, 'lastId': '$latestQuote._id', 'minuteClose': '$latestQuote.minuteClose', 'todaysChangePerc': '$latestQuote.todaysChangePerc', 'timestampIsoDateTime': '$latestQuote.timestampIsoDateTime' }} ])
三、两层级分组的实现(比如按「年份+股票代码」分组)
如果需要按多个维度分组(比如年份+股票代码),只需要将$group的_id设为包含多个字段的对象即可:
tickerList = ['AAPL', 'MSFT', 'AMZN', 'NFLX'] docs = dbCollectionQuotes.aggregate([ {'$match': { 'ticker': {'$in': tickerList } }}, # 按年份、ticker分组,再按时间降序 {'$sort': { 'timestampYear': 1, 'ticker': 1, 'timestampIsoDateTime': -1 }}, {'$group': { '_id': { 'year': '$timestampYear', 'ticker': '$ticker' }, 'lastId': {'$first': '$_id'}, 'minuteClose': {'$first': '$minuteClose'}, 'todaysChangePerc': {'$first': '$todaysChangePerc'}, 'timestampIsoDateTime': {'$first': '$timestampIsoDateTime'} }}, # 整理输出字段,拆分两层级分组的_id {'$project': { '_id': 0, 'year': '$_id.year', 'ticker': '$_id.ticker', 'lastId': 1, 'minuteClose': 1, 'todaysChangePerc': 1, 'timestampIsoDateTime': 1 }} ])
这个查询会返回每个(年份+股票代码)组合的最新报价,实现了两层级的分组需求。
内容的提问来源于stack exchange,提问作者NealWalters




