操作系统学习二:进程同步与互斥之生产者 消费者问题 NetCore实现

目的

1. 掌握进程(线程)的同步与互斥。

2. 掌握生产者消费者问题的实现方法。

3. 掌握VC的多线程编程方法。

内容

本实验要求设计并实现一个进程,该进程拥有3个生产者线程和1个消费者线程,它们使用10个不同的缓冲区。需要使用如下信号量:

l 一个mutex信号量,用以阻止生产者线程和消费者线程同时操作缓冲区队列;

l 一个full信号量,当生产者线程生产出一个物品时可以用它向消费者线程发出信号;

l 一个empty信号量,消费者线程释放出一个空缓冲区时可以用它向生产者线程发出信号;

生产者线程生产物品(通过等待一个时间模拟生产过程),然后将物品放置在一个空缓冲区中供消费者线程消费(通过将缓冲区数组元素值设为产品编号模拟放入过程)。消费者线程从缓冲区中获得物品,然后释放缓冲区(通过将缓冲区数组元素值设为0模拟取出过程)。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。

根据给出的相关函数,实现生产者消费者问题。为了分析结果,要求用文件保存缓冲区变化过程。

伪代码

下面给出的是解决这一问题的部分代码和伪代码,后面附上完整代码

#include <windows.h>    //大多数API函数都在这个头文件中定义
#include <stdio.h>
#include <iostream.h>

const unsigned short SIZE_OF_BUFFER = 10;   //缓冲区个数 
unsigned short ProductID = 0;     //生产的产品号
unsigned short ConsumeID = 0;    //将被消耗的产品号
unsigned short in = 0;         //产品进缓冲区时的缓冲区下标
unsigned short out = 0;       //产品出缓冲区时的缓冲区下标

//缓冲区是个循环队列
int g_buffer[SIZE_OF_BUFFER];    //用整型数组模拟缓冲区
bool  g_continue = true;      //当g_continue = false结束程序

HANDLE g_hMutex;          // g_hMutex是互斥信号量的句柄         
HANDLE g_hEmptySemaphore;     //即empty信号量的句柄
HANDLE g_hFullSemaphore;     //即full信号量的句柄

DWORD WINAPI Producer(LPVOID);    //生产者线程
DWORD WINAPI Consumer(LPVOID);    //消费者线程

//请完成主函数,主函数中要完成创建各信号量对象和创建各线程的工作
main()
{             
//创建生产者和消费者线程
CreateThread(, Producer, );
CreateThread(, Consumer, );

}

//请完成模拟生产者把新生产的产品放入缓冲区的过程

void AddToBuffer()
{
    ...
}

//请完成模拟消费者从缓冲区中取出一个产品的过程
void TakeFromBuffer()
{

}

//请根据伪代码完成生产者线程函数
Producer()
{
                      //获取线程ID
    while(TRUE)
    {   //模拟生产过程,生产一个物品
        sleep(rand);    //可以给出时间参数,1000为1秒,休眠时间 
        P(empty);                       //请求一个空缓冲区
        //操作缓冲区池
        P(mutex);
               //输出线程ID,这样可以看出是哪一个线程在工作
        AddToBuffer();  //将物品放置在一个空缓冲区中
        V(mutex);                   
        V(full);    //用信号通知一个满的缓冲区
    }
   return 0;
}

//请根据伪代码完成消费者线程函数       
Consumer()
{
                      //获取线程ID
    while(TRUE)
    {       
        P(full);                            //请求一个满缓冲区
        //操作缓冲区池
        P(mutex);
       //输出线程ID,这样可以看出是哪一个线程在工作
        TakeFromBuffer();  //从一个满缓冲区中取产品
        V(mutex);                       
        V(empty);   //用信号通知一个空的缓冲区
        sleep(rand);                    //休眠时间,模拟消费产品
    }
    return 0;
}

实现代码

using System;
using System.Dynamic;
using System.Threading;

namespace OperatingSystemExperiment.Exp2 {
    /// <summary>
    /// 生产者-消费者问题
    /// 使用 P-V 操作解决同步和互斥问题
    /// 本实验要求设计并实现一个进程,该进程拥有3个生产者线程和1个消费者线程,它们使用10个不同的缓冲区。
    /// </summary>
    public static class Main {
        private static int[] _buffer = new int[10];

        /// <summary>
        /// 是否继续运行
        /// </summary>
        private static bool _continueRun = true;

