揭阳市葬花

Python库使用实战案例(标准库到第三方库)

2026-03-27 15:01:01 浏览次数:1
详细信息

Python 库使用实战案例

我将通过几个实际案例来展示Python标准库和第三方库的使用,涵盖数据处理、Web开发、自动化等多个领域。

1. 数据处理与分析

案例1:使用Pandas进行数据分析

# 安装:pip install pandas matplotlib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# Build a reproducible demo sales dataset.
def create_sales_data():
    """Return a 100-row DataFrame of synthetic daily sales records.

    Uses a fixed RNG seed so every run produces identical data; adds a
    derived 'revenue' column (quantity * price).
    """
    np.random.seed(42)  # fixed seed -> deterministic demo data
    n = 100
    day_index = pd.date_range('2024-01-01', periods=n, freq='D')

    frame = pd.DataFrame({
        'date': day_index,
        'product_id': np.random.choice(['A001', 'A002', 'A003', 'B001', 'B002'], n),
        'category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Books'], n),
        'quantity': np.random.randint(1, 10, n),
        'price': np.random.uniform(10, 500, n),
        'region': np.random.choice(['North', 'South', 'East', 'West'], n),
    })
    frame['revenue'] = frame['quantity'] * frame['price']
    return frame

# Print a multi-section summary of the sales table.
def analyze_sales_data(df):
    """Summarize sales by product, category, date and region.

    Returns:
        (per-product revenue Series sorted descending,
         per-day revenue Series)
    """
    print("=== 销售数据分析 ===")

    # Overview: size and covered date range.
    print("\n1. 数据概览:")
    print(f"数据形状: {df.shape}")
    print(f"时间范围: {df['date'].min()} 到 {df['date'].max()}")

    # Revenue per product, best seller first.
    print("\n2. 产品销售额排名:")
    revenue_by_product = (
        df.groupby('product_id')['revenue']
        .sum()
        .sort_values(ascending=False)
    )
    print(revenue_by_product)

    # Per-category totals plus distinct-product counts.
    print("\n3. 类别销售统计:")
    per_category = df.groupby('category').agg({
        'revenue': 'sum',
        'quantity': 'sum',
        'product_id': 'nunique',
    }).rename(columns={'product_id': 'product_count'})
    print(per_category)

    # Daily revenue trend; report the peak day.
    print("\n4. 每日销售趋势:")
    revenue_by_day = df.groupby('date')['revenue'].sum()
    print(f"最高销售日: {revenue_by_day.idxmax()}, 销售额: ${revenue_by_day.max():.2f}")

    # Regional totals / means / counts.
    print("\n5. 区域表现:")
    per_region = df.groupby('region')['revenue'].agg(['sum', 'mean', 'count'])
    print(per_region)

    return revenue_by_product, revenue_by_day

# Render the four summary charts.
def visualize_sales(df, product_sales, daily_sales):
    """Draw a 2x2 dashboard of the sales data and save it as sales_analysis.png."""
    fig, ((ax_products, ax_trend), (ax_share, ax_region)) = plt.subplots(2, 2, figsize=(15, 10))

    # Revenue per product, best seller first.
    product_sales.plot(kind='bar', ax=ax_products, color='skyblue')
    ax_products.set_title('产品销售额排名')
    ax_products.set_xlabel('产品ID')
    ax_products.set_ylabel('销售额 ($)')
    ax_products.tick_params(axis='x', rotation=45)

    # Daily revenue over time.
    daily_sales.plot(ax=ax_trend, color='green', marker='o')
    ax_trend.set_title('每日销售趋势')
    ax_trend.set_xlabel('日期')
    ax_trend.set_ylabel('销售额 ($)')
    ax_trend.grid(True, alpha=0.3)

    # Revenue share per category.
    category_revenue = df.groupby('category')['revenue'].sum()
    ax_share.pie(category_revenue.values, labels=category_revenue.index,
                 autopct='%1.1f%%', colors=['gold', 'lightcoral', 'lightblue', 'lightgreen'])
    ax_share.set_title('各类别销售额占比')

    # Mean revenue per region.
    avg_by_region = df.groupby('region')['revenue'].mean()
    avg_by_region.plot(kind='bar', ax=ax_region, color='orange')
    ax_region.set_title('各区域平均销售额')
    ax_region.set_xlabel('区域')
    ax_region.set_ylabel('平均销售额 ($)')

    plt.tight_layout()
    plt.savefig('sales_analysis.png', dpi=100, bbox_inches='tight')
    plt.show()

