レガシーガジェット研究所

気になったことのあれこれ。

Linux Kernel ~ 同期機構 ~

概要

「詳解Linux Kernel」を参考にVersion 2.6.11のコードリーディングをしていく。CPUのアーキテクチャは書籍に沿ってIntelx86とする。

今回は同期処理について見ていく。

同期処理

(高速で切り替えながら)同時に動作するプロセスや割り込みなどの要求に対して、競合しないようカーネルの各機能を提供するために適切な同期を取る必要がある。

カーネル内プリエンプション

プリエンプションには2種類存在する。

  • 計画的なプロセス切り替え: ある資源の解放を待つ場合にプロセスは自発的にCPUを手放す。
  • 強制的なプロセス切り替え: 高い優先度を持つ割り込みハンドラが起床すると非同期のプロセス切り替えが発生する。

プリエンプション発生の可能不可能はthread_infopreempt_countで管理され、当該メンバが0よりも大きい場合にプリエンプションは禁止される。

preempt_countは以下の場合に0より大きくなる。

  • カーネルが割り込みルーチンを実行している場合。
  • 遅延処理が実行されている(ソフト割り込みやタスクレット)場合。
  • 明示的にpreempt_countをインクリメントしてプリエンプションを禁止する場合。

割り込みとプリエンプションの関係をまとめると以下のようになる。

  • 割り込み発生後ハンドラはPICに応答しIRQラインを禁止する。同種の割り込みはハンドラ完了まで実行されない。
  • 割り込みハンドラ、ソフト割り込み、タスクレットはプリエンプト及び実行の中断はできない(割り込み実行中に他の割り込みは発生しネストする場合がある)。
  • 割り込み処理は遅延処理やシステムコールにから割り込まれることはない。
  • ソフト割り込み及びタスクレットは実行中に他のソフト割り込み及びタスクレットから割り込まれることはない。
  • 同じタスクレットは複数のCPUで同時に実行されない。

同期が必要であるケース

複数のCPUで同時に割り込み処理を行なった場合などに処理結果に影響を与えることを競合状態といい、一度に単一のCPUでのみ実行できる区間のことをクリティカル区間という。

シングルプロセッサ環境では、システムコールルーチンのような同じデータにアクセスを行う場合カーネル内プリエンプションを禁止するだけでクリティカルセッションを実現することが可能である。しかしマルチプロセッサ環境ではカーネルプリエンプションを禁止しただけではデータを保護することはできない。

同期が不要であるケース

割り込みとプリエンプションの関係から以下のことがわかる。

  • 割り込みハンドラとタスクレットは再入可能(リエントラント)に実装する必要はない。
  • ソフト割り込みとタスクレットからアクセスされるCPUごとの変数は同期処理が不要。
  • あるタスクレットからのみアクセスのあるデータは同期処理が不要。

同期方法

同期にはローカルのCPUに対して行うものや全てのCPUで適応するものが存在する。

同期方法 説明 適応範囲
CPU毎の変数 各CPUが保持する変数 全てのCPU
アトミック操作 カウンタに対するアトミックな読み込み、更新及び書き出し 全てのCPU
メモリバリア 命令の実行順序変更の回避 ローカルのCPUまたは全てのCPU
スピンロック ジーウエイトによるロック 全てのCPU
セマフォ プロセスの実行中断または休止によるロック 全てのCPU
順次ロック アクセスカウンタに基づいたロック 全てのCPU
ローカル割り込み禁止 ローカルCPUで割り込み禁止 ローカルのCPU
ローカルソフト割り込み禁止 ローカルCPUで遅延処理禁止 ローカルのCPU
RCU(Read-Copy-Update) ポインタ利用した共有データへのロック不要アクセス 全てのCPU

CPU毎の変数

CPU毎に配置することに意味のあるデータは各CPU用に保持することで競合状態が起こらない状態を作りシステム性能の向上に繋がる(CPU毎の変数は基本的にキャッシュに載せるためアライメントされる)。他のCPUは基本的にそのデータにアクセスはできない。

しかしカーネルプリエンプションに関しては搭載されているCPUの数に関係なく同期する必要があるため、CPUの値にアクセスする際にはプリエンプションを禁止する。

CPU毎にデータにアクセスする関数やマクロには以下のようなものがある。

// include/asm-generic/percpu.h
#define DEFINE_PER_CPU(type, name) \
    __typeof__(type) per_cpu__##name

#define per_cpu(var, cpu)  (*((void)cpu, &per_cpu__##var))

アトミック操作

読み出し、更新、書き出しのタイミングが悪いと更新時の値に不整合が起こる。この問題に対する最も簡単な方法としてはCPUレベルの命令でアトミック性を保証することである。ここでのアトミックな命令とは実行途中に割り込まれず他のCPUから同じ番地へアクセスも回避するような命令のことをいう。

x86の命令をアトミックの観点がから見ると以下のことが言える。

  • アラインメント境界をまたがないメモリアクセスまたは全くメモリアクセスを行わない命令はアトミックである。
  • incやdecなどの「読み込み、更新、書き出し」を行うアセンブリ命令は読み込みと書き出しの間にプロセッサにメモリバスをスティールされなければアトミックに実行される(シングルプロセッサ環境ではスティールは発生しない)。
  • オペコードにlockバイト(0xF0)でプレフィクスされている命令は実行完了までCPUがメモリバスをロックすることでアトミックに実行される。
  • オペコードがrepバイト(0xF2, 0xF3)でプレフィクスされている命令は制御回路に当該命令を複数回実行させるためアトミックではない。制御回路は各繰り返しの実行前に割り込みなどがないか確認を行う。

C言語のインクリメント処理(a++a=a+1)はコンパイラがアトミックな命令として生成することを保証していない。Linuxではatomic_t型(アトミックアクセスが可能なカウンタ)と、当該型の変数を扱うための関数やマクロを提供している。

// include/asm-i386/atomic.h
/*
 * Make sure gcc doesn't try to be clever and move things around
 * on us. We need to use _exactly_ the address the user gave us,
 * not some alias that contains the same information.
 */
typedef struct { volatile int counter; } atomic_t;

#define ATOMIC_INIT(i) { (i) }

