Microsoft MVP성태의 닷넷 이야기
닷넷: 2149. C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할 [링크 복사], [링크+제목 복사],
조회: 11209
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일

C# - PLinq의 Partitioner<T>를 이용한 사용자 정의 분할

아래와 같은 질문이 있군요. ^^

C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?
; https://forum.dotnetdev.kr/t/c-parallel/8548/4

질문의 코드를 그대로 실어 보면,

var list = new List<int>();
for(var i = 0; i < 100; i++)
{
    list.Add(i);
}

list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    // 번호가 높을수록 작업이 오래 걸린다.
    Thread.Sleep(i * 5);
});

보는 바와 같이 Task에 해당하는 작업이 0, 1, 2, 3, ..., 99까지의 수로 이뤄진 형태가 되고, 그 수에 5를 곱해 Thread.Sleep을 하고 있습니다. 예제가 저렇긴 하지만 아마도 저 숫자를 작업이 끝날 것으로 예상되는 대략적인 weight라고 봐도 무방할 것입니다.

어쨌든, 저 코드대로라면 Parallel.ForEach의 특성상 작업은 100개의 목록을 4단계(0 ~ 24, 25 ~ 49, 50 ~ 74, 75 ~ 99) 구획으로 분할해 그 영역만큼을 ThreadPool의 개별 여유 스레드에 할당할 것이므로, 결국 마지막으로 갈수록, 특히나 75 ~ 99 구간을 담당한 스레드는 더 오랫동안 실행하게 될 것입니다. 실제로 대충 다음과 같은 부가 코드를 곁들이면,

namespace ConsoleApp1;

internal class Program
{
    static void Main(string[] args)
    {
        var list = Enumerable.Range(0, 100).ToList();

        Dictionary<int, int> processed = new Dictionary<int, int>();

        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        Enumerable.Range(0, workerThreads).All((elem) =>
        {
            processed[elem] = 0;
            return true;
        });

        list.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
        {
            Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");

            processed[Thread.CurrentThread.ManagedThreadId] += i;
            Thread.Sleep(i * 5);
        });

        foreach (var item in processed)
        {
            if (item.Value == 0)
            {
                continue;
            }

            Console.WriteLine($"Total Id: {item.Key}, Value: {item.Value}");               
        }
    }
}

개별 스레드 당 수행된 부하를 출력할 수 있는데,

Total Id: 1, Value: 2175
Total Id: 4, Value: 300
Total Id: 7, Value: 925
Total Id: 10, Value: 1550

4번 스레드의 경우 300에 끝난 반면 1번 스레드는 2175의 부하로 오랫동안 혼자서 남은 작업들을 수행했음을 짐작게 합니다.

위의 코드로 알 수 있지만, PLinq의 작업 구획 나누기 규칙은 간단합니다. PLinq는 기본적으로 대상 작업의 수를 구할 수 있다면 WithDegreeOfParallelism으로 나누기를 한 구획을 분배합니다. 그렇기 때문에 "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 예제 코드에서 사용한 List<int>는 Count를 알 수 있는 IReadOnlyCollection을 구현하고 있으므로 그 수를 이용해 구획을 판정합니다.

그렇다면 만약 작업의 수를 모른다면 어떻게 될까요? 그럼 당연히 작업 분배는 List와는 다른 양상을 띠게 됩니다. 실제로 예제 코드에서 List 구성을,

var list = Enumerable.Range(0, 100).ToList();

다음과 같이만 바꿔도,

var list = Enumerable.Range(0, 100); // ToList를 하지 않았으므로 IEnumerable 반환

작업의 정확한 수를 알 수 없는 IEnumerable이 반환되므로 작업 부하는 이제 다음과 같이 바뀝니다.

// 실행마다 다른 결과
Total Id: 1, Value: 1120
Total Id: 4, Value: 1160
Total Id: 7, Value: 1469
Total Id: 10, Value: 1201

