Microsoft MVP성태의 닷넷 이야기
.NET Framework: 375. System.Net.Sockets.Socket이 Thread-safe할까? [링크 복사], [링크+제목 복사],
조회: 23974
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
(연관된 글이 2개 있습니다.)

System.Net.Sockets.Socket이 Thread-safe할까?

System.Net.Sockets.Socket 타입의 여러 메서드 중에서도 결국 궁금한 것은 과연 Send 메서드가 thread-safe하냐에 대해서입니다.

MSDN 문서를 보면 .NET에 포함된 대부분의 BCL 타입들에 대해서 "Thread Safety" 설명 부분은 거의 일관적으로 다음과 같이 적혀 있습니다.

Any public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe.


즉, 정적(static) 메서드는 thread-safe하지만 그 외 인스턴스 메서드는 thread-safe하지 않습니다. 사실 "members"라고 표현되어 있지만 저는 이 단어가 오타가 아닌가 생각됩니다. 왜냐하면 "member"에는 field도 포함되어 있는데 static 필드에 대한 thread-safe을 보장하는 방법은 없기 때문입니다. 따라서, static 유형의 method와 property에 대해서만 thread-safe하다고 보면 될 것입니다.

그런데, 가끔 아닌 것도 있습니다. ^^ 그래서 문서를 잘 봐야 합니다. 그 대표적인 예가 바로 Socket입니다.

Socket Class
; https://docs.microsoft.com/en-us/dotnet/api/system.net.sockets.socket

위의 문서에 마지막을 보면 Thread Safety에 대해 다음과 같이 언급하고 있습니다.

Instances of this class are thread safe.

그렇습니다. System.Net.Sockets.Socket은 Thread-safe 합니다.




실제로 이를 확인할 수 있는 테스트 프로그램을 만들 수 있을까요? 간단한 수준으로 제가 생각한 바는 이렇습니다. 클라이언트 측은 다음과 같은 기준을 따르도록 만들고,

  • 버퍼의 내용은 "[4바이트 고유ID][버퍼길이][일련번호]:[0123456789001234567890...반복...]
  • 한 번의 Send 메서드에 전달되는 [01234567890...반복...] 유형의 데이터 양은 100,000바이트 정도
  • Send 메서드를 10,000번 수행

반면, 서버 측은 다음과 같은 검증 절차를 수행합니다.

  • Receive로 읽어들이는 모든 데이터의 내용이 "0123456789...."으로 반복되는지 확인
  • 10,000번 전달된 내용의 일련번호가 모두 채워졌는지 확인

코드로 좀 더 설명해 볼까요? ^^ 우선, 서버 측 코드는 다음과 같이 작성될 수 있습니다.

namespace SocketServerSample
{
    class Program
    {
        static void Main(string[] args)
        {
            using (Socket srvSocket =
                new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                int port = 11200;

                IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, port);
                srvSocket.Bind(endPoint);
                srvSocket.Listen(10);

                while (true)
                {
                    Socket clntSocket = srvSocket.Accept();
                    Tuple<Socket, int> threadParam = new Tuple<Socket, int>(clntSocket, 0);
                    ThreadPool.QueueUserWorkItem(CheckClientPacket, threadParam);
                }
            }
        }

