Microsoft MVP성태의 닷넷 이야기
.NET Framework: 375. System.Net.Sockets.Socket이 Thread-safe할까? [링크 복사], [링크+제목 복사],
조회: 17882
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 2개 있습니다.)

System.Net.Sockets.Socket이 Thread-safe할까?

System.Net.Sockets.Socket 타입의 여러 메서드 중에서도 결국 궁금한 것은 과연 Send 메서드가 thread-safe하냐에 대해서입니다.

MSDN 문서를 보면 .NET에 포함된 대부분의 BCL 타입들에 대해서 "Thread Safety" 설명 부분은 거의 일관적으로 다음과 같이 적혀 있습니다.

Any public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe.


즉, 정적(static) 메서드는 thread-safe하지만 그 외 인스턴스 메서드는 thread-safe하지 않습니다. 사실 "members"라고 표현되어 있지만 저는 이 단어가 오타가 아닌가 생각됩니다. 왜냐하면 "member"에는 field도 포함되어 있는데 static 필드에 대한 thread-safe을 보장하는 방법은 없기 때문입니다. 따라서, static 유형의 method와 property에 대해서만 thread-safe하다고 보면 될 것입니다.

그런데, 가끔 아닌 것도 있습니다. ^^ 그래서 문서를 잘 봐야 합니다. 그 대표적인 예가 바로 Socket입니다.

Socket Class
; https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket

위의 문서에 마지막을 보면 Thread Safety에 대해 다음과 같이 언급하고 있습니다.

Instances of this class are thread safe.

그렇습니다. System.Net.Sockets.Socket은 Thread-safe 합니다.




실제로 이를 확인할 수 있는 테스트 프로그램을 만들 수 있을까요? 간단한 수준으로 제가 생각한 바는 이렇습니다. 클라이언트 측은 다음과 같은 기준을 따르도록 만들고,

  • 버퍼의 내용은 "[4바이트 고유ID][버퍼길이][일련번호]:[0123456789001234567890...반복...]
  • 한 번의 Send 메서드에 전달되는 [01234567890...반복...] 유형의 데이터 양은 100,000바이트 정도
  • Send 메서드를 10,000번 수행

반면, 서버 측은 다음과 같은 검증 절차를 수행합니다.

  • Receive로 읽어들이는 모든 데이터의 내용이 "0123456789...."으로 반복되는지 확인
  • 10,000번 전달된 내용의 일련번호가 모두 채워졌는지 확인

코드로 좀 더 설명해 볼까요? ^^ 우선, 서버 측 코드는 다음과 같이 작성될 수 있습니다.

namespace SocketServerSample
{
    class Program
    {
        static void Main(string[] args)
        {
            using (Socket srvSocket =
                new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                int port = 11200;

                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
                srvSocket.Bind(endPoint);
                srvSocket.Listen(10);

                while (true)
                {
                    Socket clntSocket = srvSocket.Accept();
                    Tuple<Socket, int> threadParam = new Tuple<Socket, int>(clntSocket, 0);
                    ThreadPool.QueueUserWorkItem(CheckClientPacket, threadParam);
                }
            }
        }

