愛(ài)鋒貝

 找回密碼
 立即注冊(cè)

只需一步,快速開(kāi)始

扫一扫,极速登录

查看: 931|回復(fù): 0
打印 上一主題 下一主題
收起左側(cè)

xv6 學(xué)習(xí):進(jìn)程管理C 互斥與同步

[復(fù)制鏈接]

1423

主題

1507

帖子

5891

積分

Rank: 8Rank: 8

跳轉(zhuǎn)到指定樓層
樓主
發(fā)表于 2023-4-3 10:11:49 | 只看該作者 回帖獎(jiǎng)勵(lì) |倒序?yàn)g覽 |閱讀模式

一鍵注冊(cè),加入手機(jī)圈

您需要 登錄 才可以下載或查看,沒(méi)有帳號(hào)?立即注冊(cè)   

x
主要參考自 《給操作系統(tǒng)捋條線》,加上了自己的筆記。
鎖大家都很熟悉了,就是用來(lái)進(jìn)程互斥、實(shí)現(xiàn)同步。本文首先介紹鎖的實(shí)現(xiàn),然后是 xv6 中鎖的使用,xv6 基于鎖實(shí)現(xiàn)了互斥和同步。
索引

  • 基本概念、基本函數(shù)
  • 自旋鎖、睡眠鎖的實(shí)現(xiàn)
  • 同步:sleep、wakeup
  • 同步:wait、exit
  • 調(diào)度切換
1. 基本概念、函數(shù)

1.1 概念

這里介紹下與鎖相關(guān)的基本概念

  • 公共資源:顧名思義就是被多個(gè)任務(wù)共享的資源,可以是公共內(nèi)存,也可以是公共文件等等
  • 臨界區(qū):要訪問(wèn)使用公共資源,肯定得通過(guò)一些代碼指令去訪問(wèn),這些代碼指令就是臨界區(qū)
  • 并發(fā):?jiǎn)蝹€(gè) CPU 上交替處理多個(gè)任務(wù),宏觀上看就像是同時(shí)進(jìn)行的一樣,但微觀上看任意時(shí)刻還是只有一個(gè)任務(wù)在進(jìn)行
  • 并行:多個(gè)處理器同時(shí)處理多個(gè)任務(wù),能夠做到真正意義上的多個(gè)任務(wù)同時(shí)進(jìn)行。
  • 互斥:也稱為排他,任何時(shí)候公共資源只允許最多一個(gè)任務(wù)獨(dú)享,不允許多個(gè)任務(wù)同時(shí)執(zhí)行臨界區(qū)的代碼訪問(wèn)公共資源
  • 同步:多進(jìn)程并發(fā)時(shí),不同程序之間的制約關(guān)系
  • 競(jìng)爭(zhēng)條件:競(jìng)爭(zhēng)條件指的是多個(gè)任務(wù)以競(jìng)爭(zhēng)的形式并行訪問(wèn)公共資源,公共資源的最終狀態(tài)取決于這些任務(wù)的臨界區(qū)代碼的精確執(zhí)行時(shí)序
競(jìng)爭(zhēng)條件比如說(shuō)多進(jìn)程要并行讀寫(xiě)同一段地址空間,這樣很可能會(huì)破壞讀寫(xiě)原子性,譬如讀取到其他寫(xiě)操作的中間狀態(tài)。顯然競(jìng)爭(zhēng)條件并不是我們想要的,雖然一些競(jìng)爭(zhēng)條件出現(xiàn)的概率很小,但根據(jù)墨菲定律,會(huì)出錯(cuò)的總會(huì)出錯(cuò),加之計(jì)算機(jī)的運(yùn)行頻率,就算出錯(cuò)的概率再小,在某天某時(shí)某刻那也是有可能發(fā)生的。
所以對(duì)于進(jìn)入臨界區(qū)訪問(wèn)公共資源我們要避免競(jìng)爭(zhēng)條件,保證公共資源的互斥排他性,一般有兩種大的解決方案來(lái)實(shí)現(xiàn)互斥

  • 忙等待:沒(méi)進(jìn)入臨界區(qū)時(shí)一直循環(huán),占用 CPU 資源
  • 休眠等待:沒(méi)進(jìn)入臨界區(qū)時(shí)一直休眠,不占用 CPU,CPU 利用率較高,但如果被調(diào)度到的次數(shù)過(guò)多,有進(jìn)程上下文切換的開(kāi)銷(xiāo)
xv6 就根據(jù)上面兩種方案,分別實(shí)現(xiàn)了兩個(gè)類型的鎖,在后面會(huì)看到
1.2 sleep

這里介紹下 sleep、exit、wakeup 三個(gè)函數(shù)的基本作用,更深層次的,在這些函數(shù)中使用的互斥并發(fā),將在介紹了鎖后討論。
void sleep(void *chan, struct spinlock *lk);這里的 sleep 不是那個(gè)系統(tǒng)調(diào)用,而是 xv6 的內(nèi)核函數(shù)。chan 表示進(jìn)程因?yàn)槭裁磳?duì)象而沉睡?lk 表示管理這個(gè)對(duì)象的鎖。
void sleep(void *chan, struct spinlock *lk){
    p->chan = chan;         // 休眠在chan上
    p->state = SLEEPING;    // 狀態(tài)更改為SLEEPING
    sched();                // 讓出CPU調(diào)度
    p->chan = 0;            // 休眠對(duì)象改為0
}sleep 大概就干了上面的事:

  • p 表示當(dāng)前進(jìn)程
  • 將 p->chan 設(shè)置為 chan,表示因 chan 而沉睡(在 wakeup 中會(huì)用到)
  • 將 p->state 設(shè)置為 sleeping
  • 交出 cpu,調(diào)度到其他進(jìn)程
  • 等待從 sched 返回時(shí),表示該進(jìn)程被喚醒,已經(jīng)重新上 cpu 了,此時(shí)設(shè)置休眠對(duì)象為 0,休眠結(jié)束
