Microsoft MVP성태의 닷넷 이야기
.NET Framework: 375. System.Net.Sockets.Socket이 Thread-safe할까? [링크 복사], [링크+제목 복사],
조회: 23969
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 2개 있습니다.)

System.Net.Sockets.Socket이 Thread-safe할까?

System.Net.Sockets.Socket 타입의 여러 메서드 중에서도 결국 궁금한 것은 과연 Send 메서드가 thread-safe하냐에 대해서입니다.

MSDN 문서를 보면 .NET에 포함된 대부분의 BCL 타입들에 대해서 "Thread Safety" 설명 부분은 거의 일관적으로 다음과 같이 적혀 있습니다.

Any public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe.


즉, 정적(static) 메서드는 thread-safe하지만 그 외 인스턴스 메서드는 thread-safe하지 않습니다. 사실 "members"라고 표현되어 있지만 저는 이 단어가 오타가 아닌가 생각됩니다. 왜냐하면 "member"에는 field도 포함되어 있는데 static 필드에 대한 thread-safe을 보장하는 방법은 없기 때문입니다. 따라서, static 유형의 method와 property에 대해서만 thread-safe하다고 보면 될 것입니다.

그런데, 가끔 아닌 것도 있습니다. ^^ 그래서 문서를 잘 봐야 합니다. 그 대표적인 예가 바로 Socket입니다.

Socket Class
; https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket

위의 문서에 마지막을 보면 Thread Safety에 대해 다음과 같이 언급하고 있습니다.

Instances of this class are thread safe.

그렇습니다. System.Net.Sockets.Socket은 Thread-safe 합니다.




실제로 이를 확인할 수 있는 테스트 프로그램을 만들 수 있을까요? 간단한 수준으로 제가 생각한 바는 이렇습니다. 클라이언트 측은 다음과 같은 기준을 따르도록 만들고,

  • 버퍼의 내용은 "[4바이트 고유ID][버퍼길이][일련번호]:[0123456789001234567890...반복...]
  • 한 번의 Send 메서드에 전달되는 [01234567890...반복...] 유형의 데이터 양은 100,000바이트 정도
  • Send 메서드를 10,000번 수행

반면, 서버 측은 다음과 같은 검증 절차를 수행합니다.

  • Receive로 읽어들이는 모든 데이터의 내용이 "0123456789...."으로 반복되는지 확인
  • 10,000번 전달된 내용의 일련번호가 모두 채워졌는지 확인

코드로 좀 더 설명해 볼까요? ^^ 우선, 서버 측 코드는 다음과 같이 작성될 수 있습니다.

namespace SocketServerSample
{
    class Program
    {
        static void Main(string[] args)
        {
            using (Socket srvSocket =
                new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                int port = 11200;

                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
                srvSocket.Bind(endPoint);
                srvSocket.Listen(10);

                while (true)
                {
                    Socket clntSocket = srvSocket.Accept();
                    Tuple<Socket, int> threadParam = new Tuple<Socket, int>(clntSocket, 0);
                    ThreadPool.QueueUserWorkItem(CheckClientPacket, threadParam);
                }
            }
        }