        /// <summary>
        /// 是否锁定缓冲区
        /// </summary>
        private static bool _isLock = false;

        /// <summary>
        /// 产品号
        /// </summary>
        private static int _productId = 0;

        /// <summary>
        /// 信号枚举
        /// </summary>
        private enum SemaphoreEnum {
            /// <summary>
            /// 互斥信号量,用以阻止生产者线程和消费者线程同时操作缓冲区队列
            /// </summary>
            Mutex,

            /// <summary>
            /// 当生产者线程生产出一个物品时可以用它向消费者线程发出信号
            /// </summary>
            Full,

            /// <summary>
            /// 消费者线程释放出一个空缓冲区时可以用它向生产者线程发出信号
            /// </summary>
            Empty
        }

        public static void Do()
        {
            ThreadStart producer = () => {
                while (_continueRun) {
                    Thread.Sleep(1200);
                    // 请求空缓冲区
                    var emptyBufferId = GetEmptyBuffer();
                    // 没有空缓冲区,继续等
                    if (emptyBufferId == -1) continue;
                    // 缓冲区锁定,等待
                    if (_isLock) continue;
                    P(SemaphoreEnum.Empty);
                    P(SemaphoreEnum.Mutex);
                    Console.WriteLine("生产线程 {0} 工作中", Thread.CurrentThread.ManagedThreadId);
                    AddToBuffer(emptyBufferId, ++_productId);
                    Console.WriteLine("Produce the {0} product to buffer.", _productId);
                    // 输出缓冲区内容
                    var nextIn = GetEmptyBuffer();
                    var nextOut = GetFullBuffer();
                    for (var i = 0; i < _buffer.Length; i++) {
                        if (i == nextOut)
                            Console.WriteLine("{0}: {1} <- 下一个可取出产品消费的地方", i, _buffer[i]);
                        else if (i == nextIn)
                            Console.WriteLine("{0}: {1} <- 可放下一个产品的位置", i, _buffer[i]);
                        else
                            Console.WriteLine("{0}: {1}", i, _buffer[i]);
                    }

                    V(SemaphoreEnum.Mutex);
                    // 用信号通知一个消费者线程有一个满的缓冲区
                    V(SemaphoreEnum.Full);
                }
            };

            // 1个监视线程
            new Thread(() => {
                while (_continueRun)
                {
                    if (_productId > 20)
                        _continueRun = false;
                }
            }).Start();
            // 3个生产者
            new Thread(producer).Start();
            new Thread(producer).Start();
            new Thread(producer).Start();
            // 1个消费者
            new Thread(() => {
                while (_continueRun) {
                    Thread.Sleep(200);
                    // 请求一个满的缓冲区
                    var fullBufferId = GetFullBuffer();
                    if (fullBufferId == -1) continue;
                    // 缓冲区锁定则继续等待
                    if (_isLock) continue;
                    P(SemaphoreEnum.Full);
                    // 操作缓冲区池
                    P(SemaphoreEnum.Mutex);
                    Console.WriteLine("消费者线程 {0} 工作", Thread.CurrentThread.ManagedThreadId);
                    var productId = TakeFromBuffer(fullBufferId);
                    Console.WriteLine("正在消费产品 {0}", productId);
                    V(SemaphoreEnum.Mutex);
                    // 用信号通知一个空的缓冲区
                    V(SemaphoreEnum.Empty);
                }
            }).Start();
        }

        /// <summary>
        /// 生产者把新生产的产品放入缓冲区
        /// </summary>
        /// <returns>是否成功放入,没有空缓冲区的时候不成功</returns>
        private static bool AddToBuffer(int position, int product)
        {
            if (_buffer[position] != 0) return false;
            _buffer[position] = product;
            return true;
        }

        /// <summary>
        /// 获取一个空的缓冲区,都是满的则返回-1
        /// </summary>
        /// <returns>空缓冲区的编号</returns>
        private static int GetEmptyBuffer()
        {
            for (var i = 0; i < _buffer.Length; i++) {
                if (_buffer[i] == 0) {
                    return i;
                }
            }

            return -1;
        }

        /// <summary>
        /// 获取一个满的缓冲区,都是空的则返回-1
        /// </summary>
        /// <returns>满缓冲区的编号</returns>
        private static int GetFullBuffer()
        {
            for (var i = 0; i < _buffer.Length; i++) {
                if (_buffer[i] != 0) {
                    return i;
                }
            }

            return -1;
        }

