Linux Kernel(4.19) Hacks

rousalome.egloos.com

포토로그 Kernel Crash


통계 위젯 (화이트)

15192
888
89788


[라즈베리파이] 워크큐(Workqueue) - 워크를 워크큐에 어떻게 큐잉할까?(1) 8장. 워크큐

워크를 초기화만 하면 워크를 실행할 수는 없습니다. 우선 워크를 워크큐에 큐잉해야 워커 쓰레드는 워크를 실행할 수 있습니다. 이번절에서는 워크를 워크큐에 큐잉하면 워크큐 전체 흐름으로 어떤 동작을 하는지 알아 보겠습니다. 

워크 초기화는 struct work_struct 타입 변수를 INIT_WORK() 함수에 전달하거나 DECLEAR_WORK() 함수로 struct work_struct 변수를 선언하면 됩니다. 이 워크를 워크큐를 큐잉하려면 struct work_struct 타입 변수를 schedule_work()이라는 함수에 파라미터로 전달하면 됩니다.

이번에는 워크를 워크큐에 큐잉하는 코드는 다음과 같습니다.
[https://elixir.bootlin.com/linux/v4.14.43/source/drivers/tty/vt/keyboard.c#L1454]
static DECLARE_WORK(console_work, console_callback);

1  void schedule_console_callback(void)
2  {
3 schedule_work(&console_work);
11 }

3번째 줄 코드와 같이 struct work_struct 이란 구조체 주소를 &console_work 변수로 schedule_work() 함수에 전달합니다.

queue_work_on() 함수 분석
schedule_work() 함수를 호출하면 어떻게 워크를 워크큐에 큐잉하는지 이제부터 살펴보겠습니다. schedule_work() 함수를 호출하면 queue_work_on() 함수를 호출하니 먼저 schedule_work() 함수를 보겠습니다.
[/include/linux/workqueue.h]
1 static inline bool schedule_work(struct work_struct *work)
2 {
3 return queue_work(system_wq, work);
4}

5 static inline bool queue_work(struct workqueue_struct *wq,
6       struct work_struct *work)
7 {
8 return queue_work_on(WORK_CPU_UNBOUND, wq, work);
9 }

먼저 3번 줄 코드를 보겠습니다.
queue_work() 이란 함수를 호출하는데 system_wq 이란 전역 변수를 첫 번째 인자로 queue_work() 함수에 전달합니다. schedule_work() 함수로 전달하는 워크는 시스템 워크큐에 큐잉된다는 사실을 알 수 있습니다.

8번 줄 코드를 보면 queue_work() 함수는 현재 워크를 WORK_CPU_UNBOUND를 첫 번째 인자로 queue_work_on() 함수를 호출합니다.

schedule_work() 함수로 워크를 워크큐에 큐잉을 하면 queue_work() 함수에서 queue_work_on() 함수를 호출합니다. 이때 워크는 시스템 워크 system_wq에 큐잉됩니다.

이어서 queue_work_on() 함수를 분석하겠습니다.
[kernel/workqueue.c]
1 bool queue_work_on(int cpu, struct workqueue_struct *wq,
2    struct work_struct *work)
3 {
4 bool ret = false;
5 unsigned long flags;
6
7 local_irq_save(flags);
8
9 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
10 __queue_work(cpu, wq, work);
11 ret = true;
12 }
13
14 local_irq_restore(flags);
15 return ret;
16}

먼저 7번 줄 코드를 보겠습니다.
7 local_irq_save(flags);

9~12번 줄 코드를 보호 구역으로 보고 인터럽트 동기화시킵니다. 
7번과 14번 줄 코드로 워크를 큐잉하는 9~12줄 코드 실행 도중에 인터럽트가 발생해서 동기화 문제가 일어나는 것을 방지합니다.

리눅스 커널 코드는 언제든 인터럽트가 발생해서 실행 흐름이 멈출 수 있다는 점 기억하세요.

9번 줄을 보겠습니다.
9 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
10 __queue_work(cpu, wq, work);
11 ret = true;
12 }

work_data_bits(work) 매크로 함수는 struct work_struct 구조체 주소에서 struct work_struct.data 멤버를 읽습니다. 이 값이 WORK_STRUCT_PENDING_BIT(1)이면 9~12줄 코드를 실행하지 않고 if 문을 빠져나옵니다.

test_and_set_bit() 함수는 리눅스 커널 자료구조 함수 중 하나입니다.
test_and_set_bit(A, B); 와 같이 호출하면 A와 B란 변수 비트를 AND 연산한 다음 결과가 1이면 1을 리턴하고 반대로 0이면 0을 리턴합니다. 연산 결과에 상관없이 B이란 변수에 A비트를 설정합니다.

