Microsoft MVP성태의 닷넷 이야기
.NET Framework: 1120. C# - BufferBlock<T> 사용 예제 [링크 복사], [링크+제목 복사]
조회: 8100
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 1개 있습니다.)
(시리즈 글이 4개 있습니다.)
.NET Framework: 209. AutoReset, ManualReset, Monitor.Wait의 차이
; https://www.sysnet.pe.kr/2/0/1015

.NET Framework: 1120. C# - BufferBlock<T> 사용 예제
; https://www.sysnet.pe.kr/2/0/12845

.NET Framework: 1172. .NET에서 Producer/Consumer를 구현하는 기초 인터페이스 - IProducerConsumerCollection<T>
; https://www.sysnet.pe.kr/2/0/12993

.NET Framework: 1173. .NET에서 Producer/Consumer를 구현한 BlockingCollection<T>
; https://www.sysnet.pe.kr/2/0/12995




C# - BufferBlock<T> 사용 예제

오호~~~ 이번 글은 다음의 트윗 덕분에 날로 먹으려고 합니다. ^^

BufferBlock<T>, 이제 알 것 같다.

태스크 간에 데이터가 들어올 때 까지 비동기로 흐름을 멈추고 기다릴 수 있다! pic.twitter.com/dINyb2IOdC — calci (@seonghwan_dev) October 3, 2021

실려 있는 이미지가 BufferBlock의 사용 예를 아주 잘 나타내고 있는데요, 그냥 트윗으로만 묻히기에 아까워 제 글에서 이렇게 도용해 봅니다. ^^

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace bufferblock_sample
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var foo = new Foo();
            await foo.Run();
        }
    }

    public class Foo
    {
        BufferBlock<int> block1 = new BufferBlock<int>();
        BufferBlock<int> block2 = new BufferBlock<int>();

        Task block1Observer;
        Task block2Observer;

        public async Task Run()
        {
            block1Observer = Task.Run(async () => await Observe1());
            block2Observer = Task.Run(async () => await Observe2());

            while (true)
            {
                var read = Console.ReadLine();

                try
                {
                    var block = read.Split(' ');

                    var param1 = block[0];
                    var param2 = block[1];

                    int blockIndex = int.Parse(param1);
                    int inputValue = int.Parse(param2);

                    Task task = (blockIndex == 0) ? block1.SendAsync(inputValue)
                                                  : block2.SendAsync(inputValue);
                    await task;
                }
                catch (Exception)
                {
                    Console.Clear();
                    Console.WriteLine("Wrong Format!");
                }
            }
        }

        private async Task Observe1()
        {
            try
            {
                while (true)
                {
                    int value = await block1.ReceiveAsync();
                    if (value == default)
                    {
                        break;
                    }

                    PrintConsole(ConsoleColor.Yellow, $"Observer 1 Received: {value}");
                }
            }
            catch (Exception) { }
        }

        private async Task Observe2()
        {
            try
            {
                while (true)
                {
                    int value = await block2.ReceiveAsync();
                    if (value == default)
                    {
                        break;
                    }

                    PrintConsole(ConsoleColor.Blue, $"Observer 2 Received: {value}");
                }
            }
            catch (Exception) { }
        }

        void PrintConsole(ConsoleColor color, string text)
        {
            var oldColor = Console.ForegroundColor;
            Console.ForegroundColor = color;
            Console.WriteLine(text);
            Console.ForegroundColor = oldColor;
        }
    }
}

간단히 말해서, 비동기로 동작하는 Producer/Consumer의 예제라고 보시면 됩니다. 예전에 AutoReset + ManualReset 이벤트로 동기식 버전을 구현한 것과 비교해 보셔도 좋을 듯합니다. ^^

AutoReset, ManualReset, Monitor.Wait의 차이
   - 4. 개선 방법 = AutoReset + ManualReset
; https://www.sysnet.pe.kr/2/0/1015#4

물론 동기식 버전으로 구현하면 개별 Consumer마다 고정적으로 스레드를 만들어 할당해 두어야 하지만, BufferBlock을 이용하게 되면 비동기로 처리되므로 그럴 필요가 없습니다.




BufferBlock 내부에는 당연히 큐잉 구현도 되어 있으므로 ReceiveAsync 호출 이전에 다중으로 SendAsync가 호출이 되어도 순차적으로 잘 처리합니다. 해당 큐에 대한 max 값은 다음의 옵션으로 전달할 수 있는데요,

static DataflowBlockOptions dfbo = new DataflowBlockOptions {
        BoundedCapacity = 2,
};

BufferBlock<int> block1 = new BufferBlock<int>(dfbo);

만약 위와 같이 2개로 제한한 상황에서 SendAsync를 (ReceiveAsync 수신을 할 수 없는 상태에서) 3번을 하게 되면,

public async Task Run()
{
    block1Observer = Task.Run(async () => await Observe1());
    block2Observer = Task.Run(async () => await Observe2());

    // 3개의 workitem 전달
    Console.WriteLine(await block1.SendAsync(11));
    Console.WriteLine(await block1.SendAsync(12));
    Console.WriteLine(await block1.SendAsync(13));

    while (true)
    {
        // ...[생략]...
    }
}

private async Task Observe1()
{
    Thread.Sleep(5000); // 수신을 할 수 없도록 임시로 5초 지연

    // ...[생략]...
}

