Skip to main content
 首页 » 编程设计

多线程 Entity Framework : The connection was not closed. 连接的当前状态为正在连接

2024年12月31日1552php

所以我有一个执行工作流过程的 Windows 服务进程。后端在 Entity Framework 之上使用 Repository 和 UnitofWork 模式和 Unity,以及从 edmx 生成的实体类。我不会详细介绍它,因为它不是必需的,但基本上有 5 个工作流程要经过的步骤。一个特定的过程可能在任何时间点的任何阶段(当然是按顺序)。第一步只是为第二步生成数据,第二步通过一个长时间运行的进程验证数据到另一台服务器。然后在那里使用该数据生成一个pdf。对于每个阶段,我们都会生成一个计时器,但是可以配置为允许为每个阶段生成多个计时器。问题就在于此。当我将处理器添加到特定阶段时,它随机出现以下错误:

连接未关闭。连接的当前状态是正在连接。

阅读此内容似乎很明显,这是因为上下文试图从两个线程访问同一个实体。但这就是让我陷入困境的地方。我能找到的所有信息都表明我们应该为每个线程使用一个实例上下文。据我所知,我在做什么(见下面的代码)。我没有使用单例模式或静态或任何东西,所以我不确定为什么会发生这种情况或如何避免它。我已经在下面发布了我的代码的相关部分供您查看。

基础存储库:

 public class BaseRepository 
{ 
    /// <summary> 
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public BaseRepository(IUnitOfWork unitOfWork) 
    { 
        if (unitOfWork == null) throw new ArgumentException("unitofWork"); 
        UnitOfWork = unitOfWork; 
    } 
 
 
    /// <summary> 
    /// Returns a <see cref="DbSet"/> of entities. 
    /// </summary> 
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam> 
    /// <returns></returns> 
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class 
    { 
 
        return Context.Set<TEntity>(); 
    } 
 
    /// <summary> 
    /// Sets the state of an entity. 
    /// </summary> 
    /// <param name="entity">object to set state.</param> 
    /// <param name="entityState"><see cref="EntityState"/></param> 
    protected virtual void SetEntityState(object entity, EntityState entityState) 
    { 
        Context.Entry(entity).State = entityState; 
    } 
 
    /// <summary> 
    /// Unit of work controlling this repository.        
    /// </summary> 
    protected IUnitOfWork UnitOfWork { get; set; } 
 
    /// <summary> 
    ///  
    /// </summary> 
    /// <param name="entity"></param> 
    protected virtual void Attach(object entity) 
    { 
        if (Context.Entry(entity).State == EntityState.Detached) 
            Context.Entry(entity).State = EntityState.Modified; 
    } 
 
    protected virtual void Detach(object entity) 
    { 
        Context.Entry(entity).State = EntityState.Detached; 
    } 
 
    /// <summary> 
    /// Provides access to the ef context we are working with 
    /// </summary> 
    internal StatementAutoEntities Context 
    { 
        get 
        {                 
            return (StatementAutoEntities)UnitOfWork; 
        } 
    } 
} 

StatementAutoEntities 是自动生成的 EF 类。

存储库实现:
public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository 
{ 
 
    /// <summary> 
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork) 
    { 
    } 
 
    /// <summary> 
    /// Create a new <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///     <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Create(ProcessingQueue Queue) 
    { 
        GetDbSet<ProcessingQueue>().Add(Queue); 
        UnitOfWork.SaveChanges(); 
    } 
 
    /// <summary> 
    /// Updates a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="queue"> 
    ///     <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Update(ProcessingQueue queue) 
    { 
        //Attach(queue); 
        UnitOfWork.SaveChanges(); 
    } 
 
    /// <summary> 
    /// Delete a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///     <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Delete(ProcessingQueue Queue) 
    { 
        GetDbSet<ProcessingQueue>().Remove(Queue);   
        UnitOfWork.SaveChanges(); 
    } 
 
    /// <summary> 
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id 
    /// </summary> 
    /// <param name="id"></param> 
    /// <returns></returns> 
    public ProcessingQueue GetById(int id) 
    { 
        return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault(); 
    } 
 
    /// <summary> 
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status 
    /// </summary> 
    /// <param name="status"></param> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetByStatus(int status) 
    { 
        return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList(); 
    } 
 
    /// <summary> 
    /// Gets a list of all <see cref="ProcessingQueue"/> entries 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetAll() 
    { 
        return (from e in Context.ProcessingQueue_Select() select e).ToList(); 
    } 
 
    /// <summary> 
    /// Gets the next pending item id in the queue for a specific work         
    /// </summary> 
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param> 
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param> 
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param> 
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns> 
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId) 
    { 
        var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId,  operationId).SingleOrDefault(); 
        return id.HasValue ? id.Value : -1; 
    } 
 
    /// <summary> 
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all 
    /// active entries in the queue 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries() 
    { 
        return (from e in Context.ProcessingQueueStatus_Select() select e).ToList(); 
    } 
    /// <summary> 
    /// Bumps an entry to the front of the queue  
    /// </summary> 
    /// <param name="processingQueueId"></param> 
    public void Bump(int processingQueueId) 
    { 
        Context.ProcessingQueue_Bump(processingQueueId); 
    } 
} 

我们使用 Unity 进行依赖注入(inject),一些调用代码例如:
#region Members 
    private readonly IProcessingQueueRepository _queueRepository;        
    #endregion 
 
    #region Constructors 
    /// <summary>Initializes ProcessingQueue services with repositories</summary> 
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>         
    public ProcessingQueueService(IProcessingQueueRepository queueRepository) 
    { 
        Check.Require(queueRepository != null, "processingQueueRepository is required"); 
        _queueRepository = queueRepository; 
 
    } 
    #endregion 

windows服务中启动定时器的代码如下:
            _staWorkTypeConfigLock.EnterReadLock(); 
        foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o)  
                              let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval  
                              select new StaTimer 
                            { 
                                Interval = _runImmediate ? 5000 : interval*1000, 
                                Operation = (ProcessingQueue.RequestedOperation) operation.OperationId 
                            }) 
        { 
            timer.Elapsed += ApxQueueProcessingOnElapsedInterval; 
            timer.Enabled = true; 
            Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);                 
        } 
        _staWorkTypeConfigLock.ExitReadLock(); 

StaTimer 只是定时器添加操作类型的包装器。 ApxQueueProcessingOnElapsedInterval 然后基本上只是根据操作将工作分配给进程。

我还将在我们生成任务的地方添加一些 ApxQueueProcessingOnElapsedInterval 代码。
            _staTasksLock.EnterWriteLock(); 
        for (var x = 0; x < tasksNeeded; x++) 
        { 
            var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj), 
                             CreateQueueProcessConfig(true, operation), _cancellationToken); 
 
 
            _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t)); 
 
            t.Start(); 
            Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table 
        } 
        _staTasksLock.ExitWriteLock(); 

请您参考如下方法:

如果这对任何人都有帮助:

就我而言,我确实确保了非线程安全的 DbContext有一个 TransientLifetime (使用 Ninject),但它仍然导致并发问题!原来在我的一些自定义ActionFilters我使用依赖注入(inject)来访问 DbContext在构造函数中,但 ActionFilters有一个生命周期,可以让它们在多个请求中实例化,因此不会重新创建上下文。

我通过手动解决 OnActionExecuting 中的依赖关系来修复它方法而不是在构造函数中,以便它每次都是一个新的实例。