이해를 돕기 위해 9~12줄 코드를 다른 코드로 쉽게 표현하면 다음과 같습니다.
1 if (work->data == WORK_STRUCT_PENDING_BIT) {
3 } else
4 work->data =| WORK_STRUCT_PENDING_BIT;
5 __queue_work(cpu, wq, work);
6 ret = true;
7 }

work->data가 WORK_STRUCT_PENDING_BIT이면 if 문을 만족하는데 2번 줄 코드와 같이 동작도 안 하고 if 문을 빠져나옵니다. 2번째 줄 코드와 같이 실행할 코드가 없기 때문입니다. 대신 work->data가 WORK_STRUCT_PENDING_BIT 가 아니면 else문을 실행합니다. 

if 문 조건을 만족하면 아무 동작을 안 하는 코드는 모양이 이상하니 if 문에 ! 조건으로 바꿔  코드 순서를 바꿔 보면 다음과 같습니다.
if ( !(work->data == WORK_STRUCT_PENDING_BIT)) {
work->data =| WORK_STRUCT_PENDING_BIT;
__queue_work(cpu, wq, work);
ret = true;
}

work->data 멤버가 WORK_STRUCT_PENDING_BIT 매크로가 아니면 work->data에 WORK_STRUCT_PENDING_BIT를 저장하고 __queue_work() 함수를 호출하는 겁니다.

test_and_set_bit() 함수를 다른 코드로 바꿔서 설명을 드렸습니다. test_and_set_bit() 함수는 위에 바꾼 코드와 같이 struct work_struct.data 멤버가 WORK_STRUCT_PENDING_BIT 매크로가 아니면 struct work_struct.data 멤버를 WORK_STRUCT_PENDING_BIT 매크로로 설정하고 0을 리턴합니다.

반대로 struct work_struct.data 멤버가 WORK_STRUCT_PENDING_BIT 매크로이면 struct work_struct.data 멤버를 WORK_STRUCT_PENDING_BIT 매크로로 설정한 후 1을 리턴합니다.

여기서 한 가지 의문이 생깁니다. struct work_struct.data 멤버가 WORK_STRUCT_PENDING_BIT인지 왜 점검하는 것일까요? 그 이유는 struct work_struct.data 멤버에 워크 실행 상태가 저장돼있기 때문입니다. queue_work_on() 함수 호출로 워크를 워크큐에 큐잉하기 직전에 이 멤버는 WORK_STRUCT_PENDING_BIT 매크로로 변경합니다. 

만약 워크를 워크큐에 큐잉하기 직전에 이 멤버가 WORK_STRUCT_PENDING_BIT이면 어떻게 해석해야 할까요? 이미 워크를 워크에 큐잉한 상태라 할 수 있습니다. 그래서 워크를 워크큐에 큐잉하지 않습니다. 워크를 워크큐에 중복 큐잉할 때 예외 처리 코드입니다.

커널은 디바이스 드라이버에서 중복 코드를 실행할 경우 시나리오를 생각해서 예외 처리를 수행합니다. 

이제 워크를 워크큐에 큐잉하는 __queue_work() 함수를 분석을 시작하겠습니다. 전체 코드는 다음과 같습니다.
1 static void __queue_work(int cpu, struct workqueue_struct *wq,
2  struct work_struct *work)
3 {
4 struct pool_workqueue *pwq;
5 struct worker_pool *last_pool;
6 struct list_head *worklist;
7 unsigned int work_flags;
8 unsigned int req_cpu = cpu;
9
10 WARN_ON_ONCE(!irqs_disabled());
11
12 debug_work_activate(work);
13
14 /* if draining, only works from the same workqueue are allowed */
15 if (unlikely(wq->flags & __WQ_DRAINING) &&
16     WARN_ON_ONCE(!is_chained_work(wq)))
17 return;
18 retry:
19 if (req_cpu == WORK_CPU_UNBOUND)
20 cpu = wq_select_unbound_cpu(raw_smp_processor_id());
21
22 /* pwq which will be used unless @work is executing elsewhere */
23 if (!(wq->flags & WQ_UNBOUND))
24 pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
25 else
26 pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
27
28 last_pool = get_work_pool(work);
29 if (last_pool && last_pool != pwq->pool) {
30 struct worker *worker;
31
32 spin_lock(&last_pool->lock);
33
34 worker = find_worker_executing_work(last_pool, work);
35
36 if (worker && worker->current_pwq->wq == wq) {
37 pwq = worker->current_pwq;
38 } else {
39 /* meh... not running there, queue here */
40 spin_unlock(&last_pool->lock);
41 spin_lock(&pwq->pool->lock);
42 }
43 } else {
44 spin_lock(&pwq->pool->lock);
45 }
46
47 if (unlikely(!pwq->refcnt)) {
48 if (wq->flags & WQ_UNBOUND) {
49 spin_unlock(&pwq->pool->lock);
50 cpu_relax();
51 goto retry;
52 }
53 /* oops */
54 WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
55   wq->name, cpu);
56 }
57
58 /* pwq determined, queue */
59 trace_workqueue_queue_work(req_cpu, pwq, work);
60
61 if (WARN_ON(!list_empty(&work->entry))) {
62 spin_unlock(&pwq->pool->lock);
63 return;
64 }
65
66 pwq->nr_in_flight[pwq->work_color]++;
67 work_flags = work_color_to_flags(pwq->work_color);

