《Windows核心编程》读书笔记八 用户模式下的内核同步


第八章 用户模式下的线程同步


本章内容

8.1 原子访问:Interlocked系列函数

8.2 高速缓存行

8.3 高级线程同步

8.4 关键段(临界区)

8.5 Slim 读/写锁

8.6 条件变量


什么是用户模式? Windows系统上的CPU运行时有两种模式,User Mode 和 Kernel Mode(CPU硬件支持)详细参照MSDN

https://docs.microsoft.com/en-us/windows-hardware/drivers/gettingstarted/user-mode-and-kernel-mode

通常应用程序和一些的驱动运行于User Mode 使用进程自己的私有的虚拟内存地址,进程crash不会影响操作系统

操作系统内核和另一些驱动运行于Kernel Mode 使用公有的共享内存地址,crash会影响操作系统




在以下两种基本情况下,线程之间需要互相通信。

1)需要让多个线程同时访问一个共享资源,同时不能破坏资源的完整性。(互斥)

2)一个线程需要通知其他线程某项任务已经完成。(同步)


8.1 原子访问:Interlocked系列函数

原子访问atomic access,指一个线程在访问某个资源的同时能够保证没有其他线程会在同一时刻访问同一资源。

书上举了一个例子(汇编代码不贴)

typedef unsigned(__stdcall *PTHREAD_START) (void *);
// Define a global variable.
long g_x = 0;

DWORD WINAPI ThreadFunc1(PVOID pvParam) {
g_x++;
return 0;
}

DWORD WINAPI ThreadFunc2(PVOID pvParam) {
g_x++;
return 0;
}
在主线程中创建两个子线程同时去递增全局变量g_x。最终g_x的值是多少。

由于这依赖于线程调度和编译器,所以g_x的值可能是1 也可能是2. 产生了资源竞争(race condition)

因此可以使用Interlocked函数来保证原子操作

unsigned long
InterlockedExchangeAdd(
_Inout_ _Interlocked_operand_ unsigned long volatile *Addend,
_In_ unsigned long Value
)
{
return (unsigned long) _InterlockedExchangeAdd((volatile long*) Addend, (long) Value);
}

LONG64
InterlockedExchangeAdd64 (
_Inout_ _Interlocked_operand_ LONG64 volatile *Addend,
_In_ LONG64 Value
);


修改以后的代码。

// Define a global variable.
long g_x = 0;

DWORD WINAPI ThreadFunc1(PVOID pvParam) {
InterlockedExchangeAdd(&g_x, 1);
return 0;
}

DWORD WINAPI ThreadFunc2(PVOID pvParam) {
InterlockedExchangeAdd(&g_x, 1);
return 0;
}

InterlockedIncrement 函数可以以原子操作的方式递增数值。

所有线程都应该调用这些函数来修改共享变量的值,不应该使用简单的C++语句来修改共享变量(因为这依赖运行环境,编译器和最终的机器指令,如果代码做平台移植,结果是未定义的)。


Interlocked函数是如何工作的? 这取决于代码运行的CPU平台。如果是x86系列cpu,Interlocked函数会在总线上维持一个硬件信号,该信号会阻止其他CPU访问一个内存地址。另外传递给该函数的变量 必须是经过地址对齐的,否则可能会失败。

(x86 支持硬件LOCK指令 实现原理同上。  IA64 直接支持原子操作load-modify-store)

 在非x86和x86_64体系的CPU上可能会分成Load-link/store-condition 

类似实现如下

 1 LONG InterlockedIncrement( LONG volatile *value )
2 {
3 LONG lOriginal, lNewValue;
4 do
5 {
6 //
7 //通过load-link读取当前值
8 //可以知道写回之前是否有人修改它
9 //
10 lOriginal = load_link(value);
11
12 //
13 //计算新的值
14 //
15 lNewValue = lOriginal + 1;
16
17 //
18 //有条件的写回新值
19 //如果有人在计算期间覆写该值,则函数返回失败
20 //
21 } while ( !store_conditional(value, lNewValue));
22 return lNewValue;
23 }
所以在load_link和store_conditional之间的指令越长该操作可能越容易失败。


C运行库提供一个_aligned_malloc函数来保证分配的一块对齐过的内存。

void * __cdecl _aligned_malloc(_In_ _CRT_GUARDOVERFLOW size_t _Size, _In_ size_t _Alignment);

size表示要分配的字节数, alignment必须是2的整数幂次方


其他3个Interlocked函数

LONG
InterlockedExchange (
_Inout_ _Interlocked_operand_ LONG volatile *Target,
_In_ LONG Value
);

LONG64
InterlockedExchange64 (
_Inout_ _Interlocked_operand_ LONG64 volatile *Target,
_In_ LONG64 Value
);

PVOID
__cdecl
InterlockedExchangePointer(
_Inout_ _Interlocked_operand_ PVOID volatile *Target,
_In_ PVOID Value
);

InterlockedExchangeInterlockedExchangePointer 在32位app下都是替换值32位操作

在64位下 前者替换32位值 后者替换64位值。

两者的返回值都是第一个参数传入的地址的原来值

在实现旋转锁(spinlock)的时候,InterlockedExchange极有用。

// Global variable indicating whether a shared resource is in use or not
BOOL g_fResourceInUse = FALSE;
void Func1() {
// Wait to access the resource.
while (InterlockedExchange((LONG*)&g_fResourceInUse, TRUE) == TRUE) //if the resource is in use sleep the thread.
Sleep(0);

// Access the resource.
// do something.

// we no longer need to access the resource.
InterlockedExchange((LONG*)&g_fResourceInUse, FALSE);
}

他实现了一个简单的线程共享资源


使用旋转锁要极为小心,因为旋转锁会耗费CPU时间。CPU在不断的对比两个值,直到另一个线程“神器地”改变了其中一个值为止。而且这种代码还假定所用使用旋转锁的线程都以相同的优先级运行。

