Microsoft MVP성태의 닷넷 이야기
.NET Framework: 375. System.Net.Sockets.Socket이 Thread-safe할까? [링크 복사], [링크+제목 복사],
조회: 25381
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 2개 있습니다.)

System.Net.Sockets.Socket이 Thread-safe할까?

System.Net.Sockets.Socket 타입의 여러 메서드 중에서도 결국 궁금한 것은 과연 Send 메서드가 thread-safe하냐에 대해서입니다.

MSDN 문서를 보면 .NET에 포함된 대부분의 BCL 타입들에 대해서 "Thread Safety" 설명 부분은 거의 일관적으로 다음과 같이 적혀 있습니다.

Any public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe.


즉, 정적(static) 메서드는 thread-safe하지만 그 외 인스턴스 메서드는 thread-safe하지 않습니다. 사실 "members"라고 표현되어 있지만 저는 이 단어가 오타가 아닌가 생각됩니다. 왜냐하면 "member"에는 field도 포함되어 있는데 static 필드에 대한 thread-safe을 보장하는 방법은 없기 때문입니다. 따라서, static 유형의 method와 property에 대해서만 thread-safe하다고 보면 될 것입니다.

그런데, 가끔 아닌 것도 있습니다. ^^ 그래서 문서를 잘 봐야 합니다. 그 대표적인 예가 바로 Socket입니다.

Socket Class
; https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket

위의 문서에 마지막을 보면 Thread Safety에 대해 다음과 같이 언급하고 있습니다.

Instances of this class are thread safe.

그렇습니다. System.Net.Sockets.Socket은 Thread-safe 합니다.




실제로 이를 확인할 수 있는 테스트 프로그램을 만들 수 있을까요? 간단한 수준으로 제가 생각한 바는 이렇습니다. 클라이언트 측은 다음과 같은 기준을 따르도록 만들고,

  • 버퍼의 내용은 "[4바이트 고유ID][버퍼길이][일련번호]:[0123456789001234567890...반복...]
  • 한 번의 Send 메서드에 전달되는 [01234567890...반복...] 유형의 데이터 양은 100,000바이트 정도
  • Send 메서드를 10,000번 수행

반면, 서버 측은 다음과 같은 검증 절차를 수행합니다.

  • Receive로 읽어들이는 모든 데이터의 내용이 "0123456789...."으로 반복되는지 확인
  • 10,000번 전달된 내용의 일련번호가 모두 채워졌는지 확인

코드로 좀 더 설명해 볼까요? ^^ 우선, 서버 측 코드는 다음과 같이 작성될 수 있습니다.

namespace SocketServerSample
{
    class Program
    {
        static void Main(string[] args)
        {
            using (Socket srvSocket =
                new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                int port = 11200;

                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
                srvSocket.Bind(endPoint);
                srvSocket.Listen(10);

                while (true)
                {
                    Socket clntSocket = srvSocket.Accept();
                    Tuple<Socket, int> threadParam = new Tuple<Socket, int>(clntSocket, 0);
                    ThreadPool.QueueUserWorkItem(CheckClientPacket, threadParam);
                }
            }
        }

