Microsoft MVP성태의 닷넷 이야기
닷넷: 2149. C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할 [링크 복사], [링크+제목 복사],
조회: 11365
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일

C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할

아래와 같은 질문이 있군요. ^^

C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?
; https://forum.dotnetdev.kr/t/c-parallel/8548/4

질문의 코드를 그대로 실어 보면,

var list = new List<int>();
for(var i = 0; i < 100; i++)
{
    list.Add(i);
}

list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    // 번호가 높을수록 작업이 오래 걸린다.
    Thread.Sleep(i * 5);
});

보는 바와 같이 Task에 해당하는 작업이 0, 1, 2, 3, ..., 99까지의 수로 이뤄진 형태가 되고, 그 수에 5를 곱해 Thread.Sleep을 하고 있습니다. 예제가 저렇긴 하지만 아마도 저 숫자를 작업이 끝날 것으로 예상되는 대략적인 weight라고 봐도 무방할 것입니다.

어쨌든, 저 코드대로라면 Parallel.ForEach의 특성상 작업은 100개의 목록을 4단계(0 ~ 24, 25 ~ 49, 50 ~ 74, 75 ~ 99) 구획으로 분할해 그 영역만큼을 ThreadPool의 개별 여유 스레드에 할당할 것이므로, 결국 마지막으로 갈수록, 특히나 75 ~ 99 구간을 담당한 스레드는 더 오랫동안 실행하게 될 것입니다. 실제로 대충 다음과 같은 부가 코드를 곁들이면,

namespace ConsoleApp1;

internal class Program
{
    static void Main(string[] args)
    {
        var list = Enumerable.Range(0, 100).ToList();

        Dictionary<int, int> processed = new Dictionary<int, int>();

        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        Enumerable.Range(0, workerThreads).All((elem) =>
        {
            processed[elem] = 0;
            return true;
        });

        list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
        {
            Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");

            processed[Thread.CurrentThread.ManagedThreadId] += i;
            Thread.Sleep(i * 5);
        });

        foreach (var item in processed)
        {
            if (item.Value == 0)
            {
                continue;
            }

            Console.WriteLine($"Total Id: {item.Key}, Value: {item.Value}");               
        }
    }
}

개별 스레드 당 수행된 부하를 출력할 수 있는데,

Total Id: 1, Value: 2175
Total Id: 4, Value: 300
Total Id: 7, Value: 925
Total Id: 10, Value: 1550

4번 스레드의 경우 300에 끝난 반면 1번 스레드는 2175의 부하로 오랫동안 혼자서 남은 작업들을 수행했음을 짐작게 합니다.

위의 코드로 알 수 있지만, PLinq의 작업 구획 나누기 규칙은 간단합니다. PLinq는 기본적으로 대상 작업의 수를 구할 수 있다면 WithDegreeOfParallelism으로 나누기를 한 구획을 분배합니다. 그렇기 때문에 "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 예제 코드에서 사용한 List<int>는 Count를 알 수 있는 IReadOnlyCollection을 구현하고 있으므로 그 수를 이용해 구획을 판정합니다.

그렇다면 만약 작업의 수를 모른다면 어떻게 될까요? 그럼 당연히 작업 분배는 List와는 다른 양상을 띠게 됩니다. 실제로 예제 코드에서 List 구성을,

var list = Enumerable.Range(0, 100).ToList();

다음과 같이만 바꿔도,

var list = Enumerable.Range(0, 100); // ToList를 하지 않았으므로 IEnumerable 반환

작업의 정확한 수를 알 수 없는 IEnumerable이 반환되므로 작업 부하는 이제 다음과 같이 바뀝니다.

// 실행마다 다른 결과
Total Id: 1, Value: 1120
Total Id: 4, Value: 1160
Total Id: 7, Value: 1469
Total Id: 10, Value: 1201

IEnumerable이기 때문에 어쩔 수 없이 작업을 분배하는 측은 IEnumerable.MoveNext 호출에 따라 작업을 분배하는 식으로 처리할 듯합니다.




자, 그럼 이것을 골고루 나누려면 어떻게 해야 할까요? "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 글의 답변에 있는 "에릭권"님의 답변이 이에 대한 훌륭한 답이 됩니다.

var list = Enumerable.Range(0, 100).ToList();
var queue = new ConcurrentQueue<int>(list);

Parallel.ForEach(Enumerable.Range(0, 4), _ =>
{
    while (queue.TryDequeue(out int p))
    {
        Console.WriteLine($"Id: {Environment.CurrentManagedThreadId}, Value: {p}");
        Thread.Sleep(p * 5);
    }
});

ForEach의 분할 특성을 이용했고, 작업을 Queue에서 빼오는 방식을 통해 먼저 작업을 끝낸 스레드가 후속 작업이 없어질 때까지 반복/수행하고 있습니다. 이전의 수행 부하 측정 코드를 집어넣어 보면 실행할 때마다 값이 달라지긴 하지만 다음과 같이 고르게 부하 분산이 됩니다.

