You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

如何用SimPy建模具备批量生产与库存检查的鞋厂仿真系统?

鞋厂SimPy建模与运营仿真问题

我正在用SimPy建模鞋厂运营仿真,目标是创建不同场景并确定新工厂最优布局。这是我第一次用SimPy,虽然看了官方文档,但还是没法实现核心模块。

需求说明

  • 工厂按随机间隔生成批量订单
  • 订单分为**Make To Order (MTO)Make To Stock (MTS)**两类:
    • MTO订单直接进入生产队列,完成后发运
    • MTS订单先检查库存,有货直接发运;无货则生产后入库
    • 每日初需补货前一日出库的库存
  • 1条生产线包含4台机器,生产为连续式:产品1离开机器1后,产品2立即进入机器1,产品1进入机器2
  • 不同批次产品切换时,每台机器需独立换型:机器完成当前批次后立即开始下一批次的换型

已完成工作

目前已实现:创建4台机器、1条生产线、2批产品,随机生成产品数量;批次进入生产线队列,完成后释放生产线。

现存问题

  • 批量处理不连续:同批次产品需等前一产品走完所有机器才能进入首台机器
  • 换型时间计算错误:换型时间叠加逻辑错误,导致启动时间偏差
  • 无法实现动态换型:无法做到机器完成当前批次后立即启动换型
  • 无库存管理系统:未实现MTS订单的库存检查与补货逻辑
  • 无订单分类机制:无法区分MTO和MTS订单

以下是我写的代码:

# Simulation parameters and machine configurations
RANDOM_SEED = 68
SIM_TIME = 100

NUM_PRODUCTS_MEAN = 5
NUM_PRODUCTS_SIGMA = 3

TIME_BETWEEN_BATCHES = 5

NUMBER_OF_MACHINES = 4

MACHINE_PARAMS = {
    1: {'mean': 0.5, 'sigma': 1.0},
    2: {'mean': 3.0, 'sigma': 0.5},
    3: {'mean': 1.5, 'sigma': 0.3},
    4: {'mean': 0.5, 'sigma': 0.1},
}

# Function to calculate time per part based on the machine type
def time_per_part(machine):
    machine_id = machine.capacity

    if machine_id not in MACHINE_PARAMS:
        raise ValueError(f"Invalid machine {machine_id}. Must be one of {list(MACHINE_PARAMS.keys())}")

    PT_MEAN = MACHINE_PARAMS[machine_id]['mean']
    PT_SIGMA = MACHINE_PARAMS[machine_id]['sigma']

    t = random.normalvariate(PT_MEAN, PT_SIGMA)
    while t <= 0:
        t = random.normalvariate(PT_MEAN, PT_SIGMA)

    return t

# Machine class definition
class Machine:
    def __init__(self, env, name, capacity):
        self.env = env
        self.name = name
        self.capacity = capacity

# Product class representing individual products in the batch
class Product:
    def __init__(self, env, name, category, sku):
        self.env = env
        self.name = name
        self.category = category
        self.sku = sku

    def assemble(self, ms):
        for machine_id in range(1, NUMBER_OF_MACHINES + 1):
            machine = yield ms.get(lambda m: m.capacity == machine_id)
            print(f'{self.name} is being assembled by {machine.name} at {self.env.now}')
            yield self.env.timeout(time_per_part(machine))
            yield ms.put(machine)
            print(f'{self.name} is done with {machine.name} at {self.env.now}')

# Batch generator to create and process batches of products
def batch_generator(env, pl, ms, skus, queue):
    for sku in skus:
        num_products = int(random.normalvariate(NUM_PRODUCTS_MEAN, NUM_PRODUCTS_SIGMA))
        num_products = max(1, num_products)
        batch = Batch(env, f'Batch_{sku}', 'category', sku, num_products)
        env.process(batch.process_batch(pl, ms, queue))
        queue.append(batch.name)
        print(f'Batch {batch.name} is ready at {env.now}')
        yield env.timeout(TIME_BETWEEN_BATCHES)

# Batch class to handle the processing of products
class Batch:
    def __init__(self, env, name, category, sku, num_products):
        self.env = env
        self.name = name
        self.category = category
        self.sku = sku
        self.num_products = num_products
        self.products = [Product(env, f'Product_{sku}_{i}', category, sku) for i in range(num_products)]

    def process_batch(self, pl, ms, queue):
        if len(queue) > 1 and queue[len(queue)-2] != self.name:
            setup_time = random.uniform(1, 3)
            print(f'Setup time of {setup_time} before processing next batch at {self.env.now}')
            yield env.timeout(setup_time)
        
        yield pl.get()
        print(f'Batch {self.name} is being processed by {pl.name} at {self.env.now}')

        for i, product in enumerate(self.products):
            print(f'Processing {product.name} from Batch {self.name} (Product {i+1}/{self.num_products})')
            yield self.env.process(product.assemble(ms))
            print(f'{product.name} from Batch {self.name} is done processing at {self.env.now}')
        
        print(f'Batch {self.name} is done processing at {self.env.now}')
        yield pl.put()