        static void CheckClientPacket(object param)
        {
            Tuple<Socket, int> threadParam = param as Tuple<Socket, int>;
            int instanceId = 0;

            try
            {
                using (Socket clntSocket = threadParam.Item1)
                {
                    long totalReceived = 0;
                    SortedList<int, bool> sequenceList = new SortedList<int, bool>();
                    int bodyCount = 0;

                    byte [] instanceIdBuf = MustReadBuffer(clntSocket, 4); // 4바이트로 넘어온 고유 클라이언트 ID를 알아내고,
                    instanceId = BitConverter.ToInt32(instanceIdBuf, 0);

                    Console.WriteLine(": Accepted and Processing..." + instanceId.ToString("X"));

                    while (true)
                    {
                        byte[] headerBuf = MustReadBuffer(clntSocket, 4); // 패킷의 전체 길이를 알아내고,
                        if (headerBuf == null)
                        {
                            break;
                        }

                        int bodyLength = BitConverter.ToInt32(headerBuf, 0); // 패킷 길이: bodyLength
                        byte[] bodyBuf = MustReadBuffer(clntSocket, bodyLength); // bodyLength만큼의 데이터를 모두 읽어낸다.
                        if (bodyBuf == null)
                        {
                            break;
                        }

                        totalReceived += bodyLength + headerBuf.Length;

                        string txt = Encoding.UTF8.GetString(bodyBuf); // UTF8 디코딩해서 원본 문자열 복원하고,
                        int sequencePos = txt.IndexOf(':'); // [일련번호]:[0123456789....]에서 [일련번호] 값만을 추출.
                        if (sequencePos == -1)
                        {
                            Console.WriteLine("No Sequence number");
                            break;
                        }

                        string sequenceText = txt.Substring(0, sequencePos); 
                        int sequenceNo = Int32.Parse(sequenceText);

                        // 클라이언트로부터 받은 순서 값을 보관
                        //  - 다중 스레드로 보낸 경우 차례대로 수신되지 않으므로.
                        try
                        {
                            sequenceList.Add(sequenceNo, true);
                        }
                        catch
                        {
                            Console.WriteLine("Failed: duplicated seq No. " + sequenceNo + ": " + txt.Substring(0, 20));
                            break;
                        }

                        bool bodyResult = IsBodyComplete(txt); // [0123456789...] 데이터가 올바르게 구성되어 있는지 검사
                        if (bodyResult == false)
                        {
                            Console.WriteLine("Invalid body data");
                            break;
                        }

                        bodyCount++;
                    }

                    // 하나의 클라이언트와 통신이 완료된 후, 검증을 한다.

                    // 모든 패킷이 누락되지 않고 왔는지 검증
                    bool seqResult = IsSequenceIncrement(instanceId, sequenceList);
                    if (seqResult == true)
                    {
                        Console.WriteLine("Thread-safety: Success");
                    }

                    sequenceList.Clear();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine();
                Console.WriteLine(instanceId.ToString("X") + ": exception occurred");
                Console.WriteLine(ex.ToString());
            }
        }

        private static bool IsBodyComplete(string item)
        {
            int pos = item.IndexOf(':');
            string txt = item.Substring(pos + 1);

            pos = 0;
            int cmp = 48; // 48 == '0'
            foreach (var ch in txt)
            {
                if (cmp != (int)ch)
                {
                    Console.WriteLine("Failed: at " + pos);
                    return false;
                }

                pos++;
                cmp++;
                if (cmp == 58) // 57 == '9'
                {
                    cmp = 48;
                }
            }

            return true;
        }

        private static bool IsSequenceIncrement(int instanceId, SortedList<int, bool> sequenceList)
        {
            int oldItem = sequenceList.Keys.Min() - 1;

            foreach (var item in sequenceList.Keys)
            {
                if ((item - oldItem) != 1)
                {
                    Console.WriteLine(instanceId.ToString("X") + ": Failed:" + oldItem + " and " + item);
                    return false;
                }

                oldItem = item;
            }

            return true;
        }

        private static byte[] MustReadBuffer(Socket clntSocket, int mustRead)
        {
            byte[] result = null;

            using (MemoryStream ms = new MemoryStream())
            {
                while (true)
                {
                    byte[] byteBuf = new byte[mustRead];
                    int recv = 0;
                    try
                    {
                        recv = clntSocket.Receive(byteBuf, mustRead, SocketFlags.None);
                    }
                    catch (System.Net.Sockets.SocketException ex)
                    {
                        // 클라이언트 측에서 데이터를 전송 도중,
                        //      1) 소켓을 Close한 경우
                        //      2) 클라이언트 응용 프로그램을 강제 종료하는 경우
                        return null;
                    }

                    if (recv == 0)
                    {
                        Console.WriteLine("Receive returns 0");
                        return null;
                    }

                    ms.Write(byteBuf, 0, recv);
                    mustRead -= recv;
                    if (mustRead == 0)
                    {
                        break;
                    }
                }

                ms.Flush();

                result = ms.ToArray();
            }

            return result;
        }
    }
}

클라이언트는 비교를 위해 단일 스레드와 다중 스레드로 나눠서 작성해 보겠습니다. 우선, 단일 스레드로 Send하는 Socket 클라이언트는 다음과 같이 작성합니다.

namespace SocketClientST
{
    class Program
    {
        static void Main(string[] args)
        {
            int instanceId = 0;

            try
            {
                string body = string.Empty;
                StringBuilder sb = new StringBuilder();

                // 0123456789... 가 10,000번 반복되는 문자열을 생성
                Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
                for (int i = 0; i < 10000; i++)
                {
                    sb.Append(body);
                }

                body = sb.ToString();

                int loopCount = 10000;
                int dot = loopCount / (80 * 25);

                // 단일 스레드에서 1개의 소켓 인스턴스로 10,000번 Send 테스트
                using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    IPAddress target = IPAddress.Loopback;
                    // IPAddress target = IPAddress.Parse("192.168.0.17");
                    EndPoint serverEP = new IPEndPoint(target, 11200);

                    int retryCount = 3;
                    bool connected = false;
                    while (retryCount-- > 0)
                    {
                        try
                        {
                            socket.Connect(serverEP);
                            connected = true;
                        }
                        catch { }
                    }

                    if (connected == false)
                    {
                        Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                        return;
                    }

                    byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                    socket.Send(instanceIdBuf);

                    int failedAt = -1;

                    // 0123456789... 가 10,000번 반복되는 문자열을 가진 CommonPacket 인스턴스를 
                    // 10,000번 루프를 돌면서 Socket.Send로 전송
                    for (int i = 0; i < loopCount; i++)
                    {
                        CommonPacket packet = new CommonPacket(i);
                        packet.AddData(body);

                        byte[] dataBuf = packet.GetBuffer();
                        bool result = MustSendBuffer(socket, dataBuf, dataBuf.Length);
                        if (result == false)
                        {
                            failedAt = i;
                            break;
                        }

                        if (i % dot == 0)
                        {
                            Console.Write(((int)instanceId).ToString("X") + ",");
                        }
                    }

                    socket.Close();

                    Console.WriteLine();

                    sb = new StringBuilder();
                    if (failedAt == -1)
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " is completed");
                        sb.AppendLine("TCP Client socket: Closed");
                        sb.AppendLine(loopCount + " times: data sent");
                        sb.AppendLine((body.Length / 1024) + "KB / packet");
                    }
                    else
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " - failed");
                        sb.AppendLine("Send failed at: " + failedAt);
                    }

