愛鋒貝

標(biāo)題: xv6 學(xué)習(xí):進程管理C 互斥與同步 [打印本頁]

作者: 小強實驗室    時間: 2023-4-3 10:11
標(biāo)題: xv6 學(xué)習(xí):進程管理C 互斥與同步
主要參考自 《給操作系統(tǒng)捋條線》,加上了自己的筆記。
鎖大家都很熟悉了,就是用來進程互斥、實現(xiàn)同步。本文首先介紹鎖的實現(xiàn),然后是 xv6 中鎖的使用,xv6 基于鎖實現(xiàn)了互斥和同步。
索引
1. 基本概念、函數(shù)

1.1 概念

這里介紹下與鎖相關(guān)的基本概念
競爭條件比如說多進程要并行讀寫同一段地址空間,這樣很可能會破壞讀寫原子性,譬如讀取到其他寫操作的中間狀態(tài)。顯然競爭條件并不是我們想要的,雖然一些競爭條件出現(xiàn)的概率很小,但根據(jù)墨菲定律,會出錯的總會出錯,加之計算機的運行頻率,就算出錯的概率再小,在某天某時某刻那也是有可能發(fā)生的。
所以對于進入臨界區(qū)訪問公共資源我們要避免競爭條件,保證公共資源的互斥排他性,一般有兩種大的解決方案來實現(xiàn)互斥
xv6 就根據(jù)上面兩種方案,分別實現(xiàn)了兩個類型的鎖,在后面會看到
1.2 sleep

這里介紹下 sleep、exit、wakeup 三個函數(shù)的基本作用,更深層次的,在這些函數(shù)中使用的互斥并發(fā),將在介紹了鎖后討論。
void sleep(void *chan, struct spinlock *lk);這里的 sleep 不是那個系統(tǒng)調(diào)用,而是 xv6 的內(nèi)核函數(shù)。chan 表示進程因為什么對象而沉睡?lk 表示管理這個對象的鎖。
void sleep(void *chan, struct spinlock *lk){
    p->chan = chan;         // 休眠在chan上
    p->state = SLEEPING;    // 狀態(tài)更改為SLEEPING
    sched();                // 讓出CPU調(diào)度
    p->chan = 0;            // 休眠對象改為0
}sleep 大概就干了上面的事:
調(diào)用 sched 之后是不會返回的,而是去執(zhí)行其他進程了,這就是休眠的好處,提高 cpu 的利用率。
1.3 wakeup

static void
wakeup1(void *chan)
{
  struct proc *p;

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}wakeup 的作用就是喚醒所有因 chan 而沉睡的進程,具體就是遍歷一遍 pcb 數(shù)組,然后判斷 sleeping && p->chan == chan。wakeup1 是沒有加鎖版本的 wakeup,一般是獲取了鎖然后調(diào)用這個函數(shù)。
1.4 exit

這里來看看 exit 函數(shù)。在進程的 main 函數(shù)執(zhí)行完后將會系統(tǒng)調(diào)用 exit 函數(shù),讓子進程回收一部分資源,標(biāo)記為僵尸態(tài)等到父進程回收剩下的資源。
1.5 wait

父進程調(diào)用 wait 就是用來負責(zé)回收子進程,流程為:
2. Lock

簡單介紹了幾個函數(shù)和基本概念,接下來就是鎖是怎么實現(xiàn)的了!前面說過,鎖分為自旋鎖和睡眠鎖。首先介紹最簡單的自旋鎖
2.1 自旋鎖

2.1.1 結(jié)構(gòu)體

struct spinlock {

  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};有關(guān)自旋鎖的結(jié)構(gòu)定義如上,最重要的就是 locked 元素,用來表示該鎖是否已被某 cpu 取得,1表示該鎖已某 cpu 取走,0 表示該鎖空閑。其他三個元素用作為 debug 調(diào)試信息。
因為鎖不屬于信號量,所以 locked 最多只能為1,這應(yīng)該很清楚了。
2.1.2 初始化

void initlock(struct spinlock *lk, char *name)
//初始化鎖 lk
{
    lk->name = name; //初始化該鎖的名字
    lk->locked = 0;  //初始化該鎖空閑
    lk->cpu = 0;     //初始化持有該鎖的CPU為空
}這部分就是初始化鎖的名字、設(shè)置為空閑、沒有 CPU 持有該鎖
2.1.3 開關(guān)中斷

