C#-深入淺出 BlockingCollection:解決多執行緒工作佇列的利器

在多執行緒的應用程式中,BlockingCollection 是實作生產者與消費者模式的一個便利的工具。當我們需要實現跨執行緒的安全資料交換時,通常會面臨資料同步和鎖定的問題,而處理這些問題容易導致程式的邏輯複雜性增加。BlockingCollection 不僅內建了資料緩衝區,還能自動處理執行緒間的執行序安全,並提供了帶有阻塞功能的集合操作。這樣可以有效降低開發的難度,提高程式的穩定性與效率,非常適合高效且可靠的並行程式開發,這篇文章讓我們來看看這個工具的用法。
什麼是生產者與消費者模式
生產者與消費者模式是一種常見的設計模式,主要用於解決多執行緒之間的數據交換與同步問題。在該模式中,生產者(Producer)負責產生資料或任務,並將結果放入緩存區(例如佇列或集合),而消費者(Consumer)則負責從緩存區中取得資料並進行處理。這兩者分別在不同的執行緒上運行,且生產速度與消費速度可能並不一致會導致緩存區過度填滿或處於空閒狀態。若生產速度過快而消費速度不足,緩存區可能達到容量上限,進而阻塞生產者;反之,若消費速度過快而生產速度不足,消費者可能長時間處於等待狀態,影響整體執行效率。
核心特點
- 解耦
生產者專注於資料的產生,消費者專注於資料的處理,兩者通過共享的緩存區來溝通,互不干擾。 - 同步
當緩存區滿時,生產者會等待(阻塞),直到消費者處理後釋放空間;當緩存區空時,消費者會等待,直到生產者產生數據。 - 控制流量
通過緩存區的容量限制,能夠有效防止生產者過量生產或消費者過度消耗資源而造成程式使用記憶體或CPU非常高。
運作流程
- 生產者將資料放入共享佇列或集合。
- 消費者從佇列中取出資料進行處理。
- 如果佇列滿,生產者會等待;如果佇列空,消費者會等待。
- 整個過程通過緩存區的同步機制自動協調。
適用範圍
- 多執行緒應用:如伺服器請求處理、數據管道、任務分派等。
- 流量控制:避免因不對稱的生產與消費速度導致資源過度使用或浪費。
- 解耦系統組件:促進模組化設計,提高系統靈活性。
日常比喻
想像你有一條生產線:
- 生產者 是工作機器,負責生產產品(資料)。
- 緩存區 是產品的中間倉庫(集合/佇列)。
- 消費者 是運輸工人,負責從倉庫中取產品並運送(處理數據)。
如果倉庫滿了,生產者會暫停工作(避免堆積過多);如果倉庫空了,消費者會等待(避免沒有產品運送)。這就是生產者與消費者模式的基本思想。
可參考生產者消費者問題 – 維基百科,自由的百科全書了解詳細內容。
BlockingCollection 的特色
- 同時支援多個執行緒:簡化多執行緒之間的資料共享與同步。
- 靈活搭配: 能和
ConcurrentQueue,ConcurrentStack等基礎集合配合使用。 - 阻塞操作是關鍵: 「阻塞」是個大殺器,能讓生產和消費的節奏搭配得更一致。
- 容量可有上限: 可以設限,確保資料量不會失控,造成記憶體爆掉。
使用BlockingCollection的情境
情境 1:處理影像大檔的批次處理
問題:
假設有一個系統需要從資料夾中載入大量影像檔案進行處理(例如壓縮或調整大小),但處理器處理影像的速度比載入檔案的速度慢。像這樣的場景常會造成:
- 如果載入的影像太快,造成記憶體壓力
- 如果處理器速度過慢,系統可能會無法跟上。
解決方式:
- 使用
BlockingCollection,設定有限的容量(例如最多同時處理 10 張影像)。 - 生產者:讀取並丟入影像到
BlockingCollection。 - 消費者:一邊從集合中取出影像執行處理(例如儲存到硬碟)。
情境 2:網頁爬蟲與存資料庫
問題:
假設你有一個網頁爬蟲程式,一個執行緒負責進行網路爬蟲並取得資料,另一個執行緒負責儲存這些資料到資料庫中,如果爬取速度比儲存速度快,可能會造成爬取資料無法即時處理,或者負責儲存執行緒浪費資源等待新資料。
解決方式:
- 使用
BlockingCollection的資料緩衝特性,讓爬取的網頁資料可放入限制容量的佇列,並由消費者逐步儲存。
情境 3:伺服器處理任務佇列
問題:
假設現在有一個伺服器,負責處理許多用戶的請求。由於伺服器資源有限,請求的處理速度可能趕不上請求的到達速度。如果不限制同時處理的請求數量,伺服器可能面臨CPU及記憶體用量過大的情形。
解決方式:
- 使用
BlockingCollection來作為請求佇列,限制請求進入伺服器的數量。 - 多個生產者(用戶請求)會把請求放入佇列中,多個消費者執行緒負責處理請求。
BlockingCollection 的常用方法與基本操作
初始化 BlockingCollection
初始化容量上限為 10 的BlockingCollection,當容量滿時會自動暫停增加資料的執行緒,直到有位置才會被釋放。
BlockingCollection<int> collection = new BlockingCollection<int>(boundedCapacity: 10); Add/Take – 加入/取出資料
collection.Add(1); // 加入資料,阻塞直到可用空間出現上限
int item = collection.Take(); // 從集合中移除並returm元素,若集合為空則阻塞 TryAdd/TryTake 非阻塞加入取出資料
「非阻塞」意指執行緒執行某個動作時,如果集合不可用,不會等待,而是直接return或進行其他操作。
if (collection.TryAdd(2))
{
Console.WriteLine("增加成功!");
}
if (collection.TryTake(out int result))
{
Console.WriteLine($"移除的項目是:{result}");
} 並行處理
BlockingCollection支援多個Thread同時處理。
Parallel.For(0, 5, i =>
{
collection.Add(i);
Console.WriteLine($"生產項目: {i}");
}); CompleteAdding – 結束資料增加(標記完成)
將BlockingCollection標記成加入元素完成,再加入元素會拋出InvalidOperationException的錯誤。
GetConsumingEnumerable – 取集合中資料
從集合中取元素,執行緒會自動在集合有新元素時繼續處理,直到集合標記完成 CompleteAdding()且取出所有元素時結束。
// 從集合中的所有數據,直到集合標記完成CompleteAdding()並取出完畢
foreach (int item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"取出並處理:{item}");
} IsCompleted – 回傳集合是否加入及取出完畢
判斷邏輯:
- 是否已執行
CompleteAdding()。 - 集合中是否沒有任何剩下的元素(完全用盡)。
if (collection.IsCompleted)
{
Console.WriteLine("集合已完成 (無可添加或可取元素)!");
} Count – 檢查目前集合元素數量
int count = collection.Count; // 獲取目前集合中元素的數量
Console.WriteLine($"集合中有 {count} 個元素。"); Dispose – 釋放內部資源
使用完 BlockingCollection 後,使用 Dispose() 來釋放內部資源。
collection.Dispose(); // 釋放 BlockingCollection 的資源 實戰功能範例
- 生產者邏輯:
模擬生產者使用TryAdd()將元素(數字 1 到 5)依序放入的 BlockingCollection,
TryAdd:嘗試將數字 1-5 加到集合中,並可以等待最多500 毫秒。- 如果集合中有空間,則增加成功,返回
true。 - 如果集合已滿,且生產者等待
500 毫秒後,返回false(超時)。
- 如果集合中有空間,則增加成功,返回
CompleteAdding:通知消費者不會再有新元素加入,表示生產結束。Thread.Sleep(200):模擬生產者的元素產生速度(比消費者慢)。
- 消費者邏輯:
消費者從 BlockingCollection 中取出元素並進行處理,直到所有元素都被處理完。
GetConsumingEnumerable:取出集合中的元素,並阻塞等待新元素加入。- 如果集合中有元素,消費者立即取出進行處理。
- 如果集合為空,消費者會自動等待。
- 當集合完成(
CompleteAdding被呼叫),並且所有元素消費完,結束執行。
Thread.Sleep(1000):模擬每次處理一個元素的耗時,消費者比生產者慢。
var collection = new BlockingCollection<int>(boundedCapacity: 3); // 定義最多包含 3 個元素的 BlockingCollection
// **生產者任務**
Task producer = Task.Run(() =>
{
for (int i = 1; i <= 5; i++) // 循環產生 5 個數字
{
if (collection.TryAdd(i, TimeSpan.FromMilliseconds(500))) // 嘗試加入到集合,最多等待 500 毫秒
{
Console.WriteLine($"生產者加入了:{i}"); // 加入成功
}
else
{
Console.WriteLine($"生產者加入 {i} 超時!"); // 加入超時
}
Thread.Sleep(200); // 模擬生產者壓力,間隔 200 毫秒產生一個數字
}
collection.CompleteAdding(); // 通知消費者生產者已完成數據加入,集合不再有新數據
});
// **消費者任務**
Task consumer = Task.Run(() =>
{
foreach (int item in collection.GetConsumingEnumerable()) // 從集合中消費數據
{
Console.WriteLine($"消費者處理了:{item}"); // 處理消費的每一個數字
Thread.Sleep(1000); // 模擬消費壓力,消費一個數字需要耗時 1000 毫秒
}
});
// 等待生產者和消費者任務完成
Task.WaitAll(producer, consumer);
Console.WriteLine("所有工作已完成!");
}這段範例示範了 BlockingCollection 的多執行緒協作能力,特別是在生產者及消費者速度不一致的情況下,它非常有用。程式透過阻塞與非阻塞方法 (Add、TryAdd、Take、GetConsumingEnumerable),來有效控制多執行緒間的資料交換,並避免不必要的資源浪費。
您也可以直接將範例應用於日常的多執行緒場景,比如批次資料處理、伺服器請求排程等。希望這篇文章能幫助您深入理解 BlockingCollection 的應用!



