基于or-tools的护士排班问题建模求解
- 护士排班问题(Nurse Rostering Problem,NRP)
- ortools官网例题1:A nurse scheduling problem
- 代码解析
- 完整代码
- ortools官网例题2:Scheduling with shift requests
- 代码解析
- 完整代码
)
护士排班问题(Nurse Rostering Problem,NRP)
护士排班问题(Nurse Rostering Problem,NRP)或护士排程问题( nurse scheduling problem,NSP)是员工调度问题(Employee Scheduling)的一种。医院需要反复为护理人员制作值班表,通常情况下,护理人员要花费大量的时间来编制值班表,特别是在有许多工作人员提出要求的情况下,而且在处理对当前值班表的临时更改时可能会花费更多的时间。由于人工调度繁琐、耗时,以及其他种种原因,护士排班问题(NRP)或护士排程问题(NSP)引起了人们的广泛关注。
相关文献:
- http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.1030.5363&rep=rep1&type=pdf
- https://arxiv.org/pdf/1804.05002.pdf
ortools官网例题1:A nurse scheduling problem
or-tools官网给出了一个使用CP-SAT求解器解决NRP的算例(https://developers.google.cn/optimization/scheduling/employee_scheduling#java_4):
医院主管需要为 4 名护士制定一个为期 3 天的排班计划,并满足以下条件:
- 每天分为三个 8 小时的班次(shift)。
- 每天,每个班次都只分配给一名护士,且每个护士每天最多上一个班次。
- 在这 3 天时间里,每个护士都至少被分配两个班次。
设 $x_{nds}=1$ 表示护士 $n$ 在第 $d$ 天上班次 $s$(否则为 0),$N$、$D$、$S$ 分别为护士数、天数和每天的班次数,则约束条件为:
$$
\begin{aligned}
& \sum_{n=1}^{N} x_{nds} = 1, && \forall d=1,2,\cdots,D;\ s=1,2,\cdots,S \\
& \sum_{s=1}^{S} x_{nds} \leq 1, && \forall n=1,2,\cdots,N;\ d=1,2,\cdots,D \\
& \left\lfloor \frac{S \times D}{N} \right\rfloor \leq \sum_{d=1}^{D}\sum_{s=1}^{S} x_{nds} \leq \left\lceil \frac{S \times D}{N} \right\rceil, && \forall n=1,2,\cdots,N
\end{aligned}
$$
代码解析
1、导入ortools库
from ortools.sat.python import cp_model
2、构造数据
# Problem dimensions: 4 nurses, 3 shifts per day, a 3-day planning horizon.
num_nurses, num_shifts, num_days = 4, 3, 3

# Index ranges reused by the loops that build variables and constraints.
all_nurses = range(num_nurses)
all_shifts = range(num_shifts)
all_days = range(num_days)
3、创建模型
# Create the CP-SAT model that the variables and constraints below are added to.
model = cp_model.CpModel()
4、创建变量
# shifts[(n, d, s)] is a Boolean variable that equals 1 when shift s on day d
# is assigned to nurse n.
shifts = {
    (nurse, day, shift): model.NewBoolVar(f"shift_n{nurse}_d{day}_s{shift}")
    for nurse in all_nurses
    for day in all_days
    for shift in all_shifts
}
5、约束条件
# 每天每个班次都会分配给一名护士:每天每个班次分配的护士人数之和=1
# Python form of the constraint (the walkthrough keeps `shifts` as a dict keyed
# by (nurse, day, shift) tuples): for every day and every shift, exactly one of
# the nurses' Boolean variables is true.
for d in all_days:
    for s in all_shifts:
        model.AddExactlyOne(shifts[(n, d, s)] for n in all_nurses)
# A nurse works at most one shift on any given day.
for nurse in all_nurses:
    for day in all_days:
        shifts_that_day = [shifts[(nurse, day, shift)] for shift in all_shifts]
        model.AddAtMostOne(shifts_that_day)
每个护士上的班次尽可能均分,有4个护士,3天*3班次/天=9班次
则每个护士平均分配9 / 4 = 2.25班次,则每个护士至少上2个班次,至多上3个班次。
# Spread the shifts as evenly as possible: every nurse works at least
# min_shifts_per_nurse shifts. When the total number of shifts is not a
# multiple of the number of nurses, a nurse may work one additional shift.
total_shifts = num_shifts * num_days
min_shifts_per_nurse, leftover = divmod(total_shifts, num_nurses)
max_shifts_per_nurse = min_shifts_per_nurse + (1 if leftover else 0)

for n in all_nurses:
    shifts_worked = [shifts[(n, d, s)] for d in all_days for s in all_shifts]
    worked_total = sum(shifts_worked)
    model.Add(min_shifts_per_nurse <= worked_total)
    model.Add(worked_total <= max_shifts_per_nurse)
6、设置模型参数
# In a model without an objective, the search for all solutions can be enabled.
solver = cp_model.CpSolver()
# NOTE(review): linearization_level = 0 presumably disables LP relaxations;
# confirm against the CP-SAT parameter documentation.
solver.parameters.linearization_level = 0
# Enumerate all solutions.
solver.parameters.enumerate_all_solutions = True
7、调用回调函数
class NursesPartialSolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Solution callback that prints every schedule found, up to a limit.

    The callback keeps the (nurse, day, shift) -> variable mapping together
    with the problem dimensions, counts the solutions it is handed and stops
    the search once `limit` solutions have been printed.
    """

    def __init__(self, shifts, num_nurses, num_days, num_shifts, limit):
        super().__init__()
        self._shifts = shifts
        self._num_nurses = num_nurses
        self._num_days = num_days
        self._num_shifts = num_shifts
        self._solution_limit = limit
        self._solution_count = 0

    def on_solution_callback(self):
        """Print who works which shift on each day of the current solution."""
        self._solution_count += 1
        print(f"Solution {self._solution_count}")
        for d in range(self._num_days):
            print(f"Day {d}")
            for n in range(self._num_nurses):
                # Shifts of day d whose variable is true for nurse n.
                worked = [
                    s
                    for s in range(self._num_shifts)
                    if self.Value(self._shifts[(n, d, s)])
                ]
                for s in worked:
                    print(f" Nurse {n} works shift {s}")
                if not worked:
                    print(f" Nurse {n} does not work")
        if self._solution_count >= self._solution_limit:
            print(f"Stop search after {self._solution_limit} solutions")
            self.StopSearch()

    def solution_count(self):
        """Return how many solutions have been reported so far."""
        return self._solution_count


# Display the first five solutions.
solution_limit = 5
solution_printer = NursesPartialSolutionPrinter(
    shifts, num_nurses, num_days, num_shifts, solution_limit
)
8、调用求解器求解
# Run the search; solution_printer is passed in as the solution callback.
solver.Solve(model, solution_printer)
完整代码
"""Nurse scheduling with OR-Tools CP-SAT: enumerate the first feasible rosters."""
import collections

import numpy as np
from ortools.sat.python import cp_model


class NursesPartialSolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Print each solution as a day-by-shift table and stop after a limit.

    Args:
        shifts: dict mapping (nurse, day, shift) to the model's Boolean variable.
        num_nurses: number of nurses.
        num_days: number of days in the schedule.
        num_shifts: number of shifts per day.
        limit: number of solutions after which the search is stopped.
    """

    def __init__(self, shifts, num_nurses, num_days, num_shifts, limit):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self._shifts = shifts
        self._num_nurses = num_nurses
        self._num_days = num_days
        self._num_shifts = num_shifts
        self._solution_count = 0
        self._solution_limit = limit

    def on_solution_callback(self):
        """Print the current solution, then stop the search at the limit."""
        self._solution_count += 1

        # Size the table from the dimensions handed to the constructor, not
        # from module-level globals, so the callback does not depend on the
        # surrounding script.
        roster = np.zeros((self._num_days, self._num_shifts), dtype=np.int64)
        for d in range(self._num_days):
            for n in range(self._num_nurses):
                for s in range(self._num_shifts):
                    if self.Value(self._shifts[(n, d, s)]):
                        roster[d, s] = n

        print(f"Solution {self._solution_count}")
        # One header column per shift instead of a hard-coded three.
        print("\t" + "\t".join(f"shift{s + 1}" for s in range(self._num_shifts)))
        for day_index, row in enumerate(roster):
            cells = "\t".join(f"nurse{nurse + 1}" for nurse in row)
            print(f"day{day_index + 1}\t{cells}")

        # Announce the stop only after the last table has been printed.
        if self._solution_count >= self._solution_limit:
            print(f"Stop search after {self._solution_limit} solutions")
            self.StopSearch()

    def solution_count(self):
        """Return the number of solutions reported so far."""
        return self._solution_count


num_nurses = 4  # number of nurses
num_shifts = 3  # shifts per day
num_days = 3  # days in the schedule
all_nurses = range(num_nurses)
all_shifts = range(num_shifts)
all_days = range(num_days)

print(all_nurses)

model = cp_model.CpModel()

# shifts[(nurse, day, shift)] equals 1 when that shift on that day is assigned
# to that nurse.
shifts = {}
for nurse in all_nurses:
    for day in all_days:
        for shift in all_shifts:
            shifts[(nurse, day, shift)] = model.NewBoolVar(
                f"nurse{nurse}_day{day}_shift{shift}"
            )

# Each shift is assigned to a single nurse per day.
for day in all_days:
    for shift in all_shifts:
        model.AddExactlyOne(shifts[(nurse, day, shift)] for nurse in all_nurses)

# Each nurse works at most one shift per day.
for nurse in all_nurses:
    for day in all_days:
        model.AddAtMostOne(shifts[(nurse, day, shift)] for shift in all_shifts)

# Try to distribute the shifts evenly, so that each nurse works
# min_shifts_per_nurse shifts. If this is not possible, because the total
# number of shifts is not divisible by the number of nurses, some nurses will
# be assigned one more shift. With 4 nurses and 3 days * 3 shifts = 9 shifts,
# the minimum is 9 // 4 = 2 and the maximum is 3.
min_shifts_per_nurse = (num_shifts * num_days) // num_nurses
if num_shifts * num_days % num_nurses == 0:
    max_shifts_per_nurse = min_shifts_per_nurse
else:
    max_shifts_per_nurse = min_shifts_per_nurse + 1
for n in all_nurses:
    shifts_worked = []
    for d in all_days:
        for s in all_shifts:
            shifts_worked.append(shifts[(n, d, s)])
    model.Add(min_shifts_per_nurse <= sum(shifts_worked))
    model.Add(sum(shifts_worked) <= max_shifts_per_nurse)

# In a model without an objective, the search for all solutions can be enabled.
solver = cp_model.CpSolver()
solver.parameters.linearization_level = 0
# Enumerate all solutions.
solver.parameters.enumerate_all_solutions = True

# Display the first five solutions.
solution_limit = 5
solution_printer = NursesPartialSolutionPrinter(
    shifts, num_nurses, num_days, num_shifts, solution_limit
)
solver.Solve(model, solution_printer)
输出结果为:
ortools官网例题2:Scheduling with shift requests
例题2相比于例题1,增加了护士对特定班次的请求(shift requests),目标函数为最大化被满足的班次请求数量(尽可能满足护士的请求)。对于大多数调度问题,可行解的数量非常多,输出所有解并不现实,因此需要设定一个目标函数来寻找最优解。例题2的约束条件与例题1相同。
代码解析
1、导入库
from ortools.sat.python import cp_model
2、导入数据
shift_requests
是一个5 * 7 * 3的矩阵,表示5个护士在7天内、每天3个班次的值班请求。如shift_requests[2][0][1] = 1
代表护士2在第0天想上班次1。
# Problem dimensions: 5 nurses, 3 shifts per day, a 7-day planning horizon.
num_nurses, num_shifts, num_days = 5, 3, 7

# Index ranges reused by the loops that build variables and constraints.
all_nurses = range(num_nurses)
all_shifts = range(num_shifts)
all_days = range(num_days)

# shift_requests[n][d][s] == 1 means nurse n asks to work shift s on day d.
# One outer row per nurse, one inner triple per day.
shift_requests = [
    [[0, 0, 1], [0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 0, 1]],
    [[0, 0, 0], [0, 0, 0], [0, 1, 0], [0, 1, 0], [1, 0, 0], [0, 0, 0], [0, 0, 1]],
    [[0, 1, 0], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 0, 0], [0, 1, 0], [0, 0, 0]],
    [[0, 0, 1], [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 0, 0]],
    [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 0]],
]
3、创建模型
# Create the CP-SAT model that the variables, constraints and objective below
# are added to.
model = cp_model.CpModel()
4、模型变量
# shifts[(n, d, s)] is the Boolean variable for "nurse n works shift s on day d".
shifts = {
    (nurse, day, shift): model.NewBoolVar(f"shift_n{nurse}_d{day}_s{shift}")
    for nurse in all_nurses
    for day in all_days
    for shift in all_shifts
}
5、约束条件
# Every shift of every day is staffed by exactly one nurse.
for day in all_days:
    for shift in all_shifts:
        candidates = [shifts[(nurse, day, shift)] for nurse in all_nurses]
        model.AddExactlyOne(candidates)

