目录
1.master进程监听socket
2.master和worker进程通信机制
2.1通信渠道
2.2通信方法
2.3通信内容
2.4子进程事件处理
3.epoll封装
4.linux系统下信号查看
5.信号处理模拟
1.master进程监听socket
nginx在master进程中完成socket的创建、bind和listen,accept则在子进程(worker)中通过epoll控制,代码如下:
ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
...
s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0);
...
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int))
== -1)
...
if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) {
...
if (listen(s, ls[i].backlog) == -1) {
...
}
调用堆栈如下:
ngx_open_listening_sockets(ngx_cycle_t * cycle) (nginx-1.22.1/src/core/ngx_connection.c:604)
ngx_init_cycle(ngx_cycle_t * old_cycle) (nginx-1.22.1/src/core/ngx_cycle.c:618)
main(int argc, char * const * argv) (nginx-1.22.1/src/core/nginx.c:292)
2.master和worker进程通信机制
2.1通信渠道
创建套接字组用于读和写,master用fd[0]写,worker用fd[1]进行读
socketpair
2.2通信方法
master用sendmsg发送,worker用recvmsg接收
#include <sys/types.h>
#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
2.3通信内容
struct msghdr
2.4子进程事件处理
子进程注册fd[1] -> ngx_channel读事件到epoll上
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
...
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler)
== NGX_ERROR)
{
/* fatal */
exit(2);
}
}
ngx_channel在如下函数中赋值,其值为fd[1](即channel[1]):
ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
char *name, ngx_int_t respawn)
{
...
ngx_channel = ngx_processes[s].channel[1];
...
}
3.epoll封装
文件:
nginx-1.22.1/src/event/modules/ngx_epoll_module.c
结构:
/*
 * Event-module context for the epoll backend.
 *
 * The initializer supplies, in order: a pointer to the module name
 * (epoll_name), the two configuration hooks, and a nested table of event
 * actions.  Note that the same function is registered for both "add" and
 * "enable", and likewise for "delete" and "disable".  The notify slot is
 * only populated when NGX_HAVE_EVENTFD is set; otherwise it is NULL.
 */
static ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */
{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add a connection */
ngx_epoll_del_connection, /* delete a connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};
4.linux系统下信号查看
kill -l
5.信号处理模拟
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <stdint.h>

/*
 * Demo: a parent ("master") and GROUP_NUM children ("workers") exchange
 * messages over socketpair() channels while nginx-style signal handlers are
 * installed.  The code is written so that it compiles both as C and as C++.
 */

/*
 * read() wrapper that restarts the call when it is interrupted by a signal.
 * The handlers below are installed without SA_RESTART, so a signal such as
 * SIGCHLD arriving during a blocking read() makes it fail with EINTR.
 *
 * Returns the number of bytes read, 0 on end-of-file, or -1 on error.
 */
static ssize_t
read_retry(int fd, void *buf, size_t len)
{
    ssize_t n;

    do {
        n = read(fd, buf, len);
    } while (n == -1 && errno == EINTR);

    return n;
}

/*
 * Write exactly len bytes, restarting on EINTR and continuing after short
 * writes.
 *
 * Returns 0 on success, -1 on error (errno is left as set by write()).
 */
static int
write_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = write(fd, buf, len);

        if (n == -1) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }

        buf += n;
        len -= (size_t) n;
    }

    return 0;
}

/*
 * Parent side of one channel: sends a greeting and prints the reply,
 * three times, one second apart.
 *
 * fds  the socketpair; the parent closes fds[0] and talks over fds[1]
 *      (the child does the opposite: closes fds[1] and uses fds[0]).
 * num  number of descriptors in fds; must be at least 2.
 *
 * Returns 0 on success, -1 if num < 2 or an I/O call fails.
 */
int parentProcess(int* fds, int num) {
    if (num < 2) {
        return -1;
    }

    close(fds[0]);

    char buf[128] = {0};
    const char *pStr = "hello child, I am parent";
    int rc = 0;
    int inum = 0;

    while (inum++ < 3) {
        memset(buf, 0x00, sizeof(buf));

        /* write */
        if (write_all(fds[1], pStr, strlen(pStr)) == -1) {
            printf("parent write() failed: %s\n", strerror(errno));
            rc = -1;
            break;
        }

        /* read; leave room for the terminating NUL that "%s" relies on */
        ssize_t n = read_retry(fds[1], buf, sizeof(buf) - 1);
        if (n <= 0) {
            printf("parent read() failed or peer closed the channel\n");
            rc = -1;
            break;
        }

        printf("parent [%d] %d :: %s\n", (int) getpid(), inum, buf);
        sleep(1);
    }

    close(fds[1]);
    return rc;
}

