Microsoft MVP성태의 닷넷 이야기
닷넷: 2149. C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할 [링크 복사], [링크+제목 복사],
조회: 3500
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일

C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할

아래와 같은 질문이 있군요. ^^

C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?
; https://forum.dotnetdev.kr/t/c-parallel/8548/4

질문의 코드를 그대로 실어 보면,

var list = new List<int>();
for(var i = 0; i < 100; i++)
{
    list.Add(i);
}

list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    // 번호가 높을수록 작업이 오래 걸린다.
    Thread.Sleep(i * 5);
});

보는 바와 같이 Task에 해당하는 작업이 0, 1, 2, 3, ..., 99까지의 수로 이뤄진 형태가 되고, 그 수에 5를 곱해 Thread.Sleep을 하고 있습니다. 예제가 저렇긴 하지만 아마도 저 숫자를 작업이 끝날 것으로 예상되는 대략적인 weight라고 봐도 무방할 것입니다.

어쨌든, 저 코드대로라면 Parallel.ForEach의 특성상 작업은 100개의 목록을 4단계(0 ~ 24, 25 ~ 49, 50 ~ 74, 75 ~ 99) 구획으로 분할해 그 영역만큼을 ThreadPool의 개별 여유 스레드에 할당할 것이므로, 결국 마지막으로 갈수록, 특히나 75 ~ 99 구간을 담당한 스레드는 더 오랫동안 실행하게 될 것입니다. 실제로 대충 다음과 같은 부가 코드를 곁들이면,

namespace ConsoleApp1;

internal class Program
{
    static void Main(string[] args)
    {
        var list = Enumerable.Range(0, 100).ToList();

        Dictionary<int, int> processed = new Dictionary<int, int>();

        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        Enumerable.Range(0, workerThreads).All((elem) =>
        {
            processed[elem] = 0;
            return true;
        });

        list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
        {
            Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");

            processed[Thread.CurrentThread.ManagedThreadId] += i;
            Thread.Sleep(i * 5);
        });

        foreach (var item in processed)
        {
            if (item.Value == 0)
            {
                continue;
            }

            Console.WriteLine($"Total Id: {item.Key}, Value: {item.Value}");               
        }
    }
}

개별 스레드 당 수행된 부하를 출력할 수 있는데,

Total Id: 1, Value: 2175
Total Id: 4, Value: 300
Total Id: 7, Value: 925
Total Id: 10, Value: 1550

4번 스레드의 경우 300에 끝난 반면 1번 스레드는 2175의 부하로 오랫동안 혼자서 남은 작업들을 수행했음을 짐작게 합니다.

위의 코드로 알 수 있지만, PLinq의 작업 구획 나누기 규칙은 간단합니다. PLinq는 기본적으로 대상 작업의 수를 구할 수 있다면 WithDegreeOfParallelism으로 나누기를 한 구획을 분배합니다. 그렇기 때문에 "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 예제 코드에서 사용한 List<int>는 Count를 알 수 있는 IReadOnlyCollection을 구현하고 있으므로 그 수를 이용해 구획을 판정합니다.

그렇다면 만약 작업의 수를 모른다면 어떻게 될까요? 그럼 당연히 작업 분배는 List와는 다른 양상을 띠게 됩니다. 실제로 예제 코드에서 List 구성을,

var list = Enumerable.Range(0, 100).ToList();

다음과 같이만 바꿔도,

var list = Enumerable.Range(0, 100); // ToList를 하지 않았으므로 IEnumerable 반환

작업의 정확한 수를 알 수 없는 IEnumerable이 반환되므로 작업 부하는 이제 다음과 같이 바뀝니다.

// 실행마다 다른 결과
Total Id: 1, Value: 1120
Total Id: 4, Value: 1160
Total Id: 7, Value: 1469
Total Id: 10, Value: 1201

IEnumerable이기 때문에 어쩔 수 없이 작업을 분배하는 측은 IEnumerable.MoveNext 호출에 따라 작업을 분배하는 식으로 처리할 듯합니다.




자, 그럼 이것을 골고루 나누려면 어떻게 해야 할까요? "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 글의 답변에 있는 "에릭권"님의 답변이 이에 대한 훌륭한 답이 됩니다.

var list = Enumerable.Range(0, 100).ToList();
var queue = new ConcurrentQueue<int>(list);

Parallel.ForEach(Enumerable.Range(0, 4), _ =>
{
    while (queue.TryDequeue(out int p))
    {
        Console.WriteLine($"Id: {Environment.CurrentManagedThreadId}, Value: {p}");
        Thread.Sleep(p * 5);
    }
});

ForEach의 분할 특성을 이용했고, 작업을 Queue에서 빼오는 방식을 통해 먼저 작업을 끝낸 스레드가 후속 작업이 없어질 때까지 반복/수행하고 있습니다. 이전의 수행 부하 측정 코드를 집어넣어 보면 실행할 때마다 값이 달라지긴 하지만 다음과 같이 고르게 부하 분산이 됩니다.

// 실행마다 다름