        static void CheckClientPacket(object param)
        {
            Tuple<Socket, int> threadParam = param as Tuple<Socket, int>;
            int instanceId = 0;

            try
            {
                using (Socket clntSocket = threadParam.Item1)
                {
                    long totalReceived = 0;
                    SortedList<int, bool> sequenceList = new SortedList<int, bool>();
                    int bodyCount = 0;

                    byte [] instanceIdBuf = MustReadBuffer(clntSocket, 4); // 4바이트로 넘어온 고유 클라이언트 ID를 알아내고,
                    instanceId = BitConverter.ToInt32(instanceIdBuf, 0);

                    Console.WriteLine(": Accepted and Processing..." + instanceId.ToString("X"));

                    while (true)
                    {
                        byte[] headerBuf = MustReadBuffer(clntSocket, 4); // 패킷의 전체 길이를 알아내고,
                        if (headerBuf == null)
                        {
                            break;
                        }

                        int bodyLength = BitConverter.ToInt32(headerBuf, 0); // 패킷 길이: bodyLength
                        byte[] bodyBuf = MustReadBuffer(clntSocket, bodyLength); // bodyLength만큼의 데이터를 모두 읽어낸다.
                        if (bodyBuf == null)
                        {
                            break;
                        }

                        totalReceived += bodyLength + headerBuf.Length;

                        string txt = Encoding.UTF8.GetString(bodyBuf); // UTF8 디코딩해서 원본 문자열 복원하고,
                        int sequencePos = txt.IndexOf(':'); // [일련번호]:[0123456789....]에서 [일련번호] 값만을 추출.
                        if (sequencePos == -1)
                        {
                            Console.WriteLine("No Sequence number");
                            break;
                        }

                        string sequenceText = txt.Substring(0, sequencePos); 
                        int sequenceNo = Int32.Parse(sequenceText);

                        // 클라이언트로부터 받은 순서 값을 보관
                        //  - 다중 스레드로 보낸 경우 차례대로 수신되지 않으므로.
                        try
                        {
                            sequenceList.Add(sequenceNo, true);
                        }
                        catch
                        {
                            Console.WriteLine("Failed: duplicated seq No. " + sequenceNo + ": " + txt.Substring(0, 20));
                            break;
                        }

                        bool bodyResult = IsBodyComplete(txt); // [0123456789...] 데이터가 올바르게 구성되어 있는지 검사
                        if (bodyResult == false)
                        {
                            Console.WriteLine("Invalid body data");
                            break;
                        }

                        bodyCount++;
                    }

                    // 하나의 클라이언트와 통신이 완료된 후, 검증을 한다.

                    // 모든 패킷이 누락되지 않고 왔는지 검증
                    bool seqResult = IsSequenceIncrement(instanceId, sequenceList);
                    if (seqResult == true)
                    {
                        Console.WriteLine("Thread-safety: Success");
                    }

                    sequenceList.Clear();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine();
                Console.WriteLine(instanceId.ToString("X") + ": exception occurred");
                Console.WriteLine(ex.ToString());
            }
        }

        private static bool IsBodyComplete(string item)
        {
            int pos = item.IndexOf(':');
            string txt = item.Substring(pos + 1);

            pos = 0;
            int cmp = 48; // 48 == '0'
            foreach (var ch in txt)
            {
                if (cmp != (int)ch)
                {
                    Console.WriteLine("Failed: at " + pos);
                    return false;
                }

                pos++;
                cmp++;
                if (cmp == 58) // 57 == '9'
                {
                    cmp = 48;
                }
            }

            return true;
        }

        private static bool IsSequenceIncrement(int instanceId, SortedList<int, bool> sequenceList)
        {
            int oldItem = sequenceList.Keys.Min() - 1;

            foreach (var item in sequenceList.Keys)
            {
                if ((item - oldItem) != 1)
                {
                    Console.WriteLine(instanceId.ToString("X") + ": Failed:" + oldItem + " and " + item);
                    return false;
                }

                oldItem = item;
            }

            return true;
        }

        private static byte[] MustReadBuffer(Socket clntSocket, int mustRead)
        {
            byte[] result = null;

            using (MemoryStream ms = new MemoryStream())
            {
                while (true)
                {
                    byte[] byteBuf = new byte[mustRead];
                    int recv = 0;
                    try
                    {
                        recv = clntSocket.Receive(byteBuf, mustRead, SocketFlags.None);
                    }
                    catch (System.Net.Sockets.SocketException ex)
                    {
                        // 클라이언트 측에서 데이터를 전송 도중,
                        //      1) 소켓을 Close한 경우
                        //      2) 클라이언트 응용 프로그램을 강제 종료하는 경우
                        return null;
                    }

                    if (recv == 0)
                    {
                        Console.WriteLine("Receive returns 0");
                        return null;
                    }

                    ms.Write(byteBuf, 0, recv);
                    mustRead -= recv;
                    if (mustRead == 0)
                    {
                        break;
                    }
                }

                ms.Flush();

                result = ms.ToArray();
            }

            return result;
        }
    }
}

클라이언트는 비교를 위해 단일 스레드와 다중 스레드로 나눠서 작성해 보겠습니다. 우선, 단일 스레드로 Send하는 Socket 클라이언트는 다음과 같이 작성합니다.