調(diào)用 sched 之后是不會(huì)返回的,而是去執(zhí)行其他進(jìn)程了,這就是休眠的好處,提高 cpu 的利用率。
1.3 wakeup

static void
wakeup1(void *chan)
{
  struct proc *p;

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}wakeup 的作用就是喚醒所有因 chan 而沉睡的進(jìn)程,具體就是遍歷一遍 pcb 數(shù)組,然后判斷 sleeping && p->chan == chan。wakeup1 是沒(méi)有加鎖版本的 wakeup,一般是獲取了鎖然后調(diào)用這個(gè)函數(shù)。
1.4 exit

這里來(lái)看看 exit 函數(shù)。在進(jìn)程的 main 函數(shù)執(zhí)行完后將會(huì)系統(tǒng)調(diào)用 exit 函數(shù),讓子進(jìn)程回收一部分資源,標(biāo)記為僵尸態(tài)等到父進(jìn)程回收剩下的資源。
1.5 wait

父進(jìn)程調(diào)用 wait 就是用來(lái)負(fù)責(zé)回收子進(jìn)程,流程為:

  • 首先看看遍歷看看有沒(méi)有孩子,沒(méi)有孩子或者當(dāng)前進(jìn)程被殺死就直接退出
  • 如果找到僵尸子進(jìn)程成功,回收他的頁(yè)表、內(nèi)核棧、pcb
2. Lock

簡(jiǎn)單介紹了幾個(gè)函數(shù)和基本概念,接下來(lái)就是鎖是怎么實(shí)現(xiàn)的了!前面說(shuō)過(guò),鎖分為自旋鎖和睡眠鎖。首先介紹最簡(jiǎn)單的自旋鎖
2.1 自旋鎖

2.1.1 結(jié)構(gòu)體

struct spinlock {

  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};有關(guān)自旋鎖的結(jié)構(gòu)定義如上,最重要的就是 locked 元素,用來(lái)表示該鎖是否已被某 cpu 取得,1表示該鎖已某 cpu 取走,0 表示該鎖空閑。其他三個(gè)元素用作為 debug 調(diào)試信息。
因?yàn)殒i不屬于信號(hào)量,所以 locked 最多只能為1,這應(yīng)該很清楚了。
2.1.2 初始化

void initlock(struct spinlock *lk, char *name)
//初始化鎖 lk
{
    lk->name = name; //初始化該鎖的名字
    lk->locked = 0;  //初始化該鎖空閑
    lk->cpu = 0;     //初始化持有該鎖的CPU為空
}這部分就是初始化鎖的名字、設(shè)置為空閑、沒(méi)有 CPU 持有該鎖
2.1.3 開(kāi)關(guān)中斷

void pushcli(void)
{
    int eflags;
    eflags = readeflags(); //cli之前讀取eflags寄存器

    cli(); //關(guān)中斷
    if(mycpu()->ncli == 0) //第一次pushcli()
        mycpu()->intena = eflags & FL_IF; //記錄cli之前的中斷狀態(tài)

    mycpu()->ncli += 1; //關(guān)中斷次數(shù)(深度)加1
}


void popcli(void)
{
    if(readeflags()&FL_IF) //如果eflags寄存器IF位為1
        panic("popcli - interruptible");
    if(--mycpu()->ncli < 0) //如果計(jì)數(shù)小于0
        panic("popcli");
    if(mycpu()->ncli == 0 && mycpu()->intena) //關(guān)中斷次數(shù)為0時(shí)即開(kāi)中斷
        sti();
}開(kāi)關(guān)中斷這兩個(gè)函數(shù)在請(qǐng)求鎖、釋放鎖的時(shí)候會(huì)被用到。這里看下函數(shù)原理:pushcli 和 popcli 為 cli 和 sti 的封裝函數(shù),只是增加了計(jì)數(shù),每個(gè) popcli 與 pushcli 匹配,有多少次 pushcli 就要有多少次 popcli,只有第一次 pushcli、最后一次 popcli 才會(huì)開(kāi)關(guān)中斷。

  • 第一個(gè) pushcli:關(guān)中斷,并且記錄關(guān)中斷之前的 eflags 寄存器狀態(tài)以便恢復(fù),最后增加關(guān)中斷次數(shù)
  • 之后的 pushcli:可以認(rèn)為只是增加中斷計(jì)數(shù),關(guān)中斷已經(jīng)發(fā)生過(guò)了
  • 最后一個(gè) popcli:此時(shí)如果關(guān)中斷次數(shù)為0,并且第一次 pushcli 之前的中斷狀態(tài)(之前保存的 intena)為開(kāi)中斷,這里才會(huì)開(kāi)中斷
  • 之前的 popcli:可以認(rèn)為只是減少了中斷次數(shù)
在持有鎖的過(guò)程中,進(jìn)程將不會(huì)處理中斷,因此后面本文的討論中,只要擁有一個(gè)鎖,此時(shí)是不會(huì)因時(shí)間片到期放棄 CPU 進(jìn)入調(diào)度的。
為什么使用 pushcli() 和 popcli() 而不是使用 cli() sti() 后面詳細(xì)說(shuō)明。
2.1.4 檢測(cè)持有鎖