# Demo pipeline: build data, analyze, plot, persist. Runs only as a script.
if __name__ == "__main__":
    # Build the deterministic synthetic dataset.
    sales_df = create_sales_data()

    # Print the textual analysis and collect the per-product / per-day aggregates.
    product_sales, daily_sales = analyze_sales_data(sales_df)

    # Render and save the charts (also writes sales_analysis.png).
    visualize_sales(sales_df, product_sales, daily_sales)

    # Persist the enriched table (with the derived 'revenue' column).
    sales_df.to_csv('processed_sales_data.csv', index=False)
    print("\n数据已保存到 'processed_sales_data.csv'")

2. Web开发与API调用

案例2:使用Flask创建REST API并调用外部API

# 安装:pip install flask requests python-dotenv
from flask import Flask, request, jsonify
import requests
import json
import sqlite3
from datetime import datetime
from functools import wraps
import os
from dotenv import load_dotenv

# Load environment variables (API keys) from a local .env file, if present.
load_dotenv()

app = Flask(__name__)

# Minimal API-key check, applied per-route as a decorator.
def require_api_key(f):
    """Wrap a Flask view so it rejects requests without the shared X-API-Key."""
    @wraps(f)
    def guarded(*args, **kwargs):
        supplied = request.headers.get('X-API-Key')
        expected = os.getenv('API_KEY', 'default-secret-key')
        if supplied != expected:
            return jsonify({'error': 'Invalid API key'}), 401
        return f(*args, **kwargs)
    return guarded

# Create the SQLite schema used by the service.
def init_db():
    """Create (if missing) the two tables backing the service; idempotent."""
    schema = [
        '''
    CREATE TABLE IF NOT EXISTS weather_requests (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        city TEXT NOT NULL,
        temperature REAL,
        weather_description TEXT,
        humidity INTEGER,
        request_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
        '''
    CREATE TABLE IF NOT EXISTS api_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        endpoint TEXT,
        method TEXT,
        status_code INTEGER,
        response_time REAL,
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',
    ]

    conn = sqlite3.connect('weather_api.db')
    cursor = conn.cursor()
    for statement in schema:
        cursor.execute(statement)
    conn.commit()
    conn.close()

# External API client - weather data.
class WeatherAPI:
    """Thin client for the OpenWeatherMap current-weather endpoint.

    Successful lookups are also persisted into the local SQLite database.
    """

    def __init__(self, api_key=None):
        # Explicit key wins; otherwise fall back to the environment, then a placeholder.
        self.api_key = api_key or os.getenv('WEATHER_API_KEY', 'your-api-key-here')
        self.base_url = "http://api.openweathermap.org/data/2.5/weather"

    def get_weather(self, city):
        """Fetch current weather for *city*.

        Returns a dict of weather fields on success, or
        {'error': ..., 'city': ...} when the HTTP request fails.
        """
        query = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric'
        }

        try:
            response = requests.get(self.base_url, params=query, timeout=10)
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            # Network / HTTP failures become an error payload, not an exception.
            return {'error': str(e), 'city': city}

        weather_info = {
            'city': city,
            'temperature': payload['main']['temp'],
            'feels_like': payload['main']['feels_like'],
            'humidity': payload['main']['humidity'],
            'pressure': payload['main']['pressure'],
            'weather': payload['weather'][0]['description'],
            'wind_speed': payload['wind']['speed'],
            'timestamp': datetime.now().isoformat()
        }

        # Record the successful lookup.
        self._save_to_db(weather_info)

        return weather_info

    def _save_to_db(self, weather_info):
        """Insert one lookup into the weather_requests table."""
        record = (
            weather_info['city'],
            weather_info['temperature'],
            weather_info['weather'],
            weather_info['humidity']
        )

        conn = sqlite3.connect('weather_api.db')
        cursor = conn.cursor()
        cursor.execute('''
        INSERT INTO weather_requests 
        (city, temperature, weather_description, humidity) 
        VALUES (?, ?, ?, ?)
        ''', record)
        conn.commit()
        conn.close()

# Flask routes.
@app.route('/')
def home():
    """Service index: advertise the available endpoints."""
    catalog = {
        'message': 'Weather API Service',
        'endpoints': {
            '/weather/<city>': 'GET - 获取城市天气',
            '/history': 'GET - 获取查询历史',
            '/stats': 'GET - 获取使用统计',
            '/batch-weather': 'POST - 批量查询天气'
        }
    }
    return jsonify(catalog)

@app.route('/weather/<city>')
@require_api_key
def get_weather(city):
    """Look up current weather for a single city and log the call."""
    result = WeatherAPI().get_weather(city)

    # 500 when the upstream lookup failed, 200 otherwise.
    status = 500 if 'error' in result else 200
    log_api_call('/weather', 'GET', status)

    return jsonify(result)

