200 lines
7.0 KiB
Python
200 lines
7.0 KiB
Python
import sqlite3
|
||
import pandas as pd
|
||
from contextlib import contextmanager
|
||
from dataclasses import dataclass
|
||
from typing import List, Optional
|
||
|
||
@dataclass
|
||
class AccountData:
|
||
email: str # 邮箱
|
||
original_password: str # 原密码
|
||
original_aux_email: str # 原辅助邮箱
|
||
new_password: str # 新密码
|
||
new_aux_email: str # 新辅助邮箱
|
||
change_status: str # 是否更改完成
|
||
|
||
class AccountManagerSQLite:
|
||
def __init__(self, db_path="accounts.db"):
|
||
self.db_path = db_path
|
||
self._initialize_db()
|
||
|
||
def _initialize_db(self):
|
||
"""初始化数据库结构"""
|
||
with self._get_connection() as conn:
|
||
conn.execute("""
|
||
CREATE TABLE IF NOT EXISTS accounts (
|
||
email TEXT PRIMARY KEY,
|
||
original_password TEXT,
|
||
original_aux_email TEXT,
|
||
new_password TEXT,
|
||
new_aux_email TEXT,
|
||
change_status TEXT
|
||
)
|
||
""")
|
||
|
||
@contextmanager
|
||
def _get_connection(self):
|
||
"""获取 SQLite 数据库连接"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
try:
|
||
yield conn
|
||
finally:
|
||
conn.close()
|
||
|
||
def clear(self):
|
||
"""清空数据库中的所有数据"""
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.execute("DELETE FROM accounts")
|
||
conn.commit()
|
||
print("数据库已清空。")
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
print(f"清空数据库失败:{e}")
|
||
raise
|
||
|
||
def import_data(self, account_list: List[AccountData]):
|
||
"""批量导入数据"""
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.executemany("""
|
||
INSERT OR REPLACE INTO accounts (
|
||
email, original_password, original_aux_email,
|
||
new_password, new_aux_email, change_status
|
||
) VALUES (?, ?, ?, ?, ?, ?)
|
||
""", [
|
||
(
|
||
account.email, account.original_password, account.original_aux_email,
|
||
account.new_password, account.new_aux_email, account.change_status
|
||
) for account in account_list
|
||
])
|
||
conn.commit()
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
print(f"Error importing data: {e}")
|
||
raise
|
||
|
||
def export_data(self) -> List[AccountData]:
|
||
"""导出所有数据"""
|
||
with self._get_connection() as conn:
|
||
cursor = conn.execute("SELECT * FROM accounts")
|
||
rows = cursor.fetchall()
|
||
return [AccountData(*row) for row in rows]
|
||
|
||
def query(self, **kwargs) -> List[AccountData]:
|
||
"""查询数据"""
|
||
query = "SELECT * FROM accounts WHERE " + " AND ".join([f"{key} = ?" for key in kwargs])
|
||
values = tuple(kwargs.values())
|
||
|
||
with self._get_connection() as conn:
|
||
cursor = conn.execute(query, values)
|
||
rows = cursor.fetchall()
|
||
return [AccountData(*row) for row in rows]
|
||
|
||
def update_record(self, email: str, **kwargs):
|
||
"""
|
||
更新记录的指定字段
|
||
:param email: 要更新的记录的邮箱
|
||
:param kwargs: 要更新的字段和值,键为字段名,值为更新的值
|
||
"""
|
||
if not kwargs:
|
||
raise ValueError("没有指定任何更新的字段")
|
||
|
||
# 动态生成 SQL 的 SET 子句
|
||
set_clause = ", ".join([f"{key} = ?" for key in kwargs.keys()])
|
||
values = list(kwargs.values())
|
||
values.append(email) # 将 email 添加到参数列表的最后
|
||
|
||
query = f"""
|
||
UPDATE accounts
|
||
SET {set_clause}
|
||
WHERE email = ?
|
||
"""
|
||
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.execute(query, values)
|
||
conn.commit()
|
||
print(f"成功更新记录: {email}")
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
print(f"更新记录失败:{e}")
|
||
raise
|
||
|
||
|
||
def delete_account(self, email: str):
|
||
"""删除某个账户"""
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.execute("DELETE FROM accounts WHERE email = ?", (email,))
|
||
conn.commit()
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
print(f"Error deleting account: {e}")
|
||
raise
|
||
|
||
def import_from_excel(self, excel_path: str, clear_old: bool = False):
|
||
"""
|
||
从 Excel 文件导入数据
|
||
:param excel_path: Excel 文件路径
|
||
:param clear_old: 是否清空旧数据
|
||
"""
|
||
try:
|
||
# 如果 clear_old 为 True,先清空数据库
|
||
if clear_old:
|
||
self.clear()
|
||
|
||
# 读取 Excel 文件的第一个工作簿
|
||
df = pd.read_excel(excel_path, sheet_name=0)
|
||
|
||
# 校验表格格式
|
||
required_columns = ["邮箱", "原密码", "原辅助邮箱", "新密码", "新辅助邮箱", "是否更改完成"]
|
||
if not all(col in df.columns for col in required_columns):
|
||
raise ValueError(f"表格缺少必要的列:{required_columns}")
|
||
|
||
# 将数据转换为 AccountData 对象
|
||
account_list = [
|
||
AccountData(
|
||
email=row["邮箱"],
|
||
original_password=row["原密码"],
|
||
original_aux_email=row["原辅助邮箱"],
|
||
new_password=row["新密码"],
|
||
new_aux_email=row["新辅助邮箱"],
|
||
change_status=row["是否更改完成"]
|
||
)
|
||
for _, row in df.iterrows()
|
||
]
|
||
|
||
# 批量导入数据到数据库
|
||
self.import_data(account_list)
|
||
print(f"成功导入 {len(account_list)} 条数据!")
|
||
except Exception as e:
|
||
print(f"导入失败:{e}")
|
||
raise
|
||
|
||
def export_to_excel(self, excel_path: str):
|
||
"""
|
||
导出数据到 Excel 文件
|
||
:param excel_path: Excel 文件路径
|
||
"""
|
||
try:
|
||
# 从数据库中获取所有数据
|
||
accounts = self.export_data()
|
||
|
||
# 转换为 DataFrame
|
||
df = pd.DataFrame([{
|
||
"邮箱": account.email,
|
||
"原密码": account.original_password,
|
||
"原辅助邮箱": account.original_aux_email,
|
||
"新密码": account.new_password,
|
||
"新辅助邮箱": account.new_aux_email,
|
||
"是否更改完成": account.change_status
|
||
} for account in accounts])
|
||
|
||
# 写入 Excel 文件
|
||
df.to_excel(excel_path, index=False, sheet_name="Accounts")
|
||
print(f"成功导出数据到 {excel_path}!")
|
||
except Exception as e:
|
||
print(f"导出失败:{e}")
|
||
raise
|