Microsoft MVP성태의 닷넷 이야기
닷넷: 2149. C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할 [링크 복사], [링크+제목 복사],
조회: 5768
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일

C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할

아래와 같은 질문이 있군요. ^^

C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?
; https://forum.dotnetdev.kr/t/c-parallel/8548/4

질문의 코드를 그대로 실어 보면,

var list = new List<int>();
for(var i = 0; i < 100; i++)
{
    list.Add(i);
}

list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    // 번호가 높을수록 작업이 오래 걸린다.
    Thread.Sleep(i * 5);
});

보는 바와 같이 Task에 해당하는 작업이 0, 1, 2, 3, ..., 99까지의 수로 이뤄진 형태가 되고, 그 수에 5를 곱해 Thread.Sleep을 하고 있습니다. 예제가 저렇긴 하지만 아마도 저 숫자를 작업이 끝날 것으로 예상되는 대략적인 weight라고 봐도 무방할 것입니다.

어쨌든, 저 코드대로라면 Parallel.ForEach의 특성상 작업은 100개의 목록을 4단계(0 ~ 24, 25 ~ 49, 50 ~ 74, 75 ~ 99) 구획으로 분할해 그 영역만큼을 ThreadPool의 개별 여유 스레드에 할당할 것이므로, 결국 마지막으로 갈수록, 특히나 75 ~ 99 구간을 담당한 스레드는 더 오랫동안 실행하게 될 것입니다. 실제로 대충 다음과 같은 부가 코드를 곁들이면,

namespace ConsoleApp1;

internal class Program
{
    static void Main(string[] args)
    {
        var list = Enumerable.Range(0, 100).ToList();

        Dictionary<int, int> processed = new Dictionary<int, int>();

        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        Enumerable.Range(0, workerThreads).All((elem) =>
        {
            processed[elem] = 0;
            return true;
        });

        list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
        {
            Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");

            processed[Thread.CurrentThread.ManagedThreadId] += i;
            Thread.Sleep(i * 5);
        });

        foreach (var item in processed)
        {
            if (item.Value == 0)
            {
                continue;
            }

            Console.WriteLine($"Total Id: {item.Key}, Value: {item.Value}");               
        }
    }
}

개별 스레드 당 수행된 부하를 출력할 수 있는데,

Total Id: 1, Value: 2175
Total Id: 4, Value: 300
Total Id: 7, Value: 925
Total Id: 10, Value: 1550

4번 스레드의 경우 300에 끝난 반면 1번 스레드는 2175의 부하로 오랫동안 혼자서 남은 작업들을 수행했음을 짐작게 합니다.

위의 코드로 알 수 있지만, PLinq의 작업 구획 나누기 규칙은 간단합니다. PLinq는 기본적으로 대상 작업의 수를 구할 수 있다면 WithDegreeOfParallelism으로 나누기를 한 구획을 분배합니다. 그렇기 때문에 "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 예제 코드에서 사용한 List<int>는 Count를 알 수 있는 IReadOnlyCollection을 구현하고 있으므로 그 수를 이용해 구획을 판정합니다.

그렇다면 만약 작업의 수를 모른다면 어떻게 될까요? 그럼 당연히 작업 분배는 List와는 다른 양상을 띠게 됩니다. 실제로 예제 코드에서 List 구성을,

var list = Enumerable.Range(0, 100).ToList();

다음과 같이만 바꿔도,

var list = Enumerable.Range(0, 100); // ToList를 하지 않았으므로 IEnumerable 반환

작업의 정확한 수를 알 수 없는 IEnumerable이 반환되므로 작업 부하는 이제 다음과 같이 바뀝니다.

// 실행마다 다른 결과
Total Id: 1, Value: 1120
Total Id: 4, Value: 1160
Total Id: 7, Value: 1469
Total Id: 10, Value: 1201

IEnumerable이기 때문에 어쩔 수 없이 작업을 분배하는 측은 IEnumerable.MoveNext 호출에 따라 작업을 분배하는 식으로 처리할 듯합니다.




자, 그럼 이것을 골고루 나누려면 어떻게 해야 할까요? "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 글의 답변에 있는 "에릭권"님의 답변이 이에 대한 훌륭한 답이 됩니다.

var list = Enumerable.Range(0, 100).ToList();
var queue = new ConcurrentQueue<int>(list);

Parallel.ForEach(Enumerable.Range(0, 4), _ =>
{
    while (queue.TryDequeue(out int p))
    {
        Console.WriteLine($"Id: {Environment.CurrentManagedThreadId}, Value: {p}");
        Thread.Sleep(p * 5);
    }
});

ForEach의 분할 특성을 이용했고, 작업을 Queue에서 빼오는 방식을 통해 먼저 작업을 끝낸 스레드가 후속 작업이 없어질 때까지 반복/수행하고 있습니다. 이전의 수행 부하 측정 코드를 집어넣어 보면 실행할 때마다 값이 달라지긴 하지만 다음과 같이 고르게 부하 분산이 됩니다.

// 실행마다 다름