        static void CheckClientPacket(object param)
        {
            Tuple<Socket, int> threadParam = param as Tuple<Socket, int>;
            int instanceId = 0;

            try
            {
                using (Socket clntSocket = threadParam.Item1)
                {
                    long totalReceived = 0;
                    SortedList<int, bool> sequenceList = new SortedList<int, bool>();
                    int bodyCount = 0;

                    byte [] instanceIdBuf = MustReadBuffer(clntSocket, 4); // 4바이트로 넘어온 고유 클라이언트 ID를 알아내고,
                    instanceId = BitConverter.ToInt32(instanceIdBuf, 0);

                    Console.WriteLine(": Accepted and Processing..." + instanceId.ToString("X"));

                    while (true)
                    {
                        byte[] headerBuf = MustReadBuffer(clntSocket, 4); // 패킷의 전체 길이를 알아내고,
                        if (headerBuf == null)
                        {
                            break;
                        }

                        int bodyLength = BitConverter.ToInt32(headerBuf, 0); // 패킷 길이: bodyLength
                        byte[] bodyBuf = MustReadBuffer(clntSocket, bodyLength); // bodyLength만큼의 데이터를 모두 읽어낸다.
                        if (bodyBuf == null)
                        {
                            break;
                        }

                        totalReceived += bodyLength + headerBuf.Length;

                        string txt = Encoding.UTF8.GetString(bodyBuf); // UTF8 디코딩해서 원본 문자열 복원하고,
                        int sequencePos = txt.IndexOf(':'); // [일련번호]:[0123456789....]에서 [일련번호] 값만을 추출.
                        if (sequencePos == -1)
                        {
                            Console.WriteLine("No Sequence number");
                            break;
                        }

                        string sequenceText = txt.Substring(0, sequencePos); 
                        int sequenceNo = Int32.Parse(sequenceText);

                        // 클라이언트로부터 받은 순서 값을 보관
                        //  - 다중 스레드로 보낸 경우 차례대로 수신되지 않으므로.
                        try
                        {
                            sequenceList.Add(sequenceNo, true);
                        }
                        catch
                        {
                            Console.WriteLine("Failed: duplicated seq No. " + sequenceNo + ": " + txt.Substring(0, 20));
                            break;
                        }

                        bool bodyResult = IsBodyComplete(txt); // [0123456789...] 데이터가 올바르게 구성되어 있는지 검사
                        if (bodyResult == false)
                        {
                            Console.WriteLine("Invalid body data");
                            break;
                        }

                        bodyCount++;
                    }

                    // 하나의 클라이언트와 통신이 완료된 후, 검증을 한다.

                    // 모든 패킷이 누락되지 않고 왔는지 검증
                    bool seqResult = IsSequenceIncrement(instanceId, sequenceList);
                    if (seqResult == true)
                    {
                        Console.WriteLine("Thread-safety: Success");
                    }

                    sequenceList.Clear();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine();
                Console.WriteLine(instanceId.ToString("X") + ": exception occurred");
                Console.WriteLine(ex.ToString());
            }
        }

        private static bool IsBodyComplete(string item)
        {
            int pos = item.IndexOf(':');
            string txt = item.Substring(pos + 1);

            pos = 0;
            int cmp = 48; // 48 == '0'
            foreach (var ch in txt)
            {
                if (cmp != (int)ch)
                {
                    Console.WriteLine("Failed: at " + pos);
                    return false;
                }

                pos++;
                cmp++;
                if (cmp == 58) // 57 == '9'
                {
                    cmp = 48;
                }
            }

            return true;
        }

        private static bool IsSequenceIncrement(int instanceId, SortedList<int, bool> sequenceList)
        {
            int oldItem = sequenceList.Keys.Min() - 1;

            foreach (var item in sequenceList.Keys)
            {
                if ((item - oldItem) != 1)
                {
                    Console.WriteLine(instanceId.ToString("X") + ": Failed:" + oldItem + " and " + item);
                    return false;
                }

                oldItem = item;
            }

            return true;
        }

        private static byte[] MustReadBuffer(Socket clntSocket, int mustRead)
        {
            byte[] result = null;

            using (MemoryStream ms = new MemoryStream())
            {
                while (true)
                {
                    byte[] byteBuf = new byte[mustRead];
                    int recv = 0;
                    try
                    {
                        recv = clntSocket.Receive(byteBuf, mustRead, SocketFlags.None);
                    }
                    catch (System.Net.Sockets.SocketException ex)
                    {
                        // 클라이언트 측에서 데이터를 전송 도중,
                        //      1) 소켓을 Close한 경우
                        //      2) 클라이언트 응용 프로그램을 강제 종료하는 경우
                        return null;
                    }

                    if (recv == 0)
                    {
                        Console.WriteLine("Receive returns 0");
                        return null;
                    }

                    ms.Write(byteBuf, 0, recv);
                    mustRead -= recv;
                    if (mustRead == 0)
                    {
                        break;
                    }
                }

                ms.Flush();

                result = ms.ToArray();
            }

            return result;
        }
    }
}

클라이언트는 비교를 위해 단일 스레드와 다중 스레드로 나눠서 작성해 보겠습니다. 우선, 단일 스레드로 Send하는 Socket 클라이언트는 다음과 같이 작성합니다.

namespace SocketClientST
{
    class Program
    {
        static void Main(string[] args)
        {
            int instanceId = 0;

            try
            {
                string body = string.Empty;
                StringBuilder sb = new StringBuilder();

                // 0123456789... 가 10,000번 반복되는 문자열을 생성
                Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
                for (int i = 0; i < 10000; i++)
                {
                    sb.Append(body);
                }

                body = sb.ToString();

                int loopCount = 10000;
                int dot = loopCount / (80 * 25);

                // 단일 스레드에서 1개의 소켓 인스턴스로 10,000번 Send 테스트
                using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    IPAddress target = IPAddress.Loopback;
                    // IPAddress target = IPAddress.Parse("192.168.0.17");
                    EndPoint serverEP = new IPEndPoint(target, 11200);

                    int retryCount = 3;
                    bool connected = false;
                    while (retryCount-- > 0)
                    {
                        try
                        {
                            socket.Connect(serverEP);
                            connected = true;
                        }
                        catch { }
                    }

                    if (connected == false)
                    {
                        Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                        return;
                    }

                    byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                    socket.Send(instanceIdBuf);

                    int failedAt = -1;

                    // 0123456789... 가 10,000번 반복되는 문자열을 가진 CommonPacket 인스턴스를 
                    // 10,000번 루프를 돌면서 Socket.Send로 전송
                    for (int i = 0; i < loopCount; i++)
                    {
                        CommonPacket packet = new CommonPacket(i);
                        packet.AddData(body);

                        byte[] dataBuf = packet.GetBuffer();
                        bool result = MustSendBuffer(socket, dataBuf, dataBuf.Length);
                        if (result == false)
                        {
                            failedAt = i;
                            break;
                        }

                        if (i % dot == 0)
                        {
                            Console.Write(((int)instanceId).ToString("X") + ",");
                        }
                    }

                    socket.Close();

                    Console.WriteLine();

                    sb = new StringBuilder();
                    if (failedAt == -1)
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " is completed");
                        sb.AppendLine("TCP Client socket: Closed");
                        sb.AppendLine(loopCount + " times: data sent");
                        sb.AppendLine((body.Length / 1024) + "KB / packet");
                    }
                    else
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " - failed");
                        sb.AppendLine("Send failed at: " + failedAt);
                    }