/*
 * Child side of one channel: waits for the parent's message, prints it and
 * answers with a greeting that carries the child's pid, three times.
 *
 * fds  the socketpair; the child closes fds[1] and talks over fds[0].
 * num  number of descriptors in fds; must be at least 2.
 *
 * Returns 0 on success, -1 if num < 2 or an I/O call fails.
 */
int childProcess(int* fds, int num) {
    if (num < 2) {
        return -1;
    }

    close(fds[1]);

    char buf[128] = {0};
    char sendBuf[128] = {0};
    int rc = 0;
    int inum = 0;

    snprintf(sendBuf, sizeof(sendBuf), "hello parent, I am child %d",
             (int) getpid());

    while (inum++ < 3) {
        memset(buf, 0x00, sizeof(buf));

        /* read; leave room for the terminating NUL that "%s" relies on */
        ssize_t n = read_retry(fds[0], buf, sizeof(buf) - 1);
        if (n <= 0) {
            printf("child read() failed or peer closed the channel\n");
            rc = -1;
            break;
        }

        printf("child [%d] %d :: %s\n", (int) getpid(), inum, buf);

        /* write */
        if (write_all(fds[0], sendBuf, strlen(sendBuf)) == -1) {
            printf("child write() failed: %s\n", strerror(errno));
            rc = -1;
            break;
        }

        sleep(1);
    }

    close(fds[0]);
    return rc;
}

/*
 * Put a file descriptor into non-blocking mode.
 *
 * Returns the result of ioctl(): -1 on failure.
 */
int fd_nonblocking(int s)
{
    int nb = 1;

    /*
     * Method one (equivalent alternative):
     *     int flag = fcntl(s, F_GETFL);
     *     flag |= O_NONBLOCK;
     *     return fcntl(s, F_SETFL, flag);
     */

    /* Method two */
    return ioctl(s, FIONBIO, &nb);
}

/*
 * Close both ends of a channel, reporting (but not propagating) failures.
 * Always returns 0.
 */
int close_channel(int* fds) {
    if (close(fds[0]) == -1) {
        printf("close() channel fds[0] failed\n");
    }

    if (close(fds[1]) == -1) {
        printf("close() channel fds[1] failed\n");
    }

    return 0;
}

/*
 * Switch both ends of a channel to non-blocking mode.  If either call
 * fails the whole channel is closed and the function returns immediately,
 * so an already-closed descriptor is never touched again.
 */
void testNonblockingSocketFd(int* fds) {
    if (-1 == fd_nonblocking(fds[0])) {
        printf("fd_nonblocking fds[0] failed\n");
        close_channel(fds);
        return;
    }

    if (-1 == fd_nonblocking(fds[1])) {
        printf("fd_nonblocking fds[1] failed\n");
        close_channel(fds);
        return;
    }
}

/* One parent<->child channel: fds[0] is the child's end, fds[1] the parent's. */
struct TDataExchangeChannel {
    int fds[2];
};

#define GROUP_NUM 2

/* One entry of the signal table, laid out like nginx's ngx_signal_t. */
typedef struct {
    int signo;              /* signal number; 0 terminates the table */
    const char *signame;    /* printable signal name */
    const char *name;       /* command-line action name, "" if none */
    void (*handler)(int signo, siginfo_t *siginfo, void *ucontext);
                            /* NULL means the signal is ignored */
} ngx_signal_t;

/* Two-level helpers so that the argument macro is expanded before ## and #. */
#define ngx_signal_helper(n) SIG##n
#define ngx_signal_value(n) ngx_signal_helper(n)

#define ngx_value_helper(n) #n
#define ngx_value(n) ngx_value_helper(n)

#define NGX_SHUTDOWN_SIGNAL QUIT
#define NGX_TERMINATE_SIGNAL TERM
#define NGX_NOACCEPT_SIGNAL WINCH
#define NGX_RECONFIGURE_SIGNAL HUP

#if (NGX_LINUXTHREADS)
#define NGX_REOPEN_SIGNAL INFO
#define NGX_CHANGEBIN_SIGNAL XCPU
#else
#define NGX_REOPEN_SIGNAL USR1
#define NGX_CHANGEBIN_SIGNAL USR2
#endif

