# encoding: utf-8
# 版权所有 2025 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:并发编程 (Concurrent Programming) pip install pymysql
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, PostgreSQL 17.0 Oracle21C
# Datetime : 2025/2/22 18:15
# User : geovindu
# Product : PyCharm
# Project : Pysimple
# File : Concurrent.py
# explain : 学习
import tkinter as tk
from tkinter import ttk
import pymysql
import asyncio
import threading
# Open the single MySQL connection and cursor that the handlers below share.
mydb = pymysql.connect(host="localhost", user="root", password="geovindu", database="TechnologyGame")
mycursor = mydb.cursor()
# Make sure the DuSchool table exists before any handler queries it.
mycursor.execute(
    "CREATE TABLE IF NOT EXISTS DuSchool ("
    "SchoolId INT AUTO_INCREMENT PRIMARY KEY, "
    "SchoolName VARCHAR(255), "
    "SchoolTelNo VARCHAR(255))"
)
async def insert_data():
    """
    Insert one DuSchool row built from the two entry fields, then reload the table view.

    :return: None
    """
    # Name first, phone second: the order must match the column list in the SQL.
    school = (entry_name.get(), entry_tel.get())
    mycursor.execute(
        "INSERT INTO DuSchool (SchoolName, SchoolTelNo) VALUES (%s, %s)",
        school,
    )
    mydb.commit()
    await refresh_treeview()
async def update_data():
    """
    Overwrite the selected row's name and phone with the entry-field values.

    Does nothing when no row is selected in the Treeview. When several rows
    are selected, only the first one is updated.

    :return: None
    """
    selected_items = tree.selection()
    if not selected_items:
        return
    # tree.selection() returns a tuple of item ids; tree.item() expects a single
    # id, so passing the whole tuple breaks as soon as more than one row is selected.
    id_ = tree.item(selected_items[0], "values")[0]
    print("id:", id_)
    name = entry_name.get()
    tel = entry_tel.get()
    sql = "UPDATE DuSchool SET SchoolName = %s, SchoolTelNo = %s WHERE SchoolId = %s"
    mycursor.execute(sql, (name, tel, id_))
    mydb.commit()
    await refresh_treeview()
async def query_data():
    """
    Fetch every DuSchool row, print the rows, and replace the Treeview contents with them.

    :return: None
    """
    mycursor.execute("SELECT * FROM DuSchool")
    records = mycursor.fetchall()
    print(records)
    # Drop all currently displayed rows in one call before re-populating.
    tree.delete(*tree.get_children())
    for record in records:
        tree.insert("", "end", values=record)
async def delete_data():
    """
    Delete the database row that corresponds to the selected Treeview row.

    Does nothing when no row is selected. When several rows are selected,
    only the first one is deleted.

    :return: None
    """
    selected_items = tree.selection()
    if not selected_items:
        return
    # tree.selection() returns a tuple of item ids; tree.item() expects a single
    # id, so passing the whole tuple breaks as soon as more than one row is selected.
    id_ = tree.item(selected_items[0], "values")[0]
    mycursor.execute("DELETE FROM DuSchool WHERE SchoolId = %s", (id_,))
    mydb.commit()
    await refresh_treeview()
async def refresh_treeview():
    """
    Reload the Treeview from the database by delegating to query_data().

    :return: None
    """
    await query_data()
def run_async_task(task):
    """
    Schedule a coroutine function on the module-level background event loop.

    :param task: zero-argument coroutine function (for example insert_data);
        it is called here and the resulting coroutine is submitted to ``loop``.
    :return: the concurrent.futures.Future produced by
        asyncio.run_coroutine_threadsafe, so callers may wait on it if they wish.
    """
    def _report_failure(done):
        # An exception raised inside the coroutine is only stored on the future;
        # if nobody inspects the future the failure would vanish silently.
        if done.cancelled():
            return
        error = done.exception()
        if error is not None:
            print(f"async task failed: {error!r}")

    future = asyncio.run_coroutine_threadsafe(task(), loop)
    future.add_done_callback(_report_failure)
    return future
