操作系统_lab3_同步问题

"系统调用的进一步理解"

Posted by neo on April 2, 2019

操作系统实验三 同步问题

“16281052 杨涵晨 计科1601 ”

TASK 1

1.1 实验理解

  • 四个进程按照一定顺序运行,利用多个信号量来保证这个指定顺序

  • 利用fork()产生多个进程

  • 其中p1最先执行执行,利用信号量 p1_signal 初始化为0, 让进程p2,p3,p4等待p1_signal,只有p1执行完毕才会signal()

  • p2,p3互斥,利用wait(p1_signal)来等待p1结束,而且两者都同时等待p1_signal,通过这种方式保证互斥

  • p4最后一个执行,利用wait(p2_signal),wait(p3_signal)来等待所有进程都结束

1.2 实验代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>

sem_t *p1_signal = NULL;
sem_t *p2_signal = NULL;
sem_t *p3_signal = NULL;

int main(int argc, char *argv[])
{
    pid_t pid;
    p1_signal=sem_open("P1_signalname",O_CREAT,0666,0);
	p2_signal=sem_open("P2_signalname",O_CREAT,0666,0);
	p3_signal=sem_open("P3_signalname",O_CREAT,0666,0);
    pid = fork();

    if (pid < 0)
    { // 没有创建成功
        perror("fork create error");
    }
    if (0 == pid)
    { // 子进程
        
        sem_wait(p1_signal);
        printf("I am the Process P2\n");
        sem_post(p1_signal);
        sem_post(p2_signal);
    }
    else if (pid > 0)
    { // 父进程
        printf("I am the Process P1\n");
        sem_post(p1_signal);
        pid = fork();
        if (pid < 0)
        { // 没有创建成功
            perror("fork create error");
        }
        if (0 == pid)
        { // 子进程
            sem_wait(p1_signal);
            printf("I am the Process P3\n");
            sem_post(p1_signal);
            sem_post(p3_signal);
            pid = fork();
            if (pid < 0)
            { // 没有创建成功
                perror("fork create error");
            }
            if (0 == pid)
            { // 子进程
                sem_wait(p2_signal);
                sem_wait(p3_signal);
                printf("I am the Process P4\n");
            }
        }
    }
    sem_close(p1_signal);
    sem_unlink("p1_signalname");
    sem_close(p2_signal);
    sem_unlink("p2_signalname");
    sem_close(p3_signal);
    sem_unlink("p3_signalname");
    return 0;
}

我们利用fork函数产生的进程数如下:

1.3 实验结果

通过结果我们发现,运行后有两种情况

  1. p1->p2->p3->p4

  1. p1->p3->p2->p4

#####1.4 结果解释

在我们的题目要求下,p2,p3互斥,当p1执行完毕后,p2,p3进行竞争,这两个进程谁先获得P1产生的信号量谁就先执行另一个进程等待。最后等P2和P3都执行完了再执行P4,所以会出现上面的两种执行顺序。

TASK 2

2.1 实验理解

  • 利用一个main函数开启多个线程进行退票和售票,利用worker1(), worker2()来实现
  • worker1()售票,ticketcount减少
  • worker2()退票,ticketcount增加

2.2 实验代码

2.2.1 未加信号量时的实验

再刚开始实验的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>

int pthread_yield(void);
volatile int ticketCount = 1000;

sem_t *mySem = NULL;
//read
void *worker1(void *arg)
{
    int temp = 0;
    //sem_wait(mySem);
    temp = ticketCount;
    pthread_yield();
    temp = temp - 1;
    pthread_yield();
    ticketCount = temp;
    //sem_post(mySem);
    return NULL;
}

//print
void *worker2(void *arg)
{
    int temp = 0;
    //sem_wait(mySem);
    temp=ticketCount;
    pthread_yield();
    temp=temp+1;
    pthread_yield();
    ticketCount=temp;
    //sem_post(mySem);
    return NULL;
}

int main(int argc, char *argv[])
{
    // loops = atoi(argv[1]);
    if(argc!=3)
	{
		printf("请正确输入参数!\n");
		exit(0);
	}
    int i;
    printf("初始票数为:%d\n",ticketCount);
    pthread_t p[1000];
    //printf("Initial value : %d\n", counter);

    mySem = sem_open("myname", O_CREAT, 0666, 1);
    for(i=0;i<atoi(argv[1]);i++)
    {
        pthread_create(&p[i], NULL, worker1, NULL);
    }
    for(i=500;i<atoi(argv[2])+500;i++)
    {
        pthread_create(&p[i], NULL, worker2, NULL);
    }
    for(i=0;i<atoi(argv[1]);i++)
    {
        pthread_join(p[i], NULL);
    }
    for(i=500;i<atoi(argv[2])+500;i++)
    {
        pthread_join(p[i], NULL);
    }
    sem_close(mySem);
    printf("the number of tickets are %d\n",ticketCount);

    sem_unlink("myname");
    return 0;
}

当我们没有利用信号量进行操作时,程序会进行读脏数据等,数据读入或者写入的各种错误。如下所示,当我输入200,100时,正确答案应该是900

而下面的答案确不如人意。

