Python 自动化 Excel 数据清洗(空值、异常值处理)


Python 自动化 Excel 数据清洗(空值、异常值处理)

Python 自动化 Excel 数据清洗(空值、异常值处理)详细教程

在实际工作中,我们拿到的原始Excel数据往往并不干净,包含空值、异常值、重复数据、格式混乱、不一致等问题,直接使用会导致分析结果错误。数据清洗是数据分析前必不可少的步骤,Python可以自动化完成各种数据清洗工作,效率和准确率都远超手动处理。本文将详细介绍各种数据清洗场景的实现方法,包含完整代码示例和最佳实践。


一、环境准备

1.1 安装依赖库

bash
pip install openpyxl pandas numpy fuzzywuzzy python-Levenshtein
  • fuzzywuzzy:用于模糊匹配,处理不一致的文本数据
  • python-Levenshtein:模糊匹配的C扩展,提升速度
  • 1.2 常见数据质量问题

    原始数据通常存在以下问题:

  • **缺失值**:单元格为空、NA、N/A、null等
  • **异常值**:数值明显超出合理范围,比如年龄200岁、销售额负数
  • **重复数据**:完全重复或部分字段重复的记录
  • **格式不一致**:日期格式混乱、数值带单位、同一含义有不同写法(”中国”、”中”、”CN”)
  • **数据类型错误**:数字存为字符串、日期格式不对、数值带单位
  • **特殊字符**:空格、换行符、不可见字符、乱码
  • **逻辑错误**:比如结束时间早于开始时间、年龄和工龄矛盾

  • 二、缺失值处理

    2.1 查看缺失值情况

    首先了解数据的缺失情况:

    python
    import pandas as pd
    import numpy as np
    
    # 读取Excel
    df = pd.read_excel("原始数据.xlsx")
    
    # 查看每列的缺失值数量
    print("各列缺失值数量:")
    print(df.isnull().sum())
    
    # 查看缺失值比例
    print("\n各列缺失值比例(%):")
    print((df.isnull().sum() / len(df) * 100).round(2))
    
    # 查看包含缺失值的行
    missing_rows = df[df.isnull().any(axis=1)]
    print(f"\n包含缺失值的行数:{len(missing_rows)}")
    print("缺失值示例:")
    print(missing_rows.head())
    

    2.2 删除缺失值

    缺失值不多或者不重要时可以直接删除:

    python
    # 删除包含任意缺失值的行
    df_drop_na = df.dropna(axis=0, how="any")
    print(f"删除任意缺失值后剩余行数:{len(df_drop_na)}")
    
    # 删除所有列都为空的行
    df_drop_all_na = df.dropna(axis=0, how="all")
    print(f"删除全空行后剩余行数:{len(df_drop_all_na)}")
    
    # 删除指定列包含缺失值的行
    df_drop_subset = df.dropna(subset=["姓名", "手机号", "身份证号"])
    print(f"删除关键信息缺失的行后剩余行数:{len(df_drop_subset)}")
    
    # 删除缺失值超过30%的列
    threshold = len(df) * 0.7  # 非缺失值至少占70%
    df_drop_col = df.dropna(axis=1, thresh=threshold)
    print(f"删除缺失过多的列后剩余列数:{len(df_drop_col.columns)}")
    

    2.3 填充缺失值

    2.3.1 固定值填充

    python
    # 所有缺失值填充为"未知"
    df_fill_unknown = df.fillna("未知")
    
    # 不同列填充不同值
    fill_values = {
        "性别": "未知",
        "备注": "无",
        "状态": "正常",
        "销售额": 0,
        "年龄": df["年龄"].mean()  # 用平均值填充
    }
    df_fill_custom = df.fillna(value=fill_values)
    

    2.3.2 统计值填充

    python
    # 数值列用平均值填充
    df["年龄"] = df["年龄"].fillna(df["年龄"].mean().round(0))
    df["工资"] = df["工资"].fillna(df["工资"].median())  # 中位数更稳健
    
    # 用众数填充分类列
    df["部门"] = df["部门"].fillna(df["部门"].mode()[0])  # mode返回Series,取第一个值
    
    # 同组内统计值填充:按部门填充平均工资
    df["工资"] = df.groupby("部门")["工资"].transform(
        lambda x: x.fillna(x.mean())
    )
    

    2.3.3 前后值填充

    适合时间序列数据:

    python
    # 用前一个有效值填充(向前填充)
    df_ffill = df.fillna(method="ffill")
    
    # 用后一个有效值填充(向后填充)
    df_bfill = df.fillna(method="bfill")
    
    # 限制填充次数,最多填充连续2个空值
    df_limit_fill = df.fillna(method="ffill", limit=2)
    

    2.3.4 插值填充

    适合数值型连续数据:

    python
    # 线性插值
    df["数值列"] = df["数值列"].interpolate(method="linear")
    
    # 时间序列插值
    df["日期"] = pd.to_datetime(df["日期"])
    df = df.set_index("日期")
    df["数值列"] = df["数值列"].interpolate(method="time")
    

    2.4 空值特殊处理

    python
    # 识别不同形式的空值
    custom_na_values = ["", "NA", "N/A", "null", "NULL", "nan", "Nan", "NaN", "无", "未知", "请填写"]
    df = pd.read_excel("原始数据.xlsx", na_values=custom_na_values)
    
    # 把空格和空字符串转换成NaN
    df = df.replace(r"^\s*$", np.nan, regex=True)
    
    # 标记缺失值,新增一列标记是否缺失
    df["销售额_缺失"] = df["销售额"].isnull()
    

    三、异常值处理

    3.1 识别异常值

    3.1.1 数值型异常值

    python
    # 查看数值列的统计信息,检查是否有异常
    print("数值列统计信息:")
    print(df.describe())
    
    # 筛选异常值:年龄小于0或大于100
    age_outliers = df[(df["年龄"] < 0) | (df["年龄"] > 100)]
    print(f"年龄异常值数量:{len(age_outliers)}")
    
    # 筛选销售额为负数的异常值
    negative_sales = df[df["销售额"] < 0]
    print(f"负销售额数量:{len(negative_sales)}")
    

    3.1.2 3σ原则识别异常值

    数值在平均值±3倍标准差之外认为是异常值:

    python
    def remove_outliers_3sigma(df, column):
        mean = df[column].mean()
        std = df[column].std()
        lower = mean - 3 * std
        upper = mean + 3 * std
        return df[(df[column] >= lower) & (df[column] <= upper)]
    
    # 处理销售额异常值
    df_clean = remove_outliers_3sigma(df, "销售额")
    print(f"移除销售额异常值后剩余:{len(df_clean)}行")
    

    3.1.3 四分位数法(IQR)识别异常值

    python
    def remove_outliers_iqr(df, column):
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)
        iqr = q3 - q1
        lower = q1 - 1.5 * iqr
        upper = q3 + 1.5 * iqr
        return df[(df[column] >= lower) & (df[column] <= upper)]
    
    # 处理工资异常值
    df_clean = remove_outliers_iqr(df, "工资")
    print(f"移除工资异常值后剩余:{len(df_clean)}行")
    

    3.1.4 箱线图可视化识别

    python
    import matplotlib.pyplot as plt
    plt.boxplot(df["销售额"].dropna())
    plt.title("销售额箱线图")
    plt.savefig("销售额箱线图.png")
    plt.close()
    

    3.2 异常值处理方法

    3.2.1 删除异常值

    异常值少或者确认是错误数据时直接删除:

    python
    # 删除年龄异常的行
    df = df[(df["年龄"] >= 0) & (df["年龄"] <= 100)]
    # 删除负销售额
    df = df[df["销售额"] >= 0]
    

    3.2.2 修正异常值

    根据业务逻辑修正:

    python
    # 销售额为负的取绝对值
    df["销售额"] = df["销售额"].abs()
    
    # 年龄大于100的设置为平均值
    mean_age = df["年龄"].mean().round(0)
    df.loc[df["年龄"] > 100, "年龄"] = mean_age
    df.loc[df["年龄"] < 0, "年龄"] = mean_age
    
    # 超过1000的数量设置为最大值1000
    df["数量"] = df["数量"].clip(upper=1000)
    

    3.2.3 标记为特殊值

    不想删除也不确定怎么修改时,可以标记为异常值单独处理:

    python
    # 新增异常标记列
    df["是否异常"] = "正常"
    df.loc[(df["年龄"] < 0) | (df["年龄"] > 100), "是否异常"] = "年龄异常"
    df.loc[df["销售额"] < 0, "是否异常"] = "销售额异常"
    
    # 单独保存异常数据
    abnormal_data = df[df["是否异常"] != "正常"]
    abnormal_data.to_excel("异常数据.xlsx", index=False)
    print(f"已保存{len(abnormal_data)}条异常数据,待人工核实")
    
    # 保留正常数据
    df_normal = df[df["是否异常"] == "正常"]
    

    四、重复数据处理

    4.1 识别重复数据

    python
    # 查看完全重复的行数
    print(f"完全重复行数:{df.duplicated().sum()}")
    
    # 查看按关键字段重复的行数
    print(f"手机号重复行数:{df.duplicated(subset=['手机号']).sum()}")
    print(f"身份证号重复行数:{df.duplicated(subset=['身份证号']).sum()}")
    
    # 查看所有重复记录(包括所有重复项)
    duplicates = df[df.duplicated(subset=['手机号'], keep=False)]
    print("重复记录示例:")
    print(duplicates.sort_values(by='手机号')[['姓名', '手机号']].head())
    

    4.2 重复数据处理

    python
    # 删除完全重复的行,保留第一个
    df_dedup = df.drop_duplicates(keep='first')
    
    # 按手机号去重,保留最新的一条
    df['创建时间'] = pd.to_datetime(df['创建时间'])
    df_dedup = df.sort_values('创建时间', ascending=False).drop_duplicates(subset=['手机号'], keep='first')
    
    # 重复数据合并(重复的订单金额求和)
    df_merged = df.groupby('订单号', as_index=False).agg({
        '销售额': 'sum',
        '创建时间': 'first',
        '产品名称': 'first'
    })
    

    五、格式与一致性处理

    5.1 日期格式处理

    python
    # 自动识别日期格式
    df["日期"] = pd.to_datetime(df["日期"], errors="coerce")
    
    # 处理多种格式的日期
    def parse_date(x):
        for fmt in ["%Y-%m-%d", "%Y/%m/%d", "%Y年%m月%d日", "%y-%m-%d", "%m/%d/%Y"]:
            try:
                return pd.datetime.strptime(str(x).strip(), fmt)
            except:
                pass
        return pd.NaT  # 解析失败返回空
    
    df["日期"] = df["日期"].apply(parse_date)
    
    # 统一格式化为YYYY-MM-DD
    df["日期_格式化"] = df["日期"].dt.strftime("%Y-%m-%d")
    

    5.2 数值格式处理

    python
    # 清理数值中的单位、逗号、人民币符号等
    def clean_number(x):
        if pd.isna(x):
            return np.nan
        x = str(x).strip()
        # 移除特殊字符
        x = x.replace("¥", "").replace("$", "").replace(",", "").replace("元", "").replace(" ", "")
        try:
            return float(x)
        except:
            return np.nan
    
    df["销售额"] = df["销售额"].apply(clean_number)
    df["工资"] = df["工资"].apply(clean_number)
    

    5.3 字符串规范化

    python
    # 去除首尾空格和特殊字符
    df["姓名"] = df["姓名"].str.strip()
    df["手机号"] = df["手机号"].str.replace(r"\D", "", regex=True)  # 只保留数字
    
    # 统一大小写
    df["邮箱"] = df["邮箱"].str.lower()
    
    # 统一名称:比如"中国"、"中"、"CN"都统一为"中国"
    country_mapping = {
        "中国": "中国",
        "中": "中国",
        "CN": "中国",
        "China": "中国",
        "中华人民共和国": "中国",
        "美国": "美国",
        "美": "美国",
        "US": "美国",
        "USA": "美国"
    }
    df["国家"] = df["国家"].map(country_mapping).fillna(df["国家"])
    

    5.4 模糊匹配统一名称

    对于写法不一致但含义相同的内容,用模糊匹配统一:

    python
    from fuzzywuzzy import process
    
    # 标准部门列表
    standard_departments = ["技术部", "产品部", "销售部", "人力资源部", "财务部", "市场部"]
    
    # 模糊匹配,相似度大于80的替换为标准名称
    def clean_department(dept_name):
        if pd.isna(dept_name):
            return np.nan
        match = process.extractOne(str(dept_name), standard_departments, score_cutoff=80)
        return match[0] if match else dept_name
    
    df["部门"] = df["部门"].apply(clean_department)
    print("部门规范化后的值:")
    print(df["部门"].unique())
    

    5.5 统一枚举值

    python
    # 性别统一
    gender_mapping = {
        "男": "男",
        "m": "男",
        "M": "男",
        "先生": "男",
        "男性": "男",
        "1": "男",
        "女": "女",
        "f": "女",
        "F": "女",
        "女士": "女",
        "女性": "女",
        "2": "女"
    }
    df["性别"] = df["性别"].map(gender_mapping).fillna("未知")
    
    # 是/否统一
    bool_mapping = {
        "是": "是",
        "Y": "是",
        "y": "是",
        "1": "是",
        "YES": "是",
        "true": "是",
        "否": "否",
        "N": "否",
        "n": "否",
        "0": "否",
        "NO": "否",
        "false": "否"
    }
    df["是否在职"] = df["是否在职"].map(bool_mapping).fillna("未知")
    

    六、特殊字符与乱码处理

    6.1 去除特殊字符

    python
    # 去除不可见字符
    def remove_invisible_chars(x):
        if pd.isna(x) or not isinstance(x, str):
            return x
        # 去除控制字符
        x = ''.join(c for c in x if ord(c) >= 32 or c == '\n' or c == '\t')
        # 去除特殊符号
        x = x.replace("\u200b", "").replace("\ufeff", "").replace("\xa0", " ")
        return x.strip()
    
    df["文本列"] = df["文本列"].apply(remove_invisible_chars)
    

    6.2 处理全角半角字符

    python
    def strQ2B(ustring):
        """全角转半角"""
        rstring = ""
        for uchar in ustring:
            inside_code = ord(uchar)
            if inside_code == 12288:  # 全角空格
                inside_code = 32
            elif 65281 <= inside_code <= 65374:  # 全角字符范围
                inside_code -= 65248
            rstring += chr(inside_code)
        return rstring
    
    df["文本列"] = df["文本列"].apply(lambda x: strQ2B(x) if isinstance(x, str) else x)
    

    6.3 中文乱码处理

    python
    # 读取时指定编码
    df = pd.read_excel("乱码文件.xlsx", encoding="utf-8")
    # 尝试其他编码
    df = pd.read_excel("乱码文件.xlsx", encoding="gbk")
    df = pd.read_excel("乱码文件.xlsx", encoding="gb18030")
    

    七、逻辑错误校验

    7.1 日期逻辑校验

    python
    # 结束时间不能早于开始时间
    df["开始时间"] = pd.to_datetime(df["开始时间"])
    df["结束时间"] = pd.to_datetime(df["结束时间"])
    invalid_date_rows = df[df["结束时间"] < df["开始时间"]]
    print(f"日期逻辑错误行数:{len(invalid_date_rows)}")
    
    # 入职日期不能晚于当前日期
    current_date = pd.Timestamp.now()
    invalid_hire_date = df[df["入职日期"] > current_date]
    print(f"未来入职日期行数:{len(invalid_hire_date)}")
    

    7.2 数值逻辑校验

    python
    # 金额不能为负
    invalid_amount = df[df["金额"] < 0]
    print(f"负金額行数:{len(invalid_amount)}")
    
    # 总和应该等于分项之和
    invalid_total = df[df["总金额"] != df["分项1"] + df["分项2"] + df["分项3"]]
    print(f"金额总和不符行数:{len(invalid_total)}")
    
    # 年龄应该大于工龄
    invalid_work_age = df[df["年龄"] <= df["工龄"]]
    print(f"年龄工龄矛盾行数:{len(invalid_work_age)}")
    

    7.3 正则校验格式

    python
    # 手机号校验
    import re
    def is_valid_phone(phone):
        if pd.isna(phone):
            return False
        phone = str(phone).strip()
        return re.match(r"^1[3-9]\d{9}$", phone) is not None
    
    df["手机号格式正确"] = df["手机号"].apply(is_valid_phone)
    invalid_phones = df[~df["手机号格式正确"]]
    print(f"无效手机号数量:{len(invalid_phones)}")
    
    # 身份证号校验
    def is_valid_idcard(idcard):
        if pd.isna(idcard):
            return False
        idcard = str(idcard).strip()
        return re.match(r"(^\d{18}$)|(^\d{17}(\d|X|x)$)", idcard) is not None
    
    df["身份证格式正确"] = df["身份证号"].apply(is_valid_idcard)
    
    # 邮箱校验
    def is_valid_email(email):
        if pd.isna(email):
            return False
        email = str(email).strip()
        return re.match(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$", email) is not None
    

    八、批量清洗实战案例

    8.1 清洗流程

    python
    import pandas as pd
    import os
    
    def clean_excel(input_path, output_path):
        """清洗单个Excel文件"""
        print(f"正在处理:{input_path}")
        
        # 1. 读取数据,识别自定义空值
        na_values = ["", "NA", "N/A", "null", "无", "未知", "请填写"]
        df = pd.read_excel(input_path, na_values=na_values)
        
        # 2. 删除空行空列
        df = df.dropna(axis=0, how="all")  # 删除全空行
        df = df.dropna(axis=1, thresh=len(df)*0.5)  # 删除一半以上为空的列
        
        # 3. 去除重复
        df = df.drop_duplicates(subset=["手机号"], keep="last")
        
        # 4. 格式清洗
        # 手机号清理
        df["手机号"] = df["手机号"].astype(str).str.replace(r"\D", "", regex=True)
        # 日期清洗
        df["日期"] = pd.to_datetime(df["日期"], errors="coerce")
        # 金额清理
        df["金额"] = df["金额"].apply(lambda x: float(str(x).replace("¥", "").replace(",", "")) if pd.notna(x) else x)
        
        # 5. 异常值处理
        # 年龄0-100
        df = df[(df["年龄"].isna()) | ((df["年龄"] >= 0) & (df["年龄"] <= 100))]
        # 金额非负
        df = df[(df["金额"].isna()) | (df["金额"] >= 0)]
        
        # 6. 统一枚举值
        gender_mapping = {"男": "男", "m": "男", "女": "女", "f": "女"}
        df["性别"] = df["性别"].map(gender_mapping).fillna("未知")
        
        # 7. 保存清洗后的数据
        df.to_excel(output_path, index=False)
        print(f"✅ 清洗完成,保存到:{output_path},原始行数:{len(df)},清洗后行数:{len(df)}")
        return len(df)
    
    # 批量处理文件夹
    input_folder = "待清洗数据/"
    output_folder = "清洗后数据/"
    os.makedirs(output_folder, exist_ok=True)
    
    total_files = 0
    total_rows = 0
    
    for file in os.listdir(input_folder):
        if file.endswith(".xlsx") and not file.startswith("~$"):
            input_path = os.path.join(input_folder, file)
            output_path = os.path.join(output_folder, f"清洗后_{file}")
            rows = clean_excel(input_path, output_path)
            total_files += 1
            total_rows += rows
    
    print(f"\n🎉 全部处理完成!共处理{total_files}个文件,{total_rows}条数据")
    

    8.2 生成清洗报告

    python
    # 生成清洗报告
    report = f"""数据清洗报告
    生成时间:{pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}
    处理文件数:{total_files}
    总数据行数:{total_rows}
    
    各列缺失率:
    {df.isnull().sum() / len(df) * 100}
    
    各字段唯一值数量:
    {df.nunique()}
    
    部门分布:
    {df['部门'].value_counts()}
    """
    
    with open("清洗报告.txt", "w", encoding="utf-8") as f:
        f.write(report)
    print("✅ 清洗报告已生成")
    

    九、常见问题与解决方案

    9.1 清洗后数据量减少太多

    问题:清洗后数据少了一大半,可能清洗太严格

    解决方案

    逐步执行清洗步骤,每一步查看数据量变化
    异常值不要直接删除,先标记出来人工核实
    空值尽量填充而不是删除,尤其是重要字段

    9.2 清洗耗时太长

    问题:几十万行数据清洗很慢

    解决方案

    优先用pandas的矢量化操作,少用apply自定义函数
    只加载需要的列,减少内存占用
    大文件分批处理,避免一次性加载到内存
    用更高效的库,比如polars代替pandas,速度提升数倍

    9.3 模糊匹配不准确

    问题:fuzzywuzzy匹配错误率高

    解决方案

    提高score_cutoff阈值,比如从80调到90
    建立更完善的映射词典,优先用精确匹配
    匹配后人工校验结果,不断优化映射表

    9.4 日期解析失败

    问题:pd.to_datetime解析后很多NaT

    解决方案

    写自定义解析函数,尝试多种日期格式
    查看解析失败的原始值,针对性处理特殊格式
    无法解析的标记出来,不要直接删除

    9.5 数值转换失败

    问题:清理后数值列很多NaN

    解决方案

    打印转换失败的原始值,看有什么特殊格式
    完善clean_number函数,处理更多特殊情况
    记录转换失败的内容,人工核实

    十、最佳实践

    1. 清洗前备份:任何时候都不要直接修改原文件,清洗结果保存为新文件

    2. 逐步验证:每一步清洗后都统计数据量、查看样本,避免误删正常数据

    3. 先标记后删除:异常数据先标记,不要直接删除,方便后续追溯和人工核对

    4. 保留清洗日志:记录清洗规则、处理行数、删除原因等信息,方便后续排查问题

    5. 规则可配置:清洗规则不要硬编码,做成配置字典,方便修改维护

    6. 人工抽检:清洗完成后抽样检查数据质量,确认没有清洗错误

    7. 自动化复用:相同格式的数据清洗脚本做成通用函数,重复使用

    8. 优先级原则:关键数据优先保证正确性,非关键数据可以适当放宽处理

    数据清洗是数据分析的基础,垃圾数据进、垃圾结果出,高质量的数据才能得到有价值的分析结果。Python可以自动化完成90%以上的清洗工作,大幅提升数据处理效率,同时减少人为错误。掌握这些清洗方法,你处理各种杂乱数据都会得心应手。

    © 版权声明

    相关文章

    暂无评论

    none
    暂无评论...