Total Id: 4, Value: 1225
Total Id: 12, Value: 1250
Total Id: 13, Value: 1277
Total Id: 20, Value: 1198

오히려 IEnumerable을 자동으로 분배해 주던 PLinq의 것보다 골고루 더 분산이 됩니다. 사실 질문 자체는 "병렬 분할 알고리즘 변경"이지만 오히려 질문자의 상황에는 "에릭권"님의 답변이 올바른 해결책입니다.




자... 그럼 제목에 맞게 "병렬 분할 알고리즘 변경"이라는 측면으로 답변을 해보겠습니다. 마이크로소프트는 Partition에 대한 사용자 정의를 할 수 있는 방법을 제공하고 있는데,

Custom Partitioners for PLINQ and TPL
; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/custom-partitioners-for-plinq-and-tpl

바로 Partitioner<T>를 구현하기만 하면 됩니다. 실제로 "에릭권"님의 답변이 보여주는 방식을 Partitioner를 이용해 다음과 같이 만들어 둘 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    ConcurrentQueue<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = new ConcurrentQueue<T>(list.ToArray());
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        ConcurrentQueue<T> _list;

        public EnumerateFromList(ConcurrentQueue<T> list)
        {
            _list = list;
        }

        public IEnumerator<T> GetEnumerator()
        {
            while (_list!.TryDequeue(out T? item))
            {
                yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

Partitioner가 해야 할 가장 중요한 작업은 GetPartitions 메서드에서 작업을 열거할 IEnumerable 배열을 인자로 넘어온 partitionCount 만큼 반환하는 것입니다. 즉, WithDegreeOfParallelism으로 지정한 값이 partitionCount로 넘어오게 되고, 여기서 반환한 IEnumerable 배열에 대해 하나씩 스레드 풀의 스레드가 맡아 작업을 처리하는 식입니다.

일단 이렇게 Partitioner를 만들어 두면 사용법은 이전과 유사하게 다룰 수 있습니다.

var list = Enumerable.Range(0, 100).ToList();

var partitioner = new ListPartitioner<int>(list);
partitioner.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    Thread.Sleep(i * 5);
});

보는 바와 같이 AsParallel()을 호출하고 있는데, 이게 가능한 이유는 AsParallel 확장 메서드가 기본적으로 IEnumerable 인터페이스와 함께 Partitioner<T>에 대해서도 정의돼 있기 때문입니다.

혹은, ConcurrentQueue를 사용할 필요 없이 단순히 인덱스를 지정하는 식으로 좀 더 Partitioner 본연의 기능에 가깝게 다음과 같이 구현하는 것도 가능합니다.

class RefInteger
{
    int _value = -1;

    public int GetNext()
    {
        int newValue = Interlocked.Increment(ref _value);
        return newValue;
    }
}

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;
    RefInteger _position = new RefInteger();

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list, _position).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        List<T> _list;
        RefInteger _position;

        public EnumerateFromList(List<T> list, RefInteger position)
        {
            _list = list;
            _position = position;
        }

        public IEnumerator<T> GetEnumerator()
        {
            do
            {
                int idx = _position.GetNext();
                if (idx >= _list.Count)
                {
                    yield break;
                }

                yield return _list[idx];
            } while (true);
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

"병렬 분할 알고리즘 변경"이라는 측면에서 봤을 때 이렇게 만들어 둔 Partitioner가 유용할 수 있겠지만, 아마도 대부분의 경우에는 그냥 "에릭권"님의 답변처럼 직접적으로 구현하는 것이 더 이해하기는 쉬울 것입니다. ^^ (재미있게도, 마이크로소프트의 Partitioner 도움말에 실린 예제 코드에 정확히 위의 역할과 동일한 SingleElementPartitioner를 구현하고 있습니다.)





참고로, 좀 더 단순하게라면, 이러한 구획 나누기는 list의 순서를 재정렬하는 것으로도 가능합니다. 구간마다 들어가는 요소를, 다음과 같이 각각 교차해서 PLinq에 전달해도 되는 것입니다.

0,4,8,12,16,20,24,28,32,36,40,44,48,52,56,60,64,68,72,76,80,84,88,92,96
1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97
2,6,10,14,18,22,26,30,34,38,42,46,50,54,58,62,66,70,74,78,82,86,90,94,98
3,7,11,15,19,23,27,31,35,39,43,47,51,55,59,63,67,71,75,79,83,87,91,95,99

이를 위해 간단하게 다음의 코드를 하나 넣어주면,

var list = Enumerable.Range(0, 100).ToList();

List<int> ordered = list.OrderBy(i => i % 4).ToList();
ordered.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    // ...[생략]...
});

기본적인 파티션 규칙에 따라 부하 분산 효과를 갖게 됩니다. 실제 출력 결과도 이런데요,

Total Id: 1, Value: 1275
Total Id: 4, Value: 1250
Total Id: 10, Value: 1200
Total Id: 11, Value: 1225