                    Console.WriteLine(sb.ToString());
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        // 전달된 버퍼를 Socket.Send로 전송
        private static bool MustSendBuffer(Socket socket, byte[] dataBuf, int mustSend)
        {
            int pos = 0;

            while (true)
            {
                int sentLength = 0;

                try
                {
                    // blocking call인 경우 sentLength == mustSend이지만!
                    sentLength = socket.Send(dataBuf, pos, mustSend, SocketFlags.None);
                }
                catch (Exception ex)
                {
                    int error = Marshal.GetLastWin32Error();
                    Console.WriteLine(error + ": " + ex.ToString());
                    return false;
                }

                if (sentLength == 0)
                {
                    return false;
                }

                mustSend -= sentLength;
                pos += sentLength;
                if (mustSend == 0)
                {
                    return true;
                }
            }
        }
    }

    public class CommonPacket
    {
        int _seq = 1;

        MemoryStream ms = new MemoryStream();

        public CommonPacket(int seqenceId)
        {
            _seq = seqenceId;
        }

        public void AddData(string txt)
        {
            int bodyLength = 0;
            string seqText = _seq.ToString();

            byte[] dataBuf = Encoding.UTF8.GetBytes(txt);
            byte[] seqBuf = Encoding.UTF8.GetBytes(seqText);

            bodyLength = dataBuf.Length + seqBuf.Length + 1; // 1 == ':'

            byte[] headerLength = BitConverter.GetBytes(bodyLength);

            ms.Write(headerLength, 0, headerLength.Length);

            ms.Write(seqBuf, 0, seqBuf.Length);
            ms.WriteByte((byte)':');
            ms.Write(dataBuf, 0, dataBuf.Length);
            ms.Flush();
        }

        public byte[] GetBuffer()
        {
            return ms.ToArray();
        }
    }
}

테스트 해보니, 서버 측 프로그램과 클라이언트의 정상 동작이 확인되었습니다. 마지막으로, 클라이언트 측을 다중 스레드를 이용해 Send를 호출하는 것으로 바꿔 보겠습니다.