分析结果:

  • 在我们的程序中,其中有多个退票和买票进程,当没有信号量控制时,当我们其中随机的一个进程读取了一个值后,后面的进程可能会进行写入,这样我们刚才读的就是脏数据。导致数据有问题。
2.2.2 引入加信号量时的实验

将上面代码加入信号量进行编译

1
2
3
4
5
6
7
8
9
10
11
12
void *worker1(void *arg)
{
    int temp = 0;
    sem_wait(mySem);
    temp = ticketCount;
    pthread_yield();
    temp = temp - 1;
    pthread_yield();
    ticketCount = temp;
    sem_post(mySem);
    return NULL;
}

使用sem_wait().sem_post()进行控制

运行实验可以看到。现在不再有错误.

2.3 实验总结

  • 当我们引入信号量时,可以很好的避免读脏数据等操作。

  • 相当于设置一个flag初始值为1,表示每次只允许一个线程操作ticketCount这个数据。

  • 当两者都是读操作时,一定要注意访问控制。

TASK 3

3.1 实验理解

此题是一个经典的生产者和消费者问题,如果不考虑同步机制的话便会出现如下问题:

  • 输入进程产生过快,buf数组的资源用尽,继续输入会导致数组越界或者之前输入的字符还未打印便被覆盖
  • 输出进程消耗过快,继续输出则会访问到为初始化的数组元素或者将之前打印过的字符再次打印

根据以上存在的问题,我们可以通过使用信号量来实现两个进程之间的同步。略经分析我们需要:

  • 在一个程序编写两个进程,一个用于输入,一个用于输出
  • 利用一个共享的数据段进行存放数据
  • 申请两个信号变量来控制输入输出,当buff不为满的时候才能输入
  • 当buff不为空的时候才能输出,让输出函数用sleep控制速度

3.2 实验代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<semaphore.h>
#include<sys/stat.h>
#include<fcntl.h>
int buf[10];
sem_t *empty=NULL;
sem_t *full=NULL;

void *worker1(void *arg)
{
	
	for(int i=0;i<10;i++)
	{
		sem_wait(empty);
		scanf("%d",&buf[i]);
		sem_post(full);
		if(i==9)
		{
			i=-1;
		}
	}	
	return NULL;
}

void *worker2(void *arg)
{
	for(int i=0;i<10;i++)
	{
		sem_wait(full);
		printf("print : %d\n",buf[i]);
		sem_post(empty);
		sleep(2);
		if(i==9)
		{  
			i=-1;
		
		}
	}	
	return NULL;
}

int main(int argc,char *argv[])
{
	empty=sem_open("empty_",O_CREAT,0666,10);
	full=sem_open("full_",O_CREAT,0666,0);
	pthread_t p1,p2;
	pthread_create(&p1,NULL,worker1,NULL);
	pthread_create(&p2,NULL,worker2,NULL);
	pthread_join(p1,NULL);
	pthread_join(p2,NULL);
	sem_close(empty);
	sem_close(full);
	sem_unlink("empty_");
	sem_unlink("full_");
	return 0;

}

3.3 实验结果

当我加入判断机制时,可以将buff状态输出,并且避免输入和输出不同速而导致的输入为打印就被重写的情况

Task 4

4.1 a) 题