# No nurse works more than one shift on the same day.
for nurse in all_nurses:
    for day in all_days:
        shifts_that_day = [shifts[(nurse, day, shift)] for shift in all_shifts]
        model.AddAtMostOne(shifts_that_day)
# Spread the shifts as evenly as possible: every nurse works at least
# min_shifts_per_nurse shifts. When the total number of shifts is not a
# multiple of the number of nurses, a nurse may work one additional shift.
total_shifts = num_shifts * num_days
min_shifts_per_nurse, leftover = divmod(total_shifts, num_nurses)
max_shifts_per_nurse = min_shifts_per_nurse + (1 if leftover else 0)

for n in all_nurses:
    num_shifts_worked = sum(shifts[(n, d, s)] for d in all_days for s in all_shifts)
    model.Add(min_shifts_per_nurse <= num_shifts_worked)
    model.Add(num_shifts_worked <= max_shifts_per_nurse)
5、目标函数
# pylint: disable=g-complex-comprehension
# Objective: each assignment variable is weighted by the matching 0/1 request
# flag, so the sum counts the requested shifts that are actually assigned.
requests_met = sum(
    shift_requests[n][d][s] * shifts[(n, d, s)]
    for n in all_nurses
    for d in all_days
    for s in all_shifts
)
model.Maximize(requests_met)
这里python用了嵌套的列表推导式,转换成一般写法,更直观:
# Same objective written with plain nested loops: accumulate one weighted term
# per (nurse, day, shift) triple, then hand the total to the model.
objective_terms = 0
for nurse in all_nurses:
    for day in all_days:
        for shift in all_shifts:
            weight = shift_requests[nurse][day][shift]
            objective_terms += weight * shifts[(nurse, day, shift)]