// 실행마다 다름

Total Id: 4, Value: 1225
Total Id: 12, Value: 1250
Total Id: 13, Value: 1277
Total Id: 20, Value: 1198

오히려 IEnumerable을 자동으로 분배해 주던 PLinq의 것보다 골고루 더 분산이 됩니다. 사실 질문 자체는 "병렬 분할 알고리즘 변경"이지만 오히려 질문자의 상황에는 "에릭권"님의 답변이 올바른 해결책입니다.




자... 그럼 제목에 맞게 "병렬 분할 알고리즘 변경"이라는 측면으로 답변을 해보겠습니다. 마이크로소프트는 Partition에 대한 사용자 정의를 할 수 있는 방법을 제공하고 있는데,

Custom Partitioners for PLINQ and TPL
; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/custom-partitioners-for-plinq-and-tpl

바로 Partitioner<T>를 구현하기만 하면 됩니다. 실제로 "에릭권"님의 답변이 보여주는 방식을 Partitioner를 이용해 다음과 같이 만들어 둘 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    ConcurrentQueue<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = new ConcurrentQueue<T>(list.ToArray());
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        ConcurrentQueue<T> _list;

        public EnumerateFromList(ConcurrentQueue<T> list)
        {
            _list = list;
        }

        public IEnumerator<T> GetEnumerator()
        {
            while (_list!.TryDequeue(out T? item))
            {
                yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

Partitioner가 해야 할 가장 중요한 작업은 GetPartitions 메서드에서 작업을 열거할 IEnumerable 배열을 인자로 넘어온 partitionCount 만큼 반환하는 것입니다. 즉, WithDegreeOfParallelism으로 지정한 값이 partitionCount로 넘어오게 되고, 여기서 반환한 IEnumerable 배열에 대해 하나씩 스레드 풀의 스레드가 맡아 작업을 처리하는 식입니다.

일단 이렇게 Partitioner를 만들어 두면 사용법은 이전과 유사하게 다룰 수 있습니다.

var list = Enumerable.Range(0, 100).ToList();

var partitioner = new ListPartitioner<int>(list);
partitioner.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    Thread.Sleep(i * 5);
});

보는 바와 같이 AsParallel()을 호출하고 있는데, 이게 가능한 이유는 AsParallel 확장 메서드가 기본적으로 IEnumerable 인터페이스와 함께 Partitioner<T>에 대해서도 정의돼 있기 때문입니다.

혹은, ConcurrentQueue를 사용할 필요 없이 단순히 인덱스를 지정하는 식으로 좀 더 Partitioner 본연의 기능에 가깝게 다음과 같이 구현하는 것도 가능합니다.

class RefInteger
{
    int _value = -1;

    public int GetNext()
    {
        int newValue = Interlocked.Increment(ref _value);
        return newValue;
    }
}

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;
    RefInteger _position = new RefInteger();

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list, _position).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        List<T> _list;
        RefInteger _position;

        public EnumerateFromList(List<T> list, RefInteger position)
        {
            _list = list;
            _position = position;
        }

        public IEnumerator<T> GetEnumerator()
        {
            do
            {
                int idx = _position.GetNext();
                if (idx >= _list.Count)
                {
                    yield break;
                }

                yield return _list[idx];
            } while (true);
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

"병렬 분할 알고리즘 변경"이라는 측면에서 봤을 때 이렇게 만들어 둔 Partitioner가 유용할 수 있겠지만, 아마도 대부분의 경우에는 그냥 "에릭권"님의 답변처럼 직접적으로 구현하는 것이 더 이해하기는 쉬울 것입니다. ^^ (재미있게도, 마이크로소프트의 Partitioner 도움말에 실린 예제 코드에 정확히 위의 역할과 동일한 SingleElementPartitioner를 구현하고 있습니다.)





참고로, 좀 더 단순하게라면, 이러한 구획 나누기는 list의 순서를 재정렬하는 것으로도 가능합니다. 구간마다 들어가는 요소를, 다음과 같이 각각 교차해서 PLinq에 전달해도 되는 것입니다.

0,4,8,12,16,20,24,28,32,36,40,44,48,52,56,60,64,68,72,76,80,84,88,92,96
1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97
2,6,10,14,18,22,26,30,34,38,42,46,50,54,58,62,66,70,74,78,82,86,90,94,98
3,7,11,15,19,23,27,31,35,39,43,47,51,55,59,63,67,71,75,79,83,87,91,95,99

이를 위해 간단하게 다음의 코드를 하나 넣어주면,

var list = Enumerable.Range(0, 100).ToList();

List<int> ordered = list.OrderBy(i => i % 4).ToList();
ordered.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    // ...[생략]...
});

기본적인 파티션 규칙에 따라 부하 분산 효과를 갖게 됩니다. 실제 출력 결과도 이런데요,

Total Id: 1, Value: 1275
Total Id: 4, Value: 1250
Total Id: 10, Value: 1200
Total Id: 11, Value: 1225