int holding(struct spinlock *lock)
{
    int r;
    pushcli();
    r = lock->locked && lock->cpu == mycpu(); //檢驗(yàn)鎖lock是否被某CPU鎖持有且上鎖
    popcli();
    return r;
}holding 函數(shù)是在關(guān)中斷下檢查鎖是否被某 CPU 取走,仔細(xì)看檢查是否持有鎖的條件為兩個(gè):一是鎖是否被取走,二鎖是否由當(dāng)前 CPU 取走。
由于不一定持有鎖,所以為了保證獲取信息這個(gè)操作的原子性,需要關(guān)中斷下執(zhí)行,避免被調(diào)度。
2.1.5 取鎖解鎖

void acquire(struct spinlock *lk)
{
    pushcli(); // disable interrupts to avoid deadlock.
    if(holding(lk)) // 如果已經(jīng)取了鎖
        panic("acquire");
    while(xchg(&lk->locked, 1) != 0) //原子賦值
        ;
    __sync_synchronize(); //發(fā)出一個(gè)full barrier

    // 調(diào)試信息
    lk->cpu = mycpu(); //記錄當(dāng)前取鎖的CPU
    getcallerpcs(&lk, lk->pcs); //獲取調(diào)用棧信息
}關(guān)中斷下進(jìn)行取鎖的操作,以避免死鎖,原因見(jiàn)后面 FAQ
acquire 檢查當(dāng)前 CPU 是否已經(jīng)持有鎖,如果已持有,則 panic(),也就是說(shuō) xv6 不允許同一個(gè) CPU 對(duì)同一個(gè)鎖重復(fù)上鎖。
上鎖的語(yǔ)句為 xchg(&lk->locked, 1),xchg 函數(shù)可以看作一個(gè)原子賦值函數(shù)(本是交換,與 1 交換就相當(dāng)于賦值)

  • 將 lk->locked 賦值為 1,返回 locked 的舊值
  • 也就是說(shuō)如果該鎖空閑沒(méi)有 CPU 持有,那么當(dāng)前 CPU 將其賦值為 1 表示取得該鎖,xchg 返回舊值 0,跳出 while 循環(huán)
  • 如果該鎖已經(jīng)被某 CPU 持有,那么 xchg 對(duì)其賦值為 1,但返回值也是 1,不滿足循環(huán)跳出條件,所以一直循環(huán)等待某 CPU 釋放該鎖,當(dāng)前 CPU 因?yàn)槭顷P(guān)中斷只能一直忙等。
因取鎖可能需要一直循環(huán)等待,所以名為自旋鎖。
__sync_synchronize() 是發(fā)出一個(gè) full barrier,簡(jiǎn)單來(lái)說(shuō)就是不允許將這條語(yǔ)句之前的內(nèi)存讀寫(xiě)指令放在這條之后,也不允許將這條語(yǔ)句之后的內(nèi)存讀寫(xiě)指令放在這條指令之前。
由于編譯器翻譯成匯編代碼是亂序的,在 xchg 獲得鎖之前,后面關(guān)于 debug 的語(yǔ)句可能提前執(zhí)行,這條指令就能保證需要等到獲取鎖之后,才會(huì)執(zhí)行 debug 相關(guān)的匯編指令。
void release(struct spinlock *lk)
{
    if(!holding(lk))
        panic("release");
    lk->pcs[0] = 0;

    //清除調(diào)試信息
    lk->cpu = 0;
    __sync_synchronize(); //發(fā)出一個(gè)full barrier
    asm volatile("movl $0, %0" : "+m" (lk->locked) : ); //lk->locked=0
    popcli();
}釋放鎖主要就是將 lk->locked 設(shè)置為 0,其他進(jìn)程被調(diào)度到時(shí)如果檢測(cè)到鎖為空閑,將會(huì)重新上鎖,訪問(wèn)互斥資源。
2.1.6 faq