Total Id: 4, Value: 1225
Total Id: 12, Value: 1250
Total Id: 13, Value: 1277
Total Id: 20, Value: 1198

오히려 IEnumerable을 자동으로 분배해 주던 PLinq의 것보다 골고루 더 분산이 됩니다. 사실 질문 자체는 "병렬 분할 알고리즘 변경"이지만 오히려 질문자의 상황에는 "에릭권"님의 답변이 올바른 해결책입니다.




자... 그럼 제목에 맞게 "병렬 분할 알고리즘 변경"이라는 측면으로 답변을 해보겠습니다. 마이크로소프트는 Partition에 대한 사용자 정의를 할 수 있는 방법을 제공하고 있는데,

Custom Partitioners for PLINQ and TPL
; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/custom-partitioners-for-plinq-and-tpl

바로 Partitioner<T>를 구현하기만 하면 됩니다. 실제로 "에릭권"님의 답변이 보여주는 방식을 Partitioner를 이용해 다음과 같이 만들어 둘 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    ConcurrentQueue<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = new ConcurrentQueue<T>(list.ToArray());
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        ConcurrentQueue<T> _list;

        public EnumerateFromList(ConcurrentQueue<T> list)
        {
            _list = list;
        }

        public IEnumerator<T> GetEnumerator()
        {
            while (_list!.TryDequeue(out T? item))
            {
                yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

Partitioner가 해야 할 가장 중요한 작업은 GetPartitions 메서드에서 작업을 열거할 IEnumerable 배열을 인자로 넘어온 partitionCount 만큼 반환하는 것입니다. 즉, WithDegreeOfParallelism으로 지정한 값이 partitionCount로 넘어오게 되고, 여기서 반환한 IEnumerable 배열에 대해 하나씩 스레드 풀의 스레드가 맡아 작업을 처리하는 식입니다.

일단 이렇게 Partitioner를 만들어 두면 사용법은 이전과 유사하게 다룰 수 있습니다.

var list = Enumerable.Range(0, 100).ToList();

var partitioner = new ListPartitioner<int>(list);
partitioner.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    Thread.Sleep(i * 5);
});

보는 바와 같이 AsParallel()을 호출하고 있는데, 이게 가능한 이유는 AsParallel 확장 메서드가 기본적으로 IEnumerable 인터페이스와 함께 Partitioner<T>에 대해서도 정의돼 있기 때문입니다.

혹은, ConcurrentQueue를 사용할 필요 없이 단순히 인덱스를 지정하는 식으로 좀 더 Partitioner 본연의 기능에 가깝게 다음과 같이 구현하는 것도 가능합니다.

class RefInteger
{
    int _value = -1;

    public int GetNext()
    {
        int newValue = Interlocked.Increment(ref _value);
        return newValue;
    }
}

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;
    RefInteger _position = new RefInteger();

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list, _position).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        List<T> _list;
        RefInteger _position;

        public EnumerateFromList(List<T> list, RefInteger position)
        {
            _list = list;
            _position = position;
        }

        public IEnumerator<T> GetEnumerator()
        {
            do
            {
                int idx = _position.GetNext();
                if (idx >= _list.Count)
                {
                    yield break;
                }

                yield return _list[idx];
            } while (true);
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

"병렬 분할 알고리즘 변경"이라는 측면에서 봤을 때 이렇게 만들어 둔 Partitioner가 유용할 수 있겠지만, 아마도 대부분의 경우에는 그냥 "에릭권"님의 답변처럼 직접적으로 구현하는 것이 더 이해하기는 쉬울 것입니다. ^^ (재미있게도, 마이크로소프트의 Partitioner 도움말에 실린 예제 코드에 정확히 위의 역할과 동일한 SingleElementPartitioner를 구현하고 있습니다.)





참고로, 좀 더 단순하게라면, 이러한 구획 나누기는 list의 순서를 재정렬하는 것으로도 가능합니다. 구간마다 들어가는 요소를, 다음과 같이 각각 교차해서 PLinq에 전달해도 되는 것입니다.

0,4,8,12,16,20,24,28,32,36,40,44,48,52,56,60,64,68,72,76,80,84,88,92,96
1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97
2,6,10,14,18,22,26,30,34,38,42,46,50,54,58,62,66,70,74,78,82,86,90,94,98
3,7,11,15,19,23,27,31,35,39,43,47,51,55,59,63,67,71,75,79,83,87,91,95,99

이를 위해 간단하게 다음의 코드를 하나 넣어주면,

var list = Enumerable.Range(0, 100).ToList();

List<int> ordered = list.OrderBy(i => i % 4).ToList();
ordered.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    // ...[생략]...
});

기본적인 파티션 규칙에 따라 부하 분산 효과를 갖게 됩니다. 실제 출력 결과도 이런데요,

Total Id: 1, Value: 1275
Total Id: 4, Value: 1250
Total Id: 10, Value: 1200
Total Id: 11, Value: 1225

물론, 저 코드는 OrderBy로 인해 정렬 작업이 추가되는 부하가 있으니 권장할 수는 없습니다. 단지, 기본 제공하는 Partitioner의 규칙에 실어 자연스럽게 구획을 나누게 한다는 점에서 의미가 있다고만 보면 되겠습니다.