题目:通过实验测试,验证共享内存的代码中,receiver能否正确读出sender发送的字符串?如果把其中互斥的代码删除,观察实验结果有何不同?如果在发送和接收进程中打印输出共享内存地址,他们是否相同,为什么?

  1. 实验源码

    Sender.c

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    
    /*
     * Filename: Sender.c
     * Description: 
     */
       
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/sem.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/types.h>
    #include <string.h>
       
    int main(int argc, char *argv[])
    {
        key_t  key;
        int shm_id;
        int sem_id;
        int value = 0;
        //1.Product the key
        key = ftok(".", 0xFF);
        //2. Creat semaphore for visit the shared memory
        sem_id = semget(key, 1, IPC_CREAT|0644);
        if(-1 == sem_id)
        {
            perror("semget");
            exit(EXIT_FAILURE);
        }
        //3. init the semaphore, sem=0
        if(-1 == (semctl(sem_id, 0, SETVAL, value)))
        {
            perror("semctl");
            exit(EXIT_FAILURE);
        }
        //4. Creat the shared memory(1K bytes)
        shm_id = shmget(key, 1024, IPC_CREAT|0644);
        if(-1 == shm_id)
        {
            perror("shmget");
            exit(EXIT_FAILURE);
        }
        //5. attach the shm_id to this process
        char *shm_ptr;
        shm_ptr = shmat(shm_id, NULL, 0);
        if(NULL == shm_ptr)
        {
            perror("shmat");
            exit(EXIT_FAILURE);
        }
        //6. Operation procedure
        struct sembuf sem_b;
        sem_b.sem_num = 0;      //first sem(index=0)
        sem_b.sem_flg = SEM_UNDO;
        sem_b.sem_op = 1;           //Increase 1,make sem=1
           
        while(1)
        {
            if(0 == (value = semctl(sem_id, 0, GETVAL)))
            {
                printf("\nNow, snd message process running:\n");
                printf("\tInput the snd message:  ");
                scanf("%s", shm_ptr);
       
                if(-1 == semop(sem_id, &sem_b, 1))
                {
                    perror("semop");
                    exit(EXIT_FAILURE);
                }
            }
            //if enter "end", then end the process
            if(0 == (strcmp(shm_ptr ,"end")))
            {
                printf("\nExit sender process now!\n");
                break;
            }
        }
        shmdt(shm_ptr);
        return 0;
    }
    

    Receiver.c

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/sem.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/types.h>
    #include <string.h>
       
    int main(int argc, char *argv[])
    {
        key_t  key;
        int shm_id;
        int sem_id;
        int value = 0;
        //1.Product the key
        key = ftok(".", 0xFF);
        //2. Creat semaphore for visit the shared memory
        sem_id = semget(key, 1, IPC_CREAT|0644);
        if(-1 == sem_id)
        {
            perror("semget");
            exit(EXIT_FAILURE);
        }
        //3. init the semaphore, sem=0
        if(-1 == (semctl(sem_id, 0, SETVAL, value)))
        {
            perror("semctl");
            exit(EXIT_FAILURE);
        }
        //4. Creat the shared memory(1K bytes)
        shm_id = shmget(key, 1024, IPC_CREAT|0644);
        if(-1 == shm_id)
        {
            perror("shmget");
            exit(EXIT_FAILURE);
        }
        //5. attach the shm_id to this process
        char *shm_ptr;
        shm_ptr = shmat(shm_id, NULL, 0);
        if(NULL == shm_ptr)
        {
            perror("shmat");
            exit(EXIT_FAILURE);
        }
       
        //6. Operation procedure
        struct sembuf sem_b;
        sem_b.sem_num = 0;      //first sem(index=0)
        sem_b.sem_flg = SEM_UNDO;
        sem_b.sem_op = -1;           //Increase 1,make sem=1
           
        while(1)
        {
            if(1 == (value = semctl(sem_id, 0, GETVAL)))
            {
                printf("\nNow, receive message process running:\n");
                printf("\tThe message is : %s\n", shm_ptr);
       
                if(-1 == semop(sem_id, &sem_b, 1))
                {
                    perror("semop");
                    exit(EXIT_FAILURE);
                }
            }
            //if enter "end", then end the process
            if(0 == (strcmp(shm_ptr ,"end")))
            {
                printf("\nExit the receiver process now!\n");
                break;
            }
        }
        shmdt(shm_ptr);
        //7. delete the shared memory
        if(-1 == shmctl(shm_id, IPC_RMID, NULL))
        {
            perror("shmctl");
            exit(EXIT_FAILURE);
        }
        //8. delete the semaphore
        if(-1 == semctl(sem_id, 0, IPC_RMID))
        {
            perror("semctl");
            exit(EXIT_FAILURE);
        }
        return 0;
    }
    
  2. 程序解释

    • 通过ftok函数创建一个key_t类型的变量,作为共享内存的key,ftok函数的两个参数分别是文档名(一个存在的路径),上例中的路径是.表示当前路径,另一个参数是子序号。

      1
      2
      
       key_t  key;
       key = ftok(".", 0xFF);
      
    • 通过semget()函数创建一个信号量,初始值为1,再通过semctl()函数初始化该信号量。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      
      int sem_id;
      sem_id = semget(key, 1, IPC_CREAT|0644);
      if(-1 == sem_id)
      {
          perror("semget");
          exit(EXIT_FAILURE);
      }
      if(-1 == (semctl(sem_id, 0, SETVAL, value)))
      {
          perror("semctl");
          exit(EXIT_FAILURE);
      }
      
    • 通过shmget()函数创建了一个大小为1000B的共享内存,通过shmat()函数,将刚刚创建的共享内存以可读写的方式挂载在进程上,并且指定系统将自动选择一个合适的地址给共享内存。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      
      //4. Creat the shared memory(1K bytes)
      shm_id = shmget(key, 1024, IPC_CREAT|0644);
      if(-1 == shm_id)
      {
          perror("shmget");
          exit(EXIT_FAILURE);
      }
      //5. attach the shm_id to this process
      char *shm_ptr;
      shm_ptr = shmat(shm_id, NULL, 0);
      if(NULL == shm_ptr)
      {
          perror("shmat");
          exit(EXIT_FAILURE);
      }
      
  • 利用有信号量的原始代码进行运行,我们发现没有任何错误。

  • 删除代码中的共享部分,参考博客中的解释,删除代码,并加入循环和sleep()来输出。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    while(1)
    {
        printf("\nNow, snd message process running:\n");
        printf("\tInput the snd message:  ");
        scanf("%s", shm_ptr);
        //if enter "end", then end the process
        if(0 == (strcmp(shm_ptr ,"end")))
        {
            printf("\nExit sender process now!\n");
            break;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    
    while(1)
    {
        printf("\nNow, receive message process running:\n");
        printf("\tThe message is : %s\n", shm_ptr);
      
        //if enter "end", then end the process
        if(0 == (strcmp(shm_ptr ,"end")))
        {
            printf("\nExit the receiver process now!\n");
            break;
        }
        sleep(3);
    }
    

    我们可以看到,现在receiver.c一直在读内存中的字符,但是存在的问题是生产者消费者的典型问题:

    • 太快,多读
    • 太慢,写覆盖
  • 添加打印内存地址代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    /*
     * Filename: sender.c
     * Description: 
     */
    while(1)
    {
        if(0 == (value = semctl(sem_id, 0, GETVAL)))
        {
            printf("\nNow, snd message process running:\n");
            printf("The shared memory address is %x\n",shm_ptr);
            printf("\tInput the snd message:  ");
            scanf("%s", shm_ptr);
      
            if(-1 == semop(sem_id, &sem_b, 1))
            {
                perror("semop");
                exit(EXIT_FAILURE);
            }
        }
      
        //if enter "end", then end the process
        if(0 == (strcmp(shm_ptr ,"end")))
        {
            printf("\nExit sender process now!\n");
            break;
        }
    }
      
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    
    /*
     * Filename: Receiver.c
     * Description: 
     */
    while(1)
    {
        if(1 == (value = semctl(sem_id, 0, GETVAL)))
        {
             printf("\nNow, receive message process running:\n");
             printf("The shared memory address is %x\n",shm_ptr);
             printf("\tThe message is : %s\n", shm_ptr);
      
             if(-1 == semop(sem_id, &sem_b, 1))
             {
                 perror("semop");
                 exit(EXIT_FAILURE);
             }
         }
      
         //if enter "end", then end the process
         if(0 == (strcmp(shm_ptr ,"end")))
         {
             printf("\nExit the receiver process now!\n");
             break;
         }
    }
    
  1. 现象解释

    两个进程显示的内存地址并不一致,这似乎与我们内存共享的机制不符。但是结合实验一中我们了解到的虚拟内存机制,这一现象也是能够得到解释的。操作系统为进程分配的内存地址并不是实际的物理内存地址,而是一个虚拟内存地址,通过页表的映射,可以将虚拟内存地址转换为物理内存地址。

    我们运行的两个进程在初始化的时候使用了shmat函数,此函数的作用是将共享内存空间挂载到进程中,实则就是对进程分配字符串的虚拟内存映射到共享内存的物理内存,从而实现内存的共享。所以虽然我们打印出来的内存地址不一样,但是它们实际映射的物理内存地址是一致的。

    程序在挂载内存的时候使用的shmat()函数中的第二个参数使用的是NULL,NULL参数的含义是进程让系统分配给共享内存合适的地址。在shmat()函数中,第二个参数有三种选择,分别是:

    参数值 NULL addr addr
    含义 系统将自动选择一个合适的地址 如果shmaddr非0 并且指定了SHM_RND 则此段连接到shmaddr -(shmaddr mod SHMLAB)所表示的地址上。 第三个参数如果在flag中指定了SHM_RDONLY位,则以只读方式连接此段,否则以读写的方式连接此 段。

4.2 b) 题

题目:有名管道和无名管道通信系统调用是否已经实现了同步机制?通过实验验证,发送者和接收者如何同步的。比如,在什么情况下,发送者会阻塞,什么情况下,接收者会阻塞?

  1. 无名管道

无名管道的同步机制如下:

  • 管道的读写通过两个系统调用write和read实现
  • 发送者在向管道内存中写入数据之前,首先检查内存是否被读进程锁定内存中是否还有剩余空间,如果这两个要求都满足的话write函数会对内存上锁,然后进行写入数据,写完之后解锁;否则就会等待(阻塞)。
  • 写进程在读取管道中的数据之前,也会检查内存是否被读进程锁定管道内存中是否有数据,如果满足这两个条件,read函数会对内存上锁,读取数据后在解锁;否则会等到(阻塞)

无名管道常用于连通父子进程,用于双方的通信,而博客中给出的示例程序并未使用多个进程,因此不能很好地到达实验验证的目的,这里我们对其源代码做了一些修改,采用fork的方式构造父子进程之间的通信,以下为程序源码pipe.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
 * Filename: pipe.c
 */
 
#include <stdio.h>
#include <unistd.h>     //for pipe()
#include <string.h>     //for memset()
#include <stdlib.h>     //for exit()

int main()
{
    int fd[2];
    char buf[20];
    if(-1 == pipe(fd))
    {
        perror("pipe");
        exit(EXIT_FAILURE);
    }
    pid_t pid;
    pid = fork();

    if(!pid){
        write(fd[1], "hello,world", 12);
        memset(buf, '\0', sizeof(buf));
    }

    else if(pid>0){
        read(fd[0], buf, 12);
        printf("The message is: %s\n", buf);
    }

    else{
        perror("fork");
        exit(1);
    }

    return 0;
}
  • 程序解释
    • 过pipe函数创建管道,函数传递一个整形数组fd,fd的两个整形数表示的是两个文件描述符,其中第一个用于读取数据,第二个用于写数据。两个描述符相当远管道的两端,一段负责写数据,一段负责读数据。
    • pipe管道是半双工的工作模式,某一时刻只能读或者只能写
    • 读写管道就和读写普通文件一样,使用write和read
  • 执行结果

    这里将父进程设置为读进程,子进程设置为写进程,我们在Linux终端下将其编译运行:

  1. 有名管道

有名管道可用于更为广泛的进程之间的通信,但其区别于无名通道的一点则是通信双方必须同时存在,否则便会阻塞。由此我们也可以知道其读写操作是同时进行的。下面我们由其给出的示例代码进行实验验证。

写进程源代码fifo_send.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/*
 *File: fifo_send.c
 */
 
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <fcntl.h>


#define FIFO "/tmp/my_fifo"

int main()
{
    char buf[] = "hello,world";

    //`. check the fifo file existed or not
    int ret;
    ret = access(FIFO, F_OK);
    if(ret == 0)    //file /tmp/my_fifo existed
    {
        system("rm -rf /tmp/my_fifo");
    }

    //2. creat a fifo file
    if(-1 == mkfifo(FIFO, 0766))
    {
        perror("mkfifo");
        exit(EXIT_FAILURE);
    }

    //3.Open the fifo file
    int fifo_fd;
    fifo_fd = open(FIFO, O_WRONLY);
    if(-1 == fifo_fd)
    {
        perror("open");
        exit(EXIT_FAILURE);

    }

    //4. write the fifo file
    int num = 0;
    num = write(fifo_fd, buf, sizeof(buf));
    if(num < sizeof(buf))
    {
        perror("write");
        exit(EXIT_FAILURE);
    }

    printf("write the message ok!\n");

    close(fifo_fd);

    return 0;
}

读进程源代码fifo_rcv.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/*
 *File: fifo_rcv.c
 */
 
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <fcntl.h>


#define FIFO "/tmp/my_fifo"

int main()
{
    char buf[20] ;
    memset(buf, '\0', sizeof(buf));

    //`. check the fifo file existed or not
    int ret;
    ret = access(FIFO, F_OK);
    if(ret != 0)    //file /tmp/my_fifo existed
    {
        fprintf(stderr, "FIFO %s does not existed", FIFO);
        exit(EXIT_FAILURE);
    }

    //2.Open the fifo file
    int fifo_fd;
    fifo_fd = open(FIFO, O_RDONLY);
    if(-1 == fifo_fd)
    {
        perror("open");
        exit(EXIT_FAILURE);

    }

    //4. read the fifo file
    int num = 0;
    num = read(fifo_fd, buf, sizeof(buf));

    printf("Read %d words: %s\n", num, buf);

    close(fifo_fd);

    return 0;
}

在Linux终端在编译并运行:

由图所示,当写进程单独运行时,尽管管道中不存在数据,但其仍处于阻塞状态,随后读进程进入之后,读写进程之间实现了通信,进程得以工作并静止,这与上文的描述相符,完成了实验的验证。

4.3 c) 题