처음 2개는 곧바로 SendAsync의 호출이 반환하지만 3번째 호출에서는 5초 이후로 실행이 지연됩니다.

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 10/6/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2021-10-06 10시08분
[dimohy] 오 ManualReset을 같은 목적을 위해 종종 사용하는데 값을 주거니 받거니 하는 용도로 BufferBlock<T>가 매우 편리하겠군요! 감사합니다.
[guest]
2021-10-06 11시41분
문서에 보면 .NET Core 1부터 지원하고 있었는데, 이제서야 알게 되는군요. ^^ 뭐가 많이 추가된 것 같은데... 이런 식으로라도 하나씩 정리를 해야겠습니다.
정성태
2021-10-24 09시28분
[tb] 나중에 나온 System.Threading.Channels 도 좋더라구요. IAsyncEnumerable 로 구현했고 Cancellation 도 지원하구요.
[guest]
2021-10-25 09시24분
@tb 좋은 정보 감사드립니다. ^^ (이래서 꾸준히 공부해야 하나 봅니다. ^^;)
정성태
2021-12-09 02시55분
[서영준] Channel과는 또 다르네요?
[guest]

... [16]  17  18  19  20  21  22  23  24  25  26  27  28  29  30  ...
NoWriterDateCnt.TitleFile(s)
13223정성태1/20/20234286오류 유형: 838. RDP 연결 오류 - The two computers couldn't connect in the amount of time allotted
13222정성태1/20/20233936개발 환경 구성: 657. WSL - DockerDesktop.vhdx 파일 위치를 옮기는 방법
13221정성태1/19/20234167Linux: 57. C# - 리눅스 프로세스 메모리 정보파일 다운로드1
13220정성태1/19/20234320오류 유형: 837. NETSDK1045 The current .NET SDK does not support targeting .NET ...
13219정성태1/18/20233882Windows: 220. 네트워크의 인터넷 접속 가능 여부에 대한 판단 기준
13218정성태1/17/20233814VS.NET IDE: 178. Visual Studio 17.5 (Preview 2) - 포트 터널링을 이용한 웹 응용 프로그램의 외부 접근 허용
13217정성태1/13/20234406디버깅 기술: 185. windbg - 64비트 운영체제에서 작업 관리자로 뜬 32비트 프로세스의 덤프를 sos로 디버깅하는 방법
13216정성태1/12/20234660디버깅 기술: 184. windbg - 32비트 프로세스의 메모리 덤프인 경우 !peb 명령어로 나타나지 않는 환경 변수
13215정성태1/11/20236185Linux: 56. 리눅스 - /proc/pid/stat 정보를 이용해 프로세스의 CPU 사용량 구하는 방법 [1]
13214정성태1/10/20235744.NET Framework: 2087. .NET 6부터 SourceGenerator와 통합된 System.Text.Json [1]파일 다운로드1
13213정성태1/9/20235274오류 유형: 836. docker 이미지 빌드 시 "RUN apt install ..." 명령어가 실패하는 이유
13212정성태1/8/20235038기타: 85. 단정도/배정도 부동 소수점의 정밀도(Precision)에 따른 형변환 손실
13211정성태1/6/20235114웹: 42. (https가 아닌) http 다운로드를 막는 웹 브라우저
13210정성태1/5/20234138Windows: 219. 윈도우 x64의 경우 0x00000000`7ffe0000 아래의 주소는 왜 사용하지 않을까요?
13209정성태1/4/20234040Windows: 218. 왜 윈도우에서 가상 메모리 공간은 64KB 정렬이 된 걸까요?
13208정성태1/3/20233988.NET Framework: 2086. C# - Windows 운영체제의 2MB Large 페이지 크기 할당 방법파일 다운로드1
13207정성태12/26/20224289.NET Framework: 2085. C# - gpedit.msc의 "User Rights Assignment" 특권을 코드로 설정/해제하는 방법파일 다운로드1
13206정성태12/24/20224503.NET Framework: 2084. C# - GetTokenInformation으로 사용자 SID(Security identifiers) 구하는 방법 [3]파일 다운로드1
13205정성태12/24/20224888.NET Framework: 2083. C# - C++과의 연동을 위한 구조체의 fixed 배열 필드 사용 (2)파일 다운로드1
13204정성태12/22/20224170.NET Framework: 2082. C# - (LSA_UNICODE_STRING 예제로) CustomMarshaler 사용법파일 다운로드1
13203정성태12/22/20224329.NET Framework: 2081. C# Interop 예제 - (LSA_UNICODE_STRING 예제로) 구조체를 C++에 전달하는 방법파일 다운로드1
13202정성태12/21/20224714기타: 84. 직렬화로 설명하는 Little/Big Endian파일 다운로드1
13201정성태12/20/20225332오류 유형: 835. PyCharm 사용 시 C 드라이브 용량 부족
13200정성태12/19/20224208오류 유형: 834. 이벤트 로그 - SSL Certificate Settings created by an admin process for endpoint
13199정성태12/19/20224495개발 환경 구성: 656. Internal Network 유형의 스위치로 공유한 Hyper-V의 VM과 호스트가 통신이 안 되는 경우
13198정성태12/18/20224376.NET Framework: 2080. C# - Microsoft.XmlSerializer.Generator 처리 없이 XmlSerializer 생성자를 예외 없이 사용하고 싶다면?파일 다운로드1
... [16]  17  18  19  20  21  22  23  24  25  26  27  28  29  30  ...