void pushcli(void)
{
    int eflags;
    eflags = readeflags(); //cli之前讀取eflags寄存器

    cli(); //關(guān)中斷
    if(mycpu()->ncli == 0) //第一次pushcli()
        mycpu()->intena = eflags & FL_IF; //記錄cli之前的中斷狀態(tài)

    mycpu()->ncli += 1; //關(guān)中斷次數(shù)(深度)加1
}


void popcli(void)
{
    if(readeflags()&FL_IF) //如果eflags寄存器IF位為1
        panic("popcli - interruptible");
    if(--mycpu()->ncli < 0) //如果計數(shù)小于0
        panic("popcli");
    if(mycpu()->ncli == 0 && mycpu()->intena) //關(guān)中斷次數(shù)為0時即開中斷
        sti();
}開關(guān)中斷這兩個函數(shù)在請求鎖、釋放鎖的時候會被用到。這里看下函數(shù)原理:pushcli 和 popcli 為 cli 和 sti 的封裝函數(shù),只是增加了計數(shù),每個 popcli 與 pushcli 匹配,有多少次 pushcli 就要有多少次 popcli,只有第一次 pushcli、最后一次 popcli 才會開關(guān)中斷。
在持有鎖的過程中,進程將不會處理中斷,因此后面本文的討論中,只要擁有一個鎖,此時是不會因時間片到期放棄 CPU 進入調(diào)度的。
為什么使用 pushcli() 和 popcli() 而不是使用 cli() sti() 后面詳細說明。
2.1.4 檢測持有鎖

int holding(struct spinlock *lock)
{
    int r;
    pushcli();
    r = lock->locked && lock->cpu == mycpu(); //檢驗鎖lock是否被某CPU鎖持有且上鎖
    popcli();
    return r;
}holding 函數(shù)是在關(guān)中斷下檢查鎖是否被某 CPU 取走,仔細看檢查是否持有鎖的條件為兩個:一是鎖是否被取走,二鎖是否由當(dāng)前 CPU 取走。
由于不一定持有鎖,所以為了保證獲取信息這個操作的原子性,需要關(guān)中斷下執(zhí)行,避免被調(diào)度。
2.1.5 取鎖解鎖

void acquire(struct spinlock *lk)
{
    pushcli(); // disable interrupts to avoid deadlock.
    if(holding(lk)) // 如果已經(jīng)取了鎖
        panic("acquire");
    while(xchg(&lk->locked, 1) != 0) //原子賦值
        ;
    __sync_synchronize(); //發(fā)出一個full barrier

    // 調(diào)試信息
    lk->cpu = mycpu(); //記錄當(dāng)前取鎖的CPU
    getcallerpcs(&lk, lk->pcs); //獲取調(diào)用棧信息
}關(guān)中斷下進行取鎖的操作,以避免死鎖,原因見后面 FAQ
acquire 檢查當(dāng)前 CPU 是否已經(jīng)持有鎖,如果已持有,則 panic(),也就是說 xv6 不允許同一個 CPU 對同一個鎖重復(fù)上鎖。
上鎖的語句為 xchg(&lk->locked, 1),xchg 函數(shù)可以看作一個原子賦值函數(shù)(本是交換,與 1 交換就相當(dāng)于賦值)
因取鎖可能需要一直循環(huán)等待,所以名為自旋鎖。
__sync_synchronize() 是發(fā)出一個 full barrier,簡單來說就是不允許將這條語句之前的內(nèi)存讀寫指令放在這條之后,也不允許將這條語句之后的內(nèi)存讀寫指令放在這條指令之前。
由于編譯器翻譯成匯編代碼是亂序的,在 xchg 獲得鎖之前,后面關(guān)于 debug 的語句可能提前執(zhí)行,這條指令就能保證需要等到獲取鎖之后,才會執(zhí)行 debug 相關(guān)的匯編指令。
void release(struct spinlock *lk)
{
    if(!holding(lk))
        panic("release");
    lk->pcs[0] = 0;

    //清除調(diào)試信息
    lk->cpu = 0;
    __sync_synchronize(); //發(fā)出一個full barrier
    asm volatile("movl $0, %0" : "+m" (lk->locked) : ); //lk->locked=0
    popcli();
}釋放鎖主要就是將 lk->locked 設(shè)置為 0,其他進程被調(diào)度到時如果檢測到鎖為空閑,將會重新上鎖,訪問互斥資源。
2.1.6 faq