@app.route('/history')
@require_api_key
def get_history():
    """Return the 20 most recent weather lookups as JSON."""
    conn = sqlite3.connect('weather_api.db')
    conn.row_factory = sqlite3.Row  # rows become dict-convertible

    rows = conn.cursor().execute('''
    SELECT city, temperature, weather_description, humidity, 
           datetime(request_time, 'localtime') as request_time 
    FROM weather_requests 
    ORDER BY request_time DESC 
    LIMIT 20
    ''').fetchall()
    conn.close()

    return jsonify({'history': [dict(row) for row in rows]})

@app.route('/batch-weather', methods=['POST'])
@require_api_key
def batch_weather():
    """Query weather for several cities posted as JSON: {"cities": [...]}."""
    payload = request.get_json()

    if not payload or 'cities' not in payload:
        return jsonify({'error': 'Please provide cities list'}), 400

    client = WeatherAPI()
    # Cap the batch at 10 cities per request.
    results = [client.get_weather(city) for city in payload['cities'][:10]]

    log_api_call('/batch-weather', 'POST', 200)

    return jsonify({'results': results})

def log_api_call(endpoint, method, status_code):
    """Append one row to the api_logs audit table."""
    conn = sqlite3.connect('weather_api.db')
    # Connection.execute creates an implicit cursor.
    conn.execute('''
    INSERT INTO api_logs (endpoint, method, status_code) 
    VALUES (?, ?, ?)
    ''', (endpoint, method, status_code))
    conn.commit()
    conn.close()

# Script entry point: prepare the DB, scaffold .env on first run, start Flask.
if __name__ == '__main__':
    init_db()
    print("数据库初始化完成")
    print("API服务启动...")
    print("访问 http://localhost:5000 查看API文档")

    # Write a sample .env on first run so the user knows which keys to set.
    if not os.path.exists('.env'):
        with open('.env', 'w') as f:
            f.write('API_KEY=your-secret-api-key\n')
            f.write('WEATHER_API_KEY=your-openweather-api-key\n')
        print("已创建.env文件,请填入你的API密钥")

    # NOTE(review): debug=True enables the Werkzeug debugger and auto-reload —
    # fine for a tutorial, never for production.
    app.run(debug=True, port=5000)

3. 自动化任务

案例3:使用标准库实现文件管理自动化

"""
文件管理系统自动化工具
包含文件整理、备份、重复文件检测等功能
"""
import os
import shutil
import hashlib
import zipfile
import tarfile
import fnmatch
import time
from datetime import datetime
from pathlib import Path
import json
import csv
from collections import defaultdict
import argparse