通常使用旋转锁,调用SetProcessPriorityBoost或者SetThreadPriorityBoost来禁用系统优先级提升。


此外,我们还必须确保锁变量和锁所保护的数据位于不同的高速缓存行。如果所变量和数据共享同一高速缓存行,那么使用资源的CPU就会与任何试图访问资源的CPU发生争夺,这将会影响性能。


在单CPU上应该尽量避免使用旋转锁。一个线程不停的循环不仅浪费cpu资源还会阻止其他线程改变锁的值。 上面的代码使用了Sleep能略微改善这种情况。

如果使用Sleep可以每次睡眠一段随机时间,如果对资源的访问被拒绝时,可以进一步增加睡眠时间。可以避免让线程浪费cpu时间。事实上这里改成SwitchToThread可能更好。

// Global variable indicating whether a shared resource is in use or not
BOOL g_fResourceInUse = FALSE;
void Func1() {
// Wait to access the resource.
while (InterlockedExchange((LONG*)&g_fResourceInUse, TRUE) == TRUE) //if the resource is in use sleep the thread.
// to improve the performance we use switch to thread
SwitchToThread(); //Sleep(0);

// Access the resource.
// do something.

// we no longer need to access the resource.
InterlockedExchange((LONG*)&g_fResourceInUse, FALSE);
}

旋转锁假定被保护的资源只会被占用一小段时间。与切换到内核模式然后等待对比,这种情况以循环的方式进行等待的效率会更高。

当然可以指定一定的循环次数,如果仍然无法访问资源,那么线程就会切换到内核模式,并一直等到资源可以使用位置(此时不消耗cpu时间),这就是关键段(临界区)的实现方式。


在多处理器上旋转锁比较有用,这是因为当一个线程在一个cpu上运行时,另一个线程可以在另一个cpu上循环等待。(但是还是要小心旋转锁带来的过高的CPU开销问题)

最后两个Interlocked交换函数

LONG
InterlockedCompareExchange (
_Inout_ _Interlocked_operand_ LONG volatile *Destination,
_In_ LONG ExChange,
_In_ LONG Comperand
);


_Ret_writes_(_Inexpressible_(Unknown)) PVOID
InterlockedCompareExchangePointer (
_Inout_ _At_(*Destination,
_Pre_writable_byte_size_(_Inexpressible_(Unknown))
_Post_writable_byte_size_(_Inexpressible_(Unknown)))
_Interlocked_operand_ PVOID volatile *Destination,
_In_opt_ PVOID Exchange,
_In_opt_ PVOID Comperand
);

以原子方式执行一个测试和设置操作。对32位来说应用程序来说,这两个函数都执行32位值操作。 64位程序来说,前者操作32位值。后者操作64位值。

查看以下伪代码。(以下代码只展示逻辑,实际上所有这些操作都是以原子方式进行的。)

LONG InterlockedCompareExchange(PLONG plDestination,
LONG lExchange, LONG lComparand) {

LONG lRet = *plDestination; // Original value

if (*plDestination == lComparand)
*plDestination = lExchange;
return lRet;
}
函数将Destination的值和Comparand比较。如果相同,则将Destination修改为exchange。

如果Destination的值不等于Comparand,则值不变。函数返回原来的pDestination值。所有这些操作都是以原子方式进行的。

还有64位版本,用来处理已对齐的64位值。

LONGLONG
__cdecl
InterlockedCompareExchange64 (
_Inout_ _Interlocked_operand_ LONGLONG volatile *Destination,
_In_ LONGLONG ExChange,
_In_ LONGLONG Comperand
);

另外多个进程需要读一个共享内存段,也可以使用Interlocked函数。

LONG InterlockedIncrement(PLONG plAddend);

LONG InterlockedDecrement(PLONG plAddend);

还有一些列基于InterlockedCompareExchange64的OR, AND和XOR函数

例如

FORCEINLINE
LONG64
_InterlockedAnd64 (
_Inout_ _Interlocked_operand_ LONG64 volatile *Destination,
_In_ LONG64 Value
)
{
LONG64 Old;

do {
Old = *Destination;
} while (InterlockedCompareExchange64(Destination,
Old & Value,
Old) != Old);

return Old;
}

在WinXP以后的系统除了能对整数或者布尔值进行原子操作,还能使用一系列其他函数来对一种被称为Interlocked单向链表的栈进行操作。

栈中的每个操作如入栈出栈都是以原子方式进行。



8.2 高速缓存行

如果想为装配有多个处理器的机器构建高性能的应用程序,应该注意高速缓存行。

1. CPU读取数据的时候并不是直接从内存中取回,而是取回一个高速缓存行。(根据CPU不同可能是32字节,64甚至128字节)

2 .他们始终都对齐到32字节的边界(也可能是64,128字节)其目的是为了提高性能。

3. 一般程序会对一组相邻的字节操作。如果所有字节都在高速缓存行中,那么CPU不会访问内存总线(后者消耗的时间高于前者)。

但是在多处理器CPU中高速缓存行使得对内存的更新变得更加困难。

一个例子

1)CPU1读取一个字节,这使得该字节以及它相邻的字节被读到CPU1的高速缓存行中。

2)CPU2读取同一个字节,这使得该字节被读到CPU2的高速缓存行中。

3)CPU1对内存中的这个字节进行修改,这使得该字节被写入到CPU1的高速缓存行中。但是此时这一信息还没有写回到内存。

4)CPU2再次读取同一个字节。由于该字节已经在CPU2的高速缓存行中,因此CPU2不需要再访问内存。但CPU2无法看到该字节在内存中新的值。


为了解决这个问题,芯片设计者制定了一个规则。当一个 CPU 修改高速缓存行中的字节时,计算机中的其它 CPU 会被通知,它们的高速缓存将视为无效。于是,在上面的情况下, CPU2 发现自己的高速缓存中数据已无效, CPU1 将立即把自己的数据写回 RAM ,然后 CPU2 重新读取该数据。 可以看出,高速缓存行在多处理器上会导致一些不利。