68 if (likely(pwq->nr_active < pwq->max_active)) {
69 trace_workqueue_activate_work(work);
70 pwq->nr_active++;
71 worklist = &pwq->pool->worklist;
72 if (list_empty(worklist))
73 pwq->pool->watchdog_ts = jiffies;
74 } else {
75 work_flags |= WORK_STRUCT_DELAYED;
76 worklist = &pwq->delayed_works;
77 }
78
79 insert_work(pwq, work, worklist, work_flags);
80
81 spin_unlock(&pwq->pool->lock);

코드 분석에 앞서 __queue_work() 함수로 전달되는 인자를 점검합시다.
1 static void __queue_work(int cpu, struct workqueue_struct *wq,
2  struct work_struct *work)
3 {

queue_work() 함수에서 첫 번째 인자로 WORK_CPU_UNBOUND, 두 번째 인자로 system_wq 를 전달했으니 cpu는 WORK_CPU_UNBOUND, wq는 system_wq이란 시스템 워크큐 전역 변수 주소입니다. 세 번째 work는 struct work_struct 구조체 주소를 저장하는 포인터입니다.

18번째 줄 코드부터 봅시다.
18 retry:
19 if (req_cpu == WORK_CPU_UNBOUND)
20 cpu = wq_select_unbound_cpu(raw_smp_processor_id());

req_cpu이란 지역 변수는 이 함수에 전달되는 cpu를 그대로 저장했으니 WORK_CPU_UNBOUND입니다.
8 unsigned int req_cpu = cpu;

따라서 20번 줄 코드를 바로 실행합니다. 

20번 줄 코드는 raw_smp_processor_id() 함수로 현재 실행 중인 CPU 번호를 알아낸 후 wq_select_unbound_cpu() 함수에 인자를 전달합니다. 이진수로 1111인 wq_unbound_cpumask 비트 마스크용 변수와 AND 연산으로 CPU 번호를 얻어옵니다. 현재 실행 중인 CPU 번호가 cpu이란 지역 변수로 저장됩니다. 실행 중인 CPU 번호로 per-cpu 타입 워크풀을 선택해서 per-cpu 내 워크풀이 골고루 실행되게 합니다.

이번에는 23번 줄 코드를 보겠습니다.
23 if (!(wq->flags & WQ_UNBOUND))
24 pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
25 else
26 pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

시스템 워크큐는 wq->flags 값이 0입니다. 대부분 워크큐인 경우 wq->flags가 0이므로 24번 코드를 실행합니다. struct workqueue_struct 구조체에서 per-cpu 타입 멤버인 cpu_pwqs에 접근해서 CPU 주소를 읽어 옵니다.

다음 5번 째 줄 struct workqueue_struct 구조체 cpu_pwqs 멤버 선언부 코드를 보면 __percpu 이란 타입을 볼 수 있습니다.
1 struct workqueue_struct {
2 struct list_head pwqs; /* WR: all pwqs of this wq */
3 struct list_head list; /* PR: list of all workqueues */
...
4 unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
5 struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */

per-cpu 타입 변수인 wq->cpu_pwqs 멤버 변수는 다음 코드로 접근합니다.
24 pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);

24번 코드 동작을 그림으로 표현하면 다음과 같습니다.


위 그림에서 보듯 system_rq->cpu_pwqs 멤버에 저장된 주소에 현재 실행 중인 CPU 번호 기준으로 __per_cpu_offset[cpu번호] 배열에 있는 주소를 더합니다. per-cpu 타입 변수는 CPU별로 주소 공간이 있는 겁니다. 라즈베리파이는 CPU 개수가 4개이니 4개 per-cpu 메모리 공간을 할당 받습니다.

#Reference 워크큐
워크큐 소개
워크큐 종류 알아보기
워크란  
워크를 워크큐에 어떻게 큐잉할까?
   워크를 큐잉할 때 호출하는 워크큐 커널 함수 분석   
워커 쓰레드란
워크큐 실습 및 디버깅
   ftrace로 워크큐 동작 확인   
   인터럽트 후반부로 워크큐 추가 실습 및 로그 분석 
   Trace32로 워크큐 자료 구조 디버깅하기 
딜레이 워크 소개  
   딜레이 워크는 누가 언제 호출할까?
라즈베리파이 딜레이 워크 실습 및 로그 확인  


핑백

덧글

댓글 입력 영역