Microsoft MVP성태의 닷넷 이야기
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 1개 있습니다.)
(시리즈 글이 3개 있습니다.)
.NET Framework: 830. C# - 비동기 호출을 취소하는 CancellationToken의 간단한 예제 코드
; https://www.sysnet.pe.kr/2/0/11888

닷넷: 2218. C# - (예를 들어, Socket) 비동기 I/O에 대한 await 호출 시 CancellationToken을 이용한 취소
; https://www.sysnet.pe.kr/2/0/13561

닷넷: 2231. C# - ReceiveTimeout, SendTimeout이 적용되지 않는 Socket await 비동기 호출
; https://www.sysnet.pe.kr/2/0/13580




C# - (예를 들어, Socket) 비동기 I/O에 대한 await 호출 시 CancellationToken을 이용한 취소

예전에,

C# - 비동기 호출을 취소하는 CancellationToken의 간단한 예제 코드
; https://www.sysnet.pe.kr/2/0/11888

CancellationToken을 설명하면서, 사용자가 직접 구현하는 취소 동작을 설명했습니다. 그러니까, 그 코드가 시사하는 바는, "마법은 없다"입니다. ^^ 모든 건, 개발자가 지정해 준 대로 동작하는 것이므로 CancellationToken을 이용한 취소도 결국 개발자가 어떻게 그것을 처리하느냐에 따라 달라집니다.

이번 글에서는 그에 대한 사례를 들어봅니다.




그나저나, 혹시 커널 레벨에 전달된 비동기 I/O를 어떻게 취소할 수 있는지 생각해 보셨나요? 닷넷 개발만 했다면 짐작할 수 없을 텐데요, Win32 시절의 개발자라면 비동기 I/O 원칙에 따라 CancelIo, CancelIoEx 함수 호출로 이어졌을 거라는 짐작을 할 수 있을 것입니다.

달리 말하면, CancellationToken을 이용한 비동기 I/O 메서드에 대한 취소를 원한다면 CancelIo/CancelIoEx API를 호출해야 하지만, 이것 역시 개발자가 그렇게 구현했어야만 하는 것입니다. 제가 이렇게 말했으니, 아마도 닷넷 코드의 비동기 I/O 코드가 언제나 그런 식으로 동작하는 것은 아니라는 것을 눈치채셨을 것입니다. ^^

여기서는 그에 대한 사례로 TcpClient의 NetworkStream.WriteAsync 사용 예를 보겠습니다. 우선, Write 동작에 대한 테스트를 받아주는 서버를 다음과 같이 구현해 주고,

internal class Program
{
    static void Main(string[] args)
    {
        Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint ep = new IPEndPoint(IPAddress.Any, 15000);
        socket.Bind(ep);
        socket.Listen(5);

        while (true)
        {
            Socket clntSocket = socket.Accept();
            Console.WriteLine($"Connected: {clntSocket.RemoteEndPoint}");
            ThreadPool.QueueUserWorkItem((arg) =>
            {
                while (true)
                {  
                    // 끊김만 감지
                    try
                    {
                        if (clntSocket.Poll(0, SelectMode.SelectRead))
                        {
                            byte[] buff = new byte[1];
                            if (clntSocket.Receive(buff, SocketFlags.Peek) == 0)
                            {
                                break;
                            }
                        }
                    }
                    catch 
                    {
                        break;
                    }

                    Thread.Sleep(1000);
                }
                Console.WriteLine($"Client disconnected: {clntSocket.RemoteEndPoint}");
                clntSocket.Close();
            });
        }
    }
}

클라이언트는, Send에서의 (일단은) blocking 테스트 여부를 위해 다음과 같이 만들어 보겠습니다.

internal class Program
{
    static void Main(string[] args)
    {
        Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        client.Connect("192.168.100.20", 15000);
        int length = client.SendBufferSize; // Windows 11의 경우 보통 65536

        long totalSent = 0;
        int count = 0;
        while (true)
        {
            byte[] buf = new byte[length];
            Console.WriteLine($"Sending: {buf.Length}");
            int sendLen = client.Send(buf, 0, buf.Length, SocketFlags.None); // 메서드 호출 후 곧바로 반환
            totalSent += sendLen;
            count++;
            Console.WriteLine($"[{count}] Sent: {buf.Length}, Total: {totalSent}");
        }

        // client.Close();
    }
}

/* 출력 결과:
Sending: 65536
[1] Sent: 65536, Total: 65536
Sending: 65536
...[생략]...
Sending: 65536
[49] Sent: 65536, Total: 3211264
Sending: 65536
[50] Sent: 65536, Total: 3276800
Sending: 65536
*/