# Create a dedicated asyncio event loop for the database coroutines.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run the loop on a daemon thread so that root.mainloop() can own the main thread.
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()
root = tk.Tk()
root.title("并发编程操作 MySQL")
# Entry fields
tk.Label(root, text="校名:").grid(row=0, column=0)
entry_name = tk.Entry(root)
entry_name.grid(row=0, column=1)
tk.Label(root, text="电话:").grid(row=1, column=0)
entry_tel = tk.Entry(root)
entry_tel.grid(row=1, column=1)
# Buttons: each one hands a coroutine function to the background loop.
tk.Button(root, text="插入数据", command=lambda: run_async_task(insert_data)).grid(row=2, column=0)
tk.Button(root, text="更新数据", command=lambda: run_async_task(update_data)).grid(row=2, column=1)
tk.Button(root, text="查询数据", command=lambda: run_async_task(query_data)).grid(row=3, column=0)
tk.Button(root, text="删除数据", command=lambda: run_async_task(delete_data)).grid(row=3, column=1)
# Treeview showing the table rows
tree = ttk.Treeview(root, columns=("ID", "校名", "电话"), show="headings")
tree.heading("ID", text="ID")
tree.heading("校名", text="校名")
tree.heading("电话", text="电话")
tree.grid(row=4, column=0, columnspan=2)
root.mainloop()
# Stop the event loop first. loop.stop() is not thread-safe, and the loop runs
# on another thread, so the request has to go through call_soon_threadsafe().
loop.call_soon_threadsafe(loop.stop)
# Give a handler that is still inside a database call a moment to finish
# before the connection it is using gets closed.
loop_thread.join(timeout=5)
# Close the database connection
mycursor.close()
mydb.close()
# encoding: utf-8
# 版权所有 2025 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:并发编程 (Concurrent Programming) pip install pymysql
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, PostgreSQL 17.0 Oracle21C
# Datetime : 2025/2/22 08:15
# User : geovindu
# Product : PyCharm
# Project : Pysimple
# File : Concurrent.py
# explain : 学习
import tkinter as tk
from tkinter import ttk, messagebox
import pymysql
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
# ================= Model 层 =================
class Model:
    """Data-access layer for the DuSchool table.

    Every statement opens its own short-lived pymysql connection on a worker
    thread, so the coroutines here never block the event loop that awaits them.
    """

    # DDL executed once at start-up. The closing parenthesis of the column
    # list is required; without it MySQL rejects the statement.
    _CREATE_TABLE_SQL = """
        CREATE TABLE IF NOT EXISTS DuSchool (
            SchoolId INT AUTO_INCREMENT PRIMARY KEY,
            SchoolName VARCHAR(255),
            SchoolTelNo VARCHAR(255)
        )
    """

    def __init__(self):
        # Keyword arguments passed unchanged to pymysql.connect().
        self.db_config = {
            "host": "localhost",
            "user": "root",
            "password": "geovindu",
            "database": "TechnologyGame",
        }
        self._init_db()

    def _init_db(self):
        """Create the DuSchool table if it is missing; errors are printed, not raised."""
        try:
            with pymysql.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute(self._CREATE_TABLE_SQL)
                conn.commit()
        except pymysql.Error as e:
            print(f"数据库初始化失败: {e}")

    @staticmethod
    def _like_pattern(keyword):
        """Wrap *keyword* in ``%`` wildcards for a substring LIKE match."""
        return '%' + keyword + '%'

    @staticmethod
    def _page_offset(page, rows_per_page):
        """Return the zero-based row offset of a 1-based page, never negative."""
        return max(0, (page - 1) * rows_per_page)

    async def _execute_query(self, query, params=()):
        """Run one SQL statement on a worker thread.

        :param query: SQL text with ``%s`` placeholders.
        :param params: values bound to the placeholders.
        :return: the result of ``cursor.fetchall()`` when the statement starts
            with SELECT; ``True`` after committing any other statement;
            ``None`` when an exception occurred (the error is printed).
        """
        def _db_operation():
            with pymysql.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute(query, params)
                    if query.strip().upper().startswith("SELECT"):
                        return cursor.fetchall()
                conn.commit()
                return True

        loop = asyncio.get_running_loop()
        try:
            # None selects the loop's default executor, which is reused across
            # calls instead of building and tearing down a pool per statement.
            return await loop.run_in_executor(None, _db_operation)
        except Exception as e:
            # Deliberate best-effort: callers treat None as "operation failed".
            print(f"数据库操作异常: {e}")
            return None

    async def insert_data(self, name, tel):
        """Insert a school; returns True on success, None on failure."""
        return await self._execute_query(
            "INSERT INTO DuSchool (SchoolName, SchoolTelNo) VALUES (%s, %s)",
            (name, tel),
        )

    async def update_data(self, id_, name, tel):
        """Update name and phone of the school with id *id_*; True on success, None on failure."""
        return await self._execute_query(
            "UPDATE DuSchool SET SchoolName=%s, SchoolTelNo=%s WHERE SchoolId=%s",
            (name, tel, id_),
        )

    async def delete_data(self, id_):
        """Delete the school with id *id_*; True on success, None on failure."""
        return await self._execute_query(
            "DELETE FROM DuSchool WHERE SchoolId=%s",
            (id_,),
        )

    async def query_data(self, keyword, page, rows_per_page):
        """Return one page of schools, optionally filtered by a name substring.

        :param keyword: substring to look for in SchoolName; falsy means no filter.
        :param page: 1-based page number.
        :param rows_per_page: page size.
        :return: rows of (SchoolId, SchoolName, SchoolTelNo), or None on failure.
        """
        offset = self._page_offset(page, rows_per_page)
        # ORDER BY makes LIMIT/OFFSET paging deterministic; without it the
        # server may return rows in any order from one page to the next.
        if keyword:
            return await self._execute_query(
                "SELECT SchoolId, SchoolName, SchoolTelNo FROM DuSchool "
                "WHERE SchoolName LIKE %s ORDER BY SchoolId LIMIT %s OFFSET %s",
                (self._like_pattern(keyword), rows_per_page, offset),
            )
        return await self._execute_query(
            "SELECT SchoolId, SchoolName, SchoolTelNo FROM DuSchool "
            "ORDER BY SchoolId LIMIT %s OFFSET %s",
            (rows_per_page, offset),
        )

    async def get_total_rows(self, keyword):
        """Count the schools matching *keyword* (all schools when it is falsy); 0 on failure."""
        if keyword:
            result = await self._execute_query(
                "SELECT COUNT(*) FROM DuSchool WHERE SchoolName LIKE %s",
                (self._like_pattern(keyword),),
            )
        else:
            result = await self._execute_query("SELECT COUNT(*) FROM DuSchool")
        return result[0][0] if result else 0
