Microsoft MVP성태의 닷넷 이야기
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 2개 있습니다.)

C# - 직접 만들어 보는 TaskScheduler 실습 (SingleThreadTaskScheduler)

일반적으로 Task에 할당된 코드는 다음과 같은 구문으로 실행합니다.

// Windows Forms + .NET 7

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    private void button1_Click(object sender, EventArgs e)
    {
        System.Diagnostics.Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: button1_Click");

        Task.Run(() =>
        {
            System.Diagnostics.Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: Task.Run");
            this.button1.Text = "TEST";
        });
    }
}

/* 출력 결과
1: button1_Click
10: Task.Run
*/

보는 바와 같이 button1_Click 이벤트 핸들러를 실행하는 UI 스레드가 있고, Task.Run에 전달된 코드를 실행하는 스레드는 ThreadPool로부터 빌려온 것입니다. (위의 코드는 2차 스레드에서 button1을 접근하므로 예외가 발생합니다.)

여기서, Task.Run이 사용하는 스레드 운영은 TaskScheduler 타입을 상속받아 사용자 정의할 수 있습니다. 위의 경우처럼 사용자가 명시하지 않은 경우, 기본적으로는 System.Threading.Tasks.ThreadPoolTaskScheduler 타입이 사용되는데, 이 값은 TaskScheduler.Default 정적 속성으로 반환됩니다.

System.Diagnostics.Trace.WriteLine($"{TaskScheduler.Default}: TaskScheduler.Default");

// 출력 결과: System.Threading.Tasks.ThreadPoolTaskScheduler: TaskScheduler.Default

전에도 설명했지만, Task.Run은 다음의 구문과 동일하게 취급할 수 있습니다.

// 간단하게는 이렇게,
Task.Run(action);

// 복잡하게는 이렇게 호출
Task.Factory.StartNew(func, CancellationToken.None, 
                TaskCreationOptions.DenyChildAttach, TaskScheduler.Default)

TaskScheduler.Default에 설정된 ThreadPoolTaskScheduler 타입의 소스 코드는 그리 복잡하지 않은데,

runtime/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ThreadPoolTaskScheduler.cs 
; https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/ThreadPoolTaskScheduler.cs

핵심 코드는 QueueTask 메서드로, 이것은 사용자 작업을 나타내는 "Task"를 ThreadPool의 스레드에 실어 실행하는 코드를 담고 있습니다.

protected internal override void QueueTask(Task task)
{
    TaskCreationOptions options = task.Options;
    if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
    {
        // Run LongRunning tasks on their own dedicated thread.
        new Thread(s_longRunningThreadWork)
        {
            IsBackground = true,
            Name = ".NET Long Running Task"
        }.UnsafeStart(task);
    }
    else
    {
        // Normal handling for non-LongRunning tasks.
        ThreadPool.UnsafeQueueUserWorkItemInternal(task), (options & TaskCreationOptions.PreferFairness) == 0);
    }
}

게다가 LongRunning인 경우는 별도의 Thread를 생성해 (Task 인스턴스로 대표되는) 사용자 작업을 처리합니다.

그러니까, 사실 Task 자체는 스레드에 대한 어떤 선택권도 없고, 순수하게 사용자가 맡긴 작업과 그 작업에 대한 속성을 나타내는 인스턴스에 불과한 것입니다.




자, 그럼 다른 유형의 TaskScheduler를 알아볼까요? ^^ Task와 연관돼 직접 지정하는 유형으로 FromCurrentSynchronizationContext 메서드도 별도의 TaskScheduler를 반환합니다.

private void button2_Click(object sender, EventArgs e)
{
    System.Diagnostics.Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: button2_Click");

    Task.Factory.StartNew(() =>
    {
        System.Diagnostics.Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: StartNew");
        this.button2.Text = "TEST";
    }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.FromCurrentSynchronizationContext());
}

/* 출력 결과:
1: button2_Click
1: StartNew
*/

보는 바와 같이 TaskScheduler를 지정하려면 Task.Factory, 즉 TaskFactory 타입을 이용해야 합니다. 그리고 이때 TaskScheduler.FromCurrentSynchronizationContext 메서드는,

// https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskScheduler.cs#L344

public static TaskScheduler FromCurrentSynchronizationContext()
{
    return new SynchronizationContextTaskScheduler();
}

ThreadPoolTaskScheduler보다 더 간단한 구현 코드를 갖는 SynchronizationContextTaskScheduler 인스턴스를 반환합니다.

// https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskScheduler.cs#L567

// 아래의 타입을 여러분의 프로젝트에 그대로 추가하는 것도 가능합니다.
// 따라서 new를 한 인스턴스를 Task.Factory.StartNew의 TaskScheduler 인자에 전달해도 됩니다.
// 당연히 동작은 닷넷의 것과 동일하므로 전체적인 수행 과정을 이해하기 위한 용도로 사용할 수 있습니다. 
internal sealed class SynchronizationContextTaskScheduler : TaskScheduler
{
    private readonly SynchronizationContext m_synchronizationContext;