Ⅰ xv6 內(nèi)核 scheduler 的開(kāi)中斷
在 proc.c 中可以看到
for(;;){
    // Enable interrupts on this processor.
    sti();
   
    // Loop over process table looking for process to run.
    acquire(&ptable.lock);執(zhí)行 scheduler 的時(shí)候是開(kāi)中斷的,內(nèi)核中只有 scheduler 函數(shù)會(huì)短暫的開(kāi)中斷的,其他時(shí)候都是關(guān)中斷的。而如果做過(guò) jos 就知道,jos 在內(nèi)核態(tài)下是全程關(guān)中斷的。
造成兩者區(qū)別的原因是:

  • xv6 scheduler 是一個(gè)無(wú)限循環(huán),每一次都會(huì)請(qǐng)求鎖,遍歷一遍數(shù)組想要找到一個(gè)空閑的進(jìn)程運(yùn)行
  • jos scheduler 只運(yùn)行一遍,如果找不到進(jìn)程運(yùn)行就直接讓 cpu 沉睡、開(kāi)中斷,等待中斷喚醒它
因此,xv6 的 scheduler 要定時(shí)開(kāi)中斷,如果 scheduler 一直無(wú)限運(yùn)行下去,就不能執(zhí)行中斷處理程序。
Ⅱ xv6 加鎖為什么要使用 xchg()
我們知道 xchg 是硬件提供的原子指令,xchg(&lk->locked, 1) 具體執(zhí)行了下面兩件事:
old = lk->locked;
lk->locked = 1;
return old;假如這個(gè)語(yǔ)句不是原子的,那么同時(shí)可能有多個(gè) cpu 標(biāo)記為 lk->locked = 1,然后返回。最后,有多個(gè) cpu 訪問(wèn)互斥資源,顯然是錯(cuò)誤的。
Ⅲ acquire() 加鎖函數(shù)的關(guān)中斷
acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.為什么要關(guān)中斷呢?由于 scheduler 在 acquire 之前臨時(shí)開(kāi)中斷,所以現(xiàn)在內(nèi)核態(tài)下,是可以收到中斷然后運(yùn)行中斷處理程序的。假設(shè)加鎖時(shí)沒(méi)有沒(méi)有關(guān)中斷,這里劃分為幾個(gè)時(shí)間點(diǎn)下產(chǎn)生中斷:

  • 在 acquire 中 xchg 指令前
  • 在 acquire 中 xchg 指令后
  • 在 release 函數(shù)前
逐一分析。假設(shè)在 xchg 指令前發(fā)生中斷,中斷處理程序中可能會(huì)獲取到和 acquire 相同的鎖,因?yàn)榇藭r(shí)內(nèi)核代碼還沒(méi)有獲取鎖,所以不會(huì)發(fā)生死鎖;假設(shè)在 xchg 指令后發(fā)生中斷,中斷處理程序中可能會(huì)請(qǐng)求和 acquire 相同的鎖,由于先前該 CPU 的  scheduler 已經(jīng)獲取了鎖,中斷處理程序?qū)⒆孕蛘叱了l(fā)生死鎖;假設(shè)在 release 函數(shù)前發(fā)生中斷,和上面同理,將會(huì)發(fā)生死鎖;
因此,我們需要在 xchg 前關(guān)中斷,可以回答下面幾個(gè)問(wèn)題:

  • 能不能直接不關(guān)中斷:不行、會(huì)死鎖
  • 能不能先上鎖再關(guān)中斷:不行、會(huì)死鎖
  • 關(guān)中斷持續(xù)的時(shí)間:持續(xù)到 release
release() 函數(shù)先原子賦值釋放鎖再開(kāi)中斷,也就同理了,如果兩者交換先開(kāi)中斷,那么在釋放鎖之前可能發(fā)生中斷,而中斷處理程序剛好需要該鎖,那么發(fā)生死鎖。
Ⅳ xv6 的競(jìng)爭(zhēng)條件有哪些?
可能的情況如下,下面逐一分析:

  • 多個(gè) CPU 并行執(zhí)行
  • 單個(gè) CPU 上,中斷處理程序和內(nèi)核代碼并發(fā)執(zhí)行
  • 單個(gè) CPU 上,不同進(jìn)程并發(fā)執(zhí)行
xv6 是個(gè)支持多處理器的系統(tǒng),各個(gè) CPU 之間可以并行執(zhí)行,所以可能會(huì)出現(xiàn)同時(shí)訪問(wèn)公共資源的情況,是一個(gè)競(jìng)爭(zhēng)條件。
在單個(gè) CPU 上,內(nèi)核態(tài)下,scheduler 是開(kāi)中斷的,所以內(nèi)核代碼可能會(huì)和中斷處理其程序并發(fā)執(zhí)行,并發(fā)的進(jìn)入臨界區(qū)。前面提到過(guò),取鎖時(shí)關(guān)中斷,內(nèi)核代碼就不會(huì)切換到 scheduler,所以不是競(jìng)爭(zhēng)條件。
xv6 不支持線程,而各個(gè)進(jìn)程之間內(nèi)存是不共享的,加之內(nèi)核進(jìn)入臨界區(qū)訪問(wèn)公共資源的時(shí)候是關(guān)了中斷的,關(guān)了中斷除了自己休眠是不會(huì)讓出 CPU 的,所以運(yùn)行在單個(gè)處理器上的各個(gè)進(jìn)程之間的并發(fā)其實(shí)并不會(huì)產(chǎn)生競(jìng)爭(zhēng)條件。
Ⅴ 關(guān)中斷開(kāi)中斷為什么要使用 pushcli() 和 popcli() 而不直接使用 cli() 和 sti()?
為什么要使用 pushcli() 和 popcli(),其實(shí)也簡(jiǎn)單,那是因?yàn)槟硞€(gè)函數(shù)中可能要取多個(gè)鎖,比如先后取了鎖 1 鎖 2,那么釋放鎖 2 之后能開(kāi)中斷嗎?顯然不能,必須等鎖 1 也釋放的時(shí)候才能開(kāi)中斷。所以使用增加了計(jì)數(shù)功能的 pushcli() 和 popcli() 來(lái)實(shí)現(xiàn)最后一個(gè)鎖釋放的時(shí)候才開(kāi)中斷。
Ⅵ 內(nèi)存亂序問(wèn)題
現(xiàn)今的指令的執(zhí)行都有流水線的技術(shù),其實(shí)還有亂序執(zhí)行。亂序執(zhí)行指的是在 CPU 運(yùn)行中的指令不按照代碼既定的順序執(zhí)行,而是按照一定的策略打亂后順序執(zhí)行,以此來(lái)提高性能。
在一些時(shí)候,我們不希望指令順序被打亂,為避免這種情況,我們?cè)O(shè)置了屏障,禁止這個(gè)屏障前后的指令順序打亂
2.2 休眠鎖

2.2.1 結(jié)構(gòu)體

