004 10-web服务器-并发服务器2
10.1. Web静态服务器-5-非堵塞模式¶
Web静态服务器-5-非堵塞模式¶
单进程非堵塞 模型¶
课程代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | import socket import time tcp_server_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #建立tcp服务端 tcp_server_tcp.bind(("", 7899)) #绑定端口 tcp_server_tcp.listen(128) #设置监听 tcp_server_tcp.setblocking(False) # 设置套接字为非堵塞的方式,服务端设置为费堵塞 client_socket_list = list() #建立客户端列表,如果连客户端都没有那么不管怎么样你都接收不到数据,你try也不行,因此使用列表 while True: # time.sleep(0.5) try: #循环接收客户端的请求,如果没有新的客户端就产生异常就好了 new_socket, new_addr = tcp_server_tcp.accept() except Exception as ret: #产生异常 print("---没有新的客户端到来---") else: #如果有新的客户端到来了 print("---只要没有产生异常,那么也就意味着 来了一个新的客户端----") new_socket.setblocking(False) # 设置套接字为非堵塞的方式(True就是默认堵塞),客户端也要设置成费堵塞的方式 #如果不设置客户端是费堵塞的话那么只有客户端接收到数据才能解堵塞是不行的 client_socket_list.append(new_socket) #客户端来了那就存储到服务列表中 for client_socket in client_socket_list: #为客户端进行服务 try: recv_data = client_socket.recv(1024) except Exception as ret: print(ret) print("----这个客户端没有发送过来数据----") else: #try执行是正确的才会进入else print("-----没有异常-----") print(recv_data) if recv_data: # 对方发送过来数据 print("----客户端发送过来了数据-----") else: # 对方调用close 导致了 recv返回 client_socket.close() client_socket_list.remove(client_socket) #服务完成的客户端就给关闭 print("---客户端已经关闭----") |
详细代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | #coding=utf-8 from socket import * import time # 用来存储所有的新链接的socket g_socket_list = list() def main(): server_socket = socket(AF_INET, SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) server_socket.bind(('', 7890)) server_socket.listen(128) # 将套接字设置为非堵塞 # 设置为非堵塞后,如果accept时,恰巧没有客户端connect,那么accept会 # 产生一个异常,所以需要try来进行处理 server_socket.setblocking(False) while True: # 用来测试 time.sleep(0.5) try: newClientInfo = server_socket.accept() except Exception as result: pass else: print("一个新的客户端到来:%s" % str(newClientInfo)) newClientInfo[0].setblocking(False) # 设置为非堵塞 g_socket_list.append(newClientInfo) for client_socket, client_addr in g_socket_list: try: recvData = client_socket.recv(1024) if recvData: print('recv[%s]:%s' % (str(client_addr), recvData)) else: print('[%s]客户端已经关闭' % str(client_addr)) client_socket.close() g_socket_list.remove((client_socket,client_addr)) except Exception as result: pass print(g_socket_list) # for test if __name__ == '__main__': main() |
web静态服务器-单进程非堵塞(长链接)¶
上课版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | import socket import re def service_client(new_socket, request): """为这个客户端返回数据""" # 1. 接收浏览器发送过来的请求 ,即http请求 # GET / HTTP/1.1 # ..... # request = new_socket.recv(1024).decode("utf-8") # print(">>>"*50) # print(request) request_lines = request.splitlines() print("") print(">"*20) print(request_lines) # GET /index.html HTTP/1.1 # get post put del file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if ret: file_name = ret.group(1) # print("*"*50, file_name) if file_name == "/": file_name = "/index.html" # 2. 返回http格式的数据,给浏览器 try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "------file not found-----" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close() response_body = html_content response_header = "HTTP/1.1 200 OK\r\n" response_header += "Content-Length:%d\r\n" % len(response_body) #告诉客户端接收多长的body,这样浏览器接收到了就不用一直转圈了 response_header += "\r\n" response = response_header.encode("utf-8") + response_body new_socket.send(response) #关闭套接 # new_socket.close()#这里不再关闭,放大主函数里进行关闭实现长链接 def main(): """用来完成整体的控制""" # 1. 创建套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #创建服务端 tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #设置关闭链接还是啥来着 # 2. 绑定 tcp_server_socket.bind(("", 7890)) #绑定端口 # 3. 变为监听套接字 tcp_server_socket.listen(128) tcp_server_socket.setblocking(False) # 将套接字变为非堵塞 client_socket_list = list() #创建服务列表 while True: # 4. 等待新客户端的链接 try: new_socket, client_addr = tcp_server_socket.accept() except Exception as ret: pass else: new_socket.setblocking(False) #设置客户端非堵塞 client_socket_list.append(new_socket) #添加到列表 for client_socket in client_socket_list: try: recv_data = client_socket.recv(1024).decode("utf-8") #接收数据 except Exception as ret: pass else: if recv_data: service_client(client_socket, recv_data) else:#所有的数据请求完了在这里将客户端关闭 client_socket.close() client_socket_list.remove(client_socket) # 关闭监听套接字 tcp_server_socket.close() if __name__ == "__main__": main() |
详细版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | import time import socket import sys import re class WSGIServer(object): """定义一个WSGI服务器的类""" def __init__(self, port, documents_root): # 1. 创建套接字 self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 2. 绑定本地信息 self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(("", port)) # 3. 变为监听套接字 self.server_socket.listen(128) self.server_socket.setblocking(False) self.client_socket_list = list() self.documents_root = documents_root def run_forever(self): """运行服务器""" # 等待对方链接 while True: # time.sleep(0.5) # for test try: new_socket, new_addr = self.server_socket.accept() except Exception as ret: print("-----1----", ret) # for test else: new_socket.setblocking(False) self.client_socket_list.append(new_socket) for client_socket in self.client_socket_list: try: request = client_socket.recv(1024).decode('utf-8') except Exception as ret: print("------2----", ret) # for test else: if request: self.deal_with_request(request, client_socket) else: client_socket.close() self.client_socket_list.remove(client_socket) print(self.client_socket_list) def deal_with_request(self, request, client_socket): """为这个浏览器服务器""" if not request: return request_lines = request.splitlines() for i, line in enumerate(request_lines): print(i, line) # 提取请求的文件(index.html) # GET /a/b/c/d/e/index.html HTTP/1.1 ret = re.match(r"([^/]*)([^ ]+)", request_lines[0]) if ret: print("正则提取数据:", ret.group(1)) print("正则提取数据:", ret.group(2)) file_name = ret.group(2) if file_name == "/": file_name = "/index.html" # 读取文件数据 try: f = open(self.documents_root+file_name, "rb") except: response_body = "file not found, 请输入正确的url" response_header = "HTTP/1.1 404 not found\r\n" response_header += "Content-Type: text/html; charset=utf-8\r\n" response_header += "Content-Length: %d\r\n" % (len(response_body)) response_header += "\r\n" # 将header返回给浏览器 client_socket.send(response_header.encode('utf-8')) # 将body返回给浏览器 client_socket.send(response_body.encode("utf-8")) else: content = f.read() f.close() response_body = content response_header = "HTTP/1.1 200 OK\r\n" response_header += "Content-Length: %d\r\n" % (len(response_body)) response_header += "\r\n" # 将header返回给浏览器 client_socket.send( response_header.encode('utf-8') + response_body) # 设置服务器服务静态资源时的路径 DOCUMENTS_ROOT = "./html" def main(): """控制web服务器整体""" # python3 xxxx.py 7890 if len(sys.argv) == 2: port = sys.argv[1] if port.isdigit(): port = int(port) else: print("运行方式如: python3 xxx.py 7890") return print("http服务器使用的port:%s" % port) http_server = WSGIServer(port, DOCUMENTS_ROOT) http_server.run_forever() if __name__ == "__main__": main() |
10.2. Web静态服务器-6-epoll¶
Web静态服务器-6-epoll¶
IO 多路复用¶
就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。
select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。
它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
课上实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | import socket import re import select #导入select包 def service_client(new_socket, request): """为这个客户端返回数据""" # 1. 接收浏览器发送过来的请求 ,即http请求 # GET / HTTP/1.1 # ..... # request = new_socket.recv(1024).decode("utf-8") # print(">>>"*50) # print(request) request_lines = request.splitlines() print("") print(">"*20) print(request_lines) # GET /index.html HTTP/1.1 # get post put del file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if ret: file_name = ret.group(1) # print("*"*50, file_name) if file_name == "/": file_name = "/index.html" # 2. 返回http格式的数据,给浏览器 try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "------file not found-----" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close() response_body = html_content response_header = "HTTP/1.1 200 OK\r\n" response_header += "Content-Length:%d\r\n" % len(response_body) response_header += "\r\n" response = response_header.encode("utf-8") + response_body new_socket.send(response) def main(): """用来完成整体的控制""" # 1. 创建套接字 tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 2. 绑定 tcp_server_socket.bind(("", 7890)) # 3. 变为监听套接字 tcp_server_socket.listen(128) tcp_server_socket.setblocking(False) # 将套接字变为非堵塞 # 创建一个epoll对象(与操作系统共享) epl = select.epoll() #使用select的epoll类创建一个epoll对象 # 将监听套接字对应的fd(文件标志符)注册到epoll中 epl.register(tcp_server_socket.fileno(), select.EPOLLIN) #fileno()文件标识符 select.EPOLLIN标识的是检测这个服务端是否有输入 fd_event_dict = dict() while True: fd_event_list = epl.poll() # 默认会堵塞,直到 os监测到数据到来 通过事件通知方式 告诉这个程序,此时才会解堵塞 # [(fd, event), (套接字对应的文件描述符, 这个文件描述符到底是什么事件 例如 可以调用recv接收等)](返回的是一个列表将多个等待处理的事件一块传递) for fd, event in fd_event_list:#(动作的套接字会返回到这个list中每一个list是一个元祖(元祖中是标志符和事件)) # 等待新客户端的链接(监听套接字动作走的是if,进行添加新的客户端) if fd == tcp_server_socket.fileno(): #标志符是监听套接字的那么就要接受客户端 new_socket, client_addr = tcp_server_socket.accept() epl.register(new_socket.fileno(), select.EPOLLIN) #接收到的新的客户端也需要进行注册(这个主要是监听套接字的) fd_event_dict[new_socket.fileno()] = new_socket #存储到字典中,将文件标志符与客户端进行对应 elif event==select.EPOLLIN: #这里就不是判断标志符了,而是判断有没有数据的到来(这里不能使用文件标志符,不能判断是哪个客户端) #客户端进行传数据的时候走的是这个else,因为没有新的客户端进行链接,而是客户端进行数据的传输 # 判断已经链接的客户端是否有数据发送过来 recv_data = fd_event_dict[fd].recv(1024).decode("utf-8") #从字典中取出这个客户端进行服务 if recv_data:#有数据到来 service_client(fd_event_dict[fd], recv_data) else: #结束了链接 fd_event_dict[fd].close() #关闭客户端的链接 epl.unregister(fd) #取消注册 del fd_event_dict[fd] #删除字典中的键值对 # 关闭监听套接字 tcp_server_socket.close() if __name__ == "__main__": main() |
epoll简单模型¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | import socket import select # 创建套接字 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置可以重复使用绑定的信息 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) # 绑定本机信息 s.bind(("",7788)) # 变为被动 s.listen(10) # 创建一个epoll对象 epoll = select.epoll() # 测试,用来打印套接字对应的文件描述符 # print(s.fileno()) # print(select.EPOLLIN|select.EPOLLET) # 注册事件到epoll中 # epoll.register(fd[, eventmask]) # 注意,如果fd已经注册过,则会发生异常 # 将创建的套接字添加到epoll的事件监听中 epoll.register(s.fileno(), select.EPOLLIN|select.EPOLLET) connections = {} addresses = {} # 循环等待客户端的到来或者对方发送数据 while True: # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待 epoll_list = epoll.poll() # 对事件进行判断 for fd, events in epoll_list: # print fd # print events # 如果是socket创建的套接字被激活 if fd == s.fileno(): new_socket, new_addr = s.accept() print('有新的客户端到来%s' % str(new_addr)) # 将 conn 和 addr 信息分别保存起来 connections[new_socket.fileno()] = new_socket addresses[new_socket.fileno()] = new_addr # 向 epoll 中注册 新socket 的 可读 事件 epoll.register(new_socket.fileno(), select.EPOLLIN|select.EPOLLET) # 如果是客户端发送数据 elif events == select.EPOLLIN: # 从激活 fd 上接收 recvData = connections[fd].recv(1024).decode("utf-8") if recvData: print('recv:%s' % recvData) else: # 从 epoll 中移除该 连接 fd epoll.unregister(fd) # server 侧主动关闭该 连接 fd connections[fd].close() print("%s---offline---" % str(addresses[fd])) del connections[fd] del addresses[fd] |
说明¶
- EPOLLIN (可读)
- EPOLLOUT (可写)
- EPOLLET (ET模式)
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
1 2 3 | LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。 ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。 |
web静态服务器-epool¶
以下代码,支持http的长连接,即使用了Content-Length
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | import socket import time import sys import re import select class WSGIServer(object): """定义一个WSGI服务器的类""" def __init__(self, port, documents_root): # 1. 创建套接字 self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 2. 绑定本地信息 self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind(("", port)) # 3. 变为监听套接字 self.server_socket.listen(128) self.documents_root = documents_root # 创建epoll对象 self.epoll = select.epoll() # 将tcp服务器套接字加入到epoll中进行监听 self.epoll.register(self.server_socket.fileno(), select.EPOLLIN|select.EPOLLET) # 创建添加的fd对应的套接字 self.fd_socket = dict() def run_forever(self): """运行服务器""" # 等待对方链接 while True: # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待 epoll_list = self.epoll.poll() # 对事件进行判断 for fd, event in epoll_list: # 如果是服务器套接字可以收数据,那么意味着可以进行accept if fd == self.server_socket.fileno(): new_socket, new_addr = self.server_socket.accept() # 向 epoll 中注册 连接 socket 的 可读 事件 self.epoll.register(new_socket.fileno(), select.EPOLLIN | select.EPOLLET) # 记录这个信息 self.fd_socket[new_socket.fileno()] = new_socket # 接收到数据 elif event == select.EPOLLIN: request = self.fd_socket[fd].recv(1024).decode("utf-8") if request: self.deal_with_request(request, self.fd_socket[fd]) else: # 在epoll中注销客户端的信息 self.epoll.unregister(fd) # 关闭客户端的文件句柄 self.fd_socket[fd].close() # 在字典中删除与已关闭客户端相关的信息 del self.fd_socket[fd] def deal_with_request(self, request, client_socket): """为这个浏览器服务器""" if not request: return request_lines = request.splitlines() for i, line in enumerate(request_lines): print(i, line) # 提取请求的文件(index.html) # GET /a/b/c/d/e/index.html HTTP/1.1 ret = re.match(r"([^/]*)([^ ]+)", request_lines[0]) if ret: print("正则提取数据:", ret.group(1)) print("正则提取数据:", ret.group(2)) file_name = ret.group(2) if file_name == "/": file_name = "/index.html" # 读取文件数据 try: f = open(self.documents_root+file_name, "rb") except: response_body = "file not found, 请输入正确的url" response_header = "HTTP/1.1 404 not found\r\n" response_header += "Content-Type: text/html; charset=utf-8\r\n" response_header += "Content-Length: %d\r\n" % len(response_body) response_header += "\r\n" # 将header返回给浏览器 client_socket.send(response_header.encode('utf-8')) # 将body返回给浏览器 client_socket.send(response_body.encode("utf-8")) else: content = f.read() f.close() response_body = content response_header = "HTTP/1.1 200 OK\r\n" response_header += "Content-Length: %d\r\n" % len(response_body) response_header += "\r\n" # 将数据返回给浏览器 client_socket.send(response_header.encode("utf-8")+response_body) # 设置服务器服务静态资源时的路径 DOCUMENTS_ROOT = "./html" def main(): """控制web服务器整体""" # python3 xxxx.py 7890 if len(sys.argv) == 2: port = sys.argv[1] if port.isdigit(): port = int(port) else: print("运行方式如: python3 xxx.py 7890") return print("http服务器使用的port:%s" % port) http_server = WSGIServer(port, DOCUMENTS_ROOT) http_server.run_forever() if __name__ == "__main__": main() |
小总结¶
I/O 多路复用的特点:
通过一种机制使一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,epoll()函数就可以返回。 所以, IO多路复用,本质上不会有并发的功能,因为任何时候还是只有一个进程或线程进行工作,它之所以能提高效率是因为select\epoll 把进来的socket放到他们的 '监视' 列表里面,当任何socket有可读可写数据立马处理,那如果select\epoll 手里同时检测着很多socket, 一有动静马上返回给进程处理,总比一个一个socket过来,阻塞等待,处理高效率。
当然也可以多线程/多进程方式,一个连接过来开一个进程/线程处理,这样消耗的内存和进程切换页会耗掉更多的系统资源。 所以我们可以结合IO多路复用和多进程/多线程 来高性能并发,IO复用负责提高接受socket的通知效率,收到请求后,交给进程池/线程池来处理逻辑。
参考资料¶
- 如果想了解下epoll在Linux中的实现过程可以参考:http://blog.csdn.net/xiajun07061225/article/details/9250579
10.4. 知识扩展-C10K问题¶
知识扩展-C10K问题¶
参考文章 :
《单台服务器并发TCP连接数到底可以有多少》 http://www.52im.net/thread-561-1-1.html
《上一个10年,著名的C10K并发连接问题》 http://www.52im.net/thread-566-1-1.html