                    Console.WriteLine(sb.ToString());
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        // 전달된 버퍼를 Socket.Send로 전송
        private static bool MustSendBuffer(Socket socket, byte[] dataBuf, int mustSend)
        {
            int pos = 0;

            while (true)
            {
                int sentLength = 0;

                try
                {
                    // blocking call인 경우 sentLength == mustSend이지만!
                    sentLength = socket.Send(dataBuf, pos, mustSend, SocketFlags.None);
                }
                catch (Exception ex)
                {
                    int error = Marshal.GetLastWin32Error();
                    Console.WriteLine(error + ": " + ex.ToString());
                    return false;
                }

                if (sentLength == 0)
                {
                    return false;
                }

                mustSend -= sentLength;
                pos += sentLength;
                if (mustSend == 0)
                {
                    return true;
                }
            }
        }
    }

    public class CommonPacket
    {
        int _seq = 1;

        MemoryStream ms = new MemoryStream();

        public CommonPacket(int seqenceId)
        {
            _seq = seqenceId;
        }

        public void AddData(string txt)
        {
            int bodyLength = 0;
            string seqText = _seq.ToString();

            byte[] dataBuf = Encoding.UTF8.GetBytes(txt);
            byte[] seqBuf = Encoding.UTF8.GetBytes(seqText);

            bodyLength = dataBuf.Length + seqBuf.Length + 1; // 1 == ':'

            byte[] headerLength = BitConverter.GetBytes(bodyLength);

            ms.Write(headerLength, 0, headerLength.Length);

            ms.Write(seqBuf, 0, seqBuf.Length);
            ms.WriteByte((byte)':');
            ms.Write(dataBuf, 0, dataBuf.Length);
            ms.Flush();
        }

        public byte[] GetBuffer()
        {
            return ms.ToArray();
        }
    }
}

테스트 해보니, 서버 측 프로그램과 클라이언트의 정상 동작이 확인되었습니다. 마지막으로, 클라이언트 측을 다중 스레드를 이용해 Send를 호출하는 것으로 바꿔 보겠습니다.

namespace SocketClientMT
{
    class Program
    {
        class ThreadParam
        {
            public List<CommonPacket> Packets;
            public Socket Socket;
            public int Sent;
        }

        static void Main(string[] args)
        {
            int instanceId = 0;

            string body = string.Empty;
            StringBuilder sb = new StringBuilder();

            Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
            for (int i = 0; i < 10000; i++)
            {
                sb.Append(body);
            }

            body = sb.ToString();

            int loopCount = 10000;

            List<CommonPacket> packets = new List<CommonPacket>();

            // 1개의 소켓을 생성하고,
            using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                IPAddress target = IPAddress.Loopback; 
                // IPAddress target = IPAddress.Parse("192.168.0.17");
                EndPoint serverEP = new IPEndPoint(target, 11200);

                int retryCount = 3;
                bool connected = false;
                while (retryCount-- > 0)
                {
                    try
                    {
                        socket.Connect(serverEP);
                        connected = true;
                    }
                    catch { }
                }

                if (connected == false)
                {
                    Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                    return;
                }

                byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                socket.Send(instanceIdBuf);

                // 일련번호가 붙은 10,000개의 버퍼를 준비한 다음,
                for (int i = 0; i < loopCount; i++)
                {
                    CommonPacket packet = new CommonPacket(i);
                    packet.AddData(body);

                    packets.Add(packet);
                }

                // 20개의 스레드에서 List에 보관된 CommonPacket을 가져가서 같은 소켓 인스턴스의 Send에 전달
                List<Thread> threads = new List<Thread>();
                ThreadParam threadParam = new ThreadParam();
                threadParam.Packets = packets;
                threadParam.Socket = socket;

                for (int i = 0; i < 20; i++)
                {
                    Thread aThread = new Thread(sendBufferThread);
                    aThread.IsBackground = true;
                    aThread.Start(threadParam);

                    threads.Add(aThread);
                }

                // 모든 스레드의 실행이 종료될 때까지 대기
                foreach (var item in threads)
                {
                    item.Join();
                }

                socket.Close();
            }
        }

        // 스레드 메서드: Packets 리스트에 보관된 CommonPacket 데이터를 경쟁적으로 가져와서
        //               서버 측에 Send 메서드로 전달.
        private static void sendBufferThread(object obj)
        {
            ThreadParam threadParam = (ThreadParam)obj;

            while (true)
            {
                CommonPacket packet = null;

                lock (threadParam.Packets)
                {
                    if (threadParam.Packets.Count == 0)
                    {
                        break;
                    }

                    packet = threadParam.Packets[0];
                    threadParam.Packets.RemoveAt(0);
                    threadParam.Sent++;
                }

                byte[] dataBuf = packet.GetBuffer();
                MustSendBuffer(threadParam.Socket, dataBuf, dataBuf.Length);
            }
        }
    }
}

물론, 다중 스레드 클라이언트도 실행해 보면 서버 측에서의 검증 결과를 잘 통과하지만 이것은 테스트에 의한 실험값에 불과하기 때문에 절대적인 증명이라고 볼 수는 없습니다. 그래도 분명히 MSDN 문서에 thread-safe하다고 되어 있으므로 위의 예제 코드는 단순 확인용이라는 수준으로 놓고 보면 되겠습니다. (혹시, 테스트 코드의 조건에 의문 사항이나 개선이 필요하면 덧글 부탁드립니다.)

첨부 파일은 위의 코드를 반영한 서버, 단일 스레드 클라이언트, 다중 스레드 클라이언트 예제 프로젝트를 담고 있습니다.