因此在组织数据的时候应该遵循

1)根据高速缓存行大小将数据组织在一起,并且与高速缓存行对齐。

2)为了且确保不同的CPU能够各自访问不同的内存地址,而且这些地址不在同一个高速缓存行中。

3)只读与可读可写数据也分别存放。

4)会同时访问的数据尽量组织在一起

一个设计糟糕的数据结构

struct CUSTINFO {
DWORD dwCustomerID; // Mostly read-only
int nBalanceDue; // REad-write
wchar_t szName[100]; // Mostly read-only
FILETIME ftLastOrderDate; // Read-write
};

要确定CPU的高速缓存行的大小,可以调用Win32的GetLogicalProcessorInformation

WINBASEAPI
BOOL
WINAPI
GetLogicalProcessorInformation(
_Out_writes_bytes_to_opt_(*ReturnedLength, *ReturnedLength) PSYSTEM_LOGICAL_PROCESSOR_INFORMATION Buffer,
_Inout_ PDWORD ReturnedLength
);

该函数会返回一个SYSTEM_LOGICAL_PROCESSOR_INFORMATION结构数组。

检查其Cache字段,该成员是一个CACHE_DESCRIPTOR的结构。其中的LineSize就是CPU高速缓存的大小。

有了该信息可以使用C/C++编译器的__declspec(align(#))来只是对字段对其加以控制。以下是经过对其以后的数据额结构定义


可以查看本机的LineSize 是64,


#define CACHE_ALIGN 64

// Force each structure to be in a different cache line.
struct __declspec(align(CACHE_ALIGN)) CUSTINFO {
DWORD dwCustomerID; // Mostly read-only
wchar_t szName[100]; // Mostly read-only

// Force the following members to be in a different cache line.
__declspec(align(CACHE_ALIGN))
int nBalanceDue; // Read-write
FILETIME ftLastOrderDate; // read-write
};

最好是只让一个线程访问数据(函数参数和局部变量是最简单的方法)

或者只让一个CPU访问数据(线程的CPU关联性, thread affinity)


8.3 高级线程同步

旋转锁虽然对执行效率高,但是浪费cpu时间。尤其在单cpu上应该避免使用旋转锁。

需要一种机制能让线程等待共享资源的访问权,又不会浪费cpu时间。

调用一种操作系统函数将线程需要访问的资源或者需要等待的某种事件,传递给操作系统。线程自身将挂起。

操作系统代理线程去检测目标资源是否可以使用,或者特殊事件是否已经发生了。(在这个过程中线程本身不会被调度,一直处于等待状态)。


需要避免使用的一种方法

(在操作系统没有内建线程同步机制时,才使用。否则应该使用操作系统提供的线程同步方案)

volatile BOOL g_fFnishedCalculation = FALSE;

DWORD WINAPI RecalcFunc(PVOID pvParam) {
// Perform the recalculation.
//..
g_fFnishedCalculation = TRUE;
return 0;
}

int _tmain(int argc, TCHAR* argv[], TCHAR * env[])
{
CreateThread(..., RecalcFunc, ...);
//...
// Wait for the recalculation to complete.
while (!g_fFnishedCalculation)
;
//...


return 0;
}
缺点,主线程会一直被调度。夺走cpu时间。(轮循)


还有可能主线程优先级比RecalcFunc高,那么可能发生这种情况RelcalcFunc不会被调用(事实上在Win7以上,cpu会给处于饥饿状态(2-4秒)的线程临时提升线程优先级一个cpu时间片,所以这种情况在实际应用场合不大可能会发生)


可以使用主线程挂起(sleep)类似旋转锁。这样RecalcFunc有机会执行。

但是通用规则是,我们既不应该使用旋转锁,也不应该进行轮循,而应该用函数把线程切换到等待状态,直到线程想要访问的资源可以供使用为止。


volatile类型表示其修饰的变量会被应用程序以外的东西修改,如操作系统,硬件或者并发执行的线程。(volatile限定告诉编译器不要堆这个变量进行任何形式的优化,而是始终在其内存地址读取变量的值)


8.4 关键段(临界区)

关键段(critical section)是一小段代码,它在执行前需要独占对一些共享资源的访问权。除了当前线程以外,其他任何线程不可访问该资源。

在当前线程离开临界区之前,系统是不会去调度任何想要访问同一资源的其他线程的。当前系统仍然可以暂停当前线程去调度其他线程。



CRITICAL_SECTION g_s;

EnterCriticalSection(&g_s);

LeaveCriticalSection(&g_s);


临界区内部也使用了Interlock系的旋转锁,执行速度非常快。临界区的缺点在于无法在多个进程之间的线程进行同步。


8.4.1 关键段:细节

typedef struct _RTL_CRITICAL_SECTION {
PRTL_CRITICAL_SECTION_DEBUG DebugInfo;

//
// The following three fields control entering and exiting the critical
// section for the resource
//

LONG LockCount;
LONG RecursionCount;
HANDLE OwningThread; // from the thread's ClientId->UniqueThread
HANDLE LockSemaphore;
ULONG_PTR SpinCount; // force size on 64-bit systems when packed
} RTL_CRITICAL_SECTION, *PRTL_CRITICAL_SECTION;


使用CRITICAL_SECTION 应该用WindowsAPI来操作其成员,不要私自修改。

1)所有访问资源的线程需要知道CRITICAL_SECTION结构的地址

2)任何试图访问CRITICAL_SECTION的线程,需要将CRITICAL_SECTION的内部成员初始化。

VOID InitializeCriticalSection(PCRITICAL_SECTION pcs);


当线程不再需要临界区对象来保护共享资源时,

VOID DeleteCriticalSection(PCRITICAL_SECTION pcs);


EnterCriticalSection 会检查结构中的成员变量,这些变量表示是否有线程正在访问资源,以及哪个线程正在访问资源。

1 )如果没有线程正在访问资源,EnterCriticalSection会更新成员变量,以表示调用线程已经获准对资源的访问,并立即返回。