题目:消息通信系统调用是否已经实现了同步机制?通过实验验证,发送者和接收者如何同步的。比如,在什么情况下,发送者会阻塞,什么情况下,接收者会阻塞?

客户端源代码Client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <signal.h>

#define BUF_SIZE 128

//Rebuild the strcut (must be)
struct msgbuf
{
    long mtype;
    char mtext[BUF_SIZE];
};


int main(int argc, char *argv[])
{
    //1. creat a mseg queue
    key_t key;
    int msgId;
    
    printf("THe process(%s),pid=%d started~\n", argv[0], getpid());

    key = ftok(".", 0xFF);
    msgId = msgget(key, IPC_CREAT|0644);
    if(-1 == msgId)
    {
        perror("msgget");
        exit(EXIT_FAILURE);
    }

    //2. creat a sub process, wait the server message
    pid_t pid;
    if(-1 == (pid = fork()))
    {
        perror("vfork");
        exit(EXIT_FAILURE);
    }

    //In child process
    if(0 == pid)
    {
        while(1)
        {
            alarm(0);
            alarm(100);     //if doesn't receive messge in 100s, timeout & exit
            struct msgbuf rcvBuf;
            memset(&rcvBuf, '\0', sizeof(struct msgbuf));
            msgrcv(msgId, &rcvBuf, BUF_SIZE, 2, 0);                
            printf("Server said: %s\n", rcvBuf.mtext);
        }
        
        exit(EXIT_SUCCESS);
    }

    else    //parent process
    {
        while(1)
        {
            usleep(100);
            struct msgbuf sndBuf;
            memset(&sndBuf, '\0', sizeof(sndBuf));
            char buf[BUF_SIZE] ;
            memset(buf, '\0', sizeof(buf));
            
            printf("\nInput snd mesg: ");
            scanf("%s", buf);
            
            strncpy(sndBuf.mtext, buf, strlen(buf)+1);
            sndBuf.mtype = 1;

            if(-1 == msgsnd(msgId, &sndBuf, strlen(buf)+1, 0))
            {
                perror("msgsnd");
                exit(EXIT_FAILURE);
            }
            
            //if scanf "end~", exit
            if(!strcmp("end~", buf))
                break;
        }
        
        printf("THe process(%s),pid=%d exit~\n", argv[0], getpid());
    }

    return 0;
}

