C# - BufferBlock<T> 사용 예제
오호~~~ 이번 글은 다음의 트윗 덕분에 날로 먹으려고 합니다. ^^
BufferBlock<T>, 이제 알 것 같다.
태스크 간에 데이터가 들어올 때 까지 비동기로 흐름을 멈추고 기다릴 수 있다! pic.twitter.com/dINyb2IOdC
— calci (@seonghwan_dev) October 3, 2021
실려 있는 이미지가
BufferBlock의 사용 예를 아주 잘 나타내고 있는데요, 그냥 트윗으로만 묻히기에 아까워 제 글에서 이렇게 도용해 봅니다. ^^
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace bufferblock_sample
{
class Program
{
static async Task Main(string[] args)
{
var foo = new Foo();
await foo.Run();
}
}
public class Foo
{
BufferBlock<int> block1 = new BufferBlock<int>();
BufferBlock<int> block2 = new BufferBlock<int>();
Task block1Observer;
Task block2Observer;
public async Task Run()
{
block1Observer = Task.Run(async () => await Observe1());
block2Observer = Task.Run(async () => await Observe2());
while (true)
{
var read = Console.ReadLine();
try
{
var block = read.Split(' ');
var param1 = block[0];
var param2 = block[1];
int blockIndex = int.Parse(param1);
int inputValue = int.Parse(param2);
Task task = (blockIndex == 0) ? block1.SendAsync(inputValue)
: block2.SendAsync(inputValue);
await task;
}
catch (Exception)
{
Console.Clear();
Console.WriteLine("Wrong Format!");
}
}
}
private async Task Observe1()
{
try
{
while (true)
{
int value = await block1.ReceiveAsync();
if (value == default)
{
break;
}
PrintConsole(ConsoleColor.Yellow, $"Observer 1 Received: {value}");
}
}
catch (Exception) { }
}
private async Task Observe2()
{
try
{
while (true)
{
int value = await block2.ReceiveAsync();
if (value == default)
{
break;
}
PrintConsole(ConsoleColor.Blue, $"Observer 2 Received: {value}");
}
}
catch (Exception) { }
}
void PrintConsole(ConsoleColor color, string text)
{
var oldColor = Console.ForegroundColor;
Console.ForegroundColor = color;
Console.WriteLine(text);
Console.ForegroundColor = oldColor;
}
}
}
간단히 말해서, 비동기로 동작하는 Producer/Consumer의 예제라고 보시면 됩니다. 예전에 AutoReset + ManualReset 이벤트로 동기식 버전을 구현한 것과 비교해 보셔도 좋을 듯합니다. ^^
AutoReset, ManualReset, Monitor.Wait의 차이
- 4. 개선 방법 = AutoReset + ManualReset
; https://www.sysnet.pe.kr/2/0/1015#4
물론 동기식 버전으로 구현하면 개별 Consumer마다 고정적으로 스레드를 만들어 할당해 두어야 하지만, BufferBlock을 이용하게 되면 비동기로 처리되므로 그럴 필요가 없습니다.
BufferBlock 내부에는 당연히 큐잉 구현도 되어 있으므로 ReceiveAsync 호출 이전에 다중으로 SendAsync가 호출이 되어도 순차적으로 잘 처리합니다. 해당 큐에 대한 max 값은 다음의 옵션으로 전달할 수 있는데요,
static DataflowBlockOptions dfbo = new DataflowBlockOptions {
BoundedCapacity = 2,
};
BufferBlock<int> block1 = new BufferBlock<int>(dfbo);
만약 위와 같이 2개로 제한한 상황에서 SendAsync를 (ReceiveAsync 수신을 할 수 없는 상태에서) 3번을 하게 되면,
public async Task Run()
{
block1Observer = Task.Run(async () => await Observe1());
block2Observer = Task.Run(async () => await Observe2());
// 3개의 workitem 전달
Console.WriteLine(await block1.SendAsync(11));
Console.WriteLine(await block1.SendAsync(12));
Console.WriteLine(await block1.SendAsync(13));
while (true)
{
// ...[생략]...
}
}
private async Task Observe1()
{
Thread.Sleep(5000); // 수신을 할 수 없도록 임시로 5초 지연
// ...[생략]...
}
처음 2개는 곧바로 SendAsync의 호출이 반환하지만 3번째 호출에서는 5초 이후로 실행이 지연됩니다.
(
첨부 파일은 이 글의 예제 코드를 포함합니다.)
[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]