21xrx.com
2025-06-06 13:55:44 Friday
文章检索 我的文章 写文章
C++进程池头文件
2023-07-08 15:14:24 深夜i     19     0
C++ 进程池 头文件 并发 线程池

C++进程池是一种常见的多进程并发编程技术,它通过复用进程来提高程序的性能和稳定性。进程池管理着一组可复用的子进程,这些子进程在程序启动时就由父进程创建好并一直运行着,等待任务的到来。当任务到来时,父进程从进程池中挑选一个子进程来处理任务;任务处理完毕后,该子进程继续留在进程池中,等待下一个任务。

为了方便使用进程池,通常需要封装一些头文件,来帮助用户使用。下面是一个简单的C++进程池头文件示例:

#ifndef PROCESS_POOL_H
#define PROCESS_POOL_H
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <exception>
#include <iostream>
#include <vector>
using namespace std;
// Bookkeeping record the pool keeps for one worker process: the worker's pid
// and the socketpair that links the worker with the parent.
class process {
public:
  // Every field starts at -1 so an unused slot never looks like a live pid or
  // a valid descriptor (0 is a valid fd).
  process() : m_pid(-1) {
    m_pipefd[0] = -1;
    m_pipefd[1] = -1;
  }

public:
  pid_t m_pid;      // pid of the worker; -1 while the slot has no live worker
  int m_pipefd[2];  // socketpair: the parent keeps [0], the worker keeps [1]
};
template<typename T>
class process_pool {
private:
  process_pool(int listenfd, int process_number = 8);
public:
  static process_pool<T>* create(int listenfd, int process_number = 8) {
    if (!m_instance) {
      m_instance = new process_pool<T>(listenfd, process_number);
    }
    return m_instance;
  }
  ~process_pool() {
    delete[] m_sub_process;
  }
  void run();
private:
  void setup_sig_pipe();
  void run_parent();
  void run_child();
private:
  static const int MAX_PROCESS_NUMBER = 16;
  static const int USER_PER_PROCESS = 65536;
  static const int MAX_EVENT_NUMBER = 10000;
  int m_process_number;
  int m_idx;
  int m_epollfd;
  int m_listenfd;
  int m_stop;
  vector<T*> m_users;
  vector<process> m_sub_process;
  static process_pool<T>* m_instance;
};
template<typename T>
process_pool<T>* process_pool<T>::m_instance = NULL;
// Switches `fd` to non-blocking mode.
//
// Returns the file status flags that were in effect before the call, or -1
// when either fcntl() call fails (errno is left as set by fcntl). A failed
// F_GETFL is no longer OR'ed with O_NONBLOCK and written back with F_SETFL.
static int setnonblocking(int fd) {
  const int old_option = fcntl(fd, F_GETFL);
  if (old_option == -1) {
    return -1;
  }
  if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) == -1) {
    return -1;
  }
  return old_option;
}
static void addfd(int epollfd, int fd) {
  epoll_event event;
  event.data.fd = fd;
  event.events = EPOLLIN | EPOLLET;
  epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
  setnonblocking(fd);
}
// Unregisters `fd` from the epoll instance `epollfd`, then closes `fd`.
// The results of both calls are ignored.
static void removefd(int epollfd, int fd) {
  epoll_event* const no_event = NULL;  // no event argument is supplied for EPOLL_CTL_DEL
  epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, no_event);
  close(fd);
}
// Self-pipe that carries signal numbers from the asynchronous handler to the
// epoll loop: the handler writes to [1], the event loops read from [0].
// Both ends start at -1 so that a signal delivered before the pipe exists
// makes send() fail instead of writing to descriptor 0.
static int m_sig_pipefd[2] = {-1, -1};

// Signal handler: forwards the signal number as a single byte through
// m_sig_pipefd[1] so it can be handled later in the event loop.
// errno is saved and restored so the interrupted code does not see it change.
static void sig_handler(int sig) {
  const int save_errno = errno;
  // Send an explicit one-byte value; taking the first byte of an int yields
  // the signal number only on little-endian machines.
  const char msg = static_cast<char>(sig);
  send(m_sig_pipefd[1], &msg, 1, 0);
  errno = save_errno;
}
// Installs sig_handler for signal `sig`.
// SA_RESTART is set and every signal is blocked while the handler runs.
static void addsig(int sig) {
  struct sigaction sa;
  memset(&sa, '\0', sizeof(sa));
  sa.sa_handler = sig_handler;
  sa.sa_flags |= SA_RESTART;
  sigfillset(&sa.sa_mask);
  // The call must stay outside assert(): with NDEBUG defined the asserted
  // expression is not evaluated and the handler would never be installed.
  const int rc = sigaction(sig, &sa, NULL);
  assert(rc != -1);
  (void)rc;  // silences the unused-variable warning in NDEBUG builds
}
// Forks `process_number` workers, each connected to the parent through its
// own socketpair.
//
// In the parent the loop runs to completion and m_idx stays -1. In a worker
// the loop stops right after fork() with m_idx set to the worker's slot.
// `process_number` must be in (0, MAX_PROCESS_NUMBER]. A failing socketpair()
// or fork() terminates the process in every build mode: when these checks are
// only asserts, an NDEBUG build lets a failed fork() (pid -1) fall into the
// worker branch and the parent would act as a worker.
template<typename T>
process_pool<T>::process_pool(int listenfd, int process_number) :
    // Listed in member declaration order, which is the order they are
    // initialized in.
    m_process_number(process_number), m_idx(-1), m_epollfd(-1),
    m_listenfd(listenfd), m_stop(false) {
  assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));
  m_sub_process.resize(process_number);
  for (int i = 0; i < process_number; ++i) {
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd) == -1) {
      perror("socketpair");
      abort();
    }
    const pid_t pid = fork();
    if (pid < 0) {
      perror("fork");
      abort();
    }
    m_sub_process[i].m_pid = pid;
    if (pid > 0) {
      // Parent: keeps end [0] and goes on to fork the next worker.
      close(m_sub_process[i].m_pipefd[1]);
      continue;
    }
    // Worker: keeps end [1] of its own pair only.
    close(m_sub_process[i].m_pipefd[0]);
    // The parent-side ends of earlier workers were inherited across fork();
    // this worker never uses them, so release them instead of leaking them.
    for (int j = 0; j < i; ++j) {
      close(m_sub_process[j].m_pipefd[0]);
      m_sub_process[j].m_pipefd[0] = -1;
    }
    m_idx = i;
    break;
  }
}
// Creates this process's epoll instance and the signal pipe, registers the
// read end of the pipe with epoll, and installs the forwarding handler for
// SIGCHLD, SIGTERM, SIGINT and SIGPIPE.
template<typename T>
void process_pool<T>::setup_sig_pipe() {
  m_epollfd = epoll_create(5);
  assert(m_epollfd != -1);

  const int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, m_sig_pipefd);
  assert(rc != -1);

  // Write end: made non-blocking only. Read end: non-blocking and watched.
  setnonblocking(m_sig_pipefd[1]);
  addfd(m_epollfd, m_sig_pipefd[0]);

  const int forwarded[] = {SIGCHLD, SIGTERM, SIGINT, SIGPIPE};
  const int count = static_cast<int>(sizeof(forwarded) / sizeof(forwarded[0]));
  for (int k = 0; k < count; ++k) {
    addsig(forwarded[k]);
  }
}
// Dispatches to the event loop that matches the current process:
// m_idx == -1 identifies the parent, any other value a worker slot.
template<typename T>
void process_pool<T>::run() {
  const bool is_parent = (m_idx == -1);
  if (is_parent) {
    run_parent();
  } else {
    run_child();
  }
}
template<typename T>
void process_pool<T>::run_child() {
  setup_sig_pipe();
  int pipefd = m_sub_process[m_idx].m_pipefd[1];
  addfd(m_epollfd, pipefd);
  epoll_event events[MAX_EVENT_NUMBER];
  T* users = new T[USER_PER_PROCESS];
  assert(users);
  int number = 0;
  int ret = -1;
  while (!m_stop) {
    number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
    if ((number < 0) && (errno != EINTR)) {
      cout << "epoll failure" << endl;
      break;
    }
    for (int i = 0; i < number; ++i) {
      int sockfd = events[i].data.fd;
      if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) {
        int client = 0;
        ret = recv(sockfd, (char*)&client, sizeof(client), 0);
        if (((ret < 0) && (errno != EAGAIN)) || ret == 0) {
          continue;
        }
        else {
          struct sockaddr_in client_address;
          socklen_t client_addrlength = sizeof(client_address);
          int connfd = accept(m_listenfd, (struct sockaddr*)&client_address, &client_addrlength);
          if (connfd < 0) {
            cout << "accept error: " << errno << endl;
            continue;
          }
          addfd(m_epollfd, connfd);
          users[connfd].init(m_epollfd, connfd, client_address);
        }
      }
      else if ((sockfd == m_sig_pipefd[0]) && (events[i].events & EPOLLIN)) {
        int sig;
        char signals[1024];
        ret = recv(m_sig_pipefd[0], signals, sizeof(signals), 0);
        if (ret < 0) {
          continue;
        }
        else if (ret == 0) {
          continue;
        }
        else {
          for (int i = 0; i < ret; ++i) {
            switch (signals[i]) {
              case SIGCHLD:{
                pid_t pid;
                int stat;
                while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
                  continue;
                }
                break;
              }
              case SIGTERM:
              case SIGINT: {
                m_stop = true;
                break;
              }
              default: {
                break;
              }
            }
          }
        }
      }
      else if (events[i].events & EPOLLIN) {
        users[sockfd].process();
      }
      else {
        continue;
      }
    }
  }
  delete[] users;
  users = NULL;
  close(pipefd);
  close(m_epollfd);
  exit(0);
}
template<typename T>
void process_pool<T>::run_parent() {
  setup_sig_pipe();
  addfd(m_epollfd, m_listenfd);
  epoll_event events[MAX_EVENT_NUMBER];
  int sub_process_counter = 0;
  int new_conn = 1;
  int number = 0;
  int ret = -1;
  while (!m_stop) {
    number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
    if ((number < 0) && (errno != EINTR)) {
      cout << "epoll failure" << endl;
      break;
    }
    for (int i = 0; i < number; ++i) {
      int sockfd = events[i].data.fd;
      if (sockfd == m_listenfd) {
        int i = sub_process_counter;
        do {
          if (m_sub_process[i].m_pid != -1) {
            break;
          }
          i = (i + 1) % m_process_number;
        } while (i != sub_process_counter);
        if (m_sub_process[i].m_pid == -1) {
          m_stop = true;
          break;
        }
        sub_process_counter = (i + 1) % m_process_number;
        send(m_sub_process[i].m_pipefd[0], (char*)&new_conn, sizeof(new_conn), 0);
        cout << "send request to child " << i << endl;
      }
      else if ((sockfd == m_sig_pipefd[0]) && (events[i].events & EPOLLIN)) {
        int sig;
        char signals[1024];
        ret = recv(m_sig_pipefd[0], signals, sizeof(signals), 0);
        if (ret <= 0) {
          continue;
        }
        else {
          for (int i = 0; i < ret; ++i) {
            switch (signals[i]) {
              case SIGCHLD:{
                pid_t pid;
                int stat;
                while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
                  for (int i = 0; i < m_process_number; ++i) {
                    if (m_sub_process[i].m_pid == pid) {
                      cout << "child " << i << " join" << endl;
                      close(m_sub_process[i].m_pipefd[0]);
                      m_sub_process[i].m_pid = -1;
                    }
                  }
                }
                m_stop = true;
                for (int i = 0; i < m_process_number; ++i) {
                  if (m_sub_process[i].m_pid != -1) {
                    m_stop = false;
                  }
                }
                break;
              }
              case SIGTERM:
              case SIGINT: {
                cout << "kill all the child now" << endl;
                for (int i = 0; i < m_process_number; ++i) {
                  int pid = m_sub_process[i].m_pid;
                  if (pid != -1) {
                    kill(pid, SIGTERM);
                  }
                }
                break;
              }
              default: {
                break;
              }
            }
          }
        }
      }
      else {
        continue;
      }
    }
  }
  close(m_epollfd);
}
#endif

这个头文件包含了C++进程池实现所需要的各种头文件和常量定义。同时还提供了`process`和`process_pool`两个类,用于管理进程池的各个进程。

在使用C++进程池时,只需要包含这个头文件,并调用相关的函数即可。例如:

#include "process_pool.h"
int main() {
  int listenfd = socket(AF_INET, SOCK_STREAM, 0);
  // ... bind, listen ...
  process_pool<your_class> *pool = process_pool<your_class>::create(listenfd);
  if (pool) {
    pool->run();
    delete pool;
  }
  close(listenfd);
  return 0;
}

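这里的`your_class`是用户自定义的类。它并不需要继承`process`类(`process`只是进程池内部用来记录子进程pid和管道的辅助类),但必须可以默认构造,并提供两个成员函数:`init(epollfd, connfd, client_address)`,在子进程接受新连接后被调用;以及`process()`,在该连接上有数据可读时被调用。这样,进程池才能正确地处理用户的请求。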

  
  

评论区