服务端源代码Server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#include <signal.h>

#define BUF_SIZE 128

//Rebuild the strcut (must be)
struct msgbuf
{
    long mtype;
    char mtext[BUF_SIZE];
};


int main(int argc, char *argv[])
{
    //1. creat a mseg queue
    key_t key;
    int msgId;
    
    key = ftok(".", 0xFF);
    msgId = msgget(key, IPC_CREAT|0644);
    if(-1 == msgId)
    {
        perror("msgget");
        exit(EXIT_FAILURE);
    }

    printf("Process (%s) is started, pid=%d\n", argv[0], getpid());

    while(1)
    {
        alarm(0);
        alarm(600);     //if doesn't receive messge in 600s, timeout & exit
        struct msgbuf rcvBuf;
        memset(&rcvBuf, '\0', sizeof(struct msgbuf));
        msgrcv(msgId, &rcvBuf, BUF_SIZE, 1, 0);                
        printf("Receive msg: %s\n", rcvBuf.mtext);
        
        struct msgbuf sndBuf;
        memset(&sndBuf, '\0', sizeof(sndBuf));

        strncpy((sndBuf.mtext), (rcvBuf.mtext), strlen(rcvBuf.mtext)+1);
        sndBuf.mtype = 2;

        if(-1 == msgsnd(msgId, &sndBuf, strlen(rcvBuf.mtext)+1, 0))
        {
            perror("msgsnd");
            exit(EXIT_FAILURE);
        }
            
        //if scanf "end~", exit
        if(!strcmp("end~", rcvBuf.mtext))
             break;
    }
        
    printf("THe process(%s),pid=%d exit~\n", argv[0], getpid());

    return 0;
}