namespace SocketClientMT
{
    class Program
    {
        class ThreadParam
        {
            public List<CommonPacket> Packets;
            public Socket Socket;
            public int Sent;
        }

        static void Main(string[] args)
        {
            int instanceId = 0;

            string body = string.Empty;
            StringBuilder sb = new StringBuilder();

            Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
            for (int i = 0; i < 10000; i++)
            {
                sb.Append(body);
            }

            body = sb.ToString();

            int loopCount = 10000;

            List<CommonPacket> packets = new List<CommonPacket>();

            // 1개의 소켓을 생성하고,
            using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                IPAddress target = IPAddress.Loopback; 
                // IPAddress target = IPAddress.Parse("192.168.0.17");
                EndPoint serverEP = new IPEndPoint(target, 11200);

                int retryCount = 3;
                bool connected = false;
                while (retryCount-- > 0)
                {
                    try
                    {
                        socket.Connect(serverEP);
                        connected = true;
                    }
                    catch { }
                }

                if (connected == false)
                {
                    Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                    return;
                }

                byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                socket.Send(instanceIdBuf);

                // 일련번호가 붙은 10,000개의 버퍼를 준비한 다음,
                for (int i = 0; i < loopCount; i++)
                {
                    CommonPacket packet = new CommonPacket(i);
                    packet.AddData(body);

                    packets.Add(packet);
                }

                // 20개의 스레드에서 List에 보관된 CommonPacket을 가져가서 같은 소켓 인스턴스의 Send에 전달
                List<Thread> threads = new List<Thread>();
                ThreadParam threadParam = new ThreadParam();
                threadParam.Packets = packets;
                threadParam.Socket = socket;

                for (int i = 0; i < 20; i++)
                {
                    Thread aThread = new Thread(sendBufferThread);
                    aThread.IsBackground = true;
                    aThread.Start(threadParam);

                    threads.Add(aThread);
                }

                // 모든 스레드의 실행이 종료될 때까지 대기
                foreach (var item in threads)
                {
                    item.Join();
                }

                socket.Close();
            }
        }

        // 스레드 메서드: Packets 리스트에 보관된 CommonPacket 데이터를 경쟁적으로 가져와서
        //               서버 측에 Send 메서드로 전달.
        private static void sendBufferThread(object obj)
        {
            ThreadParam threadParam = (ThreadParam)obj;

            while (true)
            {
                CommonPacket packet = null;

                lock (threadParam.Packets)
                {
                    if (threadParam.Packets.Count == 0)
                    {
                        break;
                    }

                    packet = threadParam.Packets[0];
                    threadParam.Packets.RemoveAt(0);
                    threadParam.Sent++;
                }

                byte[] dataBuf = packet.GetBuffer();
                MustSendBuffer(threadParam.Socket, dataBuf, dataBuf.Length);
            }
        }
    }
}

물론, 다중 스레드 클라이언트도 실행해 보면 서버 측에서의 검증 결과를 잘 통과하지만 이것은 테스트에 의한 실험값에 불과하기 때문에 절대적인 증명이라고 볼 수는 없습니다. 그래도 분명히 MSDN 문서에 thread-safe하다고 되어 있으므로 위의 예제 코드는 단순 확인용이라는 수준으로 놓고 보면 되겠습니다. (혹시, 테스트 코드의 조건에 의문 사항이나 개선이 필요하면 덧글 부탁드립니다.)

첨부 파일은 위의 코드를 반영한 서버, 단일 스레드 클라이언트, 다중 스레드 클라이언트 예제 프로젝트를 담고 있습니다.