테스트를 해보면, 3MB 정도에서 (대상 소켓에서 Receive를 하지 않으므로) 버퍼가 모두 차는 바람에 더 이상 Send를 하지 못하고 Send 호출에 blocking이 걸리는 것을 확인할 수 있습니다.

그럼, 이제 위의 예제를 비동기로 만들어 보면,

static async Task Main(string[] args)
{
    TcpClient client = new TcpClient();

    client.Connect("192.168.100.20", 15000);
    NetworkStream ns = client.GetStream();

    int length = client.SendBufferSize;

    long totalSent = 0;
    int count = 0;

    while (true)
    {
        byte[] buf = new byte[length];
        Console.WriteLine($"Sending: {buf.Length}");
        Task task = ns.WriteAsync(buf, 0, buf.Length);

        await task;

        totalSent += length;
        count++;
        Console.WriteLine($"[{count}] Sent: {buf.Length}, Total: {totalSent}");
    }

    // client.Close();
}

/* 출력 결과:
Sending: 65536
[1] Sent: 65536, Total: 65536
Sending: 65536
...[생략]...
Sending: 65536
[49] Sent: 65536, Total: 3211264
Sending: 65536
[50] Sent: 65536, Total: 3276800
Sending: 65536
*/

역시나 동일하게 3MB 송신 후 await에서 걸리는 것을 확인할 수 있습니다. 단지, 동기 버전과 다른 점이 있다면 위의 비동기 버전에서는 스레드의 blocking이 아닌, TCP Send가 완료되지 않아 await 이후의 분리된 코드 영역이 콜백에서 실행되지 않는 것입니다.




자, 이제 위의 비동기 호출에 대한 취소를 해볼 텐데요, 차이점을 알기 위해 .NET Framework과 .NET Core 환경으로 나눠서 실행할 것입니다. 우선, .NET Framework 프로젝트로 위의 코드에서 비동기 호출을 취소하기 위한 코드만 다음과 같이 살짝 추가해 줍니다.

CancellationTokenSource ct = new CancellationTokenSource();

ThreadPool.QueueUserWorkItem((arg) =>
{
    Thread.Sleep(5000); // 5초 후에, Cancel 호출
    ct.Cancel();
    Console.WriteLine("Cancel called!");
});

// ct.CancelAfter(5000);

while (true)
{
    byte[] buf = new byte[length];
    Console.WriteLine($"Sending: {buf.Length}");
    Task task = ns.WriteAsync(buf, 0, buf.Length, ct.Token);

    await task;

    totalSent += length;
    count++;
    Console.WriteLine($"[{count}] Sent: {buf.Length}, Total: {totalSent}");
}

실행해 보면, 화면에는 분명히 "Cancel called!" 문자열이 출력되지만 여전히 "await task;" 이후의 코드로는 나아가지 않습니다. (물론, 취소에 따른 예외 발생도 없습니다.) 하지만, 위의 코드를 그대로 .NET Core/5+ 환경, 여기서는 .NET 8 프로젝트에서 만들어 테스트하면 5초 후에 다음과 같은 출력이 나옵니다.

Cancel called!
Unhandled exception. System.OperationCanceledException: The operation was canceled.
   at System.Threading.CancellationToken.ThrowOperationCanceledException()
   at System.Threading.CancellationToken.ThrowIfCancellationRequested()
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
   at System.Threading.Tasks.ValueTask.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
--- End of stack trace from previous location ---
   at ConsoleApp5.Program.Main(String[] args)
   at ConsoleApp5.Program.<Main>(String[] args)

보는 바와 같이, CancellationTokenSource.Cancel 호출은 WriteAsync 호출을 대기하던 비동기 작업을 (.NET Framework과는 달리) 취소했습니다.




그 둘 간의 차이는 도대체 뭘까요? 당연히 구현상의 차이가 있겠죠. ^^ 우선, .NET Framework의 NetworkStream.WriteAsync 코드는 그 상위의 Stream 타입에 있는 WriteAsync를 호출합니다.

// referencesource/mscorlib/system/io/stream.cs
// https://github.com/microsoft/referencesource/blob/master/mscorlib/system/io/stream.cs

[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task WriteAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // If cancellation was requested, bail early with an already completed task.
    // Otherwise, return a task that represents the Begin/End methods.
    return cancellationToken.IsCancellationRequested // CancellationToken 처리는 I/O 발생 전에만 체크
                ? Task.FromCancellation(cancellationToken)
                : BeginEndWriteAsync(buffer, offset, count); // .NET APM 비동기 호출의 Begin...과 End... 조합
}