在此机制中,发送端传送的消息都会加入一个消息队列,这个队列中的消息节点的大小和类型由我们编写程序之时自行定义。写进程在此机制中不会被阻塞,其写入的字符串会一直被添加至队列的末端,而读进程会从队列的首端一直读取消息,消息节点一旦被读取便会移除队列。当队列中不含其需要类型的消息时便会阻塞。

在此程序中,实则总共有三个进程,其中客户端有一个写进程和一个读进程,服务端则先后进行读操作和写操作。具体工作流程为客户端写进程先进行写操作,添加1类型信息至队列中,服务端若侦测到消息队列中有为接收的1类型信息便会将其接收并移除队列,随后将此1类型消息变为2类型消息添加至队列中,此时客户端的读进程便会读取此2类型的消息,完成一个完整的通信交互。

为验证我们得出的阻塞规则,我们可以先只开启客户端进行多条信息传输,再开启服务端观察结果:

可以发现这个完全符合我们的要求和想法。

Task 5

5.1 thread.h源码阅读

从git上下载pintos的源码: git clone https://github.com/laiy/Pintos/tree/master/src

进程的实验先进入到thread.h中寻找,可以看到这是对于一个进程的四个状态的定义,还有优先级的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* States in a thread's life cycle. */
enum thread_status
  {
    THREAD_RUNNING,     /* Running thread. */
    THREAD_READY,       /* Not running but ready to run. */
    THREAD_BLOCKED,     /* Waiting for an event to trigger. */
    THREAD_DYING        /* About to be destroyed. */
  };
/* Thread identifier type.
   You can redefine this to whatever type you like. */
typedef int tid_t;
#define TID_ERROR ((tid_t) -1)          /* Error value for tid_t. */

/* Thread priorities. */
#define PRI_MIN 0                       /* Lowest priority. */
#define PRI_DEFAULT 31                  /* Default priority. */
#define PRI_MAX 63                      /* Highest priority. */

再往下看我们可以看到对于进程这个结构体的定义

  • tid_t tid:线程的线程标识符。每个线程必须具有在内核的整个生命周期内唯一的tid。默认情况下,tid_t是int的typedef(在上面定义过了),每个新线程接收数字上的下一个更高的tid,从初始进程的1开始。
  • enum thread_status:线程的状态,一共有以下四种:
    • THREAD_RUNNING:线程在给定时间内正在运行。
    • THREAD_READY:该线程已准备好运行,但它现在没有运行。
    • THREAD_BLOCKED:线程正在等待某些事务,
    • THREAD_DYING:切换到下一个线程后,调度程序将销毁该线程。
  • char name[16]:线程命名的字符串,至少前几个数组单元为字符。
  • uint8_t *stack:线程的栈指针。当线程运行时,CPU的堆栈指针寄存器跟踪堆栈的顶部,并且该成员未使用。但是当CPU切换到另一个线程时,该成员保存线程的堆栈指针。保存线程的寄存器不需要其他成员,因为必须保存的其他寄存器保存在堆栈中。
  • int priority:线程优先级,范围从PRI_MIN(0)到PRI_MAX(63)。较低的数字对应较低的优先级,因此优先级0是最低优先级,优先级63是最高优先级。
  • struct list_elem allelem:用于将线程链接到所有线程的列表中。每个线程在创建时都会插入到此列表中,并在退出时删除。应该使用thread_foreach()函数来迭代所有线程。
  • struct list_elem elem:用于将线程放入双向链表:ready_list(准备好运行的线程列表)或sema_down(等待信号量的线程列表)。上面定义过了,default 31。
  • uint32_t pagedir:页表指针,用于将进程结构的虚拟地址映射到物理地址。
  • unsigned magic:始终设置为THREAD_MAGIC,用于检测堆栈溢出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct thread
  {
    /* Owned by thread.c. */
    tid_t tid;                          /* Thread identifier. */
    enum thread_status status;          /* Thread state. */
    char name[16];                      /* Name (for debugging purposes). */
    uint8_t *stack;                     /* Saved stack pointer. */
    int priority;                       /* Priority. */
    struct list_elem allelem;           /* List element for all threads list. */
    /* Shared between thread.c and synch.c. */
    struct list_elem elem;              /* List element. */

#ifdef USERPROG
    /* Owned by userprog/process.c. */
    uint32_t *pagedir;                  /* Page directory. */
#endif

    /* Owned by thread.c. */
    unsigned magic;                     /* Detects stack overflow. */
  };