    internal SynchronizationContextTaskScheduler()
    {
        m_synchronizationContext = SynchronizationContext.Current ??
            throw new InvalidOperationException("TaskScheduler_FromCurrentSynchronizationContext_NoCurrent");
    }

    protected override void QueueTask(Task task)
    {
        m_synchronizationContext.Post(s_postCallback, (object)task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (SynchronizationContext.Current == m_synchronizationContext)
        {
            return TryExecuteTask(task);
        }
        else
        {
            return false;
        }
    }

    protected override IEnumerable<Task>? GetScheduledTasks() => null;

    public override int MaximumConcurrencyLevel => 1;

    private static readonly SendOrPostCallback s_postCallback = new SendOrPostCallback(PostCallback);

    private static void PostCallback(object? s)
    {
        // 원본 코드: ExecuteEntry가 internal이므로 호출이 불가능하기 때문에,
        //Debug.Assert(s is Task);
        //((Task)s).ExecuteEntry(); // with double-execute check because SC could be buggy

        // 수정 코드: Reflection을 이용해 호출하는 코드로 변경
        Task? task = s as Task;

        var mi = typeof(Task).GetMethod("ExecuteEntry", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
        mi?.Invoke(s, null);

        // 원본을 맞추기 위해 위의 코드를 사용했지만, 원래대로라면 이런 경우 TaskScheduler의 TryExecuteTask를 사용
        // 부모 클래스(TaskScheduler)에서 protected로 제공하는 TryExecuteTask는 task.ExecuteEntry 호출
        // 실제로 이 글의 마지막에 작성하는 SingleThreadTaskScheduler에서는 TryExecuteTask를 이용해 Task의 작업을 수행
    }
}

이 코드의 핵심도 마찬가지로 QueueTask 메서드인데, 하는 일이라고는 SynchronizationContext.Current, 위의 경우 Windows Forms이므로 System.Windows.Forms.WindowsFormsSynchronizationContextPost를 하는 것뿐입니다.

protected override void QueueTask(Task task)
{
    m_synchronizationContext.Post(s_postCallback, (object)task);
}

보는 바와 같이 (ThreadPoolTaskScheduler와는 달리) 별도의 스레드가 만들어진 것도 아니고, ThreadPool을 사용하지도 않았는데요, 따라서 위의 QueueTask와 WindowsFormsSynchronizationContext.Post 메서드까지 모두 Task.Factory.StartNew를 호출하는 스레드가 담당하게 됩니다. 물론, Post로 전달된 Delegate의 실행은 SynchronizationContext 종류에 따라 달라집니다.

이렇게 보니까 별로 신기할 것이 없죠? ^^




TaskScheduler는 사용자가 원한다면 얼마든지 새롭게 정의하는 것이 가능합니다. 예를 하나 들어볼까요? ^^ 너무 복잡하지 않은 걸로... 가령 SynchronizationContextTaskScheduler와는 달리 UI 스레드에 얽매이지는 않게 만들면서 오로지 "단일 스레드"만을 사용해 Task를 처리하는 스케줄러는 어떨까요?

일종의 STAThread 특성처럼, 호출을 (동기화가 필요 없도록) 직렬화하는 역할만 하는 것으로 대충 다음과 같이 만들 수 있습니다.

internal sealed class SingleThreadTaskScheduler : TaskScheduler
{
    static Thread _thread;
    static BlockingCollection<WorkItem> _workItems;

    class WorkItem
    {
        Task _task;
        public Task Task => _task;
        Func<Task, bool> _func;
        public Func<Task, bool> Func => _func;

        internal WorkItem(Func<Task, bool> func, Task task)
        {
            _task = task;
            _func = func;
        }
    }

    static SingleThreadTaskScheduler()
    {
        _workItems = new BlockingCollection<WorkItem>();

        _thread = new Thread(threadFunc);
        _thread.IsBackground = true;
        _thread.Start();
    }

    static void threadFunc()
    {
        while (true)
        {
            var item = _workItems.Take();
            item.Func(item.Task);
        }
    }

    protected override void QueueTask(Task task)
    {
        _workItems.Add(new WorkItem(TryExecuteTask, task));
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    protected override IEnumerable<Task>? GetScheduledTasks() => null;
    public override int MaximumConcurrencyLevel => 1;
}

QueueTask로 들어온 작업을, 미리 static 멤버로 생성해 두었던 Thread에서 실행하도록 만들었으므로 이를 이용하면,

private void button3_Click(object sender, EventArgs e)
{
    System.Diagnostics.Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: button3_Click");

    TaskScheduler staScheduler = new SingleThreadTaskScheduler();

    Task.Factory.StartNew(() =>
    {
        System.Diagnostics.Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: StartNew");
        this.Invoke(() => // SingleThreadTaskScheduler의 스레드에서 실행하므로 Control.Invoke 처리 필요
        {
            this.button3.Text = "TEST";
        });

    }, CancellationToken.None, TaskCreationOptions.None, staScheduler);
}

/* 출력 결과:
1: button3_Click
12: StartNew
*/

button3_Click을 몇 번을 실행해도 "StartNew" 출력은 매번 12번 스레드에서 실행되는 것을 확인할 수 있습니다.

(첨부 파일은 이 글의 예제 코드를 포함합니다.)




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 12/10/2022]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... 16  17  18  19  20  21  22  23  24  25  26  27  [28]  29  30  ...
NoWriterDateCnt.TitleFile(s)
12938정성태1/24/20229889개발 환경 구성: 633. Docker Desktop + k8s 환경에서 local 이미지를 사용하는 방법
12937정성태1/24/20227695.NET Framework: 1139. C# - ffmpeg(FFmpeg.AutoGen)를 이용해 오디오(mp2) 인코딩하는 예제(encode_audio.c) [2]파일 다운로드1
12936정성태1/22/20227632.NET Framework: 1138. C# - ffmpeg(FFmpeg.AutoGen)를 이용해 멀티미디어 파일의 메타데이터를 보여주는 예제(metadata.c)파일 다운로드1
12935정성태1/22/20227848.NET Framework: 1137. ffmpeg의 파일 해시 예제(ffhash.c)를 C#으로 포팅파일 다운로드1
12934정성태1/22/20227414오류 유형: 788. Warning C6262 Function uses '65564' bytes of stack: exceeds /analyze:stacksize '16384'. Consider moving some data to heap. [2]
12933정성태1/21/20227955.NET Framework: 1136. C# - ffmpeg(FFmpeg.AutoGen)를 이용해 MP2 오디오 파일 디코딩 예제(decode_audio.c)파일 다운로드1
12932정성태1/20/20228412.NET Framework: 1135. C# - ffmpeg(FFmpeg.AutoGen)로 하드웨어 가속기를 이용한 비디오 디코딩 예제(hw_decode.c) [2]파일 다운로드1
12931정성태1/20/20226498개발 환경 구성: 632. ASP.NET Core 프로젝트를 AKS/k8s에 올리는 과정
12930정성태1/19/20227178개발 환경 구성: 631. AKS/k8s의 Volume에 파일 복사하는 방법
12929정성태1/19/20226949개발 환경 구성: 630. AKS/k8s의 Pod에 Volume 연결하는 방법
12928정성태1/18/20227086개발 환경 구성: 629. AKS/Kubernetes에서 호스팅 중인 pod에 shell(/bin/bash)로 진입하는 방법
12927정성태1/18/20226866개발 환경 구성: 628. AKS 환경에 응용 프로그램 배포 방법
12926정성태1/17/20227371오류 유형: 787. AKS - pod 배포 시 ErrImagePull/ImagePullBackOff 오류
12925정성태1/17/20227444개발 환경 구성: 627. AKS의 준비 단계 - ACR(Azure Container Registry)에 docker 이미지 배포
12924정성태1/15/20228936.NET Framework: 1134. C# - ffmpeg(FFmpeg.AutoGen)를 이용한 비디오 디코딩 예제(decode_video.c) [2]파일 다운로드1
12923정성태1/15/20227810개발 환경 구성: 626. ffmpeg.exe를 사용해 비디오 파일을 MPEG1 포맷으로 변경하는 방법
12922정성태1/14/20226898개발 환경 구성: 625. AKS - Azure Kubernetes Service 생성 및 SLO/SLA 변경 방법
12921정성태1/14/20225797개발 환경 구성: 624. Docker Desktop에서 별도 서버에 설치한 docker registry에 이미지 올리는 방법
12920정성태1/14/20226571오류 유형: 786. Camtasia - An error occurred with the camera: Failed to Add Video Sampler.
12919정성태1/13/20226401Windows: 199. Host Network Service (HNS)에 의해서 점유되는 포트
12918정성태1/13/20226628Linux: 47. WSL - shell script에서 설정한 환경 변수가 스크립트 실행 후 반영되지 않는 문제
12917정성태1/12/20225833오류 유형: 785. C# - The type or namespace name '...' could not be found (are you missing a using directive or an assembly reference?)
12916정성태1/12/20225599오류 유형: 784. TFS - One or more source control bindings for this solution are not valid and are listed below.
12915정성태1/11/20225896오류 유형: 783. Visual Studio - We didn't find any interpreters
12914정성태1/11/20227900VS.NET IDE: 172. 비주얼 스튜디오 2022의 파이선 개발 환경 지원
12913정성태1/11/20228383.NET Framework: 1133. C# - byte * (바이트 포인터)를 FileStream으로 쓰는 방법 [1]
... 16  17  18  19  20  21  22  23  24  25  26  27  [28]  29  30  ...