Ⅰ xv6 內(nèi)核 scheduler 的開中斷
在 proc.c 中可以看到
for(;;){
    // Enable interrupts on this processor.
    sti();
   
    // Loop over process table looking for process to run.
    acquire(&ptable.lock);執(zhí)行 scheduler 的時候是開中斷的,內(nèi)核中只有 scheduler 函數(shù)會短暫的開中斷的,其他時候都是關(guān)中斷的。而如果做過 jos 就知道,jos 在內(nèi)核態(tài)下是全程關(guān)中斷的。
造成兩者區(qū)別的原因是:
因此,xv6 的 scheduler 要定時開中斷,如果 scheduler 一直無限運行下去,就不能執(zhí)行中斷處理程序。
Ⅱ xv6 加鎖為什么要使用 xchg()
我們知道 xchg 是硬件提供的原子指令,xchg(&lk->locked, 1) 具體執(zhí)行了下面兩件事:
old = lk->locked;
lk->locked = 1;
return old;假如這個語句不是原子的,那么同時可能有多個 cpu 標(biāo)記為 lk->locked = 1,然后返回。最后,有多個 cpu 訪問互斥資源,顯然是錯誤的。
Ⅲ acquire() 加鎖函數(shù)的關(guān)中斷
acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.為什么要關(guān)中斷呢?由于 scheduler 在 acquire 之前臨時開中斷,所以現(xiàn)在內(nèi)核態(tài)下,是可以收到中斷然后運行中斷處理程序的。假設(shè)加鎖時沒有沒有關(guān)中斷,這里劃分為幾個時間點下產(chǎn)生中斷:
逐一分析。假設(shè)在 xchg 指令前發(fā)生中斷,中斷處理程序中可能會獲取到和 acquire 相同的鎖,因為此時內(nèi)核代碼還沒有獲取鎖,所以不會發(fā)生死鎖;假設(shè)在 xchg 指令后發(fā)生中斷,中斷處理程序中可能會請求和 acquire 相同的鎖,由于先前該 CPU 的  scheduler 已經(jīng)獲取了鎖,中斷處理程序?qū)⒆孕蛘叱了?,發(fā)生死鎖;假設(shè)在 release 函數(shù)前發(fā)生中斷,和上面同理,將會發(fā)生死鎖;
因此,我們需要在 xchg 前關(guān)中斷,可以回答下面幾個問題:
release() 函數(shù)先原子賦值釋放鎖再開中斷,也就同理了,如果兩者交換先開中斷,那么在釋放鎖之前可能發(fā)生中斷,而中斷處理程序剛好需要該鎖,那么發(fā)生死鎖。
Ⅳ xv6 的競爭條件有哪些?
可能的情況如下,下面逐一分析:
xv6 是個支持多處理器的系統(tǒng),各個 CPU 之間可以并行執(zhí)行,所以可能會出現(xiàn)同時訪問公共資源的情況,是一個競爭條件。
在單個 CPU 上,內(nèi)核態(tài)下,scheduler 是開中斷的,所以內(nèi)核代碼可能會和中斷處理其程序并發(fā)執(zhí)行,并發(fā)的進入臨界區(qū)。前面提到過,取鎖時關(guān)中斷,內(nèi)核代碼就不會切換到 scheduler,所以不是競爭條件。
xv6 不支持線程,而各個進程之間內(nèi)存是不共享的,加之內(nèi)核進入臨界區(qū)訪問公共資源的時候是關(guān)了中斷的,關(guān)了中斷除了自己休眠是不會讓出 CPU 的,所以運行在單個處理器上的各個進程之間的并發(fā)其實并不會產(chǎn)生競爭條件。
Ⅴ 關(guān)中斷開中斷為什么要使用 pushcli() 和 popcli() 而不直接使用 cli() 和 sti()?
為什么要使用 pushcli() 和 popcli(),其實也簡單,那是因為某個函數(shù)中可能要取多個鎖,比如先后取了鎖 1 鎖 2,那么釋放鎖 2 之后能開中斷嗎?顯然不能,必須等鎖 1 也釋放的時候才能開中斷。所以使用增加了計數(shù)功能的 pushcli() 和 popcli() 來實現(xiàn)最后一個鎖釋放的時候才開中斷。
Ⅵ 內(nèi)存亂序問題
現(xiàn)今的指令的執(zhí)行都有流水線的技術(shù),其實還有亂序執(zhí)行。亂序執(zhí)行指的是在 CPU 運行中的指令不按照代碼既定的順序執(zhí)行,而是按照一定的策略打亂后順序執(zhí)行,以此來提高性能。
在一些時候,我們不希望指令順序被打亂,為避免這種情況,我們設(shè)置了屏障,禁止這個屏障前后的指令順序打亂
2.2 休眠鎖

2.2.1 結(jié)構(gòu)體

xv6 里面還提供了另一種鎖,休眠鎖,它在自旋鎖的基礎(chǔ)之上實現(xiàn),定義如下:
struct sleeplock {
    uint locked;
    // Is the lock held? 已鎖?
    struct spinlock lk; // spinlock protecting this sleep lock 自旋鎖
    // For debugging:
    char *name;
    // Name of lock. 名字
    int pid;
    // Process holding lock 哪個進程持有?
};休眠鎖配了一把自旋鎖來保護,原因見后。休眠鎖的意思是某個進程為了取這個休眠鎖不得而休眠,所以有個 pid 來記錄進程號。
2.2.2 取鎖

休眠鎖的初始化,檢驗是否持有鎖等都類似,就不贅述了,再這里主要看看取鎖和解鎖的區(qū)別:
void acquiresleep(struct sleeplock *lk)
{
    acquire(&lk->lk); // 優(yōu)先級:->高于&
    while (lk->locked) { // 當(dāng)鎖已被其他進程取走
        sleep(lk, &lk->lk); // 休眠
    }
    lk->locked = 1; // 上鎖
    lk->pid = myproc()->pid; // 取鎖進程的pid
    release(&lk->lk);
}睡眠鎖需要保證 檢測鎖狀態(tài)、上鎖操作、休眠操作 這幾個操作是原子的,在自旋鎖中由于只需要保證前兩個操作,所以使用 xchg 指令就可以實現(xiàn)。
為什么要上鎖?一個原因是原子性,另一個原因就是為了防止 lost wakeup,防止死鎖??梢钥纯春竺娴?同步:sleep & wakeup。
原子性即代表當(dāng)前 CPU 不發(fā)生調(diào)度、其他 CPU 執(zhí)行 acquiresleep 需要互斥。因此睡眠鎖使用 xchg 很難保證原子性,這里 xv6 使用自旋鎖,就能很好的保證睡眠鎖的原子性。
這里使用自旋鎖的 acquire、release 就保證了原子性。在獲取自旋鎖后,如果此時休眠鎖上鎖,將會調(diào)用 sleep 函數(shù)放棄自旋鎖并陷入沉睡。如果此時休眠鎖空閑,將設(shè)置 lk->locked 表示上鎖,并且記錄進程的 pid。
2.2.3 解鎖

void releasesleep(struct sleeplock *lk)
{
    acquire(&lk->lk); //取自旋鎖
    lk->locked = 0;
    lk->pid = 0;
    wakeup(lk); //喚醒
    release(&lk->lk);
}解鎖操作基本上就是上鎖的逆操作,注意一點,可能有其他進程休眠在休眠鎖上,所以當(dāng)前進程解鎖后需要喚醒休眠在休眠鎖上的進程,wakeup 將會請求表鎖,查找因為休眠鎖沉睡的進程。
3. 同步

鎖同步的問題一直是操作系統(tǒng)里面最為復(fù)雜的問題之一,,xv6 的鎖設(shè)計本身不難,難得是鎖的使用,這里就根據(jù)進程這一塊使用鎖的地方來簡單聊一聊,進程中與鎖的有關(guān)地方主要有休眠,喚醒,等待,退出,調(diào)度,切換。
3.1 休眠喚醒

休眠是休眠在某個對象上,喚醒是喚醒休眠在某個對象上的進程,所以想當(dāng)然的可以這樣來聲明 sleep 和 wakeup:
void sleep(void *obj);
void wakeup(void *obj);那這樣聲明對不對呢?來看個用戶態(tài)下,簡單的變種生產(chǎn)者消費者的例子
3.1.1 消費者生產(chǎn)者

Send:
    while(obj != NULL) //對象還在
    ;                  //循環(huán)
    obj = 1;           //制作對象
    wakeup(obj);       //喚醒休眠在其上的消費者

Recv:
    while(obj == NULL) //沒有對象可消費
        sleep(obj);    //休眠
    obj = NULL;        //消耗對象乍一看感覺沒什么問題,但是由于我們不能保證進程被調(diào)度到的順序。如果 wakeup 發(fā)生在 sleep 之前就會引起死鎖:

(, 下載次數(shù): 65)

IMG

比如上面這種在單處理器上并發(fā)的情況,指令執(zhí)行順序如下:
綜上,由于 sleep 發(fā)生在 wakeup 之前,陷入死鎖。就算不是中斷,因為是多處理器,兩者運行的時間是不確定的,完全也可能出現(xiàn)上述的情況。
避免這種前后感知的信息不一致的辦法就是加鎖,來看第二個版本:
Send:
    lock(obj_lock);
    while(obj != NULL) // 對象還在
        sleep(obj);    // 休眠
    obj = 1;           // 制作對象
    wakeup(obj);       // 喚醒休眠在其上的消費者
    unlock(obj_lock);

Recv:
    lock(obj_lock);
    while(obj == NULL) // 沒有對象可消費
        sleep(obj);    // 休眠
    obj = NULL;        // 消耗對象
    wakeup(obj)
    unlock(obj_lock);第二種版本做了一個修改,保證生產(chǎn)者和消費者操作的原子性,生產(chǎn)者、消費者讀取對象、使用對象、喚醒、沉睡是一個原子操作,在解鎖前不會被調(diào)度和搶占。
回到上面的問題,此時不會再出現(xiàn)消費者在 sleep 前被調(diào)度,lost wakeup 的情況。recv 在準(zhǔn)備 sleep 之前獲得了 obj_lock,send 沒有 obj_lock,是不可能 wakeup 的,所以休眠就不會錯過喚醒。
是否可以檢測完對象之后再獲取 obj_lock 呢?這是不正確的,將會導(dǎo)致進程的操作不是原子的,出現(xiàn)多個進程進入緩沖區(qū)操作同一個 obj 的情況。
3.1.2 sleep

這個問題倒是解決了,又有一個新問題,如果進程帶著 obj_lock 休眠不釋放的話,同樣死鎖。
因此將 sleep 的接口定義為:
void sleep(void *obj, void *obj_lock)這個鎖作為 sleep 的參數(shù),在進程進入休眠狀態(tài)時釋放該鎖,在進程返回時重新獲取該鎖。
也就是說,在 sleep 函數(shù)中,修改進程狀態(tài)前要先釋放鎖,然后才修改為 SLEEPING,讓出 CPU。為了保證在釋放鎖后,不會出現(xiàn) sleep 錯過 wakeup 的情況,修改進程狀態(tài)和釋放鎖需要是一個原子操作。
假設(shè)剛釋放鎖,雖然此時可能不會發(fā)生中斷,但是其他 cpu 并行時會重新獲取鎖,然后快速執(zhí)行完 wakeup,因此出現(xiàn)了 lost wakeup
由于涉及到了進程狀態(tài)的修改,所以 xv6 選擇用 ptable.lock 來保證上面操作的原子性。
void
sleep(void *chan, struct spinlock *lk)
{

  struct proc *p = myproc();

  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");


  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;
  // Reacquire original lock.
  
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }

}在放棄 CPU 前:
sched 將會交出 CPU,等到該進程被 wakeup,重新獲取 CPU。在重新?lián)碛?CPU 后:
進程變?yōu)?SLEEPING 之后將不會被調(diào)度到,因此 sleeplock 減少了 CPU 的負擔(dān),不會再讓一個CPU忙等,而是合理放棄 CPU。
特殊情況在于,如果 lk = 表鎖,那么放棄 cpu 前不請求表鎖,擁有 cpu 后不放棄表鎖。休眠對象是一個進程,在 xv6 中,這種情況只發(fā)生在 wait 函數(shù)中,父進程等待子進程退出,這個時候它休眠在自己身上。
3.1.3 wakeup

