致敬白衣天使,学习Python读取

news/2024/11/20 23:28:18/

名字:阿玥的小东东

学习:Python、c++

主页:阿玥的小东东

故事设定:现在学校要求对所有同学进行核酸采集,每位同学先在宿舍内等候防护人员(以下简称“大白”)叫号,叫到自己时去停车场排队等候大白对自己进行采集,采集完之后的样本由大白统一有序收集并储存。

名词解释:

  • 学生:所有的学生是一个大文件,每个学生是其中的一行数据
  • 宿舍:硬盘
  • 停车场:内存
  • 核酸采集:数据处理
  • 样本:处理后的数据
  • 大白:程序

学生数量特别少的情况

当学生数量特别少时,可以考虑将所有学生统一叫到停车场等候,再依次进行核酸采集。

方法一:简单情况

此时的程序可以模拟为:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

import time

from typing import List

  

  

def pick_all_students(dorm: str) -> List[str]:

    with open(dorm, "rt", encoding="utf8") as fin:

        students = fin.readlines()

        return students

  

  

def pick_sample(student: str) -> str:

    time.sleep(0.01)

    sample = f"{student.strip()}'s sample"

    return sample

  

  

def process(dorm: str, sample_storeroom: str) -> None:

    with open(sample_storeroom, "wt", encoding="utf8") as fout:

        students = pick_all_students(dorm)

        for student in students:

            sample = pick_sample(student)

            fout.write(f"{sample}\n")

            fout.flush()

  

  

if __name__ == "__main__":

    process(

        "student_names.txt",

        "sample_storeroom.txt"

    )

注意,在第19行中,大白一次性把所有同学都叫到了停车场中。这种做法在学生比较少时做起来很快,但是如果学生特别多,停车场装不下怎么办?

停车场空间不够时怎么办?

方法二:边读边处理

一般来说,由于停车场空间有限,我们不会采用一次性把所有学生都叫到停车场中,而是会一个一个地处理,这样可以节约内存空间。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

import time

from typing import Iterator

  

  

def pick_one_student(dorm: str) -> Iterator[str]:

    with open(dorm, "rt", encoding="utf8") as fin:

        for student in fin:

            yield student

  

  

def pick_sample(student: str) -> str:

    time.sleep(0.01)

    sample = f"{student.strip()}'s sample"

    return sample

  

  

def process(dorm: str, sample_storeroom: str) -> None:

    with open(sample_storeroom, "wt", encoding="utf8") as fout:

        for student in pick_one_student(dorm):

            sample = pick_sample(student)

            fout.write(f"{sample}\n")

            fout.flush()

  

  

if __name__ == "__main__":

    process(

        "student_names.txt",

        "sample_storeroom.txt"

    )

这里pick_one_student函数中的返回值是用yield返回的,一次只会返回一名同学。

不过,这种做法虽然确保了停车场不会满员,但是这种做法在人数特别多的时候就不再适合了。虽然可以保证完成任务,但由于每次只能采集一个同学,程序的执行并不高。特别是当你的CPU有多个核时,会浪费机器性能,出现一核有难,其它围观的现象。

怎么加快执行效率?

大家可能也已经注意到了,刚刚我们的场景中,不论采用哪种方法,都只有一名大白在工作。那我们能不能加派人手,从而提高效率呢?

答案当然是可行的。我们现在先考虑增加两名大白,使得一名大白专注于叫号,安排学生进入停车场,另外一名大白专注于采集核酸,最后一名大白用于存储核酸样本。

方法三

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

import time

from multiprocessing import Queue, Process

from typing import Iterator

  

  

def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]:

    print("pick_student: started")

  

    picked_num = 0

    with open(dorm, "rt", encoding="utf8") as fin:

        for student in fin:

            stu_queue.put(student)

            picked_num += 1

            if picked_num % 500 == 0:

                print(f"pick_student: {picked_num}")

  

    # end signal

    stu_queue.put(None)

    print("pick_student: finished")

  

  

def pick_sample(student: str) -> str:

    time.sleep(0.01)

    sample = f"{student.strip()}'s sample"

    return sample

  

  

