Microsoft MVP성태의 닷넷 이야기
.NET Framework: 375. System.Net.Sockets.Socket이 Thread-safe할까? [링크 복사], [링크+제목 복사],
조회: 23882
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 2개 있습니다.)

System.Net.Sockets.Socket이 Thread-safe할까?

System.Net.Sockets.Socket 타입의 여러 메서드 중에서도 결국 궁금한 것은 과연 Send 메서드가 thread-safe하냐에 대해서입니다.

MSDN 문서를 보면 .NET에 포함된 대부분의 BCL 타입들에 대해서 "Thread Safety" 설명 부분은 거의 일관적으로 다음과 같이 적혀 있습니다.

Any public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe.


즉, 정적(static) 메서드는 thread-safe하지만 그 외 인스턴스 메서드는 thread-safe하지 않습니다. 사실 "members"라고 표현되어 있지만 저는 이 단어가 오타가 아닌가 생각됩니다. 왜냐하면 "member"에는 field도 포함되어 있는데 static 필드에 대한 thread-safe을 보장하는 방법은 없기 때문입니다. 따라서, static 유형의 method와 property에 대해서만 thread-safe하다고 보면 될 것입니다.

그런데, 가끔 아닌 것도 있습니다. ^^ 그래서 문서를 잘 봐야 합니다. 그 대표적인 예가 바로 Socket입니다.

Socket Class
; https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket

위의 문서에 마지막을 보면 Thread Safety에 대해 다음과 같이 언급하고 있습니다.

Instances of this class are thread safe.

그렇습니다. System.Net.Sockets.Socket은 Thread-safe 합니다.




실제로 이를 확인할 수 있는 테스트 프로그램을 만들 수 있을까요? 간단한 수준으로 제가 생각한 바는 이렇습니다. 클라이언트 측은 다음과 같은 기준을 따르도록 만들고,

  • 버퍼의 내용은 "[4바이트 고유ID][버퍼길이][일련번호]:[0123456789001234567890...반복...]
  • 한 번의 Send 메서드에 전달되는 [01234567890...반복...] 유형의 데이터 양은 100,000바이트 정도
  • Send 메서드를 10,000번 수행

반면, 서버 측은 다음과 같은 검증 절차를 수행합니다.

  • Receive로 읽어들이는 모든 데이터의 내용이 "0123456789...."으로 반복되는지 확인
  • 10,000번 전달된 내용의 일련번호가 모두 채워졌는지 확인

코드로 좀 더 설명해 볼까요? ^^ 우선, 서버 측 코드는 다음과 같이 작성될 수 있습니다.

namespace SocketServerSample
{
    class Program
    {
        static void Main(string[] args)
        {
            using (Socket srvSocket =
                new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                int port = 11200;

                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
                srvSocket.Bind(endPoint);
                srvSocket.Listen(10);

                while (true)
                {
                    Socket clntSocket = srvSocket.Accept();
                    Tuple<Socket, int> threadParam = new Tuple<Socket, int>(clntSocket, 0);
                    ThreadPool.QueueUserWorkItem(CheckClientPacket, threadParam);
                }
            }
        }