물론, 저 코드는 OrderBy로 인해 정렬 작업이 추가되는 부하가 있으니 권장할 수는 없습니다. 단지, 기본 제공하는 Partitioner의 규칙에 실어 자연스럽게 구획을 나누게 한다는 점에서 의미가 있다고만 보면 되겠습니다.

게다가 위의 규칙 역시도 Partitioner 본연의 개념을 이용한다면 다음과 같이 (이번엔 정렬 작업 없이) 구현할 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new HopEnumerate(_list, i, partitionCount).GetEnumerator();
        }

        return partitions;
    }

    private class HopEnumerate: IEnumerable<T>
    {
        List<T> _list;
        int _start;
        int _hop;

        public HopEnumerate(List<T> list, int start, int hop) 
            => (_list, _start, _hop) = (list, start, hop);    

        public IEnumerator<T> GetEnumerator()
        {
            for (int i = _start; i < _list.Count; i += _hop)
            {
                yield return _list[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]







[최초 등록일: ]
[최종 수정일: 10/16/2023]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... 121  122  123  124  125  126  127  [128]  129  130  131  132  133  134  135  ...
NoWriterDateCnt.TitleFile(s)
1855정성태2/10/201520795개발 환경 구성: 256. WebDAV Redirector - Sysinternals 폴더 연결 시 "The network path was not found" 오류 해결 방법
1854정성태2/10/201521762Windows: 104. 폴더는 삭제할 수 없지만, 그 하위 폴더/파일은 생성/삭제/변경하는 보안 설정
1853정성태2/6/201552055웹: 29. 여신금융협회 웹 사이트의 "Netscape 6.0은 지원하지 않습니다." 오류 메시지 [5]
1852정성태2/5/201522462.NET Framework: 492. .NET CLR Memory 성능 카운터의 의미파일 다운로드1
1851정성태2/5/201523395VC++: 88. 하룻밤의 꿈 - 인텔 하스웰의 TSX Instruction 지원 [2]
1850정성태2/4/201544187Windows: 103. 작업 관리자에서의 "Commit size"가 가리키는 메모리의 의미 [4]
1849정성태2/4/201524177기타: 51. DropBox의 CPU 100% 현상 [1]파일 다운로드1
1848정성태2/4/201519410.NET Framework: 491. 닷넷 Generic 타입의 메타 데이터 토큰 값 알아내는 방법 [2]
1847정성태2/3/201522684기타: 50. C# - 윈도우에서 dropbox 동기화 폴더 경로 및 종료하는 방법
1846정성태2/2/201532006Windows: 102. 제어판의 프로그램 추가/삭제 항목을 수동으로 실행하고 싶다면? [1]
1845정성태1/26/201532885Windows: 101. 제어판의 "Windows 자격 증명 관리(Manage your credentials)"를 금지시키는 방법
1844정성태1/26/201530844오류 유형: 269. USB 메모리의 용량이 비정상적으로 보여진다면? [7]
1843정성태1/24/201521899VC++: 87. 무시할 수 없는 Visual C++ 런타임 함수 성능
1842정성태1/23/201544393개발 환경 구성: 255. 노트북 키보드에 없는 BREAK 키를 다른 키로 대체하는 방법
1841정성태1/21/201519371오류 유형: 268. Win32 핸들 관련 CLR4 보안 오류 사례
1840정성태1/8/201527602오류 유형: 267. Visual Studio - CodeLens 사용 시 CPU 100% 현상
1839정성태1/5/201520515디버깅 기술: 69. windbg 분석 사례 - cpu 100% 현상 (2)
1838정성태1/4/201540231기타: 49. 윈도우 내레이터(Narrator) 기능 끄는 방법(윈도우에 파란색의 굵은 테두리 선이 나타난다면?) [4]
1837정성태1/4/201526327디버깅 기술: 68. windbg 분석 사례 - 메모리 부족 [1]
1836정성태1/4/201526341디버깅 기술: 67. windbg - 덤프 파일과 handle 정보
1835정성태1/3/201526814개발 환경 구성: 254. SQL 서버 역시 SSL 3.0/TLS 1.0만을 지원하는 듯!
1834정성태1/3/201551441개발 환경 구성: 253. TLS 1.2를 적용한 IIS 웹 사이트 구성
1833정성태1/3/201527507.NET Framework: 490. System.Data.SqlClient는 SSL 3.0/TLS 1.0만 지원하는 듯! [3]
1832정성태1/2/201520619오류 유형: 266. Azure에 응용 프로그램 게시 중 로그인 오류
1831정성태1/1/201528514디버깅 기술: 66. windbg 분석 사례 - cpu 100% 현상 (1) [1]
1830정성태1/1/201527556오류 유형: 265. svchost.exe 프로세스(IP Helper: IPHLPSVC)의 CPU 100% 현상
... 121  122  123  124  125  126  127  [128]  129  130  131  132  133  134  135  ...