用Python脚本自动备份你的百度网盘文件列表(附完整代码)

张开发
2026/4/19 3:45:22 15 分钟阅读

分享文章

用Python脚本自动备份你的百度网盘文件列表(附完整代码)
Python自动化备份百度网盘文件列表实战指南你是否曾经遇到过这样的场景急需查找几个月前上传到百度网盘的工作文档却因为文件太多而束手无策或者担心重要文件被误删而希望定期备份文件列表作为一名长期依赖云存储的技术从业者我深刻理解手动管理网盘文件的低效与风险。本文将分享如何用Python脚本自动化备份百度网盘文件列表解决这个实际痛点。1. 准备工作与环境配置在开始编写自动化脚本前我们需要完成几个必要的准备工作。首先确保你的开发环境已经安装了Python 3.6或更高版本。我个人推荐使用Python 3.8因为它提供了更好的异步支持和稳定性。1.1 安装必要的Python库运行以下命令安装所需的依赖库pip install requests python-dotenv schedule这些库的作用分别是requests用于发送HTTP请求与百度网盘API交互python-dotenv管理敏感信息如API密钥schedule实现定时任务调度1.2 创建百度网盘开发者应用访问百度开发者中心并登录你的百度账号创建一个新应用选择网盘应用类型记录下分配的API Key和Secret Key提示应用名称可以简单描述用途如文件列表备份工具审核通过通常需要1-2个工作日。创建.env文件保存你的凭证BAIDU_API_KEYyour_api_key BAIDU_SECRET_KEYyour_secret_key2. 获取和刷新Access TokenAccess Token是调用百度网盘API的关键凭证有效期为30天。我们需要实现自动获取和刷新机制。2.1 初次获取Token使用OAuth 2.0授权流程获取初始Token。以下是获取Token的核心代码import requests from dotenv import load_dotenv import os import json load_dotenv() def get_access_token(): url https://openapi.baidu.com/oauth/2.0/token params { grant_type: client_credentials, client_id: os.getenv(BAIDU_API_KEY), client_secret: os.getenv(BAIDU_SECRET_KEY), scope: basic,netdisk } response requests.get(url, paramsparams) if response.status_code 200: token_data response.json() return token_data[access_token] else: raise Exception(f获取Token失败: {response.text})2.2 Token刷新机制Token过期前需要刷新避免脚本中断。实现一个简单的Token管理类class TokenManager: def __init__(self): self.token None self.expires_at None def get_valid_token(self): if not self.token or self._is_expired(): self._refresh_token() return self.token def _refresh_token(self): self.token get_access_token() # 设置25天后过期留出缓冲时间 self.expires_at time.time() 25 * 24 * 3600 def _is_expired(self): return time.time() self.expires_at3. 实现文件列表获取功能百度网盘提供了多个接口获取不同类型的文件列表。我们将重点实现最常用的递归文件列表获取。3.1 基础文件列表接口首先实现获取指定目录下的文件列表def get_file_list(token, path/, start0, limit1000): url https://pan.baidu.com/rest/2.0/xpan/file params { method: list, access_token: token, dir: path, start: start, limit: limit, order: time, # 按修改时间排序 desc: 1 # 降序排列 } response requests.get(url, paramsparams) if response.status_code 200: return response.json() else: raise Exception(f获取文件列表失败: {response.text})3.2 递归获取所有文件对于需要备份整个网盘的情况实现递归获取def get_all_files(token, root_path/): all_files [] stack [root_path] while stack: current_path stack.pop() data get_file_list(token, current_path) for item in data.get(list, []): if item[isdir] 1: # 是目录 stack.append(item[path]) else: # 是文件 all_files.append({ path: item[path], size: item[size], mtime: item[server_mtime], md5: item[md5] }) return all_files4. 数据存储与备份策略获取文件列表后我们需要考虑如何存储这些数据以便后续使用和比较。4.1 多种存储格式实现提供JSON和CSV两种常用格式的存储方式import csv import json from datetime import datetime def save_to_json(data, filenameNone): if not filename: timestamp datetime.now().strftime(%Y%m%d_%H%M%S) filename fbackup_{timestamp}.json with open(filename, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) return filename def save_to_csv(data, filenameNone): if not filename: timestamp datetime.now().strftime(%Y%m%d_%H%M%S) filename fbackup_{timestamp}.csv with open(filename, w, newline, encodingutf-8) as f: writer csv.DictWriter(f, fieldnames[path, size, mtime, md5]) writer.writeheader() writer.writerows(data) return filename4.2 增量备份实现为避免每次全量备份实现增量备份功能def get_new_files(current_files, last_backup_file): with open(last_backup_file, r, encodingutf-8) as f: last_files json.load(f) last_paths {f[path] for f in last_files} return [f for f in current_files if f[path] not in last_paths]5. 自动化部署与监控实现脚本的自动化运行是提高效率的关键。下面介绍几种常见的部署方式。5.1 使用schedule库实现定时任务import schedule import time def backup_job(): token token_manager.get_valid_token() files get_all_files(token) save_to_json(files) print(f备份完成于 {datetime.now()}) # 每天凌晨2点执行备份 schedule.every().day.at(02:00).do(backup_job) while True: schedule.run_pending() time.sleep(60)5.2 系统级定时任务配置对于更稳定的生产环境建议使用系统级的定时任务Linux/Mac (crontab):0 2 * * * /usr/bin/python3 /path/to/your/script.py /var/log/baidubackup.log 21Windows计划任务:打开任务计划程序创建基本任务设置每日触发操作为启动程序指向Python解释器和脚本路径6. 异常处理与日志记录健壮的脚本需要完善的错误处理和日志系统。6.1 增强的异常处理def safe_backup(): try: token token_manager.get_valid_token() files get_all_files(token) backup_file save_to_json(files) log_message f{datetime.now()} - 备份成功共 {len(files)} 个文件 # 可选发送通知邮件或消息 send_notification(备份成功, log_message) except Exception as e: log_message f{datetime.now()} - 备份失败: {str(e)} send_notification(备份失败, log_message) finally: with open(backup.log, a) as log_file: log_file.write(log_message \n)6.2 日志配置使用Python标准库的logging模块配置更专业的日志系统import logging logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(backup.log), logging.StreamHandler() ] ) logger logging.getLogger(__name__)7. 完整脚本整合将所有功能整合成一个完整的、可直接运行的脚本import os import time import json import requests from dotenv import load_dotenv from datetime import datetime import logging # 初始化日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s ) logger logging.getLogger(BaiduBackup) # 加载环境变量 load_dotenv() class TokenManager: # ... 之前的TokenManager实现 ... def get_file_list(token, path/, start0, limit1000): # ... 之前的get_file_list实现 ... def get_all_files(token, root_path/): # ... 之前的get_all_files实现 ... def save_to_json(data, filenameNone): # ... 之前的save_to_json实现 ... def main(): token_manager TokenManager() try: logger.info(开始备份百度网盘文件列表) token token_manager.get_valid_token() files get_all_files(token) backup_file save_to_json(files) logger.info(f备份成功共 {len(files)} 个文件保存到 {backup_file}) except Exception as e: logger.error(f备份过程中发生错误: {str(e)}) raise if __name__ __main__: main()8. 进阶功能与扩展思路基础功能实现后可以考虑以下扩展方向提升脚本的实用性。8.1 文件变化监控实现文件变化的检测和通知def detect_changes(current_files, last_backup_file): with open(last_backup_file, r) as f: last_files {item[path]: item for item in json.load(f)} changes { added: [], deleted: [], modified: [] } current_paths set() for file in current_files: current_paths.add(file[path]) if file[path] not in last_files: changes[added].append(file) elif file[md5] ! last_files[file[path]][md5]: changes[modified].append(file) changes[deleted] [ path for path in last_files if path not in current_paths ] return changes8.2 多线程加速对于文件数量大的情况可以使用多线程加速from concurrent.futures import ThreadPoolExecutor def parallel_get_all_files(token, root_path/, max_workers4): all_files [] lock threading.Lock() def process_directory(path): nonlocal all_files data get_file_list(token, path) with lock: for item in data.get(list, []): if item[isdir] 1: futures.append(executor.submit(process_directory, item[path])) else: all_files.append({ path: item[path], size: item[size], mtime: item[server_mtime], md5: item[md5] }) with ThreadPoolExecutor(max_workersmax_workers) as executor: futures [executor.submit(process_directory, root_path)] for future in concurrent.futures.as_completed(futures): future.result() return all_files8.3 集成到其他系统将备份数据集成到数据库或其他系统中import sqlite3 def save_to_database(files, db_filebackup.db): conn sqlite3.connect(db_file) cursor conn.cursor() cursor.execute( CREATE TABLE IF NOT EXISTS file_backups ( path TEXT PRIMARY KEY, size INTEGER, mtime INTEGER, md5 TEXT, backup_time TEXT ) ) backup_time datetime.now().isoformat() data [(f[path], f[size], f[mtime], f[md5], backup_time) for f in files] cursor.executemany( INSERT OR REPLACE INTO file_backups VALUES (?, ?, ?, ?, ?) , data) conn.commit() conn.close()

更多文章