atomic and mutex

atomic and mutex

atomic in GCC

__sync_fetch_and_add
__sync_fetch_and_sub
__sync_val_compare_and_swap

atomic in C

Use the _Atomic keyword (C11, see <stdatomic.h>) to declare an atomic variable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>
/* Shared counters for the demo: atomic_count is updated atomically, while
 * count is a plain int that is incremented without any synchronization. */
_Atomic int atomic_count = 0;
int count = 0;

/*
 * Thread start routine: increments both counters one million times.
 *
 * The signature matches the void *(*)(void *) start routine expected by
 * pthread_create; the argument is unused.
 *
 * Returns NULL.
 */
void *runner(void *arg) {
    (void)arg;
    for (int i = 0; i < 1000000; i++) {
        count++;        /* unsynchronized read-modify-write: races across threads */
        atomic_count++; /* atomic read-modify-write */
    }
    return NULL;
}
/*
 * Starts 16 threads that all run runner(), waits for them, then prints both
 * counters so the atomic and the non-atomic totals can be compared.
 *
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main(void) {
    enum { THREAD_COUNT = 16 };
    pthread_t threadIDs[THREAD_COUNT];
    int started = 0;
    int status = 0;

    for (int i = 0; i < THREAD_COUNT; i++) {
        if (pthread_create(&threadIDs[i], NULL, runner, NULL) != 0) {
            fprintf(stderr, "failed to create thread %d\n", i);
            status = 1;
            break;
        }
        started++;
    }
    /* Join only the threads that were actually created. */
    for (int i = 0; i < started; i++) {
        pthread_join(threadIDs[i], NULL);
    }
    /* Both counters are int, so %d is the matching conversion. */
    printf("The atomic counter is %d\n", atomic_count);
    printf("The non-atomic counter is %d\n", count);
    return status;
}

After compiling and running, the result is:

1
2
3
4
$ gcc _atomic.c -o _atomic
$ ./_atomic
The atomic counter is 16000000
The non-atomic counter is 5270623

atomic in C++

Use std::atomic<T> to define an atomic variable.

The exchange function atomically sets the value of an atomic variable and returns the old value.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
#include <pthread.h>
#include <atomic>


// Start signal: worker threads spin until this becomes true.
std::atomic<bool> ready(false);
// Set by the first thread that finishes counting.
std::atomic<bool> winner(false);

// Thread start routine. `arg` points to the int id of this thread.
// Spins until `ready` is set, runs an empty loop of one million iterations,
// then tries to claim the win: exchange(true) returns the previous value, so
// only the caller that saw `false` prints the message. Returns NULL.
void* count1m(void* arg) {
    const int thread_id = *static_cast<int*>(arg);

    while (!ready) {
        // busy-wait for the start signal
    }

    for (int step = 0; step < 1000000; ++step) {
        // count to one million
    }

    const bool already_claimed = winner.exchange(true);
    if (!already_claimed) {
        std::cout << "thread #" << thread_id << " won!\n";
    }
    return NULL;
}

// Starts 10 threads running count1m, releases them together by setting
// `ready`, then waits for every thread that was started.
// Returns 0 on success, 1 if a thread could not be created.
int main ()
{
    const int kThreads = 10;
    pthread_t threadIDs[kThreads];
    int ids[kThreads];
    int started = 0;
    int status = 0;

    std::cout << "spawning 10 threads that count to 1 million..." << std::endl;
    for (int i = 0; i < kThreads; i++) {
        ids[i] = i;  // each thread receives a pointer to its own slot
        if (pthread_create(&threadIDs[i], NULL, count1m, (void*)&ids[i]) != 0) {
            std::cerr << "failed to create thread #" << i << std::endl;
            status = 1;
            break;
        }
        ++started;
    }
    ready = true;  // release the threads that were started
    // Join only the threads that were actually created.
    for (int i = 0; i < started; i++) {
        pthread_join(threadIDs[i], NULL);
    }

    return status;
}

P.S. Initially, winner is false. The first thread to call exchange sets winner to true and gets back the old value, false, so it prints “won”. Every later thread gets back true from exchange, so it prints nothing.

pthread and mutex

basic actions of mutex:

pthread_mutex_init
pthread_mutex_lock
pthread_mutex_unlock
pthread_mutex_destroy

alt text

Experiment: comparing different locks

The test subject is a hash table implemented as an array of linked lists (one chain per bucket).

without lock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>

#define TABLE_SIZE 10007 // size of the hash table
#define N_OPERATIONS 10000000 // total number of operations to perform
#define N_THREADS 10 // number of threads to use
#define READ_RATIO 0.8 // ratio of read operations (rest are write and delete)


/* One key/value entry of the hash table; entries in the same bucket are chained through next. */
typedef struct node {
int key; /* lookup key */
int value; /* value stored for the key */
struct node* next; /* next entry in the same bucket, or NULL at the end of the chain */
} node_t;

/* Bucket heads: one singly linked chain per hash bucket. */
node_t *table[TABLE_SIZE];

/* Marks every bucket as empty by resetting its head pointer to NULL. */
void hash_init() {
    node_t **bucket = table;
    node_t **const end = table + TABLE_SIZE;
    while (bucket != end) {
        *bucket = NULL;
        ++bucket;
    }
}

/*
 * Adds a new (key, value) entry at the head of the key's bucket chain.
 *
 * An existing entry with the same key is not replaced; the new entry is
 * simply placed in front of it.
 * This variant takes no lock, so concurrent callers race on the bucket head.
 * If the node cannot be allocated, the error is reported and the entry is
 * dropped.
 */
void hash_insert(int key, int value) {
    /* % keeps the sign of the dividend, so fold negative keys into [0, TABLE_SIZE). */
    int index = ((key % TABLE_SIZE) + TABLE_SIZE) % TABLE_SIZE;

    node_t *new_node = malloc(sizeof *new_node);
    if (new_node == NULL) {
        perror("hash_insert: malloc");
        return;
    }
    new_node->key = key;
    new_node->value = value;
    new_node->next = table[index];
    table[index] = new_node;
}

/*
 * Looks up key in its bucket chain.
 *
 * Returns the value of the first matching entry, or -1 when the key is not
 * present (so -1 cannot be distinguished from a stored value of -1).
 * This variant takes no lock.
 */
int hash_search(int key) {
    /* % keeps the sign of the dividend, so fold negative keys into [0, TABLE_SIZE). */
    int index = ((key % TABLE_SIZE) + TABLE_SIZE) % TABLE_SIZE;

    for (const node_t *current = table[index]; current != NULL; current = current->next) {
        if (current->key == key) {
            return current->value;
        }
    }
    return -1; /* key not found */
}

/*
 * Removes and frees the first entry whose key matches; does nothing when the
 * key is not present. This variant takes no lock.
 */
void hash_delete(int key) {
    /* % keeps the sign of the dividend, so fold negative keys into [0, TABLE_SIZE). */
    int index = ((key % TABLE_SIZE) + TABLE_SIZE) % TABLE_SIZE;

    /* link always points at the pointer that refers to the current node
     * (the bucket head or a predecessor's next), so unlinking is one store. */
    node_t **link = &table[index];
    while (*link != NULL) {
        node_t *current = *link;
        if (current->key == key) {
            *link = current->next;
            free(current);
            return;
        }
        link = &current->next;
    }
}


/*
 * Benchmark thread: performs N_OPERATIONS / N_THREADS random operations on
 * the hash table and prints its own throughput.
 *
 * arg carries the thread's numeric id (cast to long), used for seeding and in
 * the report line. A fraction READ_RATIO of the operations are searches; the
 * remainder is split evenly between inserts and deletes.
 *
 * Returns NULL.
 */
void* worker(void* arg) {
    /* NOTE(review): srand()/rand() use one generator shared by all threads. */
    srand(time(NULL) + (long)arg); // seed random number generator
    int n_reads = 0, n_writes = 0, n_deletes = 0;

    /* Upper bound of the "write" band. Derived from READ_RATIO so that the
     * split stays half writes / half deletes when READ_RATIO is changed
     * (a fixed 0.9 leaves no writes at all once READ_RATIO reaches 0.9). */
    const double write_limit = READ_RATIO + (1.0 - READ_RATIO) / 2.0;

    struct timespec start_time, end_time;
    clock_gettime(CLOCK_REALTIME, &start_time); // start timer

    for (int i = 0; i < N_OPERATIONS / N_THREADS; i++) {
        double p = (double)rand() / RAND_MAX;
        int key = rand() % N_OPERATIONS; // keys are integers in [0, N_OPERATIONS)
        if (p < READ_RATIO) {
            hash_search(key);
            n_reads++;
        } else if (p < write_limit) {
            int value = rand() % 1000; // values are integers in [0, 1000)
            hash_insert(key, value);
            n_writes++;
        } else {
            hash_delete(key);
            n_deletes++;
        }
    }

    clock_gettime(CLOCK_REALTIME, &end_time); // end timer
    double elapsed_time = (end_time.tv_sec - start_time.tv_sec) + 1e-9 * (end_time.tv_nsec - start_time.tv_nsec);
    printf("Thread %ld finished: %.0f reads/sec, %.0f writes/sec, %.0f deletes/sec\n", (long)arg, n_reads / elapsed_time, n_writes / elapsed_time, n_deletes / elapsed_time);
    return NULL;
}

/*
 * Runs the benchmark: initializes the table, starts N_THREADS workers, waits
 * for them and prints the total wall-clock time.
 *
 * Returns 0 on success, 1 if not every worker thread could be created.
 */
int main() {
    pthread_t threads[N_THREADS];
    int started = 0;

    hash_init();

    struct timespec start_time, end_time;
    clock_gettime(CLOCK_REALTIME, &start_time); // start timer

    for (int i = 0; i < N_THREADS; i++) {
        /* The thread id travels inside the pointer value itself. */
        if (pthread_create(&threads[i], NULL, worker, (void*)(long)i) != 0) {
            fprintf(stderr, "failed to create worker thread %d\n", i);
            break;
        }
        started++;
    }

    /* Join only the threads that were actually created. */
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    clock_gettime(CLOCK_REALTIME, &end_time); // end timer
    double elapsed_time = (end_time.tv_sec - start_time.tv_sec) + 1e-9 * (end_time.tv_nsec - start_time.tv_nsec);
    printf("Benchmark finished in %f seconds\n", elapsed_time);

    return started == N_THREADS ? 0 : 1;
}

without lock:

1
2
3
4
5
6
...
Benchmark finished in 8.320094 seconds
...
Benchmark finished in 7.188930 seconds
...
Benchmark finished in 6.892515 seconds

average time is about 7.467 seconds

use mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>

#define TABLE_SIZE 10007 // size of the hash table
#define N_OPERATIONS 10000000 // total number of operations to perform
#define N_THREADS 10 // number of threads to use
#define READ_RATIO 0.8 // ratio of read operations (rest are write and delete)

/* One key/value entry of the hash table; entries in the same bucket are chained through next. */
typedef struct node {
int key; /* lookup key */
int value; /* value stored for the key */
struct node* next; /* next entry in the same bucket, or NULL at the end of the chain */
} node_t;

/* Bucket heads: one singly linked chain per hash bucket. */
node_t *table[TABLE_SIZE];
/* One mutex per bucket, guarding that bucket's chain. */
pthread_mutex_t lock[TABLE_SIZE];

/* Empties every bucket and initializes its mutex with default attributes. */
void hash_init() {
    int bucket = 0;
    while (bucket < TABLE_SIZE) {
        table[bucket] = NULL;
        pthread_mutex_init(&lock[bucket], NULL);
        ++bucket;
    }
}

/*
 * Adds a new (key, value) entry at the head of the key's bucket chain while
 * holding that bucket's mutex.
 *
 * An existing entry with the same key is not replaced; the new entry is
 * placed in front of it. If the node cannot be allocated, the error is
 * reported and the entry is dropped.
 */
void hash_insert(int key, int value) {
    /* % keeps the sign of the dividend, so fold negative keys into [0, TABLE_SIZE). */
    int index = ((key % TABLE_SIZE) + TABLE_SIZE) % TABLE_SIZE;

    /* Allocate and fill the node before taking the lock so the critical
     * section only covers the two pointer updates. */
    node_t *new_node = malloc(sizeof *new_node);
    if (new_node == NULL) {
        perror("hash_insert: malloc");
        return;
    }
    new_node->key = key;
    new_node->value = value;

    pthread_mutex_lock(&lock[index]); // acquire lock for the bucket
    new_node->next = table[index];
    table[index] = new_node;
    pthread_mutex_unlock(&lock[index]); // release lock
}

/*
 * Looks up key in its bucket chain while holding that bucket's mutex.
 *
 * The function has a single exit point so that the mutex is always released
 * before returning.
 *
 * Returns the value of the first matching entry, or -1 when the key is not
 * present (so -1 cannot be distinguished from a stored value of -1).
 */
int hash_search(int key) {
    /* % keeps the sign of the dividend, so fold negative keys into [0, TABLE_SIZE). */
    int index = ((key % TABLE_SIZE) + TABLE_SIZE) % TABLE_SIZE;
    int value = -1;

    pthread_mutex_lock(&lock[index]); // acquire lock for the bucket
    for (const node_t *current = table[index]; current != NULL; current = current->next) {
        if (current->key == key) {
            value = current->value;
            break;
        }
    }
    pthread_mutex_unlock(&lock[index]); // release lock
    return value;
}

/*
 * Removes the first entry whose key matches, holding the bucket's mutex while
 * the chain is modified; does nothing when the key is not present.
 */
void hash_delete(int key) {
    /* % keeps the sign of the dividend, so fold negative keys into [0, TABLE_SIZE). */
    int index = ((key % TABLE_SIZE) + TABLE_SIZE) % TABLE_SIZE;
    node_t *victim = NULL;

    pthread_mutex_lock(&lock[index]); // acquire lock for the bucket
    /* link points at the pointer that refers to the current node (bucket head
     * or a predecessor's next), so unlinking is a single store. */
    for (node_t **link = &table[index]; *link != NULL; link = &(*link)->next) {
        if ((*link)->key == key) {
            victim = *link;
            *link = victim->next;
            break;
        }
    }
    pthread_mutex_unlock(&lock[index]); // release lock

    /* The node is already unreachable from the table, so it can be freed
     * after the lock is released; free(NULL) is a no-op. */
    free(victim);
}


/*
 * Benchmark thread: performs N_OPERATIONS / N_THREADS random operations on
 * the hash table and prints its own throughput.
 *
 * arg carries the thread's numeric id (cast to long), used for seeding and in
 * the report line. A fraction READ_RATIO of the operations are searches; the
 * remainder is split evenly between inserts and deletes.
 *
 * Returns NULL.
 */
void* worker(void* arg) {
    /* NOTE(review): srand()/rand() use one generator shared by all threads. */
    srand(time(NULL) + (long)arg); // seed random number generator
    int n_reads = 0, n_writes = 0, n_deletes = 0;

    /* Upper bound of the "write" band. Derived from READ_RATIO so that the
     * split stays half writes / half deletes when READ_RATIO is changed
     * (a fixed 0.9 leaves no writes at all once READ_RATIO reaches 0.9). */
    const double write_limit = READ_RATIO + (1.0 - READ_RATIO) / 2.0;

    struct timespec start_time, end_time;
    clock_gettime(CLOCK_REALTIME, &start_time); // start timer

    for (int i = 0; i < N_OPERATIONS / N_THREADS; i++) {
        double p = (double)rand() / RAND_MAX;
        int key = rand() % N_OPERATIONS; // keys are integers in [0, N_OPERATIONS)

        if (p < READ_RATIO) {
            hash_search(key);
            n_reads++;
        } else if (p < write_limit) {
            int value = rand() % 1000; // values are integers in [0, 1000)
            hash_insert(key, value);
            n_writes++;
        } else {
            hash_delete(key);
            n_deletes++;
        }
    }

    clock_gettime(CLOCK_REALTIME, &end_time); // end timer
    double elapsed_time = (end_time.tv_sec - start_time.tv_sec) + 1e-9 * (end_time.tv_nsec - start_time.tv_nsec);
    printf("Thread %ld finished: %.0f reads/sec, %.0f writes/sec, %.0f deletes/sec\n", (long)arg, n_reads / elapsed_time, n_writes / elapsed_time, n_deletes / elapsed_time);
    return NULL;
}

/*
 * Runs the benchmark: initializes the table and its per-bucket mutexes,
 * starts N_THREADS workers, waits for them, prints the total wall-clock time
 * and destroys the mutexes.
 *
 * Returns 0 on success, 1 if not every worker thread could be created.
 */
int main() {
    pthread_t threads[N_THREADS];
    int started = 0;

    hash_init();

    struct timespec start_time, end_time;
    clock_gettime(CLOCK_REALTIME, &start_time); // start timer

    for (int i = 0; i < N_THREADS; i++) {
        /* The thread id travels inside the pointer value itself. */
        if (pthread_create(&threads[i], NULL, worker, (void*)(long)i) != 0) {
            fprintf(stderr, "failed to create worker thread %d\n", i);
            break;
        }
        started++;
    }

    /* Join only the threads that were actually created. */
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    clock_gettime(CLOCK_REALTIME, &end_time); // end timer
    double elapsed_time = (end_time.tv_sec - start_time.tv_sec) + 1e-9 * (end_time.tv_nsec - start_time.tv_nsec);
    printf("Benchmark finished in %f seconds\n", elapsed_time);

    /* All workers have been joined, so no mutex is held any more. */
    for (int i = 0; i < TABLE_SIZE; i++) {
        pthread_mutex_destroy(&lock[i]); // destroy mutex for each bucket
    }

    return started == N_THREADS ? 0 : 1;
}

mutex:

1
2
3
4
5
6
...
Benchmark finished in 7.895764 seconds
...
Benchmark finished in 8.734200 seconds
...
Benchmark finished in 8.333789 seconds

average time is about 8.321 seconds

use rwlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/* Bucket heads: one singly linked chain per hash bucket. */
node_t *table[TABLE_SIZE];
/* One read-write lock per bucket. */
pthread_rwlock_t locks[TABLE_SIZE];

/* Empties every bucket and initializes its read-write lock with default attributes. */
void hash_init()
{
    int bucket = 0;
    while (bucket < TABLE_SIZE) {
        table[bucket] = NULL;
        pthread_rwlock_init(&locks[bucket], NULL);
        ++bucket;
    }
}

/*
 * Inserts (key, value) into the key's bucket while holding that bucket's
 * write lock. The insertion code itself is elided in this excerpt.
 */
void hash_insert(int key, int value)
{
int index = key % TABLE_SIZE;
pthread_rwlock_wrlock(&locks[index]); // acquire the bucket's write lock

// insert the node

pthread_rwlock_unlock(&locks[index]); // release the write lock
}

/*
 * Looks up key in its bucket while holding that bucket's read lock, then
 * returns `value`. The search code itself is elided in this excerpt.
 * NOTE(review): `value` is not declared in the visible lines; presumably the
 * elided search code declares and sets it — confirm.
 */
int hash_search(int key)
{
int index = key % TABLE_SIZE;
pthread_rwlock_rdlock(&locks[index]); // acquire the bucket's read lock

// search the node

pthread_rwlock_unlock(&locks[index]); // release the read lock
return value;
}

/*
 * Deletes key from its bucket while holding that bucket's write lock.
 * The deletion code itself is elided in this excerpt.
 */
void hash_delete(int key)
{
int index = key % TABLE_SIZE;
pthread_rwlock_wrlock(&locks[index]); // acquire the bucket's write lock

// delete the node

pthread_rwlock_unlock(&locks[index]); // release the write lock
}

/*
 * Skeleton of the rwlock benchmark driver: initializes the table and its
 * locks, runs the (elided) benchmark, then destroys every bucket lock.
 * Returns 0.
 */
int main()
{
pthread_t threads[N_THREADS]; // not used in the visible lines; presumably used by the elided benchmark code

hash_init();

// start working...

// Destroy locks (optional, as program is exiting)
for (int i = 0; i < TABLE_SIZE; i++)
{
pthread_rwlock_destroy(&locks[i]);
}

return 0;
}

rwlock:

1
2
3
4
5
6
...
Benchmark finished in 7.872260 seconds
...
Benchmark finished in 7.832486 seconds
...
Benchmark finished in 8.129092 seconds

average time is about 7.945 seconds

use spinlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/* Bucket heads: one singly linked chain per hash bucket. */
node_t *table[TABLE_SIZE];
/* One spin lock per bucket. */
pthread_spinlock_t locks[TABLE_SIZE];

/* Empties every bucket and initializes its spin lock as process-private. */
void hash_init()
{
    int bucket = 0;
    while (bucket < TABLE_SIZE) {
        table[bucket] = NULL;
        pthread_spin_init(&locks[bucket], PTHREAD_PROCESS_PRIVATE);
        ++bucket;
    }
}

/*
 * Inserts (key, value) into the key's bucket while holding that bucket's
 * spin lock. The insertion code itself is elided in this excerpt.
 */
void hash_insert(int key, int value)
{
int index = key % TABLE_SIZE;
pthread_spin_lock(&locks[index]); // acquire the bucket's spin lock

// insert the node

pthread_spin_unlock(&locks[index]); // release the spin lock
}

/*
 * Looks up key in its bucket while holding that bucket's spin lock, then
 * returns `value`. The search code itself is elided in this excerpt.
 * NOTE(review): `value` is not declared in the visible lines; presumably the
 * elided search code declares and sets it — confirm.
 */
int hash_search(int key)
{
int index = key % TABLE_SIZE;
pthread_spin_lock(&locks[index]); // acquire the bucket's spin lock (readers are exclusive too)

// search the node

pthread_spin_unlock(&locks[index]); // release the spin lock
return value;
}

/*
 * Deletes key from its bucket while holding that bucket's spin lock.
 * The deletion code itself is elided in this excerpt.
 */
void hash_delete(int key)
{
int index = key % TABLE_SIZE;
pthread_spin_lock(&locks[index]); // acquire the bucket's spin lock

// delete the node

pthread_spin_unlock(&locks[index]); // release the spin lock
}

/*
 * Skeleton of the spinlock benchmark driver: initializes the table and its
 * locks, runs the (elided) benchmark, then destroys every bucket lock.
 * Returns 0.
 */
int main()
{
hash_init();

// start working...

// destroy locks
for (int i = 0; i < TABLE_SIZE; i++)
{
pthread_spin_destroy(&locks[i]);
}

return 0;
}

spinlock:

1
2
3
4
5
6
...
Benchmark finished in 8.171603 seconds
...
Benchmark finished in 7.480587 seconds
...
Benchmark finished in 7.168322 seconds

average time is about 7.607 seconds

conclusion

When read ratio is 80%, the average time of different locks is:

| without lock | with mutex | with rwlock | with spinlock |
|--------------|------------|-------------|---------------|
| 7.467s       | 8.321s     | 7.945s      | 7.607s        |
  • without lock is the fastest, but it is not thread-safe.
  • mutex is the slowest, because a thread that cannot acquire the lock is blocked (put to sleep), which requires entering the kernel and a context switch.
  • rwlock is faster than mutex, because it allows multiple threads to read at the same time.
  • spinlock is faster than mutex, because busy-waiting is cheap when the wait is short. However, it is not suitable when waits are long, because a spinning thread keeps consuming CPU time.

When read ratio is 90%, the average time of different locks is:

| without lock | with mutex | with rwlock | with spinlock |
|--------------|------------|-------------|---------------|
| 4.296s       | 4.631s     | 4.516s      | 4.595s        |

We can see that as the read ratio gets higher, the performance of rwlock improves relative to the other locks.

Here is a linked-list stack protected by a lock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* One element of the singly linked stack. */
typedef struct Node {
struct Node *next; /* node below this one on the stack, or NULL at the bottom */
int value; /* payload stored in the node */
} Node;

Node *top = NULL; // top of the stack, initially NULL (empty stack)
int SLIDE = 1000; // number of nodes each thread pushes

/* Links n in front of the current top and makes it the new top.
 * Performs no locking of its own. */
void push(Node **top_ptr, Node *n) {
    Node *previous_top = *top_ptr;
    n->next = previous_top;
    *top_ptr = n;
}

/* Pushes n onto the stack while holding the global mutex, so that the two
 * pointer updates cannot interleave with another thread's push. */
void safe_push(Node **top_ptr, Node *n) {
    pthread_mutex_lock(&lock);
    /* same steps as push(), performed inside the critical section */
    n->next = *top_ptr;
    *top_ptr = n;
    pthread_mutex_unlock(&lock);
}

/* Detaches and returns the top node, or returns NULL when the stack is empty.
 * The returned node's next pointer is left untouched. Performs no locking. */
Node *pop(Node **top_ptr) {
    Node *popped = *top_ptr;
    if (popped != NULL) {
        *top_ptr = popped->next;
    }
    return popped;
}

/*
 * Thread start routine: pushes SLIDE freshly allocated nodes onto the shared
 * stack. arg points to this thread's int index; node values are
 * index * SLIDE + i, so every thread uses its own range of values.
 *
 * Terminates the thread early if a node cannot be allocated. Returns NULL.
 */
void* thread_function(void *arg) {
    const int thread_index = *(int*)arg;

    for (int i = 0; i < SLIDE; i++) {
        Node *new_node = malloc(sizeof *new_node);
        /* Check the allocation before the node is written to. */
        if (new_node == NULL) {
            perror("Failed to allocate memory for new node");
            pthread_exit(NULL);
        }
        new_node->value = thread_index * SLIDE + i;

        safe_push(&top, new_node); // replace with plain push() to observe the non-thread-safe result
    }
    return NULL;
}

/*
 * Starts 10 threads that each push SLIDE nodes onto the shared stack, waits
 * for them, then counts (and releases) the nodes and prints the total.
 *
 * Returns 0.
 */
int main() {
    pthread_t threads[10];
    int thread_args[10];
    int started = 0;

    /* `lock` is initialized statically with PTHREAD_MUTEX_INITIALIZER, so it
     * must not be passed to pthread_mutex_init() a second time. */

    // create the threads
    for (int i = 0; i < 10; i++) {
        thread_args[i] = i;
        if (pthread_create(&threads[started], NULL, thread_function, (void *)&thread_args[i])) {
            perror("Failed to create the thread");
            continue;
        }
        started++;
    }

    // wait only for the threads that were actually created
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    // all threads are done, so the mutex is no longer needed
    pthread_mutex_destroy(&lock);

    // walk the stack, counting the nodes and freeing each one
    Node *current = top;
    int count = 0;
    printf("content of stack:\n");
    while (current != NULL) {
        Node *next = current->next;
        count++;
        //printf("%d\n", current->value);
        free(current);
        current = next;
    }
    top = NULL;
    printf("count = %d\n", count);

    return 0;
}

Now we can use atomic instead of mutex:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
/* Top of the lock-free stack. The pointer itself is the atomic object
 * (Node *_Atomic); "_Atomic Node *" would declare an ordinary pointer to an
 * atomic Node instead. */
Node *_Atomic top = NULL;
int SLIDE = 1000;

/*
 * Pushes n onto the stack without a lock: n is linked in front of the top
 * that was observed, and the top is swung to n with compare-and-swap. When
 * the CAS fails, old_top has been refreshed with the current top, so the
 * loop only needs to relink and retry.
 */
void safe_push(Node *_Atomic *top_ptr, Node *n)
{
    Node *old_top = atomic_load(top_ptr);
    do {
        n->next = old_top;
    } while (!atomic_compare_exchange_weak(top_ptr, &old_top, n));
}

/*
 * Pops the top node without a lock. Returns the detached node, or NULL when
 * the stack is empty.
 *
 * NOTE(review): old_top->next is read before the CAS confirms that old_top is
 * still the top. If another thread may pop and free (or pop and re-push) that
 * node concurrently, this needs a memory-reclamation / ABA strategy.
 */
Node *safe_pop(Node *_Atomic *top_ptr)
{
    Node *old_top = atomic_load(top_ptr);
    while (old_top != NULL &&
           !atomic_compare_exchange_weak(top_ptr, &old_top, old_top->next)) {
        /* the failed CAS stored the current top in old_top; retry with it */
    }
    return old_top;
}
...

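The atomic version does not take a lock; instead, each operation retries its compare-and-swap in a loop until it succeeds. Under heavy contention threads keep spinning in that loop, so it is not suitable when many threads conflict repeatedly for a long time.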