def process(stu_queue: Queue, store_queue: Queue) -> None:

    print("process: started")

  

    process_num = 0

    while True:

        student = stu_queue.get()

        if student is not None:

            sample = pick_sample(student)

            store_queue.put(sample)

            process_num += 1

            if process_num % 500 == 0:

                print(f"process: {process_num}")

        else:

            break

  

    # end signal

    store_queue.put(None)

    print("process: finished")

  

  

def store_sample(store_queue: Queue, sample_storeroom: str) -> None:

    print("store_sample: started")

  

    store_num = 0

    with open(sample_storeroom, "wt", encoding="utf8") as fout:

        while True:

            sample = store_queue.get()

            if sample is not None:

                fout.write(f"{sample}\n")

                fout.flush()

  

                store_num += 1

                if store_num % 500 == 0:

                    print(f"store_sample: {store_num}")

            else:

                break

  

    print("store_sample: finished")

  

  

if __name__ == "__main__":

    dorm = "student_names.txt"

    sample_storeroom = "sample_storeroom.txt"

  

    stu_queue = Queue()

    store_queue = Queue()

  

    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)

    store_p.start()

    process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)

    process_p.start()

    read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True)

    read_p.start()

  

    store_p.join()

这份代码中,我们引入了多进程的思路,将每个大白看作一个进程,并使用了队列Queue作为进程间通信的媒介。stu_queue表示学生叫号进停车场的队列,store_queue表示已经采集过的待存储核酸样本的队列。

此外,为了控制进程的停止,我们在pick_student和 process函数的最后都向各自队列中添加了None作为结束标志符。

假设有1w名学生(student_names.txt文件有1w行),经过测试后发现上述方法的时间如下:

  • 方法一:1m40.716s
  • 方法二:1m40.717s
  • 方法三:1m41.097s

咦?不是做了分工吗?怎么速度还变慢了?经笔者观察,这是因为叫号的大白速度太快了(文件读取速度快)通常是TA已经齐活了,另外俩人还在吭哧吭哧干活呢,体现不出来分工的优势。如果这个时候我们对法二和法三的叫号做延时操作,每个学生叫号之后停滞10ms再叫下一位学生,则方法三的处理时间几乎不变,而方法二的时间则会延长至3m21.345s。

怎么加快处理速度?

上面提到,大白采核酸的时间较长,往往上一个人的核酸还没采完,下一个人就已经在后面等着了。我们能不能提高核酸采集这个动作(数据处理)的速度呢?其实一名大白执行一次核酸采集的时间我们几乎无法再缩短了,但是我们可以通过增加人手的方式,来达到这个目的。就像去银行办业务,如果开放的窗口越多,那么每个人等待的时间就会越短。这里我们也采取类似的策略,增加核酸采集的窗口。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

import time

from multiprocessing import Queue, Process, cpu_count

from typing import Iterator

  

  

def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]:

    print("pick_student: started")

  

    picked_num = 0

    with open(dorm, "rt", encoding="utf8") as fin:

        for student in fin:

            stu_queue.put(student)

            picked_num += 1

            if picked_num % 500 == 0:

                print(f"pick_student: {picked_num}")

  

    # end signal

    for _ in range(num_workers):

        stu_queue.put(None)

  

    print("pick_student: finished")

  

  

def pick_sample(student: str) -> str:

    time.sleep(0.01)

    sample = f"{student.strip()}'s sample"

    return sample

  

  

def process(stu_queue: Queue, store_queue: Queue) -> None:

    print("process: started")

  

    process_num = 0

    while True:

        student = stu_queue.get()

        if student is not None:

            sample = pick_sample(student)

            store_queue.put(sample)

            process_num += 1

            if process_num % 500 == 0:

                print(f"process: {process_num}")

        else:

            break

  

    print("process: finished")

  

  

def store_sample(store_queue: Queue, sample_storeroom: str) -> None:

    print("store_sample: started")

  

    store_num = 0

    with open(sample_storeroom, "wt", encoding="utf8") as fout:

        while True:

            sample = store_queue.get()

            if sample is not None:

                fout.write(f"{sample}\n")

                fout.flush()

  

                store_num += 1

                if store_num % 500 == 0:

                    print(f"store_sample: {store_num}")

            else:

                break

  

    print("store_sample: finished")

  

  