static void wakeup1(void *chan)
{
    struct proc *p;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
        if(p->state == SLEEPING && p->chan == chan)
            p->state = RUNNABLE;
}
void wakeup(void *chan)
{
    acquire(&ptable.lock);
    wakeup1(chan);
    release(&ptable.lock);
}然后是 wakeup 函數(shù),分為兩個部分:不加鎖的 wakeup1 和加鎖版本的 wakeup。這么劃分是因為,在有些情況下已經(jīng)獲取了表鎖,直接調(diào)用 wakeup 會死鎖,詳見下面的 wait 函數(shù)。
3.3 exit

這里來看看 exit 函數(shù)
void exit(void){
    struct proc *curproc = myproc();
    if(curproc == initproc)
        panic("init exiting");首先第一個進程是不能退出的,如果退出直接報錯。他需要用來回收子進程和孤兒進程的資源
// Close all open files.

  for(fd = 0; fd < NOFILE; fd++){
    if(curproc->ofile[fd]){
      fileclose(curproc->ofile[fd]);
      curproc->ofile[fd] = 0;
    }
  }子進程會自己回收掉一部分資源。比如打開文件描述符表,這一部分關(guān)閉所有文件,如果減到 0,再釋放該文件的 inode,如果文件的鏈接數(shù)和引用數(shù)都為 0 了,就刪除該文件。
acquire(&ptable.lock);

  // Parent might be sleeping in wait().
  wakeup1(curproc->parent);

  // Pass abandoned children to init.
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
    if(p->parent == curproc){
      p->parent = initproc;
      if(p->state == ZOMBIE)
        wakeup1(initproc);
    }
  }

  // Jump into the scheduler, never to return.
  curproc->state = ZOMBIE;
  sched();
  panic("zombie exit");和前文對應(yīng),由于 exit 函數(shù)已經(jīng)擁有了表鎖,為了避免死鎖,使用不加鎖版本的 wakeup1。
然后是要喚醒父進程,通知父進程來回收自己。這里還會拋棄當(dāng)前進程的所有子進程,子進程成為孤兒進程,將孤兒進程的父進程修改為 init 進程,讓 init 進程負責(zé)回收資源。最后將自己的狀態(tài)設(shè)置為 zombie,父進程將在 wait 調(diào)用中回收 zombie 的子進程。
因為這個操作會修改進程狀態(tài),所以執(zhí)行前要獲取表鎖。提一嘴,雖然這里是獲得了鎖,sched 函數(shù)需要當(dāng)前進程持有鎖,后面會看到設(shè)計的原因。
為什么要父進程負責(zé)回收呢?私以為,因為子進程在內(nèi)核中運行時還要使用到頁表、內(nèi)核棧、進程結(jié)構(gòu)體,如果直接回收,執(zhí)行下一條內(nèi)核代碼將會報錯,影響系統(tǒng)執(zhí)行,所以這些資源等到父進程回收為好。
3.4 wait

acquire(&ptable.lock);

for(;;){
    // Scan through table looking for exited children.

    havekids = 0;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      if(p->parent != curproc)
        continue;

      havekids = 1;

      if(p->state == ZOMBIE) { // Found one.
        pid = p->pid;
        kfree(p->kstack);
        p->kstack = 0;
        freevm(p->pgdir);
        p->pid = 0;
        p->parent = 0;
        p->name[0] = 0;
        p->killed = 0;
        p->state = UNUSED;
        release(&ptable.lock);
        return pid;
      }
    }

    // No point waiting if we don't have any children.
    if(!havekids || curproc->killed){
      release(&ptable.lock);
      return -1;
    }

    // Wait for children to exit.  (See wakeup1 call in proc_exit.)
    sleep(curproc, &ptable.lock);  //DOC: wait-sleep
}父進程調(diào)用 wait 就是用來負責(zé)回收子進程,流程為:
exit 和 wait 如何保證原子性和防止死鎖(沒有 lost wakeup)?因為他們使用表鎖進行互斥,正好符合上面所說的生產(chǎn)者消費者模型
一般而言,父進程可能不會sleep兩次?
4.進程調(diào)度