        static void CheckClientPacket(object param)
        {
            Tuple<Socket, int> threadParam = param as Tuple<Socket, int>;
            int instanceId = 0;

            try
            {
                using (Socket clntSocket = threadParam.Item1)
                {
                    long totalReceived = 0;
                    SortedList<int, bool> sequenceList = new SortedList<int, bool>();
                    int bodyCount = 0;

                    byte [] instanceIdBuf = MustReadBuffer(clntSocket, 4); // 4바이트로 넘어온 고유 클라이언트 ID를 알아내고,
                    instanceId = BitConverter.ToInt32(instanceIdBuf, 0);

                    Console.WriteLine(": Accepted and Processing..." + instanceId.ToString("X"));

                    while (true)
                    {
                        byte[] headerBuf = MustReadBuffer(clntSocket, 4); // 패킷의 전체 길이를 알아내고,
                        if (headerBuf == null)
                        {
                            break;
                        }

                        int bodyLength = BitConverter.ToInt32(headerBuf, 0); // 패킷 길이: bodyLength
                        byte[] bodyBuf = MustReadBuffer(clntSocket, bodyLength); // bodyLength만큼의 데이터를 모두 읽어낸다.
                        if (bodyBuf == null)
                        {
                            break;
                        }

                        totalReceived += bodyLength + headerBuf.Length;

                        string txt = Encoding.UTF8.GetString(bodyBuf); // UTF8 디코딩해서 원본 문자열 복원하고,
                        int sequencePos = txt.IndexOf(':'); // [일련번호]:[0123456789....]에서 [일련번호] 값만을 추출.
                        if (sequencePos == -1)
                        {
                            Console.WriteLine("No Sequence number");
                            break;
                        }

                        string sequenceText = txt.Substring(0, sequencePos); 
                        int sequenceNo = Int32.Parse(sequenceText);

                        // 클라이언트로부터 받은 순서 값을 보관
                        //  - 다중 스레드로 보낸 경우 차례대로 수신되지 않으므로.
                        try
                        {
                            sequenceList.Add(sequenceNo, true);
                        }
                        catch
                        {
                            Console.WriteLine("Failed: duplicated seq No. " + sequenceNo + ": " + txt.Substring(0, 20));
                            break;
                        }

                        bool bodyResult = IsBodyComplete(txt); // [0123456789...] 데이터가 올바르게 구성되어 있는지 검사
                        if (bodyResult == false)
                        {
                            Console.WriteLine("Invalid body data");
                            break;
                        }

                        bodyCount++;
                    }

                    // 하나의 클라이언트와 통신이 완료된 후, 검증을 한다.

                    // 모든 패킷이 누락되지 않고 왔는지 검증
                    bool seqResult = IsSequenceIncrement(instanceId, sequenceList);
                    if (seqResult == true)
                    {
                        Console.WriteLine("Thread-safety: Success");
                    }

                    sequenceList.Clear();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine();
                Console.WriteLine(instanceId.ToString("X") + ": exception occurred");
                Console.WriteLine(ex.ToString());
            }
        }

        private static bool IsBodyComplete(string item)
        {
            int pos = item.IndexOf(':');
            string txt = item.Substring(pos + 1);

            pos = 0;
            int cmp = 48; // 48 == '0'
            foreach (var ch in txt)
            {
                if (cmp != (int)ch)
                {
                    Console.WriteLine("Failed: at " + pos);
                    return false;
                }

                pos++;
                cmp++;
                if (cmp == 58) // 57 == '9'
                {
                    cmp = 48;
                }
            }

            return true;
        }

        private static bool IsSequenceIncrement(int instanceId, SortedList<int, bool> sequenceList)
        {
            int oldItem = sequenceList.Keys.Min() - 1;

            foreach (var item in sequenceList.Keys)
            {
                if ((item - oldItem) != 1)
                {
                    Console.WriteLine(instanceId.ToString("X") + ": Failed:" + oldItem + " and " + item);
                    return false;
                }

                oldItem = item;
            }

            return true;
        }

        private static byte[] MustReadBuffer(Socket clntSocket, int mustRead)
        {
            byte[] result = null;

            using (MemoryStream ms = new MemoryStream())
            {
                while (true)
                {
                    byte[] byteBuf = new byte[mustRead];
                    int recv = 0;
                    try
                    {
                        recv = clntSocket.Receive(byteBuf, mustRead, SocketFlags.None);
                    }
                    catch (System.Net.Sockets.SocketException ex)
                    {
                        // 클라이언트 측에서 데이터를 전송 도중,
                        //      1) 소켓을 Close한 경우
                        //      2) 클라이언트 응용 프로그램을 강제 종료하는 경우
                        return null;
                    }

                    if (recv == 0)
                    {
                        Console.WriteLine("Receive returns 0");
                        return null;
                    }

                    ms.Write(byteBuf, 0, recv);
                    mustRead -= recv;
                    if (mustRead == 0)
                    {
                        break;
                    }
                }

                ms.Flush();

                result = ms.ToArray();
            }

            return result;
        }
    }
}

클라이언트는 비교를 위해 단일 스레드와 다중 스레드로 나눠서 작성해 보겠습니다. 우선, 단일 스레드로 Send하는 Socket 클라이언트는 다음과 같이 작성합니다.

namespace SocketClientST
{
    class Program
    {
        static void Main(string[] args)
        {
            int instanceId = 0;

            try
            {
                string body = string.Empty;
                StringBuilder sb = new StringBuilder();

                // 0123456789... 가 10,000번 반복되는 문자열을 생성
                Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
                for (int i = 0; i < 10000; i++)
                {
                    sb.Append(body);
                }

                body = sb.ToString();

                int loopCount = 10000;
                int dot = loopCount / (80 * 25);

                // 단일 스레드에서 1개의 소켓 인스턴스로 10,000번 Send 테스트
                using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    IPAddress target = IPAddress.Loopback;
                    // IPAddress target = IPAddress.Parse("192.168.0.17");
                    EndPoint serverEP = new IPEndPoint(target, 11200);

                    int retryCount = 3;
                    bool connected = false;
                    while (retryCount-- > 0)
                    {
                        try
                        {
                            socket.Connect(serverEP);
                            connected = true;
                        }
                        catch { }
                    }

                    if (connected == false)
                    {
                        Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                        return;
                    }

                    byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                    socket.Send(instanceIdBuf);

                    int failedAt = -1;

                    // 0123456789... 가 10,000번 반복되는 문자열을 가진 CommonPacket 인스턴스를 
                    // 10,000번 루프를 돌면서 Socket.Send로 전송
                    for (int i = 0; i < loopCount; i++)
                    {
                        CommonPacket packet = new CommonPacket(i);
                        packet.AddData(body);

                        byte[] dataBuf = packet.GetBuffer();
                        bool result = MustSendBuffer(socket, dataBuf, dataBuf.Length);
                        if (result == false)
                        {
                            failedAt = i;
                            break;
                        }

                        if (i % dot == 0)
                        {
                            Console.Write(((int)instanceId).ToString("X") + ",");
                        }
                    }

                    socket.Close();

                    Console.WriteLine();

                    sb = new StringBuilder();
                    if (failedAt == -1)
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " is completed");
                        sb.AppendLine("TCP Client socket: Closed");
                        sb.AppendLine(loopCount + " times: data sent");
                        sb.AppendLine((body.Length / 1024) + "KB / packet");
                    }
                    else
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " - failed");
                        sb.AppendLine("Send failed at: " + failedAt);
                    }