/**
 * atomic_read - read atomic variable
 * @v: pointer of type atomic_t
 * 
 * Atomically reads the value of @v.
 */ 
#define atomic_read(v)     ((v)->counter)

最適化処理とメモリバリア

コンパイラコンパイル時にプログラムの最適化を行う過程でアセンブリ命令の省略及び並び替えなどを行う、加えてCPUは複数命令の並列実行を行なったり動的な並び替えを行う。よってプログラムの性能は大きく向上する。しかしこれは同期処理を行う上でデータを破壊する要因となってしまう。

最適化処理バリア

コンパイラの最適化からデータを保護するためにLinuxでは以下のマクロを使用する。

// include/linux/compiler-gcc.h
/* Optimization barrier */
/* The "volatile" is due to gcc bugs */
#define barrier() __asm__ __volatile__("": : :"memory")

__asm__はプログラム内にアセンブリ命令を挿入することを意味する(ここでは命令無し)。次にvolatile__asm__内の命令とその前後の命令(コンパイル結果)間での並び替えの禁止をコンパイラに指示する。memoryはRAM内の全てのメモリ領域が書き換わる可能性があることを明示し、これによりコンパイラ__asm__内の命令実行前にCPUに読み込んだレジスタの値を使用した最適化はできなくなる。

最適化処理バリアは、CPUによる動的なアセンブリ命令実行順序の並び替えを禁止するものではない。

メモリバリア

メモリバリアの前に配置された命令の実行が完了後、その後に配置されている命令の実行を行うことを保証する。つまりメモリバリアをまたぐような命令の移動はできなくなる。

x86では以下の命令はシリアル化(直列化)命令と呼ばれ、メモリバリアとして動作する。

  • I/Oポートを操作する全ての命令
  • オペコードがlockバイト(0xF0)でプレフィクスされている全ての命令
  • 制御レジスタ、システムレジスタ及びデバッグレジスタを変更する全ての命令(EFLAGSレジスタのIFフラグ値を変更するcli命令やsti命令など)
  • lfence(読み込みバリア), sfence(書き込みバリア), mfence(読み書きバリア)(Pentium 4で導入された効率的なメモリバリア)
  • iret命令(割り込みや例外ハンドラからの復帰に用いられる)などのいくつかの特殊命令

Linuxではメモリバリアに以下のようなマクロを使用する。

// include/asm-x86_64/system.h
/*
 * Force strict CPU ordering.
 * And yes, this is required on UP too when we're talking
 * to devices.
 */
#define mb()   asm volatile("mfence":::"memory")
#define rmb()  asm volatile("lfence":::"memory")

#ifdef CONFIG_UNORDERED_IO
#define wmb()  asm volatile("sfence" ::: "memory")

lfenceやsfence、sfenceなどのメモリバリア用命令が使用できる場合には上記のように定義される。

上記のメモリバリア用命令が使用できない場合には以下のように定義される。

// include/asm-i386/system.h
/* 
 * Actually only lfence would be needed for mb() because all stores done 
 * by the kernel should be already ordered. But keep a full barrier for now. 
 */
#define mb() alternative("lock; addl $0,0(%%esp)", "mfence", X86_FEATURE_XMM2)
#define rmb() alternative("lock; addl $0,0(%%esp)", "lfence", X86_FEATURE_XMM2)

#define wmb() alternative("lock; addl $0,0(%%esp)", "sfence", X86_FEATURE_XMM)

上記はプロセッサがメモリバリア用の命令をサポートしていなかった場合に指定の命令に置き換える。lock; addl $0,0(%%esp)addl $0,0(%%esp)自体は何もしていないがlockプレフィクスを付与しているのでメモリバリアの役割を果たす。

スピンロック

スピンロックはマルチプロセッサ環境に特化して設計されたロック機構で、クリティカルせっしょに入る際にはロックを取得し処理を継続する。既にロックが取得されている場合にはロック解除までループを回して待機するので、このことをビジーウエイトと呼ぶ。

スピンロックで実装されている全てのクリティカルセッション内ではカーネルプリエンプションは禁止されている。逆にシングルプロセッサ環境ではスピンロックは不要で単純にカーネルプリエンプションの禁止と許可を切り替えるだけの処理となる。

ジーウエイトの際はプリエンプションは可能で優先度の高いプロセスが待機している場合にはそのプロセスに実行権が移る。

スピンロックは以下の構造体を用いて実現されている。

// include/asm-i386/spinlock.h
/*
 * Your basic SMP spinlocks, allowing only a single CPU anywhere
 */
typedef struct {
    volatile unsigned int slock;
#ifdef CONFIG_PREEMPT
    unsigned int break_lock;
#endif
} spinlock_t;

slockは1の場合はロックされていない状態、0以下の場合はロックされている状態を表す。break_lockはビジーウエイトしているプロセスの有無を表し、マルチプロセッサ環境で且つカーネル内プリエンプションが有効の場合に使用される。

スピンロックを扱う全てのマクロは複数のCPUからの更新で値の不整合が起こること避けるためアトミックに実装されている。

マクロ名 説明
spin_lock_init spinlock_t構造体の初期化
spin_lock スピンロックの取得を行う
spin_unlock スピンロックの解除を行う
spin_unlock_wait スピンロックが解除されるまでビジーウエイトを行う
spin_is_locked ロックが取得されている場合には1、解除されている場合には0を返す。
spin_trylock ロックの取得を試み取得に成功すると1、失敗すると0を返す

spin_lock

上記のspin_lockマクロは以下のように定義されている。

// include/linux/spinlock.h
#define spin_lock(lock)        _spin_lock(lock)

#define _spin_lock(lock)   \
do { \
   preempt_disable(); \ // thread_infoのpreemt_countメンバをインクリメント
    _raw_spin_lock(lock); \
    __acquire(lock); \ // デバッグのための記述
} while(0)

上記を見ると内部で_raw_spin_lockが呼ばれているのがわかる。ここからCPUアーキテクチャに依存した処理となる。

// include/asm-i386/spinlock.h
static inline void _raw_spin_lock(spinlock_t *lock)
{
    __asm__ __volatile__(
        spin_lock_string
        :"=m" (lock->slock) : : "memory"); // slockを更新する。
}