# ProductionLine class for managing the production line
class ProductionLine:
    def __init__(self, env, name, capacity):
        self.env = env
        self.name = name
        self.capacity = capacity
        self.lines = simpy.FilterStore(env, capacity=capacity)

        for i in range(capacity):
            self.lines.put(self)

    def get(self):
        return self.lines.get()

    def put(self):
        return self.lines.put()

# Setting up the simulation environment and running it
print('Machine shop')
random.seed(RANDOM_SEED)

env = simpy.Environment()

machine_1 = Machine(env, "Machine_1", 1)
machine_2 = Machine(env, "Machine_2", 2)
machine_3 = Machine(env, "Machine_3", 3)
machine_4 = Machine(env, "Machine_4", 4)

machine_shop = simpy.FilterStore(env, capacity=NUMBER_OF_MACHINES)
machine_shop.items = [machine_1, machine_2, machine_3, machine_4]

pl = ProductionLine(env, 'Main Line', capacity=1)

skus = ['SKU1', 'SKU2']
batch_queue = []

env.process(batch_generator(env, pl, machine_shop, skus, batch_queue))

env.run(until=SIM_TIME)

重构后的解决方案代码

import simpy
import random

# Simulation parameters and machine configurations
RANDOM_SEED = 68
SIM_TIME = 100
DAILY_TIME = 24  # 定义一天的时长,用于每日补货

NUM_PRODUCTS_MEAN = 5
NUM_PRODUCTS_SIGMA = 3

TIME_BETWEEN_BATCHES = 5
ORDER_TYPES = ['MTO', 'MTS']  # 订单类型
ORDER_TYPE_PROB = [0.4, 0.6]  # MTO/MTS生成概率

NUMBER_OF_MACHINES = 4

MACHINE_PARAMS = {
    1: {'mean': 0.5, 'sigma': 1.0, 'setup_range': (1, 3)},
    2: {'mean': 3.0, 'sigma': 0.5, 'setup_range': (2, 4)},
    3: {'mean': 1.5, 'sigma': 0.3, 'setup_range': (1.5, 3.5)},
    4: {'mean': 0.5, 'sigma': 0.1, 'setup_range': (0.5, 1.5)},
}

# 库存管理类
class Inventory:
    def __init__(self, env):
        self.env = env
        self.stock = {}  # 库存结构:{sku: quantity}
        self.daily_outbound = {}  # 每日出库记录,用于次日补货
        # 启动每日补货进程
        self.env.process(self.daily_restock())

    def check_stock(self, sku, quantity):
        """检查库存是否满足需求"""
        if sku not in self.stock or self.stock[sku] < quantity:
            return False
        return True

    def deduct_stock(self, sku, quantity):
        """扣除库存,记录出库"""
        if sku in self.stock:
            self.stock[sku] -= quantity
            self.daily_outbound[sku] = self.daily_outbound.get(sku, 0) + quantity
            print(f"[库存扣减] SKU:{sku}, 剩余库存:{self.stock[sku]} 时间:{self.env.now:.2f}")

    def add_stock(self, sku, quantity):
        """增加库存"""
        self.stock[sku] = self.stock.get(sku, 0) + quantity
        print(f"[库存增加] SKU:{sku}, 当前库存:{self.stock[sku]} 时间:{self.env.now:.2f}")

    def daily_restock(self):
        """每日初补货前一日出库量"""
        while True:
            # 等待到次日0点(按DAILY_TIME间隔)
            yield self.env.timeout(DAILY_TIME - (self.env.now % DAILY_TIME))
            if self.daily_outbound:
                print(f"\n[每日补货] 开始补货,时间:{self.env.now:.2f}")
                for sku, qty in self.daily_outbound.items():
                    # 触发补货生产
                    self.env.process(generate_production_batch(self.env, sku, qty, 'MTS', production_line, machines, inventory))
                    print(f"[补货任务] SKU:{sku}, 补货量:{qty}")
                self.daily_outbound = {}  # 清空当日出库记录

# 机器类:包含状态管理和换型逻辑
class Machine:
    def __init__(self, env, machine_id):
        self.env = env
        self.machine_id = machine_id
        self.name = f"Machine_{machine_id}"
        self.current_sku = None  # 当前加工的SKU
        self.resource = simpy.Resource(env, capacity=1)  # 用Resource管理机器占用

    def process_part(self, sku):
        """处理单个产品,包含换型逻辑"""
        with self.resource.request() as req:
            yield req

            # 换型检查:如果当前SKU和新SKU不同,执行换型
            if self.current_sku != sku:
                setup_time = random.uniform(*MACHINE_PARAMS[self.machine_id]['setup_range'])
                print(f"[{self.name}] 开始换型,切换至SKU:{sku} 换型时间:{setup_time:.2f} 时间:{self.env.now:.2f}")
                yield self.env.timeout(setup_time)
                self.current_sku = sku
                print(f"[{self.name}] 换型完成,开始加工SKU:{sku} 时间:{self.env.now:.2f}")

            # 加工时间
            process_time = self._get_process_time()
            yield self.env.timeout(process_time)
            print(f"[{self.name}] 完成SKU:{sku}加工 耗时:{process_time:.2f} 时间:{self.env.now:.2f}")

    def _get_process_time(self):
        """获取当前机器的加工时间(确保为正)"""
        params = MACHINE_PARAMS[self.machine_id]
        while True:
            t = random.normalvariate(params['mean'], params['sigma'])
            if t > 0:
                return t