# ================= View 层 =================
class View(tk.Tk):
    """Main window: search bar, result table, CRUD buttons and pager.

    The buttons are created without a command; the code that owns this view
    is expected to attach handlers with ``.config(command=...)``.
    """

    def __init__(self):
        super().__init__()
        self.title("学校信息管理系统")
        self._create_widgets()

    def _create_widgets(self):
        """Build all widgets top to bottom; the pack order determines the layout."""
        self._build_search_bar()
        self._build_table()
        self._build_action_buttons()
        self._build_pager()

    def _build_search_bar(self):
        """Create the keyword entry and the search button."""
        search_frame = ttk.Frame(self)
        search_frame.pack(pady=10, fill=tk.X)
        ttk.Label(search_frame, text="模糊查询:").pack(side=tk.LEFT)
        self.search_entry = ttk.Entry(search_frame, width=30)
        self.search_entry.pack(side=tk.LEFT, padx=5)
        self.search_button = ttk.Button(search_frame, text="搜索")
        self.search_button.pack(side=tk.LEFT)

    def _build_table(self):
        """Create the three-column Treeview that lists the schools."""
        self.tree = ttk.Treeview(self, columns=("ID", "校名", "电话"), show="headings")
        # (column id, initial width, minimum width); the heading text equals the id.
        layout = (("ID", 80, 50), ("校名", 200, 150), ("电话", 150, 100))
        for column, width, minwidth in layout:
            self.tree.heading(column, text=column, anchor=tk.W)
            self.tree.column(column, width=width, minwidth=minwidth)
        self.tree.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)

    def _build_action_buttons(self):
        """Create the insert / update / delete buttons."""
        btn_frame = ttk.Frame(self)
        btn_frame.pack(pady=10)
        self.insert_btn = ttk.Button(btn_frame, text="新增学校")
        self.insert_btn.pack(side=tk.LEFT, padx=5)
        self.update_btn = ttk.Button(btn_frame, text="修改信息")
        self.update_btn.pack(side=tk.LEFT, padx=5)
        self.delete_btn = ttk.Button(btn_frame, text="删除记录")
        self.delete_btn.pack(side=tk.LEFT, padx=5)

    def _build_pager(self):
        """Create the previous / next buttons and the page label; both buttons start disabled."""
        page_frame = ttk.Frame(self)
        page_frame.pack(pady=5)
        self.prev_btn = ttk.Button(page_frame, text="上一页", state=tk.DISABLED)
        self.prev_btn.pack(side=tk.LEFT, padx=5)
        self.page_label = ttk.Label(page_frame, text="第 1 页")
        self.page_label.pack(side=tk.LEFT, padx=5)
        self.next_btn = ttk.Button(page_frame, text="下一页", state=tk.DISABLED)
        self.next_btn.pack(side=tk.LEFT, padx=5)

    def show_data(self, rows):
        """Replace the table contents with *rows*.

        :param rows: iterable of row tuples, or None (what the model returns
            when a query failed), which is shown as an empty table.
        """
        self.tree.delete(*self.tree.get_children())
        # A failed query yields None; iterating it would raise TypeError.
        for row in rows or ():
            self.tree.insert("", "end", values=row)

    def update_page_info(self, current_page, total_pages):
        """Refresh the page label and enable only the pager buttons that can move."""
        self.page_label.config(text=f"第 {current_page} 页 / 共 {total_pages} 页")
        self.prev_btn.config(state=tk.NORMAL if current_page > 1 else tk.DISABLED)
        self.next_btn.config(state=tk.NORMAL if current_page < total_pages else tk.DISABLED)