2)如果成员变量表示调用线程已经获准访问资源,那么EnterCriticalSection会更新变量,以表示线程被获准访问的次数,并立即放回。这样线程可以继续执行。(多次进入)

3)如果成员变量表示有其他线程正在访问资源,那么EnterCriticalSection会使用一个事件内核对象把调用线程切换到等待状态。(调用线程被挂起,不会浪费CPU时间)

操作系统会记住这个线程想要访问的资源,一旦当前正在访问的资源线程调用了LeaveCriticalSection,系统会自动更新CRITICAL_SECTION的成员变量并将等待中的线程切换回可调度状态。



BOOL TryEnterCriticalSection(PCRITICAL_SECTION pcs);(非阻塞)

测试当前线程是否可以访问此资源,可以返回TRUE 否则FALSE 不会被挂起等待。

若返回TRUE表示当前线程已经对次临界区对象有访问权,需要配一个LeaveCriticalSection


VOID LeaveCriticalSection(PCRITICAL_SECTION pcs);

该成员会递减CRITICAL_SECTION内部成员的线程引用计数器,如果计数器递减后为0.会更新成员变量,表示没有任何线程正在访问被保护的资源。

同时会检测有没有其他线程由于调用了EnterCriticalSection而处于等待状态。

如果有一个线程处于等待,则函数会更新成员变量,把其中一个处于等待的线程切换回可调度状态。


8.4.2 关键段和旋转锁

Microsoft为了提高关键段的性能(因为切换到内核模式等待需要1000个CPU周期),将旋转锁也合并入关键段。

在一段时间内不断循环尝试获得自由的访问权,只有尝试失败的时候,线程才会切换到内核模式并进入等待状态。

WINBASEAPI
_Must_inspect_result_
BOOL
WINAPI
InitializeCriticalSectionAndSpinCount(
_Out_ LPCRITICAL_SECTION lpCriticalSection,
_In_ DWORD dwSpinCount
);

第一个参数是关键段的地址,

第二个参数是希望旋转锁循环的次数。在单处理器上设置循环次数毫无用处因此会被忽略dwSpinCount。


调用以下函数改变关键段的旋转次数:

WINBASEAPI
DWORD
WINAPI
SetCriticalSectionSpinCount(
_Inout_ LPCRITICAL_SECTION lpCriticalSection,
_In_ DWORD dwSpinCount
);


同样如果主机只有一个处理器,函数会忽略dwSpinCount参数。

为了提高关键段的性能,应该同时使用旋转锁。通常保护进程堆关键段所使用的选中次数是4000.


8.4.3  关键段和错误处理

在使用InitializeCriticalSection函数会分配一些内存,如果失败会抛出STATUS_NO_MEMORY异常。

使用结构化异常可以捕获这个异常。


InitializeCriticalSectionAndSpinCount也能发生这个问题,如果内存分配不成功其返回FALSE


在多个线程抢占关键段资源的时候,系统会为其创建事件内核对象(挂起等待线程),所以在不使用某一个关键段时需要调用DeleteCriticalSection系统才会释放这个事件内核对象。


在WinXP之前,内存不足的时候,EnterCriticalSection(创建事件内核对象)可能会抛出EXCEPTION_INVALID_HANDLE异常。


使用InitializeCriticalSectionAndSpinCoun(dwSpinCount的参数最高位设为1)在初始化关键段时,同时初始化事件内核对象。如果无法创建事件内核对象, 那么函数会返回FALSE。

但是总是创建事件内核对象可能会耗费系统资源。只有在以下3种情况下才必须这样

1)不允许EnterCriticalSection失败

2)线程抢占资源的情况一定会发生

3)预计进程运行会在内存不足的环境下进行。


8.5 Slim 读/写锁

SRWLOCK. 区分那些想要读取资源的线程和更新资源的线程。

允许同时读取资源,但是对写入资源加以保护。

若资源被独占写入,任何其他线程,无论是读取还是写入都不允许访问资源。

SRWLOCK srwLock;

typedef struct _RTL_SRWLOCK {                            
PVOID Ptr;
} RTL_SRWLOCK, *PRTL_SRWLOCK;

因为其指向的地址是完全未公开的。不能编写代码来访问他。


VOID InitalizeSRWLock(PSRWLOCK SRWLock); 初始化读写锁。


对于需要写入资源的线程可以调用

VOID AcquireSRWLockExclusive(PSRWLOCK SRWLock); 独占被保护资源


VOID ReleaseSRWLockExclusive(PSRWLOCK SRWLock); 释放资源的锁定


对于读取线程来说可以调用

VOID AcquireSRWLockShared(PSRWLOCK SRWLock);

VOID ReleaseSRWLockShared(PSRWLOCK SRWLock);


不存在对SRWLOCK删除或销毁的函数,系统会自动进行清理工作。


和CRITICAL_SECTION比较,读写锁有几个缺点

1)不存在TryEnter(Shared/Exclusive)SRWLock之类的函数,如果锁被占用,那么调用AcquireSRWLOCK(Shared/Exclusive)的线程会被阻塞。

2)不能递归获得SRWLOCK ,也就是一个线程不能为了多次写入资源而多次锁定资源。然后再多次调用ReleaseSRWLock*来释放资源的锁定。


对比不同的UserMode下的线程同步机制性能对比。

对比

Volatile

Interlocked

CRITICAL_SECTION

SRWLock(Shared)

SRWLock(Exclusive)

Mutex

分别使用以上的线程同步机制在多线程的情况下对全局变量(资源)进行读写操作。