#define spin_lock_string \
   "\n1:\t" \
   "lock ; decb %0\n\t" \ // lock->slockデクリメント
    "jns 3f\n" \ // 結果が負の値でない(0)である->ロックの取得に成功。3:へジャンプ
    "2:\t" \ // ロックに失敗した場合
    "rep;nop\n\t" \
    "cmpb $0,%0\n\t" \ // 再度lock->slockを確認
    "jle 2b\n\t" \ // 0以下である->ロックは取得されている。2:へジャンプ
    "jmp 1b\n" \ // ロックが解除されている->再度ロックの取得処理へ。1:へジャンプ
    "3:\n\t"

spin_unlock

// include/linux/spinlock.h
#define spin_unlock(lock)  _spin_unlock(lock)

#define _spin_unlock(lock) \
do { \
   _raw_spin_unlock(lock); \ // 
    preempt_enable(); \ // thread_info構造体のpreempt_countをデクリメントする
    __release(lock); \
} while (0)

上記から_raw_spin_unlockが呼び出されているのがわかる。

// include/asm-i386/spinlock.h
static inline void _raw_spin_unlock(spinlock_t *lock)
{
    __asm__ __volatile__(
        spin_unlock_string
    );
}

#define spin_unlock_string \
   "movb $1,%0" \ // lock->slockに1を設定する(ロック解除)。
        :"=m" (lock->slock) : : "memory"

読み書き用のスピンロック

並列実行を効率的に行うために導入されたスピンロックで、書き込み時は排他的なロックを行うが、読み込み時は同時に複数のカーネル実行パスからアクセスが可能となる。書き込みロックの取得の際に読み込みロックが取得されている場合はロックを取得することはできない。

読み書き用のスピンロックでは以下の構造体が使用される。

typedef struct {
    volatile unsigned int lock;
#ifdef CONFIG_PREEMPT
    unsigned int break_lock;
#endif
} rwlock_t;

上記の構造体メンバであるlockは32bitのビットフィールド形式で処理される。

ビット 説明
0~23 読み込み処理を行なっているカーネル実行パスの数
24 ロック解除フラグ。読み込み及び書き込み処理を行なっているカーネル実行パスが存在しないことを示す。(1:存在しない, 0:存在する)

初期化

初期化は以下のマクロで行う。

// include/asm-i386/spinlock.h
#define rwlock_init(x) do { *(x) = RW_LOCK_UNLOCKED; } while(0)

#define RW_LOCK_UNLOCKED (rwlock_t) { RW_LOCK_BIAS RWLOCK_MAGIC_INIT }

#define RWLOCK_MAGIC_INIT  /* */

// include/asm-i386/rwlock.h
#define RW_LOCK_BIAS        

lockメンバ変数にRW_LOCK_BIAS(0x01000000)を設定しており、ロック解除フラグが立っている(ロックされていない)状態で初期化される。

読み込み用のロック取得及び解除

読み込み用のロックにはread_lockマクロを使用する。

// include/linux/spinlock.h
#define read_lock(lock)        _read_lock(lock)

#define _read_lock(lock)   \
do { \
   preempt_disable(); \
   _raw_read_lock(lock); \
   __acquire(lock); \
} while(0)

上記からspin_lockと同じ処理を行い、_raw_read_lockを使用しているのがわかる。当該マクロの定義は以下。

// include/asm-i386/spinlock.h
static inline void _raw_read_lock(rwlock_t *rw)
{
    __build_read_lock(rw, "__read_lock_failed");
}

#define __build_read_lock(rw, helper)  do { \
   if (__builtin_constant_p(rw)) \ // 引数が定数であれば1、そうでなれけば0を返す。
        __build_read_lock_const(rw, helper); \
    else \
        __build_read_lock_ptr(rw, helper); \
    } while (0)

__builtin_constant_p(v)GCCのビルトイン関数でvが定数だと1、それ以外だと0を返す。ここではrwlock_t構造体へのポインタを受け取っているので__build_read_lock_ptrが呼び出される。

#define __build_read_lock_ptr(rw, helper)   \
   asm volatile(LOCK "subl $1,(%0)\n\t" \ // 構造体の先頭(lockメンバ)をデクリメント
        "jns 1f\n" \
        "call " helper "\n\t" \
        "1:\n" \
        ::"a" (rw) : "memory")

__build_read_lock_ptrではrw->lock(eaxレジスタ)をアトミック(LOCK)にデクリメントし、結果が0以上なら取得に成功しているので(書き込みロックが取得されていない場合には24bit目が1であるから減算しても負の値にはならない)、1:にジャンプし処理を抜ける。失敗した場合には第二引数のheplerで受け取った関数(ここでは__read_lock_failed)を実行する。

__read_lock_failedは以下のように定義されている。

// arch/i386/kernel/i386_ksyms.c
extern void FASTCALL( __read_lock_failed(rwlock_t *rw));

// arch/i386/kernel/semaphore.c
asm(
".section .sched.text\n"
".align  4\n"
".globl  __read_lock_failed\n"
"__read_lock_failed:\n\t"
    LOCK "incl   (%eax)\n" // アトミックインクリメント(先ほどの減算を修正)
"1:  rep; nop\n\t"
    "cmpl    $1,(%eax)\n\t" // lockが1以上になった(書き込みロックが解除された)かどうか
    "js  1b\n\t" // 解除されていない場合は1:に戻る
    LOCK "decl   (%eax)\n\t" // 再度減算(ロック)を行う。
    "js  __read_lock_failed\n\t" // マイナスならロックに失敗、1:に戻る
    "ret"
);

__read_lock_failedfastcallで定義されており、先ほどeaxレジスタに保持していたrw->lockを使用する。

読み込みロックの解除はシンプルで、以下のようにlockをインクリメントしカーネルプリエンプションを有効にするだけの処理となっている。

// include/linux/spinlock.h
#define read_unlock(lock)  _read_unlock(lock)

#define _read_unlock(lock) \
do { \
   _raw_read_unlock(lock); \
   preempt_enable(); \
   __release(lock); \
} while(0)

#define _raw_read_unlock(rw)   asm volatile("lock ; incl %0" :"=m" ((rw)->lock) : : "memory")

書き込み用のロック取得及び解除

書き込み用のマクロにはwrite_lockを使用する。