# ================= Controller 层 =================
class Controller:
    """Connects the view's buttons to the model's coroutines.

    Model coroutines run on a private asyncio event loop in a daemon thread;
    widget updates are handed back to Tk through ``view.after(0, ...)``.
    """

    def __init__(self, model, view):
        self.model = model
        self.view = view
        self.current_page = 1
        self.rows_per_page = 10
        self.keyword = ""
        # Page count of the most recent load; next_page() uses it as upper bound.
        self.total_pages = 1
        # Background event loop
        self.loop = asyncio.new_event_loop()
        self._init_async()
        # Attach the handlers to the view's buttons
        self.view.search_button.config(command=self.search)
        self.view.insert_btn.config(command=self.open_insert_dialog)
        self.view.update_btn.config(command=self.open_update_dialog)
        self.view.delete_btn.config(command=self.delete_record)
        self.view.prev_btn.config(command=self.prev_page)
        self.view.next_btn.config(command=self.next_page)
        # Initial load
        self.load_data()

    def _init_async(self):
        """Start the daemon thread that runs ``self.loop`` forever."""
        def run_loop():
            asyncio.set_event_loop(self.loop)
            self.loop.run_forever()
        threading.Thread(target=run_loop, daemon=True).start()

    def _run_async(self, coroutine):
        """Submit *coroutine* to the background loop and report any exception it raises."""
        future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
        future.add_done_callback(self._report_failure)

    def _report_failure(self, future):
        """Done-callback: show an error dialog when the coroutine raised."""
        # exception() itself raises on a cancelled future, so check that first.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            self.show_error("错误", str(error))

    @staticmethod
    def _total_pages(total_rows, rows_per_page):
        """Return the number of pages needed for *total_rows*, at least 1."""
        return max(1, (total_rows + rows_per_page - 1) // rows_per_page)

    def load_data(self):
        """Fetch the current page for the current keyword and display it."""
        # Snapshot the paging state now so the coroutine works on consistent values.
        keyword = self.keyword
        page = self.current_page
        per_page = self.rows_per_page

        async def _load():
            total = await self.model.get_total_rows(keyword)
            total_pages = self._total_pages(total, per_page)
            # After rows were deleted the requested page may no longer exist;
            # fall back to the last existing page instead of showing an empty one.
            shown_page = min(max(page, 1), total_pages)
            rows = await self.model.query_data(keyword, shown_page, per_page)
            self.view.after(0, self._apply_page, rows, shown_page, total_pages)

        self._run_async(_load())

    def _apply_page(self, rows, page, total_pages):
        """Push a loaded page into the view and remember the page count."""
        self.total_pages = total_pages
        if self.current_page > total_pages:
            self.current_page = total_pages
        if rows is None:
            # The model returns None when the query failed.
            self.show_error("错误", "数据加载失败")
        self.view.show_data(rows or ())
        self.view.update_page_info(page, total_pages)

    def search(self):
        """Apply the keyword from the search entry and jump back to page 1."""
        self.keyword = self.view.search_entry.get()
        self.current_page = 1
        self.load_data()

    def _run_write(self, operation, success_message, failure_message):
        """Await a model write coroutine, then reload and report the outcome."""
        async def _task():
            if await operation:
                self.load_data()
                self.show_info("提示", success_message)
            else:
                # The model signals failure with a falsy result; tell the user.
                self.show_error("错误", failure_message)
        self._run_async(_task())

    def _open_school_dialog(self, title, button_text, on_submit, name="", tel=""):
        """Show a name/phone dialog; call ``on_submit(name, tel)`` once both fields are filled."""
        dialog = tk.Toplevel(self.view)
        dialog.title(title)
        ttk.Label(dialog, text="学校名称:").grid(row=0, column=0, padx=5, pady=5)
        name_entry = ttk.Entry(dialog)
        name_entry.insert(0, name)
        name_entry.grid(row=0, column=1, padx=5, pady=5)
        ttk.Label(dialog, text="联系电话:").grid(row=1, column=0, padx=5, pady=5)
        tel_entry = ttk.Entry(dialog)
        tel_entry.insert(0, tel)
        tel_entry.grid(row=1, column=1, padx=5, pady=5)

        def save():
            new_name = name_entry.get()
            new_tel = tel_entry.get()
            if not new_name or not new_tel:
                self.show_error("错误", "所有字段必须填写")
                return
            on_submit(new_name, new_tel)
            dialog.destroy()

        ttk.Button(dialog, text=button_text, command=save).grid(row=2, columnspan=2, pady=10)

    def open_insert_dialog(self):
        """Open the dialog for adding a school."""
        def submit(name, tel):
            self._run_write(self.model.insert_data(name, tel), "数据添加成功", "数据添加失败")
        self._open_school_dialog("新增学校", "保存", submit)

    def open_update_dialog(self):
        """Open the dialog for editing the selected school (the first one if several are selected)."""
        selected = self.view.tree.selection()
        if not selected:
            self.show_error("错误", "请先选择要修改的记录")
            return
        values = self.view.tree.item(selected[0], "values")

        def submit(name, tel):
            self._run_write(self.model.update_data(values[0], name, tel), "数据更新成功", "数据更新失败")
        self._open_school_dialog("修改信息", "保存修改", submit, name=values[1], tel=values[2])

    def delete_record(self):
        """Delete the selected school after the user confirms."""
        selected = self.view.tree.selection()
        if not selected:
            self.show_error("错误", "请先选择要删除的记录")
            return
        if not messagebox.askyesno("确认", "确定要删除这条记录吗?"):
            return
        school_id = self.view.tree.item(selected[0], "values")[0]
        self._run_write(self.model.delete_data(school_id), "数据删除成功", "数据删除失败")

    def prev_page(self):
        """Go back one page unless already on the first page."""
        if self.current_page > 1:
            self.current_page -= 1
            self.load_data()

    def next_page(self):
        """Advance one page unless already on the last known page."""
        if self.current_page < self.total_pages:
            self.current_page += 1
            self.load_data()

    def show_info(self, title, message):
        """Schedule an info message box through the view's after() queue."""
        self.view.after(0, messagebox.showinfo, title, message)

    def show_error(self, title, message):
        """Schedule an error message box through the view's after() queue."""
        self.view.after(0, messagebox.showerror, title, message)

    def run(self):
        """Enter the Tk main loop; returns when the window is closed."""
        self.view.mainloop()
if __name__ == "__main__":
    # Build the three MVC parts (model first, then the window), wire them
    # together, and hand control to the Tk main loop.
    model, view = Model(), View()
    controller = Controller(model, view)
    controller.run()
# 输出: