You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

迭代器模式应用场景咨询:新手开发者求实用案例

嘿,作为刚接触迭代器模式的新手,能啃完维基百科的概念已经很棒了!我来给你分享几个实际开发中高频用到的迭代器场景,都是我自己或者同事日常踩过坑、靠迭代器解决的真实案例,帮你把理论落地~

1. 遍历复杂自定义集合(比如树形/图结构)

很多时候我们会自己实现非标准集合,比如公司部门的树形结构(每个部门下有子部门+员工)、知识图谱的节点关联结构。如果让调用方自己写递归或循环去遍历,不仅容易写错,代码也会乱糟糟。迭代器能把内部复杂的遍历逻辑完全封装起来,给外部提供统一的next()/hasNext()接口,调用方根本不用关心集合内部是树还是图。

举个Python的简单例子,实现树形部门的员工遍历迭代器:

class Department:
    def __init__(self, name):
        self.name = name
        self.sub_departments = []
        self.employees = []

class DepartmentEmployeeIterator:
    def __init__(self, root_dept):
        self.stack = [root_dept]  # 用栈处理树形遍历
        self.current_emp_index = 0

    def has_next(self):
        while self.stack:
            current_dept = self.stack[-1]
            # 如果当前部门还有未遍历的员工,返回True
            if self.current_emp_index < len(current_dept.employees):
                return True
            # 处理完当前部门员工,弹出栈,压入子部门(倒序保证遍历顺序正确)
            self.stack.pop()
            self.current_emp_index = 0
            self.stack.extend(reversed(current_dept.sub_departments))
        return False

    def next(self):
        if not self.has_next():
            raise StopIteration
        current_dept = self.stack[-1]
        emp = current_dept.employees[self.current_emp_index]
        self.current_emp_index += 1
        return emp

外部调用的时候,完全不用管递归逻辑,直接用迭代器遍历所有员工:

# 构建测试部门结构
root = Department("总部")
hr = Department("HR部")
hr.employees = ["Alice", "Bob"]
root.sub_departments.append(hr)
dev = Department("研发部")
dev.employees = ["Charlie", "David"]
frontend = Department("前端组")
frontend.employees = ["Eve"]
dev.sub_departments.append(frontend)
root.sub_departments.append(dev)

# 遍历所有员工
iterator = DepartmentEmployeeIterator(root)
while iterator.has_next():
    print(f"员工: {iterator.next()}")
2. 统一多数据源的遍历方式

假设你需要同时处理三个数据源:数据库查询结果、Redis缓存列表、第三方API的分页数据。每个数据源的读取方式都不一样——数据库要逐行游标取,API要分页请求,缓存是直接读列表。这时候迭代器能把这些差异完全封装,让外部用一模一样的代码遍历所有数据。

比如我们给每个数据源写一个迭代器,都实现Python的__iter____next__接口:

class DBIterator:
    def __init__(self, db_cursor):
        self.cursor = db_cursor
    def __iter__(self):
        return self
    def __next__(self):
        row = self.cursor.fetchone()
        if not row:
            raise StopIteration
        return row

class APIPaginatedIterator:
    def __init__(self, api_base_url):
        self.url = api_base_url
        self.current_page = 1
        self.current_data = []
        self.index = 0
    def __iter__(self):
        return self
    def __next__(self):
        if self.index >= len(self.current_data):
            # 请求下一页数据
            resp = requests.get(self.url, params={"page": self.current_page})
            self.current_data = resp.json()["data"]
            if not self.current_data:
                raise StopIteration
            self.current_page += 1
            self.index = 0
        item = self.current_data[self.index]
        self.index += 1
        return item

# 缓存迭代器就更简单了,直接基于列表封装
class CacheIterator:
    def __init__(self, cache_list):
        self.data = cache_list
        self.index = 0
    def __iter__(self):
        return self
    def __next__(self):
        if self.index >= len(self.data):
            raise StopIteration
        item = self.data[self.index]
        self.index += 1
        return item

然后外部处理数据的代码完全不用改,不管加多少新数据源,只要加对应的迭代器就行:

def process_all_data(iterators):
    for iterator in iterators:
        for item in iterator:
            print(f"处理数据: {item}")

# 传入三个迭代器,统一处理
process_all_data([DBIterator(db_cursor), APIPaginatedIterator("https://api.example.com/data"), CacheIterator(cache_list)])

这完全符合开闭原则——对扩展开放(加新迭代器),对修改关闭(不用改process_all_data)。

3. 惰性加载处理超大数据集

当你要处理10GB的日志文件、数据库里的100万条记录时,如果一次性把所有数据加载到内存,直接就会爆内存。迭代器的核心优势之一就是惰性加载——每次只加载一条(或一小批)数据,用到的时候才去取,内存占用始终很低。

比如Python读取大文件的内置迭代器,其实就是这个原理:

with open("10gb_log.log", "r") as f:
    for line in f:  # f本身就是迭代器,每次只读取一行到内存
        parse_log_line(line)

自己实现的话,比如处理分页导出的大数据:

class LargeExportIterator:
    def __init__(self, export_func, page_size=1000):
        self.export_func = export_func  # 分页导出的函数
        self.page_size = page_size
        self.current_page = 1
        self.current_data = []
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= len(self.current_data):
            # 加载下一页数据
            self.current_data = self.export_func(page=self.current_page, size=self.page_size)
            if not self.current_data:
                raise StopIteration
            self.current_page += 1
            self.index = 0
        item = self.current_data[self.index]
        self.index += 1
        return item

用这个迭代器导出数据,内存里永远只存1000条数据,不会出现内存溢出的问题。

4. 自定义遍历顺序(不用修改原集合)

比如你有一个商品列表,默认按价格排序,但业务需要按销量、上架时间、好评率等不同维度遍历。不用每次都重新排序生成新列表,用迭代器封装不同的排序逻辑就行,原集合完全不用动。

例子:

class Product:
    def __init__(self, name, price, sales, create_time):
        self.name = name
        self.price = price
        self.sales = sales
        self.create_time = create_time

class ProductSortIterator:
    def __init__(self, products, sort_key="price"):
        # 按指定字段排序,生成临时列表(不修改原集合)
        self.sorted_products = sorted(products, key=lambda x: getattr(x, sort_key))
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= len(self.sorted_products):
            raise StopIteration
        item = self.sorted_products[self.index]
        self.index += 1
        return item

调用的时候,想按什么顺序遍历就传什么参数:

products = [
    Product("笔记本", 5999, 1200, "2024-01-15"),
    Product("鼠标", 99, 5000, "2024-02-20"),
    Product("键盘", 199, 3000, "2024-03-10")
]

# 按销量遍历
for p in ProductSortIterator(products, sort_key="sales"):
    print(f"{p.name} 销量: {p.sales}")

# 按上架时间遍历
for p in ProductSortIterator(products, sort_key="create_time"):
    print(f"{p.name} 上架时间: {p.create_time}")

内容的提问来源于stack exchange,提问作者Saddam

火山引擎 最新活动