Concurrency and Parallelism

Parallelism ⊂ Concurrency
Concurrency: two or more actions (workflows) exist at the same time
Parallelism: two or more actions (workflows) are running at the same time

Why Multithreading?

  • to achieve concurrency
  • to maximize (hardware) resource usage

Process vs Thread vs Coroutine

  • Process:
    • is a running instance of a program
    • is the unit of resource allocation
    • has at least one thread; each process has isolated resources, such as its address space
    • heavyweight: creating a process costs more than creating a thread
  • Thread:
    • is a flow of execution within a process
    • is the unit of scheduling and execution
    • threads within a process share fate (a fatal fault in one thread takes down the whole process) and resources: memory (address space, heap, globals), file descriptors, and other process-related attributes
    • per-thread resources: stack and stack pointer, registers, program counter (%rip: the address of the next instruction)
  • Coroutine (fiber, lightweight thread):
    • used for asynchronous I/O or generators
    • a context-switch technique implemented in user mode
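
The difference between isolated process memory and shared thread memory can be checked with a small experiment. The sketch below is only an illustration: the names `shared_value`, `value_after_fork` and `value_after_thread` are made up for this example, and it assumes a POSIX system with pthreads.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static int shared_value = 0;   /* lives in the process's data segment */

/* Thread body: writes to the global that every thread in the process sees. */
static void *thread_body(void *arg)
{
    (void)arg;
    shared_value = 42;
    return NULL;
}

/* Returns the value the parent sees after a child *process* wrote 42. */
int value_after_fork(void)
{
    shared_value = 0;
    pid_t pid = fork();
    assert(pid >= 0);
    if (pid == 0) {            /* child: works on its own copy of the address space */
        shared_value = 42;
        _exit(0);
    }
    waitpid(pid, NULL, 0);
    return shared_value;       /* the parent's copy is untouched */
}

/* Returns the value the caller sees after another *thread* wrote 42. */
int value_after_thread(void)
{
    pthread_t tid;
    shared_value = 0;
    int rc = pthread_create(&tid, NULL, thread_body, NULL);
    assert(rc == 0);
    pthread_join(tid, NULL);
    return shared_value;       /* same address space, so the write is visible */
}
```

Called from `main`, `value_after_fork()` should leave the parent's value at 0, while `value_after_thread()` should observe the write made by the other thread.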

Kernel Thread vs Kernel-level thread(LWP) vs User-level thread

  • kernel thread
    • runs only in kernel mode
    • used for asynchronous operations (I/O)
  • kernel-level thread (LWP, lightweight process; also called a kernel-supported thread)
    • created by the clone() system call

  • user-level thread
    • threads are managed (creation, scheduling, destruction) in user mode
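
On Linux, a pthread is backed by a kernel-level thread (LWP), so it shares the process ID but gets its own kernel thread ID. The sketch below is Linux-specific and only illustrative: `struct ids`, `record_ids` and `thread_is_separate_lwp` are hypothetical names, and the thread ID is read with the raw `SYS_gettid` system call.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE            /* for syscall() */
#endif
#include <assert.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

struct ids {
    pid_t pid;                 /* process ID, shared by all threads */
    pid_t tid;                 /* kernel thread (LWP) ID, one per thread */
};

/* Records the calling thread's process ID and kernel thread ID. */
static void *record_ids(void *arg)
{
    struct ids *out = arg;
    out->pid = getpid();
    out->tid = (pid_t)syscall(SYS_gettid);
    return NULL;
}

/* Returns 1 if a second thread shares the PID but has its own kernel TID. */
int thread_is_separate_lwp(void)
{
    struct ids main_ids, worker_ids;
    pthread_t tid;

    record_ids(&main_ids);
    int rc = pthread_create(&tid, NULL, record_ids, &worker_ids);
    assert(rc == 0);
    pthread_join(tid, NULL);

    return main_ids.pid == worker_ids.pid && main_ids.tid != worker_ids.tid;
}
```

The same IDs show up in the LWP column of `ps -eLf`, which is one way to see these threads from outside the process.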

Context Switch

  • process switch
    • the memory address space is switched: page tables and memory mappings, kernel resources, a TLB flush, and on some ARM processors even the L1 cache
    • processor state
  • thread switch
    • only processor state (such as the program counter and register contents) is switched, which is generally very efficient

Linux Process Communication

  1. Pipe
  2. Semaphore
  3. Signal
  4. Message queue
  5. Shared memory
  6. Socket (can also communicate across machines)
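
As an illustration of the first mechanism in the list, the sketch below sends a short message from a child process to its parent through a pipe. The function name `pipe_roundtrip` is hypothetical, and the code assumes the message is short enough to arrive in a single `read`.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Sends `msg` from a child process to the parent through a pipe.
 * Returns the number of bytes the parent read into `buf`, or -1 on error. */
ssize_t pipe_roundtrip(const char *msg, char *buf, size_t buflen)
{
    int fds[2];                        /* fds[0]: read end, fds[1]: write end */
    if (pipe(fds) == -1)
        return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {                    /* child: writer */
        close(fds[0]);
        if (write(fds[1], msg, strlen(msg)) < 0)
            _exit(1);
        close(fds[1]);
        _exit(0);
    }

    close(fds[1]);                     /* parent: reader */
    ssize_t n = read(fds[0], buf, buflen - 1);
    if (n >= 0)
        buf[n] = '\0';                 /* make the result a C string */
    close(fds[0]);
    waitpid(pid, NULL, 0);
    return n;
}
```

Each side closes the end it does not use, so the reader would see end-of-file once the writer is done.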

Mutex vs Condition Variable vs Semaphore vs CAS

  • Mutual exclusion lock

    • used for resource protection: operations on the resource must be serialized
    • emphasizes resource ownership: only the owner can release the lock
    • Mutex: sleep-waiting
      • a blocked thread sleeps in the kernel (on Linux, typically via the futex() system call), which triggers a context switch
    • Spin lock: busy-waiting
      • burns CPU time spinning in order to acquire the lock as soon as it is released
  • Semaphore

    • mainly used for scheduling (signaling between threads)
    • its value stands for the number of available resources
    • a binary semaphore is similar to a mutex, but a semaphore does not record an owner
    • atomic operations: P (wait, -1), V (signal, +1)
    • e.g. tracking the number of items in the producer-consumer problem
  • Condition variable:

    • optimizes the polling pattern: lock -> check condition -> unlock
    • avoids frequent checking before the condition is satisfied
    • the waiter sleeps until another thread signals it once the condition holds
  • other

    • critical section
      • a small code section that at most one thread may execute at any moment
      • the shorter it is, the more efficient (less time spent holding the lock)
    • read-write lock
      • for high frequency reading and low frequency writing
    • recursive mutex
      • a thread can lock a mutex it already holds again without deadlocking
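
The two roles described above (a semaphore counting resources, a mutex protecting shared data) can be sketched together. This is a minimal illustration, not a definitive pattern: `SLOTS`, `WORKERS`, `worker` and `run_workers` are hypothetical names, and the short `usleep` merely stands in for real work.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

#define SLOTS   2   /* hypothetical number of identical resources */
#define WORKERS 6

static sem_t slots;                                   /* counts free resources */
static pthread_mutex_t stat_lock = PTHREAD_MUTEX_INITIALIZER;
static int inside = 0, max_inside = 0;                /* protected by stat_lock */

static void *worker(void *arg)
{
    (void)arg;
    sem_wait(&slots);                 /* P: take one resource, block if none is free */

    pthread_mutex_lock(&stat_lock);   /* mutex: serialize the bookkeeping */
    if (++inside > max_inside)
        max_inside = inside;
    pthread_mutex_unlock(&stat_lock);

    usleep(1000);                     /* pretend to use the resource */

    pthread_mutex_lock(&stat_lock);
    inside--;
    pthread_mutex_unlock(&stat_lock);

    sem_post(&slots);                 /* V: give the resource back */
    return NULL;
}

/* Runs WORKERS threads and returns the highest number seen inside at once. */
int run_workers(void)
{
    pthread_t t[WORKERS];

    inside = max_inside = 0;
    sem_init(&slots, 0, SLOTS);
    for (int i = 0; i < WORKERS; i++) {
        int rc = pthread_create(&t[i], NULL, worker, NULL);
        assert(rc == 0);
    }
    for (int i = 0; i < WORKERS; i++)
        pthread_join(t[i], NULL);
    sem_destroy(&slots);
    return max_inside;
}
```

The value returned by `run_workers()` should never exceed `SLOTS`, because the semaphore admits at most that many threads at a time. Note that any thread may call `sem_post`, whereas the mutex must be unlocked by the thread that locked it.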

Deadlock

Necessary conditions

  1. Mutual Exclusion:
    at least one resource can be used by only one thread at a time.
  2. Hold and Wait:
    there must be a thread that holds some resource while waiting for another resource held by some other thread.
  3. No Preemption:
    once a resource has been allocated to a thread, it cannot be forcibly taken away.
  4. Circular Wait:
    the threads wait for resources in a cycle, where the last thread waits for a resource held by the first thread.
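
Since all four conditions are necessary, breaking any one of them prevents deadlock. The sketch below breaks circular wait by always taking locks in a fixed global order (here, by address). It is only an illustration: `struct account`, `transfer` and `run_transfers` are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

struct account {
    pthread_mutex_t lock;
    long balance;
};

/* Moves `amount` between two accounts, always locking the lower address
 * first, so two opposite transfers can never wait on each other in a cycle. */
void transfer(struct account *from, struct account *to, long amount)
{
    if (from == to)
        return;                        /* nothing to do, and avoids double-locking */

    struct account *first  = (uintptr_t)from < (uintptr_t)to ? from : to;
    struct account *second = (first == from) ? to : from;

    pthread_mutex_lock(&first->lock);
    pthread_mutex_lock(&second->lock);
    from->balance -= amount;
    to->balance   += amount;
    pthread_mutex_unlock(&second->lock);
    pthread_mutex_unlock(&first->lock);
}

static struct account a = { PTHREAD_MUTEX_INITIALIZER, 1000 };
static struct account b = { PTHREAD_MUTEX_INITIALIZER, 1000 };

static void *a_to_b(void *arg)
{
    (void)arg;
    for (int i = 0; i < 100000; i++)
        transfer(&a, &b, 1);
    return NULL;
}

static void *b_to_a(void *arg)
{
    (void)arg;
    for (int i = 0; i < 100000; i++)
        transfer(&b, &a, 1);
    return NULL;
}

/* Runs two opposite transfer loops concurrently; returns the total balance. */
long run_transfers(void)
{
    pthread_t t1, t2;
    int rc1 = pthread_create(&t1, NULL, a_to_b, NULL);
    int rc2 = pthread_create(&t2, NULL, b_to_a, NULL);
    assert(rc1 == 0 && rc2 == 0);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return a.balance + b.balance;
}
```

If each thread instead locked `from` first and `to` second, the two loops could each hold one lock while waiting for the other, which is exactly the hold-and-wait plus circular-wait situation listed above.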

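Detecting deadlock

  • gdb (attach to the hung process and inspect every thread's backtrace)
  • tools: Valgrind (Helgrind/DRD), Lockdep (the Linux kernel's lock validator), bcc (Linux BPF-based)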

Producer-consumer problem

mutex + cond var (linked list)

  • mutex + cond var
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    struct msg {
        struct msg *next;
        int num;
    };

    struct msg *head; // head pointer of the linked list
    pthread_cond_t has_product = PTHREAD_COND_INITIALIZER; // condition variable
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // mutex

    void *consumer(void *p)
    {
        struct msg *mp;

        for (;;) {
            pthread_mutex_lock(&lock); // the mutex protects the condition (head)
            while (head == NULL) { // re-check after every wakeup
                pthread_cond_wait(&has_product, &lock);
            }
            mp = head;
            head = mp->next;
            pthread_mutex_unlock(&lock);

            printf("Consume %d\n", mp->num);
            free(mp);
            sleep(rand() % 5);
        }
    }

    void *producer(void *p)
    {
        struct msg *mp;

        for (;;) {
            mp = malloc(sizeof(struct msg));
            if (mp == NULL)
                exit(EXIT_FAILURE);
            mp->num = rand() % 1000 + 1;
            printf("Produce %d\n", mp->num);

            pthread_mutex_lock(&lock);
            mp->next = head;
            head = mp; // the condition changes here, so the lock must be held
            pthread_mutex_unlock(&lock);

            pthread_cond_signal(&has_product);

            sleep(rand() % 5);
        }
    }

Semaphore (ring queue)

Here blank_number is the number of empty slots and product_number is the number of items in the queue.

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#define NUM 5
int queue[NUM]; // ring queue
sem_t blank_number, product_number;

void *producer(void *arg)
{
    int p = 0;
    while (1) {
        sem_wait(&blank_number); // wait for an empty slot
        queue[p] = rand() % 1000 + 1; // produce a random value from 1 to 1000
        printf("produce %d\n", queue[p]);
        sem_post(&product_number); // one more item available
        p = (p + 1) % NUM;
        sleep(rand() % 5);
    }
}

void *consumer(void *arg)
{
    int c = 0;
    while (1) {
        sem_wait(&product_number); // wait for an item
        printf("consume %d\n", queue[c]);
        queue[c] = 0; // the consumer resets the slot to zero
        sem_post(&blank_number); // one more empty slot
        c = (c + 1) % NUM;
        sleep(rand() % 5);
    }
}

int main(void)
{
    pthread_t pid, cid;

    srand(time(NULL));
    sem_init(&blank_number, 0, NUM);
    sem_init(&product_number, 0, 0);

    pthread_create(&pid, NULL, producer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);

    pthread_join(pid, NULL);
    pthread_join(cid, NULL);

    sem_destroy(&blank_number);
    sem_destroy(&product_number);

    return 0;
}

CAS (lock free)

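// Pseudocode. CAS(addr, expected, new) atomically sets *addr = new
// if *addr == expected, and reports whether it did.
EnQueue(x) // enqueue
{
    // init new record
    q = new record();
    q->value = x;
    q->next = NULL;

    do {
        p = tail; // take a snapshot of the tail pointer
    } while (CAS(p->next, NULL, q) != TRUE); // the node was not linked after the tail: retry

    CAS(tail, p, q); // move the tail to the new node
}

DeQueue() // dequeue
{
    do {
        p = head;
        if (p->next == NULL) {
            return ERR_EMPTY_QUEUE;
        }
    } while (CAS(head, p, p->next) != TRUE);
    return p->next->value;
}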
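
The CAS retry loop used by the pseudocode above can be tried out in real C on a simpler target than a queue. The sketch below is a minimal example using C11 `<stdatomic.h>`; the names `cas_increment`, `bump` and `run_cas_counter` are hypothetical, and it shows only the retry pattern, not a full lock-free queue.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define THREADS 4
#define ITERS   100000

static atomic_long counter;

/* Lock-free increment: snapshot, compute, CAS; retry if another thread won. */
static void cas_increment(atomic_long *c)
{
    long old = atomic_load(c);
    /* On failure, the call stores the current value into `old`,
     * so the next attempt works from a fresh snapshot. */
    while (!atomic_compare_exchange_weak(c, &old, old + 1))
        ;
}

static void *bump(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITERS; i++)
        cas_increment(&counter);
    return NULL;
}

/* Runs THREADS incrementing threads and returns the final counter value. */
long run_cas_counter(void)
{
    pthread_t t[THREADS];

    atomic_store(&counter, 0);
    for (int i = 0; i < THREADS; i++) {
        int rc = pthread_create(&t[i], NULL, bump, NULL);
        assert(rc == 0);
    }
    for (int i = 0; i < THREADS; i++)
        pthread_join(t[i], NULL);
    return atomic_load(&counter);
}
```

No increment is lost even though no lock is taken: a thread whose snapshot is stale simply fails its CAS and tries again.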

ABA problem

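A CAS succeeds whenever the current value equals the expected one, so it cannot tell that the value changed from A to B and back to A in between. One fix is a double-width CAS:

1) Use a CAS that checks a double-width value in one operation: the first half is the pointer, the second half is a counter.
2) The check passes only if both halves match; the new value is then assigned and the counter is incremented by 1.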

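SafeRead(q)
{
loop:
    p = q->next;
    if (p == NULL) {
        return p;
    }

    Fetch&Add(p->refcnt, 1); // take a reference so p cannot be recycled

    if (p == q->next) { // q->next did not change while the reference was taken
        return p;
    } else {
        Release(p); // p was unlinked in the meantime: drop the reference and retry
    }
    goto loop;
}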
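
The pointer-plus-counter idea from the ABA section can be sketched without a true double-width CAS by packing a 32-bit node index and a 32-bit version counter into one 64-bit atomic. This layout is an assumption made for the example, a stand-in for the pointer/counter pair described above, and `pack`, `versioned_cas` and `stale_cas_rejected` are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed layout: high 32 bits = version counter, low 32 bits = node index. */
static uint64_t pack(uint32_t index, uint32_t version)
{
    return ((uint64_t)version << 32) | index;
}

static uint32_t index_of(uint64_t v)   { return (uint32_t)v; }
static uint32_t version_of(uint64_t v) { return (uint32_t)(v >> 32); }

/* Replaces the index only if both index and version still match `expected`;
 * every successful update bumps the version by 1. */
bool versioned_cas(_Atomic uint64_t *slot, uint64_t expected, uint32_t new_index)
{
    uint64_t desired = pack(new_index, version_of(expected) + 1);
    return atomic_compare_exchange_strong(slot, &expected, desired);
}

/* Single-threaded replay of an A -> B -> A interleaving.
 * Returns true if the stale CAS was rejected. */
bool stale_cas_rejected(void)
{
    enum { A = 1, B = 2, C = 3 };
    _Atomic uint64_t head = pack(A, 0);

    uint64_t snapshot = atomic_load(&head);   /* "thread 1" reads A, version 0 */

    /* "thread 2" changes A -> B -> A while thread 1 is paused */
    bool ok1 = versioned_cas(&head, atomic_load(&head), B);
    bool ok2 = versioned_cas(&head, atomic_load(&head), A);
    assert(ok1 && ok2);

    /* The index is A again, but the version is now 2, so the stale CAS fails. */
    bool stale_ok = versioned_cas(&head, snapshot, C);
    return !stale_ok && index_of(atomic_load(&head)) == A;
}
```

A plain CAS on the index alone would have accepted the stale snapshot here, because the index compares equal; the version half is what exposes the intermediate change.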

Reference