// BeginEndWriteAsync 메서드는 CancellationToken 처리가 없음
private Task BeginEndWriteAsync(Byte[] buffer, Int32 offset, Int32 count)
{            
    return TaskFactory<VoidTaskResult>.FromAsyncTrim(
                this, new ReadWriteParameters { Buffer=buffer, Offset=offset, Count=count },
                (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
                (stream, asyncResult) => // cached by compiler
                {
                    stream.EndWrite(asyncResult);
                    return default(VoidTaskResult);
                });
}  

보는 바와 같이 WriteAsync 호출에 전달한 CancellationToken은 I/O 발생 전에만 한번 체크하고 이후 I/O 동작을 수행하는 BeginWrite로는 전달하지 않고 있습니다. 당연히, 이렇게 호출한 I/O는 이후 취소할 수 있는 방법이 없습니다.

반면, .NET 8의 WriteAsync는 Socket.SendAsyncForNetworkStream 메서드를 거쳐,

// NetworkStream.WriteAsync

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    ValidateBufferArguments(buffer, offset, count);
    ThrowIfDisposed();
    if (!CanWrite)
    {
        throw new InvalidOperationException(SR.net_readonlystream);
    }

    try
    {
        return _streamSocket.SendAsyncForNetworkStream(
            new ReadOnlyMemory<byte>(buffer, offset, count),
            SocketFlags.None,
            cancellationToken).AsTask();
    }
    catch (Exception exception) when (!(exception is OutOfMemoryException))
    {
        throw WrapException(SR.net_io_writefailure, exception);
    }
}

Socket.SendAsyncForNetworkStream -> AwaitableSocketAsyncEventArgs.SendAsyncForNetworkStream -> Socket.SendAsync -> SocketAsyncEventArgs.DoOperationSend를 거치면서 CancellationToken이 계속 전달되고,

// Socket
public partial class Socket
{
    // ...[생략]...

    internal ValueTask SendAsyncForNetworkStream(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return ValueTask.FromCanceled(cancellationToken);
        }

        AwaitableSocketAsyncEventArgs saea =
            Interlocked.Exchange(ref _singleBufferSendEventArgs, null) ??
            new AwaitableSocketAsyncEventArgs(this, isReceiveForCaching: false);

        Debug.Assert(saea.BufferList == null);
        saea.SetBuffer(MemoryMarshal.AsMemory(buffer));
        saea.SocketFlags = socketFlags;
        saea.WrapExceptionsForNetworkStream = true;
        return saea.SendAsyncForNetworkStream(this, cancellationToken);
    }

    private bool SendAsync(SocketAsyncEventArgs e, CancellationToken cancellationToken)
    {
        ThrowIfDisposed();

        ArgumentNullException.ThrowIfNull(e);

        // Prepare for and make the native call.
        e.StartOperationCommon(this, SocketAsyncOperation.Send);
        SocketError socketError;
        try
        {
            socketError = e.DoOperationSend(_handle, cancellationToken);
        }
        catch
        {
            // Clear in-use flag on event args object.
            e.Complete();
            throw;
        }

        return socketError == SocketError.IOPending;
    }

    // ...[생략]...

    internal sealed class AwaitableSocketAsyncEventArgs : SocketAsyncEventArgs, IValueTaskSource, IValueTaskSource<int>, IValueTaskSource<Socket>, IValueTaskSource<SocketReceiveFromResult>, IValueTaskSource<SocketReceiveMessageFromResult>
    {
        // ...[생략]...

        public ValueTask SendAsyncForNetworkStream(Socket socket, CancellationToken cancellationToken)
        {
            if (socket.SendAsync(this, cancellationToken))
            {
                _cancellationToken = cancellationToken;
                return new ValueTask(this, _mrvtsc.Version);
            }

            SocketError error = SocketError;

            ReleaseForSyncCompletion();

            return error == SocketError.Success ?
                default :
                ValueTask.FromException(CreateException(error));
        }

        // ...[생략]...
    }
}

SocketAsyncEventArgs의 ProcessIOCPResult 내에서 마침내 cancellationToken에 대해 UnsafeRegister를 호출해 등록한 callback으로 CancelIoEx 호출을 하고 있습니다.

// dotnet/runtime/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.Windows.cs

internal unsafe SocketError DoOperationSend(SafeSocketHandle handle, CancellationToken cancellationToken) => _bufferList == null ?
    DoOperationSendSingleBuffer(handle, cancellationToken) :
    DoOperationSendMultiBuffer(handle);