// include/linux/spinlock.h
#define write_lock(lock)   _write_lock(lock)

#define _write_lock(lock) \
do { \
   preempt_disable(); \
   _raw_write_lock(lock); \
   __acquire(lock); \
} while(0)

_write_lockが呼ばれているのがわかる。

// include/asm-i386/rwlock.h
static inline void _raw_write_lock(rwlock_t *rw)
{
    __build_write_lock(rw, "__write_lock_failed");
}

#define __build_write_lock(rw, helper) do { \
   if (__builtin_constant_p(rw)) \
       __build_write_lock_const(rw, helper); \
   else \
       __build_write_lock_ptr(rw, helper); \
   } while (0)

前述した__builtin_constant_pの引数はrwlock_t構造体のポインタであるためここでは__build_write_lock_ptrが呼ばれる。

// include/asm-i386/rwlock.h
#define RW_LOCK_BIAS_STR   "0x01000000"

#define __build_write_lock_ptr(rw, helper) \
   asm volatile(LOCK "subl $" RW_LOCK_BIAS_STR ",(%0)\n\t" \
       "jz 1f\n" \
       "call " helper "\n\t" \
       "1:\n" \
       ::"a" (rw) : "memory")

24bit目の1にした0x01000000rw->lockを減算し、結果が0であった(減算前24bit目が1だった→書き込みロックが解除された状態だった)場合には1:にジャンプし処理を抜ける。

失敗した場合には以下の__write_lock_failedが呼び出される。

// arch/i386/kernel/semaphore.c
/*
 * rw spinlock fallbacks
 */
#if defined(CONFIG_SMP)
asm(
".section .sched.text\n"
".align  4\n"
".globl  __write_lock_failed\n"
"__write_lock_failed:\n\t"
    LOCK "addl   $" RW_LOCK_BIAS_STR ",(%eax)\n" // 減算前の値に戻す
"1:  rep; nop\n\t"
    "cmpl    $" RW_LOCK_BIAS_STR ",(%eax)\n\t" // `0x01000000`と比較し読み書き共にロックが解除されているかどうか
    "jne 1b\n\t" // されていないので再度1:に戻る
    LOCK "subl   $" RW_LOCK_BIAS_STR ",(%eax)\n\t" // ロックを取得
    "jnz __write_lock_failed\n\t" // ロック取得に失敗
    "ret"
);

書き込みロックの解除はシンプルで、以下のようにlockをインクリメントしカーネルプリエンプションを有効にするだけの処理となっている。

// include/linux/spinlock.h
#define write_unlock(lock) _write_unlock(lock)

#define _write_unlock(lock) \
do { \
   _raw_write_unlock(lock); \
   preempt_enable(); \
   __release(lock); \
} while(0)

// include/asm-i386/spinlock.h
#define _raw_write_unlock(rw)  asm volatile("lock ; addl $" RW_LOCK_BIAS_STR ",%0":"=m" ((rw)->lock) : : "memory")

順次ロック

読み書きロックでは、読み込みロックが取得されている際に書き込みロックの取得はできなかったが、順次ロックでは書き込みロックに高い権限を与えることにより読み込みロックが取得されている場合でもロックの取得を行うことできる。

順次ロックでは以下の構造体が用いられる。

typedef struct {
    unsigned sequence;
    spinlock_t lock;
} seqlock_t;

sequenceは順次カウンタの役目を果たしており、データにアクセスする際は読み込みの前後でカウンタを確認し、当該データの更新が行われていないことを確認する。

初期化

初期化はseqlock_initを用いて行い、メンバのsequenceに0を設定し、前述のSPIN_LOCK_UNLOCKEDspinlock_t構造体のlockを初期化する(ロック解除状態)。

#define SEQLOCK_UNLOCKED { 0, SPIN_LOCK_UNLOCKED }
#define seqlock_init(x)    do { *(x) = (seqlock_t) SEQLOCK_UNLOCKED; } while (0)

書き込み用の順次ロックの取得と解除

順次ロックの取得と解除は以下のマクロを用いて行う。

// include/linux/seqlock.h
static inline void write_seqlock(seqlock_t *sl)
{
    spin_lock(&sl->lock); // スピンロックを取得
    ++sl->sequence; //  カウンタをインクリメント
    smp_wmb(); // メモリバリア
}   

static inline void write_sequnlock(seqlock_t *sl) 
{
    smp_wmb(); // メモリバリア
    sl->sequence++; // カウンタをインクリメント
    spin_unlock(&sl->lock); // スピンロックを解除
}

ロックの取得及び解除は両者ともカウンタをインクリメントしているのがわかる。これはカウンタが奇数の場合ロックが取得されていて、偶数の場合解除されていることを示すからである。

書き込みロックではスピンロックを取得するため、カーネルプリエンプションを禁止する必要はない。

読み込みの際の処理

順次ロックではデータを参照する際、以下のようにカウンタの値を確認する。

// arch/i386/kernel/timers/timer_cyclone.c
do {
    seq = read_seqbegin(&monotonic_lock); // カウンタ値の取得
    last_offset = ((unsigned long long)last_cyclone_high<<32)|last_cyclone_low;
    base = monotonic_base;
} while (read_seqretry(&monotonic_lock, seq)); // カウンタ値の確認

まずread_seqbegin()は以下のように定義されておりカウンタ値を返すのがわかる。

// include/linux/seqlock.h
static inline unsigned read_seqbegin(const seqlock_t *sl)
{
    unsigned ret = sl->sequence;
    smp_rmb();
    return ret;
}

カウンタ値の確認で用いられているread_seqretry()は以下のように定義されており、カウンタ値が奇数でないこと及び先ほど読み込んだカウンタ値に変更がないことを確認している。

// include/linux/seqlock.h
static inline int read_seqretry(const seqlock_t *sl, unsigned iv)
{
    smp_rmb();
    return (iv & 1) | (sl->sequence ^ iv);
}

読み込み処理がクリティカルセッションに入る時にはカーネルプリエンプションを禁止する必要はない。

順次ロックには以下のような制約がある。

  • 参照及び更新に用いるデータのポインタは基本的に扱うことができない。これは参照先の値が書き換えられる可能性があるというのが理由となる。
  • 読み込み処理のクリティカルセッションは短くし、加えて順次ロックの取得は可能な限り行わないようにする必要がある。そうでなければ読み込み処理の繰り返しによるオーバーヘッドが無視できなくなる。