切換進程的流程是:進程 A --> scheduler --> 進程 B。進程 A 切換到調(diào)度進程是使用 sched 函數(shù),sched 函數(shù)是 swtch 函數(shù)的封裝。當(dāng)切換回 scheduler 時,除了返回對應(yīng)的切換上下文外,此時 scheduler 還需要保證關(guān)中斷和擁有 ptable.lock。
在 swtch 和  scheduler 執(zhí)行的過程中,必須要持有表鎖。要知道執(zhí)行 swtch 是會切換進程的,因為這個鎖在一個進程中獲取,在另一個進程中釋放。這樣使用鎖很不常見,但這里是必須的。
也就是說,sched 是進程 A 取鎖,進程 B 放鎖。即使是 sched 之后的語句一般會來個放鎖操作,新進程也是如此。
因此在進入 sched 前一定要獲取 ptable.lock,進入調(diào)度有多種情況,我們逐一分析及驗證:
4.1 yield

4.1.1 時間中斷

// Force process to give up CPU on clock tick.
  // If interrupts were on while locks held, would need to check nlock.
  if(myproc() && myproc()->state == RUNNING &&
     tf->trapno == T_IRQ0+IRQ_TIMER)
    yield();在 trap.c 中,當(dāng)進程因為時間中斷進入中斷處理時,將會 yield 放棄 CPU。
4.1.2 yield

void
yield(void)
{
  acquire(&ptable.lock);  // DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}yield 對應(yīng)于前文,首先要獲取 ptable.lock 這把鎖,更改狀態(tài)為 RUNNABLE。
如果在 swtch 沒有持有表鎖會怎么樣?例如:CPU 0 調(diào)用 yield 放棄進程A,然后 yield 中設(shè)置進程狀態(tài)為 RUNNABLE。CPU0 在執(zhí)行 sched 之前另一個 CPU1 會調(diào)度進程 A,在這短暫的時間內(nèi),這樣的話一個進程運行在兩個 CPU 上,內(nèi)核棧將會發(fā)生錯誤。
void
sched(void)
{

  int intena;
  struct proc *p = myproc();

  if(!holding(&ptable.lock))
    panic("sched ptable.lock");

  if(mycpu()->ncli != 1)
    panic("sched locks");
   
  if(p->state == RUNNING)
    panic("sched running");

  if(readeflags()&FL_IF)
    panic("sched interruptible");

  intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;
}sched 函數(shù)首先執(zhí)行了一些檢查:
intena 這里表示 pushcli 之前 CPU 允許中斷的情況。這個值保存下來等待進程被調(diào)度到時,用來恢復(fù) CPU 的中斷情況,保證在同一個進程里中斷允許情況一致。比如說在進程 A 放棄CPU之前是開中斷,進程 A 恢復(fù)執(zhí)行后,在 yield 中將會 popcli 恢復(fù)開中斷。
xv6 下系統(tǒng)調(diào)用是異常門,所以內(nèi)核下是可以開中斷的。
4.1.3 進程B解鎖

在 scheduler 中依舊還是關(guān)中斷,等到調(diào)度到進程 B 后。
intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;

}進程 B 在這里恢復(fù) CPU 的中斷情況
void
yield(void)
{

  acquire(&ptable.lock);  //DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}跳轉(zhuǎn)回到 yield 函數(shù),釋放表鎖,成功對應(yīng)上文:進程A上鎖,進程B解鎖。從 yield 返回后就是跑到 trapret 恢復(fù)進程執(zhí)行了,這里就不再多說。
假如是調(diào)度到一個新進程,新進程的第一條指令(不是用戶指令?。。┦?forkret,在這里將會釋放表鎖,完成”進程B解鎖“。
4.2  sleep

sleep 前面也看過了
// Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;
  sched();在 sched 函數(shù)前會獲取表鎖,sched 函數(shù)后釋放表鎖,也就保證了前面說的。進程 A 上鎖,進程B解鎖。進入 sched 的狀態(tài)不是 RUNNING!!
sleep 什么時候被使用到呢?
4.3 exit

// Jump into the scheduler, never to return.
  curproc->state = ZOMBIE;
  sched();
  panic("zombie exit");exit 也是在 sched 前獲取鎖,然后設(shè)置為僵尸態(tài)。

-----------------------------




歡迎光臨 愛鋒貝 (http://m.7gfy2te7.cn/) Powered by Discuz! X3.4