Skip to main content
 首页 » 编程设计

.net之在 .NET 中的多线程上快速、高效地处理 HTTP 请求

2024年02月23日51kuangbin

自从 .NET 诞生以来,我就一直在使用它,并且很早以前就开始进行并行编程......但我仍然无法解释这种现象。该代码在生产系统中运行,并且大部分时间都在执行其工作,只是为了更好地理解。

我将 10 个用于并发处理的 URL 传递给以下内容:

    public static void ProcessInParellel(IEnumerable<ArchivedStatus> statuses,  
                                         StatusRepository statusRepository,  
                                         WaitCallback callback,  
                                         TimeSpan timeout) 
    { 
        List<ManualResetEventSlim> manualEvents = new List<ManualResetEventSlim>(statuses.Count()); 
 
        try 
        { 
            foreach (ArchivedStatus status in statuses) 
            { 
                manualEvents.Add(new ManualResetEventSlim(false)); 
                ThreadPool.QueueUserWorkItem(callback, 
                                             new State(status, manualEvents[manualEvents.Count - 1], statusRepository)); 
            } 
 
            if (!(WaitHandle.WaitAll((from m in manualEvents select m.WaitHandle).ToArray(), timeout, false)))  
                throw ThreadPoolTimeoutException(timeout); 
        } 
        finally 
        { 
            Dispose(manualEvents); 
        } 
    } 

回调类似于:

    public static void ProcessEntry(object state) 
    { 
        State stateInfo = state as State; 
 
        try 
        { 
            using (new LogTimer(new TimeSpan(0, 0, 6))) 
            { 
               GetFinalDestinationForUrl(<someUrl>); 
            } 
        } 
        catch (System.IO.IOException) { } 
        catch (Exception ex) 
        { 
 
        } 
        finally 
        { 
            if (stateInfo.ManualEvent != null) 
                stateInfo.ManualEvent.Set(); 
        } 
    } 

每个回调都会查看 URL 并遵循一系列重定向(AllowAutoRedirect 故意设置为 false 以处理 cookie):

    public static string GetFinalDestinationForUrl(string url, string cookie) 
    { 
        if (!urlsToIgnore.IsMatch(url)) 
        { 
            HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(url); 
            request.AllowAutoRedirect = false; 
            request.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip; 
            request.Method = "GET"; 
            request.KeepAlive = false; 
            request.Pipelined = false; 
            request.Timeout = 5000; 
 
            if (!string.IsNullOrEmpty(cookie)) 
                request.Headers.Add("cookie", cookie); 
 
            try 
            { 
                string html = null, location = null, setCookie = null; 
 
                using (WebResponse response = request.GetResponse()) 
                using (Stream stream = response.GetResponseStream()) 
                using (StreamReader reader = new StreamReader(stream)) 
                { 
                    html = reader.ReadToEnd(); 
                    location = response.Headers["Location"]; 
                    setCookie = response.Headers[System.Net.HttpResponseHeader.SetCookie]; 
                } 
 
                if (null != location) 
                    return GetFinalDestinationForUrl(GetAbsoluteUrlFromLocationHeader(url, location), 
                                                    (!string.IsNullOrEmpty(cookie) ? cookie + ";" : string.Empty) + setCookie); 
 
 
 
                return CleanUrl(url); 
            } 
            catch (Exception ex) 
            { 
                if (AttemptRetry(ex, url)) 
                    throw; 
            } 
        } 
 
        return ProcessedEntryFlag; 
    } 

我在递归 GetFinalDestinationForUrl 调用周围有一个高精度秒表,阈值为 6 秒,通常回调会在这段时间内完成。

但是,WaitAll(10 个线程的超时时间为 (0,0,60))仍然会定期超时。

异常打印出如下内容:

System.Exception: Not all threads returned in 60 seconds: Max Worker:32767, Max I/O:1000, Available Worker:32764, Available I/O:1000 at Work.Threading.ProcessInParellel(IEnumerable`1 statuses, StatusRepository statusRepository, WaitCallback callback, TimeSpan timeout) at Work.UrlExpanderWorker.SyncAllUsers()

它在 .NET 4 上运行,所有 URL 的 ma​​xConnections 设置为 100。

我唯一的理论是同步 HttpWebRequest 调用的阻塞时间可能超过指定的超时时间?这是唯一合理的解释。问题是为什么以及如何最好地强制该操作真正超时?

是的,我知道递归调用指定每次调用的超时时间为 5 秒,但可能需要多次调用才能处理给定的 URL。但我几乎没有看到秒表警告。对于我看到的每 20-30 个 WaitAll 超时错误,我可能会看到一条消息,指示给定线程花费的时间超过 6 秒。如果问题确实是 10 个线程累计需要超过 60 秒,那么我应该看到消息之间至少有 1:1 的相关性(如果不是更高的话)。

更新(2012 年 3 月 30 日):

我可以确认在某些情况下网络调用本身不遵守超时:

            Uri uri = new Uri(url); 
            HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(uri); 
            request.AllowAutoRedirect = false; 
            request.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip; 
            request.Method = "GET"; 
            request.KeepAlive = false; 
            request.Pipelined = false; 
            request.Timeout = 7000; 
            request.CookieContainer = cookies; 
 
            try 
            { 
                string html = null, location = null; 
 
                using (new LogTimer("GetFinalDestinationForUrl", url, new TimeSpan(0, 0, 10))) 
                    using (HttpWebResponse response = (HttpWebResponse)request.GetResponse()) 
                    using (Stream stream = response.GetResponseStream()) 
                    using (StreamReader reader = new StreamReader(stream)) 
                    { 
                        html = reader.ReadToEnd(); 
                        location = response.Headers["Location"]; 
                        cookies = Combine(cookies, response.Cookies); 
 
                        if (response.ContentLength > 150000 && !response.ContentType.ContainsIgnoreCase("text/html")) 
                            log.Warn(string.Format("Large request ({0} bytes, {1}) detected at {2} on level {3}.", response.ContentLength, response.ContentType, url, level)); 
                    } 

该代码通常会记录需要 5-6 分钟才能完成且不大于 150000 的条目。我不是在谈论这里或那里的孤立服务器,这些是随机(高调)媒体网站。

这里到底发生了什么以及我们如何确保代码在合理的时间内退出?

请您参考如下方法:

我同意Aliostad 。我没有看到代码有任何明显的问题。您是否有任何类型的锁定导致这些工作项序列化?我在表面上没有看到任何内容,但值得仔细检查,以防您的代码比您发布的代码更复杂。您将需要添加日志记录代码来捕获这些 HTTP 请求启动的时间。希望这能为您提供更多线索。

在一个不相关的注释中,我通常避免使用WaitHandle.WaitAll。它有一些限制,例如只允许 64 个句柄并且不能在 STA 线程上工作。为了它的值(value),我改用这种模式。

using (var finished = new CountdownEvent(1); 
{ 
  foreach (var item in workitems) 
  { 
    var capture = item; 
    finished.AddCount(); 
    ThreadPool.QueueUserWorkItem( 
      () => 
      { 
        try 
        { 
          ProcessWorkItem(capture); 
        } 
        finally 
        { 
          finished.Signal(); 
        } 
      } 
  } 
  finished.Signal(); 
  if (!finished.Wait(timeout)) 
  { 
    throw new ThreadPoolTimeoutException(timeout); 
  } 
}