You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

DbContext实例并发使用问题求助:Hangfire定时任务冲突排查

解决Hangfire定时任务共享DbContext的并发冲突问题

你的核心问题在于:DbContext是线程不安全的,但你现在让两个Hangfire任务共享了同一个AccountBilling实例(进而共享同一个DbContext),当任务并发执行时,必然会出现上下文状态冲突的问题。下面是适配你代码场景的具体解决方案:

方案1:使用DbContext工厂实现上下文的隔离

EF Core 5+提供了IDbContextFactory<T>,可以让我们在需要时创建独立的DbContext实例,完美适配Hangfire的多线程执行场景。

步骤1:修改DbContext的注册(Startup.cs)

把原来的AddDbContext改成AddDbContextFactory,这样我们可以通过工厂获取新的上下文实例:

services.AddDbContextFactory<EntityContext>(
    options => options.UseNpgsql(connectionString)
);

同时注册AccountBilling为Scoped服务(让Hangfire每次执行任务时都能拿到新实例):

services.AddScoped<AccountBilling>();

步骤2:重构AccountBilling类,依赖DbContext工厂

修改构造函数和方法,让每个任务执行时都创建独立的DbContext:

public readonly IDbContextFactory<EntityContext> _contextFactory;
private TBCPaymentOptions _tbcPaymentOptions = null;

// 构造函数改为注入DbContext工厂
public AccountBilling(IDbContextFactory<EntityContext> contextFactory) 
{ 
    _contextFactory = contextFactory; 
}

public AccountBilling(IDbContextFactory<EntityContext> contextFactory, IOptions<TBCPaymentOptions> tbcPaymentOptions) 
{ 
    _contextFactory = contextFactory; 
    this._tbcPaymentOptions = tbcPaymentOptions.Value; 
}

// 重构Save方法,接收当前使用的DbContext
private void Save(EntityContext context) {
    try { context.SaveChanges(); }
    catch (Exception e) { Console.WriteLine(e); throw; }
}

// 修改CalculateUserCharge方法,每次创建新的上下文
public void CalculateUserCharge(DateTime date) {
    // 每次执行任务都创建独立的DbContext
    using var context = _contextFactory.CreateDbContext();
    
    var latestJob = context.JobLogs.Include(c => c.JobStatus)
        .OrderByDescending(c => c.StartDate)
        .Where(c => c.JobId == (int)JobEnum.CloseDay)
        .FirstOrDefault();
    
    var jobLog = new JobLog {
        JobId = (int)JobEnum.CloseDay,
        JobStatusID = (int)JobStatusEnum.Active,
        StartDate = DateTime.Now
    };
    context.Add(jobLog);
    this.Save(context);
    
    Console.WriteLine("Starting...");
    if (latestJob != null && latestJob.JobStatusID == (int)JobStatusEnum.Active) {
        jobLog.JobStatusID = (int)JobStatusEnum.Canceled;
        jobLog.EndDate = DateTime.Now;
        context.Update(jobLog);
        this.Save(context);
    } else {
        try {
            var result = new List<GetActiveUserPackagesForOpenBillingPeriodResult>();
            using (var conn = new NpgsqlConnection(context.ConnectionString)) {
                conn.Open();
                using (var cmd = new NpgsqlCommand("\"GetActiveUserPackagesForOpenBillingPeriod\"", conn)) {
                    Console.WriteLine("გავუშვი command");
                    cmd.CommandType = CommandType.StoredProcedure;
                    cmd.Parameters.AddWithValue("somedate", DateTime.Today);
                    using var reader = cmd.ExecuteReader();
                    if (reader.HasRows) {
                        while (reader.Read()) {
                            result.Add(new GetActiveUserPackagesForOpenBillingPeriodResult {
                                Amount = (decimal)reader["Amount"],
                                PackageID = (int)reader["PackageID"],
                                UserID = (int)reader["UserID"],
                                AccountID = (int)reader["AccountID"],
                                UserPackageStartDate = (DateTime)reader["UserPackageStartDate"],
                            });
                        }
                    }
                }
                
                var groupByResults = result.GroupBy(c => c.AccountID)
                    .Select(a => new { accountId = a.Key, lines = a.ToList() });
                
                foreach (var group in groupByResults) {
                    var transactionHeader = new TransactionHeader {
                        TransactionHeaderTypeID = (int)TransactionHeaderTypeEnum.Charge,
                        Date = date,
                        CorrectionDescription = null,
                        AccountID = group.accountId
                    };
                    
                    foreach (var lineItem in group.lines) {
                        transactionHeader.TransactionLines.Add(new TransactionLine {
                            UserID = lineItem.UserID,
                            PackageID = lineItem.PackageID,
                            Amount = this.CalculateUserChargeMethod(date, lineItem.Amount, lineItem.UserPackageStartDate)
                        });
                    }
                    
                    transactionHeader.TotalAmount = transactionHeader.TransactionLines.Sum(c => c.Amount);
                    context.TransactionHeaders.Add(transactionHeader);
                    this.Save(context);
                }
                
                jobLog.EndDate = DateTime.Now;
                jobLog.JobStatusID = (int)JobStatusEnum.Inactive;
                context.Update(jobLog);
                this.Save(context);
                ClosePeriodOnEndOfMonth(date, conn);
            }
        } catch (Exception ex) {
            jobLog.EndDate = DateTime.Now;
            jobLog.JobStatusID = (int)JobStatusEnum.Canceled;
            jobLog.Comment = ex.ToString();
            context.Update(jobLog);
            this.Save(context);
        }
    }
}