Read-Copy Update (RCU)

RCUはほとんどの場合に読み込みしかされないデータを保護するために設計された同期機構である。RCUはロックフリーでロックを取得することもカウンタを持つこともしていない。

そのためRCUでは以下の利用制限がある。

読み込み処理は単純で保護対象のポインタが参照しているデータにアクセスするのみとなる。一方書き込み処理では保護対象のポインタが参照するデータをコピーし、コピーしたデータに対して更新を行う。更新後その更新したデータをポインタの参照先として設定し値を更新する。このことから読み込み処理は古いデータまたは新しいデータのどちらかを参照する。

読み込み処理ではクリティカルセッションに入る際にrcu_read_lock()マクロを呼び出し、抜ける際にrcu_read_unlock()マクロを呼び出す。

// include/linux/rcupdate.h
#define rcu_read_lock()        preempt_disable()
#define rcu_read_unlock()  preempt_enable()

RCUの問題点としては書き込み処理でデータが更新された場合でも、古いデータを参照している全ての読み込み処理がクリティカルセッションを抜けるまでその古いデータを解放できないことにある。

Linuxでは以下の処理を行う前にクリティカルセッションを抜ける必要がある。

  • プロセス切り替え
  • ユーザモードプロセスの実行を開始
  • アイドルループの実行

上記をソースコード上ではgrace periodと呼んでいる。

書き込み処理ではcall_rcu()を呼び出すことで古いデータの解放を行う。

// kernel/rcupdate.c
/**
 * call_rcu - Queue an RCU callback for invocation after a grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed.  RCU read-side critical
 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
 * and may be nested.
 */
void fastcall call_rcu(struct rcu_head *head,
                void (*func)(struct rcu_head *rcu))
{
    unsigned long flags;
    struct rcu_data *rdp;

    head->func = func;
    head->next = NULL;
    local_irq_save(flags);
    rdp = &__get_cpu_var(rcu_data);
    *rdp->nxttail = head;
    rdp->nxttail = &head->next;
    local_irq_restore(flags);
}

上記ではCPU毎のコールバック関数リストに古いデータを解放するような関数を設定している。これはタイマ割り込みで一定期間毎に読み込み処理がgrace periodを通過したかを確認し、通過していれば古いデータを解放するようなコールバック関数を呼び出す。

セマフォ

セマフォとはある資源が解放されるまで、他の読み込みを行う処理を休止させておけるロック機構である。Linuxで以下の2種類のセマフォを提供している。

ここではカーネルモードで動作するセマフォを見ていく。

スピンロックとの違いは、スピンロックではループで処理を止めておくのに対して、セマフォではその資源が解放されるまで処理を休止する(shedule()を呼び出し実行権を他の処理に譲る)というところが異なる。よって休止可能なプロセスのみがセマフォを使用でき、その他の割り込みハンドラや遅延処理はセマフォを使用することはできない。

セマフォは以下の構造体を使用する。

// include/asm-i386/semaphore.h
struct semaphore {
    atomic_t count; // 資源の利用可能数。
    int sleepers; // プロセスの待機状態。
    wait_queue_head_t wait; // 資源の解放待ちをしている処理の待ちキュー。0以上の場合は空。
};

上記のwait_queue_head_tにはspinlock_tが使用されている。

// include/linux/wait.h
typedef struct __wait_queue_head wait_queue_head_t;
struct __wait_queue_head {
    spinlock_t lock;
    struct list_head task_list; // 待ちキュー
};

初期化

セマフォの初期化にはinit_MUTEX()マクロを使用する。

// include/asm-i386/semaphore.h
static inline void init_MUTEX (struct semaphore *sem)
{
    sema_init(sem, 1);
}

static inline void sema_init (struct semaphore *sem, int val)
{
    atomic_set(&sem->count, val);
    sem->sleepers = 0;
    init_waitqueue_head(&sem->wait);
}

init_MUTEXではカウンタ値は1だがsema_init()を用いてることで任意の値を設定できるのがわかる。

init_waitqueue_headは以下のように定義されている。

static inline void init_waitqueue_head(wait_queue_head_t *q)
{
    q->lock = SPIN_LOCK_UNLOCKED;
    INIT_LIST_HEAD(&q->task_list);
}

取得と解放

セマフォの取得にはdown()関数を用いる。

// include/asm-i386/semaphore.h
/*
 * This is ugly, but we want the default case to fall through.
 * "__down_failed" is a special asm handler that calls the C
 * routine that actually waits. See arch/i386/kernel/semaphore.c
 */
static inline void down(struct semaphore * sem)
{
    might_sleep();
    __asm__ __volatile__(
        "# atomic down operation\n\t"
        LOCK "decl %0\n\t" // アトミックにカウンタをデクリメントする
        "js 2f\n" // マイナス値になった(取得に失敗した)場合には2:へ
        "1:\n"
        LOCK_SECTION_START("") // ロック成功時にはここで関数を抜ける。
        /* ロック失敗時はココから */
        "2:\tlea %0,%%eax\n\t" // sem->count == semのアドレスをeaxにセット
        "call __down_failed\n\t" // 関数の呼び出し
        "jmp 1b\n"
        LOCK_SECTION_END
        :"=m" (sem->count)
        :
        :"memory","ax");
}

カウンタの減算(decl)結果が0以上(ロックが成功)の場合にはLOCK_SECTION_START("")で関数を抜ける。失敗時にはsemeaxに設定し__down_failed()を呼び出していることがわかる。

// arch/i386/kernel/semaphore.c
asm(
".section .sched.text\n"
".align 4\n"
".globl __down_failed\n"
"__down_failed:\n\t"
/* スタックフレームの切り替え */
#if defined(CONFIG_FRAME_POINTER)
    "pushl %ebp\n\t"
    "movl  %esp,%ebp\n\t"
#endif
    /* __down()がfastcallで呼び出されるため */
    "pushl %edx\n\t" // ↑
    "pushl %ecx\n\t" // ↑
    "call __down\n\t"
    "popl %ecx\n\t" // リストア
    "popl %edx\n\t" // リストア
/* スタックフレームの切り替え */
#if defined(CONFIG_FRAME_POINTER)
    "movl %ebp,%esp\n\t"
    "popl %ebp\n\t"
#endif
    "ret"
);