        static void CheckClientPacket(object param)
        {
            Tuple<Socket, int> threadParam = param as Tuple<Socket, int>;
            int instanceId = 0;

            try
            {
                using (Socket clntSocket = threadParam.Item1)
                {
                    long totalReceived = 0;
                    SortedList<int, bool> sequenceList = new SortedList<int, bool>();
                    int bodyCount = 0;

                    byte [] instanceIdBuf = MustReadBuffer(clntSocket, 4); // 4바이트로 넘어온 고유 클라이언트 ID를 알아내고,
                    instanceId = BitConverter.ToInt32(instanceIdBuf, 0);

                    Console.WriteLine(": Accepted and Processing..." + instanceId.ToString("X"));

                    while (true)
                    {
                        byte[] headerBuf = MustReadBuffer(clntSocket, 4); // 패킷의 전체 길이를 알아내고,
                        if (headerBuf == null)
                        {
                            break;
                        }

                        int bodyLength = BitConverter.ToInt32(headerBuf, 0); // 패킷 길이: bodyLength
                        byte[] bodyBuf = MustReadBuffer(clntSocket, bodyLength); // bodyLength만큼의 데이터를 모두 읽어낸다.
                        if (bodyBuf == null)
                        {
                            break;
                        }

                        totalReceived += bodyLength + headerBuf.Length;

                        string txt = Encoding.UTF8.GetString(bodyBuf); // UTF8 디코딩해서 원본 문자열 복원하고,
                        int sequencePos = txt.IndexOf(':'); // [일련번호]:[0123456789....]에서 [일련번호] 값만을 추출.
                        if (sequencePos == -1)
                        {
                            Console.WriteLine("No Sequence number");
                            break;
                        }

                        string sequenceText = txt.Substring(0, sequencePos); 
                        int sequenceNo = Int32.Parse(sequenceText);

                        // 클라이언트로부터 받은 순서 값을 보관
                        //  - 다중 스레드로 보낸 경우 차례대로 수신되지 않으므로.
                        try
                        {
                            sequenceList.Add(sequenceNo, true);
                        }
                        catch
                        {
                            Console.WriteLine("Failed: duplicated seq No. " + sequenceNo + ": " + txt.Substring(0, 20));
                            break;
                        }

                        bool bodyResult = IsBodyComplete(txt); // [0123456789...] 데이터가 올바르게 구성되어 있는지 검사
                        if (bodyResult == false)
                        {
                            Console.WriteLine("Invalid body data");
                            break;
                        }

                        bodyCount++;
                    }

                    // 하나의 클라이언트와 통신이 완료된 후, 검증을 한다.

                    // 모든 패킷이 누락되지 않고 왔는지 검증
                    bool seqResult = IsSequenceIncrement(instanceId, sequenceList);
                    if (seqResult == true)
                    {
                        Console.WriteLine("Thread-safety: Success");
                    }

                    sequenceList.Clear();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine();
                Console.WriteLine(instanceId.ToString("X") + ": exception occurred");
                Console.WriteLine(ex.ToString());
            }
        }

        private static bool IsBodyComplete(string item)
        {
            int pos = item.IndexOf(':');
            string txt = item.Substring(pos + 1);

            pos = 0;
            int cmp = 48; // 48 == '0'
            foreach (var ch in txt)
            {
                if (cmp != (int)ch)
                {
                    Console.WriteLine("Failed: at " + pos);
                    return false;
                }

                pos++;
                cmp++;
                if (cmp == 58) // 57 == '9'
                {
                    cmp = 48;
                }
            }

            return true;
        }

        private static bool IsSequenceIncrement(int instanceId, SortedList<int, bool> sequenceList)
        {
            int oldItem = sequenceList.Keys.Min() - 1;

            foreach (var item in sequenceList.Keys)
            {
                if ((item - oldItem) != 1)
                {
                    Console.WriteLine(instanceId.ToString("X") + ": Failed:" + oldItem + " and " + item);
                    return false;
                }

                oldItem = item;
            }

            return true;
        }

        private static byte[] MustReadBuffer(Socket clntSocket, int mustRead)
        {
            byte[] result = null;

            using (MemoryStream ms = new MemoryStream())
            {
                while (true)
                {
                    byte[] byteBuf = new byte[mustRead];
                    int recv = 0;
                    try
                    {
                        recv = clntSocket.Receive(byteBuf, mustRead, SocketFlags.None);
                    }
                    catch (System.Net.Sockets.SocketException ex)
                    {
                        // 클라이언트 측에서 데이터를 전송 도중,
                        //      1) 소켓을 Close한 경우
                        //      2) 클라이언트 응용 프로그램을 강제 종료하는 경우
                        return null;
                    }

                    if (recv == 0)
                    {
                        Console.WriteLine("Receive returns 0");
                        return null;
                    }

                    ms.Write(byteBuf, 0, recv);
                    mustRead -= recv;
                    if (mustRead == 0)
                    {
                        break;
                    }
                }

                ms.Flush();

                result = ms.ToArray();
            }

            return result;
        }
    }
}

클라이언트는 비교를 위해 단일 스레드와 다중 스레드로 나눠서 작성해 보겠습니다. 우선, 단일 스레드로 Send하는 Socket 클라이언트는 다음과 같이 작성합니다.

namespace SocketClientST
{
    class Program
    {
        static void Main(string[] args)
        {
            int instanceId = 0;

            try
            {
                string body = string.Empty;
                StringBuilder sb = new StringBuilder();

                // 0123456789... 가 10,000번 반복되는 문자열을 생성
                Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
                for (int i = 0; i < 10000; i++)
                {
                    sb.Append(body);
                }

                body = sb.ToString();

                int loopCount = 10000;
                int dot = loopCount / (80 * 25);

                // 단일 스레드에서 1개의 소켓 인스턴스로 10,000번 Send 테스트
                using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    IPAddress target = IPAddress.Loopback;
                    // IPAddress target = IPAddress.Parse("192.168.0.17");
                    EndPoint serverEP = new IPEndPoint(target, 11200);

                    int retryCount = 3;
                    bool connected = false;
                    while (retryCount-- > 0)
                    {
                        try
                        {
                            socket.Connect(serverEP);
                            connected = true;
                        }
                        catch { }
                    }

                    if (connected == false)
                    {
                        Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                        return;
                    }

                    byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                    socket.Send(instanceIdBuf);

                    int failedAt = -1;

                    // 0123456789... 가 10,000번 반복되는 문자열을 가진 CommonPacket 인스턴스를 
                    // 10,000번 루프를 돌면서 Socket.Send로 전송
                    for (int i = 0; i < loopCount; i++)
                    {
                        CommonPacket packet = new CommonPacket(i);
                        packet.AddData(body);

                        byte[] dataBuf = packet.GetBuffer();
                        bool result = MustSendBuffer(socket, dataBuf, dataBuf.Length);
                        if (result == false)
                        {
                            failedAt = i;
                            break;
                        }

                        if (i % dot == 0)
                        {
                            Console.Write(((int)instanceId).ToString("X") + ",");
                        }
                    }

                    socket.Close();

                    Console.WriteLine();

                    sb = new StringBuilder();
                    if (failedAt == -1)
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " is completed");
                        sb.AppendLine("TCP Client socket: Closed");
                        sb.AppendLine(loopCount + " times: data sent");
                        sb.AppendLine((body.Length / 1024) + "KB / packet");
                    }
                    else
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " - failed");
                        sb.AppendLine("Send failed at: " + failedAt);
                    }

