- The concurrent collection classes are designed to include built-in synchronization code to support simultaneous access by multiple threads without concern for race conditions.
- The order in which data is added and removed is determined by the individual collection classes that implement the IProducerConsumerCollection<T> interface.
- In the producer-consumer problem, some processes create items (producers) and other processes consume those items (consumers).
- Produced items are placed in a bin (queue), and that bin typically has a limited size.
- If a producer creates an item but there is no space to store it, the producer must wait until an item is consumed.
- Likewise, if a consumer goes to consume an item and none exists, it must wait until an item is produced.
- BlockingCollection<T> provides a blocking collection for producer/consumer scenarios, in which producers write data into the collection while consumers read it.
- The class is a generic collection type that synchronizes add and remove operations regardless of the internal storage it wraps.
- It provides blocking and bounding support for collections that implement the IProducerConsumerCollection<T> interface.
- Removing an item from the collection can be blocked until data becomes available.
- Adding data normally returns right away, but you can set an upper limit on the number of items.
- If that limit is reached, adding an item blocks the calling thread until there is room.
- BlockingCollection<T> is a wrapper around other collection types.
- If you don’t give it any instructions, it uses ConcurrentQueue<T> by default.
- A regular collection can fail in a multithreaded scenario because one thread might remove an item while another thread is trying to read it.
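using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var collection = new BlockingCollection<string>();

// Consumer: Take() blocks until an item is available.
var read = Task.Run(() =>
{
    while (true)
        Console.WriteLine(collection.Take());
});

// Producer: each line typed at the console is added to the collection.
var write = Task.Run(() =>
{
    while (true)
        collection.Add(Console.ReadLine());
});

// Both loops run forever, so these waits never return.
write.Wait();
read.Wait();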
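The bounding and wrapping behavior described earlier can be sketched with a small, deterministic example. This is only an illustration under assumptions: the variable names (`bounded`, `producer`) and the capacity of two are made up for the demo, and a ConcurrentStack<T> is swapped in for the default ConcurrentQueue<T> to show that the wrapped collection decides the removal order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Wrap a ConcurrentStack<T> instead of the default ConcurrentQueue<T>
// and bound the collection to two items (both choices are illustrative).
var bounded = new BlockingCollection<int>(new ConcurrentStack<int>(), boundedCapacity: 2);

bounded.Add(1);
bounded.Add(2);

// The collection is full, so a non-blocking add is rejected.
bool addedWhileFull = bounded.TryAdd(3);

// A blocking Add on another thread waits until a consumer makes room.
var producer = Task.Run(() => bounded.Add(3));

// The wrapped stack makes removal LIFO: the most recently added item comes out first.
int first = bounded.Take();

// Completes because Take() freed a slot for the pending Add.
producer.Wait();

Console.WriteLine($"addedWhileFull={addedWhileFull}, first={first}, count={bounded.Count}");
// prints: addedWhileFull=False, first=2, count=2
```

With the default ConcurrentQueue<T>, the same code would take 1 first, since a queue removes items in FIFO order.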
- You can use the CompleteAdding method to signal to the BlockingCollection that no more items will be added.
- If other threads are waiting for new items, they won’t be blocked anymore.
- You can remove the while(true) statements by using the GetConsumingEnumerable method, which returns an IEnumerable<T> that blocks until a new item is available. You can then iterate over the BlockingCollection with a foreach.
var read = Task.Run(() =>
{
    foreach (var item in collection.GetConsumingEnumerable())
        Console.WriteLine(item);
});
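To see how CompleteAdding and GetConsumingEnumerable work together, here is a minimal sketch of a producer that finishes and a consumer whose foreach then ends on its own. The names `queue` and `seen` and the three sample strings are assumptions made for the example; they do not come from the snippet above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var queue = new BlockingCollection<string>();
var seen = new List<string>(); // only touched by the consumer task

// Consumer: the foreach blocks while the collection is empty and ends
// once adding is completed and every item has been taken.
var consumer = Task.Run(() =>
{
    foreach (var item in queue.GetConsumingEnumerable())
        seen.Add(item);
});

// Producer: add a few items, then signal that no more are coming.
var producer = Task.Run(() =>
{
    foreach (var word in new[] { "alpha", "beta", "gamma" })
        queue.Add(word);
    queue.CompleteAdding();
});

Task.WaitAll(producer, consumer);

Console.WriteLine(string.Join(", ", seen));
// prints: alpha, beta, gamma
Console.WriteLine($"IsAddingCompleted={queue.IsAddingCompleted}, IsCompleted={queue.IsCompleted}");
// prints: IsAddingCompleted=True, IsCompleted=True
```

Without the call to CompleteAdding, the consumer's foreach would keep waiting for more items and Task.WaitAll would never return.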
Methods
- BoundedCapacity returns the capacity of the “bin”.
  - If the bin is unbounded (no capacity was supplied), the property returns -1.
- Count returns an internally kept count of items.
  - This makes it O(1), but the count is unreliable if you modify the underlying collection directly (not recommended).
- CompleteAdding() is used to cut off further adds.
  - This sets IsAddingCompleted and lets consumers wind down once the collection is empty.
  - Use CompleteAdding() to signal to the BlockingCollection that nothing else should be added.
  - Any attempt to call Add() or TryAdd() after the collection is marked complete throws an InvalidOperationException.
- IsAddingCompleted is true when producers are “done”.
  - Once you are done producing, call CompleteAdding() to alert consumers.
- IsCompleted is true when producers are “done” and the “bin” is empty.
  - Once you mark the producers done and all items have been removed, this is true.
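- Add() is a blocking add to the collection.
  - If the bin is full, it waits until space frees up.
- Take() is a blocking remove from the collection.
  - If the bin is empty, it waits until an item is produced. If adding is completed and the bin is empty, it throws an InvalidOperationException instead of waiting.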
- GetConsumingEnumerable() is used to iterate over and consume items.
  - Unlike the standard enumerator, this one removes each item from the collection as it is returned instead of just iterating.
- TryAdd() attempts to add but does not block indefinitely.
  - If adding would block, it returns false instead. You can specify a TimeSpan to wait before giving up.
- TryTake() attempts to take but does not block indefinitely.
  - Like TryAdd(), if taking would block, it returns false instead. You can specify a TimeSpan to wait.
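The non-blocking members and the behavior after CompleteAdding() can be sketched as follows. This is a small illustration, not a recommended pattern: the variable name `bin`, the capacity of one, and the 50 ms timeouts are all assumptions chosen to keep the demo short.

```csharp
using System;
using System.Collections.Concurrent;

// A bin that holds at most one item (capacity chosen for the demo).
var bin = new BlockingCollection<int>(boundedCapacity: 1);

bool firstAdd = bin.TryAdd(1);                                        // room available: true
bool secondAdd = bin.TryAdd(2, TimeSpan.FromMilliseconds(50));        // full: false after the timeout

bool firstTake = bin.TryTake(out int taken);                          // item available: true
bool secondTake = bin.TryTake(out _, TimeSpan.FromMilliseconds(50));  // empty: false after the timeout

// Mark the collection as complete for adding.
bin.CompleteAdding();

// Adding after completion throws.
bool addThrew = false;
try { bin.Add(3); }
catch (InvalidOperationException) { addThrew = true; }

// Take() on an empty, completed collection throws instead of blocking.
bool takeThrew = false;
try { bin.Take(); }
catch (InvalidOperationException) { takeThrew = true; }

Console.WriteLine($"{firstAdd} {secondAdd} {firstTake} {secondTake} taken={taken}");
// prints: True False True False taken=1
Console.WriteLine($"addThrew={addThrew}, takeThrew={takeThrew}, IsCompleted={bin.IsCompleted}");
// prints: addThrew=True, takeThrew=True, IsCompleted=True
```

The Try variants suit callers that have other work to do while the bin is full or empty, whereas Add() and Take() suit dedicated producer and consumer threads.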
ConcurrentBag<T>
- It is just a bag of items that allows duplicates and has no particular order.
- Important methods are Add, TryTake, and TryPeek.
- The TryPeek method is not very useful in a multithreaded environment.
- It could be that another thread removes the item before you can access it.
- ConcurrentBag also implements IEnumerable<T>, so you can iterate over it.
- Enumeration is made thread-safe by taking a snapshot of the collection when you start iterating it. Therefore, items added after the iteration starts won’t be visible.
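var bag = new ConcurrentBag<int> { 10, 20 };

// Writer: adds 30 right away and 40 a second later.
Task.Run(() =>
{
    bag.Add(30);
    Thread.Sleep(1000);
    bag.Add(40);
});

// Reader: enumerates a snapshot, so 40 (added a second later) is normally
// not printed; whether 30 appears depends on which task runs first.
Task.Run(() =>
{
    foreach (var number in bag)
        Console.WriteLine(number);
}).Wait();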
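Because the two tasks above race, the output can vary from run to run. The single-threaded sketch below shows the same ConcurrentBag behaviors deterministically: duplicates, snapshot enumeration, TryPeek, and TryTake. The variable names and sample values are assumptions made for the illustration, and the code makes no claim about which item TryPeek or TryTake returns, since the bag has no particular order.

```csharp
using System;
using System.Collections.Concurrent;

// Duplicates are allowed: 20 is stored twice.
var numbers = new ConcurrentBag<int> { 10, 20, 20 };

// The foreach enumerates a snapshot taken when iteration starts,
// so the item added inside the loop is not visited.
int visited = 0;
foreach (var n in numbers)
{
    if (visited == 0)
        numbers.Add(30);
    visited++;
}
int countAfterLoop = numbers.Count;

// TryPeek looks at an item without removing it.
bool peeked = numbers.TryPeek(out int peekedItem);
int countAfterPeek = numbers.Count;

// TryTake removes an item.
bool took = numbers.TryTake(out int takenItem);
int countAfterTake = numbers.Count;

Console.WriteLine($"visited={visited}, countAfterLoop={countAfterLoop}");
// prints: visited=3, countAfterLoop=4
Console.WriteLine($"peeked={peeked}, countAfterPeek={countAfterPeek}, took={took}, countAfterTake={countAfterTake}");
// prints: peeked=True, countAfterPeek=4, took=True, countAfterTake=3
```

In multithreaded code, the value returned by TryPeek may already have been removed by another thread by the time you use it, which is why TryTake is usually the safer choice.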