xv6 里面還提供了另一種鎖,休眠鎖,它在自旋鎖的基礎(chǔ)之上實(shí)現(xiàn),定義如下:
struct sleeplock {
    uint locked;
    // Is the lock held? 已鎖?
    struct spinlock lk; // spinlock protecting this sleep lock 自旋鎖
    // For debugging:
    char *name;
    // Name of lock. 名字
    int pid;
    // Process holding lock 哪個(gè)進(jìn)程持有?
};休眠鎖配了一把自旋鎖來(lái)保護(hù),原因見(jiàn)后。休眠鎖的意思是某個(gè)進(jìn)程為了取這個(gè)休眠鎖不得而休眠,所以有個(gè) pid 來(lái)記錄進(jìn)程號(hào)。
2.2.2 取鎖

休眠鎖的初始化,檢驗(yàn)是否持有鎖等都類似,就不贅述了,再這里主要看看取鎖和解鎖的區(qū)別:
void acquiresleep(struct sleeplock *lk)
{
    acquire(&lk->lk); // 優(yōu)先級(jí):->高于&
    while (lk->locked) { // 當(dāng)鎖已被其他進(jìn)程取走
        sleep(lk, &lk->lk); // 休眠
    }
    lk->locked = 1; // 上鎖
    lk->pid = myproc()->pid; // 取鎖進(jìn)程的pid
    release(&lk->lk);
}睡眠鎖需要保證 檢測(cè)鎖狀態(tài)、上鎖操作、休眠操作 這幾個(gè)操作是原子的,在自旋鎖中由于只需要保證前兩個(gè)操作,所以使用 xchg 指令就可以實(shí)現(xiàn)。
為什么要上鎖?一個(gè)原因是原子性,另一個(gè)原因就是為了防止 lost wakeup,防止死鎖??梢钥纯春竺娴?同步:sleep & wakeup。
原子性即代表當(dāng)前 CPU 不發(fā)生調(diào)度、其他 CPU 執(zhí)行 acquiresleep 需要互斥。因此睡眠鎖使用 xchg 很難保證原子性,這里 xv6 使用自旋鎖,就能很好的保證睡眠鎖的原子性。
這里使用自旋鎖的 acquire、release 就保證了原子性。在獲取自旋鎖后,如果此時(shí)休眠鎖上鎖,將會(huì)調(diào)用 sleep 函數(shù)放棄自旋鎖并陷入沉睡。如果此時(shí)休眠鎖空閑,將設(shè)置 lk->locked 表示上鎖,并且記錄進(jìn)程的 pid。
2.2.3 解鎖

void releasesleep(struct sleeplock *lk)
{
    acquire(&lk->lk); //取自旋鎖
    lk->locked = 0;
    lk->pid = 0;
    wakeup(lk); //喚醒
    release(&lk->lk);
}解鎖操作基本上就是上鎖的逆操作,注意一點(diǎn),可能有其他進(jìn)程休眠在休眠鎖上,所以當(dāng)前進(jìn)程解鎖后需要喚醒休眠在休眠鎖上的進(jìn)程,wakeup 將會(huì)請(qǐng)求表鎖,查找因?yàn)樾菝哝i沉睡的進(jìn)程。
3. 同步

鎖同步的問(wèn)題一直是操作系統(tǒng)里面最為復(fù)雜的問(wèn)題之一,,xv6 的鎖設(shè)計(jì)本身不難,難得是鎖的使用,這里就根據(jù)進(jìn)程這一塊使用鎖的地方來(lái)簡(jiǎn)單聊一聊,進(jìn)程中與鎖的有關(guān)地方主要有休眠,喚醒,等待,退出,調(diào)度,切換。
3.1 休眠喚醒

休眠是休眠在某個(gè)對(duì)象上,喚醒是喚醒休眠在某個(gè)對(duì)象上的進(jìn)程,所以想當(dāng)然的可以這樣來(lái)聲明 sleep 和 wakeup:
void sleep(void *obj);
void wakeup(void *obj);那這樣聲明對(duì)不對(duì)呢?來(lái)看個(gè)用戶態(tài)下,簡(jiǎn)單的變種生產(chǎn)者消費(fèi)者的例子
3.1.1 消費(fèi)者生產(chǎn)者

Send:
    while(obj != NULL) //對(duì)象還在
    ;                  //循環(huán)
    obj = 1;           //制作對(duì)象
    wakeup(obj);       //喚醒休眠在其上的消費(fèi)者

Recv:
    while(obj == NULL) //沒(méi)有對(duì)象可消費(fèi)
        sleep(obj);    //休眠
    obj = NULL;        //消耗對(duì)象乍一看感覺(jué)沒(méi)什么問(wèn)題,但是由于我們不能保證進(jìn)程被調(diào)度到的順序。如果 wakeup 發(fā)生在 sleep 之前就會(huì)引起死鎖:



IMG

比如上面這種在單處理器上并發(fā)的情況,指令執(zhí)行順序如下:

  • 消費(fèi)者首先檢測(cè)是否有 obj,由于 obj=NULL,準(zhǔn)備休眠
  • 在 sleep 前發(fā)生時(shí)間中斷,調(diào)度到生產(chǎn)者
  • 生產(chǎn)者設(shè)置 obj=1,調(diào)用 wakeup 函數(shù),喚醒因?yàn)?obj 沉睡的進(jìn)程
  • 切換回消費(fèi)者,消費(fèi)者此時(shí)執(zhí)行 sleep 陷入沉睡