                    Console.WriteLine(sb.ToString());
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        // 전달된 버퍼를 Socket.Send로 전송
        private static bool MustSendBuffer(Socket socket, byte[] dataBuf, int mustSend)
        {
            int pos = 0;

            while (true)
            {
                int sentLength = 0;

                try
                {
                    // blocking call인 경우 sentLength == mustSend이지만!
                    sentLength = socket.Send(dataBuf, pos, mustSend, SocketFlags.None);
                }
                catch (Exception ex)
                {
                    int error = Marshal.GetLastWin32Error();
                    Console.WriteLine(error + ": " + ex.ToString());
                    return false;
                }

                if (sentLength == 0)
                {
                    return false;
                }

                mustSend -= sentLength;
                pos += sentLength;
                if (mustSend == 0)
                {
                    return true;
                }
            }
        }
    }

    public class CommonPacket
    {
        int _seq = 1;

        MemoryStream ms = new MemoryStream();

        public CommonPacket(int seqenceId)
        {
            _seq = seqenceId;
        }

        public void AddData(string txt)
        {
            int bodyLength = 0;
            string seqText = _seq.ToString();

            byte[] dataBuf = Encoding.UTF8.GetBytes(txt);
            byte[] seqBuf = Encoding.UTF8.GetBytes(seqText);

            bodyLength = dataBuf.Length + seqBuf.Length + 1; // 1 == ':'

            byte[] headerLength = BitConverter.GetBytes(bodyLength);

            ms.Write(headerLength, 0, headerLength.Length);

            ms.Write(seqBuf, 0, seqBuf.Length);
            ms.WriteByte((byte)':');
            ms.Write(dataBuf, 0, dataBuf.Length);
            ms.Flush();
        }

        public byte[] GetBuffer()
        {
            return ms.ToArray();
        }
    }
}

테스트 해보니, 서버 측 프로그램과 클라이언트의 정상 동작이 확인되었습니다. 마지막으로, 클라이언트 측을 다중 스레드를 이용해 Send를 호출하는 것으로 바꿔 보겠습니다.