#define ngx_errno errno

typedef intptr_t ngx_int_t;
typedef int ngx_err_t;
typedef uintptr_t ngx_uint_t;

#define NGX_PROCESS_SINGLE 0
#define NGX_PROCESS_MASTER 1
#define NGX_PROCESS_SIGNALLER 2
#define NGX_PROCESS_WORKER 3
#define NGX_PROCESS_HELPER 4

#define NGX_OK 0
#define NGX_ERROR -1
#define NGX_AGAIN -2
#define NGX_BUSY -3
#define NGX_DONE -4
#define NGX_DECLINED -5
#define NGX_ABORT -6

/* Role of the current process; selects the branch taken by the handler. */
ngx_uint_t ngx_process;

void
ngx_signal_handler(int signo, siginfo_t *siginfo, void *ucontext);

/* Signals handled by the demo; entries with a NULL handler are ignored. */
ngx_signal_t signals[] = {
    { ngx_signal_value(NGX_RECONFIGURE_SIGNAL),
      "SIG" ngx_value(NGX_RECONFIGURE_SIGNAL),
      "reload",
      ngx_signal_handler },

    { ngx_signal_value(NGX_REOPEN_SIGNAL),
      "SIG" ngx_value(NGX_REOPEN_SIGNAL),
      "reopen",
      ngx_signal_handler },

    { ngx_signal_value(NGX_NOACCEPT_SIGNAL),
      "SIG" ngx_value(NGX_NOACCEPT_SIGNAL),
      "",
      ngx_signal_handler },

    { ngx_signal_value(NGX_TERMINATE_SIGNAL),
      "SIG" ngx_value(NGX_TERMINATE_SIGNAL),
      "stop",
      ngx_signal_handler },

    { ngx_signal_value(NGX_SHUTDOWN_SIGNAL),
      "SIG" ngx_value(NGX_SHUTDOWN_SIGNAL),
      "quit",
      ngx_signal_handler },

    { ngx_signal_value(NGX_CHANGEBIN_SIGNAL),
      "SIG" ngx_value(NGX_CHANGEBIN_SIGNAL),
      "",
      ngx_signal_handler },

    { SIGALRM, "SIGALRM", "", ngx_signal_handler },

    { SIGINT, "SIGINT", "", ngx_signal_handler },

    { SIGIO, "SIGIO", "", ngx_signal_handler },

    { SIGCHLD, "SIGCHLD", "", ngx_signal_handler },

    { SIGSYS, "SIGSYS, SIG_IGN", "", NULL },

    { SIGPIPE, "SIGPIPE, SIG_IGN", "", NULL },

    { 0, NULL, "", NULL }
};

/*
 * Common handler for every signal in the table.  It prints the signal
 * number and the action that nginx would take for it, depending on whether
 * the current process is a master/single process or a worker/helper.  The
 * places where nginx sets its global flags are kept as comments.
 *
 * NOTE(review): printf() is not async-signal-safe; it is used here only
 * because this is a demonstration.  errno is saved on entry and restored on
 * exit so that the interrupted code does not see a value clobbered here.
 */
