迭代器模式应用场景咨询:新手开发者求实用案例
嘿,作为刚接触迭代器模式的新手,能啃完维基百科的概念已经很棒了!我来给你分享几个实际开发中高频用到的迭代器场景,都是我自己或者同事日常踩过坑、靠迭代器解决的真实案例,帮你把理论落地~
很多时候我们会自己实现非标准集合,比如公司部门的树形结构(每个部门下有子部门+员工)、知识图谱的节点关联结构。如果让调用方自己写递归或循环去遍历,不仅容易写错,代码也会乱糟糟。迭代器能把内部复杂的遍历逻辑完全封装起来,给外部提供统一的next()/hasNext()接口,调用方根本不用关心集合内部是树还是图。
举个Python的简单例子,实现树形部门的员工遍历迭代器:
class Department: def __init__(self, name): self.name = name self.sub_departments = [] self.employees = [] class DepartmentEmployeeIterator: def __init__(self, root_dept): self.stack = [root_dept] # 用栈处理树形遍历 self.current_emp_index = 0 def has_next(self): while self.stack: current_dept = self.stack[-1] # 如果当前部门还有未遍历的员工,返回True if self.current_emp_index < len(current_dept.employees): return True # 处理完当前部门员工,弹出栈,压入子部门(倒序保证遍历顺序正确) self.stack.pop() self.current_emp_index = 0 self.stack.extend(reversed(current_dept.sub_departments)) return False def next(self): if not self.has_next(): raise StopIteration current_dept = self.stack[-1] emp = current_dept.employees[self.current_emp_index] self.current_emp_index += 1 return emp
外部调用的时候,完全不用管递归逻辑,直接用迭代器遍历所有员工:
# 构建测试部门结构 root = Department("总部") hr = Department("HR部") hr.employees = ["Alice", "Bob"] root.sub_departments.append(hr) dev = Department("研发部") dev.employees = ["Charlie", "David"] frontend = Department("前端组") frontend.employees = ["Eve"] dev.sub_departments.append(frontend) root.sub_departments.append(dev) # 遍历所有员工 iterator = DepartmentEmployeeIterator(root) while iterator.has_next(): print(f"员工: {iterator.next()}")
假设你需要同时处理三个数据源:数据库查询结果、Redis缓存列表、第三方API的分页数据。每个数据源的读取方式都不一样——数据库要逐行游标取,API要分页请求,缓存是直接读列表。这时候迭代器能把这些差异完全封装,让外部用一模一样的代码遍历所有数据。
比如我们给每个数据源写一个迭代器,都实现Python的__iter__和__next__接口:
class DBIterator: def __init__(self, db_cursor): self.cursor = db_cursor def __iter__(self): return self def __next__(self): row = self.cursor.fetchone() if not row: raise StopIteration return row class APIPaginatedIterator: def __init__(self, api_base_url): self.url = api_base_url self.current_page = 1 self.current_data = [] self.index = 0 def __iter__(self): return self def __next__(self): if self.index >= len(self.current_data): # 请求下一页数据 resp = requests.get(self.url, params={"page": self.current_page}) self.current_data = resp.json()["data"] if not self.current_data: raise StopIteration self.current_page += 1 self.index = 0 item = self.current_data[self.index] self.index += 1 return item # 缓存迭代器就更简单了,直接基于列表封装 class CacheIterator: def __init__(self, cache_list): self.data = cache_list self.index = 0 def __iter__(self): return self def __next__(self): if self.index >= len(self.data): raise StopIteration item = self.data[self.index] self.index += 1 return item
然后外部处理数据的代码完全不用改,不管加多少新数据源,只要加对应的迭代器就行:
def process_all_data(iterators): for iterator in iterators: for item in iterator: print(f"处理数据: {item}") # 传入三个迭代器,统一处理 process_all_data([DBIterator(db_cursor), APIPaginatedIterator("https://api.example.com/data"), CacheIterator(cache_list)])
这完全符合开闭原则——对扩展开放(加新迭代器),对修改关闭(不用改process_all_data)。
当你要处理10GB的日志文件、数据库里的100万条记录时,如果一次性把所有数据加载到内存,直接就会爆内存。迭代器的核心优势之一就是惰性加载——每次只加载一条(或一小批)数据,用到的时候才去取,内存占用始终很低。
比如Python读取大文件的内置迭代器,其实就是这个原理:
with open("10gb_log.log", "r") as f: for line in f: # f本身就是迭代器,每次只读取一行到内存 parse_log_line(line)
自己实现的话,比如处理分页导出的大数据:
class LargeExportIterator: def __init__(self, export_func, page_size=1000): self.export_func = export_func # 分页导出的函数 self.page_size = page_size self.current_page = 1 self.current_data = [] self.index = 0 def __iter__(self): return self def __next__(self): if self.index >= len(self.current_data): # 加载下一页数据 self.current_data = self.export_func(page=self.current_page, size=self.page_size) if not self.current_data: raise StopIteration self.current_page += 1 self.index = 0 item = self.current_data[self.index] self.index += 1 return item
用这个迭代器导出数据,内存里永远只存1000条数据,不会出现内存溢出的问题。
比如你有一个商品列表,默认按价格排序,但业务需要按销量、上架时间、好评率等不同维度遍历。不用每次都重新排序生成新列表,用迭代器封装不同的排序逻辑就行,原集合完全不用动。
例子:
class Product: def __init__(self, name, price, sales, create_time): self.name = name self.price = price self.sales = sales self.create_time = create_time class ProductSortIterator: def __init__(self, products, sort_key="price"): # 按指定字段排序,生成临时列表(不修改原集合) self.sorted_products = sorted(products, key=lambda x: getattr(x, sort_key)) self.index = 0 def __iter__(self): return self def __next__(self): if self.index >= len(self.sorted_products): raise StopIteration item = self.sorted_products[self.index] self.index += 1 return item
调用的时候,想按什么顺序遍历就传什么参数:
products = [ Product("笔记本", 5999, 1200, "2024-01-15"), Product("鼠标", 99, 5000, "2024-02-20"), Product("键盘", 199, 3000, "2024-03-10") ] # 按销量遍历 for p in ProductSortIterator(products, sort_key="sales"): print(f"{p.name} 销量: {p.sales}") # 按上架时间遍历 for p in ProductSortIterator(products, sort_key="create_time"): print(f"{p.name} 上架时间: {p.create_time}")
内容的提问来源于stack exchange,提问作者Saddam




