写在前面
初次阅读此书是三年前,当时没经历过完整的项目 觉得这书就是扯淡 后来经历过项目加班与毒打 今天再翻开此书 觉得实乃不可多得之物 花些时间啃下来吧
django版本 3.2
本博客开源项目地址 kimsmith/django企业实战 (gitee.com)
有的代码因为版本混乱报错 自己调调
需求
需求文档
写文档,列举需要实现的功能,详细列举,不考虑技术实现细节
需求评审与分析
主要是将需求文档落实到技术细节,评审需求需要的技术栈,然后评审需求是否可实现,预估每个子需求的工作量等
此处可以考虑后续的衍生需求,考虑技术实现是否可行是否困难,技术需要的工作量等
功能分析
技术人员对需求评审的结果进行技术实现分析,模块划分等
模块划分可以基于数据实体制作ER图,或者建立UML图
模块划分
作用是将一个大项目分成几个小模块,让手下的人去按模块开发
框架基础和技术选型
需要选择 语言 框架 数据库 然后考虑团队开发与实现能力等
wsgi
wsgi,全称Web Server Gateway Interface,Web服务器网关接口,是用来规定web server应如何和程序交互的网关协议。可以理解为一个web应用的容器,适配了程序和操作系统之间功能,将操作系统一些功能抽象为接口提供给程序使用
可以使用wsgi,目的是使用实现统一协议的web server,不然换着乱
简单的web server
一个基本的socket监听程序
# coding:utf-8import socketEOL1 = b'\n\n'
EOL2 = b'\n\r\n'
body = 'hello, world<h1> from tjh </h1>'
resp_params = ['HTTP/1.0 200 OK','Date: Sun, 31 jul 2024 09:35:33 GMT','Content-Type: text/html; charset=utf-8','Content-Length: ()\r\n'.format(len(body.encode())),body
]resp = '\r\n'.join(resp_params)def handle_connection(conn, addr):req = b''while EOL1 not in req and EOL2 not in req:req += conn.recv(1024)print(req)conn.send(resp.encode())conn.close()def main():ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)ss.bind(('127.0.0.1', 8000))ss.listen(5) # conn max queue numberprint('http://127.0.0.1:8000')try:while True:conn, addr = ss.accept()handle_connection(conn, addr)finally:ss.close()if __name__ == '__main__':main()
效果
注意 Content-Type为text/plain 还是text/html
多线程版web server
还是阻塞模式,非阻塞会报错 还没处理
# coding:utf-8import socket
import errno
import threading
import timeEOL1 = b'\n\n'
EOL2 = b'\n\r\n'
body = 'hello, world<h1> from tjh </h1>-from {thread_name}'
resp_params = ['HTTP/1.0 200 OK','Date: Sun, 31 jul 2024 09:35:33 GMT','Content-Type: text/html; charset=utf-8','Content-Length: {length}\r\n',body
]resp = '\r\n'.join(resp_params)def handle_connection(conn, addr):req = b''while EOL1 not in req and EOL2 not in req:req += conn.recv(1024)print(req)current_thread = threading.currentThread()content_length = len(body.format(thread_name=current_thread.name).encode())print(current_thread.name)conn.send(resp.format(thread_name=current_thread.name, length=content_length).encode())conn.close()def main():ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# ss.setblocking(0) # set socket mode as non block # ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)ss.bind(('127.0.0.1', 8000))ss.listen(10) # conn max queue numberprint('http://127.0.0.1:8000')# ss.setblocking(0) # set socket mode as non block try:i = 0while True:try:conn, addr = ss.accept()except socket.error as e:if e.args[0] != errno.EAGAIN:raisecontinuei += 1print(i)t = threading.Thread(target=handle_connection, args=(conn, addr), name='thread-%s' % i)t.start()finally:ss.close()if __name__ == '__main__':main()
效果
简单wsgi application
wsgi协议分为两部分,一个是web server,一个是web application。接受请求时,会通过wsgi协议将数据发给web application,application处理完后,设置对应状体和header,之后返回body给web server,web server拿到数据后,进行http协议封装,返回完整http response
# coding: utf-8import os
import sysfrom app import simple_appdef wsgi_to_bytes(s):return s.encode()def run_with_cgi(app):environ = dict(os.environ.items())environ['wsgi.input'] = sys.stdin.bufferenviron['wsgi.errors'] = sys.stderrenviron['wsgi.version'] = (1, 0)environ['wsgi.multithread'] = Falseenviron['wsgi.multiprocess'] = Trueenviron['wsgi.run_once'] = Trueif environ.get('HTTPS', 'off') in ('on', '1'):environ['wsgi.url_scheme'] = 'https'else:environ['wsgi.url_scheme'] = 'http'headers_set = []headers_sent = []def write(data):out = sys.stdout.bufferif not headers_set:raise AssertionError('write() before start_response()')elif not headers_sent:status, resp_headers = headers_sent[:] = headers_setout.write(wsgi_to_bytes('Status: %s\r\n' % status))for header in resp_headers:out.write(wsgi_to_bytes('%s: %s\r\n' % header))out.write(wsgi_to_bytes('\r\n'))out.write(data)out.flush()def start_response(status, resp_headers, exc_info=None):if exc_info:try:if headers_sent:raise (exc_info[0], exc_info[1], exc_info[2])finally:exc_info = Noneelif headers_set:raise AssertionError('headers already set')headers_set[:] = [status, resp_headers]return writeresult = app(environ, start_response)try:for data in result:if data:write(data)if not headers_sent:write('')finally:if hasattr(result, 'close'):result.close()if __name__ == '__main__':run_with_cgi(simple_app)
理解wsgi
wsgi规定,application必须是一个可调用对象,则这个可调用对象可以是函数或实现了__call__方法的实例
wsgi中间件和werkzeug
flask
截至目前 有了两种方法提供web服务:直接通过socket处理请求,或者通过实现wsgi application部分协议
入门推荐
py微型框架有比如web.py, bottle,flask等 flask是一个不错的微型框架
tornado
高性能。tornado不是基于wsgi协议的框架,但提供了wsgi的支持,特性是异步和非阻塞。可以使用自带的http server进行部署而不是wsgi,因为wsgi是一个同步接口
和flask相比,tornado更侧重性能,整体并不比flask丰富,flask更多的支持对业务的满足
django
和微型框架不同,django框架不是仅需要两三个py文件就能跑起来的web框架,django功能更全也更大
django起步
管理系统后台开发
先安装django,再django-admin startproject project_name创建初始项目
cd到project下,创建一个app
models.py
from django.db import models# Create your models here.
class Student(models.Model):SEX_ITEMS = [(1, '男'), (2, '女'), (0, '未知')]STATUS_ITEMS = [(0, '申请'), (1, '通过'), (2, '拒绝')]name = models.CharField(max_length=128, verbose_name='姓名')sex = models.IntegerField(choices=SEX_ITEMS, verbose_name='性别')profession = models.CharField(max_length=128, verbose_name='职业')email = models.EmailField(verbose_name='Email')qq = models.CharField(max_length=128, verbose_name='QQ')phone = models.CharField(max_length=128, verbose_name='电话')status = models.IntegerField(choices=STATUS_ITEMS, default=0, verbose_name='审核状态')created_time = models.DateTimeField(auto_now_add=True, editable=False, verbose_name='创建时间')def __str__(self):return '<Student: {}>'.format(self.name)class Meta:verbose_name = verbose_name_plural = '学员信息'
admin.py
from django.contrib import admin# Register your models here.
from .models import Studentadmin.site.register(Student)
将创建的app在setting中加到installed_apps下
创建数据库迁移文件 python manage.py makemigrations
创建表 python manage.py migrate
创建超级用户 python manage.py createsuperuser
运行测试服务器,访问127.0.0.1:8000 发现是默认页面
访问127.0.0.1:8000/admin会跳转到刚开发的管理员页面,然后用刚创的管理员用户名和密码登录
管理员页面语言是英文,时区也是UTC,可在setting进行配置
重新启动测试服务器,再次登入管理员页面,发现语言变成中文
管理系统前台开发
创一个默认视图函数index
from django.shortcuts import render# Create your views here.
def index(request):words = 'hello, world!'return render(request, 'index.html', context={'words': words})
视图函数使用了模板文件index.html,在app的templates目录下创建index.html,render渲染时会自动去寻找模板文件。django渲染时会去每个app下查找模板,寻找的目标app是在setting中注册的app,并且是按顺序寻找(从上到下)。如果两个不同app下有同名模板,则第一个app的模板会覆盖第二个app的模板
先在app下创建目录templates再在templates中创建index.html
<!DOCTYPE html>
<html>
<head><title>学员管理系统</title>
</head>
<body>
{{ words }}
</body>
</html>
模板用到了django模板语法 {{ words }},这个变量是从视图函数的context传过来的上下文
在urls.py中配置url
from django.contrib import admin
from django.urls import path, re_pathfrom admin_backend.views import indexurlpatterns = [re_path(r'^$', index, name='index'),path('admin/', admin.site.urls),
]
重新运行测试服务器,访问127.0.0.1:8000查看效果
在admin后台站点尝试手动创建几个学生数据用于练手,需要修改views.py
def index(request):students = Student.objects.all()return render(request, 'index.html', context={'students': students})
<body>
<ul>{% for student in students %}<li>{{ student.name }} - {{student.get_status_display }}</li>{% endfor %}
</ul>
</body>
注意到student的方法get_status_display,实际模型只有status字段没有这个方法,方法是django自己拼出来的,方法作用是展示这个字段实际打算展示的值。
还有一个,模板里的方法可以不用写括号,django会自己帮忙调用
开发提交数据的功能,使用模型表单
from django import formsfrom .models import Studentclass StudentForm(forms.Form):# name = forms.CharField(label='姓名', max_length=128)# sex = forms.ChoiceField(label='性别', choices=Student.SEX_ITEMS)# profession = forms.CharField(label='职业', max_length=128)# email = forms.EmailField(label='邮箱', max_length=128)# qq = forms.CharField(label='QQ', max_length=128)# phone = forms.CharField(label='手机', max_length=128)class Meta:model = Studentfields = ('name', 'sex', 'profession', 'email', 'qq', 'phone')
模型表单可以每一条字段一行,也可以直接复用models.py中对应的模型字段
当需要form和model不完全一致时,可以在模型表单里单独修改,比如增加qq号必须是纯数字校验
class StudentForm(forms.Form):def clean_qq(self):cleaned_data = self.cleaned_data['qq']if not cleaned_data.isdigit():raise forms.ValidationError('必须是数字')class Meta:model = Studentfields = ('name', 'sex', 'profession', 'email', 'qq', 'phone')
clean_qq是模型表单会自动调用处理每个字段的方法
有了表单,修改views.py逻辑,开始使用模型表单
from django.shortcuts import render
from django.http import HttpResponseRedirect
from django.urls import reversefrom .models import Student
from .forms import StudentForm# Create your views here.
def index(request):students = Student.objects.all()aif request.method == 'POST':form = StudentForm(request.POST)if form.is_valid():cleaned_data = form.cleaned_datastudent = Student()student.name = cleaned_data['name']student.sex = cleaned_data['sex']student.email = cleaned_data['email']student.profession = cleaned_data['profession']student.qq = cleaned_data['qq']student.phone = cleaned_data['phone']student.save()return HttpResponseRedirect(reverse('index'))else:form = StudentForm()context = {'students': students,'form': form,}return render(request, 'index.html', context=context)
form.cleaned_data是用户在页面输入完数据提交后,django根据字段类型转换后的字段
reverse方法避免了将url地址硬编码到视图函数中,如果要修改url地址,仅需在urls.py中改即可,无需修改views.py
表单模型提供了表单验证能力,减少了前端进行表单验证代码量
上述代码中,手动将字段值从form实例更新到模型实例可以省略,直接保存表单模型实例就可以
# Create your views here.
def index(request):students = Student.objects.all()if request.method == 'POST':form = StudentForm(request.POST)if form.is_valid():form.save()return HttpResponseRedirect(reverse('index'))else:form = StudentForm()context = {'students': students,'form': form,}return render(request, 'index.html', context=context)
当用户get的时候,页面是填写表单,提交时,保存表单
index.html
<!DOCTYPE html>
<html>
<head><title>学员管理系统</title>
</head>
<body>
<h3><a href="/admin/">Admin</a></h3>
<ul>{% for student in students %}<li>{{ student.name }} - {{ student.get_status_display }}</li>{% endfor %}
</ul>
<hr />
<form action="/" method="post">{% csrf_token %}{{ form }}<input type="submit" value="Submit"/>
</form>
</body>
</html>
csrf_token如果没有这个,提交的数据是无效的,这可以用来防止跨站伪造请求攻击
访问127.0.0.1:8000看下效果
发现提交后没有报错 也没有显示新提交的信息,原因是字段非法,视图函数也没报错
注意到视图函数中包含了查询数据的操作,当需要过滤查询条件,或者需要缓存数据,都需要修改视图函数代码,此时可将数据获取逻辑封装到model中
模型层代码
class Student(models.Model):@classmethoddef get_all(cls):return cls.objects.all()# 省略其他代码
视图函数代码
def index(request):students = Student.all()
管理系统进阶开发
class-based view
当有多个类似的view函数,可以考虑使用class-based抽象
# urls.py
urlpatterns = [re_path(r'^$', index, name='index'),# re_path(r'^$', IndexView.as_view(), name='index'),path('admin/', admin.site.urls),
]
# views.py
class IndexView(View):template_name = 'index.html'def get_context(self):students = Student.get_all()context = {'students': students}return contextdef get(self, request):context = self.get_context()form = StudentForm()context.update({'form': form})return render(request, self.template_name, context=context)def post(self, request):form = StudentForm(request.POST)if form.is_valid():form.save()return HttpResponseRedirect(reverse('index'))context = self.get_context()context.update({'form': form})return render(request, self.template_name, context=context)
创建一个middleware练手,用来统计django接受请求到返回请求的时间
先创建文件middlewares.py,与views.py同级别
# middlewares.py
import timefrom django.urls import reverse
from django.utils.deprecation import MiddlewareMixinclass TimeItMiddleware(MiddlewareMixin):def process_request(self, request):self.start_time = time.time()returndef process_view(self, request, func, *args, **kwargs):if request.path != reverse('index'):return Nonestart = time.time()response = func(request)costed = time.time() - startprint('process view: {:.2f}s'.format(costed))return responsedef process_exception(self, request, exception):passdef process_tempmlate_response(self, request, response):return responsedef process_response(self, request, response):costed = time.time() - self.start_timeprint('request to response cost: {:.2f}s'.format(costed))return response
请求来和返回相应一般会按顺序调用这些函数:process_request, process_view, process_exception, process_template_response, process_response
process_request是请求来第一个处理函数,此函数可放些请求头校验等任务
process_view是process_request之后执行,用来执行view函数,func参数就是视图函数。可以在此处进行视图函数执行耗时统计
process_template_response 当拿到视图函数的返回函数,且返回函数为模板渲染的函数,则会执行该函数
process_response 当处理完模板渲染的视图函数后,会执行该方法
process_exception 当在视图函数处理过程或返回模板渲染时发生异常,才会执行proces_exception函数。但如果在process_view中手动调用func,就不会触发process_exception
middleware写完后,在settings中的MIDDLEWARE加上middleware
运行web,查看控制台打印访问耗时
写单元测试
运行单元测试时,django会创建一个基于内存的测试数据库,也就是说,这对生产部署的环境没有影响
但对于mysql数据库,django会直接用配置的数据库用户名和密码创建一个名为test_modelname_db的数据库用于测试,此时需要保证有见表和建数据库的权限
也可以在settings指定测试创建的数据库名
对于测试用例,django提供了一个名为TestCase的基类,可以继承这个类实现自己的测试用例。基类的方法和python的单元测试方法差不多
# models.py@propertydef sex_show(self):return dict(self.SEX_ITEMS)[self.sex]
# tests.py
from django.test import TestCase, clientfrom .models import Student# Create your tests here.
class StudentTestCase(TestCase):def setUp(self):Student.objects.create(name='tjh',sex=1,email='test@qq.com',profession='coder',qq='2222',phone='3333',)def test_create_and_sex_show(self):student = Student.objects.creat(name='tjh',sex=1,email='aaa@dd.com',profession='coder',qq='4444',phone='5555')self.assertEqual(student.sex_show, '男', "性别字段内容和展示不一致")def test_filter(self):student = Student.objects.creat(name='tjh',sex=1,email='aaa@dd.com',profession='coder',qq='4444',phone='5555')name = 'tjhaa'students = Student.objects.filter(name=name)self.assertEqual(students.count(), 1, 'student count should be 1')
view层测试
# tests.pydef test_get_index(self):client = Client()resp = client.get('/')self.assertEqual(resp.status_code, 200, 'status code should be 200')def test_post_student(self):client = Client()data = dict(name='test_for_post',sex=1,email='123@qq.com',profession='coder',qq='1111',phone='9999')resp = client.post('/', data)self.assertEqual(resp.status_code, 302, 'status code should be 302')resp = client.get('/')self.assertTrue(b'test_for_post' in resp.context, 'resp content should include test_for_post')