綜上,由于 sleep 發(fā)生在 wakeup 之前,陷入死鎖。就算不是中斷,因?yàn)槭嵌嗵幚砥?,兩者運(yùn)行的時(shí)間是不確定的,完全也可能出現(xiàn)上述的情況。
避免這種前后感知的信息不一致的辦法就是加鎖,來(lái)看第二個(gè)版本:
Send:
    lock(obj_lock);
    while(obj != NULL) // 對(duì)象還在
        sleep(obj);    // 休眠
    obj = 1;           // 制作對(duì)象
    wakeup(obj);       // 喚醒休眠在其上的消費(fèi)者
    unlock(obj_lock);

Recv:
    lock(obj_lock);
    while(obj == NULL) // 沒(méi)有對(duì)象可消費(fèi)
        sleep(obj);    // 休眠
    obj = NULL;        // 消耗對(duì)象
    wakeup(obj)
    unlock(obj_lock);第二種版本做了一個(gè)修改,保證生產(chǎn)者和消費(fèi)者操作的原子性,生產(chǎn)者、消費(fèi)者讀取對(duì)象、使用對(duì)象、喚醒、沉睡是一個(gè)原子操作,在解鎖前不會(huì)被調(diào)度和搶占。
回到上面的問(wèn)題,此時(shí)不會(huì)再出現(xiàn)消費(fèi)者在 sleep 前被調(diào)度,lost wakeup 的情況。recv 在準(zhǔn)備 sleep 之前獲得了 obj_lock,send 沒(méi)有 obj_lock,是不可能 wakeup 的,所以休眠就不會(huì)錯(cuò)過(guò)喚醒。
是否可以檢測(cè)完對(duì)象之后再獲取 obj_lock 呢?這是不正確的,將會(huì)導(dǎo)致進(jìn)程的操作不是原子的,出現(xiàn)多個(gè)進(jìn)程進(jìn)入緩沖區(qū)操作同一個(gè) obj 的情況。
3.1.2 sleep

這個(gè)問(wèn)題倒是解決了,又有一個(gè)新問(wèn)題,如果進(jìn)程帶著 obj_lock 休眠不釋放的話,同樣死鎖。
因此將 sleep 的接口定義為:
void sleep(void *obj, void *obj_lock)這個(gè)鎖作為 sleep 的參數(shù),在進(jìn)程進(jìn)入休眠狀態(tài)時(shí)釋放該鎖,在進(jìn)程返回時(shí)重新獲取該鎖。
也就是說(shuō),在 sleep 函數(shù)中,修改進(jìn)程狀態(tài)前要先釋放鎖,然后才修改為 SLEEPING,讓出 CPU。為了保證在釋放鎖后,不會(huì)出現(xiàn) sleep 錯(cuò)過(guò) wakeup 的情況,修改進(jìn)程狀態(tài)和釋放鎖需要是一個(gè)原子操作。
假設(shè)剛釋放鎖,雖然此時(shí)可能不會(huì)發(fā)生中斷,但是其他 cpu 并行時(shí)會(huì)重新獲取鎖,然后快速執(zhí)行完 wakeup,因此出現(xiàn)了 lost wakeup
由于涉及到了進(jìn)程狀態(tài)的修改,所以 xv6 選擇用 ptable.lock 來(lái)保證上面操作的原子性。
void
sleep(void *chan, struct spinlock *lk)
{

  struct proc *p = myproc();

  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");


  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;
  // Reacquire original lock.
  
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }

}在放棄 CPU 前:

  • 獲取表鎖,保證 lk 的釋放和狀態(tài)修改是一個(gè)原子操作
  • 釋放 lk
  • 修改休眠對(duì)象和進(jìn)程狀態(tài),交出 CPU
sched 將會(huì)交出 CPU,等到該進(jìn)程被 wakeup,重新獲取 CPU。在重新?lián)碛?CPU 后:

  • 修改休眠對(duì)象為 NULL
  • 被調(diào)度到時(shí),sched 中獲取了表鎖,放棄表鎖重新獲取 lk
進(jìn)程變?yōu)?SLEEPING 之后將不會(huì)被調(diào)度到,因此 sleeplock 減少了 CPU 的負(fù)擔(dān),不會(huì)再讓一個(gè)CPU忙等,而是合理放棄 CPU。
特殊情況在于,如果 lk = 表鎖,那么放棄 cpu 前不請(qǐng)求表鎖,擁有 cpu 后不放棄表鎖。休眠對(duì)象是一個(gè)進(jìn)程,在 xv6 中,這種情況只發(fā)生在 wait 函數(shù)中,父進(jìn)程等待子進(jìn)程退出,這個(gè)時(shí)候它休眠在自己身上。
3.1.3 wakeup

static void wakeup1(void *chan)
{
    struct proc *p;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
        if(p->state == SLEEPING && p->chan == chan)
            p->state = RUNNABLE;
}
void wakeup(void *chan)
{
    acquire(&ptable.lock);
    wakeup1(chan);
    release(&ptable.lock);
}然后是 wakeup 函數(shù),分為兩個(gè)部分:不加鎖的 wakeup1 和加鎖版本的 wakeup。這么劃分是因?yàn)椋谟行┣闆r下已經(jīng)獲取了表鎖,直接調(diào)用 wakeup 會(huì)死鎖,詳見(jiàn)下面的 wait 函數(shù)。
3.3 exit