internal unsafe SocketError DoOperationSendSingleBuffer(SafeSocketHandle handle, CancellationToken cancellationToken)
{
    Debug.Assert(_asyncCompletionOwnership == 0, $"Expected 0, got {_asyncCompletionOwnership}");

    fixed (byte* bufferPtr = &MemoryMarshal.GetReference(_buffer.Span))
    {
        NativeOverlapped* overlapped = AllocateNativeOverlapped();
        try
        {
            var wsaBuffer = new WSABuffer { Length = _count, Pointer = (IntPtr)(bufferPtr + _offset) };

            SocketError socketError = Interop.Winsock.WSASend(
                handle,
                &wsaBuffer,
                1,
                out int bytesTransferred,
                _socketFlags,
                overlapped,
                IntPtr.Zero);

            return ProcessIOCPResult(socketError == SocketError.Success, bytesTransferred, ref overlapped, _buffer, cancellationToken);
        }
        catch when (overlapped is not null)
        {
            FreeNativeOverlapped(ref overlapped);
            throw;
        }
    }
}

private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred, ref NativeOverlapped* overlapped, Memory<byte> bufferToPin, CancellationToken cancellationToken)
{
    SocketError socketError = GetIOCPResult(success, ref overlapped);
    SocketFlags socketFlags = SocketFlags.None;

    if (socketError == SocketError.IOPending)
    {
        // Perform any required setup of the asynchronous operation.  Everything set up here needs to be undone in CompleteCore.CleanupIOCPResult.
        if (cancellationToken.CanBeCanceled)
        {
            Debug.Assert(_pendingOverlappedForCancellation == null);
            _pendingOverlappedForCancellation = overlapped;
            _registrationToCancelPendingIO = cancellationToken.UnsafeRegister(static s =>
            {
                // Try to cancel the I/O.  We ignore the return value (other than for logging), as cancellation
                // is opportunistic and we don't want to fail the operation because we couldn't cancel it.
                var thisRef = (SocketAsyncEventArgs)s!;
                SafeSocketHandle handle = thisRef._currentSocket!.SafeHandle;
                if (!handle.IsClosed)
                {
                    try
                    {
                        // 아래의 글은 OVERLAPPED 사용 시 CancelIO의 중요성을 보여줍니다.
                        // The case of the crash when destructing a std::map
                        // https://devblogs.microsoft.com/oldnewthing/20240927-00/?p=110320
                        bool canceled = Interop.Kernel32.CancelIoEx(handle, thisRef._pendingOverlappedForCancellation);
                        if (NetEventSource.Log.IsEnabled())
                        {
                            NetEventSource.Info(thisRef, canceled ?
                                "Socket operation canceled." :
                                $"CancelIoEx failed with error '{Marshal.GetLastPInvokeError()}'.");
                        }
                    }
                    catch (ObjectDisposedException)
                    {
                        // Ignore errors resulting from the SafeHandle being closed concurrently.
                    }
                }
            }, this);
        }
        if (!bufferToPin.Equals(default))
        {
            _singleBufferHandle = bufferToPin.Pin();
        }

        // ...[생략]...
    }

    // ...[생략]...
    return socketError;
}

(뭔가 코드가 많지만) ^^ 동작 자체는 깔끔하게 취소가 됩니다.




약간의 수고를 곁들인다면 .NET Framework 버전에서도 Cancel 자체의 과정은 병합할 수 있습니다. 이에 대해서는 다음의 글에서 자세하게 소개하고 있는데요,

Is there a way I can cause a running method to stop immediately with a cts.Cancel();
; https://stackoverflow.com/questions/59243161/is-there-a-way-i-can-cause-a-running-method-to-stop-immediately-with-a-cts-cance/59267214#59267214

위의 내용을 이번 글에서 작성한 .NET Framework 코드에 병합한다면 다음과 같은 식으로 할 수 있습니다.

while (true)
{
    byte[] buf = new byte[length];
    Console.WriteLine($"Sending: {buf.Length}");
    Task task = ns.WriteAsync(buf, 0, buf.Length, ct.Token);

    var cancelable = new Task(() => { }, ct.Token);
    await Task.WhenAny(task, cancelable);

    if (ct.IsCancellationRequested) // 혹은 ct.Token.ThrowIfCancellationRequested(); 호출로 cancel 예외 발생
    {
        Console.WriteLine("Task cancelled!");
        break;
    }

    totalSent += length;
    count++;
    Console.WriteLine($"[{count}] Sent: {buf.Length}, Total: {totalSent}");
}

