串口调试助手1.0 python版
1. 产品概述
串口调试助手是一款专业的串行通信调试工具,专为工程师、技术人员和物联网开发者设计。该工具提供直观的用户界面,使用户能够轻松地与各种串口设备进行通信、测试和故障排除。无论是硬件调试、设备通信测试还是自动化应答,串口调试助手都能满足您的需求。
2. 核心功能
2.1 串口通信管理
- 支持自动检测和连接各种串行端口
- 可配置通信参数:波特率、数据位、停止位、校验方式和流控制
- 实时状态显示,清晰展示连接状态和数据流量
2.2 数据传输
- 双模式数据发送:文本模式和十六进制模式
- 智能数据显示:自动识别编码格式,支持十六进制和文本双视图
- 文件传输功能:可直接发送文件内容到串口设备
- 数据保存:将通信内容保存为文本文件
2.3 预置指令库
- 创建和管理常用指令集合
- 支持文本和十六进制格式的指令
- 可为每条指令添加注释,便于识别
- 循环自动发送功能,可设置发送顺序和间隔时间
2.4 智能自动应答
- 根据接收到的数据自动回复预设内容
- 支持文本和十六进制匹配条件
- 多种校验方式:无校验、CRC-16、XOR校验
- 规则启用/禁用开关,灵活控制应答行为
2.5 历史记录管理
- 自动保存发送历史
- 支持快速重发历史指令
- 可分类查看和管理历史记录
- 提供便捷的历史记录清理功能
2.6 实用转换工具
- 十六进制与十进制互转
- Hex与ASCII编码互转
- 即时转换,无需额外计算器
3. 用户界面
3.1 主显示区域
- 大型数据展示窗口,彩色区分发送和接收数据
- 可选时间戳显示,精确追踪通信时序
- 滚动功能,方便查看历史通信内容
3.2 串口控制面板
- 简洁的端口选择和参数配置界面
- 一键连接/断开按钮
- 实时状态指示器
3.3 发送控制区
- 多功能文本输入框
- 发送模式切换开关(文本/十六进制)
- 校验方式选择
- 快捷功能按钮(清空、文件发送、保存)
3.4 扩展功能面板
- 可折叠设计,优化工作空间
- 三大功能标签页:预置指令、自动答复、历史记录、转换工具
- 拖拽调整宽度,适应不同工作习惯
4. 使用场景
4.1 硬件开发与测试
- 调试微控制器和单片机通信
- 验证传感器数据输出
- 测试通信协议实现
4.2 工业设备维护
- 与PLC、HMI等工业设备通信
- 诊断设备通信故障
- 模拟设备指令,测试系统响应
4.3 物联网应用
- 调试物联网模块通信
- 验证设备与云平台的数据交换
- 模拟设备行为进行系统测试
4.4 逆向工程
- 分析未知设备的通信协议
- 捕获和重放通信数据
- 理解设备命令结构
5. 快速入门
5.1 基本操作流程
- 连接设备:选择正确的串口和通信参数,点击"打开端口"
- 发送指令:在发送区域输入命令,选择适当的格式,点击"发送"
- 查看响应:接收数据显示在主窗口,彩色区分发送和接收内容
- 保存数据:使用"数据存至文件"功能保存通信记录
5.2 高级功能使用
- 预置指令:在"预置指令"标签页添加常用命令,可设置自动循环发送
- 自动应答:在"自动答复"标签页配置应答规则,实现智能化交互
- 数据转换:使用"转换工具"快速转换不同格式的数据,辅助协议分析
6. 常见问题解答
Q:如何选择正确的串口参数?
A:通常设备手册会提供通信参数。若不确定,可尝试常用配置:9600波特率、8数据位、1停止位、无校验。
Q:如何区分接收到的数据是文本还是十六进制?
A:工具提供"Hex显示"选项,勾选后所有数据显示为十六进制格式,便于分析二进制数据。
Q:自动应答功能如何工作?
A:当接收到符合预设条件的数据时,工具会自动发送对应的回复。例如,收到设备查询指令时,可自动回复模拟的传感器数据。
Q:如何提高大量数据传输时的可读性?
A:使用时间戳功能标记每条消息,选择不同的颜色区分发送和接收数据,或使用过滤功能聚焦重要信息。
Q:工具支持哪些校验方式?
A:支持无校验、CRC-16和XOR校验,可在发送区的校验下拉菜单中选择。
7. 系统要求
- Windows 7/8/10/11 操作系统
- 可用的USB或串行端口
8. 程序代码
Python代码
# 导入必要的库
import serial # 串口通信库
import serial.tools.list_ports # 串口列表工具
import tkinter as tk # GUI库
import os # 操作系统接口
from tkinter import ttk, messagebox, colorchooser, filedialog, simpledialog # GUI组件
from threading import Thread, Event # 多线程支持
import time # 时间处理
import binascii # 二进制/ASCII转换
from datetime import datetime # 时间戳处理
import configparser # 配置文件解析
import re # 正则表达式
#######################
# 历史记录容量
HISTORY_MAX_ITEMS = 100 # 最多保存100条历史记录
# ===================== 自动答复类 =====================
class AutoReplyHandler:
def __init__(self, master, debugger):
self.master = master
self.debugger = debugger
self.rules = [] # 存储自动应答规则
self.config_file = "com.config"
self.setup_ui()
self.load_rules()
def setup_ui(self):
"""创建自动应答界面组件"""
self.main_frame = ttk.Frame(self.master)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 设置初始和最小宽度
self.main_frame.config(width=425)
self.master.master.master.master.columnconfigure(1, minsize=425) # 调整父容器列配置
# 工具栏
toolbar = ttk.Frame(self.main_frame)
toolbar.pack(fill=tk.X, pady=2)
ttk.Button(toolbar, text="添加规则", command=self.add_rule).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar, text="删除选中", command=self.delete_selected).pack(side=tk.LEFT, padx=2)
# 规则表格
self.tree = ttk.Treeview(self.main_frame, columns=('enable', 'match', 'reply', 'hex'),
show='headings', selectmode='extended')
# 配置列
self.tree.heading('enable', text='启用', anchor=tk.CENTER)
self.tree.column('enable', width=40, minwidth=40, stretch=False) # 固定宽度
self.tree.heading('match', text='匹配内容')
self.tree.column('match', width=150, minwidth=100, stretch=True) # 自适应宽度
self.tree.heading('reply', text='回复内容')
self.tree.column('reply', width=150, minwidth=100, stretch=True) # 自适应宽度
self.tree.heading('hex', text='HEX格式', anchor=tk.CENTER)
self.tree.column('hex', width=40, minwidth=40, stretch=False) # 固定宽度
# 添加滚动条
scroll = ttk.Scrollbar(self.main_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scroll.set)
# 布局
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scroll.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定双击事件
self.tree.bind("<Double-1>", self.on_double_click)
# 绑定鼠标事件(新增)
self.tree.bind("<Motion>", self.on_tree_hover)
self.tree.bind("<Leave>", self.hide_tooltip)
self.current_tooltip = None # 当前提示窗口
def on_tree_hover(self, event):
"""处理表格悬停事件(新增)"""
# 确定鼠标所在位置
region = self.tree.identify("region", event.x, event.y)
if region != "cell":
self.hide_tooltip()
return
# 获取行和列信息
column = self.tree.identify_column(event.x)
item = self.tree.identify_row(event.y)
# 只处理匹配内容和回复列
if column in ("#2", "#3"): # 第2列是匹配内容,第3列是回复
index = int(self.tree.index(item))
rule = self.rules[index]
# 获取完整内容
text = rule['match'] if column == "#2" else rule['reply']
if not text: return
# 显示提示窗口
self.show_tooltip(event.x_root, event.y_root, text)
def show_tooltip(self, x, y, text):
"""显示提示窗口(新增)"""
if self.current_tooltip:
self.current_tooltip.destroy()
self.current_tooltip = tk.Toplevel(self.master)
self.current_tooltip.wm_overrideredirect(True)
self.current_tooltip.wm_geometry(f"+{x+15}+{y+10}")
label = ttk.Label(
self.current_tooltip,
text=text,
background="#FFFFE0",
relief="solid",
borderwidth=1,
padding=(3,1),
wraplength=300
)
label.pack()
def hide_tooltip(self, event=None):
"""隐藏提示窗口(新增)"""
if self.current_tooltip:
self.current_tooltip.destroy()
self.current_tooltip = None
def add_rule(self, initial_data=None):
"""添加新的应答规则"""
# 默认初始数据
default_data = {
'enable': True,
'match': '',
'reply': '',
'hex': False,
'checksum': 'None'
}
if initial_data:
default_data.update(initial_data)
# 弹出编辑对话框
if self.edit_rule_dialog(default_data):
self.rules.append(default_data)
self._update_treeview()
self.save_rules()
def edit_rule_dialog(self, rule_data):
"""规则编辑对话框"""
dlg = tk.Toplevel(self.master)
dlg.title("编辑应答规则")
dlg.grab_set()
dlg.confirmed = False # 添加确认标志
# 窗口居中逻辑
dialog_width = 305
dialog_height = 140
screen_width = dlg.winfo_screenwidth()
screen_height = dlg.winfo_screenheight()
x = (screen_width - dialog_width) // 2
y = (screen_height - dialog_height) // 2
dlg.geometry(f"{dialog_width}x{dialog_height}+{x}+{y}")
# 控件变量
enable_var = tk.BooleanVar(value=rule_data['enable'])
match_var = tk.StringVar(value=rule_data['match'])
reply_var = tk.StringVar(value=rule_data['reply'])
hex_var = tk.BooleanVar(value=rule_data['hex'])
checksum_var = tk.StringVar(value=rule_data['checksum'])
# 布局
ttk.Checkbutton(dlg, text="启用规则", variable=enable_var).grid(row=0, column=0, columnspan=2, sticky=tk.W)
ttk.Label(dlg, text="匹配条件:").grid(row=1, column=0, sticky=tk.W)
match_entry = ttk.Entry(dlg, textvariable=match_var, width=30)
match_entry.grid(row=1, column=1, padx=2, pady=2)
ttk.Label(dlg, text="回复内容:").grid(row=2, column=0, sticky=tk.W)
reply_entry = ttk.Entry(dlg, textvariable=reply_var, width=30)
reply_entry.grid(row=2, column=1, padx=2, pady=2)
ttk.Checkbutton(dlg, text="HEX格式 ╎ ", variable=hex_var).grid(row=3, column=0, sticky=tk.W)
ttk.Label(dlg, text="校验方式:").grid(row=3, column=1, sticky=tk.W)
checksum_combo = ttk.Combobox(dlg, values=['None', 'CRC-16', 'XOR'],
textvariable=checksum_var, width=8)
checksum_combo.grid(row=3, column=1)
# 对话框按钮
btn_frame = ttk.Frame(dlg)
btn_frame.grid(row=4, column=0, columnspan=2, pady=5)
def on_ok():
dlg.confirmed = True # 设置确认标志为True
dlg.destroy()
ttk.Button(btn_frame, text="确定", command=on_ok).pack(side=tk.LEFT, padx=10)
ttk.Button(btn_frame, text="取消", command=dlg.destroy).pack(side=tk.LEFT)
dlg.wait_window() # 等待对话框关闭
# 仅在确认时更新数据
if dlg.confirmed:
rule_data.update({
'enable': enable_var.get(),
'match': match_var.get(),
'reply': reply_var.get(),
'hex': hex_var.get(),
'checksum': checksum_var.get()
})
return True
return False
def delete_selected(self):
"""删除选中的规则"""
selected = self.tree.selection()
for item in reversed(selected):
index = int(self.tree.index(item))
del self.rules[index]
self._update_treeview()
self.save_rules()
def on_double_click(self, event):
"""双击编辑规则"""
item = self.tree.identify_row(event.y)
if item:
index = int(self.tree.index(item))
if self.edit_rule_dialog(self.rules[index]):
self._update_treeview()
self.save_rules()
def _update_treeview(self):
"""更新Treeview显示"""
self.tree.delete(*self.tree.get_children())
for rule in self.rules:
status = '✓' if rule['enable'] else '✗'
self.tree.insert('', 'end', values=(
status,
rule['match'],
rule['reply'],
'HEX' if rule['hex'] else 'TXT'
))
def check_and_reply(self, received_data):
"""检查并执行自动应答"""
if not self.debugger.serial_port or not self.debugger.serial_port.is_open:
return
# 将接收数据转换为字符串表示形式
hex_received = ' '.join(f'{b:02X}' for b in received_data)
str_received = self.debugger.auto_decode(received_data)
for rule in self.rules:
if not rule['enable']:
continue
# 根据规则类型进行匹配
if rule['hex']:
match_str = rule['match'].upper().replace(' ', '')
received_match = hex_received.replace(' ', '')
else:
match_str = rule['match']
received_match = str_received
# 执行匹配(这里使用简单包含匹配,可扩展为正则表达式)
if match_str in received_match:
self._send_reply(rule)
def _send_reply(self, rule):
"""发送回复内容"""
try:
# 准备回复数据
if rule['hex']:
data = binascii.unhexlify(rule['reply'].replace(' ', ''))
else:
data = rule['reply'].encode('utf-8')
# 添加校验
data = self._add_checksum(data, rule['checksum'])
# 发送数据
self.debugger.serial_port.write(data)
self.debugger.tx_counter += len(data)
self.debugger.update_counters()
# 显示发送数据
self.debugger.display_data(data, 'send')
except Exception as e:
messagebox.showerror("自动应答错误", f"发送失败: {str(e)}")
def _add_checksum(self, data, checksum_type):
"""添加校验(复用主程序的校验方法)"""
if checksum_type == 'CRC-16':
return data + self.debugger.calculate_crc16(data)
elif checksum_type == 'XOR':
return data + bytes([self.debugger.calculate_xor(data)])
return data
def save_rules(self):
"""保存规则到配置文件"""
config = configparser.ConfigParser()
config.read(self.config_file, encoding='utf-8')
# 清除旧规则
for section in config.sections():
if section.startswith("AutoReply"):
config.remove_section(section)
# 添加新规则
config['AutoReply'] = {'count': str(len(self.rules))}
for idx, rule in enumerate(self.rules):
section = f"AutoReply{idx}"
config[section] = {
'enable': str(rule['enable']),
'match': rule['match'],
'reply': rule['reply'],
'hex': str(rule['hex']),
'checksum': rule['checksum']
}
with open(self.config_file, 'w', encoding='utf-8') as f:
config.write(f)
def load_rules(self):
"""从配置文件加载规则"""
config = configparser.ConfigParser()
config.read(self.config_file, encoding='utf-8')
if 'AutoReply' in config:
rule_count = config.getint('AutoReply', 'count', fallback=0)
for i in range(rule_count):
section = f"AutoReply{i}"
if config.has_section(section):
self.rules.append({
'enable': config.getboolean(section, 'enable'),
'match': config.get(section, 'match'),
'reply': config.get(section, 'reply'),
'hex': config.getboolean(section, 'hex'),
'checksum': config.get(section, 'checksum')
})
self._update_treeview()
# ===================== 历史记录类 =====================
class HistoryHandler:
def __init__(self, master, debugger):
self.master = master
self.debugger = debugger
self.history = []
self.config_file = "com.config"
# 增加容器验证
print(f"[UI DEBUG] 历史记录容器类型: {type(master)}") # 调试父容器类型
# 初始化界面
self.setup_ui()
self.load_history()
def setup_ui(self):
"""重构界面布局(解决显示问题)"""
# 主容器配置
self.frame = ttk.Frame(self.master)
self.frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # 增加边距
# 布局权重配置(关键修复)
self.frame.columnconfigure(0, weight=1) # 列扩展权重
self.frame.rowconfigure(0, weight=1) # 行扩展权重
# 列表控件
self.listbox = tk.Listbox(
self.frame,
selectmode=tk.EXTENDED,
font=('微软雅黑', 10),
relief=tk.GROOVE,
borderwidth=1
)
scrollbar = ttk.Scrollbar(self.frame, orient=tk.VERTICAL, command=self.listbox.yview)
self.listbox.configure(yscrollcommand=scrollbar.set)
# 布局管理(使用grid保证扩展性)
self.listbox.grid(row=0, column=0, sticky="nsew")
scrollbar.grid(row=0, column=1, sticky="ns")
# 按钮面板(固定在底部)
btn_frame = ttk.Frame(self.frame)
btn_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5,0))
# 按钮配置
ttk.Button(btn_frame, text="删除选中", command=self.delete_selected).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="清空历史", command=self.clear_history).pack(side=tk.LEFT, padx=2)
ttk.Button(btn_frame, text="发送选中", command=self.send_selected).pack(side=tk.LEFT, padx=2)
# 右键菜单
self.context_menu = tk.Menu(self.master, tearoff=0)
self.context_menu.add_command(label="发送", command=self.send_selected)
self.context_menu.add_command(label="删除", command=self.delete_selected)
# 绑定事件
self.listbox.bind("<Double-1>", lambda e: self.send_selected())
self.listbox.bind("<Button-3>", self.show_context_menu)
# 强制刷新布局
self.frame.update_idletasks()
print("[UI DEBUG] 历史记录UI初始化完成") # 布局调试日志
def show_context_menu(self, event):
"""修复版右键菜单处理"""
try:
# 获取最近的列表项索引
index = self.listbox.nearest(event.y)
if index == -1: # 无效索引处理
return
# 获取列表项边界框(格式:(x, y, width, height))
bbox = self.listbox.bbox(index)
if not bbox: # 处理空边界框情况
return
# 正确解构元组
_, y_start, _, height = bbox # 分解边界框参数
# 坐标验证(使用正确类型比较)
if not (y_start <= event.y <= y_start + height):
print(f"[DEBUG] 点击在行间隙(Y范围:{y_start}-{y_start+height},实际Y:{event.y})")
return
# 更新选中状态
self.listbox.selection_clear(0, tk.END)
self.listbox.selection_set(index)
# 显示菜单
self.context_menu.tk_popup(event.x_root, event.y_root)
except Exception as e:
print(f"[ERROR] 右键处理失败: {str(e)}")
def add_history(self, command, is_hex):
"""添加历史记录"""
# 去除重复项(保留最新)
self.history = [item for item in self.history if item['command'] != command]
# 添加新记录
self.history.insert(0, {
'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'command': command,
'hex': is_hex
})
# 保持最大记录数
if len(self.history) > HISTORY_MAX_ITEMS:
self.history = self.history[:HISTORY_MAX_ITEMS]
self.save_history()
self.update_listbox()
def delete_selected(self):
"""删除选中项"""
selected = self.listbox.curselection()
if not selected:
return
# 倒序删除避免索引变化
for index in reversed(selected):
del self.history[index]
self.save_history()
self.update_listbox()
def clear_history(self):
"""清空历史记录"""
if messagebox.askyesno("确认清空", "确定要清空所有历史记录吗?"):
self.history = []
self.save_history()
self.update_listbox()
def send_selected(self):
"""发送选中指令"""
selected = self.listbox.curselection()
for index in selected:
item = self.history[index]
self.debugger.send_custom_command(item['command'], item['hex'])
def update_listbox(self):
"""更新列表显示"""
self.listbox.delete(0, tk.END)
for item in self.history:
prefix = "[HEX]" if item['hex'] else "[TXT]"
self.listbox.insert(tk.END, f"{prefix} {item['timestamp']} - {item['command'][:50]}")
def load_history(self):
"""从配置文件加载历史记录"""
config = configparser.ConfigParser()
config.read(self.config_file, encoding='utf-8')
if 'History' in config:
history_count = config.getint('History', 'count', fallback=0)
self.history = []
for i in range(history_count):
section = f"History{i}"
if config.has_section(section):
self.history.append({
'timestamp': config.get(section, 'timestamp'),
'command': config.get(section, 'command'),
'hex': config.getboolean(section, 'hex')
})
self.update_listbox()
def save_history(self):
"""保存历史记录到配置文件"""
config = configparser.ConfigParser()
config.read(self.config_file, encoding='utf-8')
# 删除旧的历史记录
for section in config.sections():
if section.startswith("History"):
config.remove_section(section)
# 写入新记录
config['History'] = {'count': str(len(self.history))}
for i, item in enumerate(self.history):
section = f"History{i}"
config[section] = {
'timestamp': item['timestamp'],
'command': item['command'],
'hex': str(item['hex'])
}
with open(self.config_file, 'w', encoding='utf-8') as f:
config.write(f)
# ===================== 预置指令类 =====================
class PresetCommand:
def __init__(self, master, debugger):
self.master = master # 父窗口
self.debugger = debugger # 调试器实例
self.commands = [] # 存储预置指令的列表
self.config_file = "com.config" # 配置文件路径
self.max_commands = 100 # 最大指令数量限制
# 初始化配置解析器
self.config = configparser.ConfigParser(allow_no_value=True)
# 保持配置项大小写敏感
self.config.optionxform = lambda option: option # type: ignore
self.setup_ui() # 初始化UI
# 在创建canvas和scrollable_frame后添加以下事件绑定
self._bind_scroll_events()
self.tooltips = { # 提示信息配置
0: "HEX发送:勾选后以十六进制格式发送指令",
1: "指令内容(双击修改注释):\n- 输入要发送的指令内容\n- 支持ASCII或HEX格式",
2: "点击发送:点击立即发送对应指令",
3: "发送顺序:\n- 0:不参与循环发送\n- 数字越大发送越晚\n- 相同顺序同时发送",
4: "发送延时:本条指令发送完成后\n等待指定毫秒再发送下一条",
5: "删除指令:点击删除本行配置"
}
if not os.path.exists(self.config_file): # 如果配置文件不存在,创建默认指令
self.create_default_commands()
else: # 否则加载配置文件中的指令
self.load_commands()
def setup_ui(self):
"""重构界面布局(解决显示问题)"""
# 主容器配置
self.main_frame = ttk.Frame(self.master)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# 顶部控制栏
top_row = ttk.Frame(self.main_frame)
top_row.pack(fill=tk.X, pady=(0,5))
ttk.Label(top_row, text="<--拖动加宽", foreground="gray").pack(side=tk.LEFT, padx=5)
self.loop_send_var = tk.BooleanVar()
ttk.Checkbutton(top_row, text="循环发送", variable=self.loop_send_var,
command=lambda: self.debugger.toggle_loop_send()).pack(side=tk.LEFT)
# 表格容器(包含标题和滚动区域)
table_container = ttk.Frame(self.main_frame)
table_container.pack(fill=tk.BOTH, expand=True)
# 表头
header_frame = ttk.Frame(table_container)
header_frame.pack(fill=tk.X)
header_columns = [
("HEX", 6), ("字符串(双击改名)", 1), ("点击发送", 10),
("顺序", 6), ("延时(ms)", 8), ("删", 4)
]
for col, (text, width) in enumerate(header_columns):
lbl = ttk.Label(header_frame, text=text, width=width, anchor=tk.CENTER)
lbl.grid(row=0, column=col, sticky='ew', padx=2)
header_frame.grid_columnconfigure(1, weight=1) # 字符串列自适应
# 滚动区域容器
scroll_container = ttk.Frame(table_container)
scroll_container.pack(fill=tk.BOTH, expand=True)
# 滚动条和Canvas
self.canvas = tk.Canvas(scroll_container, borderwidth=0, highlightthickness=0)
self.scrollbar = ttk.Scrollbar(scroll_container, orient="vertical", command=self.canvas.yview)
self.scrollable_frame = ttk.Frame(self.canvas)
# 布局管理
self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Canvas配置
self.canvas.configure(yscrollcommand=self.scrollbar.set)
self.canvas.create_window((0,0), window=self.scrollable_frame, anchor="nw", tags="scroll_frame")
# 绑定事件
self.scrollable_frame.bind("<Configure>", lambda e: self.canvas.configure(
scrollregion=self.canvas.bbox("all")
))
self.canvas.bind("<Configure>", self._on_canvas_resize)
# 底部按钮
self.add_btn = ttk.Button(self.main_frame, text="添加预置命令", command=self.add_preset_command)
self.add_btn.pack(side=tk.BOTTOM, anchor=tk.E, padx=5, pady=2)
def _bind_scroll_events(self):
"""绑定所有相关控件的滚轮事件(新增方法)"""
# 绑定到Canvas和可滚动区域
for widget in [self.canvas, self.scrollable_frame]:
widget.bind("<MouseWheel>", self._on_mouse_wheel)
widget.bind("<Button-4>", self._on_mouse_wheel) # Linux向上滚动
widget.bind("<Button-5>", self._on_mouse_wheel) # Linux向下滚动
# 绑定现有行框架
for cmd in self.commands:
self._bind_row_events(cmd['row_frame'])
def _bind_row_events(self, row_frame):
"""为行框架绑定事件(新增方法)"""
row_frame.bind("<MouseWheel>", self._on_mouse_wheel)
row_frame.bind("<Button-4>", self._on_mouse_wheel)
row_frame.bind("<Button-5>", self._on_mouse_wheel)
def _on_mouse_wheel(self, event):
"""统一处理滚轮事件(新增方法)"""
# Windows/Mac 处理
if event.delta:
delta = event.delta
# Linux 处理
elif event.num in (4, 5):
delta = 1 if event.num == 4 else -1
else:
return
# 计算滚动方向(反转数值以适应自然滚动)
scroll_units = -1 * (delta // abs(delta))
# 执行滚动
self.canvas.yview_scroll(scroll_units, "units")
return "break" # 阻止事件传播
def _update_button_numbers(self):
for idx, cmd in enumerate(self.commands): # 遍历所有指令
current_text = cmd['widgets']['comment_btn']['text'].split(' ', 1)[-1] # 获取当前按钮文本
cmd['widgets']['comment_btn'].config(text=f"#{idx+1} {current_text}") # 更新按钮序号
def _on_canvas_resize(self, event):
"""Canvas尺寸变化时调整内部框架宽度"""
canvas_width = event.width
self.canvas.itemconfigure("scroll_frame", width=canvas_width) # 关键:让可滚动区域宽度匹配Canvas
def add_preset_command(self, initial_data=None):
if len(self.commands) >= self.max_commands: # 检查指令数量是否超过限制
messagebox.showwarning("提示", f"最多只能添加{self.max_commands}条指令")
return
row_frame = ttk.Frame(self.scrollable_frame) # 创建指令行框架
row_frame.pack(fill=tk.X, pady=1) # 放置框架
widgets = { # 定义控件
'hex_var': tk.BooleanVar(),
'command_entry': ttk.Entry(row_frame),
'comment_btn': ttk.Button(row_frame, text=f"#{len(self.commands)+1} 无注释", width=12),
'order_entry': ttk.Entry(row_frame, width=4),
'delay_entry': ttk.Entry(row_frame, width=6),
'del_btn': ttk.Button(row_frame, text="-", width=2)
}
# 为每个控件绑定滚轮事件
for widget in widgets.values():
if isinstance(widget, (ttk.Entry, ttk.Button)):
widget.bind("<MouseWheel>", self._on_mouse_wheel)
widget.bind("<Button-4>", self._on_mouse_wheel) # Linux向上滚动
widget.bind("<Button-5>", self._on_mouse_wheel) # Linux向下滚动
validate_num = (self.master.register(self.validate_number), '%P') # 输入验证配置
widgets['order_entry'].insert(0, "0") # 初始化顺序输入框
widgets['order_entry'].config(validate="key", validatecommand=validate_num) # 配置验证
widgets['delay_entry'].insert(0, "1000") # 初始化延时输入框
widgets['delay_entry'].config(validate="key", validatecommand=validate_num) # 配置验证
ttk.Checkbutton(row_frame, variable=widgets['hex_var']).grid(row=0, column=0, padx=2, sticky='w') # HEX复选框
widgets['command_entry'].grid(row=0, column=1, padx=2, sticky='ew') # 指令输入框
widgets['comment_btn'].grid(row=0, column=2, padx=2, sticky='e') # 注释按钮
widgets['order_entry'].grid(row=0, column=3, padx=2, sticky='e') # 顺序输入框
widgets['delay_entry'].grid(row=0, column=4, padx=2, sticky='e') # 延时输入框
widgets['del_btn'].grid(row=0, column=5, padx=2, sticky='e') # 删除按钮
widgets['command_entry'].bind("<Double-1>", lambda e, w=widgets: self.rename_comment(w)) # 双击修改注释
widgets['comment_btn'].config(command=lambda w=widgets: self.debugger.send_custom_command(
w['command_entry'].get(), w['hex_var'].get())) # 点击发送指令
widgets['del_btn'].config(command=lambda w=row_frame: self.delete_command(w)) # 点击删除指令
# 新增行绑定滚轮事件
self._bind_row_events(row_frame)
if initial_data: # 如果有初始数据,初始化控件
widgets['hex_var'].set(initial_data.get('hex', False))
widgets['command_entry'].insert(0, initial_data.get('command', ''))
widgets['comment_btn'].config(text=f"#{len(self.commands)+1} {initial_data.get('comment', '无注释')}")
widgets['order_entry'].delete(0, tk.END)
widgets['order_entry'].insert(0, str(initial_data.get('order', 0)))
#添加右键菜单绑定
for entry in [widgets['command_entry'], widgets['order_entry'], widgets['delay_entry']]:
entry.bind("<Button-3>", self.debugger.show_context_menu)
for col, widget in enumerate(widgets.values()): # 添加工具提示
if col != 0: # 跳过BooleanVar
self._add_tooltip(widget, col)
row_frame.columnconfigure(1, weight=1) # 设置列权重
self.commands.append({"row_frame": row_frame, "widgets": widgets}) # 添加指令到列表
self._save_commands() # 保存指令
widgets['hex_var'].trace_add('write', lambda *_: self._save_commands()) # 绑定数据变更事件
widgets['command_entry'].bind('<KeyRelease>', lambda e: self._save_commands())
widgets['order_entry'].bind('<KeyRelease>', lambda e: self._save_commands())
widgets['delay_entry'].bind('<KeyRelease>', lambda e: self._save_commands())
self._update_button_numbers() # 更新按钮序号
def _add_tooltip(self, widget, col_index):
tooltip_text = self.tooltips.get(col_index, "") # 获取提示文本
tooltip = tk.Toplevel(self.master) # 创建提示框
tooltip.withdraw() # 隐藏提示框
tooltip.overrideredirect(True) # 去除窗口装饰
label = ttk.Label(tooltip, text=tooltip_text, background="#FFFFE0",
relief="solid", borderwidth=1, padding=(4,2),
font=('微软雅黑', 9)) # 提示框样式
label.pack()
tooltip_visible = False # 提示框可见状态
scheduled_id = None # 定时器ID
def show_tooltip(): # 显示提示框
nonlocal tooltip_visible
tooltip_visible = True
x = widget.winfo_rootx() + 20
y = widget.winfo_rooty() + 25
tooltip.geometry(f"+{x}+{y}")
tooltip.deiconify()
def schedule_show(): # 安排显示提示框
nonlocal scheduled_id
scheduled_id = self.master.after(500, show_tooltip)
def hide_tooltip(): # 隐藏提示框
nonlocal tooltip_visible, scheduled_id
if scheduled_id:
self.master.after_cancel(scheduled_id)
if tooltip_visible:
tooltip.withdraw()
tooltip_visible = False
widget.bind("<Enter>", lambda e: schedule_show()) # 绑定事件
widget.bind("<Leave>", lambda e: hide_tooltip())
widget.bind("<ButtonPress>", lambda e: hide_tooltip())
def validate_number(self, value):
return value.isdigit() or value == "" # 验证数字输入
def rename_comment(self, widgets):
current_text = widgets['comment_btn'].cget('text') # 获取当前按钮文本
new_name = simpledialog.askstring( # 弹出对话框
"修改注释",
"请输入新的按钮名称:",
parent=self.master,
initialvalue=current_text
)
if new_name: # 如果有新名称,更新按钮文本
widgets['comment_btn'].config(text=new_name)
self._save_commands()
self._update_button_numbers()
def send_command(self, widgets):
command = widgets['command_entry'].get() # 获取指令内容
if not command: # 如果指令为空,返回
return
hex_send = widgets['hex_var'].get() # 获取HEX发送状态
self.debugger.send_custom_command(command, hex_send) # 发送指令
def delete_command(self, row_frame):
if messagebox.askyesno("确认删除", "确定要删除该指令吗?"): # 确认删除
for cmd in self.commands: # 遍历指令列表
if cmd['row_frame'] == row_frame: # 找到对应指令
self.commands.remove(cmd) # 移除指令
break
row_frame.destroy() # 销毁UI组件
self._save_commands() # 保存指令
self._update_button_numbers() # 更新序号
def get_sorted_commands(self):
valid_commands = [] # 有效指令列表
for cmd in self.commands: # 遍历指令
order = cmd['widgets']['order_entry'].get() # 获取顺序
if order.isdigit() and int(order) > 0: # 如果顺序有效
valid_commands.append({ # 添加到有效指令列表
'order': int(order),
'delay': int(cmd['widgets']['delay_entry'].get()),
'command': cmd['widgets']['command_entry'].get(),
'hex': cmd['widgets']['hex_var'].get()
})
return sorted(valid_commands, key=lambda x: x['order']) # 按顺序排序
def create_default_commands(self):
"""创建默认的INI配置和界面元素"""
# 清空现有配置
self.config = configparser.ConfigParser()
self.config.optionxform = lambda option: option # type: ignore
# 添加元数据
self.config['Meta'] = {
'version': '2.0',
'create_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'comment': 'Auto generated default config'
}
default_commands = [
{'hex': True, 'command': '01 03 00 00 00 01', 'comment': '读保持寄存器', 'order': 1, 'delay': 1000},
{'hex': True, 'command': '01 06 00 00 00 01 00 01', 'comment': '写单个寄存器', 'order': 2, 'delay': 3000},
{'hex': True, 'command': '01 03 00 01 00 01', 'comment': '读输入寄存器', 'order': 3, 'delay': 1000},
{'hex': True, 'command': '01 04 00 00 00 01', 'comment': '读输入状态', 'order': 4, 'delay': 3000},
*[
{'hex': True, 'command': cmd, 'comment': comment, 'order': 0, 'delay': 1000}
for cmd, comment in [
('01 01 00 00 00 01', '读线圈状态'),
('01 05 00 00 FF 00', '写单个线圈'),
('01 0F 00 00 00 01', '写多个线圈'),
('01 10 00 00 00 02 04 00 01', '写多个寄存器'),
('01 03 00 00 00 01', '读设备标识'),
('08 01 00 00 00 01', '回送测试'),
]
]
]
# 写入配置并添加界面元素
for idx, cmd in enumerate(default_commands):
# 添加到INI配置
section_name = f"Command{idx}"
self.config[section_name] = {
'hex': str(cmd['hex']),
'command': cmd['command'],
'comment': cmd['comment'],
'order': str(cmd['order']),
'delay': str(cmd['delay'])
}
# 添加界面元素
self.add_preset_command(initial_data=cmd)
# 保存到文件
with open(self.config_file, 'w', encoding='utf-8') as f:
self.config.write(f)
def load_commands(self):
"""从INI文件加载预置指令配置"""
try:
# 清空现有配置
self.config.read(self.config_file, encoding='utf-8')
# 获取所有Command开头的section并按数字排序
command_sections = sorted(
[s for s in self.config.sections() if s.startswith("Command")],
key=lambda x: int(x[7:]) # 提取Command后的数字作为排序依据
)
for section in command_sections:
# 读取每个指令的配置项
cmd_data = {
'hex': self.config.getboolean(section, 'hex'),
'command': self.config[section]['command'],
'comment': self.config[section].get('comment', '无注释'),
'order': self.config[section].get('order', '0'),
'delay': self.config[section].get('delay', '1000')
}
# 添加指令到界面
self.add_preset_command(initial_data=cmd_data)
except Exception as e:
messagebox.showerror("配置加载错误",
f"配置文件格式错误: {str(e)}\n将使用默认配置")
# 创建备份文件以便调试
if os.path.exists(self.config_file):
os.rename(self.config_file, f"{self.config_file}.bak")
self.create_default_commands()
def _save_commands(self):
"""保存当前指令到INI文件"""
# 清空现有配置(保留非Command的section)
new_config = configparser.ConfigParser()
new_config.optionxform = lambda option: option # type: ignore
# 保留非Command的全局配置(如果有)
for section in self.config.sections():
if not section.startswith("Command"):
new_config[section] = self.config[section]
# 添加指令配置
for idx, cmd in enumerate(self.commands):
section_name = f"Command{idx}"
widgets = cmd['widgets']
new_config[section_name] = {
'hex': str(widgets['hex_var'].get()),
'command': widgets['command_entry'].get(),
'comment': widgets['comment_btn']['text'].split(' ', 1)[-1],
'order': widgets['order_entry'].get(),
'delay': widgets['delay_entry'].get()
}
# 写入文件
with open(self.config_file, 'w', encoding='utf-8') as f:
new_config.write(f)
# ===================== 主程序类 =====================
class SerialDebugger:
def __init__(self, master):
self.master = master # 父窗口
self.serial_port = None # 串口对象
self.receive_flag = Event() # 接收标志
self.auto_send_flag = False # 自动发送标志
self.rx_counter = 0 # 接收计数器
self.tx_counter = 0 # 发送计数器
self.recv_color = '#000000' # 接收颜色
self.send_color = '#0000FF' # 发送颜色
self.extension_visible = False # 扩展窗口可见状态
self.drag_start_x = 0 # 拖动起始位置
self.initial_width = 425 # 初始宽度
self.is_dragging = False # 是否正在拖动
# 新增接收缓冲区相关属性
self.receive_buffer = bytearray() # 接收数据缓冲区
self.last_receive_time = 0 # 最后接收时间戳
self.frame_timeout = 0.05 # 帧超时判定阈值(单位:秒)
self.min_frame_length = 4 # 最小合法帧长度(按Modbus RTU等协议设定)
self.max_frame_length = 256 # 最大允许帧长度(防止内存溢出)
temp = tk.Checkbutton(master) # 获取默认背景颜色
self.default_bg = temp.cget('bg')
temp.destroy()
self.style = ttk.Style() # 初始化ttk样式
self.style.configure('Yellow.TCombobox', fieldbackground='yellow')
self.setup_ui() # 初始化主界面
self.setup_extension_window() # 初始化扩展窗口
self.preset_commands = PresetCommand(self.preset_frame, self) # 初始化预置指令
self.auto_reply_handler = AutoReplyHandler(self.auto_reply_frame, self) #初始化自动应答
# 初始化完毕,隐藏扩展窗口
self.extension_frame.grid_remove()
self.master.grid_columnconfigure(1, weight=0, minsize=0)
self.update_ports() # 更新端口列表
self.config_file = "com.config" # 配置文件路径
self.load_serial_settings() # 新增:加载历史配置
self.port_combo.bind("<<ComboboxSelected>>", self.on_port_change) # 绑定端口选择事件
self.loop_send_active = False # 循环发送状态
# 创建右键菜单
self.context_menu = None
self.create_context_menu() # 创建右键菜单
def setup_ui(self):
#self.master.geometry("990x700") # 设置窗口大小
# 新增窗口居中逻辑
self._center_window(900, 600) # 参数保持原窗口尺寸
self.master.title("串口调试工具 - 智能模拟传感器答复") # 设置窗口标题
self.master.minsize(650, 450) # 设置最小窗口大小
self.master.grid_columnconfigure(0, weight=1) # 配置主窗口网格布局
self.master.grid_columnconfigure(1, weight=0, minsize=0)
self.master.grid_rowconfigure(0, weight=1)
self.master.grid_rowconfigure(1, weight=0)
self.master.grid_rowconfigure(2, weight=0)
display_frame = ttk.Frame(self.master) # 数据显示区
display_frame.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
self.text_display = tk.Text(display_frame, state=tk.DISABLED, wrap=tk.WORD) # 数据显示文本框
self.text_display.bind("<Button-3>", self.show_context_menu) # 绑定右键菜单
scroll_display = ttk.Scrollbar(display_frame, orient="vertical", command=self.text_display.yview) # 滚动条
self.text_display.configure(yscrollcommand=scroll_display.set) # 配置滚动
self.text_display.grid(row=0, column=0, sticky="nsew") # 放置文本框
scroll_display.grid(row=0, column=1, sticky="ns") # 放置滚动条
display_frame.grid_columnconfigure(0, weight=1) # 配置列权重
display_frame.grid_rowconfigure(0, weight=1)
control_frame = ttk.Frame(self.master) # 中间控制区
control_frame.grid(row=1, column=0, columnspan=2, sticky="nsew", padx=5, pady=2)
control_frame.grid_columnconfigure(0, minsize=200, weight=0)
control_frame.grid_columnconfigure(1, weight=1)
control_frame.grid_columnconfigure(2, minsize=250, weight=0)
control_frame.grid_rowconfigure(0, minsize=155, weight=0)
self.setup_serial_controls(control_frame) # 初始化串口设置区
self.setup_send_controls(control_frame) # 初始化发送输入区
self.setup_function_controls(control_frame) # 初始化功能区
self.setup_status_bar() # 初始化状态栏
def _center_window(self, width, height):
"""使窗口在屏幕居中显示"""
# 获取屏幕尺寸
screen_width = self.master.winfo_screenwidth()
screen_height = self.master.winfo_screenheight()
# 计算居中坐标
x = (screen_width - width) // 2
y = (screen_height - height) // 2
# 设置窗口位置和尺寸
self.master.geometry(f"{width}x{height}+{x}+{y}")
def setup_extension_window(self):
self.extension_frame = ttk.Frame(self.master) # 扩展窗口框架
self.extension_frame.grid(row=0, column=1, sticky="nsew")
self.grip = ttk.Frame(self.extension_frame, width=5, cursor="sb_h_double_arrow") # 分隔条
self.grip.pack(side="left", fill="y")
self.grip.bind("<Enter>", self.on_grip_enter) # 绑定鼠标事件
self.grip.bind("<Leave>", self.on_grip_leave)
self.grip.bind("<ButtonPress-1>", self.on_grip_press)
self.grip.bind("<B1-Motion>", self.on_grip_drag)
self.grip.bind("<ButtonRelease-1>", self.on_grip_release)
self.notebook = ttk.Notebook(self.extension_frame) # 标签页
self.notebook.pack(side="left", expand=True, fill='both')
self.preset_frame = ttk.Frame(self.notebook) # 预置指令标签页
self.notebook.add(self.preset_frame, text="预置指令")
self.auto_reply_frame = ttk.Frame(self.notebook) # 自动答复标签页
self.notebook.add(self.auto_reply_frame, text="自动答复")
self.history_frame = ttk.Frame(self.notebook) #历史记录标签页
self.notebook.add(self.history_frame, text="历史记录")
self.history_handler = HistoryHandler(self.history_frame, self)
# 新增“转换工具”标签页
self.conversion_frame = ttk.Frame(self.notebook)
self.notebook.add(self.conversion_frame, text="转换工具")
self.setup_conversion_tools() # 初始化转换工具界面
def setup_conversion_tools(self):
"""初始化转换工具界面"""
# 创建主框架
main_frame = ttk.Frame(self.conversion_frame)
main_frame.pack(fill="both", expand=True, padx=10, pady=10)
# 十六进制与十进制互转
hex_dec_frame = ttk.LabelFrame(main_frame, text="十六进制 ⇌ 十进制", padding=10)
hex_dec_frame.pack(fill="x", pady=5)
# 输入框和标签
ttk.Label(hex_dec_frame, text="十六进制:").grid(row=0, column=0, sticky="w")
self.hex_input = ttk.Entry(hex_dec_frame, width=30)
self.hex_input.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(hex_dec_frame, text="十进制:").grid(row=1, column=0, sticky="w")
self.dec_input = ttk.Entry(hex_dec_frame, width=30)
self.dec_input.grid(row=1, column=1, padx=5, pady=5)
# 转换按钮
ttk.Button(hex_dec_frame, text="十六进制 ►► 十进制", command=self.hex_to_dec).grid(row=0, column=2, padx=5)
ttk.Button(hex_dec_frame, text="十进制 ►► 十六进制", command=self.dec_to_hex).grid(row=1, column=2, padx=5)
# Hex与ASCII互转
hex_ascii_frame = ttk.LabelFrame(main_frame, text="Hex ⇌ ASCII", padding=10)
hex_ascii_frame.pack(fill="x", pady=5)
# 输入框和标签
ttk.Label(hex_ascii_frame, text="Hex:").grid(row=0, column=0, sticky="w")
self.hex_ascii_input = ttk.Entry(hex_ascii_frame, width=30)
self.hex_ascii_input.grid(row=0, column=1, padx=5, pady=5)
ttk.Label(hex_ascii_frame, text="ASCII:").grid(row=1, column=0, sticky="w")
self.ascii_input = ttk.Entry(hex_ascii_frame, width=30)
self.ascii_input.grid(row=1, column=1, padx=5, pady=5)
# 转换按钮
ttk.Button(hex_ascii_frame, text="Hex ►► ASCII", command=self.hex_to_ascii).grid(row=0, column=2, padx=5)
ttk.Button(hex_ascii_frame, text="ASCII ►► Hex", command=self.ascii_to_hex).grid(row=1, column=2, padx=5)
def hex_to_dec(self):
"""将十六进制转换为十进制"""
hex_str = self.hex_input.get().strip()
try:
if hex_str: # 确保输入不为空
dec_value = int(hex_str, 16) # 将十六进制字符串转换为整数
self.dec_input.delete(0, tk.END) # 清空十进制输入框
self.dec_input.insert(0, str(dec_value)) # 显示转换结果
else:
messagebox.showwarning("输入错误", "请输入十六进制值")
except ValueError:
messagebox.showerror("转换错误", "无效的十六进制格式")
def dec_to_hex(self):
"""将十进制转换为十六进制"""
dec_str = self.dec_input.get().strip()
try:
if dec_str: # 确保输入不为空
hex_value = hex(int(dec_str))[2:].upper() # 将十进制字符串转换为十六进制,并去除前缀
self.hex_input.delete(0, tk.END) # 清空十六进制输入框
self.hex_input.insert(0, hex_value) # 显示转换结果
else:
messagebox.showwarning("输入错误", "请输入十进制值")
except ValueError:
messagebox.showerror("转换错误", "无效的十进制格式")
def hex_to_ascii(self):
"""将Hex转换为ASCII"""
hex_str = self.hex_ascii_input.get().strip()
try:
if hex_str: # 确保输入不为空
# 将Hex字符串转换为字节,再解码为ASCII
ascii_value = bytes.fromhex(hex_str).decode('ascii')
self.ascii_input.delete(0, tk.END) # 清空ASCII输入框
self.ascii_input.insert(0, ascii_value) # 显示转换结果
else:
messagebox.showwarning("输入错误", "请输入Hex值")
except ValueError:
messagebox.showerror("转换错误", "无效的Hex格式")
def ascii_to_hex(self):
"""将ASCII转换为Hex"""
ascii_str = self.ascii_input.get().strip()
try:
if ascii_str: # 确保输入不为空
# 将ASCII字符串编码为字节,再转换为Hex
hex_value = ascii_str.encode('ascii').hex().upper()
self.hex_ascii_input.delete(0, tk.END) # 清空Hex输入框
self.hex_ascii_input.insert(0, hex_value) # 显示转换结果
else:
messagebox.showwarning("输入错误", "请输入ASCII值")
except UnicodeEncodeError:
messagebox.showerror("转换错误", "无效的ASCII格式")
def validate_hex_input(self, hex_str):
"""验证Hex输入是否有效"""
try:
int(hex_str, 16) # 尝试将Hex字符串转换为整数
return True
except ValueError:
return False
def validate_dec_input(self, dec_str):
"""验证十进制输入是否有效"""
try:
int(dec_str) # 尝试将十进制字符串转换为整数
return True
except ValueError:
return False
def on_grip_enter(self, event):
self.grip.config(cursor="sb_h_double_arrow") # 鼠标进入分隔条区域
def on_grip_leave(self, event):
if not self.is_dragging: # 鼠标离开分隔条区域
self.grip.config(cursor="")
def on_grip_press(self, event):
self.is_dragging = True # 开始拖动
self.drag_start_x = event.x_root # 记录起始位置
self.initial_width = self.extension_frame.winfo_width() # 获取当前宽度
def on_grip_drag(self, event):
if self.is_dragging: # 拖动过程中调整宽度
delta = self.drag_start_x - event.x_root # 计算变化量
new_width = max(425, self.initial_width + delta) # 计算新宽度
self.master.grid_columnconfigure(1, minsize=new_width, weight=0) # 更新列配置
self.master.update_idletasks() # 强制更新界面
def on_grip_release(self, event):
self.is_dragging = False # 结束拖动
self.initial_width = self.extension_frame.winfo_width() # 记录当前宽度
self.grip.config(cursor="sb_h_double_arrow") # 恢复光标
def toggle_extension(self):
"""切换扩展窗口显示/隐藏"""
self.extension_visible = not self.extension_visible
if self.extension_visible:
self.extension_frame.grid(row=0, column=1, sticky="nsew")
self.master.grid_columnconfigure(1, minsize=425, weight=0)
self.extension_btn.config(text="更多 <<") # 切换按钮文本
else:
self.extension_frame.grid_remove()
self.master.grid_columnconfigure(1, weight=0, minsize=0)
self.extension_btn.config(text="更多 >>") # 切换按钮文本
# 强制更新界面(可选,确保即时刷新)
self.master.update_idletasks()
def setup_status_bar(self):
status_bar = ttk.Frame(self.master, height=22) # 状态栏框架
status_bar.grid(row=2, column=0, columnspan=2, sticky="sew")
self.status_conn = ttk.Label(status_bar, text="未连接", anchor=tk.W) # 连接状态标签
self.status_rx = ttk.Label(status_bar, text="RX:0", width=8) # 接收计数器标签
self.status_tx = ttk.Label(status_bar, text="TX:0", width=8) # 发送计数器标签
self.status_author = ttk.Label(status_bar, text="Power by Qwen", anchor=tk.E) # 作者标签
self.status_conn.pack(side=tk.LEFT, fill=tk.X, expand=True) # 放置标签
self.status_rx.pack(side=tk.LEFT, padx=5)
self.status_tx.pack(side=tk.LEFT, padx=5)
self.status_author.pack(side=tk.RIGHT)
def setup_serial_controls(self, parent):
frame = ttk.LabelFrame(parent, text="串口设置", padding=5) # 串口设置框架
frame.grid(row=0, column=0, sticky="nsew", padx=2)
frame.grid_propagate(False)
frame.config(width=200, height=155)
frame.grid_columnconfigure(1, weight=1) # 配置列权重
row = 0
ttk.Label(frame, text="端口号:").grid(row=row, column=0, sticky=tk.W) # 端口号标签
self.port_combo = ttk.Combobox(frame) # 端口号下拉框
self.port_combo.grid(row=row, column=1, sticky=tk.EW, padx=6)
row += 1
ttk.Label(frame, text="波特率:").grid(row=row, column=0, sticky=tk.W) # 波特率标签
self.baud_combo = ttk.Combobox(frame, values=[ # 波特率下拉框
'300', '600', '1200', '2400', '4800', '9600',
'14400', '19200', '38400', '57600', '115200'
])
self.baud_combo.set('9600') # 默认波特率
self.baud_combo.grid(row=row, column=1, sticky=tk.EW, padx=6)
row += 1
param_row = ttk.Frame(frame) # 数据位和校验行
param_row.grid(row=row, column=0, columnspan=2, sticky=tk.EW)
ttk.Label(param_row, text="数据位:").grid(row=0, column=0, padx=1) # 数据位标签
self.data_bits = ttk.Combobox(param_row, values=['5', '6', '7', '8'], width=3) # 数据位下拉框
self.data_bits.set('8') # 默认数据位
self.data_bits.grid(row=0, column=1, padx=4)
ttk.Label(param_row, text="校验:").grid(row=0, column=2, padx=1) # 校验标签
self.parity = ttk.Combobox(param_row, values=['无', '奇校验', '偶校验'], width=3) # 校验下拉框
self.parity.set('无') # 默认校验
self.parity.grid(row=0, column=3, sticky=tk.EW)
param_row.grid_columnconfigure(3, weight=1) # 配置列权重
row += 1
param_row = ttk.Frame(frame) # 停止位和流控行
param_row.grid(row=row, column=0, columnspan=2, sticky=tk.EW)
ttk.Label(param_row, text="停止位:").grid(row=0, column=0, padx=1) # 停止位标签
self.stop_bits = ttk.Combobox(param_row, values=['1', '1.5', '2'], width=3) # 停止位下拉框
self.stop_bits.set('1') # 默认停止位
self.stop_bits.grid(row=0, column=1, padx=4)
ttk.Label(param_row, text="流控:").grid(row=0, column=2, padx=1) # 流控标签
self.flow_control = ttk.Combobox(param_row, values=['无', 'RTS/CTS', 'XON/XOFF'], width=3) # 流控下拉框
self.flow_control.set('无') # 默认流控
self.flow_control.grid(row=0, column=3, sticky=tk.EW)
param_row.grid_columnconfigure(3, weight=1) # 配置列权重
row += 1
self.open_btn = ttk.Button(frame, text="打开端口", command=self.toggle_serial) # 打开端口按钮
self.open_btn.grid(row=row, column=0, columnspan=2, pady=5, sticky=tk.EW)
def setup_send_controls(self, parent):
frame = ttk.LabelFrame(parent, text="发送区", padding=5) # 发送输入区框架
frame.grid(row=0, column=1, sticky="nsew", padx=2)
frame.grid_propagate(False)
frame.config(height=155)
frame.grid_rowconfigure(0, weight=0) # 配置行权重
frame.grid_rowconfigure(1, weight=1)
frame.grid_columnconfigure(0, weight=1)
top_row = ttk.Frame(frame) # 顶部控制栏
top_row.grid(row=0, column=0, sticky="ew", pady=2)
ttk.Button(top_row, text="文件发送", command=self.send_file).pack(side=tk.LEFT, padx=2) # 文件发送按钮
ttk.Button(top_row, text="数据存至文件", command=self.save_data).pack(side=tk.LEFT, padx=2) # 保存数据按钮
# 修改标签为tk.Label并保存引用
self.checksum_label = tk.Label(top_row, text="末尾添加校验:")
self.checksum_label.pack(side=tk.LEFT)
self.checksum_combo = ttk.Combobox(top_row, values=['None', 'CRC-16', 'XOR'], width=8)
self.checksum_combo.set('None')
self.checksum_combo.pack(side=tk.LEFT, padx=2)
self.checksum_combo.bind("<<ComboboxSelected>>", self.on_checksum_selected)
self.on_checksum_selected(None)
text_frame = ttk.Frame(frame) # 文本输入框框架
text_frame.grid(row=1, column=0, sticky="nsew")
self.send_text = tk.Text(text_frame, wrap=tk.WORD, font=('Consolas', 10)) # 发送文本框
self.send_text.bind("<Button-3>", self.show_context_menu) # 发送框右键菜单
scroll_send = ttk.Scrollbar(text_frame, orient="vertical", command=self.send_text.yview) # 滚动条
self.send_text.configure(yscrollcommand=scroll_send.set) # 配置滚动
self.send_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # 放置文本框
scroll_send.pack(side=tk.RIGHT, fill=tk.Y) # 放置滚动条
def on_checksum_selected(self, event):
selected = self.checksum_combo.get()
# 更新组合框和标签的背景颜色
if selected != 'None':
self.checksum_combo.config(style='Yellow.TCombobox')
self.checksum_label.config(bg='yellow')
else:
self.checksum_combo.config(style='TCombobox')
self.checksum_label.config(bg=self.default_bg)
def setup_function_controls(self, parent):
frame = ttk.LabelFrame(parent, text="功能设置", padding=5) # 功能区框架
frame.grid(row=0, column=2, sticky="nsew", padx=2)
frame.grid_propagate(False)
frame.config(width=250, height=155)
frame.grid_columnconfigure(0, weight=1) # 配置列权重
top_row = ttk.Frame(frame) # 顶部控制栏
top_row.grid(row=0, column=0, sticky="ew", pady=2)
self.hex_send = tk.BooleanVar() # HEX发送复选框变量
self.hex_send_cb = tk.Checkbutton(top_row, text="Hex发送", variable=self.hex_send) # HEX发送复选框
self.hex_send_cb.pack(side=tk.LEFT)
self.hex_send.trace_add('write', lambda *args: self.update_checkbutton_bg(self.hex_send_cb, self.hex_send)) # 绑定事件
self.hex_display = tk.BooleanVar() # HEX显示复选框变量
self.hex_display_cb = tk.Checkbutton(top_row, text="Hex显示", variable=self.hex_display) # HEX显示复选框
self.hex_display_cb.pack(side=tk.LEFT, padx=5)
self.hex_display.trace_add('write', lambda *args: self.update_checkbutton_bg(self.hex_display_cb, self.hex_display)) # 绑定事件
ttk.Button(top_row, text="清空窗口", command=self.clear_display).pack(side=tk.RIGHT) # 清空窗口按钮
middle_row = ttk.Frame(frame) # 中间控制栏
middle_row.grid(row=1, column=0, sticky="ew", pady=2)
self.timestamp = tk.BooleanVar() # 时间戳复选框变量
self.timestamp_cb = tk.Checkbutton(middle_row, text="时间戳", variable=self.timestamp) # 时间戳复选框
self.timestamp_cb.pack(side=tk.LEFT)
self.timestamp.trace_add('write', lambda *args: self.update_checkbutton_bg(self.timestamp_cb, self.timestamp)) # 绑定事件
color_frame = ttk.Frame(middle_row) # 颜色选择框架
color_frame.pack(side=tk.RIGHT)
ttk.Label(color_frame, text="收:").pack(side=tk.LEFT) # 接收颜色标签
self.recv_color_lbl = tk.Label(color_frame, width=2, bg=self.recv_color, relief="solid") # 接收颜色标签
self.recv_color_lbl.bind("<Button-1>", lambda e: self.choose_color('recv')) # 绑定点击事件
self.recv_color_lbl.pack(side=tk.LEFT, padx=2)
ttk.Label(color_frame, text="发:").pack(side=tk.LEFT) # 发送颜色标签
self.send_color_lbl = tk.Label(color_frame, width=2, bg=self.send_color, relief="solid") # 发送颜色标签
self.send_color_lbl.bind("<Button-1>", lambda e: self.choose_color('send')) # 绑定点击事件
self.send_color_lbl.pack(side=tk.LEFT, padx=2)
auto_frame = ttk.Frame(frame) # 自动发送框架
auto_frame.grid(row=2, column=0, sticky="ew", pady=2)
ttk.Label(auto_frame, text="间隔(ms):").pack(side=tk.LEFT) # 间隔标签
self.interval_var = ttk.Entry(auto_frame, width=8) # 间隔输入框
self.interval_var.insert(0, "1000") # 默认间隔
self.interval_var.pack(side=tk.LEFT, padx=2)
self.auto_send = tk.BooleanVar() # 自动发送复选框变量
self.auto_send_cb = tk.Checkbutton(auto_frame, text="自动发送", variable=self.auto_send, command=self.toggle_auto_send) # 自动发送复选框
self.auto_send_cb.pack(side=tk.LEFT)
self.auto_send.trace_add('write', lambda *args: self.update_checkbutton_bg(self.auto_send_cb, self.auto_send)) # 绑定事件
button_frame = ttk.Frame(frame) # 按钮框架
button_frame.grid(row=3, column=0, sticky="ew", pady=1)
ttk.Button(button_frame, text="发送", command=self.send_data).pack(side=tk.LEFT) # 发送按钮
#ttk.Button(button_frame, text="更多", command=self.toggle_extension).pack(side=tk.RIGHT) # 更多按钮
self.extension_btn = ttk.Button(button_frame, text="更多 >>", command=self.toggle_extension)
self.extension_btn.pack(side=tk.RIGHT) # 注意这里用实例变量保存按钮
def choose_color(self, direction):
chinese_dir = "接收" if direction == "recv" else "发送" # 选择颜色
color = colorchooser.askcolor(title=f'选择{chinese_dir}颜色')[1] # 弹出颜色选择对话框
if color: # 如果有选择颜色
if direction == 'recv': # 如果是接收颜色
self.recv_color = color # 更新接收颜色
self.recv_color_lbl.config(bg=color) # 更新标签背景
else: # 如果是发送颜色
self.send_color = color # 更新发送颜色
self.send_color_lbl.config(bg=color) # 更新标签背景
def update_checkbutton_bg(self, checkbutton, var):
checkbutton.config(bg='yellow' if var.get() else self.default_bg) # 更新复选框背景颜色
def send_file(self):
if not self.serial_port or not self.serial_port.is_open: # 检查串口是否打开
messagebox.showwarning("警告", "请先打开串口")
return
file_path = filedialog.askopenfilename() # 选择文件
if not file_path: return
try:
with open(file_path, 'rb') as f: # 打开文件
data = f.read()
if self.hex_send.get(): # 如果是HEX发送模式
hex_str = data.hex() # 转换为HEX字符串
data = binascii.unhexlify(hex_str) # 转换为二进制数据
data = self.add_checksum(data) # 添加校验和
self.serial_port.write(data) # 发送数据
self.tx_counter += len(data) # 更新发送计数器
self.display_data(data, 'send') # 显示发送数据
self.update_counters() # 更新计数器
except Exception as e: # 异常处理
messagebox.showerror("发送文件错误", str(e))
def validate_hex(self, input_str):
errors = [] # 错误列表
valid_chars = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f',' '} # 有效字符
has_content = False # 是否有有效字符
illegal_positions = [] # 非法字符位置
for idx, char in enumerate(input_str): # 遍历输入字符串
lower_char = char.lower()
if lower_char in valid_chars: # 如果是有效字符
if lower_char != ' ':
has_content = True
else: # 如果是非法字符
illegal_positions.append( (idx+1, char) ) # 记录位置和字符
if illegal_positions: # 如果有非法字符
error_msg = "发现非法字符:\n"
for pos, char in illegal_positions[:3]: # 最多显示前三处错误
error_msg += f" 第{pos}个字符 '{char}'"
if len(illegal_positions) > 3: # 如果错误超过三处
error_msg += f"\n...等共{len(illegal_positions)}处非法字符"
errors.append(error_msg)
clean_str = input_str.replace(' ', '') # 去除空格
if has_content and len(clean_str) % 2 != 0: # 检查长度是否为偶数
errors.append("长度错误:有效HEX字符数必须为偶数(去除空格后)")
errors.append(f"当前有效字符数:{len(clean_str)} ({clean_str})")
return errors
def show_hex_error(self, input_str, error_list):
error_msg = "⚠ HEX格式验证失败!\n\n" # 错误提示信息
for error in error_list: # 构建错误详情
error_msg += f"• {error}\n"
error_msg += "\n✅ 正确HEX格式要求:" # 添加格式说明
error_msg += "\n - 允许字符: 0-9, A-F (不区分大小写)"
error_msg += "\n - 允许用空格分隔,如: 01 A3 FF"
error_msg += "\n - 有效字符数必须为偶数(去除空格后)"
sample = input_str.strip() # 显示输入样本
if len(sample) > 40:
display_sample = sample[:37] + "..."
else:
display_sample = sample or "<空输入>"
error_msg += f"\n\n📝 您的输入:\n{display_sample}"
messagebox.showerror("HEX发送错误", error_msg) # 显示错误提示框
def save_data(self):
content = self.text_display.get("1.0", tk.END) # 获取显示内容
file_path = filedialog.asksaveasfilename( # 选择保存路径
defaultextension=".txt",
filetypes=[("Text Files", "*.txt"), ("All Files", "*.*")]
)
if not file_path: return
try:
with open(file_path, 'w', encoding='utf-8') as f: # 写入文件
f.write(content)
messagebox.showinfo("保存成功", "数据已保存至文件") # 显示成功提示
except Exception as e: # 异常处理
messagebox.showerror("保存错误", str(e))
def add_checksum(self, data):
checksum_type = self.checksum_combo.get() # 获取校验类型
if checksum_type == 'None': # 如果没有校验
return data
elif checksum_type == 'CRC-16': # 如果是CRC-16校验
crc = self.calculate_crc16(data) # 计算CRC-16
return data + crc
elif checksum_type == 'XOR': # 如果是异或校验
xor = self.calculate_xor(data) # 计算异或
return data + xor.to_bytes(1, 'big')
return data
def calculate_crc16(self, data):
crc = 0xFFFF # CRC初始值
for byte in data: # 遍历数据
crc ^= byte
for _ in range(8): # 计算CRC
if crc & 0x0001:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, 'little') # 返回CRC值
def calculate_xor(self, data):
xor = 0 # 异或初始值
for byte in data: # 遍历数据
xor ^= byte # 计算异或
return xor # 返回异或值
def update_ports(self):
ports = [port.device for port in serial.tools.list_ports.comports()] # 获取端口列表
self.port_combo['values'] = ports # 更新端口下拉框
self.port_combo.set(ports[0] if ports else '') # 设置默认端口
def update_status(self, status, success=True):
if success: # 如果连接成功
conn_info = f"{self.port_combo.get()} | {self.baud_combo.get()}波特 | {self.data_bits.get()}数据位 | "
conn_info += f"{self.stop_bits.get()}停止位 | {self.parity.get()} | {self.flow_control.get()}"
self.status_conn.config(text=conn_info, foreground='green') # 更新连接状态
else: # 如果连接失败
self.status_conn.config(text=status, foreground='red') # 显示错误信息
def update_counters(self):
self.status_rx.config(text=f"RX:{self.rx_counter}") # 更新接收计数器
self.status_tx.config(text=f"TX:{self.tx_counter}") # 更新发送计数器
def clear_display(self):
self.text_display.config(state=tk.NORMAL) # 清空显示
self.text_display.delete(1.0, tk.END)
self.text_display.config(state=tk.DISABLED)
self.rx_counter = self.tx_counter = 0 # 重置计数器
self.update_counters()
def start_loop_send(self):
if not self.loop_send_active: # 如果循环发送未激活
return
commands = [] # 有效指令列表
for cmd in self.preset_commands.commands: # 遍历预置指令
order = cmd['widgets']['order_entry'].get() # 获取顺序
if order.isdigit() and int(order) > 0: # 如果顺序有效
commands.append({ # 添加到有效指令列表
'order': int(order),
'delay': int(cmd['widgets']['delay_entry'].get()),
'command': cmd['widgets']['command_entry'].get(),
'hex': cmd['widgets']['hex_var'].get()
})
if not commands: # 如果没有有效指令
messagebox.showwarning("提示", "没有配置有效发送指令")
self.preset_commands.loop_send_var.set(False) # 关闭循环发送
return
self._send_sequence(sorted(commands, key=lambda x: x['order'])) # 发送指令序列
def _send_sequence(self, commands, index=0):
if not self.loop_send_active or index >= len(commands): # 如果循环发送未激活或索引超出范围
return
cmd = commands[index] # 获取当前指令
try:
data = self._prepare_command_data(cmd['command'], cmd['hex']) # 准备数据
data = self.add_checksum(data) # 添加校验和
self._execute_send(data) # 执行发送
if index + 1 < len(commands): # 如果有下一条指令
next_delay = cmd['delay'] # 获取延时
self.master.after(next_delay, self._send_sequence, commands, index + 1) # 安排下一条指令
else: # 如果是最后一条指令
self.master.after(commands[-1]['delay'], self.start_loop_send) # 循环发送
except Exception as e: # 异常处理
messagebox.showerror("发送错误", f"指令发送失败:{str(e)}")
self.preset_commands.loop_send_var.set(False) # 关闭循环发送
def toggle_loop_send(self):
if self.preset_commands.loop_send_var.get(): # 如果循环发送激活
if not self.serial_port or not self.serial_port.is_open: # 检查串口是否打开
messagebox.showwarning("警告", "请先打开串口")
self.preset_commands.loop_send_var.set(False) # 关闭循环发送
return
self.loop_send_active = True # 激活循环发送
self.start_loop_send() # 启动循环发送
else: # 如果循环发送未激活
self.loop_send_active = False # 关闭循环发送
def toggle_auto_send(self):
self.auto_send_flag = self.auto_send.get() # 切换自动发送状态
if self.auto_send_flag: # 如果自动发送激活
self.auto_send_loop() # 启动自动发送循环
def auto_send_loop(self):
if self.auto_send_flag and self.serial_port.is_open: # 如果自动发送激活且串口打开
self.send_data() # 发送数据
self.master.after(max(100, int(self.interval_var.get())), self.auto_send_loop) # 安排下一次发送
def on_port_change(self, event):
if self.serial_port and self.serial_port.is_open: # 如果串口打开
self.close_serial() # 关闭串口
self.open_serial() # 打开串口
def toggle_serial(self):
if self.serial_port and self.serial_port.is_open: # 如果串口打开
self.close_serial() # 关闭串口
else: # 如果串口关闭
self.open_serial() # 打开串口
def open_serial(self):
try:
params = { # 串口参数
'port': self.port_combo.get(),
'baudrate': int(self.baud_combo.get()),
'bytesize': int(self.data_bits.get()),
'stopbits': {'1':1, '1.5':1.5, '2':2}[self.stop_bits.get()],
'parity': {'无':'N', '奇校验':'O', '偶校验':'E'}[self.parity.get()],
'xonxoff': 1 if self.flow_control.get() == 'XON/XOFF' else 0,
'rtscts': 1 if self.flow_control.get() == 'RTS/CTS' else 0
}
self.serial_port = serial.Serial(**params) # 打开串口
self.open_btn.config(text="关闭端口") # 更新按钮文本
self.update_status("", True) # 更新状态
self.receive_flag.set() # 设置接收标志
Thread(target=self.receive_data, daemon=True).start() # 启动接收线程
# 在成功打开后保存配置
self.save_serial_settings()
self.update_status("", True)
except Exception as e: # 异常处理
self.update_status(f"连接失败:{str(e)}", False) # 更新状态
def save_serial_settings(self):
"""保存当前串口配置到文件"""
config = configparser.ConfigParser()
config.read(self.config_file, encoding='utf-8')
if not config.has_section('SerialSettings'):
config.add_section('SerialSettings')
# 串口参数
config.set('SerialSettings', 'port', self.port_combo.get())
config.set('SerialSettings', 'baudrate', self.baud_combo.get())
config.set('SerialSettings', 'databits', self.data_bits.get())
config.set('SerialSettings', 'stopbits', self.stop_bits.get())
config.set('SerialSettings', 'parity', self.parity.get())
config.set('SerialSettings', 'flowcontrol', self.flow_control.get())
# 功能参数
config.set('SerialSettings', 'hex_send', str(self.hex_send.get()))
config.set('SerialSettings', 'hex_display', str(self.hex_display.get()))
config.set('SerialSettings', 'timestamp', str(self.timestamp.get()))
config.set('SerialSettings', 'checksum', self.checksum_combo.get())
config.set('SerialSettings', 'recv_color', self.recv_color)
config.set('SerialSettings', 'send_color', self.send_color)
with open(self.config_file, 'w', encoding='utf-8') as f:
config.write(f)
def load_serial_settings(self):
"""从配置文件加载历史设置"""
config = configparser.ConfigParser()
config.read(self.config_file, encoding='utf-8')
if not config.has_section('SerialSettings'):
return
# 加载串口参数
def safe_get(option, default):
return config.get('SerialSettings', option, fallback=default)
# 端口号处理(需验证是否存在)
saved_port = safe_get('port', '')
if saved_port in self.port_combo['values']:
self.port_combo.set(saved_port)
# 其他串口参数
self.baud_combo.set(safe_get('baudrate', '9600'))
self.data_bits.set(safe_get('databits', '8'))
self.stop_bits.set(safe_get('stopbits', '1'))
self.parity.set(safe_get('parity', '无'))
self.flow_control.set(safe_get('flowcontrol', '无'))
# 功能参数
self.hex_send.set(config.getboolean('SerialSettings', 'hex_send', fallback=False))
self.hex_display.set(config.getboolean('SerialSettings', 'hex_display', fallback=False))
self.timestamp.set(config.getboolean('SerialSettings', 'timestamp', fallback=True))
self.checksum_combo.set(safe_get('checksum', 'None'))
self.on_checksum_selected(None) # 触发校验类型更新
# 颜色设置
self.recv_color = safe_get('recv_color', '#000000')
self.send_color = safe_get('send_color', '#0000FF')
self.recv_color_lbl.config(bg=self.recv_color)
self.send_color_lbl.config(bg=self.send_color)
def close_serial(self):
self.receive_flag.clear() # 清除接收标志
if self.serial_port: # 如果串口存在
self.serial_port.close() # 关闭串口
self.open_btn.config(text="打开端口") # 更新按钮文本
self.status_conn.config(text="未连接", foreground='black') # 更新状态
def receive_data(self):
"""重构后的接收方法(支持智能组包)"""
while self.receive_flag.is_set():
try:
# 优先处理缓冲区超时数据
self._check_buffer_timeout()
# 读取新数据
if self.serial_port.in_waiting:
data = self.serial_port.read(self.serial_port.in_waiting)
self._process_incoming_data(data)
time.sleep(0.001) # 缩短等待时间以提高响应速度
except Exception as e:
print("接收错误:", e)
self.receive_flag.clear()
break
def _process_incoming_data(self, data):
"""处理新接收到的数据"""
# 更新缓冲区及时间戳
self.receive_buffer.extend(data)
self.last_receive_time = time.time()
# 尝试协议特征解析
while len(self.receive_buffer) >= self.min_frame_length:
# 情况1:Modbus RTU协议 (示例)
if self._check_modbus_frame():
return
# 情况2:以0x0A结尾的协议(如某些文本协议)
if b'\x0A' in self.receive_buffer:
pos = self.receive_buffer.find(b'\x0A') + 1
self._commit_frame(pos)
return
# 其他协议特征可在此扩展
break # 未识别时退出循环
def _check_buffer_timeout(self):
"""超时提交机制"""
if len(self.receive_buffer) == 0:
return
# 超时判定(同时防止半包永久驻留)
if (time.time() - self.last_receive_time > self.frame_timeout) or \
(len(self.receive_buffer) >= self.max_frame_length):
self._commit_frame(len(self.receive_buffer)) # 提交全部数据
def _check_modbus_frame(self):
"""Modbus RTU协议解析示例"""
if len(self.receive_buffer) < 4: # 最小合法帧长度为4字节
return False
# 步骤1:检查起始地址有效性(0x00-0xFF)
start_address = self.receive_buffer[0]
if start_address not in range(0x00, 0xFF + 1):
return False
# 步骤2:检查功能码有效性(0x01-0x06, 0x10等)
function_code = self.receive_buffer[1]
valid_func_codes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x10]
if function_code not in valid_func_codes:
return False
# 步骤3:根据功能码判断帧长度
expected_length = {
0x01: 6 + (self.receive_buffer[2] // 8) + 3, # 读线圈状态
0x03: 5 + 2 * self.receive_buffer[2], # 读保持寄存器
0x04: 5 + 2 * self.receive_buffer[2], # 读输入寄存器
0x05: 6, # 写单个线圈
0x06: 6, # 写单个寄存器
0x10: 7 + 2 * ((self.receive_buffer[4] * 2) - 1) # 写多个寄存器
}.get(function_code, 8) # 默认按8字节判断
# 步骤4:检查缓冲区长度是否足够
if len(self.receive_buffer) < expected_length:
return False
# 步骤5:CRC校验验证
crc_received = self.receive_buffer[-2:] # 最后两个字节为CRC
crc_calculated = self.calculate_crc16(self.receive_buffer[:-2])
if crc_received == crc_calculated:
self._commit_frame(expected_length)
return True
return False
def _commit_frame(self, length):
"""提交完整帧"""
# 提取数据并清空缓冲区
packet = bytes(self.receive_buffer[:length])
del self.receive_buffer[:length]
# 更新计数器及显示
self.rx_counter += len(packet)
self.display_data(packet, 'recv')
self.update_counters()
# 触发自动应答
self.auto_reply_handler.check_and_reply(packet)
def calculate_crc16(self, data):
"""CRC16校验计算(与工控设备一致)"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, 'little')
def send_data(self):
if not (self.serial_port and self.serial_port.is_open): # 检查串口是否打开
messagebox.showwarning("警告", "请先打开串口")
return
raw_text = self.send_text.get("1.0", tk.END).strip() # 获取输入内容
if not raw_text: # 如果输入为空
return
try:
if self.hex_send.get(): # 如果是HEX发送模式
error_list = self.validate_hex(raw_text) # 验证HEX格式
if error_list: # 如果有错误
self.show_hex_error(raw_text, error_list) # 显示错误
return
hex_str = raw_text.replace(' ', '') # 去除空格
if len(hex_str) % 2 != 0: # 检查长度是否为偶数
messagebox.showerror("格式错误", "HEX长度必须为偶数")
return
try:
data = binascii.unhexlify(hex_str) # 转换为二进制数据
except binascii.Error as e: # 异常处理
messagebox.showerror("HEX解析错误", f"无效的HEX格式: {str(e)}")
return
else: # 如果是ASCII模式
data = raw_text.encode('gbk') # 编码为UTF-8
data_with_checksum = self.add_checksum(data) # 添加校验和
self.serial_port.write(data_with_checksum) # 发送数据
self.tx_counter += len(data_with_checksum) # 更新发送计数器
self.update_counters() # 更新计数器
# 在发送成功后添加历史记录
if self.hex_send.get():
display_cmd = ' '.join(f'{b:02X}' for b in data)
else:
display_cmd = raw_text
self.history_handler.add_history(display_cmd, self.hex_send.get())
self.display_data(data_with_checksum, 'send') # 显示发送数据
except Exception as e: # 异常处理
error_msg = f"发送失败: {str(e)}"
if isinstance(e, serial.SerialException): # 如果是串口异常
error_msg += "\n请检查串口连接状态"
messagebox.showerror("发送错误", error_msg)
def send_custom_command(self, command, hex_mode):
if not self.serial_port or not self.serial_port.is_open: # 检查串口是否打开
messagebox.showwarning("警告", "请先打开串口")
return
if not command.strip(): # 如果指令为空
messagebox.showwarning("提示", "指令内容不能为空")
return
try:
if hex_mode: # 如果是HEX模式
clean_command = command.replace(' ', '') # 去除空格
error_list = self.validate_hex(clean_command) # 验证HEX格式
if error_list: # 如果有错误
self.show_hex_error(command, error_list) # 显示错误
return
try:
data = binascii.unhexlify(clean_command) # 转换为二进制数据
except binascii.Error as e: # 异常处理
messagebox.showerror("HEX解析错误",
f"无效的HEX数据: {str(e)}\n"
f"原始输入: {command[:50]}{'...' if len(command)>50 else ''}")
return
else: # 如果是ASCII模式
data = command.encode('gbk') # 编码为UTF-8
data_with_checksum = self.add_checksum(data) # 添加校验和
self.serial_port.write(data_with_checksum) # 发送数据
self.tx_counter += len(data_with_checksum) # 更新发送计数器
self.update_counters() # 更新计数器
# 在发送成功后添加历史记录
self.history_handler.add_history(command, hex_mode)
# 显示发送数据
self.display_data(data_with_checksum, 'send')
except Exception as e: # 异常处理
error_type = "硬件错误" if isinstance(e, serial.SerialException) else "程序错误"
error_msg = f"{error_type}: {str(e)}"
if isinstance(e, serial.SerialException): # 如果是串口异常
error_msg += "\n可能原因:串口断开连接或设备无响应"
messagebox.showerror("发送失败", error_msg)
def _prepare_command_data(self, command, hex_mode):
if hex_mode: # 如果是HEX模式
hex_str = command.replace(' ', '') # 去除空格
return binascii.unhexlify(hex_str) # 转换为二进制数据
return command.encode('utf-8') # 编码为UTF-8
def _execute_send(self, data):
self.serial_port.write(data) # 发送数据
self.tx_counter += len(data) # 更新发送计数器
self.display_data(data, 'send') # 显示发送数据
self.update_counters() # 更新计数器
def display_data(self, data, direction):
prefix = "收←◆ " if direction == 'recv' else "发→◇ " # 前缀
color = self.send_color if direction == 'send' else self.recv_color # 颜色
if self.hex_display.get(): # 如果是HEX显示模式
display = ' '.join(f'{b:02X}' for b in data) # 转换为HEX字符串
else: # 智能编码检测显示模式
display = self.auto_decode(data) # 修改这里
if self.timestamp.get(): # 如果显示时间戳
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] # 获取时间戳
full_text = f"[{timestamp}] {prefix}{display}" # 构建完整文本
else: # 如果不显示时间戳
full_text = f"{prefix}{display}" # 构建完整文本
self.text_display.config(state=tk.NORMAL) # 设置文本框为可编辑状态
self.text_display.insert(tk.END, full_text + '\n', (color,)) # 插入文本
self.text_display.tag_config(color, foreground=color) # 配置文本颜色
self.text_display.see(tk.END) # 滚动到末尾
self.text_display.config(state=tk.DISABLED) # 设置文本框为不可编辑状态
def auto_decode(self, data):
"""尝试用多种编码自动解码数据"""
encodings = ['utf-8', 'gb18030', 'big5', 'shift_jis', 'latin-1'] # 调整顺序,UTF-8优先
for encoding in encodings:
try:
return data.decode(encoding, errors='strict')
except UnicodeDecodeError:
continue
# 所有编码尝试失败后,使用替换策略
try:
return data.decode('utf-8', errors='replace')
except:
return str(data)
def create_context_menu(self):
"""创建公共的右键上下文菜单"""
self.context_menu = tk.Menu(self.master, tearoff=0)
self.context_menu.add_command(label="复制", command=self.on_copy)
self.context_menu.add_command(label="粘贴", command=self.on_paste)
self.context_menu.add_command(label="剪切", command=self.on_cut)
self.context_menu.add_separator()
self.context_menu.add_command(label="全选", command=self.on_select_all)
def show_context_menu(self, event):
"""显示右键菜单并根据控件类型调整状态"""
widget = event.widget
# 获取当前控件的状态
is_text_widget = isinstance(widget, tk.Text)
is_entry_widget = isinstance(widget, ttk.Entry)
is_readonly = False
if is_text_widget:
is_readonly = widget.cget('state') == 'disabled'
elif is_entry_widget:
is_readonly = widget.cget('state') == 'disabled'
# 更新菜单项状态
self.context_menu.entryconfig("剪切",
state="normal" if (not is_readonly) and (widget != self.text_display) else "disabled")
self.context_menu.entryconfig("粘贴",
state="normal" if not is_readonly else "disabled")
try:
self.context_menu.tk_popup(event.x_root, event.y_root)
finally:
self.context_menu.grab_release()
def on_copy(self):
"""处理复制操作"""
widget = self.master.focus_get()
if isinstance(widget, tk.Text):
try:
widget.clipboard_clear()
text = widget.get(tk.SEL_FIRST, tk.SEL_LAST)
widget.clipboard_append(text)
except tk.TclError:
pass
elif isinstance(widget, ttk.Entry):
widget.event_generate("<<Copy>>")
def on_paste(self):
"""处理粘贴操作"""
widget = self.master.focus_get()
if isinstance(widget, tk.Text):
widget.insert(tk.INSERT, widget.clipboard_get())
elif isinstance(widget, ttk.Entry):
widget.event_generate("<<Paste>>")
def on_cut(self):
"""处理剪切操作"""
widget = self.master.focus_get()
if isinstance(widget, tk.Text) and widget != self.text_display:
try:
widget.clipboard_clear()
text = widget.get(tk.SEL_FIRST, tk.SEL_LAST)
widget.clipboard_append(text)
widget.delete(tk.SEL_FIRST, tk.SEL_LAST)
except tk.TclError:
pass
elif isinstance(widget, ttk.Entry):
widget.event_generate("<<Cut>>")
def on_select_all(self):
"""处理全选操作"""
widget = self.master.focus_get()
if isinstance(widget, tk.Text):
widget.tag_add(tk.SEL, "1.0", tk.END)
widget.mark_set(tk.INSERT, "1.0")
widget.see(tk.INSERT)
elif isinstance(widget, ttk.Entry):
widget.select_range(0, tk.END)
if __name__ == "__main__":
root = tk.Tk() # 创建主窗口
app = SerialDebugger(root) # 创建调试器实例
root.mainloop() # 进入主循环