使用了上一章节的CStopwatch类来进行性能统计。(这个例子中学习统计分析程序性能的方法更重要

以下是测试代码,笔者自行增加了spinlock的性能测试。分别设置spinCount 10, 100 和1000

还增加了C++的std::mutex对象

由于使用了C++对象,创建线程改成了_beginThreadex(采用chBEGINTHREADEX宏,原例子使用CreateThread)

/*
Module: UserSyncCompare.cpp
Notice: Copyright (c) 2008 Jeffrey Richter & Christophe Nasarre
*/

#include "..\CommonFiles\CmnHdr.h"
#include <windows.h>
#include <stdio.h>
#include <tchar.h>
#include <mutex>

// Stop watch calss from Chapter 7
class CStopwatch {
public:
CStopwatch() { QueryPerformanceFrequency(&m_liPerfFreq); Start(); }

void Start() { QueryPerformanceCounter(&m_liPerfStart); }

// Returns # of milliseconds since Start was called
__int64 Now() const {
LARGE_INTEGER liPerfNow;
QueryPerformanceCounter(&liPerfNow);
return (((liPerfNow.QuadPart - m_liPerfStart.QuadPart) * 1000)
/ m_liPerfFreq.QuadPart);
}
private:
LARGE_INTEGER m_liPerfFreq; // Counts per second
LARGE_INTEGER m_liPerfStart; // Starting count
};


DWORD g_nIterations = 1000000;
typedef void(CALLBACK * OPERATIONFUNC)();

DWORD WINAPI ThreadIterationFunction(PVOID operationFunc) {
OPERATIONFUNC op = (OPERATIONFUNC)operationFunc;

for (DWORD iteration = 0; iteration < g_nIterations; iteration++) {
op();
}
return 0;
}


void MeasureConcurrentOperation(
TCHAR* operationName, DWORD nThreads, OPERATIONFUNC operationFunc) {
HANDLE * phThreads = new HANDLE[nThreads];

SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
for (DWORD currentThread = 0; currentThread < nThreads; currentThread++) {
phThreads[currentThread] =
//CreateThread(NULL, 0, ThreadIterationFunction, operationFunc, 0, NULL);
chBEGINTHREADEX(NULL, 0, ThreadIterationFunction, operationFunc, 0, NULL);
}
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_NORMAL);

CStopwatch watch;
WaitForMultipleObjects(nThreads, phThreads, TRUE, INFINITE);
__int64 elapsedTime = watch.Now();
_tprintf(
TEXT("Threads=%u, Milliseconds=%u, Test=%s\n"),
nThreads, (DWORD)elapsedTime, operationName);

// Don't forget to clean up the thread handles
for (DWORD currentThread = 0; currentThread < nThreads; currentThread++) {
CloseHandle(phThreads[currentThread]);
}
delete[] phThreads;
}

// -----------------------------------------------------------
// List of tests to run:
// -----------------------------------------------------------
// Reading from a volatile int with NO synchronization at all
// Writing to an int using InterlockedIncrement
// Reading from a volatile int using critical sections
// Read from a volatile int using SRWLock
// Read from a volatile int using Mutex
// -----------------------------------------------------------

volatile LONG gv_value = 0;

// 'lValue': local variable is initialized but not referenced
#pragma warning(disable:4189)
void WINAPI VolatileReadCallback()
{
volatile LONG lValue = gv_value;
}
#pragma warning(default:4189)

void WINAPI VolatileWriteCallback()
{
gv_value = 0;
}

void WINAPI InterlockedIncrementCallback()
{
InterlockedIncrement(&gv_value);
}

CRITICAL_SECTION g_cs;
void WINAPI CriticalSectionCallback()
{
EnterCriticalSection(&g_cs);
gv_value = 0;
LeaveCriticalSection(&g_cs);
}

HANDLE g_hMutex;
void WINAPI MutexCallback()
{
WaitForSingleObject(g_hMutex, INFINITE);
gv_value = 0;
ReleaseMutex(g_hMutex);
}


std::mutex g_stdMutex;
void WINAPI StdMutexCallback()
{
g_stdMutex.lock();
gv_value = 0;
g_stdMutex.unlock();
}

// Slim Reader/Writer Lock global variable
SRWLOCK g_srwLock;

void WINAPI SRWLockReadCallback()
{
AcquireSRWLockShared(&g_srwLock);
gv_value = 0;
ReleaseSRWLockShared(&g_srwLock);
}

void WINAPI SRWLockWriteCallback()
{
AcquireSRWLockExclusive(&g_srwLock);
gv_value = 0;
ReleaseSRWLockExclusive(&g_srwLock);
}


int _tmain(int argc, _TCHAR* argv[], _TCHAR* env[]) {

for (int nThreads = 1; nThreads <= 4; nThreads *= 2) {
MeasureConcurrentOperation(TEXT("Volatile Read"), nThreads, VolatileReadCallback);
MeasureConcurrentOperation(TEXT("Volatile Write"), nThreads, VolatileWriteCallback);
MeasureConcurrentOperation(TEXT("Interlocked Increment"), nThreads, InterlockedIncrementCallback);

// Prepare the critical section
InitializeCriticalSection(&g_cs);
MeasureConcurrentOperation(TEXT("Critical Section"), nThreads, CriticalSectionCallback);
// Don't forget to cleanup
DeleteCriticalSection(&g_cs);

// Prepare the critical section with spinlock
InitializeCriticalSectionAndSpinCount(&g_cs, 10);
MeasureConcurrentOperation(TEXT("Critical Section with SpinLock(10)"), nThreads, CriticalSectionCallback);
// Don't forget to cleanup
DeleteCriticalSection(&g_cs);

// Prepare the critical section with spinlock
InitializeCriticalSectionAndSpinCount(&g_cs, 100);
MeasureConcurrentOperation(TEXT("Critical Section with SpinLock(100)"), nThreads, CriticalSectionCallback);
// Don't forget to cleanup
DeleteCriticalSection(&g_cs);

// Prepare the critical section with spinlock
InitializeCriticalSectionAndSpinCount(&g_cs, 1000);
MeasureConcurrentOperation(TEXT("Critical Section with SpinLock(1000)"), nThreads, CriticalSectionCallback);
// Don't forget to cleanup
DeleteCriticalSection(&g_cs);

// Prepare the Slim Reader/Writer lock
InitializeSRWLock(&g_srwLock);
MeasureConcurrentOperation(TEXT("SRWLock Read"), nThreads, SRWLockReadCallback);
MeasureConcurrentOperation(TEXT("SRWLock Write"), nThreads, SRWLockWriteCallback);
// Note: You can't cleanup a Slim Reader/Writer lock

// Prepare the mutex
g_hMutex = CreateMutex(NULL, false, NULL);
MeasureConcurrentOperation(TEXT("Mutex"), nThreads, MutexCallback);
CloseHandle(g_hMutex);

// test for std::mutex
MeasureConcurrentOperation(TEXT("std::mutex"), nThreads, StdMutexCallback);

_tprintf(TEXT("\n"));
}

system("pause");
return 0;
}