当然还有很多的函数声明,我们后面继续说。

5.2 thread.c源码阅读

这个文件很长,我们先看init()函数,它为一个新建的进程指定了状态,分配进程号。调用init_thread()分配地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void
thread_init (void) 
{
  ASSERT (intr_get_level () == INTR_OFF);

  lock_init (&tid_lock);
  list_init (&ready_list);
  list_init (&all_list);

  /* Set up a thread structure for the running thread. */
  initial_thread = running_thread ();
  init_thread (initial_thread, "main", PRI_DEFAULT);
  initial_thread->status = THREAD_RUNNING;
  initial_thread->tid = allocate_tid ();
}

/* Does basic initialization of T as a blocked thread named
   NAME. */
static void
init_thread (struct thread *t, const char *name, int priority)
{
  enum intr_level old_level;

  ASSERT (t != NULL);
  ASSERT (PRI_MIN <= priority && priority <= PRI_MAX);
  ASSERT (name != NULL);

  memset (t, 0, sizeof *t);
  t->status = THREAD_BLOCKED;
  strlcpy (t->name, name, sizeof t->name);
  t->stack = (uint8_t *) t + PGSIZE;
  t->priority = priority;
  t->magic = THREAD_MAGIC;

  old_level = intr_disable ();
  list_push_back (&all_list, &t->allelem);
  intr_set_level (old_level);
}

下面我们关注到跟进程切换的函数,我们第一个看到的就是thread_block (void)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Puts the current thread to sleep.  It will not be scheduled
   again until awoken by thread_unblock().

   This function must be called with interrupts turned off.  It
   is usually a better idea to use one of the synchronization
   primitives in synch.h. */
void
thread_block (void) 
{
  ASSERT (!intr_context ());
  ASSERT (intr_get_level () == INTR_OFF);

  thread_current ()->status = THREAD_BLOCKED;
  schedule ();
}

递归的看,我们发现它的核心在于调用了schedule ();而且通过搜索,我们发现很多pintos都调用了这个函数,包括我们常用的thread_yield(),thread_exit().

我们发现它的声明就是在thread.c中,首先其定义了三个thread结构体的指针,均为局部变量,cur指针指向running_thread ()函数的返回值,指针next也是一个函数的返回值。pre之间为null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Schedules a new process.  At entry, interrupts must be off and
   the running process's state must have been changed from
   running to some other state.  This function finds another
   thread to run and switches to it.

   It's not safe to call printf() until thread_schedule_tail()
   has completed. */
static void
schedule (void) 
{
  struct thread *cur = running_thread ();
  struct thread *next = next_thread_to_run ();
  struct thread *prev = NULL;

  ASSERT (intr_get_level () == INTR_OFF);
  ASSERT (cur->status != THREAD_RUNNING);
  ASSERT (is_thread (next));

  if (cur != next)
    prev = switch_threads (cur, next);
  thread_schedule_tail (prev);
}
  1. 我们递归到running_thread()来看它的返回值, 此函数嵌入了汇编代码,将CPU堆栈指针(总是在最顶端)复制到“esp”中,然后四舍五入到页面的开头。因为“struct thread”总是在页面的开头,而堆栈指针位于中间的某个位置,所以它定位当前线程。因此cur指针符合我们的猜想,就是当前运行线程的指针。
1
2
3
4
5
6
7
8
9
10
11
12
13
/* Returns the running thread. */
struct thread *
running_thread (void)
{
  uint32_t *esp;

  /* Copy the CPU's stack pointer into `esp', and then round that
     down to the start of a page.  Because `struct thread' is
     always at the beginning of a page and the stack pointer is
     somewhere in the middle, this locates the curent thread. */
  asm ("mov %%esp, %0" : "=g" (esp));
  return pg_round_down (esp);
}
  1. 上面next指针对应的函数next_thread_to_run(),是一个返回list_entry中pop的值的函数。如果list为空,返回idle_thread.
1
2
3
4
5
6
7
8
9
10
11
12
13
/* Chooses and returns the next thread to be scheduled.  Should
   return a thread from the run queue, unless the run queue is
   empty.  (If the running thread can continue running, then it
   will be in the run queue.)  If the run queue is empty, return
   idle_thread. */
static struct thread *
next_thread_to_run (void)
{
  if (list_empty (&ready_list))
    return idle_thread;
  else
    return list_entry (list_pop_front (&ready_list), struct thread, elem);
}
  1. pre指针为null