namespace SocketClientMT
{
    class Program
    {
        class ThreadParam
        {
            public List<CommonPacket> Packets;
            public Socket Socket;
            public int Sent;
        }

        static void Main(string[] args)
        {
            int instanceId = 0;

            string body = string.Empty;
            StringBuilder sb = new StringBuilder();

            Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
            for (int i = 0; i < 10000; i++)
            {
                sb.Append(body);
            }

            body = sb.ToString();

            int loopCount = 10000;

            List<CommonPacket> packets = new List<CommonPacket>();

            // 1개의 소켓을 생성하고,
            using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                IPAddress target = IPAddress.Loopback; 
                // IPAddress target = IPAddress.Parse("192.168.0.17");
                EndPoint serverEP = new IPEndPoint(target, 11200);

                int retryCount = 3;
                bool connected = false;
                while (retryCount-- > 0)
                {
                    try
                    {
                        socket.Connect(serverEP);
                        connected = true;
                    }
                    catch { }
                }

                if (connected == false)
                {
                    Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                    return;
                }

                byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                socket.Send(instanceIdBuf);

                // 일련번호가 붙은 10,000개의 버퍼를 준비한 다음,
                for (int i = 0; i < loopCount; i++)
                {
                    CommonPacket packet = new CommonPacket(i);
                    packet.AddData(body);

                    packets.Add(packet);
                }

                // 20개의 스레드에서 List에 보관된 CommonPacket을 가져가서 같은 소켓 인스턴스의 Send에 전달
                List<Thread> threads = new List<Thread>();
                ThreadParam threadParam = new ThreadParam();
                threadParam.Packets = packets;
                threadParam.Socket = socket;

                for (int i = 0; i < 20; i++)
                {
                    Thread aThread = new Thread(sendBufferThread);
                    aThread.IsBackground = true;
                    aThread.Start(threadParam);

                    threads.Add(aThread);
                }

                // 모든 스레드의 실행이 종료될 때까지 대기
                foreach (var item in threads)
                {
                    item.Join();
                }

                socket.Close();
            }
        }

        // 스레드 메서드: Packets 리스트에 보관된 CommonPacket 데이터를 경쟁적으로 가져와서
        //               서버 측에 Send 메서드로 전달.
        private static void sendBufferThread(object obj)
        {
            ThreadParam threadParam = (ThreadParam)obj;

            while (true)
            {
                CommonPacket packet = null;

                lock (threadParam.Packets)
                {
                    if (threadParam.Packets.Count == 0)
                    {
                        break;
                    }

                    packet = threadParam.Packets[0];
                    threadParam.Packets.RemoveAt(0);
                    threadParam.Sent++;
                }

                byte[] dataBuf = packet.GetBuffer();
                MustSendBuffer(threadParam.Socket, dataBuf, dataBuf.Length);
            }
        }
    }
}

물론, 다중 스레드 클라이언트도 실행해 보면 서버 측에서의 검증 결과를 잘 통과하지만 이것은 테스트에 의한 실험값에 불과하기 때문에 절대적인 증명이라고 볼 수는 없습니다. 그래도 분명히 MSDN 문서에 thread-safe하다고 되어 있으므로 위의 예제 코드는 단순 확인용이라는 수준으로 놓고 보면 되겠습니다. (혹시, 테스트 코드의 조건에 의문 사항이나 개선이 필요하면 덧글 부탁드립니다.)

첨부 파일은 위의 코드를 반영한 서버, 단일 스레드 클라이언트, 다중 스레드 클라이언트 예제 프로젝트를 담고 있습니다.