                    Console.WriteLine(sb.ToString());
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        // 전달된 버퍼를 Socket.Send로 전송
        private static bool MustSendBuffer(Socket socket, byte[] dataBuf, int mustSend)
        {
            int pos = 0;

            while (true)
            {
                int sentLength = 0;

                try
                {
                    // blocking call인 경우 sentLength == mustSend이지만!
                    sentLength = socket.Send(dataBuf, pos, mustSend, SocketFlags.None);
                }
                catch (Exception ex)
                {
                    int error = Marshal.GetLastWin32Error();
                    Console.WriteLine(error + ": " + ex.ToString());
                    return false;
                }

                if (sentLength == 0)
                {
                    return false;
                }

                mustSend -= sentLength;
                pos += sentLength;
                if (mustSend == 0)
                {
                    return true;
                }
            }
        }
    }

    public class CommonPacket
    {
        int _seq = 1;

        MemoryStream ms = new MemoryStream();

        public CommonPacket(int seqenceId)
        {
            _seq = seqenceId;
        }

        public void AddData(string txt)
        {
            int bodyLength = 0;
            string seqText = _seq.ToString();

            byte[] dataBuf = Encoding.UTF8.GetBytes(txt);
            byte[] seqBuf = Encoding.UTF8.GetBytes(seqText);

            bodyLength = dataBuf.Length + seqBuf.Length + 1; // 1 == ':'

            byte[] headerLength = BitConverter.GetBytes(bodyLength);

            ms.Write(headerLength, 0, headerLength.Length);

            ms.Write(seqBuf, 0, seqBuf.Length);
            ms.WriteByte((byte)':');
            ms.Write(dataBuf, 0, dataBuf.Length);
            ms.Flush();
        }

        public byte[] GetBuffer()
        {
            return ms.ToArray();
        }
    }
}

테스트 해보니, 서버 측 프로그램과 클라이언트의 정상 동작이 확인되었습니다. 마지막으로, 클라이언트 측을 다중 스레드를 이용해 Send를 호출하는 것으로 바꿔 보겠습니다.