所以我们之间进入到schedule ();的后半部分:断言assert是用来判断的。下面调用switch_threads (cur, next)将当前进程和下一个进程进行切换。查看此函数,竟然是用汇编编写的。存放在siwth.S中,结合官方解释,这是将当前堆栈的指针保存到cur线程的堆栈,接着从next线程的堆栈中恢复当前堆栈的指针,也就是寄存器esp的操作。由此我们可以确定进程的保存与恢复就是利用CPU栈顶指针的变化进行的,进程的状态则是保存在自身的堆栈当中。

#include "threads/switch.h"

#### struct thread *switch_threads (struct thread *cur, struct thread *next);
####
#### Switches from CUR, which must be the running thread, to NEXT,
#### which must also be running switch_threads(), returning CUR in
#### NEXT's context.
####
#### This function works by assuming that the thread we're switching
#### into is also running switch_threads().  Thus, all it has to do is
#### preserve a few registers on the stack, then switch stacks and
#### restore the registers.  As part of switching stacks we record the
#### current stack pointer in CUR's thread structure.

.globl switch_threads
.func switch_threads
switch_threads:
	# Save caller's register state.
	#
	# Note that the SVR4 ABI allows us to destroy %eax, %ecx, %edx,
	# but requires us to preserve %ebx, %ebp, %esi, %edi.  See
	# [SysV-ABI-386] pages 3-11 and 3-12 for details.
	#
	# This stack frame must match the one set up by thread_create()
	# in size.
	pushl %ebx
	pushl %ebp
	pushl %esi
	pushl %edi

	# Get offsetof (struct thread, stack).
.globl thread_stack_ofs
	mov thread_stack_ofs, %edx

	# Save current stack pointer to old thread's stack, if any.
	movl SWITCH_CUR(%esp), %eax
	movl %esp, (%eax,%edx,1)

	# Restore stack pointer from new thread's stack.
	movl SWITCH_NEXT(%esp), %ecx
	movl (%ecx,%edx,1), %esp

	# Restore caller's register state.
	popl %edi
	popl %esi
	popl %ebp
	popl %ebx
        ret
.endfunc

分析一下这个汇编代码: 先4个寄存器压栈保存寄存器状态(保护作用), 这4个寄存器是switch_threads_frame的成员:

1
2
3
4
5
6
7
8
9
10
11
 1 /* switch_thread()'s stack frame. */
 2 struct switch_threads_frame 
 3   {
 4     uint32_t edi;               /*  0: Saved %edi. */
 5     uint32_t esi;               /*  4: Saved %esi. */
 6     uint32_t ebp;               /*  8: Saved %ebp. */
 7     uint32_t ebx;               /* 12: Saved %ebx. */
 8     void (*eip) (void);         /* 16: Return address. */
 9     struct thread *cur;         /* 20: switch_threads()'s CUR argument. */
10     struct thread *next;        /* 24: switch_threads()'s NEXT argument. */
11   };

然后全局变量thread_stack_ofs记录线程和棧之间的间隙, 我们都知道线程切换有个保存现场的过程,来看34,35行, 先把当前的线程指针放到eax中, 并把线程指针保存在相对基地址偏移量为edx的地址中。38,39: 切换到下一个线程的线程棧指针, 保存在ecx中, 再把这个线程相对基地址偏移量edx地址(上一次保存现场的时候存放的)放到esp当中继续执行。这里ecx, eax起容器的作用, edx指向当前现场保存的地址偏移量。简单来说就是保存当前线程状态, 恢复新线程之前保存的线程状态。然后再把4个寄存器拿出来, 这个是硬件设计要求的, 必须保护switch_threads_frame里面的寄存器才可以destroy掉eax, edx, ecx。然后注意到现在eax(函数返回值是eax)就是被切换的线程棧指针。我们由此得到一个结论, schedule先把当前线程丢到就绪队列,然后把线程切换如果下一个线程和当前线程不一样的话。

最后到schedule ();的最后一个函数:thread_schedule_tail (prev),通过激活新线程的页表完成线程切换,如果前一个线程正在死亡,则销毁它。首先会获取当前运行进程的指针并保证此时程序不能被中断。接着其会将当其运行进程的状态改变为THREAD_RUNNING以及初始化其时间切片,这可以看做切换进程后对新进程的一个激活。最后的部分表示如果我们切换的线程正在死亡,销毁它的struct线程。而我们传入的prev一定为NULL,所以在切换过程中这一部分并不会执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void
thread_schedule_tail (struct thread *prev)
{
  struct thread *cur = running_thread ();

  ASSERT (intr_get_level () == INTR_OFF);

  /* Mark us as running. */
  cur->status = THREAD_RUNNING;

  /* Start new time slice. */
  thread_ticks = 0;

#ifdef USERPROG
  /* Activate the new address space. */
  process_activate ();
#endif

  /* If the thread we switched from is dying, destroy its struct
     thread.  This must happen late so that thread_exit() doesn't
     pull out the rug under itself.  (We don't free
     initial_thread because its memory was not obtained via
     palloc().) */
  if (prev != NULL && prev->status == THREAD_DYING && prev != initial_thread)
    {
      ASSERT (prev != cur);
      palloc_free_page (prev);
    }
}

5.3 总结

  • 主要利用schedule()完成进程的调度。
  • 存放线程的堆栈起很大作用。