IEnumerable이기 때문에 어쩔 수 없이 작업을 분배하는 측은 IEnumerable.MoveNext 호출에 따라 작업을 분배하는 식으로 처리할 듯합니다.




자, 그럼 이것을 골고루 나누려면 어떻게 해야 할까요? "C# Parallel 병렬 분할 알고리즘 변경이 가능한가요?" 글의 답변에 있는 "에릭권"님의 답변이 이에 대한 훌륭한 답이 됩니다.

var list = Enumerable.Range(0, 100).ToList();
var queue = new ConcurrentQueue<int>(list);

Parallel.ForEach(Enumerable.Range(0, 4), _ =>
{
    while (queue.TryDequeue(out int p))
    {
        Console.WriteLine($"Id: {Environment.CurrentManagedThreadId}, Value: {p}");
        Thread.Sleep(p * 5);
    }
});

ForEach의 분할 특성을 이용했고, 작업을 Queue에서 빼오는 방식을 통해 먼저 작업을 끝낸 스레드가 후속 작업이 없어질 때까지 반복/수행하고 있습니다. 이전의 수행 부하 측정 코드를 집어넣어 보면 실행할 때마다 값이 달라지긴 하지만 다음과 같이 고르게 부하 분산이 됩니다.

// 실행마다 다름

Total Id: 4, Value: 1225
Total Id: 12, Value: 1250
Total Id: 13, Value: 1277
Total Id: 20, Value: 1198

오히려 IEnumerable을 자동으로 분배해 주던 PLinq의 것보다 골고루 더 분산이 됩니다. 사실 질문 자체는 "병렬 분할 알고리즘 변경"이지만 오히려 질문자의 상황에는 "에릭권"님의 답변이 올바른 해결책입니다.




자... 그럼 제목에 맞게 "병렬 분할 알고리즘 변경"이라는 측면으로 답변을 해보겠습니다. 마이크로소프트는 Partition에 대한 사용자 정의를 할 수 있는 방법을 제공하고 있는데,

Custom Partitioners for PLINQ and TPL
; https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/custom-partitioners-for-plinq-and-tpl

바로 Partitioner<T>를 구현하기만 하면 됩니다. 실제로 "에릭권"님의 답변이 보여주는 방식을 Partitioner를 이용해 다음과 같이 만들어 둘 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    ConcurrentQueue<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = new ConcurrentQueue<T>(list.ToArray());
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        ConcurrentQueue<T> _list;

        public EnumerateFromList(ConcurrentQueue<T> list)
        {
            _list = list;
        }

        public IEnumerator<T> GetEnumerator()
        {
            while (_list!.TryDequeue(out T? item))
            {
                yield return item;
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

Partitioner가 해야 할 가장 중요한 작업은 GetPartitions 메서드에서 작업을 열거할 IEnumerable 배열을 인자로 넘어온 partitionCount 만큼 반환하는 것입니다. 즉, WithDegreeOfParallelism으로 지정한 값이 partitionCount로 넘어오게 되고, 여기서 반환한 IEnumerable 배열에 대해 하나씩 스레드 풀의 스레드가 맡아 작업을 처리하는 식입니다.

일단 이렇게 Partitioner를 만들어 두면 사용법은 이전과 유사하게 다룰 수 있습니다.

var list = Enumerable.Range(0, 100).ToList();

var partitioner = new ListPartitioner<int>(list);
partitioner.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    Console.WriteLine($"Id: {Thread.CurrentThread.ManagedThreadId}, Value: {i}");
    Thread.Sleep(i * 5);
});

보는 바와 같이 AsParallel()을 호출하고 있는데, 이게 가능한 이유는 AsParallel 확장 메서드가 기본적으로 IEnumerable 인터페이스와 함께 Partitioner<T>에 대해서도 정의돼 있기 때문입니다.

혹은, ConcurrentQueue를 사용할 필요 없이 단순히 인덱스를 지정하는 식으로 좀 더 Partitioner 본연의 기능에 가깝게 다음과 같이 구현하는 것도 가능합니다.