运行结果


分析

Volitale 读写由于不支持同步但是性能最佳。但是在多线程写入的时候由于cpu换成同步机制造成了写入性能下降。

Interlocked 在单线程下性能只有6个单位,但多线程的情况下明显效率降低。

CRITICAL_SECTION(不使用spinlock)性能高于spinlock(由于spinlock的开启会严重影响CPU性能,因为轮循机制抢占CPU资源)

CRITICAL_SECTION(spinlock) spinCount设置的越高在多线程下性能下降明显。

SRWLOCK 性能高于CRITICAL_SECTION,在笔者的机器上发现使用Shared还是Exclusive性能都没有发生下降。只有开启多线程时会影响性能。

Mutex内核对象的性能最差,因为每次等待和释放互斥量都需要在用户模式和内核模式直接进行切换。CPU开销非常大。

std::mutex是C++11标准库提供的线程同步对象,由于其非系统内核对象,性能介于CRITICAL_SECTION和Mutex内核对象之间。

总结:

首先尝试不要共享数据,然后依次使用volatile读取,volatile写入。

如果有资源竞争(race condition)则采用线程同步。(效率 SRWLock > CRITICAL_SECTION > spinlock > std::mutex > Mutex)

如果线程同步没有阻塞要求(不支持TryXXX)和递归(CRITICAL_SECTION允许同一线程递归调用多次进入临界区)的话。使用SRWLock性能大部分测试都优于CRITICAL_SECTION.

旋转锁会大量消耗CPU资源不建议用来进行线程同步。

内核Mutex互斥量性能较低,除非有特殊需求(跨进程间线程同步)否则不建议用其进行线程同步。


8.6 条件变量

有的时候需要让线程以原子方式把锁释放并将自己阻塞,直到某一个条件成立为止。可以使用条件变量

SleepConditionVariableCS

SleepConditionVariableSRW

WINBASEAPI
BOOL
WINAPI
SleepConditionVariableCS(
_Inout_ PCONDITION_VARIABLE ConditionVariable,
_Inout_ PCRITICAL_SECTION CriticalSection,
_In_ DWORD dwMilliseconds
);

WINBASEAPI
BOOL
WINAPI
SleepConditionVariableSRW(
_Inout_ PCONDITION_VARIABLE ConditionVariable,
_Inout_ PSRWLOCK SRWLock,
_In_ DWORD dwMilliseconds,
_In_ ULONG Flags
);

参数ConditionVariable 指向一个已经初始化的条件变量,调用线程正在等待该条件变量。

第二个参数是执行临界区或者SRWLock的指针,传入的锁会被释放

dwMilliseconds 用来等待的时间(也可以设定为INFINITE)

第二个函数的Flag指定一旦条件变量被触发,我们希望线程以何种方式来得到锁。对于写入线程传入0(独占资源),对于读取线程传入CONDITION_VARIABLE_LOCKMODE_SHARED



注意如果指定的时间用完,条件变量尚未触发,函数会返回FALSE,否则返回TRUE。 当返回FALSE时,线程显然并没有获得锁或临界区。


当另一个线程想要触发条件以用于唤醒等待条件的线程可以调用一下函数

WINBASEAPI
VOID
WINAPI
WakeConditionVariable(
_Inout_ PCONDITION_VARIABLE ConditionVariable
);

WINBASEAPI
VOID
WINAPI
WakeAllConditionVariable(
_Inout_ PCONDITION_VARIABLE ConditionVariable
);

这样被SleepConditonVariable*的线程会被唤醒。

第一个函数只唤醒一个线程

第二个可以唤醒多个线程


8.6.1 Queue 实例程序

该实例使用了条件变量和SRWLock来对一个请求元素的队列进行控制。

笔者这里将例子做了一下修改,

1)原始例子中使用4个写入线程2个读取线程  --》 笔者创建了4个读取线程,4个写入线程保持不变。并且0,2号线程可以读取偶数号数据,1,3号可以读取奇数号数据。

2)原始例子中写入线程sleep 1.5秒 而读取线程sleep 2.5秒。--> 这里都修改成Sleep 1.5秒

3)由于存在4个线程,两个读取线程也会存在资源竞争(读取队列以后设置队列元素空)。所以读取线程也采取了Exclusive来获得SRWLOCK。

测试发现这样的话队列不会被写满。

中途产生了一次数据错误(4个读取线程采用Shared来获取锁会产生资源竞争队列被被破坏)

后来修改成Exclusive以后发现SleepConditionVariable也有问题,将其最后一个参数设置为0 ,表示采用Exclusive获取SRWLock

修改后的代码。

/******************************************************************************
Module: Queue.cpp
Notices: Copyright (c) 2008 Jeffrey RIchter & Christophe Nasarre
******************************************************************************/

#include "..\CommonFiles\CmnHdr.h"
#include <windowsx.h>
#include <tchar.h>
#include <StrSafe.h>
#include "Resource.h"

//////////////////////////////////////////////////////////////////////////

