Microsoft MVP성태의 닷넷 이야기
닷넷: 2149. C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할 [링크 복사], [링크+제목 복사]
조회: 3195
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일

C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할

아래와 같은 질문이 있군요. ^^

C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?
; https://forum.dotnetdev.kr/t/c-parallel/8548/4

질문의 코드를 그대로 실어 보면,

var list = new List<int>();
for(var i = 0; i < 100; i++)
{
    list.Add(i);
}

list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    // 번호가 높을수록 작업이 오래 걸린다.
    Thread.Sleep(i * 5);
});

보는 바와 같이 Task에 해당하는 작업이 0, 1, 2, 3, ..., 99까지의 수로 이뤄진 형태가 되고, 그 수에 5를 곱해 Thread.Sleep을 하고 있습니다. 예제가 저렇긴 하지만 아마도 저 숫자를 작업이 끝날 것으로 예상되는 대략적인 weight라고 봐도 무방할 것입니다.

어쨌든, 저 코드대로라면 Parallel.ForEach의 특성상 작업은 100개의 목록을 4단계(0 ~ 24, 25 ~ 49, 50 ~ 74, 75 ~ 99) 구획으로 분할해 그 영역만큼을 ThreadPool의 개별 여유 스레드에 할당할 것이므로, 결국 마지막으로 갈수록, 특히나 75 ~ 99 구간을 담당한 스레드는 더 오랫동안 실행하게 될 것입니다. 실제로 대충 다음과 같은 부가 코드를 곁들이면,

namespace ConsoleApp1;

internal class Program
{
    static void Main(string[] args)
    {
        var list = Enumerable.Range(0, 100).ToList();

        Dictionary<int, int> processed = new Dictionary<int, int>();

        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        Enumerable.Range(0, workerThreads).All((elem) =>
        {
            processed[elem] = 0;
            return true;
        });

        list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
        {
            Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");

            processed[Thread.CurrentThread.ManagedThreadId] += i;
            Thread.Sleep(i * 5);
        });

        foreach (var item in processed)
        {
            if (item.Value == 0)
            {
                continue;
            }

            Console.WriteLine($"Total Id: {item.Key}, Value: {item.Value}");               
        }
    }
}

개별 스레드 당 수행된 부하를 출력할 수 있는데,

Total Id: 1, Value: 2175
Total Id: 4, Value: 300
Total Id: 7, Value: 925
Total Id: 10, Value: 1550

4번 스레드의 경우 300에 끝난 반면 1번 스레드는 2175의 부하로 오랫동안 혼자서 남은 작업들을 수행했음을 짐작게 합니다.

위의 코드로 알 수 있지만, PLinq의 작업 구획 나누기 규칙은 간단합니다. PLinq는 기본적으로 대상 작업의 수를 구할 수 있다면 WithDegreeOfParallelism으로 나누기를 한 구획을 분배합니다. 그렇기 때문에 "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 예제 코드에서 사용한 List<int>는 Count를 알 수 있는 IReadOnlyCollection을 구현하고 있으므로 그 수를 이용해 구획을 판정합니다.

그렇다면 만약 작업의 수를 모른다면 어떻게 될까요? 그럼 당연히 작업 분배는 List와는 다른 양상을 띠게 됩니다. 실제로 예제 코드에서 List 구성을,

var list = Enumerable.Range(0, 100).ToList();

다음과 같이만 바꿔도,

var list = Enumerable.Range(0, 100); // ToList를 하지 않았으므로 IEnumerable 반환

작업의 정확한 수를 알 수 없는 IEnumerable이 반환되므로 작업 부하는 이제 다음과 같이 바뀝니다.

// 실행마다 다른 결과
Total Id: 1, Value: 1120
Total Id: 4, Value: 1160
Total Id: 7, Value: 1469
Total Id: 10, Value: 1201

IEnumerable이기 때문에 어쩔 수 없이 작업을 분배하는 측은 IEnumerable.MoveNext 호출에 따라 작업을 분배하는 식으로 처리할 듯합니다.




자, 그럼 이것을 골고루 나누려면 어떻게 해야 할까요? "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 글의 답변에 있는 "에릭권"님의 답변이 이에 대한 훌륭한 답이 됩니다.

var list = Enumerable.Range(0, 100).ToList();
var queue = new ConcurrentQueue<int>(list);

Parallel.ForEach(Enumerable.Range(0, 4), _ =>
{
    while (queue.TryDequeue(out int p))
    {
        Console.WriteLine($"Id: {Environment.CurrentManagedThreadId}, Value: {p}");
        Thread.Sleep(p * 5);
    }
});

ForEach의 분할 특성을 이용했고, 작업을 Queue에서 빼오는 방식을 통해 먼저 작업을 끝낸 스레드가 후속 작업이 없어질 때까지 반복/수행하고 있습니다. 이전의 수행 부하 측정 코드를 집어넣어 보면 실행할 때마다 값이 달라지긴 하지만 다음과 같이 고르게 부하 분산이 됩니다.

// 실행마다 다름