namespace SocketClientST
{
    class Program
    {
        static void Main(string[] args)
        {
            int instanceId = 0;

            try
            {
                string body = string.Empty;
                StringBuilder sb = new StringBuilder();

                // 0123456789... 가 10,000번 반복되는 문자열을 생성
                Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
                for (int i = 0; i < 10000; i++)
                {
                    sb.Append(body);
                }

                body = sb.ToString();

                int loopCount = 10000;
                int dot = loopCount / (80 * 25);

                // 단일 스레드에서 1개의 소켓 인스턴스로 10,000번 Send 테스트
                using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    IPAddress target = IPAddress.Loopback;
                    // IPAddress target = IPAddress.Parse("192.168.0.17");
                    EndPoint serverEP = new IPEndPoint(target, 11200);

                    int retryCount = 3;
                    bool connected = false;
                    while (retryCount-- > 0)
                    {
                        try
                        {
                            socket.Connect(serverEP);
                            connected = true;
                        }
                        catch { }
                    }

                    if (connected == false)
                    {
                        Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                        return;
                    }

                    byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                    socket.Send(instanceIdBuf);

                    int failedAt = -1;

                    // 0123456789... 가 10,000번 반복되는 문자열을 가진 CommonPacket 인스턴스를 
                    // 10,000번 루프를 돌면서 Socket.Send로 전송
                    for (int i = 0; i < loopCount; i++)
                    {
                        CommonPacket packet = new CommonPacket(i);
                        packet.AddData(body);

                        byte[] dataBuf = packet.GetBuffer();
                        bool result = MustSendBuffer(socket, dataBuf, dataBuf.Length);
                        if (result == false)
                        {
                            failedAt = i;
                            break;
                        }

                        if (i % dot == 0)
                        {
                            Console.Write(((int)instanceId).ToString("X") + ",");
                        }
                    }

                    socket.Close();

                    Console.WriteLine();

                    sb = new StringBuilder();
                    if (failedAt == -1)
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " is completed");
                        sb.AppendLine("TCP Client socket: Closed");
                        sb.AppendLine(loopCount + " times: data sent");
                        sb.AppendLine((body.Length / 1024) + "KB / packet");
                    }
                    else
                    {
                        sb.AppendLine("======" + ((int)instanceId).ToString("X") + " - failed");
                        sb.AppendLine("Send failed at: " + failedAt);
                    }

                    Console.WriteLine(sb.ToString());
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        // 전달된 버퍼를 Socket.Send로 전송
        private static bool MustSendBuffer(Socket socket, byte[] dataBuf, int mustSend)
        {
            int pos = 0;

            while (true)
            {
                int sentLength = 0;

                try
                {
                    // blocking call인 경우 sentLength == mustSend이지만!
                    sentLength = socket.Send(dataBuf, pos, mustSend, SocketFlags.None);
                }
                catch (Exception ex)
                {
                    int error = Marshal.GetLastWin32Error();
                    Console.WriteLine(error + ": " + ex.ToString());
                    return false;
                }

                if (sentLength == 0)
                {
                    return false;
                }

                mustSend -= sentLength;
                pos += sentLength;
                if (mustSend == 0)
                {
                    return true;
                }
            }
        }
    }

    public class CommonPacket
    {
        int _seq = 1;

        MemoryStream ms = new MemoryStream();

        public CommonPacket(int seqenceId)
        {
            _seq = seqenceId;
        }

        public void AddData(string txt)
        {
            int bodyLength = 0;
            string seqText = _seq.ToString();

            byte[] dataBuf = Encoding.UTF8.GetBytes(txt);
            byte[] seqBuf = Encoding.UTF8.GetBytes(seqText);

            bodyLength = dataBuf.Length + seqBuf.Length + 1; // 1 == ':'

            byte[] headerLength = BitConverter.GetBytes(bodyLength);

            ms.Write(headerLength, 0, headerLength.Length);

            ms.Write(seqBuf, 0, seqBuf.Length);
            ms.WriteByte((byte)':');
            ms.Write(dataBuf, 0, dataBuf.Length);
            ms.Flush();
        }

        public byte[] GetBuffer()
        {
            return ms.ToArray();
        }
    }
}

테스트 해보니, 서버 측 프로그램과 클라이언트의 정상 동작이 확인되었습니다. 마지막으로, 클라이언트 측을 다중 스레드를 이용해 Send를 호출하는 것으로 바꿔 보겠습니다.