[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 7/17/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2013-08-06 11시54분
[ryujh] 안녕하세요. 소켓 소스 많은 도움될것 같습니다.

소스 분석 중 서버측 소스에서 IsSequenceIncrement 메소드 에 SortedList 를 사용하여 정렬된 키값이 연속적이지 않으면 검증되지 않은 것으로 나오는데

SortedList 를 사용하지 않아도 IsSequenceIncrement 메소드 에서 Sort 하여 키를 정렬시켜도 문제 없는 것이죠?

Thread-safe 관련 내용은 더 분석해보겠습니다. 이상입니다.
[guest]
2013-08-07 12시24분
@ryujh 님, 맞습니다. 굳이 SortedList를 사용하지 않고 일반 List에서 일련 번호만 제대로 검증할 수 있다면 어떤 식으로 해도 상관없습니다. ^^
정성태

... [106]  107  108  109  110  111  112  113  114  115  116  117  118  119  120  ...
NoWriterDateCnt.TitleFile(s)
11307정성태9/13/201723402오류 유형: 421. System.Runtime.InteropServices.SEHException - 0x80004005
11306정성태9/12/201721562.NET Framework: 682. 아웃룩 사용자를 위한 중국어 스팸 필터 Add-in
11305정성태9/12/201723041개발 환경 구성: 334. 기존 프로젝트를 Visual Studio를 이용해 Github의 신규 생성된 repo에 올리는 방법 [1]
11304정성태9/11/201719979개발 환경 구성: 333. 3ds Max를 Hyper-V VM에서 실행하는 방법
11303정성태9/11/201723517개발 환경 구성: 332. Inno Setup 파일의 관리자 권한을 제거하는 방법
11302정성태9/11/201719356개발 환경 구성: 331. SQL Server Express를 위한 방화벽 설정
11301정성태9/11/201717640오류 유형: 420. SQL Server Express 연결 오류 - A network-related or instance-specific error occurred while establishing a connection to SQL Server.
11300정성태9/10/201722403.NET Framework: 681. dotnet.exe - run, exec, build, restore, publish 차이점 [3]
11299정성태9/9/201721106개발 환경 구성: 330. Hyper-V VM의 Internal Network를 Private 유형으로 만드는 방법
11298정성태9/8/201724609VC++: 119. EnumProcesses / EnumProcessModules API 사용 시 주의점 [1]
11297정성태9/8/201721205디버깅 기술: 96. windbg - 풀 덤프에 포함된 모든 닷넷 모듈을 파일로 저장하는 방법
11296정성태9/8/201723994웹: 36. Edge - "이 웹 사이트는 이전 기술에서 실행되며 Internet Explorer에서만 작동합니다." 끄는 방법
11295정성태9/7/201721848디버깅 기술: 95. Windbg - .foreach 사용법
11294정성태9/4/201721439개발 환경 구성: 329. 마이크로소프트의 CoreCLR 프로파일러 예제 빌드 방법 [1]
11293정성태9/4/201722100개발 환경 구성: 328. Visual Studio(devenv.exe)를 배치 파일(.bat)을 통해 실행하는 방법
11292정성태9/4/201720266오류 유형: 419. Cannot connect to WMI provider - Invalid class [0x80041010]
11291정성태9/3/201721516개발 환경 구성: 327. 아파치 서버 2.4를 위한 mod_aspdotnet 마이그레이션
11290정성태9/3/201725305개발 환경 구성: 326. 아파치 서버에서 ASP.NET을 실행하는 mod_aspdotnet 모듈 [2]
11289정성태9/3/201722985개발 환경 구성: 325. GAC에 어셈블리 등록을 위해 gacutil.exe을 사용하는 경우 주의 사항
11288정성태9/3/201719674개발 환경 구성: 324. 윈도우용 XAMPP의 아파치 서버 구성 방법
11287정성태9/1/201728728.NET Framework: 680. C# - 작업자(Worker) 스레드와 UI 스레드 [11]
11286정성태8/28/201716260기타: 67. App Privacy Policy
11285정성태8/28/201724786.NET Framework: 679. C# - 개인 키 보안의 SFTP를 이용한 파일 업로드파일 다운로드1
11284정성태8/27/201722999.NET Framework: 678. 데스크톱 윈도우 응용 프로그램에서 UWP 라이브러리를 이용한 비디오 장치 열람하는 방법 [1]파일 다운로드1
11283정성태8/27/201718651오류 유형: 418. CSS3117: @font-face failed cross-origin request. Resource access is restricted.
11282정성태8/26/201720372Math: 22. 행렬로 바라보는 피보나치 수열
... [106]  107  108  109  110  111  112  113  114  115  116  117  118  119  120  ...