Microsoft MVP성태의 닷넷 이야기
닷넷: 2146. C# - ConcurrentDictionary 자료 구조의 동기화 방식 [링크 복사], [링크+제목 복사],
조회: 12535
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
 
(연관된 글이 1개 있습니다.)

C# - ConcurrentDictionary 자료 구조의 동기화 방식

이에 대해 묻는 질문이 있었는데... ^^; 제가 주말에 일이 있어 답변을 못했더니 그사이 삭제가 되었습니다.

암튼, 기존 DictionaryConcurrentDictionary의 구조를 잠시 살펴볼까요? ^^

우선 Dictionary는 동기화가 제공되지 않습니다. 따라서 당연히 다중 스레드에서의 접근이 안전하지 않으므로 반드시 lock을 걸어줘야 합니다.

재미있는 점이 하나 있다면, Dictionary의 경우 동기화는 제공하지 않지만 기초적인 수준에서의 "깨짐"을 예방하기 위한 수단은 제공한다는 점입니다. 일례로, 열거(enumeration)하는 동안 다른 스레드에서 Write 작업을 하게 되면 무조건 예외를 발생시키는 구조로 작성되었습니다.

이 현상을 간단하게 다음의 코드를 사용하면 재현할 수 있습니다.

namespace ConsoleApp2;

internal class Program
{
    static void Main(string[] args)
    {
        Dictionary<int, int> dict = new Dictionary<int, int>();

        Thread t = new Thread(EnumThreadProc);
        t.Start(dict);

        Thread.Sleep(1000);
        dict[7] = 7;

        t.Join();
    }

    private static void EnumThreadProc(object? obj)
    {
        Dictionary<int, int>? dict = obj! as Dictionary<int, int>;

        foreach (var item in dict!)
        {
            Console.WriteLine(item);
            Thread.Sleep(2000);
        }
    }
}

/* 출력 결과:
[5, 5]
Unhandled exception. System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
   at System.Collections.Generic.Dictionary`2.Enumerator.MoveNext()
   at ConsoleApp2.Program.EnumThreadProc(Object obj)
*/

위의 코드는 EnumThreadProc에서 Dictionary를 열거하는 동안, 다른 스레드에서 "dict[7] = 7" 코드를 수행해 변경하므로 예외가 발생하는데요, 이렇게 할 수 있었던 것은 Dictionary 내에 _version 숫자 필드를 이용해 변경이 있을 때마다 매번 값을 증가시키기 때문입니다.

아래는 Add 메서드에서 호출하는 TryInsert에서 _version 필드의 값이 증가하는 것을 보여주는데,

namespace System.Collections.Generic
{
    // ...[생략]...
    public class Dictionary<TKey, TValue> : IDictionary<TKey, TValue>, IDictionary, IReadOnlyDictionary<TKey, TValue>, ISerializable, IDeserializationCallback where TKey : notnull
    {
        // ...[생략]...

        private int _freeCount;
        private int _version;

        // ...[생략]...

        private bool TryInsert(TKey key, TValue value, InsertionBehavior behavior)
        {
            // ...[생략]...

            ref Entry entry = ref entries![index];
            entry.hashCode = hashCode;
            entry.next = bucket - 1; // Value in _buckets is 1-based
            entry.key = key;
            entry.value = value;
            bucket = index + 1; // Value in _buckets is 1-based
            _version++;

            // ...[생략]...

            return true;
        }

        // ...[생략]...
    }
}

이와 함께 Dictionary의 Enumerator.MoveNext 호출 때마다 매번 _version이 바뀐 것은 아닌지 검사하는 코드가 있기 때문에,

public struct Enumerator : IEnumerator<KeyValuePair<TKey, TValue>>, IDictionaryEnumerator
{
    private readonly Dictionary<TKey, TValue> _dictionary;
    private readonly int _version;
    // ...[생략]...

    internal Enumerator(Dictionary<TKey, TValue> dictionary, int getEnumeratorRetType)
    {
        _dictionary = dictionary;
        _version = dictionary._version;
        _index = 0;
        _getEnumeratorRetType = getEnumeratorRetType;
        _current = default;
    }

    public bool MoveNext()
    {
        if (_version != _dictionary._version)
        {
            ThrowHelper.ThrowInvalidOperationException_InvalidOperation_EnumFailedVersion();
        }

        // ...[생략]...
    }