class CQueue {
public:
struct ELEMENT {
int m_nThreadNum;
int m_nRequestNum;
// Other element data should go here
};
typedef ELEMENT* PELEMENT;

private:
struct INNER_ELEMENT {
int m_nStamp; // 0 means empty
ELEMENT m_element;
};
typedef INNER_ELEMENT * PINNER_ELEMENT;

private:
PINNER_ELEMENT m_pElements; // Array of elements to be processed
int m_nMaxElements; // Maximum # of elements in the array
int m_nCurrentStamp; // Keep track of the # of added elements

private:
int GetFreeSlot();
int GetNextSlot(int nThreadNum);

public:
CQueue(int nMaxElements);
~CQueue();
BOOL IsFull();
BOOL IsEmpty(int nThreadNum);
void AddElement(ELEMENT e);
BOOL GetNewElement(int nThreadNum, ELEMENT& e);
};

//////////////////////////////////////////////////////////////////////////

CQueue::CQueue(int nMaxElements) {

// Allocate and initialize the elements
m_pElements = (PINNER_ELEMENT)
HeapAlloc(GetProcessHeap(), 0, sizeof(INNER_ELEMENT) * nMaxElements);
ZeroMemory(m_pElements, sizeof(INNER_ELEMENT) * nMaxElements);

// Initialize the element counter
m_nCurrentStamp = 0;

// Remember the max number of elements
m_nMaxElements = nMaxElements;
}

CQueue::~CQueue() {

HeapFree(GetProcessHeap(), 0, m_pElements);
}

BOOL CQueue::IsFull() {

return (GetFreeSlot() == -1);
}

BOOL CQueue::IsEmpty(int nThreadNum) {

return (GetNextSlot(nThreadNum) == -1);
}

int CQueue::GetFreeSlot() {

// Look for the first element with a 0 samp
for (int current = 0; current < m_nMaxElements; current++) {
if (m_pElements[current].m_nStamp == 0)
return current;
}

// No free slot was found
return -1;
}

int CQueue::GetNextSlot(int nThreadNum) {

// By default, there is no slot for this thread
int firstSlot = -1;

// The element can't have a stamp higher than the last added
int firstStamp = m_nCurrentStamp + 1;

// Look for the even (thread 0) / odd (thread 1) element that is not free
for (int current = 0; current < m_nMaxElements; current++) {

// Keep track of the first added (lowest stamp) in the queue
// -> so that "first in first out" behavior is ensured
if ((m_pElements[current].m_nStamp != 0) && // free element
((m_pElements[current].m_element.m_nRequestNum % 2) == nThreadNum % 2) &&
(m_pElements[current].m_nStamp < firstStamp)) {

firstStamp = m_pElements[current].m_nStamp;
firstSlot = current;
}
}

return firstSlot;
}


void CQueue::AddElement(ELEMENT e) {

// Do nothing if the queue is full
int nFreeSlot = GetFreeSlot();
if (nFreeSlot == -1)
return;

// Copy the content of the element
m_pElements[nFreeSlot].m_element = e;

// Mark the element with the new stamp
m_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp;
}


BOOL CQueue::GetNewElement(int nThreadNum, ELEMENT& e) {

int nNewSlot = GetNextSlot(nThreadNum);
if (nNewSlot == -1)
return FALSE;

// Copy the content of the element
e = m_pElements[nNewSlot].m_element;

// Mark the element as read
m_pElements[nNewSlot].m_nStamp = 0;

return TRUE;
}


//////////////////////////////////////////////////////////////////////////

CQueue g_q(10); // The shared queue
volatile LONG g_fShutdown; // Signals client/server thread to die
HWND g_hWnd; // How client/server threads give status
SRWLOCK g_srwLock; // Reader/Writer lock to protect the queue
CONDITION_VARIABLE g_cvReadyToConsume; // Signaled by writers
CONDITION_VARIABLE g_cvReadyToProduce; // Signaled by readers

// Handles to all reader/writer threads
HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS];

// Number of reader/writer threads
int g_nNumThreads = 0;


//////////////////////////////////////////////////////////////////////////

void AddText(HWND hWndLB, PCTSTR pszFormat, ...) {

va_list argList;
va_start(argList, pszFormat);

TCHAR sz[20 * 1024];
_vstprintf_s(sz, _countof(sz), pszFormat, argList);
ListBox_SetCurSel(hWndLB, ListBox_AddString(hWndLB, sz));

va_end(argList);
}

BOOL ConsumeElement(int nThreadNum, int nRequestNum, HWND hWndLB) {

// Get access to the queue to consume a new element
AcquireSRWLockExclusive(&g_srwLock);

// Fall asleep until there is something to read.
// Check if, while it was asleep,
// it was not decided that the thread should stop
while (g_q.IsEmpty(nThreadNum) && !g_fShutdown) {
// Threre was not a readable element
AddText(hWndLB, TEXT("[%d] Nothing to process"), nThreadNum);

// The queue is empty
// --> Wait until a writer adds a new element to read
// and come back with the lock acquired in shared mode
SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock,
INFINITE, 0);
}

// When thread is exiting, the lock should be released for writer
// and readers should be signaled through the condition variable
if (g_fShutdown) {
// Show that the current thread is exiting
AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);

// Another writer thread might still be blocked on the lock
// --> release it before exiting
ReleaseSRWLockExclusive(&g_srwLock);

// Notify other readers that it is time to exit
// ->> release readers
WakeConditionVariable(&g_cvReadyToConsume);

return FALSE;
}

// Get the first new element
CQueue::ELEMENT e;
// Note: No need to test the return value since IsEmpty
// returned FALSE
g_q.GetNewElement(nThreadNum, e);

// No need to keep the lock any longer
ReleaseSRWLockExclusive(&g_srwLock);

// Show result of consuming the element
AddText(hWndLB, TEXT("[%d] Processing %d:%d"),
nThreadNum, e.m_nThreadNum, e.m_nRequestNum);

// A free slot is now available for writer threads to produce
// --> wake up a writer thread
WakeConditionVariable(&g_cvReadyToProduce);

return TRUE;
}