물론, 저 코드는 OrderBy로 인해 정렬 작업이 추가되는 부하가 있으니 권장할 수는 없습니다. 단지, 기본 제공하는 Partitioner의 규칙에 실어 자연스럽게 구획을 나누게 한다는 점에서 의미가 있다고만 보면 되겠습니다.

게다가 위의 규칙 역시도 Partitioner 본연의 개념을 이용한다면 다음과 같이 (이번엔 정렬 작업 없이) 구현할 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new HopEnumerate(_list, i, partitionCount).GetEnumerator();
        }

        return partitions;
    }

    private class HopEnumerate: IEnumerable<T>
    {
        List<T> _list;
        int _start;
        int _hop;

        public HopEnumerate(List<T> list, int start, int hop) 
            => (_list, _start, _hop) = (list, start, hop);    

        public IEnumerator<T> GetEnumerator()
        {
            for (int i = _start; i < _list.Count; i += _hop)
            {
                yield return _list[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]







[최초 등록일: ]
[최종 수정일: 10/16/2023]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




1  [2]  3  4  5  6  7  8  9  10  11  12  13  14  15  ...
NoWriterDateCnt.TitleFile(s)
13706정성태8/5/20242114개발 환경 구성: 718. Hyper-V - 리눅스 VM에 새로운 디스크 추가
13705정성태8/4/20242304닷넷: 2291. C# 13 - (5) params 인자 타입으로 컬렉션 허용파일 다운로드1
13704정성태8/2/20242266닷넷: 2290. C# - 간이 dotnet-dump 프로그램 만들기파일 다운로드1
13703정성태8/1/20242284닷넷: 2289. "dotnet-dump ps" 명령어가 닷넷 프로세스를 찾는 방법
13702정성태7/31/20242214닷넷: 2288. Collection 식을 지원하는 사용자 정의 타입을 CollectionBuilder 특성으로 성능 보완파일 다운로드1
13701정성태7/30/20242364닷넷: 2287. C# 13 - (4) Indexer를 이용한 개체 초기화 구문에서 System.Index 연산자 허용파일 다운로드1
13700정성태7/29/20242301디버깅 기술: 200. DLL Export/Import의 Hint 의미
13699정성태7/27/20242472닷넷: 2286. C# 13 - (3) Monitor를 대체할 Lock 타입파일 다운로드1
13698정성태7/27/20242469닷넷: 2285. C# - async 메서드에서의 System.Threading.Lock 잠금 처리파일 다운로드1
13697정성태7/26/20242406닷넷: 2284. C# - async 메서드에서의 lock/Monitor.Enter/Exit 잠금 처리파일 다운로드1
13696정성태7/26/20241980오류 유형: 920. dotnet publish - error NETSDK1047: Assets file '...\obj\project.assets.json' doesn't have a target for '...'
13695정성태7/25/20242104닷넷: 2283. C# - Lock / Wait 상태에서도 STA COM 메서드 호출 처리파일 다운로드1
13694정성태7/25/20242012닷넷: 2282. C# - ASP.NET Core Web App의 Request 용량 상한값 (Kestrel, IIS)
13693정성태7/24/20242084개발 환경 구성: 717. Visual Studio - C# 프로젝트에서 레지스트리에 등록하지 않은 COM 개체 참조 및 사용 방법파일 다운로드1
13692정성태7/24/20242267디버깅 기술: 199. Windbg - 리눅스에서 뜬 닷넷 응용 프로그램 덤프 파일에 포함된 DLL의 Export Directory 탐색
13691정성태7/23/20242298디버깅 기술: 198. Windbg - 스레드의 Win32 Message Queue 정보 조회
13690정성태7/23/20242231오류 유형: 919. Visual C++ 리눅스 프로젝트 - error : ‘u8’ was not declared in this scope
13689정성태7/22/20242121디버깅 기술: 197. Windbg - PE 포맷의 Export Directory 탐색
13688정성태7/21/20242284닷넷: 2281. C# - Lock / Wait 상태에서도 일부 Win32 메시지 처리파일 다운로드1
13687정성태7/19/20242281닷넷: 2280. C# - PostThreadMessage로 보낸 메시지를 Windows Forms에서 수신하는 방법파일 다운로드1
13686정성태7/19/20242146오류 유형: 918. Visual Studio - ATL Simple Object 추가 시 error C2065: 'IDR_...': undeclared identifier
13685정성태7/19/20242238스크립트: 66. Windows 디렉터리 경로를 WSL의 /mnt 포맷으로 구하는 방법 - 두 번째 이야기
13684정성태7/19/20242415닷넷: 2279. C# - 문자열 보간식 사례
13683정성태7/18/20242207오류 유형: 917. ClrMD - Linux 환경의 .NET 5 덤프 분석 시 hang 현상
13682정성태7/18/20242405닷넷: 2278. WPF - 스레드에 종속되는 DependencyObject파일 다운로드1
13681정성태7/17/20242349닷넷: 2277. C# 13 - (2) 메서드 그룹의 자연 타입 개선 (메서드 추론 개선)파일 다운로드1
1  [2]  3  4  5  6  7  8  9  10  11  12  13  14  15  ...