namespace SocketClientMT
{
    class Program
    {
        class ThreadParam
        {
            public List<CommonPacket> Packets;
            public Socket Socket;
            public int Sent;
        }

        static void Main(string[] args)
        {
            int instanceId = 0;

            string body = string.Empty;
            StringBuilder sb = new StringBuilder();

            Array.ForEach(Enumerable.Range(0, 10).ToArray(), (elem) => body += elem.ToString());
            for (int i = 0; i < 10000; i++)
            {
                sb.Append(body);
            }

            body = sb.ToString();

            int loopCount = 10000;

            List<CommonPacket> packets = new List<CommonPacket>();

            // 1개의 소켓을 생성하고,
            using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
            {
                IPAddress target = IPAddress.Loopback; 
                // IPAddress target = IPAddress.Parse("192.168.0.17");
                EndPoint serverEP = new IPEndPoint(target, 11200);

                int retryCount = 3;
                bool connected = false;
                while (retryCount-- > 0)
                {
                    try
                    {
                        socket.Connect(serverEP);
                        connected = true;
                    }
                    catch { }
                }

                if (connected == false)
                {
                    Console.WriteLine("Connection failed: " + instanceId.ToString("X"));
                    return;
                }

                byte[] instanceIdBuf = BitConverter.GetBytes(instanceId);
                socket.Send(instanceIdBuf);

                // 일련번호가 붙은 10,000개의 버퍼를 준비한 다음,
                for (int i = 0; i < loopCount; i++)
                {
                    CommonPacket packet = new CommonPacket(i);
                    packet.AddData(body);

                    packets.Add(packet);
                }

                // 20개의 스레드에서 List에 보관된 CommonPacket을 가져가서 같은 소켓 인스턴스의 Send에 전달
                List<Thread> threads = new List<Thread>();
                ThreadParam threadParam = new ThreadParam();
                threadParam.Packets = packets;
                threadParam.Socket = socket;

                for (int i = 0; i < 20; i++)
                {
                    Thread aThread = new Thread(sendBufferThread);
                    aThread.IsBackground = true;
                    aThread.Start(threadParam);

                    threads.Add(aThread);
                }

                // 모든 스레드의 실행이 종료될 때까지 대기
                foreach (var item in threads)
                {
                    item.Join();
                }

                socket.Close();
            }
        }

        // 스레드 메서드: Packets 리스트에 보관된 CommonPacket 데이터를 경쟁적으로 가져와서
        //               서버 측에 Send 메서드로 전달.
        private static void sendBufferThread(object obj)
        {
            ThreadParam threadParam = (ThreadParam)obj;

            while (true)
            {
                CommonPacket packet = null;

                lock (threadParam.Packets)
                {
                    if (threadParam.Packets.Count == 0)
                    {
                        break;
                    }

                    packet = threadParam.Packets[0];
                    threadParam.Packets.RemoveAt(0);
                    threadParam.Sent++;
                }

                byte[] dataBuf = packet.GetBuffer();
                MustSendBuffer(threadParam.Socket, dataBuf, dataBuf.Length);
            }
        }
    }
}

물론, 다중 스레드 클라이언트도 실행해 보면 서버 측에서의 검증 결과를 잘 통과하지만 이것은 테스트에 의한 실험값에 불과하기 때문에 절대적인 증명이라고 볼 수는 없습니다. 그래도 분명히 MSDN 문서에 thread-safe하다고 되어 있으므로 위의 예제 코드는 단순 확인용이라는 수준으로 놓고 보면 되겠습니다. (혹시, 테스트 코드의 조건에 의문 사항이나 개선이 필요하면 덧글 부탁드립니다.)

첨부 파일은 위의 코드를 반영한 서버, 단일 스레드 클라이언트, 다중 스레드 클라이언트 예제 프로젝트를 담고 있습니다.





[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 7/17/2021]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 



2013-08-06 11시54분
[ryujh] 안녕하세요. 소켓 소스 많은 도움될것 같습니다.

소스 분석 중 서버측 소스에서 IsSequenceIncrement 메소드 에 SortedList 를 사용하여 정렬된 키값이 연속적이지 않으면 검증되지 않은 것으로 나오는데

SortedList 를 사용하지 않아도 IsSequenceIncrement 메소드 에서 Sort 하여 키를 정렬시켜도 문제 없는 것이죠?

Thread-safe 관련 내용은 더 분석해보겠습니다. 이상입니다.
[guest]
2013-08-07 12시24분
@ryujh 님, 맞습니다. 굳이 SortedList를 사용하지 않고 일반 List에서 일련 번호만 제대로 검증할 수 있다면 어떤 식으로 해도 상관없습니다. ^^
정성태

... 91  [92]  93  94  95  96  97  98  99  100  101  102  103  104  105  ...
NoWriterDateCnt.TitleFile(s)
11635정성태8/1/201818710오류 유형: 472. C# 컴파일 오류 - Your project is not referencing the ".NETFramework,Version=v3.5" framework.
11634정성태8/1/201821673.NET Framework: 790. .NET Thread 상태가 Cooperative일 때 GC hang 현상 재현 방법파일 다운로드1
11633정성태7/29/201825639Graphics: 15. Unity - shader의 World matrix(unity_ObjectToWorld)를 수작업으로 구성 [2]파일 다운로드1
11632정성태7/28/201827936Graphics: 14. C# - Unity에서 캐릭터가 바라보는 방향을 기준으로 카메라의 위치 이동 및 회전하는 방법
11631정성태7/27/201829900Graphics: 13. Unity로 실습하는 Shader (9) - 투명 배경이 있는 텍스처 입히기 [1]
11630정성태7/27/201825029개발 환경 구성: 391. (GitHub 등과 직접 연동해) 소스 코드 디버깅을 쉽게 해 주는 SourceLink [3]
11629정성태7/26/201823814.NET Framework: 789. C# 컴파일 옵션 - Check for arithmetic overflow/underflow [2]
11628정성태7/25/201825654Graphics: 12. Unity로 실습하는 Shader (8) - 다중 패스(Multi-Pass Shader)
11627정성태7/25/201820061개발 환경 구성: 390. C# - 컴파일러 옵션 OSS signing / Public Signing
11626정성태7/25/201818416오류 유형: 471. .C++ 함수를 const로 바꾼 경우 C2440 컴파일 오류가 발생한다면?
11625정성태7/24/201817646Math: 49. GeoGebra 기하 (25) - 타원의 중심점 찾기파일 다운로드1
11624정성태7/24/201822080개발 환경 구성: 389. C# - 재현 가능한 빌드(reproducible builds) == Deterministic builds [4]
11623정성태7/24/201821465Math: 48. C# - 가우시안 함수의 이산형(discrete) 커널 값 생성파일 다운로드1
11622정성태7/23/201821621개발 환경 구성: 388. Windows 환경에서 Octave 패키지 설치하는 방법
11621정성태7/23/201819232VC++: 127. 멤버 함수에 대한 포인터를 외부에서 호출하는 방법파일 다운로드1
11620정성태7/22/201822490Graphics: 11. Unity로 실습하는 Shader (7) - Blur (평균값, 가우스, 중간값) 필터 [1]파일 다운로드1
11619정성태7/21/201821530Graphics: 10. Unity로 실습하는 Shader (6) - Mosaic Shading
11618정성태7/20/201818615개발 환경 구성: 387. 삼성 오디세이(Odyssey) 노트북의 운영체제를 새로 설치하는 방법
11617정성태7/20/201819396Team Foundation Server: 50. TFS 소스 코드 관리 기능 (5) - "Rollback", "Rollback Entire Changeset"
11616정성태7/17/201818756Graphics: 9. Unity Shader - 전역 변수의 초기화
11615정성태7/17/201823096.NET Framework: 788. RawInput을 이용한 키보드/마우스 입력 모니터링파일 다운로드1
11614정성태7/17/201825328Graphics: 8. Unity Shader - Texture의 UV 좌표에 대응하는 Pixel 좌표
11613정성태7/16/201821627Graphics: 7. Unity로 실습하는 Shader (5) - Flat Shading
11612정성태7/16/201820596Windows: 148. Windows - Raw Input의 Top level collection 의미
11611정성태7/15/201820828Graphics: 6. Unity로 실습하는 Shader (4) - 퐁 셰이딩(phong shading)
11610정성태7/15/201817378Graphics: 5. Unity로 실습하는 Shader (3) - 고로 셰이딩(gouraud shading) + 퐁 모델(Phong model) + Texture
... 91  [92]  93  94  95  96  97  98  99  100  101  102  103  104  105  ...