DWORD WINAPI ReaderThread(PVOID pvParam) {

int nThreadNum = PtrToUlong(pvParam);
HWND hWndLB = GetDlgItem(g_hWnd, IDC_SERVERS);

for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {

if (!ConsumeElement(nThreadNum, nRequestNum, hWndLB))
return 0;

Sleep(1500); // Wait before reading another element
}

// g_fShutdown has been set during Sleep
// --> Show that the current thread is exiting
AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);

return 0;
}

//////////////////////////////////////////////////////////////////////////

DWORD WINAPI WriterThread(PVOID pvParam) {

int nThreadNum = PtrToUlong(pvParam);
HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENTS);

for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {

CQueue::ELEMENT e = { nThreadNum, nRequestNum };

// Require access for writing
AcquireSRWLockExclusive(&g_srwLock);

// If the queue is full, fall asleep as long as the condition variable
// is not signaled
// Note:: During the wait for acquiring the lock,
// a stop might have been received
if (g_q.IsFull() & !g_fShutdown) {
// No more room in the queue
AddText(hWndLB, TEXT("[%d] Queue is full: impossible to add %d"),
nThreadNum, nRequestNum);

// --> Need to wait for a reader to empty a slot before acquiring
// the lock again
SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock,
INFINITE, 0);
}


// Other write threads might still be blocked on the lock
// --> Release the lock and notify the remaining writer threads to quit
if (g_fShutdown) {
// Show that the current thread is exiting
AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);

// No need to keep the lock any longer
ReleaseSRWLockExclusive(&g_srwLock);

// Signal other blocked writers threads that it is time to exit
WakeAllConditionVariable(&g_cvReadyToProduce);

// Bye bye
return 0;
}
else {
// Add the new ELEMENT into the queue
g_q.AddElement(e);

// Show result of processing element
AddText(hWndLB, TEXT("[%d] Adding %d"), nThreadNum, nRequestNum);

// No need to keep the lock any longer
ReleaseSRWLockExclusive(&g_srwLock);

// Signal reader threads that three is an element to consume
WakeAllConditionVariable(&g_cvReadyToConsume);

// Wait before adding a new element
Sleep(1500);
}
}

// Show that the current thread is exiting
AddText(hWndLB, TEXT("[%d] bye bye"), nThreadNum);

return 0;
}

//////////////////////////////////////////////////////////////////////////

BOOL Dlg_OnInitDialog(HWND hWnd, HWND hWndFocus, LPARAM lParam) {

chSETDLGICONS(hWnd, IDI_QUEUE);

g_hWnd = hWnd; // Used by client/server threads to show status

// Prepar the SRWLock to be used
InitializeSRWLock(&g_srwLock);

// Prepare the condition variables to be used
InitializeConditionVariable(&g_cvReadyToConsume);
InitializeConditionVariable(&g_cvReadyToProduce);

// Will be set to TRUE in order to end threads
g_fShutdown = FALSE;

// Create the writer threads
DWORD dwThreadID;
for (int x = 0; x < 4; x++)
g_hThreads[g_nNumThreads++] =
chBEGINTHREADEX(NULL, 0, WriterThread, (PVOID)(INT_PTR)x,
0, &dwThreadID);

// Create the reader threads
for (int x = 0; x < 4; x++)
g_hThreads[g_nNumThreads++] =
chBEGINTHREADEX(NULL, 0, ReaderThread, (PVOID)(INT_PTR)x,
0, &dwThreadID);

return TRUE;
}


//////////////////////////////////////////////////////////////////////////

void StopProcessing() {

if (!g_fShutdown) {
// Ask all threads to end
InterlockedExchange(&g_fShutdown, TRUE);

// Free all threads waiting on condition variables
WakeAllConditionVariable(&g_cvReadyToConsume);
WakeAllConditionVariable(&g_cvReadyToProduce);

// Wait for all the threads to terminate & then clean up
WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);

// Don't forget to clean up kernel resources
// Note: This is not really mandatory since the process is exiting
while (g_nNumThreads--)
CloseHandle(g_hThreads[g_nNumThreads]);

// Close each list box
AddText(GetDlgItem(g_hWnd, IDC_SERVERS), TEXT("---------------------"));
AddText(GetDlgItem(g_hWnd, IDC_CLIENTS), TEXT("---------------------"));
}
}


DWORD WINAPI StoppingThread(PVOID pvParam) {

StopProcessing();
return 0;
}


//////////////////////////////////////////////////////////////////////////

void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtl, UINT codeNotify) {

switch (id) {
case IDCANCEL:
EndDialog(hWnd, id);
break;

case IDC_BTN_STOP:
{
// StopProcessing can't be called from the UI thread
// or a deadlock will occur: SendMessage() is used
// to fill up the listboxes
// --> Another thread is required
DWORD dwThreadID;
CloseHandle(chBEGINTHREADEX(NULL, 0, StoppingThread,
NULL, 0, &dwThreadID));

// This button can't be pushed twice
Button_Enable(hWndCtl, FALSE);
}
break;
}
}


//////////////////////////////////////////////////////////////////////////

INT_PTR WINAPI Dlg_Proc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam) {

switch (uMsg) {
chHANDLE_DLGMSG(hWnd, WM_INITDIALOG, Dlg_OnInitDialog);
chHANDLE_DLGMSG(hWnd, WM_COMMAND, Dlg_OnCommand);
}

return FALSE;
}


//////////////////////////////////////////////////////////////////////////

int WINAPI _tWinMain(HINSTANCE hinstExe, HINSTANCE, PTSTR pszCmdLine, int) {

DialogBox(hinstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc);
StopProcessing();
return 0;
}

运行结果


可以看到4个线程合作正常,没有尝试队列满或者队列为空的状态。


8.6.3 一些有用的窍门和技巧

1. 以原子方式操作一组对象时使用一个锁

2. 同时访问多个逻辑资源(同时锁定多个资源 但是一定要按照相同的访问顺序)

3. 不要长时间占用锁



智能推荐

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告