게다가 위의 규칙 역시도 Partitioner 본연의 개념을 이용한다면 다음과 같이 (이번엔 정렬 작업 없이) 구현할 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new HopEnumerate(_list, i, partitionCount).GetEnumerator();
        }

        return partitions;
    }

    private class HopEnumerate: IEnumerable<T>
    {
        List<T> _list;
        int _start;
        int _hop;

        public HopEnumerate(List<T> list, int start, int hop) 
            => (_list, _start, _hop) = (list, start, hop);    

        public IEnumerator<T> GetEnumerator()
        {
            for (int i = _start; i < _list.Count; i += _hop)
            {
                yield return _list[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]







[최초 등록일: ]
[최종 수정일: 10/16/2023]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... 16  17  [18]  19  20  21  22  23  24  25  26  27  28  29  30  ...
NoWriterDateCnt.TitleFile(s)
13203정성태12/22/20224870.NET Framework: 2081. C# Interop 예제 - (LSA_UNICODE_STRING 예제로) 구조체를 C++에 전달하는 방법파일 다운로드1
13202정성태12/21/20225260기타: 84. 직렬화로 설명하는 Little/Big Endian파일 다운로드1
13201정성태12/20/20225822오류 유형: 835. PyCharm 사용 시 C 드라이브 용량 부족
13200정성태12/19/20224758오류 유형: 834. 이벤트 로그 - SSL Certificate Settings created by an admin process for endpoint
13199정성태12/19/20224941개발 환경 구성: 656. Internal Network 유형의 스위치로 공유한 Hyper-V의 VM과 호스트가 통신이 안 되는 경우
13198정성태12/18/20224884.NET Framework: 2080. C# - Microsoft.XmlSerializer.Generator 처리 없이 XmlSerializer 생성자를 예외 없이 사용하고 싶다면?파일 다운로드1
13197정성태12/17/20224670.NET Framework: 2079. .NET Core/5+ 환경에서 XmlSerializer 사용 시 System.IO.FileNotFoundException 예외 발생하는 경우파일 다운로드1
13196정성태12/16/20224875.NET Framework: 2078. .NET Core/5+를 위한 SGen(Microsoft.XmlSerializer.Generator) 사용법
13195정성태12/15/20225323개발 환경 구성: 655. docker - bridge 네트워크 모드에서 컨테이너 간 통신 시 --link 옵션 권장 이유
13194정성태12/14/20225460오류 유형: 833. warning C4747: Calling managed 'DllMain': Managed code may not be run under loader lock파일 다운로드1
13193정성태12/14/20225579오류 유형: 832. error C7681: two-phase name lookup is not supported for C++/CLI or C++/CX; use /Zc:twoPhase-
13192정성태12/13/20225634Linux: 55. 리눅스 - bash shell에서 실수 연산
13191정성태12/11/20226513.NET Framework: 2077. C# - 직접 만들어 보는 SynchronizationContext파일 다운로드1
13190정성태12/9/20227007.NET Framework: 2076. C# - SynchronizationContext 기본 사용법파일 다운로드1
13189정성태12/9/20227896오류 유형: 831. Visual Studio - Windows Forms 디자이너의 도구 상자에 컨트롤이 보이지 않는 문제
13188정성태12/9/20226468.NET Framework: 2075. C# - 직접 만들어 보는 TaskScheduler 실습 (SingleThreadTaskScheduler)파일 다운로드1
13187정성태12/8/20226389개발 환경 구성: 654. openssl - CA로부터 인증받은 새로운 인증서를 생성하는 방법 (2)
13186정성태12/6/20224929오류 유형: 831. The framework 'Microsoft.AspNetCore.App', version '...' was not found.
13185정성태12/6/20225897개발 환경 구성: 653. Windows 환경에서의 Hello World x64 어셈블리 예제 (NASM 버전)
13184정성태12/5/20225075개발 환경 구성: 652. ml64.exe와 link.exe x64 실행 환경 구성
13183정성태12/4/20225099오류 유형: 830. MASM + CRT 함수를 사용하는 경우 발생하는 컴파일 오류 정리
13182정성태12/4/20225828Windows: 217. Windows 환경에서의 Hello World x64 어셈블리 예제 (MASM 버전)
13181정성태12/3/20225219Linux: 54. 리눅스/WSL - hello world 어셈블리 코드 x86/x64 (nasm)
13180정성태12/2/20225320.NET Framework: 2074. C# - 스택 메모리에 대한 여유 공간 확인하는 방법파일 다운로드1
13179정성태12/2/20224646Windows: 216. Windows 11 - 22H2 업데이트 이후 Terminal 대신 cmd 창이 뜨는 경우
13178정성태12/1/20225223Windows: 215. Win32 API 금지된 함수 - IsBadXxxPtr 유의 함수들이 안전하지 않은 이유파일 다운로드1
... 16  17  [18]  19  20  21  22  23  24  25  26  27  28  29  30  ...