[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 7/17/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2013-08-06 11시54분
[ryujh] 안녕하세요. 소켓 소스 많은 도움될것 같습니다.

소스 분석 중 서버측 소스에서 IsSequenceIncrement 메소드 에 SortedList 를 사용하여 정렬된 키값이 연속적이지 않으면 검증되지 않은 것으로 나오는데

SortedList 를 사용하지 않아도 IsSequenceIncrement 메소드 에서 Sort 하여 키를 정렬시켜도 문제 없는 것이죠?

Thread-safe 관련 내용은 더 분석해보겠습니다. 이상입니다.
[guest]
2013-08-07 12시24분
@ryujh 님, 맞습니다. 굳이 SortedList를 사용하지 않고 일반 List에서 일련 번호만 제대로 검증할 수 있다면 어떤 식으로 해도 상관없습니다. ^^
정성태

... 121  122  123  124  125  126  127  128  129  [130]  131  132  133  134  135  ...
NoWriterDateCnt.TitleFile(s)
1805정성태11/5/201421876.NET Framework: 476. Visual Studio에서 Mono용 Profiler 개발 [3]파일 다운로드1
1804정성태11/5/201428117.NET Framework: 475. ETW(Event Tracing for Windows)를 C#에서 사용하는 방법 [9]파일 다운로드1
1803정성태11/4/201420219오류 유형: 261. Windows Server Backup 오류 - Error in backup of E:\$Extend\$RmMetadata\$TxfLog
1802정성태11/4/201422207오류 유형: 260. 이벤트 로그 - Windows Error Reporting / AEAPPINVW8
1801정성태11/4/201427440오류 유형: 259. 이벤트 로그 - Windows Error Reporting / IPX Assertion / KorIME.exe [1]
1800정성태11/4/201418218오류 유형: 258. 이벤트 로그 - Starting a SMART disk polling operation in Automatic mode.
1799정성태11/4/201422983오류 유형: 257. 이벤트 로그 - The WMI Performance Adapter service entered the stopped state.
1798정성태11/4/201431736오류 유형: 256. 이벤트 로그 - The WinHTTP Web Proxy Auto-Discovery Service service entered the stopped state. [1]
1797정성태11/4/201417398오류 유형: 255. 이벤트 로그 - The Adobe Flash Player Update Service service entered the stopped state.
1796정성태10/30/201424441개발 환경 구성: 249. Visual Studio 2013에서 Mono 컴파일하는 방법
1795정성태10/29/201426973개발 환경 구성: 248. Lync 2013 서버 설치 방법
1794정성태10/29/201422427개발 환경 구성: 247. "Microsoft Office 365 Enterprise E3" 서비스에 대한 간략 소개
1793정성태10/27/201423041.NET Framework: 474. C# - chromiumembedded 사용 - 두 번째 이야기 [2]파일 다운로드1
1792정성태10/27/201423184.NET Framework: 473. WebClient 객체에 쿠키(Cookie)를 사용하는 방법
1791정성태10/22/201422915VC++: 83. G++ - 템플릿 클래스의 iterator 코드 사용에서 발생하는 컴파일 오류 [5]
1790정성태10/22/201418440오류 유형: 254. NETLOGON Service is paused on [... AD Server...]
1789정성태10/22/201421054오류 유형: 253. 이벤트 로그 - The client-side extension could not remove user policy settings for '...'
1788정성태10/22/201423112VC++: 82. COM 프로그래밍에서 HRESULT 타입의 S_FALSE는 실패일까요? 성공일까요? [2]
1787정성태10/22/201431292오류 유형: 252. COM 개체 등록시 0x8002801C 오류가 발생한다면?
1786정성태10/22/201432636디버깅 기술: 65. 프로세스 비정상 종료 시 "Debug Diagnostic Tool"를 이용해 덤프를 남기는 방법 [3]파일 다운로드1
1785정성태10/22/201421861오류 유형: 251. 이벤트 로그 - Load control template file /_controltemplates/TaxonomyPicker.ascx failed [1]
1784정성태10/22/201429941.NET Framework: 472. C/C++과 C# 사이의 메모리 할당/해제 방법파일 다운로드1
1783정성태10/21/201423348VC++: 81. 프로그래밍에서 borrowing의 개념
1782정성태10/21/201420085오류 유형: 250. 이벤트 로그 - Application Server job failed for service instance Microsoft.Office.Server.Search.Administration.SearchServiceInstance
1781정성태10/21/201420506디버깅 기술: 64. new/delete의 짝이 맞는 경우에도 메모리 누수가 발생한다면?
1780정성태10/15/201424159오류 유형: 249. The application-specific permission settings do not grant Local Activation permission for the COM Server application with CLSID
... 121  122  123  124  125  126  127  128  129  [130]  131  132  133  134  135  ...