        /// <summary>
        /// 消费者从缓冲区中取出一个产品
        /// </summary>
        /// <returns>产品id</returns>
        private static int TakeFromBuffer(int position)
        {
            var temp = _buffer[position];
            _buffer[position] = 0;
            return temp;
        }

        /// <summary>
        /// 申请资源操作
        /// </summary>
        /// <param name="s"></param>
        private static void P(SemaphoreEnum s)
        {
            switch (s) {
                case SemaphoreEnum.Mutex:
                    _isLock = true;
                    break;
                case SemaphoreEnum.Full:
                    break;
                case SemaphoreEnum.Empty:
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(s), s, null);
            }
        }

        /// <summary>
        /// 释放资源操作
        /// </summary>
        /// <param name="s"></param>
        private static void V(SemaphoreEnum s)
        {
            switch (s) {
                case SemaphoreEnum.Mutex:
                    _isLock = false;
                    break;
                case SemaphoreEnum.Full:
                    break;
                case SemaphoreEnum.Empty:
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(s), s, null);
            }
        }
    }
}

运行结果

生产线程 4 工作中
Produce the 1 product to buffer.
0: 1 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 1
生产线程 5 工作中
Produce the 2 product to buffer.
0: 2 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 2
生产线程 4 工作中
Produce the 3 product to buffer.
0: 3 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 3
生产线程 6 工作中
Produce the 4 product to buffer.
0: 4 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 4
生产线程 4 工作中
Produce the 5 product to buffer.
0: 5 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 5
生产线程 5 工作中
Produce the 6 product to buffer.
0: 6 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 7 product to buffer.
0: 6 <- 下一个可取出产品消费的地方
1: 7
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 6
消费者线程 7 工作
正在消费产品 7
生产线程 6 工作中
Produce the 8 product to buffer.
0: 8 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 5 工作中
Produce the 9 product to buffer.
0: 8 <- 下一个可取出产品消费的地方
1: 9
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 10 product to buffer.
0: 8 <- 下一个可取出产品消费的地方
1: 9
2: 10
3: 0 <- 可放下一个产品的位置
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 8
消费者线程 7 工作
正在消费产品 9
消费者线程 7 工作
正在消费产品 10
生产线程 6 工作中
Produce the 11 product to buffer.
0: 11 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 12 product to buffer.
0: 11 <- 下一个可取出产品消费的地方
1: 12
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 11
消费者线程 7 工作
正在消费产品 12
生产线程 5 工作中
Produce the 13 product to buffer.
0: 13 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 6 工作中
Produce the 14 product to buffer.
0: 13 <- 下一个可取出产品消费的地方
1: 14
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 15 product to buffer.
0: 13 <- 下一个可取出产品消费的地方
1: 14
2: 15
3: 0 <- 可放下一个产品的位置
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 13
消费者线程 7 工作
正在消费产品 14
消费者线程 7 工作
正在消费产品 15
生产线程 5 工作中
Produce the 16 product to buffer.
0: 16 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 6 工作中
Produce the 17 product to buffer.
0: 16 <- 下一个可取出产品消费的地方
1: 17
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 18 product to buffer.
0: 16 <- 下一个可取出产品消费的地方
1: 17
2: 18
3: 0 <- 可放下一个产品的位置
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 16
消费者线程 7 工作
正在消费产品 17
消费者线程 7 工作
正在消费产品 18
生产线程 5 工作中
Produce the 19 product to buffer.
0: 19 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 20 product to buffer.
0: 19 <- 下一个可取出产品消费的地方
1: 20
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 19
消费者线程 7 工作
正在消费产品 20
生产线程 5 工作中
Produce the 21 product to buffer.
0: 21 <- 下一个可取出产品消费的地方
1: 0 <- 可放下一个产品的位置
2: 0
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
生产线程 4 工作中
Produce the 22 product to buffer.
0: 21 <- 下一个可取出产品消费的地方
1: 22
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0
消费者线程 7 工作
正在消费产品 21
生产线程 6 工作中
Produce the 23 product to buffer.
0: 23 <- 下一个可取出产品消费的地方
1: 22
2: 0 <- 可放下一个产品的位置
3: 0
4: 0
5: 0
6: 0
7: 0
8: 0
9: 0

欢迎与我交流