class FileManager:
    """File-management toolbox.

    Offers: organizing a directory's files by type, duplicate detection,
    (optionally compressed) backups, empty-directory cleanup and a JSON
    activity report.
    """

    def __init__(self, base_path='.'):
        """Remember the working directory and reset the action counters."""
        self.base_path = Path(base_path)
        # Running counters surfaced by generate_report().
        self.stats = {
            'files_processed': 0,
            'files_moved': 0,
            'duplicates_found': 0,
            'backups_created': 0
        }

    def _unique_destination(self, folder, item):
        """Return a collision-free path for *item* inside *folder*.

        Appends _1, _2, ... before the suffix until the name is unused.
        """
        destination = folder / item.name
        counter = 1
        while destination.exists():
            destination = folder / f"{item.stem}_{counter}{item.suffix}"
            counter += 1
        return destination

    def organize_files_by_type(self, target_dir, organize_rules=None):
        """Move the top-level files of *target_dir* into per-category folders.

        Args:
            target_dir: directory to organize (created if missing)
            organize_rules: optional {category: [glob patterns]} mapping;
                files matching no category land in 'Others'
        """
        if organize_rules is None:
            organize_rules = {
                'Images': ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.bmp', '*.svg'],
                'Documents': ['*.pdf', '*.doc', '*.docx', '*.txt', '*.rtf', '*.md'],
                'Spreadsheets': ['*.xls', '*.xlsx', '*.csv'],
                'Presentations': ['*.ppt', '*.pptx'],
                'Archives': ['*.zip', '*.tar', '*.gz', '*.rar', '*.7z'],
                'Code': ['*.py', '*.js', '*.html', '*.css', '*.java', '*.cpp', '*.c'],
                'Audio': ['*.mp3', '*.wav', '*.flac', '*.aac'],
                'Video': ['*.mp4', '*.avi', '*.mov', '*.mkv'],
                'Others': ['*']  # fallback bucket; handled explicitly below
            }

        target_path = Path(target_dir)
        if not target_path.exists():
            target_path.mkdir(parents=True)

        print(f"开始整理目录: {target_dir}")

        # Create every category folder up front.
        for category in organize_rules.keys():
            (target_path / category).mkdir(exist_ok=True)

        # Only the top level is walked; moved files land in subfolders and
        # are therefore not revisited by iterdir().
        for item in target_path.iterdir():
            if item.is_file():
                moved = False

                for category, patterns in organize_rules.items():
                    if category == 'Others':
                        continue  # 'Others' is the explicit fallback below

                    for pattern in patterns:
                        if fnmatch.fnmatch(item.name.lower(), pattern.lower()):
                            destination = self._unique_destination(target_path / category, item)
                            try:
                                shutil.move(str(item), str(destination))
                                self.stats['files_moved'] += 1
                                print(f"移动: {item.name} -> {category}/{destination.name}")
                                moved = True
                                break
                            except Exception as e:
                                print(f"移动失败 {item.name}: {e}")

                    if moved:
                        break

                # Anything that matched no category goes to 'Others'.
                if not moved:
                    destination = self._unique_destination(target_path / 'Others', item)
                    try:
                        shutil.move(str(item), str(destination))
                        self.stats['files_moved'] += 1
                        print(f"移动: {item.name} -> Others/{destination.name}")
                    except Exception as e:
                        print(f"移动失败 {item.name}: {e}")

                self.stats['files_processed'] += 1

        print(f"整理完成。处理了 {self.stats['files_processed']} 个文件。")

    def find_duplicate_files(self, search_dir, use_content=True):
        """Find duplicate files under *search_dir*, recursively.

        Args:
            search_dir: root directory to scan
            use_content: hash file contents for exact matching (slower but
                accurate); when False, files are grouped by size only

        Returns:
            list of {'size', 'hash', 'files'} groups, each with >1 member
        """
        search_path = Path(search_dir)
        files_by_size = defaultdict(list)
        duplicates = []

        print("正在扫描文件...")

        # Pass 1: bucket by size — different sizes can never be duplicates.
        for file_path in search_path.rglob('*'):
            if file_path.is_file():
                try:
                    files_by_size[file_path.stat().st_size].append(file_path)
                except (OSError, PermissionError):
                    continue  # unreadable entries are skipped

        # Pass 2: inspect only size buckets holding more than one file.
        for size, files in files_by_size.items():
            if len(files) > 1:
                if use_content:
                    # Confirm duplicates via content hash.
                    hash_groups = defaultdict(list)
                    for file_path in files:
                        try:
                            hash_groups[self._calculate_file_hash(file_path)].append(file_path)
                        except (OSError, PermissionError):
                            continue

                    for hash_value, hash_files in hash_groups.items():
                        if len(hash_files) > 1:
                            duplicates.append({
                                'size': size,
                                'hash': hash_value,
                                'files': hash_files
                            })
                            self.stats['duplicates_found'] += len(hash_files) - 1
                else:
                    # Size-only heuristic (may report false positives).
                    duplicates.append({
                        'size': size,
                        'hash': None,
                        'files': files
                    })
                    self.stats['duplicates_found'] += len(files) - 1

        return duplicates

    def _calculate_file_hash(self, file_path, chunk_size=8192):
        """Return the MD5 hex digest of *file_path*, read in chunks.

        MD5 is adequate for duplicate detection; do not use it for security.
        """
        hash_md5 = hashlib.md5()

        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                hash_md5.update(chunk)

        return hash_md5.hexdigest()

    def create_backup(self, source_dir, backup_dir, compress=True):
        """Back up *source_dir* into *backup_dir*.

        Args:
            source_dir: directory to back up
            backup_dir: destination directory (created if missing)
            compress: True -> timestamped .zip archive, False -> plain copy

        Returns:
            path of the created backup as str, or None if source is missing
        """
        source_path = Path(source_dir)
        backup_path = Path(backup_dir)

        if not source_path.exists():
            print(f"源目录不存在: {source_dir}")
            return

        # BUGFIX: ensure the backup directory exists before writing into it.
        # zipfile.ZipFile cannot create intermediate directories, so the
        # compressed path previously raised FileNotFoundError for a fresh
        # backup_dir (e.g. the CLI default './backups').
        backup_path.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"backup_{source_path.name}_{timestamp}"

        if compress:
            backup_file = backup_path / f"{backup_name}.zip"

            print(f"创建压缩备份: {backup_file}")

            with zipfile.ZipFile(backup_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path in source_path.rglob('*'):
                    if file_path.is_file():
                        # Store paths relative to the source root.
                        arcname = file_path.relative_to(source_path)
                        try:
                            zipf.write(file_path, arcname)
                            print(f"添加: {arcname}")
                        except Exception as e:
                            print(f"跳过 {file_path}: {e}")

            self.stats['backups_created'] += 1
            print(f"备份创建完成: {backup_file}")
            return str(backup_file)

        else:
            backup_target = backup_path / backup_name
            shutil.copytree(source_dir, backup_target)

            self.stats['backups_created'] += 1
            print(f"备份创建完成: {backup_target}")
            return str(backup_target)

    def clean_empty_dirs(self, target_dir):
        """Remove empty directories under *target_dir*; return removed paths.

        Deepest paths are processed first, so parents emptied by a child's
        removal are caught within the same pass.
        """
        target_path = Path(target_dir)
        empty_dirs = []

        for dir_path in sorted(target_path.rglob('*'), key=lambda x: len(str(x)), reverse=True):
            if dir_path.is_dir() and not any(dir_path.iterdir()):
                try:
                    dir_path.rmdir()
                    empty_dirs.append(str(dir_path))
                    print(f"删除空目录: {dir_path}")
                except Exception as e:
                    print(f"无法删除目录 {dir_path}: {e}")

        return empty_dirs

    def generate_report(self, output_file='file_management_report.json'):
        """Write a JSON summary of this manager's statistics and return it."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'base_path': str(self.base_path),
            'statistics': self.stats,
            'actions_performed': []
        }

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        print(f"报告已保存到: {output_file}")
        return report

# Command-line entry point.
def main():
    """Parse CLI arguments and dispatch to the matching FileManager action."""
    parser = argparse.ArgumentParser(description='文件管理工具')
    parser.add_argument('action', choices=['organize', 'find-duplicates', 'backup', 'clean', 'report'],
                       help='执行的操作')
    parser.add_argument('--path', default='.', help='目标路径')
    parser.add_argument('--backup-dir', default='./backups', help='备份目录')
    parser.add_argument('--no-compress', action='store_true', help='不压缩备份')
    opts = parser.parse_args()

    fm = FileManager(opts.path)

    if opts.action == 'organize':
        fm.organize_files_by_type(opts.path)

    elif opts.action == 'find-duplicates':
        groups = fm.find_duplicate_files(opts.path, use_content=True)
        if not groups:
            print("未发现重复文件")
        else:
            print(f"\n发现 {len(groups)} 组重复文件:")
            for i, dup in enumerate(groups, 1):
                print(f"\n第 {i} 组 (大小: {dup['size']} 字节):")
                for file_path in dup['files']:
                    print(f"  {file_path}")

    elif opts.action == 'backup':
        backup_path = fm.create_backup(
            opts.path,
            opts.backup_dir,
            compress=not opts.no_compress
        )
        print(f"备份已创建: {backup_path}")

    elif opts.action == 'clean':
        removed = fm.clean_empty_dirs(opts.path)
        print(f"清理了 {len(removed)} 个空目录")

    elif opts.action == 'report':
        fm.generate_report()
        print("报告已生成")

    # Always finish with the counters accumulated during the run.
    print(f"\n统计信息:")
    for key, value in fm.stats.items():
        print(f"  {key}: {value}")

if __name__ == "__main__":
    main()

4. 机器学习入门

案例4:使用Scikit-learn进行分类任务

# 安装:pip install scikit-learn pandas numpy matplotlib seaborn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_iris, load_wine, make_classification
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (accuracy_score, classification_report, 
                           confusion_matrix, roc_curve, auc, roc_auc_score)
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier, plot_tree
import warnings
warnings.filterwarnings('ignore')

class MLClassifier:
    """Multi-model classification workflow.

    Loads a dataset, runs exploratory analysis, trains several sklearn
    classifiers, compares them, and supports grid-search tuning.
    """

    def __init__(self, dataset_name='iris'):
        """Initialize state and immediately load the requested dataset.

        Args:
            dataset_name: 'iris', 'wine' or 'synthetic'
        """
        self.dataset_name = dataset_name
        self.X = None
        self.y = None
        self.feature_names = None
        self.target_names = None
        self.models = {}
        self.results = {}
        self.scaler = StandardScaler()

        # Load the dataset so the instance is immediately usable.
        self.load_data()

    def load_data(self):
        """Load the chosen dataset into X/y plus feature/target metadata."""
        if self.dataset_name == 'iris':
            data = load_iris()
            self.X = data.data
            self.y = data.target
            self.feature_names = data.feature_names
            self.target_names = data.target_names
            self.dataset_type = 'multiclass'

        elif self.dataset_name == 'wine':
            data = load_wine()
            self.X = data.data
            self.y = data.target
            self.feature_names = data.feature_names
            self.target_names = data.target_names
            self.dataset_type = 'multiclass'

        elif self.dataset_name == 'synthetic':
            # Generate a reproducible 2-class synthetic dataset.
            self.X, self.y = make_classification(
                n_samples=1000,
                n_features=20,
                n_informative=15,
                n_redundant=5,
                n_classes=2,
                random_state=42
            )
            self.feature_names = [f'feature_{i}' for i in range(self.X.shape[1])]
            self.target_names = ['class_0', 'class_1']
            self.dataset_type = 'binary'

        # NOTE(review): an unrecognized dataset_name leaves X/y as None and
        # the prints below will raise AttributeError — consider a ValueError.
        print(f"数据集: {self.dataset_name}")
        print(f"样本数: {self.X.shape[0]}, 特征数: {self.X.shape[1]}")
        print(f"类别数: {len(np.unique(self.y))}")

    def explore_data(self):
        """Print summary statistics and render exploratory charts."""
        print("=== 数据探索 ===")

        # Build a DataFrame for convenient analysis.
        df = pd.DataFrame(self.X, columns=self.feature_names)
        df['target'] = self.y

        # Basic descriptive statistics.
        print("\n1. 基本统计信息:")
        print(df.describe())

        # Class distribution.
        print("\n2. 类别分布:")
        class_dist = pd.Series(self.y).value_counts().sort_index()
        for idx, count in class_dist.items():
            class_name = self.target_names[idx] if self.target_names is not None else f'Class {idx}'
            print(f"  {class_name}: {count} 样本 ({count/len(self.y)*100:.1f}%)")

        # Charts (also written to data_exploration.png).
        self._plot_exploratory_charts(df)

    def _plot_exploratory_charts(self, df):
        """Render a 2x2 grid of exploratory plots and save it to disk."""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))

        # 1. Class distribution bar chart.
        class_dist = pd.Series(self.y).value_counts().sort_index()
        class_labels = [self.target_names[i] if self.target_names else f'Class {i}' 
                       for i in class_dist.index]

        axes[0, 0].bar(class_labels, class_dist.values, color=['skyblue', 'lightcoral', 'lightgreen'][:len(class_dist)])
        axes[0, 0].set_title('类别分布')
        axes[0, 0].set_xlabel('类别')
        axes[0, 0].set_ylabel('样本数')
        axes[0, 0].tick_params(axis='x', rotation=45)

        # 2. Feature correlation heatmap (first 10 features at most).
        if len(self.feature_names) > 1:
            corr_matrix = df.iloc[:, :min(10, len(self.feature_names))].corr()
            sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap='coolwarm', 
                       ax=axes[0, 1], center=0)
            axes[0, 1].set_title('特征相关性热力图')

        # 3. Box plots for the first 4 features.
        if len(self.feature_names) >= 4:
            plot_data = pd.melt(df.iloc[:, :4], value_vars=self.feature_names[:4])
            sns.boxplot(x='variable', y='value', data=plot_data, ax=axes[1, 0])
            axes[1, 0].set_title('特征分布箱线图')
            axes[1, 0].set_xlabel('特征')
            axes[1, 0].set_ylabel('值')
            axes[1, 0].tick_params(axis='x', rotation=45)

        # 4. Scatter of the first two features, colored by class label.
        if len(self.feature_names) >= 2:
            scatter = axes[1, 1].scatter(df.iloc[:, 0], df.iloc[:, 1], 
                                         c=self.y, cmap='viridis', alpha=0.7)
            axes[1, 1].set_xlabel(self.feature_names[0])
            axes[1, 1].set_ylabel(self.feature_names[1])
            axes[1, 1].set_title('特征散点图(按类别着色)')
            plt.colorbar(scatter, ax=axes[1, 1])

        plt.tight_layout()
        plt.savefig('data_exploration.png', dpi=100, bbox_inches='tight')
        plt.show()

    def prepare_data(self, test_size=0.2, random_state=42):
        """Split into train/test and standardize features.

        Returns:
            (X_train_scaled, X_test_scaled, y_train, y_test)
        """
        # Stratified split keeps class proportions in both partitions.
        X_train, X_test, y_train, y_test = train_test_split(
            self.X, self.y, test_size=test_size, random_state=random_state, stratify=self.y
        )

        # Fit the scaler on training data only to avoid leakage; note that
        # every call re-fits self.scaler on a fresh split.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        print(f"训练集大小: {X_train.shape[0]}, 测试集大小: {X_test.shape[0]}")

        return X_train_scaled, X_test_scaled, y_train, y_test

    def train_models(self, X_train, X_test, y_train, y_test):
        """Train several classifiers and collect accuracy / CV metrics.

        Stores the fitted models in self.models and metrics in self.results;
        also returns the results dict.
        """
        print("\n=== 模型训练与评估 ===")

        # Candidate models; the dict keys double as display names.
        models = {
            '决策树': DecisionTreeClassifier(random_state=42),
            '随机森林': RandomForestClassifier(n_estimators=100, random_state=42),
            '梯度提升': GradientBoostingClassifier(random_state=42),
            'K近邻': KNeighborsClassifier(),
            '支持向量机': SVC(probability=True, random_state=42)
        }

        results = {}

        for name, model in models.items():
            print(f"\n训练 {name}...")

            # Fit on the (already scaled) training data.
            model.fit(X_train, y_train)

            # Predict labels and, where supported, class probabilities.
            y_pred = model.predict(X_test)
            y_pred_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None

            # Hold-out evaluation.
            accuracy = accuracy_score(y_test, y_pred)
            report = classification_report(y_test, y_pred, 
                                          target_names=self.target_names, 
                                          output_dict=True)

            # 5-fold cross-validation on the training partition.
            cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')

            # Collect everything needed for later comparison/plots.
            results[name] = {
                'model': model,
                'accuracy': accuracy,
                'classification_report': report,
                'cv_mean': cv_scores.mean(),
                'cv_std': cv_scores.std(),
                'predictions': y_pred,
                'probabilities': y_pred_proba
            }

            print(f"  准确率: {accuracy:.4f}")
            print(f"  交叉验证平均分: {cv_scores.mean():.4f} (±{cv_scores.std():.4f})")

        self.models = models
        self.results = results

        return results

    def compare_models(self):
        """Tabulate and plot the metrics collected by train_models().

        Returns the comparison DataFrame, or None if nothing was trained.
        """
        if not self.results:
            print("请先训练模型")
            return

        print("\n=== 模型比较 ===")

        # Build one row of formatted metrics per model.
        comparison_data = []
        for name, result in self.results.items():
            comparison_data.append({
                'Model': name,
                'Accuracy': f"{result['accuracy']:.4f}",
                'CV Mean': f"{result['cv_mean']:.4f}",
                'CV Std': f"±{result['cv_std']:.4f}",
                'Precision (Avg)': f"{result['classification_report']['macro avg']['precision']:.4f}",
                'Recall (Avg)': f"{result['classification_report']['macro avg']['recall']:.4f}",
                'F1-Score (Avg)': f"{result['classification_report']['macro avg']['f1-score']:.4f}"
            })

        df_comparison = pd.DataFrame(comparison_data)
        print(df_comparison.to_string(index=False))

        # Charts (also written to model_comparison.png).
        self._plot_model_comparison(df_comparison)

        return df_comparison

    def _plot_model_comparison(self, df_comparison):
        """Render accuracy bars, confusion matrix, feature importances and ROC."""
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))

        # 1. Test accuracy vs cross-validation mean, per model.
        models = df_comparison['Model']
        accuracy = pd.to_numeric(df_comparison['Accuracy'])
        cv_mean = pd.to_numeric(df_comparison['CV Mean'])

        x = np.arange(len(models))
        width = 0.35

        axes[0, 0].bar(x - width/2, accuracy, width, label='测试集准确率', color='skyblue')
        axes[0, 0].bar(x + width/2, cv_mean, width, label='交叉验证平均', color='lightcoral')
        axes[0, 0].set_xlabel('模型')
        axes[0, 0].set_ylabel('准确率')
        axes[0, 0].set_title('模型准确率比较')
        axes[0, 0].set_xticks(x)
        axes[0, 0].set_xticklabels(models, rotation=45)
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # 2. Confusion matrix for the best-accuracy model.
        best_model_name = df_comparison.loc[pd.to_numeric(df_comparison['Accuracy']).idxmax(), 'Model']
        best_result = self.results[best_model_name]

        # NOTE(review): this re-splits the data (and re-fits self.scaler).
        # It matches the original split only because test_size/random_state
        # defaults are identical — fragile; consider caching the split.
        _, X_test, _, y_test = self.prepare_data()
        y_pred = best_result['predictions']

        cm = confusion_matrix(y_test, y_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                   xticklabels=self.target_names if self.target_names else None,
                   yticklabels=self.target_names if self.target_names else None,
                   ax=axes[0, 1])
        axes[0, 1].set_title(f'{best_model_name} 混淆矩阵')
        axes[0, 1].set_xlabel('预测标签')
        axes[0, 1].set_ylabel('真实标签')

        # 3. Feature importances (only for models exposing them).
        best_model = best_result['model']
        if hasattr(best_model, 'feature_importances_'):
            if len(self.feature_names) > 0:
                importances = best_model.feature_importances_
                indices = np.argsort(importances)[::-1]

                # Show at most the 10 most important features.
                top_n = min(10, len(importances))
                axes[1, 0].bar(range(top_n), importances[indices[:top_n]], color='lightgreen')
                axes[1, 0].set_xlabel('特征排名')
                axes[1, 0].set_ylabel('重要性')
                axes[1, 0].set_title(f'{best_model_name} 特征重要性 (Top {top_n})')
                axes[1, 0].set_xticks(range(top_n))
                axes[1, 0].set_xticklabels([self.feature_names[i] for i in indices[:top_n]], 
                                          rotation=45, ha='right')

        # 4. ROC curve (binary classification only).
        if self.dataset_type == 'binary' and best_result['probabilities'] is not None:
            y_prob = best_result['probabilities'][:, 1]
            fpr, tpr, _ = roc_curve(y_test, y_prob)
            roc_auc = auc(fpr, tpr)

            axes[1, 1].plot(fpr, tpr, color='darkorange', lw=2, 
                           label=f'ROC曲线 (AUC = {roc_auc:.2f})')
            axes[1, 1].plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
            axes[1, 1].set_xlim([0.0, 1.0])
            axes[1, 1].set_ylim([0.0, 1.05])
            axes[1, 1].set_xlabel('假阳性率')
            axes[1, 1].set_ylabel('真阳性率')
            axes[1, 1].set_title(f'{best_model_name} ROC曲线')
            axes[1, 1].legend(loc='lower right')
            axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig('model_comparison.png', dpi=100, bbox_inches='tight')
        plt.show()

        print(f"\n最佳模型: {best_model_name} (准确率: {max(accuracy):.4f})")

    def hyperparameter_tuning(self, model_name='随机森林'):
        """Grid-search the named model and store the tuned estimator.

        Returns:
            (best_params dict, test accuracy) or None if unavailable
        """
        if model_name not in self.models:
            print(f"模型 {model_name} 未找到")
            return

        print(f"\n=== {model_name} 超参数调优 ===")

        # Per-model search spaces (keys match the training display names).
        param_grids = {
            '随机森林': {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            },
            '梯度提升': {
                'n_estimators': [50, 100, 200],
                'learning_rate': [0.01, 0.1, 0.2],
                'max_depth': [3, 4, 5]
            },
            '支持向量机': {
                'C': [0.1, 1, 10, 100],
                'gamma': [1, 0.1, 0.01, 0.001],
                'kernel': ['rbf', 'linear']
            },
            'K近邻': {
                'n_neighbors': [3, 5, 7, 9, 11],
                'weights': ['uniform', 'distance'],
                'metric': ['euclidean', 'manhattan']
            }
        }

        if model_name not in param_grids:
            print(f"未找到 {model_name} 的参数网格")
            return

        # Fresh split/scaling (same defaults as the original training split).
        X_train, X_test, y_train, y_test = self.prepare_data()

        # Exhaustive search over the grid with 5-fold CV, all cores.
        grid_search = GridSearchCV(
            estimator=self.models[model_name],
            param_grid=param_grids[model_name],
            cv=5,
            scoring='accuracy',
            n_jobs=-1,
            verbose=1
        )

        print(f"开始网格搜索...")
        grid_search.fit(X_train, y_train)

        print(f"\n最佳参数: {grid_search.best_params_}")
        print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")

        # Refit the best estimator (GridSearchCV already refits by default;
        # this extra fit is redundant but harmless).
        best_model = grid_search.best_estimator_
        best_model.fit(X_train, y_train)

        # Final hold-out evaluation.
        y_pred = best_model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        print(f"测试集准确率: {accuracy:.4f}")

        # Register the tuned model under a new display name.
        self.models[f"{model_name} (调优后)"] = best_model

        return grid_search.best_params_, accuracy

# Demo driver.
def main():
    """Run the full workflow: explore, train, compare, optionally tune."""
    print("机器学习分类实战案例")
    print("=" * 50)

    # Build the workflow object (alternatives: 'wine', 'synthetic').
    clf = MLClassifier(dataset_name='iris')

    # Step 1: exploratory analysis.
    clf.explore_data()

    # Step 2: split and scale.
    X_train, X_test, y_train, y_test = clf.prepare_data()

    # Step 3: fit all candidate models.
    clf.train_models(X_train, X_test, y_train, y_test)

    # Step 4: tabulate and plot the comparison.
    clf.compare_models()

    # Step 5 (interactive, optional): grid-search the random forest.
    if input("\n是否进行超参数调优?(y/n): ").lower() == 'y':
        clf.hyperparameter_tuning('随机森林')

        print("\n调优后重新比较模型:")
        clf.compare_models()

    print("\n=== 模型训练完成 ===")
    print("已生成以下文件:")
    print("  - data_exploration.png: 数据探索图表")
    print("  - model_comparison.png: 模型比较图表")

if __name__ == "__main__":
    main()

总结

这些实战案例展示了Python库在不同场景下的应用:

- 数据处理:使用 Pandas 进行数据清洗、分析和可视化
- Web 开发:使用 Flask 创建 REST API 并集成外部服务
- 自动化:使用标准库实现文件管理和系统自动化
- 机器学习:使用 Scikit-learn 进行分类任务和模型评估

每个案例都包含了完整的错误处理、文档字符串和实际应用场景。你可以根据自己的需求修改和扩展这些代码。这些案例展示了Python生态系统如何通过标准库和第三方库的组合,高效解决实际问题。

相关推荐