void
ngx_signal_handler(int signo, siginfo_t *siginfo, void *ucontext)
{
    const char *action;
    ngx_int_t ignore;
    ngx_err_t err;

    (void) siginfo;
    (void) ucontext;

    ignore = 0;
    err = ngx_errno;

    printf("signo = [%d]\n", signo);

    action = "";

    switch (ngx_process) {

    case NGX_PROCESS_MASTER:
    case NGX_PROCESS_SINGLE:
        switch (signo) {

        case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
            /* nginx: ngx_quit = 1; */
            action = ", shutting down";
            break;

        case ngx_signal_value(NGX_TERMINATE_SIGNAL):
        case SIGINT:
            /* nginx: ngx_terminate = 1; */
            action = ", exiting";
            break;

        case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
            /*
             * nginx: if (ngx_daemonized) {
             *            ngx_noaccept = 1;
             *            action = ", stop accepting connections";
             *        }
             */
            break;

        case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
            /* nginx: ngx_reconfigure = 1; */
            action = ", reconfiguring";
            break;

        case ngx_signal_value(NGX_REOPEN_SIGNAL):
            /* nginx: ngx_reopen = 1; */
            action = ", reopening logs";
            break;

        case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
            /*
             * nginx: if (ngx_getppid() == ngx_parent || ngx_new_binary > 0) {
             *            action = ", ignoring";
             *            ignore = 1;
             *            break;
             *        }
             *        ngx_change_binary = 1;
             */
            action = ", changing binary";
            break;

        case SIGALRM:
            /* nginx: ngx_sigalrm = 1; */
            break;

        case SIGIO:
            /* nginx: ngx_sigio = 1; */
            break;

        case SIGCHLD:
            /* nginx: ngx_reap = 1; */
            break;
        }

        break;

    case NGX_PROCESS_WORKER:
    case NGX_PROCESS_HELPER:
        switch (signo) {

        case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
            /*
             * nginx: if (!ngx_daemonized) {
             *            break;
             *        }
             *        ngx_debug_quit = 1;
             */
            /* fall through */
        case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
            /* nginx: ngx_quit = 1; */
            action = ", shutting down";
            break;

        case ngx_signal_value(NGX_TERMINATE_SIGNAL):
        case SIGINT:
            /* nginx: ngx_terminate = 1; */
            action = ", exiting";
            break;

        case ngx_signal_value(NGX_REOPEN_SIGNAL):
            /* nginx: ngx_reopen = 1; */
            action = ", reopening logs";
            break;

        case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
        case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
        case SIGIO:
            action = ", ignoring";
            break;
        }

        break;
    }

    if (ignore) {
        printf("the changing binary signal is ignored: "
               "you should shutdown or terminate "
               "before either old or new binary's process");
    }

    printf("action = [%s]\n", action);

    ngx_errno = err;
}

/*
 * Install a disposition for every entry of the signals table: entries with
 * a handler get it via sa_sigaction with SA_SIGINFO, the others are set to
 * SIG_IGN.
 *
 * Returns NGX_OK on success, NGX_ERROR if sigaction() fails (unless
 * NGX_VALGRIND is set, in which case the failure is only reported).
 */
ngx_int_t
ngx_init_signals(void)
{
    ngx_signal_t *sig;
    struct sigaction sa;

    for (sig = signals; sig->signo != 0; sig++) {
        memset(&sa, 0x00, sizeof(struct sigaction));

        if (sig->handler) {
            sa.sa_sigaction = sig->handler;
            sa.sa_flags = SA_SIGINFO;

        } else {
            sa.sa_handler = SIG_IGN;
        }

        sigemptyset(&sa.sa_mask);

        if (sigaction(sig->signo, &sa, NULL) == -1) {
#if (NGX_VALGRIND)
            printf("sigaction(%s) failed, ignored", sig->signame);
#else
            printf("sigaction(%s) failed", sig->signame);
            return NGX_ERROR;
#endif
        }
    }

    return NGX_OK;
}

/*
 * Entry point: installs the signal handlers, creates GROUP_NUM channels,
 * forks one child per channel, talks to each child in turn and then stays
 * alive so that signals can be sent to the process for experimentation.
 */
int main(void)
{
    /* Mark this process as the master so the handler takes that branch. */
    ngx_process = NGX_PROCESS_MASTER;

    /* Signal initialization */
    if (ngx_init_signals() != NGX_OK) {
        return 1;
    }

    struct TDataExchangeChannel channels[GROUP_NUM];

    for (int i = 0; i < GROUP_NUM; i++) {
        if (socketpair(PF_UNIX, SOCK_STREAM, 0, channels[i].fds) == -1) {
            printf("socketpair() failed: %s\n", strerror(errno));
            return -1;
        }

#ifdef NOBLOCKFD
        testNonblockingSocketFd(channels[i].fds);
#endif

        pid_t pid = fork();

        switch (pid) {

        case -1: /* error */
            return -1;

        case 0: /* child */
            ngx_process = NGX_PROCESS_WORKER;

            /* Drop the descriptors inherited from earlier channels. */
            for (int j = 0; j < i; j++) {
                close_channel(channels[j].fds);
            }

            childProcess(channels[i].fds, 2);
            printf("child exit \n");
            exit(0);

        default: /* parent */
            break;
        }
    }

    /* The parent sends a message to each child and reads its reply. */
    for (int i = 0; i < GROUP_NUM; i++) {
        parentProcess(channels[i].fds, 2);
    }

    /* Keep running so that signals can be delivered, e.g. with kill(1). */
    while (1) {
        sleep(20);
    }

    return 0;
}
运行结果:
Linux下信号处理可以参考如下博客:
https://www.cnblogs.com/Alliswell-WP/p/CPlusPlus_Linux_07.html