class RefInteger
{
    int _value = -1;

    public int GetNext()
    {
        int newValue = Interlocked.Increment(ref _value);
        return newValue;
    }
}

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;
    RefInteger _position = new RefInteger();

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new EnumerateFromList(_list, _position).GetEnumerator();
        }

        return partitions;
    }

    private class EnumerateFromList : IEnumerable<T>
    {
        List<T> _list;
        RefInteger _position;

        public EnumerateFromList(List<T> list, RefInteger position)
        {
            _list = list;
            _position = position;
        }

        public IEnumerator<T> GetEnumerator()
        {
            do
            {
                int idx = _position.GetNext();
                if (idx >= _list.Count)
                {
                    yield break;
                }

                yield return _list[idx];
            } while (true);
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

"병렬 분할 알고리즘 변경"이라는 측면에서 봤을 때 이렇게 만들어 둔 Partitioner가 유용할 수 있겠지만, 아마도 대부분의 경우에는 그냥 "에릭권"님의 답변처럼 직접적으로 구현하는 것이 더 이해하기는 쉬울 것입니다. ^^ (재미있게도, 마이크로소프트의 Partitioner 도움말에 실린 예제 코드에 정확히 위의 역할과 동일한 SingleElementPartitioner를 구현하고 있습니다.)





참고로, 좀 더 단순하게라면, 이러한 구획 나누기는 list의 순서를 재정렬하는 것으로도 가능합니다. 구간마다 들어가는 요소를, 다음과 같이 각각 교차해서 PLinq에 전달해도 되는 것입니다.

0,4,8,12,16,20,24,28,32,36,40,44,48,52,56,60,64,68,72,76,80,84,88,92,96
1,5,9,13,17,21,25,29,33,37,41,45,49,53,57,61,65,69,73,77,81,85,89,93,97
2,6,10,14,18,22,26,30,34,38,42,46,50,54,58,62,66,70,74,78,82,86,90,94,98
3,7,11,15,19,23,27,31,35,39,43,47,51,55,59,63,67,71,75,79,83,87,91,95,99

이를 위해 간단하게 다음의 코드를 하나 넣어주면,

var list = Enumerable.Range(0, 100).ToList();

List<int> ordered = list.OrderBy(i => i % 4).ToList();
ordered.AsParallel().WithDegreeOfParallelism(4).ForAll(i =>
{
    // ...[생략]...
});

기본적인 파티션 규칙에 따라 부하 분산 효과를 갖게 됩니다. 실제 출력 결과도 이런데요,

Total Id: 1, Value: 1275
Total Id: 4, Value: 1250
Total Id: 10, Value: 1200
Total Id: 11, Value: 1225

물론, 저 코드는 OrderBy로 인해 정렬 작업이 추가되는 부하가 있으니 권장할 수는 없습니다. 단지, 기본 제공하는 Partitioner의 규칙에 실어 자연스럽게 구획을 나누게 한다는 점에서 의미가 있다고만 보면 되겠습니다.

게다가 위의 규칙 역시도 Partitioner 본연의 개념을 이용한다면 다음과 같이 (이번엔 정렬 작업 없이) 구현할 수 있습니다.

class ListPartitioner<T> : Partitioner<T>
{
    List<T> _list;

    public ListPartitioner(List<T> list)
    {
        _list = list;
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerator<T>[] partitions = new IEnumerator<T>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new HopEnumerate(_list, i, partitionCount).GetEnumerator();
        }

        return partitions;
    }

    private class HopEnumerate: IEnumerable<T>
    {
        List<T> _list;
        int _start;
        int _hop;

        public HopEnumerate(List<T> list, int start, int hop) 
            => (_list, _start, _hop) = (list, start, hop);    

        public IEnumerator<T> GetEnumerator()
        {
            for (int i = _start; i < _list.Count; i += _hop)
            {
                yield return _list[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() =>
            ((IEnumerable<T>)this).GetEnumerator();
    }
}

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]







[최초 등록일: ]
[최종 수정일: 10/16/2023]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... 91  92  93  94  95  96  97  98  99  100  101  102  103  104  [105]  ...
NoWriterDateCnt.TitleFile(s)
11299정성태9/9/201719649개발 환경 구성: 330. Hyper-V VM의 Internal Network를 Private 유형으로 만드는 방법
11298정성태9/8/201722919VC++: 119. EnumProcesses / EnumProcessModules API 사용 시 주의점 [1]
11297정성태9/8/201719584디버깅 기술: 96. windbg - 풀 덤프에 포함된 모든 닷넷 모듈을 파일로 저장하는 방법
11296정성태9/8/201722784웹: 36. Edge - "이 웹 사이트는 이전 기술에서 실행되며 Internet Explorer에서만 작동합니다." 끄는 방법
11295정성태9/7/201720171디버깅 기술: 95. Windbg - .foreach 사용법
11294정성태9/4/201719965개발 환경 구성: 329. 마이크로소프트의 CoreCLR 프로파일러 예제 빌드 방법 [1]
11293정성태9/4/201720463개발 환경 구성: 328. Visual Studio(devenv.exe)를 배치 파일(.bat)을 통해 실행하는 방법
11292정성태9/4/201718738오류 유형: 419. Cannot connect to WMI provider - Invalid class [0x80041010]
11291정성태9/3/201720617개발 환경 구성: 327. 아파치 서버 2.4를 위한 mod_aspdotnet 마이그레이션
11290정성태9/3/201723825개발 환경 구성: 326. 아파치 서버에서 ASP.NET을 실행하는 mod_aspdotnet 모듈 [2]
11289정성태9/3/201721449개발 환경 구성: 325. GAC에 어셈블리 등록을 위해 gacutil.exe을 사용하는 경우 주의 사항
11288정성태9/3/201718230개발 환경 구성: 324. 윈도우용 XAMPP의 아파치 서버 구성 방법
11287정성태9/1/201727450.NET Framework: 680. C# - 작업자(Worker) 스레드와 UI 스레드 [11]
11286정성태8/28/201714792기타: 67. App Privacy Policy
11285정성태8/28/201723373.NET Framework: 679. C# - 개인 키 보안의 SFTP를 이용한 파일 업로드파일 다운로드1
11284정성태8/27/201721385.NET Framework: 678. 데스크톱 윈도우 응용 프로그램에서 UWP 라이브러리를 이용한 비디오 장치 열람하는 방법 [1]파일 다운로드1
11283정성태8/27/201717172오류 유형: 418. CSS3117: @font-face failed cross-origin request. Resource access is restricted.
11282정성태8/26/201719613Math: 22. 행렬로 바라보는 피보나치 수열
11281정성태8/26/201721448.NET Framework: 677. Visual Studio 2017 - NuGet 패키지를 직접 참조하는 PackageReference 지원 [2]
11280정성태8/24/201718445디버깅 기술: 94. windbg - 풀 덤프에 포함된 모든 모듈을 파일로 저장하는 방법
11279정성태8/23/201730099.NET Framework: 676. C# Thread가 Running 상태인지 아는 방법
11278정성태8/23/201718249오류 유형: 417. TFS - Warning - Unable to refresh ... because you have a pending edit. [1]
11277정성태8/23/201719472오류 유형: 416. msbuild - error MSB4062: The "TransformXml" task could not be loaded from the assembly
11276정성태8/23/201723783.NET Framework: 675. C# - (파일) 확장자와 연결된 실행 파일 경로 찾기 [2]파일 다운로드1
11275정성태8/23/201732756개발 환경 구성: 323. Visual Studio 설치 없이 빌드 환경 구성 - Visual Studio 2017용 Build Tools [1]
11274정성태8/22/201719330.NET Framework: 674. Thread 타입의 Suspend/Resume/Join 사용 관련 예외 처리
... 91  92  93  94  95  96  97  98  99  100  101  102  103  104  [105]  ...