namespace SocketClientMT
{
    class Program
    {
        class ThreadParam
        {
            public List<CommonPacket> Packets;
            public Socket Socket;
            public int Sent;
        }

        static void Main(string[] args)
        {
            int instanceId = 0;

            string body = string.Empty;
            StringBuilder sb = new StringBuilder();

            Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
            for (int i = 0; i < 10000; i++)
            {
                sb.Append(body);
            }

            body = sb.ToString();

            int loopCount = 10000;

            List<CommonPacket> packets = new List<CommonPacket>();

            // 1개의 소켓을 생성하고,
            using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                IPAddress target = IPAddress.Loopback; 
                // IPAddress target = IPAddress.Parse("192.168.0.17");
                EndPoint serverEP = new IPEndPoint(target, 11200);

                int retryCount = 3;
                bool connected = false;
                while (retryCount-- > 0)
                {
                    try
                    {
                        socket.Connect(serverEP);
                        connected = true;
                    }
                    catch { }
                }

                if (connected == false)
                {
                    Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                    return;
                }

                byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                socket.Send(instanceIdBuf);

                // 일련번호가 붙은 10,000개의 버퍼를 준비한 다음,
                for (int i = 0; i < loopCount; i++)
                {
                    CommonPacket packet = new CommonPacket(i);
                    packet.AddData(body);

                    packets.Add(packet);
                }

                // 20개의 스레드에서 List에 보관된 CommonPacket을 가져가서 같은 소켓 인스턴스의 Send에 전달
                List<Thread> threads = new List<Thread>();
                ThreadParam threadParam = new ThreadParam();
                threadParam.Packets = packets;
                threadParam.Socket = socket;

                for (int i = 0; i < 20; i++)
                {
                    Thread aThread = new Thread(sendBufferThread);
                    aThread.IsBackground = true;
                    aThread.Start(threadParam);

                    threads.Add(aThread);
                }

                // 모든 스레드의 실행이 종료될 때까지 대기
                foreach (var item in threads)
                {
                    item.Join();
                }

                socket.Close();
            }
        }

        // 스레드 메서드: Packets 리스트에 보관된 CommonPacket 데이터를 경쟁적으로 가져와서
        //               서버 측에 Send 메서드로 전달.
        private static void sendBufferThread(object obj)
        {
            ThreadParam threadParam = (ThreadParam)obj;

            while (true)
            {
                CommonPacket packet = null;

                lock (threadParam.Packets)
                {
                    if (threadParam.Packets.Count == 0)
                    {
                        break;
                    }

                    packet = threadParam.Packets[0];
                    threadParam.Packets.RemoveAt(0);
                    threadParam.Sent++;
                }

                byte[] dataBuf = packet.GetBuffer();
                MustSendBuffer(threadParam.Socket, dataBuf, dataBuf.Length);
            }
        }
    }
}

물론, 다중 스레드 클라이언트도 실행해 보면 서버 측에서의 검증 결과를 잘 통과하지만 이것은 테스트에 의한 실험값에 불과하기 때문에 절대적인 증명이라고 볼 수는 없습니다. 그래도 분명히 MSDN 문서에 thread-safe하다고 되어 있으므로 위의 예제 코드는 단순 확인용이라는 수준으로 놓고 보면 되겠습니다. (혹시, 테스트 코드의 조건에 의문 사항이나 개선이 필요하면 덧글 부탁드립니다.)

첨부 파일은 위의 코드를 반영한 서버, 단일 스레드 클라이언트, 다중 스레드 클라이언트 예제 프로젝트를 담고 있습니다.





