如何用SimPy建模具备批量生产与库存检查的鞋厂仿真系统?
鞋厂SimPy建模与运营仿真问题
我正在用SimPy建模鞋厂运营仿真,目标是创建不同场景并确定新工厂最优布局。这是我第一次用SimPy,虽然看了官方文档,但还是没法实现核心模块。
需求说明
- 工厂按随机间隔生成批量订单
- 订单分为**Make To Order (MTO)和Make To Stock (MTS)**两类:
- MTO订单直接进入生产队列,完成后发运
- MTS订单先检查库存,有货直接发运;无货则生产后入库
- 每日初需补货前一日出库的库存
- 1条生产线包含4台机器,生产为连续式:产品1离开机器1后,产品2立即进入机器1,产品1进入机器2
- 不同批次产品切换时,每台机器需独立换型:机器完成当前批次后立即开始下一批次的换型
已完成工作
目前已实现:创建4台机器、1条生产线、2批产品,随机生成产品数量;批次进入生产线队列,完成后释放生产线。
现存问题
- 批量处理不连续:同批次产品需等前一产品走完所有机器才能进入首台机器
- 换型时间计算错误:换型时间叠加逻辑错误,导致启动时间偏差
- 无法实现动态换型:无法做到机器完成当前批次后立即启动换型
- 无库存管理系统:未实现MTS订单的库存检查与补货逻辑
- 无订单分类机制:无法区分MTO和MTS订单
以下是我写的代码:
# Simulation parameters and machine configurations RANDOM_SEED = 68 SIM_TIME = 100 NUM_PRODUCTS_MEAN = 5 NUM_PRODUCTS_SIGMA = 3 TIME_BETWEEN_BATCHES = 5 NUMBER_OF_MACHINES = 4 MACHINE_PARAMS = { 1: {'mean': 0.5, 'sigma': 1.0}, 2: {'mean': 3.0, 'sigma': 0.5}, 3: {'mean': 1.5, 'sigma': 0.3}, 4: {'mean': 0.5, 'sigma': 0.1}, } # Function to calculate time per part based on the machine type def time_per_part(machine): machine_id = machine.capacity if machine_id not in MACHINE_PARAMS: raise ValueError(f"Invalid machine {machine_id}. Must be one of {list(MACHINE_PARAMS.keys())}") PT_MEAN = MACHINE_PARAMS[machine_id]['mean'] PT_SIGMA = MACHINE_PARAMS[machine_id]['sigma'] t = random.normalvariate(PT_MEAN, PT_SIGMA) while t <= 0: t = random.normalvariate(PT_MEAN, PT_SIGMA) return t # Machine class definition class Machine: def __init__(self, env, name, capacity): self.env = env self.name = name self.capacity = capacity # Product class representing individual products in the batch class Product: def __init__(self, env, name, category, sku): self.env = env self.name = name self.category = category self.sku = sku def assemble(self, ms): for machine_id in range(1, NUMBER_OF_MACHINES + 1): machine = yield ms.get(lambda m: m.capacity == machine_id) print(f'{self.name} is being assembled by {machine.name} at {self.env.now}') yield self.env.timeout(time_per_part(machine)) yield ms.put(machine) print(f'{self.name} is done with {machine.name} at {self.env.now}') # Batch generator to create and process batches of products def batch_generator(env, pl, ms, skus, queue): for sku in skus: num_products = int(random.normalvariate(NUM_PRODUCTS_MEAN, NUM_PRODUCTS_SIGMA)) num_products = max(1, num_products) batch = Batch(env, f'Batch_{sku}', 'category', sku, num_products) env.process(batch.process_batch(pl, ms, queue)) queue.append(batch.name) print(f'Batch {batch.name} is ready at {env.now}') yield env.timeout(TIME_BETWEEN_BATCHES) # Batch class to handle the processing of products class Batch: def __init__(self, env, name, category, sku, num_products): self.env = env self.name = name self.category = category self.sku = sku self.num_products = num_products self.products = [Product(env, f'Product_{sku}_{i}', category, sku) for i in range(num_products)] def process_batch(self, pl, ms, queue): if len(queue) > 1 and queue[len(queue)-2] != self.name: setup_time = random.uniform(1, 3) print(f'Setup time of {setup_time} before processing next batch at {self.env.now}') yield env.timeout(setup_time) yield pl.get() print(f'Batch {self.name} is being processed by {pl.name} at {self.env.now}') for i, product in enumerate(self.products): print(f'Processing {product.name} from Batch {self.name} (Product {i+1}/{self.num_products})') yield self.env.process(product.assemble(ms)) print(f'{product.name} from Batch {self.name} is done processing at {self.env.now}') print(f'Batch {self.name} is done processing at {self.env.now}') yield pl.put() # ProductionLine class for managing the production line class ProductionLine: def __init__(self, env, name, capacity): self.env = env self.name = name self.capacity = capacity self.lines = simpy.FilterStore(env, capacity=capacity) for i in range(capacity): self.lines.put(self) def get(self): return self.lines.get() def put(self): return self.lines.put() # Setting up the simulation environment and running it print('Machine shop') random.seed(RANDOM_SEED) env = simpy.Environment() machine_1 = Machine(env, "Machine_1", 1) machine_2 = Machine(env, "Machine_2", 2) machine_3 = Machine(env, "Machine_3", 3) machine_4 = Machine(env, "Machine_4", 4) machine_shop = simpy.FilterStore(env, capacity=NUMBER_OF_MACHINES) machine_shop.items = [machine_1, machine_2, machine_3, machine_4] pl = ProductionLine(env, 'Main Line', capacity=1) skus = ['SKU1', 'SKU2'] batch_queue = [] env.process(batch_generator(env, pl, machine_shop, skus, batch_queue)) env.run(until=SIM_TIME)
重构后的解决方案代码
import simpy import random # Simulation parameters and machine configurations RANDOM_SEED = 68 SIM_TIME = 100 DAILY_TIME = 24 # 定义一天的时长,用于每日补货 NUM_PRODUCTS_MEAN = 5 NUM_PRODUCTS_SIGMA = 3 TIME_BETWEEN_BATCHES = 5 ORDER_TYPES = ['MTO', 'MTS'] # 订单类型 ORDER_TYPE_PROB = [0.4, 0.6] # MTO/MTS生成概率 NUMBER_OF_MACHINES = 4 MACHINE_PARAMS = { 1: {'mean': 0.5, 'sigma': 1.0, 'setup_range': (1, 3)}, 2: {'mean': 3.0, 'sigma': 0.5, 'setup_range': (2, 4)}, 3: {'mean': 1.5, 'sigma': 0.3, 'setup_range': (1.5, 3.5)}, 4: {'mean': 0.5, 'sigma': 0.1, 'setup_range': (0.5, 1.5)}, } # 库存管理类 class Inventory: def __init__(self, env): self.env = env self.stock = {} # 库存结构:{sku: quantity} self.daily_outbound = {} # 每日出库记录,用于次日补货 # 启动每日补货进程 self.env.process(self.daily_restock()) def check_stock(self, sku, quantity): """检查库存是否满足需求""" if sku not in self.stock or self.stock[sku] < quantity: return False return True def deduct_stock(self, sku, quantity): """扣除库存,记录出库""" if sku in self.stock: self.stock[sku] -= quantity self.daily_outbound[sku] = self.daily_outbound.get(sku, 0) + quantity print(f"[库存扣减] SKU:{sku}, 剩余库存:{self.stock[sku]} 时间:{self.env.now:.2f}") def add_stock(self, sku, quantity): """增加库存""" self.stock[sku] = self.stock.get(sku, 0) + quantity print(f"[库存增加] SKU:{sku}, 当前库存:{self.stock[sku]} 时间:{self.env.now:.2f}") def daily_restock(self): """每日初补货前一日出库量""" while True: # 等待到次日0点(按DAILY_TIME间隔) yield self.env.timeout(DAILY_TIME - (self.env.now % DAILY_TIME)) if self.daily_outbound: print(f"\n[每日补货] 开始补货,时间:{self.env.now:.2f}") for sku, qty in self.daily_outbound.items(): # 触发补货生产 self.env.process(generate_production_batch(self.env, sku, qty, 'MTS', production_line, machines, inventory)) print(f"[补货任务] SKU:{sku}, 补货量:{qty}") self.daily_outbound = {} # 清空当日出库记录 # 机器类:包含状态管理和换型逻辑 class Machine: def __init__(self, env, machine_id): self.env = env self.machine_id = machine_id self.name = f"Machine_{machine_id}" self.current_sku = None # 当前加工的SKU self.resource = simpy.Resource(env, capacity=1) # 用Resource管理机器占用 def process_part(self, sku): """处理单个产品,包含换型逻辑""" with self.resource.request() as req: yield req # 换型检查:如果当前SKU和新SKU不同,执行换型 if self.current_sku != sku: setup_time = random.uniform(*MACHINE_PARAMS[self.machine_id]['setup_range']) print(f"[{self.name}] 开始换型,切换至SKU:{sku} 换型时间:{setup_time:.2f} 时间:{self.env.now:.2f}") yield self.env.timeout(setup_time) self.current_sku = sku print(f"[{self.name}] 换型完成,开始加工SKU:{sku} 时间:{self.env.now:.2f}") # 加工时间 process_time = self._get_process_time() yield self.env.timeout(process_time) print(f"[{self.name}] 完成SKU:{sku}加工 耗时:{process_time:.2f} 时间:{self.env.now:.2f}") def _get_process_time(self): """获取当前机器的加工时间(确保为正)""" params = MACHINE_PARAMS[self.machine_id] while True: t = random.normalvariate(params['mean'], params['sigma']) if t > 0: return t # 产品加工流程:连续式生产 def process_product(env, sku, machines): """单个产品依次经过4台机器加工""" print(f"[产品启动] SKU:{sku} 进入生产线 时间:{env.now:.2f}") for machine in machines: yield env.process(machine.process_part(sku)) print(f"[产品完成] SKU:{sku} 加工完成 时间:{env.now:.2f}") # 批次处理:启动批量产品的连续加工 def process_batch(env, sku, quantity, order_type, production_line, machines, inventory): """处理批次订单,区分MTO/MTS""" if order_type == 'MTS': # MTS订单先检查库存 if inventory.check_stock(sku, quantity): inventory.deduct_stock(sku, quantity) print(f"[MTS订单发运] SKU:{sku}, 数量:{quantity} 直接发运 时间:{env.now:.2f}") return else: print(f"[MTS订单缺货] SKU:{sku}, 需生产数量:{quantity} 时间:{env.now:.2f}") # MTO或缺货的MTS订单,进入生产队列 with production_line.request() as req: yield req print(f"[批次启动] {order_type} SKU:{sku}, 数量:{quantity} 开始生产 时间:{env.now:.2f}") # 连续式生产:前一个产品启动后,间隔短时间启动下一个产品 product_processes = [] for _ in range(quantity): proc = env.process(process_product(env, sku, machines)) product_processes.append(proc) # 模拟首台机器加工后的间隔,让下一个产品进入 yield env.timeout(MACHINE_PARAMS[1]['mean'] * 0.1) # 等待所有产品加工完成 yield env.all_of(product_processes) if order_type == 'MTS': # MTS生产完成后入库并发运 inventory.add_stock(sku, quantity) inventory.deduct_stock(sku, quantity) print(f"[MTS订单完成] SKU:{sku}, 数量:{quantity} 生产入库后发运 时间:{env.now:.2f}") else: print(f"[MTO订单完成] SKU:{sku}, 数量:{quantity} 生产完成并发运 时间:{env.now:.2f}") print(f"[批次结束] {order_type} SKU:{sku} 完成所有生产 时间:{env.now:.2f}") # 生成生产批次(包括订单和补货) def generate_production_batch(env, sku, quantity, order_type, production_line, machines, inventory): yield env.process(process_batch(env, sku, quantity, order_type, production_line, machines, inventory)) # 订单生成器:随机生成MTO/MTS订单 def order_generator(env, production_line, machines, inventory): skus = ['SKU1', 'SKU2'] while env.now < SIM_TIME: # 随机生成订单间隔 inter_arrival = random.expovariate(1/TIME_BETWEEN_BATCHES) yield env.timeout(inter_arrival) # 随机选择SKU和订单类型 sku = random.choice(skus) order_type = random.choices(ORDER_TYPES, weights=ORDER_TYPE_PROB)[0] num_products = max(1, int(random.normalvariate(NUM_PRODUCTS_MEAN, NUM_PRODUCTS_SIGMA))) print(f"\n[新订单生成] {order_type} SKU:{sku}, 数量:{num_products} 时间:{env.now:.2f}") env.process(generate_production_batch(env, sku, num_products, order_type, production_line, machines, inventory)) # 主仿真流程 if __name__ == '__main__': print('=== 鞋厂运营仿真 ===') random.seed(RANDOM_SEED) env = simpy.Environment() inventory = Inventory(env) # 创建机器 machines = [Machine(env, i+1) for i in range(NUMBER_OF_MACHINES)] # 创建生产线资源(控制批次并发) production_line = simpy.Resource(env, capacity=1) # 启动订单生成器 env.process(order_generator(env, production_line, machines, inventory)) # 运行仿真 env.run(until=SIM_TIME)
关键问题解决说明
连续式生产实现:
- 单个产品加工流程独立启动,前一个产品启动后间隔短时间就启动下一个产品,无需等待上一个产品走完所有机器
- 使用
env.all_of()等待批次内所有产品加工完成
动态换型与正确换型时间:
- 机器类新增
current_sku状态,每次加工前检查是否需要换型 - 换型在机器空闲时触发,每台机器独立处理换型,避免时间叠加
- 机器类新增
订单分类与库存管理:
- 新增
Inventory类,实现库存检查、扣减、入库和每日补货逻辑 - 订单生成时随机分配MTO/MTS类型
- 新增