上記から__down()が呼び出されているのがわかる。呼び出しの前後ではスタックフレームの切り替えとレジスタのプッシュ及びポップを行なっている。__down()はfastcall(eax及びedxecxを引数とする)で呼び出されているが、レジスタの退避で行うedx及びecxは特に使用されない。

__down()は以下のように定義されている。

// arch/i386/kernel/semaphore.c
fastcall void __sched __down(struct semaphore * sem)
{
    struct task_struct *tsk = current;
    DECLARE_WAITQUEUE(wait, tsk); // 待ちキュー初期化
    unsigned long flags; // EFLAGレジスタの退避場所

    tsk->state = TASK_UNINTERRUPTIBLE; // インタラプト不可能に
    spin_lock_irqsave(&sem->wait.lock, flags); // EFLAGSレジスタの退避、ローカルCPUへの割り込みの禁止、プリエンプトの禁止、スピンロックの取得を行う。
    add_wait_queue_exclusive_locked(&sem->wait, &wait); // semが保持している待ちキューにカレントプロセスを登録

    sem->sleepers++; // 待ちキューに待機プロセスが存在する。
    for (;;) {
        int sleepers = sem->sleepers;
        
        /* (sleepers - 1)をsem->countに足し込んだ結果が0以上の場合 */ 
        if (!atomic_add_negative(sleepers - 1, &sem->count)) {
            sem->sleepers = 0;
            break;
        }
        sem->sleepers = 1;  /* us - see -1 above */
        spin_unlock_irqrestore(&sem->wait.lock, flags); // 先ほど禁止した物を一時的に解除

        schedule(); // 再度スケジューリング

        spin_lock_irqsave(&sem->wait.lock, flags); // 再度禁止
        tsk->state = TASK_UNINTERRUPTIBLE; // インタラプト不可能に
    }
    
    remove_wait_queue_locked(&sem->wait, &wait); // 待ちキューからタスクを取り出す
    wake_up_locked(&sem->wait); // 取り出したタスクを起床させる。
    spin_unlock_irqrestore(&sem->wait.lock, flags); // 先ほど禁止した物を解除
    tsk->state = TASK_RUNNING; // タスクを動作中に
}

上記ではまずロックの取得に失敗したカレントプロセスを待ちキューに登録し、ループ内でカウンタ値をロックが取得できるまで繰り返し確認する。その間カウンタ値を見てロックを取得できないとわかるとschedule()関数を呼び出し他のプロセスに実行権を譲る。

待ちキューの初期化に使用されるDECLARE_WAITQUEUEは以下のように定義されている。

// include/linux/wait.h
#define DECLARE_WAITQUEUE(name, tsk)                   \
   wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)

#define __WAITQUEUE_INITIALIZER(name, tsk) {               \
   .task       = tsk,                      \
   .func       = default_wake_function,            \
   .task_list  = { NULL, NULL } }

taskがカレントプロセスディスクリプタを保持し、funcが起床のための関数、task_listが待ちキューの要素となる。

ロックの解除にはup関数を使用する。

// include/asm-i386/semaphore.h
static inline void up(struct semaphore * sem)
{
    __asm__ __volatile__(
        "# atomic up operation\n\t"
        LOCK "incl %0\n\t" // カウンタインクリメント
        "jle 2f\n" // 0以下の場合
        "1:\n"
        LOCK_SECTION_START("") // 結果が0より大きい場合ココで抜ける
        "2:\tlea %0,%%eax\n\t" // semをeaxに設定
        "call __up_wakeup\n\t"
        "jmp 1b\n"
        LOCK_SECTION_END
        ".subsection 0\n"
        :"=m" (sem->count)
        :
        :"memory","ax");
}

カウンタが0以下の場合には__up_wakeupを呼び出す。

// arch/i386/kernel/semaphore.c
asm(
".section .sched.text\n"
".align 4\n"
".globl __up_wakeup\n"
"__up_wakeup:\n\t"
    "pushl %edx\n\t"
    "pushl %ecx\n\t"
    "call __up\n\t"
    "popl %ecx\n\t"
    "popl %edx\n\t"
    "ret"
);

先ほど__down()と同じくレジスタをスタックに退避し__up()を呼び出している。

// arch/i386/kernel/semaphore.c
fastcall void __up(struct semaphore *sem)
{
    wake_up(&sem->wait);
}

wake_up()では待ちキューからプロセスを起動させる処理を行う。

制約

前述のようにセマフォがビジーの時にプロセスは休止状態に入るため、例外ハンドラやシステムコールルーチンだけがdown()を呼び出すことができ、休止が許されない割り込みハンドラや遅延処理はdown()を呼び出すことはできない。

そのため休止できない非同期処理にも対応したdown_trylock()という関数も提供されている。基本的にdown()を同じだが資源がビジー状態の時に処理を休止せず関数を終了する。加えてdown_interruptible()という関数も提供されており、休止状態に入った際にシグナルを送信することで処理を中止できデバイスドライバなどで多く使用されている。

読み書きセマフォ

読み書き用スピンロックと同様に読み込みであれば複数の実行パスが動作可能で、書き込みであれば読み書き両者ともロックが解除された際に初めてロックを取得することができると言うもの。

// include/asm-i386/rwsem.h
struct rw_semaphore {
    /* 上位16bit: 書き込み処理数 + ロック待ち数の2の補数、下位16bit: 読み込み及び書き込み処理両方の総数 */
    signed long       count;数
   #define RWSEM_UNLOCKED_VALUE        0x00000000
   #define RWSEM_ACTIVE_BIAS       0x00000001
   #define RWSEM_ACTIVE_MASK       0x0000ffff
   #define RWSEM_WAITING_BIAS      (-0x00010000)
   #define RWSEM_ACTIVE_READ_BIAS      RWSEM_ACTIVE_BIAS
   #define RWSEM_ACTIVE_WRITE_BIAS (RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS)
    spinlock_t      wait_lock; // スピンロック
    struct list_head   wait_list; // 待ちプロセスリスト
#if RWSEM_DEBUG
    int            debug;
#endif
};