[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 7/17/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2013-08-06 11시54분
[ryujh] 안녕하세요. 소켓 소스 많은 도움될것 같습니다.

소스 분석 중 서버측 소스에서 IsSequenceIncrement 메소드 에 SortedList 를 사용하여 정렬된 키값이 연속적이지 않으면 검증되지 않은 것으로 나오는데

SortedList 를 사용하지 않아도 IsSequenceIncrement 메소드 에서 Sort 하여 키를 정렬시켜도 문제 없는 것이죠?

Thread-safe 관련 내용은 더 분석해보겠습니다. 이상입니다.
[guest]
2013-08-07 12시24분
@ryujh 님, 맞습니다. 굳이 SortedList를 사용하지 않고 일반 List에서 일련 번호만 제대로 검증할 수 있다면 어떤 식으로 해도 상관없습니다. ^^
정성태

... 136  137  138  139  140  141  142  143  [144]  145  146  147  148  149  150  ...
NoWriterDateCnt.TitleFile(s)
1452정성태5/21/201332927Windows: 73. TabProcGrowth 값 삭제 후 IE를 실행시키면 다시 복원되는 경우 [3]
1451정성태5/17/201331869Windows: 72. 윈도우 서버 2012 기초 사용법
1450정성태5/16/201322693오류 유형: 176. SQL10007N Message "0" could not be retrieved. Reason code: "3"
1449정성태5/15/201329792오류 유형: 175. SpeechRecognitionEngine 사용 시 오류 유형 2가지
1448정성태5/14/201324770VC++: 68. #pragma warning(disable: ...)로 오류 제어가 안된다면?
1447정성태5/3/201326443개발 환경 구성: 191. Debugging Tools for Windows 독립 설치 버전 [1]
1446정성태4/30/201327204.NET Framework: 368. Encoding 타입의 대체(fallback) 메카니즘 [1]
1445정성태4/26/201325443디버깅 기술: 54. NT 서비스의 Main 메서드 안에서 Process.GetProcessesByName 호출 시 멈춤 현상 [1]
1444정성태4/26/201329475기타: 31. Internet Explorer: 자바스크립트로 숨겨진 파일 다운로드 경로를 알아내는 방법 [1]
1443정성태4/24/201325132개발 환경 구성: 190. Azure PaaS 웹 응용 프로그램 배포 후 SMTP 서버 구성 [2]
1442정성태4/21/201328719기타: 30. 마이크로소프트 워드의 CPU 점유 현상으로 글자 입력이 느려졌다면? [1]
1441정성태4/21/201335317.NET Framework: 367. LargeAddressAware 옵션이 적용된 닷넷 32비트 프로세스의 가용 메모리 [14]
1440정성태4/19/201324037오류 유형: 174. dumpbin.exe 실행시 mspdb110.dll 로드 오류
1439정성태4/18/201327912VS.NET IDE: 76. Visual Studio 2012와 Itanium 빌드 옵션 [2]
1438정성태4/17/201327293.NET Framework: 366. 다른 프로세스에 환경 변수 설정하는 방법 - 두 번째 이야기 [1]파일 다운로드1
1437정성태4/17/201327523VC++: 67. CRT(C Runtime DLL: msvcr...dll)에 대한 의존성 제거
1436정성태4/17/201332946.NET Framework: 365. Local SYSTEM 권한으로 코드를 실행하는 방법파일 다운로드1
1435정성태4/15/201341830Windows: 71. ad-hoc 보다 더 편리한 "가상 Wifi" 를 이용한 인터넷 공유 [2]
1434정성태4/9/201323098오류 유형: 173. TFS 서버의 이벤트 로그 오류 - WebHost failed to process a request. Parameter name: certificate
1433정성태4/9/201323383개발 환경 구성: 189. TFS에 설치된 SharePoint 의 PowerShell 콘솔 띄우는 방법
1432정성태4/5/201324397오류 유형: 172. System.Web.PipelineModuleStepContainer.GetEventCount 에서 NullReferenceException 이 발생한다면?
1431정성태4/5/201325054기타: 29. 부팅 가능한 (외장) HDD를 기존 부팅 메뉴에 추가하는 방법
1430정성태4/4/201326871제니퍼 .NET: 23. 모바일용 웹 사이트에서 발생하는 응답 시간 지연 현상 [5]파일 다운로드1
1429정성태3/29/201323250개발 환경 구성: 188. SCOM 2012 - ASP.NET 모니터링 방법
1428정성태3/29/201324094개발 환경 구성: 187. SCOM 2012 환경 구성 - Management Packs
1427정성태3/29/201321173오류 유형: 171. SCOM 2012 - 원격 에이전트 설치 오류
... 136  137  138  139  140  141  142  143  [144]  145  146  147  148  149  150  ...