[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 7/17/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2013-08-06 11시54분
[ryujh] 안녕하세요. 소켓 소스 많은 도움될것 같습니다.

소스 분석 중 서버측 소스에서 IsSequenceIncrement 메소드 에 SortedList 를 사용하여 정렬된 키값이 연속적이지 않으면 검증되지 않은 것으로 나오는데

SortedList 를 사용하지 않아도 IsSequenceIncrement 메소드 에서 Sort 하여 키를 정렬시켜도 문제 없는 것이죠?

Thread-safe 관련 내용은 더 분석해보겠습니다. 이상입니다.
[guest]
2013-08-07 12시24분
@ryujh 님, 맞습니다. 굳이 SortedList를 사용하지 않고 일반 List에서 일련 번호만 제대로 검증할 수 있다면 어떤 식으로 해도 상관없습니다. ^^
정성태

1  2  [3]  4  5  6  7  8  9  10  11  12  13  14  15  ...
NoWriterDateCnt.TitleFile(s)
13596정성태4/14/20242211닷넷: 2238. C# - WAV 기본 파일 포맷파일 다운로드1
13595정성태4/13/20242302닷넷: 2237. C# - Audio 장치 열기 (Windows Multimedia, NAudio)파일 다운로드1
13594정성태4/12/20242301닷넷: 2236. C# - Audio 장치 열람 (Windows Multimedia, NAudio)파일 다운로드1
13593정성태4/8/20242363닷넷: 2235. MSBuild - AccelerateBuildsInVisualStudio 옵션
13592정성태4/2/20243080C/C++: 165. CLion으로 만든 Rust Win32 DLL을 C#과 연동 [1]
13591정성태4/2/20242203닷넷: 2234. C# - WPF 응용 프로그램에 Blazor App 통합파일 다운로드1
13590정성태3/31/20242071Linux: 70. Python - uwsgi 응용 프로그램이 k8s 환경에서 OOM 발생하는 문제
13589정성태3/29/20242283닷넷: 2233. C# - 프로세스 CPU 사용량을 나타내는 성능 카운터와 Win32 API파일 다운로드1
13588정성태3/28/20242778닷넷: 2232. C# - Unity + 닷넷 App(WinForms/WPF) 간의 Named Pipe 통신 [2]파일 다운로드1
13587정성태3/27/20242327오류 유형: 900. Windows Update 오류 - 8024402C, 80070643
13586정성태3/27/20243441Windows: 263. Windows - 복구 파티션(Recovery Partition) 용량을 늘리는 방법
13585정성태3/26/20242515Windows: 262. PerformanceCounter의 InstanceName에 pid를 추가한 "Process V2"
13584정성태3/26/20242597개발 환경 구성: 708. Unity3D - C# Windows Forms / WPF Application에 통합하는 방법 [4]파일 다운로드1
13583정성태3/25/20242476Windows: 261. CPU Utilization이 100% 넘는 경우를 성능 카운터로 확인하는 방법
13582정성태3/19/20242609Windows: 260. CPU 사용률을 나타내는 2가지 수치 - 사용량(Usage)과 활용률(Utilization)파일 다운로드1
13581정성태3/18/20242632개발 환경 구성: 707. 빌드한 Unity3D 프로그램을 C++ Windows Application에 통합하는 방법
13580정성태3/15/20242164닷넷: 2231. C# - ReceiveTimeout, SendTimeout이 적용되지 않는 Socket await 비동기 호출파일 다운로드1
13579정성태3/13/20242720오류 유형: 899. HTTP Error 500.32 - ANCM Failed to Load dll
13578정성태3/11/20242996닷넷: 2230. C# - 덮어쓰기 가능한 환형 큐 (Circular queue)파일 다운로드1
13577정성태3/9/20243287닷넷: 2229. C# - 닷넷을 위한 난독화 도구 소개 (예: ConfuserEx)
13576정성태3/8/20242612닷넷: 2228. .NET Profiler - IMetaDataEmit2::DefineMethodSpec 사용법
13575정성태3/7/20242850닷넷: 2227. 최신 C# 문법을 .NET Framework 프로젝트에 쓸 수 있을까요?
13574정성태3/6/20242703닷넷: 2226. C# - "Docker Desktop for Windows" Container 환경에서의 IPv6 DualMode 소켓
13573정성태3/5/20242581닷넷: 2225. Windbg - dumasync로 분석하는 async/await 호출
13572정성태3/4/20242825닷넷: 2224. C# - WPF의 Dispatcher Queue로 알아보는 await 호출의 hang 현상파일 다운로드1
13571정성태3/1/20242603닷넷: 2223. C# - await 호출과 WPF의 Dispatcher Queue 동작 확인파일 다운로드1
1  2  [3]  4  5  6  7  8  9  10  11  12  13  14  15  ...