model.Maximize(objective_terms)
6、调用求解器
# Create the solver and solve the model; Solve() returns the search status.
solver = cp_model.CpSolver()
status = solver.Solve(model)
solver.Solve(model)
返回的是求解状态(是否得到最优解、可行解等),这里可以从Java语法来看返回值类型,更直观,以上两行代码等价于:
// Java equivalent of the two Python lines above; the declared type shows that
// solve() returns a CpSolverStatus.
CpSolver solver = new CpSolver();
CpSolverStatus status = solver.solve(model);
7、结果输出
# Python form of the result output, using the names defined in the steps above
# (tuple-keyed `shifts`, `shift_requests`, `all_days`, ...).
# A schedule is printed when the solver reports an optimal or a feasible one.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print("Solution:")
    for d in all_days:
        print(f"Day {d}")
        for n in all_nurses:
            for s in all_shifts:
                if solver.Value(shifts[(n, d, s)]):
                    # Tell apart shifts the nurse asked for from the others.
                    if shift_requests[n][d][s] == 1:
                        print(f" Nurse {n} works shift {s} (requested).")
                    else:
                        print(f" Nurse {n} works shift {s} (not requested).")
    print(
        f"Number of shift requests met = {solver.ObjectiveValue():f} "
        f"(out of {num_nurses * min_shifts_per_nurse})"
    )