這里來(lái)看看 exit 函數(shù)
void exit(void){
    struct proc *curproc = myproc();
    if(curproc == initproc)
        panic("init exiting");首先第一個(gè)進(jìn)程是不能退出的,如果退出直接報(bào)錯(cuò)。他需要用來(lái)回收子進(jìn)程和孤兒進(jìn)程的資源
// Close all open files.

  for(fd = 0; fd < NOFILE; fd++){
    if(curproc->ofile[fd]){
      fileclose(curproc->ofile[fd]);
      curproc->ofile[fd] = 0;
    }
  }子進(jìn)程會(huì)自己回收掉一部分資源。比如打開(kāi)文件描述符表,這一部分關(guān)閉所有文件,如果減到 0,再釋放該文件的 inode,如果文件的鏈接數(shù)和引用數(shù)都為 0 了,就刪除該文件。
acquire(&ptable.lock);

  // Parent might be sleeping in wait().
  wakeup1(curproc->parent);

  // Pass abandoned children to init.
  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
    if(p->parent == curproc){
      p->parent = initproc;
      if(p->state == ZOMBIE)
        wakeup1(initproc);
    }
  }

  // Jump into the scheduler, never to return.
  curproc->state = ZOMBIE;
  sched();
  panic("zombie exit");和前文對(duì)應(yīng),由于 exit 函數(shù)已經(jīng)擁有了表鎖,為了避免死鎖,使用不加鎖版本的 wakeup1。
然后是要喚醒父進(jìn)程,通知父進(jìn)程來(lái)回收自己。這里還會(huì)拋棄當(dāng)前進(jìn)程的所有子進(jìn)程,子進(jìn)程成為孤兒進(jìn)程,將孤兒進(jìn)程的父進(jìn)程修改為 init 進(jìn)程,讓 init 進(jìn)程負(fù)責(zé)回收資源。最后將自己的狀態(tài)設(shè)置為 zombie,父進(jìn)程將在 wait 調(diào)用中回收 zombie 的子進(jìn)程。
因?yàn)檫@個(gè)操作會(huì)修改進(jìn)程狀態(tài),所以執(zhí)行前要獲取表鎖。提一嘴,雖然這里是獲得了鎖,sched 函數(shù)需要當(dāng)前進(jìn)程持有鎖,后面會(huì)看到設(shè)計(jì)的原因。
為什么要父進(jìn)程負(fù)責(zé)回收呢?私以為,因?yàn)樽舆M(jìn)程在內(nèi)核中運(yùn)行時(shí)還要使用到頁(yè)表、內(nèi)核棧、進(jìn)程結(jié)構(gòu)體,如果直接回收,執(zhí)行下一條內(nèi)核代碼將會(huì)報(bào)錯(cuò),影響系統(tǒng)執(zhí)行,所以這些資源等到父進(jìn)程回收為好。
3.4 wait

acquire(&ptable.lock);

for(;;){
    // Scan through table looking for exited children.

    havekids = 0;
    for(p = ptable.proc; p < &ptable.proc[NPROC]; p++){
      if(p->parent != curproc)
        continue;

      havekids = 1;

      if(p->state == ZOMBIE) { // Found one.
        pid = p->pid;
        kfree(p->kstack);
        p->kstack = 0;
        freevm(p->pgdir);
        p->pid = 0;
        p->parent = 0;
        p->name[0] = 0;
        p->killed = 0;
        p->state = UNUSED;
        release(&ptable.lock);
        return pid;
      }
    }

    // No point waiting if we don't have any children.
    if(!havekids || curproc->killed){
      release(&ptable.lock);
      return -1;
    }

    // Wait for children to exit.  (See wakeup1 call in proc_exit.)
    sleep(curproc, &ptable.lock);  //DOC: wait-sleep
}父進(jìn)程調(diào)用 wait 就是用來(lái)負(fù)責(zé)回收子進(jìn)程,流程為:

  • 首先遍歷看看有沒(méi)有孩子,沒(méi)有孩子或者當(dāng)前進(jìn)程被殺死就直接退出
  • 有孩子進(jìn)程就一定會(huì) wait 到一個(gè)孩子變?yōu)榻┦M(jìn)程為止,每次掃描一輪沒(méi)有發(fā)生有子進(jìn)程為僵尸進(jìn)程就陷入睡眠,子進(jìn)程調(diào)用 exit 后會(huì)喚醒父進(jìn)程
  • 如果找到僵尸子進(jìn)程成功,回收他的頁(yè)表、內(nèi)核棧、pcb
exit 和 wait 如何保證原子性和防止死鎖(沒(méi)有 lost wakeup)?因?yàn)樗麄兪褂帽礞i進(jìn)行互斥,正好符合上面所說(shuō)的生產(chǎn)者消費(fèi)者模型
一般而言,父進(jìn)程可能不會(huì)sleep兩次?
4.進(jìn)程調(diào)度

切換進(jìn)程的流程是:進(jìn)程 A --> scheduler --> 進(jìn)程 B。進(jìn)程 A 切換到調(diào)度進(jìn)程是使用 sched 函數(shù),sched 函數(shù)是 swtch 函數(shù)的封裝。當(dāng)切換回 scheduler 時(shí),除了返回對(duì)應(yīng)的切換上下文外,此時(shí) scheduler 還需要保證關(guān)中斷和擁有 ptable.lock。
在 swtch 和  scheduler 執(zhí)行的過(guò)程中,必須要持有表鎖。要知道執(zhí)行 swtch 是會(huì)切換進(jìn)程的,因?yàn)檫@個(gè)鎖在一個(gè)進(jìn)程中獲取,在另一個(gè)進(jìn)程中釋放。這樣使用鎖很不常見(jiàn),但這里是必須的。
也就是說(shuō),sched 是進(jìn)程 A 取鎖,進(jìn)程 B 放鎖。即使是 sched 之后的語(yǔ)句一般會(huì)來(lái)個(gè)放鎖操作,新進(jìn)程也是如此。
因此在進(jìn)入 sched 前一定要獲取 ptable.lock,進(jìn)入調(diào)度有多種情況,我們逐一分析及驗(yàn)證:

  • yield 因時(shí)間片放棄 CPU
  • sleep 因阻塞放棄 CPU
  • exit 因死亡放棄 CPU