Total Id: 4, Value: 1225
Total Id: 12, Value: 1250
Total Id: 13, Value: 1277
Total Id: 20, Value: 1198

오히려 IEnumerable을 자동으로 분배해 주던 PLinq의 것보다 골고루 더 분산이 됩니다. 사실 질문 자체는 "병렬 분할 알고리즘 변경"이지만 오히려 질문자의 상황에는 "에릭권"님의 답변이 올바른 해결책입니다.




자... 그럼 제목에 맞게 "병렬 분할 알고리즘 변경"이라는 측면으로 답변을 해보겠습니다. 마이크로소프트는 Partition에 대한 사용자 정의를 할 수 있는 방법을 제공하고 있는데,

Custom Partitioners for PLINQ and TPL
; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/custom-partitioners-for-plinq-and-tpl

바로 Partitioner<T>를 구현하기만 하면 됩니다. 실제로 "에릭권"님의 답변이 보여주는 방식을 Partitioner를 이용해 다음과 같이 만들어 둘 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    ConcurrentQueue<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = new ConcurrentQueue<T>(list.ToArray());
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        ConcurrentQueue<T> _list;

        public EnumerateFromList(ConcurrentQueue<T> list)
        {
            _list = list;
        }

        public IEnumerator<T> GetEnumerator()
        {
            while (_list!.TryDequeue(out T? item))
            {
                yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

Partitioner가 해야 할 가장 중요한 작업은 GetPartitions 메서드에서 작업을 열거할 IEnumerable 배열을 인자로 넘어온 partitionCount 만큼 반환하는 것입니다. 즉, WithDegreeOfParallelism으로 지정한 값이 partitionCount로 넘어오게 되고, 여기서 반환한 IEnumerable 배열에 대해 하나씩 스레드 풀의 스레드가 맡아 작업을 처리하는 식입니다.

일단 이렇게 Partitioner를 만들어 두면 사용법은 이전과 유사하게 다룰 수 있습니다.

var list = Enumerable.Range(0, 100).ToList();

var partitioner = new ListPartitioner<int>(list);
partitioner.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    Thread.Sleep(i * 5);
});

보는 바와 같이 AsParallel()을 호출하고 있는데, 이게 가능한 이유는 AsParallel 확장 메서드가 기본적으로 IEnumerable 인터페이스와 함께 Partitioner<T>에 대해서도 정의돼 있기 때문입니다.

혹은, ConcurrentQueue를 사용할 필요 없이 단순히 인덱스를 지정하는 식으로 좀 더 Partitioner 본연의 기능에 가깝게 다음과 같이 구현하는 것도 가능합니다.

class RefInteger
{
    int _value = -1;

    public int GetNext()
    {
        int newValue = Interlocked.Increment(ref _value);
        return newValue;
    }
}

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;
    RefInteger _position = new RefInteger();

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list, _position).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        List<T> _list;
        RefInteger _position;

        public EnumerateFromList(List<T> list, RefInteger position)
        {
            _list = list;
            _position = position;
        }

        public IEnumerator<T> GetEnumerator()
        {
            do
            {
                int idx = _position.GetNext();
                if (idx >= _list.Count)
                {
                    yield break;
                }

                yield return _list[idx];
            } while (true);
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

"병렬 분할 알고리즘 변경"이라는 측면에서 봤을 때 이렇게 만들어 둔 Partitioner가 유용할 수 있겠지만, 아마도 대부분의 경우에는 그냥 "에릭권"님의 답변처럼 직접적으로 구현하는 것이 더 이해하기는 쉬울 것입니다. ^^ (재미있게도, 마이크로소프트의 Partitioner 도움말에 실린 예제 코드에 정확히 위의 역할과 동일한 SingleElementPartitioner를 구현하고 있습니다.)





참고로, 좀 더 단순하게라면, 이러한 구획 나누기는 list의 순서를 재정렬하는 것으로도 가능합니다. 구간마다 들어가는 요소를, 다음과 같이 각각 교차해서 PLinq에 전달해도 되는 것입니다.

0,4,8,12,16,20,24,28,32,36,40,44,48,52,56,60,64,68,72,76,80,84,88,92,96
1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97
2,6,10,14,18,22,26,30,34,38,42,46,50,54,58,62,66,70,74,78,82,86,90,94,98
3,7,11,15,19,23,27,31,35,39,43,47,51,55,59,63,67,71,75,79,83,87,91,95,99

이를 위해 간단하게 다음의 코드를 하나 넣어주면,

var list = Enumerable.Range(0, 100).ToList();

List<int> ordered = list.OrderBy(i => i % 4).ToList();
ordered.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    // ...[생략]...
});

기본적인 파티션 규칙에 따라 부하 분산 효과를 갖게 됩니다. 실제 출력 결과도 이런데요,

Total Id: 1, Value: 1275
Total Id: 4, Value: 1250
Total Id: 10, Value: 1200
Total Id: 11, Value: 1225

물론, 저 코드는 OrderBy로 인해 정렬 작업이 추가되는 부하가 있으니 권장할 수는 없습니다. 단지, 기본 제공하는 Partitioner의 규칙에 실어 자연스럽게 구획을 나누게 한다는 점에서 의미가 있다고만 보면 되겠습니다.