    // ...[생략]...
}

예외가 발생할 수 있는 구조입니다. 간단하죠? ^^ 관련해서 아래의 글에서도 _version 필드에 대한 내용을 실었으니 참고하시고.

IEnumerator는 언제나 읽기 전용일까?
; https://www.sysnet.pe.kr/2/0/1308




그렇다면 ConcurrentDictionary는 어떻게 저걸 개선했을까요? 이름에서 알리고 싶은 것처럼, 이것은 다중 스레드에서의 접근을 허용합니다.

따라서, 당연히 다른 스레드에서의 읽기/쓰기가 가능합니다. 실제로 위의 예제 코드에서 Dictionary 타입을 그대로 ConcurrentDictionary로 바꾸기만 한 다음 실행하면 이번엔 InvalidOperationException 예외가 발생하지 않습니다. 이를 위해 _version 필드는 제거되었고, lock을 써서 업데이트에 따른 자료 구조가 깨지는 것을 방지합니다.

관련 코드를 간단하게 살펴볼까요? ^^

우선 열거의 경우, 중간에 목록이 추가되거나 삭제되어도 이를 반영하지 못합니다.

private sealed class Enumerator : IEnumerator<KeyValuePair<TKey, TValue>>
{
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary;

    // ...[생략]...

    public bool MoveNext()
    {
        switch (_state)
        {
            // ...[생략]...

            case StateOuterloop: // bucket을 순회
                ConcurrentDictionary<TKey, TValue>.Node?[]? buckets = _buckets;
                Debug.Assert(buckets != null);

                int i = ++_i;
                if ((uint)i < (uint)buckets.Length)
                {
                    // The Volatile.Read ensures that we have a copy of the reference to buckets[i]:
                    // this protects us from reading fields ('_key', '_value' and '_next') of different instances.
                    _node = Volatile.Read(ref buckets[i]);
                    _state = StateInnerLoop;
                    goto case StateInnerLoop;
                }
                goto default;

            case StateInnerLoop: // bucket의 연결 리스트를 순회
                Node? node = _node;
                if (node != null)
                {
                    Current = new KeyValuePair<TKey, TValue>(node._key, node._value);
                    _node = node._next;
                    return true;
                }
                goto case StateOuterloop;

            // ...[생략]...
        }
    }
}

bucket의 경우 현재 배열에 있는 것을 그대로 가져오고, 일단 한번 hash code에 해당하는 슬롯이 비어 있어 지나가면(순회하면) 그걸로 끝입니다. 이전 슬롯에 새로운 hash code에 해당하는 값이 업데이트(추가/삭제) 되었어도 그걸 다시 찾아가서 순회하지는 않습니다.

또한, bucket 내의 연결 리스트도 마찬가지입니다. Current에 해당하는 노드가 삭제되었어도 (atomic하게 업데이트 가능한 포인터 크기만큼의) _next가 null이면 다음 bucket으로 넘어가고, 값이 있으면 계속 열거하게 되어 있습니다.

그럼 Add/Update/Delete 작업에 lock이 어떻게 걸리는지 한번 볼까요? ^^

public TValue AddOrUpdate(TKey key, Func<TKey, TValue> addValueFactory, Func<TKey, TValue, TValue> updateValueFactory)
{
    // ...[생략]...

    IEqualityComparer<TKey>? comparer = _comparer;
    int hashcode = comparer is null ? key.GetHashCode() : comparer.GetHashCode(key);

    while (true)
    {
        if (TryGetValueInternal(key, hashcode, out TValue? oldValue))
        {
            // key exists, try to update
            TValue newValue = updateValueFactory(key, oldValue);
            if (TryUpdateInternal(key, hashcode, newValue, oldValue))
            {
                return newValue;
            }
        }
        else
        {
            // key doesn't exist, try to add
            if (TryAddInternal(key, hashcode, addValueFactory(key), updateIfExists: false, acquireLock: true, out TValue resultingValue))
            {
                return resultingValue;
            }
        }
    }
}