# 产品加工流程:连续式生产
def process_product(env, sku, machines):
    """单个产品依次经过4台机器加工"""
    print(f"[产品启动] SKU:{sku} 进入生产线 时间:{env.now:.2f}")
    for machine in machines:
        yield env.process(machine.process_part(sku))
    print(f"[产品完成] SKU:{sku} 加工完成 时间:{env.now:.2f}")

# 批次处理:启动批量产品的连续加工
def process_batch(env, sku, quantity, order_type, production_line, machines, inventory):
    """处理批次订单,区分MTO/MTS"""
    if order_type == 'MTS':
        # MTS订单先检查库存
        if inventory.check_stock(sku, quantity):
            inventory.deduct_stock(sku, quantity)
            print(f"[MTS订单发运] SKU:{sku}, 数量:{quantity} 直接发运 时间:{env.now:.2f}")
            return
        else:
            print(f"[MTS订单缺货] SKU:{sku}, 需生产数量:{quantity} 时间:{env.now:.2f}")

    # MTO或缺货的MTS订单,进入生产队列
    with production_line.request() as req:
        yield req
        print(f"[批次启动] {order_type} SKU:{sku}, 数量:{quantity} 开始生产 时间:{env.now:.2f}")
        
        # 连续式生产:前一个产品启动后,间隔短时间启动下一个产品
        product_processes = []
        for _ in range(quantity):
            proc = env.process(process_product(env, sku, machines))
            product_processes.append(proc)
            # 模拟首台机器加工后的间隔,让下一个产品进入
            yield env.timeout(MACHINE_PARAMS[1]['mean'] * 0.1)
        
        # 等待所有产品加工完成
        yield env.all_of(product_processes)
        
        if order_type == 'MTS':
            # MTS生产完成后入库并发运
            inventory.add_stock(sku, quantity)
            inventory.deduct_stock(sku, quantity)
            print(f"[MTS订单完成] SKU:{sku}, 数量:{quantity} 生产入库后发运 时间:{env.now:.2f}")
        else:
            print(f"[MTO订单完成] SKU:{sku}, 数量:{quantity} 生产完成并发运 时间:{env.now:.2f}")
        print(f"[批次结束] {order_type} SKU:{sku} 完成所有生产 时间:{env.now:.2f}")

# 生成生产批次(包括订单和补货)
def generate_production_batch(env, sku, quantity, order_type, production_line, machines, inventory):
    yield env.process(process_batch(env, sku, quantity, order_type, production_line, machines, inventory))

# 订单生成器:随机生成MTO/MTS订单
def order_generator(env, production_line, machines, inventory):
    skus = ['SKU1', 'SKU2']
    while env.now < SIM_TIME:
        # 随机生成订单间隔
        inter_arrival = random.expovariate(1/TIME_BETWEEN_BATCHES)
        yield env.timeout(inter_arrival)
        
        # 随机选择SKU和订单类型
        sku = random.choice(skus)
        order_type = random.choices(ORDER_TYPES, weights=ORDER_TYPE_PROB)[0]
        num_products = max(1, int(random.normalvariate(NUM_PRODUCTS_MEAN, NUM_PRODUCTS_SIGMA)))
        
        print(f"\n[新订单生成] {order_type} SKU:{sku}, 数量:{num_products} 时间:{env.now:.2f}")
        env.process(generate_production_batch(env, sku, num_products, order_type, production_line, machines, inventory))

# 主仿真流程
if __name__ == '__main__':
    print('=== 鞋厂运营仿真 ===')
    random.seed(RANDOM_SEED)

    env = simpy.Environment()
    inventory = Inventory(env)
    
    # 创建机器
    machines = [Machine(env, i+1) for i in range(NUMBER_OF_MACHINES)]
    # 创建生产线资源(控制批次并发)
    production_line = simpy.Resource(env, capacity=1)
    
    # 启动订单生成器
    env.process(order_generator(env, production_line, machines, inventory))
    
    # 运行仿真
    env.run(until=SIM_TIME)

关键问题解决说明

  1. 连续式生产实现

    • 单个产品加工流程独立启动,前一个产品启动后间隔短时间就启动下一个产品,无需等待上一个产品走完所有机器
    • 使用env.all_of()等待批次内所有产品加工完成
  2. 动态换型与正确换型时间

    • 机器类新增current_sku状态,每次加工前检查是否需要换型
    • 换型在机器空闲时触发,每台机器独立处理换型,避免时间叠加
  3. 订单分类与库存管理

    • 新增Inventory类,实现库存检查、扣减、入库和每日补货逻辑
    • 订单生成时随机分配MTO/MTS类型

火山引擎 最新活动