初期化はinit_rwsem()、読み込み用セマフォの取得及び解除はdown_read()及びup_read()で行い、書き込みセマフォの取得及び解除はdown_write()及びup_write()で行う。

完了通知

セマフォと類似した同期機構でマルチプロセッサシステムで起こりえる競合状態を解決するために導入された。

セマフォでは、Aがあるセマフォ変数をロック状態で初期化し、それをBがup()で解放させている最中に、Aがその変数を削除した場合にBは存在しないデータにアクセスすることになる。この不整合を回避するために導入されたのが完了通知である。

// include/linux/completion.h
struct completion {
    unsigned int done; // フラグ(1: 完了, 0: 未完)
    wait_queue_head_t wait; // 待ちプロセスのキュー
}

完了通知ではセマフォdown()に当たるのはwait_for_completion()と言う関数である。

// kernel/sched.c
void fastcall __sched wait_for_completion(struct completion *x)
{
    might_sleep();
    spin_lock_irq(&x->wait.lock); // スピンロック取得
    if (!x->done) { // "未完"である場合
        DECLARE_WAITQUEUE(wait, current);

        wait.flags |= WQ_FLAG_EXCLUSIVE;
        __add_wait_queue_tail(&x->wait, &wait); // カレントプロセスを待ちキューに追加
        /* プロセスを休止 */        
        do {
            __set_current_state(TASK_UNINTERRUPTIBLE);
            spin_unlock_irq(&x->wait.lock);
            schedule();
            spin_lock_irq(&x->wait.lock);
        } while (!x->done); // "未完"である間待ち続ける。
        __remove_wait_queue(&x->wait, &wait);
    }
    x->done--;
    spin_unlock_irq(&x->wait.lock); // スピンロック解放
}

これはフラグを確認し、フラグが未完であった場合にはこの関数は実行を休止する。

セマフォup()が完了通知ではcomplete()を用いる。

// kernel/sched.c
void fastcall complete(struct completion *x)
{
    unsigned long flags;

    spin_lock_irqsave(&x->wait.lock, flags); // ローカルCPUの割り込み及びプリエンプション禁止、スピンロックの取得を行う。
    x->done++; // フラグを完了状態に
    __wake_up_common(&x->wait, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE,
             1, 0, NULL); // 待ちキューのプロセスを起床させる。
    spin_unlock_irqrestore(&x->wait.lock, flags); // 解除
}

上記はシンプルでフラグをインクリメントし完了状態とする。

セマフォと異なる点は、セマフォではデータ保護の目的でスピンロックを使用していたのに対し、完了通知では関数(wait_for_completion及びcomplete)の同時実行を回避するためにスピンロックが使用されている点である。

ローカル割り込みの禁止

ローカルCPU割り込みの禁止によりハードウェアからのIRQシグナルが発生しても実行パスを継続でき、割り込みハンドラからアクセスされるデータを保護するのに有効である。しかしローカル割り込みの禁止のみでは他のCPUからアクセスされるデータを保護することができないため、マルチプロセッサ環境ではスピンロックと併用される。

ローカル割り込みの禁止はlocal_irq_disable()、許可はlocal_irq_enable()で行う。

#define local_irq_disable()     __asm__ __volatile__("cli": : :"memory")
#define local_irq_enable() __asm__ __volatile__("sti": : :"memory")

割り込みの禁止はEFLAGSレジスタのIFフラグで管理しており、当該フラグはセットされている際には割り込みが可能であり、クリアされている場合には割り込みは禁止されている。上記のマクロで使用されているcli(Clear Interrupt Flag)命令はIFフラグをクリアしsti(Set Interrupt Flag)命令は当該フラグをセットする。

クリティカルセッションに入るとEFLAGSレジスタのIFフラグをクリアし割り込みを禁止するが、抜ける際にIFフラグをセットすることはできない。これはIFフラグを禁止する前のRFLAGSレジスタの状態がどうであるかを知る方法がないためである。よってIFフラグをクリアする前にローカル変数にフラグ値を退避し、抜ける際に退避した値をリストアするという処理を行う。

// include/asm-i386/system.h
#define local_irq_save(x) __asm__ __volatile__(
    "pushfl"
    "popl %0"  // 変数に保存
    "cli" // 割り込みの禁止
    :"=g" (x)
    : /* no input */ 
    :"memory"
)

#define local_irq_restore(x) do {
    typecheck(unsigned long,x); __asm__ __volatile__(
        "pushl %0;" // 変数の値をスタックに積み
        "popfl" // EFLAGSレジスタへリストア
        : /* no output */ 
        :"g" (x)
        :"memory", 
        "cc"
    ); 
} while (0)

local_irq_save()は変数にEFLAGSレジスタの値を退避し、割り込みを禁止するマクロで、local_irq_restore()は変数の値をEFLAGSレジスタにリストアするマクロである。クリティカルセッションを扱う関数ではこれらのマクロを入る前と抜けた後に使用する。

遅延処理の禁止及び許可

遅延処理(ソフト割り込み及びタスクレット)は基本的にハードウェア割り込みハンドラから起動されるため、遅延実行の禁止の方法として一番簡単なのはローカルCPUへの割り込みの禁止ということになる。もしくはthread_info構造体のpreemt_countメンバが保持しているsoftirqカウンタを用いる方法である。

softirqの禁止にはlocal_bh_disable()を、許可には__local_bh_enable()を使用する。

// include/linux/interrupt.h
/* SoftIRQ primitives.  */
#define local_bh_disable() \
       do { add_preempt_count(SOFTIRQ_OFFSET); barrier(); } while (0)
#define __local_bh_enable() \
       do { barrier(); sub_preempt_count(SOFTIRQ_OFFSET); } while (0)

同期機構の選択

同期機構によって共有データの競合状態を回避できるが、一方でシステム性能に多大な影響を与える。カーネル性能には並列度が大きく関係している。

基本的にシステムの並列度には2つの要素が依存する。

  • 同時に動作可能なI/Oデバイスの数
  • CPUの個数

I/Oスループットの最大化には割り込み禁止時間を可能な限り短くすることが求められる。CPUの効率的な利用にはハードウェアキャッシュへの影響を起こしたりマシンサイクルを消費するスピンロックを可能な限り使用しないことが求められる。