private bool TryAddInternal(TKey key, int? nullableHashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
    IEqualityComparer<TKey>? comparer = _comparer;

    // ...[생략]...

    int hashcode =
        nullableHashcode ??
        (comparer is null ? key.GetHashCode() : comparer.GetHashCode(key));

    while (true)
    {
        Tables tables = _tables;
        object[] locks = tables._locks;
        ref Node? bucket = ref tables.GetBucketAndLock(hashcode, out uint lockNo);

        bool resizeDesired = false;
        bool lockTaken = false;
        try
        {
            if (acquireLock)
            {
                Monitor.Enter(locks[lockNo], ref lockTaken);
            }

            // ...[생략]...

            Node? prev = null;
            for (Node? node = bucket; node != null; node = node._next)
            {
                // ...[생략]...
                if (hashcode == node._hashcode && (comparer is null ? _defaultComparer.Equals(node._key, key) : comparer.Equals(node._key, key)))
                {
                    if (updateIfExists)
                    {
                        if (s_isValueWriteAtomic)
                        {
                            node._value = value;
                        }
                        else
                        {
                            var newNode = new Node(node._key, value, hashcode, node._next);
                            if (prev is null)
                            {
                                Volatile.Write(ref bucket, newNode);
                            }
                            else
                            {
                                prev._next = newNode;
                            }
                        }
                        resultingValue = value;
                    }
                    else
                    {
                        resultingValue = node._value;
                    }
                    return false;
                }
                prev = node;
            }

            var resultNode = new Node(key, value, hashcode, bucket);
            Volatile.Write(ref bucket, resultNode);

            // ...[생략]...
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(locks[lockNo]);
            }
        }

        // ...[생략]...
    }
}

성능을 위해 lock을 전체적으로 걸지 않고 bucket 요소 단위로 걸고 있기 때문에 hash code가 같지 않다면 동시에 업데이트가 가능하게 작성했습니다.

이 상황을 Visual Studio의 디버깅 기능을 이용하면 실제로 테스트하는 것도 가능합니다. 일례로 다음과 같이 예제를 구성하고,

using System.Collections.Concurrent;

namespace ConsoleApp3;

internal class Program
{
    static void Main(string[] args)
    {
        ConcurrentDictionary<int, int> dict = new ConcurrentDictionary<int, int>();

        Thread t = new Thread(UpdateThreadProc);
        t.Start(dict);

        while (true)
        {
            Console.ReadLine();
            dict[2] = 8;
        }
    }

    private static void UpdateThreadProc(object? obj)
    {
        ConcurrentDictionary<int, int>? dict = obj! as ConcurrentDictionary<int, int>;

        dict![2] = 7;
    }
}

이후, ConcurrentDictionary의 Source Code에 TryAddInternal 메서드의 Monitor.Enter 위치에 앞/뒤로 각각 BP를 걸어둬 F5 키를 눌러 실행합니다. 테스트의 의도가 대충 그려지시죠? ^^

UpdateThreadProc에 의해 TryAddInternal 내부의 Monitor.Enter 뒤에 있는 BP까지 실행한 다음, 해당 스레드를 Visual Studio 스레드 창(Ctrl+Alt+H)을 이용해 "Freeze" 시킵니다.

이제 프로세스 실행을 재개하면 Monitor.Lock은 점유된 상태가 되고, 콘솔에 아무 키나 누르면 "dict[2] = 8;"가 실행돼 다시 Monitor.Enter 위치의 앞에 있는 BP에서 멈추게 될 것입니다. 당연히 hash code가 같기 때문에 bucket의 위치도, 그것에 대한 lock도 같아 이런 경우 Monitor.Enter 실행 시 블록킹이 걸리게 됩니다.

concurrent_lock_1.png

반면, hash code가 다르도록 코드를 바꾸면,

while (true)
{
    Console.ReadLine();
    dict[3] = 8;
}

같은 bucket을 공유하지 않기 때문에 lock도 달라져 blocking이 걸리지 않습니다.




잘 이해가 되지 않아도 상관없습니다. ^^; 대충 ConcurrentDictionary는 다중 스레드에서 안전하게 사용할 수 있도록 마이크로소프트가 잘 만들어 놨다고만 여겨도 무리가 없을 것입니다.

참고로, python의 경우에도 Dictionary는 스레드에 안전하지 않아 유사한 현상이 발생합니다.




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]

[연관 글]






[최초 등록일: ]
[최종 수정일: 9/25/2023]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... [61]  62  63  64  65  66  67  68  69  70  71  72  73  74  75  ...
NoWriterDateCnt.TitleFile(s)
12414정성태11/18/202019587VC++: 138. x64 빌드에서 extern "C"가 아닌 경우 ___cdecl name mangling 적용 [4]파일 다운로드1
12413정성태11/17/202018544.NET Framework: 970. .NET 5 / .NET Core - UnmanagedCallersOnly 특성을 사용한 함수 내보내기파일 다운로드1
12412정성태11/16/202020706.NET Framework: 969. .NET Framework 및 .NET 5 - UnmanagedCallersOnly 특성 사용파일 다운로드1
12411정성태11/12/202017476오류 유형: 680. C# 9.0 - Error CS8889 The target runtime doesn't support extensible or runtime-environment default calling conventions.
12410정성태11/12/202017631디버깅 기술: 174. windbg - System.TypeLoadException 예외 분석 사례
12409정성태11/12/202019457.NET Framework: 968. C# 9.0의 Function pointer를 이용한 함수 주소 구하는 방법파일 다운로드1
12408정성태11/9/202034700도서: 시작하세요! C# 9.0 프로그래밍 [8]
12407정성태11/9/202019817.NET Framework: 967. "clr!JIT_DbgIsJustMyCode" 호출이 뭘까요?
12406정성태11/8/202020803.NET Framework: 966. C# 9.0 - (15) 최상위 문(Top-level statements) [5]파일 다운로드1
12405정성태11/8/202018687.NET Framework: 965. C# 9.0 - (14) 부분 메서드에 대한 새로운 기능(New features for partial methods)파일 다운로드1
12404정성태11/7/202019315.NET Framework: 964. C# 9.0 - (13) 모듈 이니셜라이저(Module initializers)파일 다운로드1
12403정성태11/7/202018193.NET Framework: 963. C# 9.0 - (12) foreach 루프에 대한 GetEnumerator 확장 메서드 지원(Extension GetEnumerator)파일 다운로드1
12402정성태11/7/202019725.NET Framework: 962. C# 9.0 - (11) 공변 반환 형식(Covariant return types) [1]파일 다운로드1
12401정성태11/5/202018972VS.NET IDE: 153. 닷넷 응용 프로그램에서의 "My Code" 범위와 "Enable Just My Code"의 역할 [1]
12400정성태11/5/202015182오류 유형: 679. Visual Studio - "Source Not Found" 창에 "Decompile source code" 링크가 없는 경우
12399정성태11/5/202018690.NET Framework: 961. C# 9.0 - (10) 대상으로 형식화된 조건식(Target-typed conditional expressions)파일 다운로드1
12398정성태11/4/202018225오류 유형: 678. Windows Server 2008 R2 환경에서 Powershell을 psexec로 원격 실행할 때 hang이 발생하는 문제
12397정성태11/4/202018270.NET Framework: 960. C# - 조건 연산자(?:)를 사용하는 경우 달라지는 메서드 선택 사례파일 다운로드1
12396정성태11/3/202015261VS.NET IDE: 152. Visual Studio - "Tools" / "External Tools..."에 등록된 외부 명령어에 대한 단축키 설정 방법
12395정성태11/3/202018074오류 유형: 677. SSMS로 DB 접근 시 The server principal "..." is not able to access the database "..." under the current security context.
12394정성태11/3/202015692오류 유형: 676. cacls - The Recycle Bin on ... is corrupted. Do you want to empty the Recycle Bin for this drive?
12393정성태11/3/202015332오류 유형: 675. Visual Studio - 닷넷 응용 프로그램 디버깅 시 Disassembly 창에서 BP 설정할 때 "Error while processing breakpoint." 오류
12392정성태11/2/202019863.NET Framework: 959. C# 9.0 - (9) 레코드(Records) [4]파일 다운로드1
12390정성태11/1/202019529디버깅 기술: 173. windbg - System.Configuration.ConfigurationErrorsException 예외 분석 방법
12389정성태11/1/202018700.NET Framework: 958. C# 9.0 - (8) 정적 익명 함수 (static anonymous functions)파일 다운로드1
12388정성태10/29/202017722오류 유형: 674. 어느 순간부터 닷넷 응용 프로그램 실행 시 System.Configuration.ConfigurationErrorsException 예외가 발생한다면?
... [61]  62  63  64  65  66  67  68  69  70  71  72  73  74  75  ...