4.1 yield

4.1.1 時(shí)間中斷

// Force process to give up CPU on clock tick.
  // If interrupts were on while locks held, would need to check nlock.
  if(myproc() && myproc()->state == RUNNING &&
     tf->trapno == T_IRQ0+IRQ_TIMER)
    yield();在 trap.c 中,當(dāng)進(jìn)程因?yàn)闀r(shí)間中斷進(jìn)入中斷處理時(shí),將會(huì) yield 放棄 CPU。
4.1.2 yield

void
yield(void)
{
  acquire(&ptable.lock);  // DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}yield 對(duì)應(yīng)于前文,首先要獲取 ptable.lock 這把鎖,更改狀態(tài)為 RUNNABLE。
如果在 swtch 沒(méi)有持有表鎖會(huì)怎么樣?例如:CPU 0 調(diào)用 yield 放棄進(jìn)程A,然后 yield 中設(shè)置進(jìn)程狀態(tài)為 RUNNABLE。CPU0 在執(zhí)行 sched 之前另一個(gè) CPu1 會(huì)調(diào)度進(jìn)程 A,在這短暫的時(shí)間內(nèi),這樣的話一個(gè)進(jìn)程運(yùn)行在兩個(gè) CPU 上,內(nèi)核棧將會(huì)發(fā)生錯(cuò)誤。
void
sched(void)
{

  int intena;
  struct proc *p = myproc();

  if(!holding(&ptable.lock))
    panic("sched ptable.lock");

  if(mycpu()->ncli != 1)
    panic("sched locks");
   
  if(p->state == RUNNING)
    panic("sched running");

  if(readeflags()&FL_IF)
    panic("sched interruptible");

  intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;
}sched 函數(shù)首先執(zhí)行了一些檢查:

  • 不可中斷
  • 非運(yùn)行態(tài)
  • 持有鎖
intena 這里表示 pushcli 之前 CPU 允許中斷的情況。這個(gè)值保存下來(lái)等待進(jìn)程被調(diào)度到時(shí),用來(lái)恢復(fù) CPU 的中斷情況,保證在同一個(gè)進(jìn)程里中斷允許情況一致。比如說(shuō)在進(jìn)程 A 放棄CPU之前是開(kāi)中斷,進(jìn)程 A 恢復(fù)執(zhí)行后,在 yield 中將會(huì) popcli 恢復(fù)開(kāi)中斷。
xv6 下系統(tǒng)調(diào)用是異常門(mén),所以內(nèi)核下是可以開(kāi)中斷的。
4.1.3 進(jìn)程B解鎖

在 scheduler 中依舊還是關(guān)中斷,等到調(diào)度到進(jìn)程 B 后。
intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;

}進(jìn)程 B 在這里恢復(fù) CPU 的中斷情況
void
yield(void)
{

  acquire(&ptable.lock);  //DOC: yieldlock
  myproc()->state = RUNNABLE;
  sched();
  release(&ptable.lock);
}跳轉(zhuǎn)回到 yield 函數(shù),釋放表鎖,成功對(duì)應(yīng)上文:進(jìn)程A上鎖,進(jìn)程B解鎖。從 yield 返回后就是跑到 trapret 恢復(fù)進(jìn)程執(zhí)行了,這里就不再多說(shuō)。
假如是調(diào)度到一個(gè)新進(jìn)程,新進(jìn)程的第一條指令(不是用戶指令?。。┦?forkret,在這里將會(huì)釋放表鎖,完成”進(jìn)程B解鎖“。
4.2  sleep

sleep 前面也看過(guò)了
// Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;
  sched();在 sched 函數(shù)前會(huì)獲取表鎖,sched 函數(shù)后釋放表鎖,也就保證了前面說(shuō)的。進(jìn)程 A 上鎖,進(jìn)程B解鎖。進(jìn)入 sched 的狀態(tài)不是 RUNNING!!
sleep 什么時(shí)候被使用到呢?

  • sleeplock
  • exit&wait
  • 外設(shè)驅(qū)動(dòng):系統(tǒng)調(diào)用 read、write 時(shí) sleep,中斷處理程序時(shí) wakeup
4.3 exit

// Jump into the scheduler, never to return.
  curproc->state = ZOMBIE;
  sched();
  panic("zombie exit");exit 也是在 sched 前獲取鎖,然后設(shè)置為僵尸態(tài)。

-----------------------------
精選高品質(zhì)二手iPhone,上愛(ài)鋒貝APP
您需要登錄后才可以回帖 登錄 | 立即注冊(cè)   

本版積分規(guī)則

QQ|Archiver|手機(jī)版|小黑屋|愛(ài)鋒貝 ( 粵ICP備16041312號(hào)-5 )

GMT+8, 2025-2-8 23:06

Powered by Discuz! X3.4

© 2001-2013 Discuz Team. 技術(shù)支持 by 巔峰設(shè)計(jì).

快速回復(fù) 返回頂部 返回列表