단지, 위와 같은 경우에는 WriteAsync 호출 시 커널로 넘어간 비동기 I/O에 대한 IRP가 취소된 것은 아니므로 buffer가 pinning 된 채로 살아있게 됩니다. 하지만, 현실적으로 위와 같은 경우에는 어차피 소켓을 정리하는 수순으로 넘어갈 것이므로 이후 Socket.Dispose 단계를 거치면 커널의 소켓 I/O도 해제될 것이므로 결국엔 버퍼가 정리됩니다.




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 9/28/2024]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... 61  62  63  64  65  66  67  68  69  70  71  72  73  [74]  75  ...
NoWriterDateCnt.TitleFile(s)
12084정성태12/19/201919298디버깅 기술: 143. windbg/sos - Hashtable의 buckets 배열 내용을 모두 덤프하는 방법 (do_hashtable.py) [1]
12083정성태12/17/201922270Linux: 27. linux - lldb를 이용한 .NET Core 응용 프로그램의 메모리 덤프 분석 방법 [2]
12082정성태12/17/201920551오류 유형: 585. lsof: WARNING: can't stat() fuse.gvfsd-fuse file system
12081정성태12/16/201922401개발 환경 구성: 465. 로컬 PC에서 개발 중인 ASP.NET Core 웹 응용 프로그램을 다른 PC에서도 접근하는 방법 [5]
12080정성태12/16/201919561.NET Framework: 870. C# - 프로세스의 모든 핸들을 열람
12079정성태12/13/201921430오류 유형: 584. 원격 데스크톱(rdp) 환경에서 다중 또는 고용량 파일 복사 시 "Unspecified error" 오류 발생
12078정성태12/13/201921236Linux: 26. .NET Core 응용 프로그램을 위한 메모리 덤프 방법 [3]
12077정성태12/13/201920340Linux: 25. 자주 실행할 명령어 또는 초기 환경을 "~/.bashrc" 파일에 등록
12076정성태12/12/201918851디버깅 기술: 142. Linux - lldb 환경에서 sos 확장 명령어를 이용한 닷넷 프로세스 디버깅 - 배포 방법에 따른 차이
12075정성태12/11/201919653디버깅 기술: 141. Linux - lldb 환경에서 sos 확장 명령어를 이용한 닷넷 프로세스 디버깅
12074정성태12/10/201919322디버깅 기술: 140. windbg/Visual Studio - 값이 변경된 경우를 위한 정지점(BP) 설정(Data Breakpoint)
12073정성태12/10/201920868Linux: 24. Linux/C# - 실행 파일이 아닌 스크립트 형식의 명령어를 Process.Start로 실행하는 방법
12072정성태12/9/201917669오류 유형: 583. iisreset 수행 시 "No such interface supported" 오류
12071정성태12/9/201921174오류 유형: 582. 리눅스 디스크 공간 부족 및 safemode 부팅 방법
12070정성태12/9/201923092오류 유형: 581. resize2fs: Bad magic number in super-block while trying to open /dev/.../root
12069정성태12/2/201919485디버깅 기술: 139. windbg - x64 덤프 분석 시 메서드의 인자 또는 로컬 변수의 값을 확인하는 방법
12068정성태11/28/201928146디버깅 기술: 138. windbg와 Win32 API로 알아보는 Windows Heap 정보 분석 [3]파일 다운로드2
12067정성태11/27/201919561디버깅 기술: 137. 실제 사례를 통해 Debug Diagnostics 도구가 생성한 닷넷 웹 응용 프로그램의 성능 장애 보고서 설명 [1]파일 다운로드1
12066정성태11/27/201919219디버깅 기술: 136. windbg - C# PInvoke 호출 시 마샬링을 담당하는 함수 분석 - OracleCommand.ExecuteReader에서 OpsSql.Prepare2 PInvoke 호출 분석
12065정성태11/25/201917527디버깅 기술: 135. windbg - C# PInvoke 호출 시 마샬링을 담당하는 함수 분석파일 다운로드1
12064정성태11/25/201920440오류 유형: 580. HTTP Error 500.0/500.33 - ANCM In-Process Handler Load Failure
12063정성태11/21/201919373디버깅 기술: 134. windbg - RtlReportCriticalFailure로부터 parameters 정보 찾는 방법
12062정성태11/21/201918886디버깅 기술: 133. windbg - CoTaskMemFree/FreeCoTaskMem에서 발생한 덤프 분석 사례 - 두 번째 이야기
12061정성태11/20/201919316Windows: 167. CoTaskMemAlloc/CoTaskMemFree과 윈도우 Heap의 관계
12060정성태11/20/201920920디버깅 기술: 132. windbg/Visual Studio - HeapFree x64의 동작 분석
12059정성태11/20/201920079디버깅 기술: 131. windbg/Visual Studio - HeapFree x86의 동작 분석
... 61  62  63  64  65  66  67  68  69  70  71  72  73  [74]  75  ...