// 同样修改CheckUserPayment方法,创建独立上下文
public void CheckUserPayment() {
    using var context = _contextFactory.CreateDbContext();
    var Cert = new X509Certificate2("cert.p12", _tbcPaymentOptions.TBCPayCertificatePassword, X509KeyStorageFlags.MachineKeySet);
    var time = DateTime.Now.AddMinutes(-5);
    
    var latestJob = context.JobLogs.Include(c => c.JobStatus)
        .OrderByDescending(c => c.StartDate)
        .Where(c => c.JobId == (int)JobEnum.CheckPayment)
        .FirstOrDefault();
    
    var jobLog = new JobLog {
        JobId = (int)JobEnum.CheckPayment,
        JobStatusID = (int)JobStatusEnum.Active,
        StartDate = DateTime.Now
    };
    context.Add(jobLog);
    this.Save(context);
    
    if (latestJob != null && latestJob.JobStatusID == (int)JobStatusEnum.Active) {
        jobLog.JobStatusID = (int)JobStatusEnum.Canceled;
        jobLog.EndDate = DateTime.Now;
        context.Update(jobLog);
        this.Save(context);
    } else {
        try {
            var processingPayments = context.Payments
                .Where(c => c.AcceptanceAct.AcceptanceActStatusID == (int)AcceptanceActStatusEnum.Processing && c.CreatedDate < time)
                .ToList();
            
            System.Console.WriteLine("IN METHOD");
            foreach (var item in processingPayments) {
                System.Console.WriteLine("Job Started");
                // 如果CheckUserPaymentMethod需要DbContext,记得传入当前的context实例
                this.CheckUserPaymentMethod(item.BankPaymentCode, item, _tbcPaymentOptions.AppPlatformIP, _tbcPaymentOptions.MerchantURL, Cert, context);
            }
            
            Console.WriteLine("Job Done");
            jobLog.JobStatusID = (int)JobStatusEnum.Inactive;
            jobLog.EndDate = DateTime.Now;
            context.Update(jobLog);
            this.Save(context);
        } catch (Exception ex) {
            jobLog.JobStatusID = (int)JobStatusEnum.Canceled;
            jobLog.EndDate = DateTime.Now;
            jobLog.Comment = ex.ToString();
            context.Update(jobLog);
            this.Save(context);
        }
    }
}

步骤3:修改Hangfire任务注册(Startup.cs的Configure方法)

不要直接传入手动创建的abill实例,而是让Hangfire通过依赖注入自动获取新的AccountBilling实例:

// 让Hangfire自动解析AccountBilling依赖,每次执行任务都会创建新实例
RecurringJob.AddOrUpdate<AccountBilling>(abill => abill.CheckUserPayment(), Cron.Minutely);
RecurringJob.AddOrUpdate<AccountBilling>(
    "CalculateUserCharge", 
    abill => abill.CalculateUserCharge(DateTime.Today.AddDays(-1)), 
    Cron.Daily(12,37), 
    TimeZoneInfo.Utc
);

为什么这个方案有效?

DbContext的设计初衷是短生命周期、单线程使用,每个DbContext实例维护自己的实体跟踪状态。通过DbContext工厂,我们让每个Hangfire任务执行时都获取独立的DbContext,彻底避免了并发修改同一个上下文状态的问题。

内容的提问来源于stack exchange,提问作者Guga Todua

火山引擎 最新活动