Python 自动化 Excel 综合实战案例详细教程
前面我们学习了Python操作Excel的各种基础功能,本文通过3个真实工作场景的综合实战案例,把前面所学的知识整合起来,从零开始实现完整的自动化解决方案,让你能够应对工作中绝大多数Excel自动化需求。所有案例都包含完整代码和详细讲解,可以直接复用修改。
一、环境准备
1.1 安装所有依赖
pip install openpyxl pandas numpy requests python-dotenv smtplib email fuzzywuzzy python-Levenshtein
1.2 案例说明
本文包含3个实战案例,覆盖不同工作场景:
1. 销售数据自动化分析系统:从原始订单数据到自动生成分析报表并发送邮件,完整流程
2. 员工薪资自动核算系统:自动计算工资、生成工资条、批量发送工资条邮件
3. 库存预警自动化系统:自动监控库存水平,生成补货清单,通知采购部门
案例一:销售数据自动化分析系统
2.1 需求背景
> 公司每天都会导出销售订单数据Excel,需要每天完成以下工作:
> 1. 清洗原始订单数据,处理空值、异常值、重复订单
> 2. 统计每日、每周、每月的销售业绩
> 3. 分析各区域、各产品、各销售的业绩排名
> 4. 生成可视化分析报表,包含图表和关键指标
> 5. 自动发送报表给销售总监和各区域经理
> 整个过程手动需要2小时,希望用Python实现全自动化,每天早上8点自动执行。
2.2 实现步骤
2.2.1 配置文件
创建`.env`配置文件,保存敏感信息:
# 数据库配置
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=password
DB_NAME=sales
# 邮箱配置
SMTP_SERVER=smtp.qq.com
SMTP_PORT=465
EMAIL_FROM=report@company.com
EMAIL_PASSWORD=email_password
# 收件人列表
REPORT_RECIPIENTS=manager@company.com,sales1@company.com,sales2@company.com
2.2.2 完整代码实现
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dotenv import load_dotenv
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from openpyxl import load_workbook
from openpyxl.chart import BarChart, Reference, LineChart
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
# 加载配置
load_dotenv()
class SalesReportSystem:
def __init__(self):
self.today = datetime.now().date()
self.yesterday = self.today - timedelta(days=1)
self.report_date = self.yesterday.strftime("%Y%m%d")
self.output_folder = "销售报表/"
os.makedirs(self.output_folder, exist_ok=True)
self.output_file = os.path.join(self.output_folder, f"销售日报_{self.report_date}.xlsx")
# 样式定义
self.header_style = {
"font": Font(bold=True, color="FFFFFF"),
"fill": PatternFill(start_color="3b82f6", end_color="3b82f6", fill_type="solid"),
"alignment": Alignment(horizontal="center", vertical="center"),
"border": Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
}
self.data_style = {
"border": Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
),
"alignment": Alignment(horizontal="center", vertical="center")
}
def load_data(self):
"""加载原始数据,可以从Excel、数据库、API等读取"""
print("🔍 正在加载销售数据...")
# 这里模拟从Excel读取,实际可以从数据库查询
# df = pd.read_sql("SELECT * FROM orders WHERE create_time BETWEEN %s AND %s",
# conn, params=(self.yesterday, self.today))
# 模拟数据
df = pd.read_excel("原始订单数据.xlsx")
df["创建时间"] = pd.to_datetime(df["创建时间"])
# 筛选昨天的数据
df = df[(df["创建时间"] >= str(self.yesterday)) & (df["创建时间"] < str(self.today))]
print(f"✅ 加载完成,共{len(df)}条订单数据")
return df
def clean_data(self, df):
"""数据清洗"""
print("🧹 正在清洗数据...")
original_count = len(df)
# 1. 删除重复订单
df = df.drop_duplicates(subset=["订单号"], keep="last")
# 2. 处理空值
df["收货人"] = df["收货人"].fillna("未知")
df["联系电话"] = df["联系电话"].fillna("")
df = df.dropna(subset=["订单号", "商品名称", "金额", "区域"])
# 3. 处理异常值
df = df[df["金额"] > 0] # 金额必须大于0
df = df[df["数量"] > 0] # 数量必须大于0
# 4. 格式标准化
df["联系电话"] = df["联系电话"].astype(str).str.replace(r"\D", "", regex=True)
df["区域"] = df["区域"].str.strip()
cleaned_count = len(df)
print(f"✅ 清洗完成,原始{original_count}条,清洗后{cleaned_count}条,删除{original_count - cleaned_count}条无效数据")
return df
def analyze_data(self, df):
"""数据分析"""
print("📊 正在分析数据...")
results = {}
# 1. 总体统计
results["总览"] = pd.DataFrame({
"指标": ["订单总数", "总销售额", "总销量", "客单价", "支付订单率"],
"数值": [
len(df),
round(df["金额"].sum(), 2),
df["数量"].sum(),
round(df["金额"].mean(), 2),
f"{round(len(df[df['支付状态'] == '已支付'])/len(df)*100, 2)}%"
]
})
# 2. 区域销售统计
results["区域统计"] = df.groupby("区域", as_index=False).agg(
销售额=("金额", "sum"),
订单数=("订单号", "count"),
平均订单金额=("金额", "mean")
).sort_values(by="销售额", ascending=False).round(2)
results["区域统计"]["占比"] = (results["区域统计"]["销售额"] / results["区域统计"]["销售额"].sum() * 100).round(2)
# 3. 产品销售统计
results["产品统计"] = df.groupby("商品名称", as_index=False).agg(
销售额=("金额", "sum"),
销量=("数量", "sum"),
平均单价=("单价", "mean")
).sort_values(by="销售额", ascending=False).round(2)
results["产品统计"]["排名"] = results["产品统计"].index + 1
# 4. 销售业绩排名
results["销售排名"] = df.groupby("销售人员", as_index=False).agg(
销售额=("金额", "sum"),
订单数=("订单号", "count")
).sort_values(by="销售额", ascending=False).round(2)
results["销售排名"]["排名"] = results["销售排名"].index + 1
# 5. 小时销售趋势
df["小时"] = df["创建时间"].dt.hour
results["小时趋势"] = df.groupby("小时", as_index=False).agg(
销售额=("金额", "sum"),
订单数=("订单号", "count")
).sort_values(by="小时")
print("✅ 数据分析完成")
return results
def generate_excel_report(self, results):
"""生成Excel报表"""
print("📑 正在生成Excel报表...")
with pd.ExcelWriter(self.output_file, engine="openpyxl") as writer:
for sheet_name, df in results.items():
df.to_excel(writer, sheet_name=sheet_name, index=False)
# 添加格式和图表
wb = load_workbook(self.output_file)
# 格式化总览表
ws = wb["总览"]
for cell in ws[1]:
self._apply_style(cell, self.header_style)
for row in ws.iter_rows(min_row=2):
for cell in row:
self._apply_style(cell, self.data_style)
ws.column_dimensions["A"].width = 20
ws.column_dimensions["B"].width = 20
# 区域统计表添加图表
ws = wb["区域统计"]
for cell in ws[1]:
self._apply_style(cell, self.header_style)
for row in ws.iter_rows(min_row=2):
for cell in row:
self._apply_style(cell, self.data_style)
# 添加柱状图
chart = BarChart()
chart.title = "各区域销售额对比"
chart.y_axis.title = "销售额"
chart.x_axis.title = "区域"
data = Reference(ws, min_col=2, min_row=1, max_row=ws.max_row)
categories = Reference(ws, min_col=1, min_row=2, max_row=ws.max_row)
chart.add_data(data, titles_from_data=True)
chart.set_categories(categories)
chart.width = 15
chart.height = 8
ws.add_chart(chart, "F2")
# 小时趋势添加折线图
ws = wb["小时趋势"]
chart = LineChart()
chart.title = "24小时销售趋势"
chart.y_axis.title = "销售额"
chart.x_axis.title = "小时"
data = Reference(ws, min_col=2, min_row=1, max_row=ws.max_row)
categories = Reference(ws, min_col=1, min_row=2, max_row=ws.max_row)
chart.add_data(data, titles_from_data=True)
chart.set_categories(categories)
ws.add_chart(chart, "D2")
wb.save(self.output_file)
print(f"✅ Excel报表生成完成:{self.output_file}")
def send_report_email(self):
"""发送报表邮件"""
print("📧 正在发送邮件...")
smtp_server = os.getenv("SMTP_SERVER")
smtp_port = int(os.getenv("SMTP_PORT"))
email_from = os.getenv("EMAIL_FROM")
email_password = os.getenv("EMAIL_PASSWORD")
recipients = os.getenv("REPORT_RECIPIENTS").split(",")
# 邮件内容
msg = MIMEMultipart()
msg["From"] = email_from
msg["To"] = ", ".join(recipients)
msg["Subject"] = f"【销售日报】{self.yesterday.strftime('%Y年%m月%d日')}销售报表"
# 邮件正文
body = f"""
各位领导好,附件是{self.yesterday.strftime('%Y年%m月%d日')}的销售报表,请查收。
📊 核心指标:
- 订单总数:{self.daily_stats['订单总数']}
- 总销售额:{self.daily_stats['总销售额']}元
- 客单价:{self.daily_stats['客单价']}元
- 销售冠军:{self.daily_stats['销售冠军']}
如有疑问请联系数据部。
"""
msg.attach(MIMEText(body, "html", "utf-8"))
# 添加附件
filename = os.path.basename(self.output_file)
with open(self.output_file, "rb") as f:
part = MIMEApplication(f.read(), Name=filename)
part["Content-Disposition"] = f'attachment; filename="{filename}"'
msg.attach(part)
# 发送邮件
server = smtplib.SMTP_SSL(smtp_server, smtp_port)
server.login(email_from, email_password)
server.sendmail(email_from, recipients, msg.as_string())
server.quit()
print(f"✅ 邮件发送成功,收件人:{recipients}")
def _apply_style(self, cell, style):
"""应用样式"""
if "font" in style:
cell.font = style["font"]
if "fill" in style:
cell.fill = style["fill"]
if "alignment" in style:
cell.alignment = style["alignment"]
if "border" in style:
cell.border = style["border"]
def run(self):
"""运行整个流程"""
try:
print(f"🚀 开始生成{self.yesterday}销售报表...")
# 1. 加载数据
df = self.load_data()
# 2. 清洗数据
df_clean = self.clean_data(df)
# 3. 数据分析
results = self.analyze_data(df_clean)
# 保存核心指标用于邮件
self.daily_stats = {
"订单总数": results["总览"].iloc[0]["数值"],
"总销售额": results["总览"].iloc[1]["数值"],
"客单价": results["总览"].iloc[3]["数值"],
"销售冠军": results["销售排名"].iloc[0]["销售人员"]
}
# 4. 生成报表
self.generate_excel_report(results)
# 5. 发送邮件
self.send_report_email()
print("🎉 报表生成发送全部完成!")
return True
except Exception as e:
print(f"❌ 运行失败:{str(e)}")
# 发送告警邮件
self.send_alert_email(str(e))
return False
def send_alert_email(self, error_msg):
"""发送失败告警邮件"""
# 告警逻辑省略
pass
if __name__ == "__main__":
system = SalesReportSystem()
system.run()
2.3 定时运行
配置Linux定时任务,每天早上8点自动执行:
0 8 * * * cd /path/to/project && /usr/bin/python3 sales_report.py >> /var/log/sales_report.log 2>&1
案例二:员工薪资自动核算系统
3.1 需求背景
> 公司每月10号发工资,人事需要核算全公司员工工资:
> 1. 基础工资+绩效+补贴+奖金-考勤扣款-社保公积金-个税
> 2. 生成每个员工的工资条Excel
> 3. 批量给员工发送工资条邮件
> 手动核算需要3天时间,且容易出错,希望实现自动化。
3.2 完整实现
import pandas as pd
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from openpyxl import Workbook
from datetime import datetime
class SalarySystem:
def __init__(self, month):
self.month = month # 格式:2026-03
self.output_folder = f"工资条_{self.month}/"
os.makedirs(self.output_folder, exist_ok=True)
# 配置
self.smtp_server = "smtp.qq.com"
self.smtp_port = 465
self.email_from = "hr@company.com"
self.email_password = "your_password"
# 个税税率表
self.tax_rates = [
(0, 36000, 0.03, 0),
(36000, 144000, 0.1, 2520),
(144000, 300000, 0.2, 16920),
(300000, 420000, 0.25, 31920),
(420000, 660000, 0.3, 52920),
(660000, 960000, 0.35, 85920),
(960000, float('inf'), 0.45, 181920)
]
def load_data(self):
"""加载各项数据"""
print("🔍 加载薪资数据...")
# 员工基础信息
self.employee_df = pd.read_excel("员工基础信息.xlsx")
# 考勤数据
self.attendance_df = pd.read_excel(f"{self.month}考勤数据.xlsx")
# 绩效数据
self.performance_df = pd.read_excel(f"{self.month}绩效数据.xlsx")
# 社保公积金基数
self.social_security_df = pd.read_excel("社保公积金基数.xlsx")
# 累计个税数据
self.cumulative_tax_df = pd.read_excel("累计个税数据.xlsx")
print(f"✅ 加载完成,共{len(self.employee_df)}名员工")
def calculate_salary(self):
"""计算工资"""
print("🧮 正在核算工资...")
salary_list = []
for _, emp in self.employee_df.iterrows():
emp_id = emp["员工ID"]
name = emp["姓名"]
# 基础工资
base_salary = emp["基础工资"]
# 绩效工资
performance = self.performance_df[self.performance_df["员工ID"] == emp_id]["绩效系数"].iloc[0]
performance_bonus = base_salary * performance
# 补贴
subsidy = emp["交通补贴"] + emp["餐饮补贴"] + emp["住房补贴"]
# 考勤扣款
attendance = self.attendance_df[self.attendance_df["员工ID"] == emp_id]
late_days = attendance["迟到天数"].iloc[0]
absent_days = attendance["旷工天数"].iloc[0]
attendance_deduct = (late_days * 50) + (absent_days * base_salary / 21.75)
# 社保公积金个人部分
ss = self.social_security_df[self.social_security_df["员工ID"] == emp_id]
social_security = ss["养老保险个人"].iloc[0] + ss["医疗保险个人"].iloc[0] + ss["失业保险个人"].iloc[0]
housing_fund = ss["公积金个人"].iloc[0]
# 应纳税所得额
pre_tax_salary = base_salary + performance_bonus + subsidy - attendance_deduct - social_security - housing_fund
# 个税计算(累计预扣法)
cumulative = self.cumulative_tax_df[self.cumulative_tax_df["员工ID"] == emp_id].iloc[0]
cumulative_income = cumulative["累计收入"] + pre_tax_salary
cumulative_deduction = cumulative["累计减除费用"] + 5000 # 每月5000起征点
taxable_income = cumulative_income - cumulative_deduction
# 计算个税
tax = 0
for lower, upper, rate, deduction in self.tax_rates:
if lower < taxable_income <= upper:
tax = taxable_income * rate - deduction - cumulative["累计已缴个税"]
break
tax = max(0, round(tax, 2))
# 实发工资
actual_salary = round(pre_tax_salary - tax, 2)
salary_list.append({
"员工ID": emp_id,
"姓名": name,
"部门": emp["部门"],
"基础工资": base_salary,
"绩效奖金": round(performance_bonus, 2),
"补贴合计": round(subsidy, 2),
"考勤扣款": round(attendance_deduct, 2),
"社会保险": round(social_security, 2),
"住房公积金": round(housing_fund, 2),
"应发工资": round(pre_tax_salary, 2),
"个人所得税": tax,
"实发工资": actual_salary,
"邮箱": emp["邮箱"]
})
self.salary_df = pd.DataFrame(salary_list)
# 保存工资总表
self.salary_df.to_excel(os.path.join(self.output_folder, f"{self.month}工资总表.xlsx"), index=False)
print(f"✅ 工资核算完成,共{len(self.salary_df)}名员工")
return self.salary_df
def generate_individual_salary_slip(self):
"""生成个人工资条"""
print("📄 正在生成个人工资条...")
for _, emp in self.salary_df.iterrows():
wb = Workbook()
ws = wb.active
ws.title = "工资条"
# 标题
ws.merge_cells("A1:L1")
ws["A1"] = f"{self.month}月工资条 - {emp['姓名']}"
ws["A1"].font = Font(size=14, bold=True)
ws["A1"].alignment = Alignment(horizontal="center")
# 表头
headers = ["员工ID", "姓名", "部门", "基础工资", "绩效奖金", "补贴合计",
"考勤扣款", "社会保险", "住房公积金", "应发工资", "个人所得税", "实发工资"]
for col, header in enumerate(headers, start=1):
cell = ws.cell(row=3, column=col, value=header)
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="3b82f6", end_color="3b82f6", fill_type="solid")
cell.font = Font(color="FFFFFF")
cell.alignment = Alignment(horizontal="center")
# 数据
data = [emp["员工ID"], emp["姓名"], emp["部门"], emp["基础工资"], emp["绩效奖金"],
emp["补贴合计"], emp["考勤扣款"], emp["社会保险"], emp["住房公积金"],
emp["应发工资"], emp["个人所得税"], emp["实发工资"]]
for col, value in enumerate(data, start=1):
cell = ws.cell(row=4, column=col, value=value)
cell.alignment = Alignment(horizontal="center")
# 设置列宽
for col in range(1, 13):
ws.column_dimensions[chr(64 + col)].width = 15
# 保存
filename = os.path.join(self.output_folder, f"工资条_{emp['姓名']}.xlsx")
wb.save(filename)
emp["工资条路径"] = filename
print(f"✅ 个人工资条生成完成,共{len(self.salary_df)}份")
def send_salary_email(self):
"""批量发送工资条邮件"""
print("📧 正在发送工资条邮件...")
server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
server.login(self.email_from, self.email_password)
success_count = 0
fail_count = 0
for _, emp in self.salary_df.iterrows():
try:
msg = MIMEMultipart()
msg["From"] = self.email_from
msg["To"] = emp["邮箱"]
msg["Subject"] = f"【工资条】{self.month}月工资发放通知"
body = f"""
{emp['姓名']}您好,{self.month}月工资已发放,请查收附件中的工资条。
实发工资:{emp['实发工资']}元
如有疑问请联系人事部。
"""
msg.attach(MIMEText(body, "html", "utf-8"))
# 添加附件
filename = os.path.basename(emp["工资条路径"])
with open(emp["工资条路径"], "rb") as f:
part = MIMEApplication(f.read(), Name=filename)
part["Content-Disposition"] = f'attachment; filename="{filename}"'
msg.attach(part)
server.sendmail(self.email_from, emp["邮箱"], msg.as_string())
success_count += 1
print(f"✅ 已发送:{emp['姓名']} - {emp['邮箱']}")
except Exception as e:
fail_count += 1
print(f"❌ 发送失败:{emp['姓名']} - {str(e)}")
server.quit()
print(f"\n✅ 邮件发送完成,成功{success_count}封,失败{fail_count}封")
def run(self):
try:
print(f"🚀 开始核算{self.month}月工资...")
self.load_data()
self.calculate_salary()
self.generate_individual_salary_slip()
self.send_salary_email()
print("🎉 工资核算发放全部完成!")
return True
except Exception as e:
print(f"❌ 运行失败:{str(e)}")
return False
if __name__ == "__main__":
salary_system = SalarySystem("2026-03")
salary_system.run()
案例三:库存预警自动化系统
4.1 需求背景
> 仓库需要每天监控库存水平:
> 1. 每日导出库存数据,计算每个商品的库存周转天数
> 2. 低于安全库存的商品生成补货清单
> 3. 超过滞销天数的商品生成滞销预警
> 4. 自动发送补货清单给采购部门,滞销清单给运营部门
> 原来每天人工统计需要2小时,希望实现自动化。
4.2 完整实现
import pandas as pd
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime, timedelta
class InventoryAlertSystem:
def __init__(self):
self.today = datetime.now().date()
self.output_folder = "库存预警/"
os.makedirs(self.output_folder, exist_ok=True)
# 配置
self.safety_stock_days = 30 # 安全库存天数
self.stagnant_days = 90 # 滞销天数(90天没卖出算滞销)
self.purchase_email = "purchase@company.com"
self.operation_email = "operation@company.com"
def load_data(self):
"""加载数据"""
print("🔍 加载库存数据...")
# 当前库存
self.inventory_df = pd.read_excel("当前库存.xlsx")
# 近90天销售数据
end_date = self.today
start_date = end_date - timedelta(days=90)
self.sales_df = pd.read_excel("销售数据.xlsx")
self.sales_df["日期"] = pd.to_datetime(self.sales_df["日期"])
self.sales_df = self.sales_df[(self.sales_df["日期"] >= str(start_date)) & (self.sales_df["日期"] < str(end_date))]
print(f"✅ 加载完成,共{len(self.inventory_df)}个SKU,近90天销售数据{len(self.sales_df)}条")
def calculate_inventory_metrics(self):
"""计算库存指标"""
print("📊 计算库存指标...")
# 计算近90天每个SKU的销量
sales_90d = self.sales_df.groupby("SKU", as_index=False).agg(
销量_90天=("数量", "sum")
)
# 合并库存数据
inventory_metrics = pd.merge(
self.inventory_df,
sales_90d,
on="SKU",
how="left"
)
# 填充没有销量的SKU
inventory_metrics["销量_90天"] = inventory_metrics["销量_90天"].fillna(0)
# 计算日均销量
inventory_metrics["日均销量"] = inventory_metrics["销量_90天"] / 90
# 计算库存可售天数
inventory_metrics["可售天数"] = inventory_metrics.apply(
lambda x: round(x["当前库存"] / x["日均销量"], 1) if x["日均销量"] > 0 else 999,
axis=1
)
# 标记需要补货的商品
inventory_metrics["是否需要补货"] = inventory_metrics["可售天数"] < self.safety_stock_days
# 标记滞销商品(近90天销量为0且库存>0)
inventory_metrics["是否滞销"] = (inventory_metrics["销量_90天"] == 0) & (inventory_metrics["当前库存"] > 0)
self.inventory_metrics = inventory_metrics
return inventory_metrics
def generate_alert_list(self):
"""生成预警清单"""
print("📑 生成预警清单...")
# 补货清单
self.purchase_list = self.inventory_metrics[self.inventory_metrics["是否需要补货"]].sort_values(by="可售天数")
self.purchase_list = self.purchase_list[["SKU", "商品名称", "当前库存", "销量_90天", "日均销量", "可售天数", "建议补货量"]]
# 滞销清单
self.stagnant_list = self.inventory_metrics[self.inventory_metrics["是否滞销"]].sort_values(by="当前库存", ascending=False)
self.stagnant_list = self.stagnant_list[["SKU", "商品名称", "当前库存", "成本价", "库存金额"]]
# 保存清单
self.purchase_list.to_excel(os.path.join(self.output_folder, f"补货清单_{self.today}.xlsx"), index=False)
self.stagnant_list.to_excel(os.path.join(self.output_folder, f"滞销清单_{self.today}.xlsx"), index=False)
print(f"✅ 补货清单:{len(self.purchase_list)}个SKU需要补货")
print(f"✅ 滞销清单:{len(self.stagnant_list)}个SKU滞销")
def send_alert_emails(self):
"""发送预警邮件"""
print("📧 发送预警邮件...")
# 发送补货邮件给采购
if len(self.purchase_list) > 0:
self.send_email(
to_email=self.purchase_email,
subject=f"【库存预警】{self.today}补货清单",
body=f"您好,附件是{self.today}的补货清单,共{len(self.purchase_list)}个SKU需要补货,请及时处理。
",
attachment_path=os.path.join(self.output_folder, f"补货清单_{self.today}.xlsx")
)
else:
print("ℹ️ 今日无商品需要补货")
# 发送滞销邮件给运营
if len(self.stagnant_list) > 0:
self.send_email(
to_email=self.operation_email,
subject=f"【库存预警】{self.today}滞销商品清单",
body=f"您好,附件是{self.today}的滞销商品清单,共{len(self.stagnant_list)}个SKU滞销,请及时处理。
",
attachment_path=os.path.join(self.output_folder, f"滞销清单_{self.today}.xlsx")
)
else:
print("ℹ️ 今日无滞销商品")
def send_email(self, to_email, subject, body, attachment_path):
"""发送邮件通用方法"""
smtp_server = "smtp.qq.com"
smtp_port = 465
email_from = "inventory@company.com"
email_password = "your_password"
msg = MIMEMultipart()
msg["From"] = email_from
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "html", "utf-8"))
# 添加附件
filename = os.path.basename(attachment_path)
with open(attachment_path, "rb") as f:
part = MIMEApplication(f.read(), Name=filename)
part["Content-Disposition"] = f'attachment; filename="{filename}"'
msg.attach(part)
server = smtplib.SMTP_SSL(smtp_server, smtp_port)
server.login(email_from, email_password)
server.sendmail(email_from, to_email, msg.as_string())
server.quit()
print(f"✅ 邮件已发送到:{to_email}")
def run(self):
try:
print("🚀 开始库存预警检查...")
self.load_data()
self.calculate_inventory_metrics()
self.generate_alert_list()
self.send_alert_emails()
print("🎉 库存预警检查完成!")
return True
except Exception as e:
print(f"❌ 运行失败:{str(e)}")
return False
if __name__ == "__main__":
system = InventoryAlertSystem()
system.run()
五、最佳实践总结
5.1 系统设计原则
1. 模块化设计:把不同