else:
    print("No optimal solution found !")
完整代码
"""Nurse scheduling problem with shift requests."""
from ortools.sat.python import cp_model


def main():
    """Build, solve and print the shift-request nurse scheduling model.

    The program looks for an optimal assignment of nurses to shifts
    (3 shifts per day, for 7 days), subject to the constraints below.
    Each nurse can request specific shifts; the optimal assignment maximizes
    the number of fulfilled shift requests.
    """
    num_nurses = 5
    num_shifts = 3
    num_days = 7
    all_nurses = range(num_nurses)
    all_shifts = range(num_shifts)
    all_days = range(num_days)
    # shift_requests[n][d][s] == 1 means nurse n asks for shift s on day d.
    shift_requests = [
        [[0, 0, 1], [0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 0, 1]],
        [[0, 0, 0], [0, 0, 0], [0, 1, 0], [0, 1, 0], [1, 0, 0], [0, 0, 0], [0, 0, 1]],
        [[0, 1, 0], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 0, 0], [0, 1, 0], [0, 0, 0]],
        [[0, 0, 1], [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 0, 0]],
        [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 0]],
    ]

    # Creates the model.
    model = cp_model.CpModel()

    # Creates shift variables.
    # shifts[(n, d, s)]: nurse 'n' works shift 's' on day 'd'.
    shifts = {}
    for n in all_nurses:
        for d in all_days:
            for s in all_shifts:
                shifts[(n, d, s)] = model.NewBoolVar(f"shift_n{n}_d{d}_s{s}")

    # Each shift is assigned to exactly one nurse.
    for d in all_days:
        for s in all_shifts:
            model.AddExactlyOne(shifts[(n, d, s)] for n in all_nurses)

    # Each nurse works at most one shift per day.
    for n in all_nurses:
        for d in all_days:
            model.AddAtMostOne(shifts[(n, d, s)] for s in all_shifts)

    # Try to distribute the shifts evenly, so that each nurse works
    # min_shifts_per_nurse shifts. If this is not possible, because the total
    # number of shifts is not divisible by the number of nurses, some nurses
    # will be assigned one more shift.
    min_shifts_per_nurse = (num_shifts * num_days) // num_nurses
    if num_shifts * num_days % num_nurses == 0:
        max_shifts_per_nurse = min_shifts_per_nurse
    else:
        max_shifts_per_nurse = min_shifts_per_nurse + 1
    for n in all_nurses:
        num_shifts_worked = 0
        for d in all_days:
            for s in all_shifts:
                num_shifts_worked += shifts[(n, d, s)]
        model.Add(min_shifts_per_nurse <= num_shifts_worked)
        model.Add(num_shifts_worked <= max_shifts_per_nurse)

    # Objective: number of requested shifts that are actually assigned.
    # pylint: disable=g-complex-comprehension
    model.Maximize(
        sum(
            shift_requests[n][d][s] * shifts[(n, d, s)]
            for n in all_nurses
            for d in all_days
            for s in all_shifts
        )
    )

    # Denominator for the report: the number of requests actually made. It is
    # counted from the data so the report stays right if the requests change.
    total_requests = sum(
        shift_requests[n][d][s]
        for n in all_nurses
        for d in all_days
        for s in all_shifts
    )

    # Creates the solver and solve.
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status == cp_model.OPTIMAL:
        print("Solution:")
        for d in all_days:
            print("Day", d)
            for n in all_nurses:
                for s in all_shifts:
                    if solver.Value(shifts[(n, d, s)]) == 1:
                        if shift_requests[n][d][s] == 1:
                            print("Nurse", n, "works shift", s, "(requested).")
                        else:
                            print("Nurse", n, "works shift", s, "(not requested).")
            print()
        print(
            f"Number of shift requests met = {solver.ObjectiveValue()}",
            f"(out of {total_requests})",
        )
    else:
        print("No optimal solution found !")

    # Statistics.
    print("\nStatistics")
    print(f" - conflicts: {solver.NumConflicts()}")
    print(f" - branches : {solver.NumBranches()}")
    print(f" - wall time: {solver.WallTime()}s")


if __name__ == "__main__":
    main()