게다가 위의 규칙 역시도 Partitioner 본연의 개념을 이용한다면 다음과 같이 (이번엔 정렬 작업 없이) 구현할 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new HopEnumerate(_list, i, partitionCount).GetEnumerator();
        }

        return partitions;
    }

    private class HopEnumerate: IEnumerable<T>
    {
        List<T> _list;
        int _start;
        int _hop;

        public HopEnumerate(List<T> list, int start, int hop) 
            => (_list, _start, _hop) = (list, start, hop);    

        public IEnumerator<T> GetEnumerator()
        {
            for (int i = _start; i < _list.Count; i += _hop)
            {
                yield return _list[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]







[최초 등록일: ]
[최종 수정일: 10/16/2023]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




1  2  3  4  5  6  [7]  8  9  10  11  12  13  14  15  ...
NoWriterDateCnt.TitleFile(s)
13457정성태11/25/20232277VS.NET IDE: 187. Visual Studio - 16.9 버전부터 추가된 "Display inline type hints" 옵션
13456정성태11/25/20232580닷넷: 2169. C# - OpenAI를 사용해 PDF 데이터를 대상으로 OpenAI 챗봇 작성 [1]파일 다운로드1
13455정성태11/25/20232472닷넷: 2168. C# - Azure.AI.OpenAI 패키지로 OpenAI 사용파일 다운로드1
13454정성태11/23/20232823닷넷: 2167. C# - Qdrant Vector DB를 이용한 Embedding 벡터 값 보관/조회 (Azure OpenAI) [1]파일 다운로드1
13453정성태11/23/20232320오류 유형: 879. docker desktop 설치 시 "Invalid JSON string. (Exception from HRESULT: 0x83750007)"
13452정성태11/22/20232424닷넷: 2166. C# - Azure OpenAI API를 이용해 사용자가 제공하는 정보를 대상으로 검색하는 방법파일 다운로드1
13451정성태11/21/20232554닷넷: 2165. C# - Azure OpenAI API를 이용해 ChatGPT처럼 동작하는 콘솔 응용 프로그램 제작파일 다운로드1
13450정성태11/21/20232364닷넷: 2164. C# - Octokit을 이용한 GitHub Issue 검색파일 다운로드1
13449정성태11/21/20232441개발 환경 구성: 688. Azure OpenAI 서비스 신청 방법
13448정성태11/20/20232678닷넷: 2163. .NET 8 - Dynamic PGO를 결합한 성능 향상파일 다운로드1
13447정성태11/16/20232562닷넷: 2162. ASP.NET Core 웹 사이트의 SSL 설정을 코드로 하는 방법
13446정성태11/16/20232513닷넷: 2161. .NET Conf 2023 - Day 1 Blazor 개요 정리
13445정성태11/15/20232831Linux: 62. 리눅스/WSL에서 CA 인증서를 저장하는 방법
13444정성태11/15/20232589닷넷: 2160. C# 12 - Experimental 특성 지원
13443정성태11/14/20232613개발 환경 구성: 687. OpenSSL로 생성한 사용자 인증서를 ASP.NET Core 웹 사이트에 적용하는 방법
13442정성태11/13/20232432개발 환경 구성: 686. 비주얼 스튜디오로 실행한 ASP.NET Core 사이트를 WSL 2 인스턴스에서 https로 접속하는 방법
13441정성태11/12/20232748닷넷: 2159. C# - ASP.NET Core 프로젝트에서 서버 Socket을 직접 생성하는 방법파일 다운로드1
13440정성태11/11/20232420Windows: 253. 소켓 Listen 시 방화벽의 Public/Private 제어 기능이 비활성화된 경우
13439정성태11/10/20232945닷넷: 2158. C# - 소켓 포트를 미리 시스템에 등록/예약해 사용하는 방법(Port Exclusion Ranges)파일 다운로드1
13438정성태11/9/20232539닷넷: 2157. C# - WinRT 기능을 이용해 윈도우에서 실행 중인 Media App 제어
13437정성태11/8/20232733닷넷: 2156. .NET 7 이상의 콘솔 프로그램을 (dockerfile 없이) 로컬 docker에 배포하는 방법
13436정성태11/7/20232974닷넷: 2155. C# - .NET 8 런타임부터 (Reflection 없이) 특성을 이용해 public이 아닌 멤버 호출 가능
13435정성태11/6/20232903닷넷: 2154. C# - 네이티브 자원을 포함한 관리 개체(예: 스레드)의 GC 정리
13434정성태11/1/20232668스크립트: 62. 파이썬 - class의 정적 함수를 동적으로 교체
13433정성태11/1/20232379스크립트: 61. 파이썬 - 함수 오버로딩 미지원
13432정성태10/31/20232455오류 유형: 878. 탐색기의 WSL 디렉터리 접근 시 "Attempt to access invalid address." 오류 발생
1  2  3  4  5  6  [7]  8  9  10  11  12  13  14  15  ...