if __name__ == "__main__":

    dorm = "student_names.txt"

    sample_storeroom = "sample_storeroom.txt"

    num_process = max(1, cpu_count() - 1)

  

    maxsize = 10 * num_process

    stu_queue = Queue(maxsize=maxsize)

    store_queue = Queue(maxsize=maxsize)

  

    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)

    store_p.start()

    process_workers = []

    for _ in range(num_process):

        process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)

        process_p.start()

        process_workers.append(process_p)

    read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True)

    read_p.start()

  

    for worker in process_workers:

        worker.join()

  

    # end signal

    store_queue.put(None)

    store_p.join()

总耗时 0m4.160s !我们来具体看看其中的细节部分:

首先我们将CPU核数 - 3作为采核酸的大白数量。这里减3是为其它工作进程保留了一些资源,你也可以根据自己的具体情况做调整

这次我们在 Queue中增加了 maxsize参数,这个参数是限制队列的最大长度,这个参数通常与你的实际内存情况有关。如果数据特别多时要考虑做些调整。这里我采用10倍的工作进程数目作为队列的长度

注意这里pick_student函数中要为每个后续的工作进程都添加一个结束标志,因此最后会有个for循环

我们把之前放在process函数中的结束标志提取出来,放在了最外侧,使得所有工作进程均结束之后再关闭最后的store_p进程

结语

总结来说,如果你的数据集特别小,用法一;通常情况下用法二;数据集特别大时用法四。


http://www.ppmy.cn/news/24118.html

相关文章

Android框架WiFi架构

同学,别退出呀,我可是全网最牛逼的 WIFI/BT/GPS/NFC分析博主,我写了上百篇文章,请点击下面了解本专栏,进入本博主主页看看再走呗,一定不会让你后悔的,记得一定要去看主页置顶文章哦。 一、wpa_supplicant:wpa_supplicant本身开源项目源码,被谷歌收购之后加入Android移…

2.11整理(2)(主要关于teacher forcing)

teacher forcing 训练迭代过程早期的RNN预测能力非常弱,几乎不能给出好的生成结果。如果某一个unit产生了垃圾结果,必然会影响后面一片unit的学习。RNN存在着两种训练模式(mode): free-running mode:就是常见的那种训练网络的方式: 上一个sta…

电脑重装系统注册表恢复方法

​今天讲关于大家的电脑在遇到一些故障的时候,以及电脑用久了之后会卡顿,那么这时候大家一般都会给电脑重装系统。重装系统之后却发现自己电脑里的注册表不见了,重装系统后怎么恢复注册表?小编就带着大家一起学习重装系统注册表恢复到底是怎…

RocketMQ基础学习

前言: RocketMQ阿里开源的,一款分布式的消息中间件,它经过阿里的生产环境的高并发、高吞吐的考验,同时,还支持分布式事务等场景。RocketMQ使用Java语言进行开发,方便Java开发者学习源码。但是,R…

2.11知识点整理(关于pycharm,python,pytorch,conda)

pycharm 设置anaconda环境: File -> Settings->选择左侧的project xxx再选择打开Project Interpreter页->选择add添加解释器->添加Anaconda中Python解释器(Anaconda安装目录下的python.exe) (选择existing environment &#xff…

4.SpringWeb

一、创建项目LomBok:辅助开发工具,减少代码编写Spring Web:带上Spring MVC,可以做Web开发了Thymleaf: Web开发末班引擎(不常用)创建好,如下:static/ 放置静态资源的根目录templates/ 放置模板文件的根目录 二、资源配置…

Java学习记录day6

书接上回 类与对象 static关键字 static的作用: 修饰一个属性:声明为static的变量实质上就是一个全局变量,其生命周期为从类被加载开始一直到程序结束;修饰方法:无须本类的对象也可以调用该方法;修饰一个类&#x…

STL——list

一、list介绍及使用 1. list文档介绍 (1)list是可以在常数范围内,在任意位置进行插入、删除的序列式容器,并且该容器可以前后双向迭代。 (2)list的底层是带头结点的双向循环链表,其中每个元素…