高いシステム性能を保つための同期機構の使用例を以下の示す。

  • 整数型の共有データはatomic_t型で宣言しアトミック操作で処理を行う。共有データへのアクセスの際のみ速度が低速になるのでスピンロックや割り込みの禁止を行うよりも高速に処理を行うことが可能。

  • 共有するリストの更新を行う場合にロックを使用せず更新できるケースがある。リストにアイテムを追加する基本的な処理は以下。

new->next = item->next;
item->next = new;

アセンブリでは上記の2つの操作はアトミックなものとなる。仮に1つ目の操作後に割り込みが発生した際にでもデータは一貫性を保ったままとなる。しかしこれはあくまで参照の場合で更新処理では存在しない要素へをアクセスなどの不整合が起こる可能性がある。加えてCPUの制御回路によって実行順序の入れ替えも回避する必要がある。これは更新の最中に割り込みが入るとハンドラは壊れたリストにアクセスする可能性があるからである。よって以下のように書き込み用のメモリバリアを上記の処理の間に入れることでこの不整合が回避できる。

new->next = item->next;
wmb();
item->next = new;

スピン及びセマフォ、割り込み禁止の選択

同期機構の選択はカーネル実行パスでどういった処理を行うかに依存する。しかしスピンロック取得時には常にローカルCPUへの割り込み、ローカルソフト割り込み及びカーネルプリエンプションも自動的に禁止になる。

カーネル実行パスの種類 シングルプロセッサ環境での保護 マルチプロセッサ環境での保護
例外 セマフォ -
割り込み ローカル割り込みの禁止 スピンロック
遅延処理 - 無し又はスピンロック
例外+割り込み ローカル割り込みの禁止 スピンロック
例外+遅延処理 ローカルソフト割り込みの禁止 スピンロック
割り込み+遅延処理 ローカル割り込みの禁止 スピンロック
例外+割り込み+遅延処理 ローカル割り込みの禁止 スピンロック

例外時のデータ保護

例外時は資源が利用可能になるまでプロセスを休止させることに問題がないため、データ保護はセマフォで行うことが可能。カーネルプリエンプションも問題なくセマフォで保護したデータにアクセスする場合にも失敗すれば休止状態に入るのでそれ以前に取得されたロックの解除を待つことになる。カーネルプリエンプションを禁止する必要があるのはCPU毎の変数にアクセスする時のみである。

割り込み時のデータ保護

同じ種類の割り込みは同時に発生することがないが、複数の割り込みからアクセスされるデータは保護の対象となる。

シングルプロセッサ環境ではクリティカルセッション内でのローカルCPUへの割り込みを禁止して競合状態を回避する。休止できない割り込みハンドラではセマフォは使用できず、スピンロックもシステムを停止させる可能性があるため単体では使用できない(CPUがスピンロックを取得した際にローカル割り込みが入りスピンロックの取得ができずビジーウエイト状態になるとそこから復帰できずシステムがストールする)。

マルチプロセッサ環境ではローカルCPUへの割り込み加えさらにスピンロックを使用する必要がある。マルチプロセッサ環境ではローカルCPUへの割り込みに加えて他のCPUでも割り込みが発生するためである。ローカルCPUがスピンロックを取得後、他のCPUでスピンロックを取得しても、(ローカルCPUの)クリティカルセッションで処理が進行するためシステムがストールすることはない。

遅延処理時のデータ保護

遅延処理のデータ保護はその遅延処理の種類に依存する。(例えばソフト割り込みとタスクレットは並列度が異なる)

シングルプロセッサ環境では競合状態は起きない。割り込み処理は常に逐次実行されるめ、他の遅延処理には割り込まれず同期機構は必要ない。

マルチプロセッサ環境では一部の遅延処理が並列で実行されるため競合状態が発生する。

遅延処理の種類 保護方法
ソフト割り込み スピンロック
単一のタスクレット 必要無し
複数のタスクレット スピンロック

同じソフト割り込みは複数のCPUで同時実行される可能性がある、逆に同じタスクレットは同時に実行できないので、単一のタスクレットからアクセスされるデータは保護の対象とならない。しかし複数種類のタスクレットは同時に実行される可能性があるため、そのようなアクセスがあるデータは保護の対象となる。

例外及び割り込みからアクセスのあるデータの保護

シングルプロセッサ環境では割り込みハンドラが動作している際に割り込みまたは例外は発生しないため、ローカル割り込みが禁止されていれば例外や割り込みハンドラのデータアクセス最中に割り込まれることはない。

マルチプロセッサ環境では複数のCPUが例外や割り込みを処理するため、ローカル割り込みとスピンロックを併用する必要がある。こうすることで共有データにアクセスしている処理が完了まで他の処理を待たせることが可能となる。スピンロックの他にもセマフォを使用できるケースもあり、割り込みハンドラは休止できないのでdown_trylock()を使い、システムコールルーリンではセマフォを使用する。システムコールルーチンは休止することを考慮し設計されており、セマフォを使用することでスピンロックよりも並列度を高く保つことができる。

例外及び遅延処理からアクセスのあるデータの保護

遅延処理は割り込みによって起動されるため、基本的には割り込みと遅延処理がアクセスするデータの保護と同じ方法を取ることができる。

例外ハンドラはlocal_bh_disable()を使用することで遅延処理のみを禁止することができる。

/* SoftIRQ primitives.  */
#define local_bh_disable() \
       do { add_preempt_count(SOFTIRQ_OFFSET); barrier(); } while (0)

マルチプロセッサシステムでは単一のカーネル実行パスからのアクセスを保証するためスピンロックが必要となる。

割り込み及び遅延処理からアクセスのあるデータの保護

遅延処理は割り込みハンドラを休止させることはできないが、割り込みハンドラは遅延処理に割り込むことが可能なため、遅延処理中にはローカルCPUへの割り込みは禁止する必要がある。マルチプロセッサ環境ではスピンロックが必要となる。

例外、割り込み処理及び遅延処理からアクセスのあるデータの保護

ローカルCPUへの割り込みの禁止とスピンロックの取得が有効である。遅延処理は割り込みハンドラにより起動されるので明示的に禁止する必要はない。

参考