基于Assammdf第三方python库处理mdf、mf4文件
Published in:2023-11-28 |
Words: 3.1k | Reading time: 14min | reading:

基于Assammdf第三方python库处理mdf、mf4文件

1.ASAMMDF是ASAM(自动化和测量系统标准化协会)MDF(测量数据格式)文件的快速解析器和编辑器。
2.asammdf支持MDF版本2(.dat)、3(.mdf)和4(.mf4)。
3.asammdf适用于python>;=3.6(对于python 2.7、3.4和3.5,请参阅4.x.y版本)

第三方库主要功能

  • 1.mdf、mf4文件创建读取
  • 2.读取、提取can总线日志文件信息
  • 3.不同版本文件转换
  • 4.原始数据导入、导出,包括:HDF5、Matlab(v4、v5和v7.3)、CSV和Parquet

基础用法

  • 参考代码如下:

基础mdf文件引入、数据过滤与解析、数据转换存储、数据图形化绘制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from asammdf import MDF 
mdf=MDF('sample.mdf')
speed=mdf.get('WheelSpeed')
speed.plot()
important_signals=['WheelSpeed','VehicleSpeed','VehicleAcceleration']

# get short measurement with a subset of channels from 10s to 12s
short=mdf.filter(important_signals).cut(start=10,stop=12)

# convert to version 4.10 and save to disk
short.convert('4.10').save('important signals.mf4')

# plot some channels from a huge file
efficient=MDF('huge.mf4')
for signal in efficient.select(['Sensor1','Voltage3']):
signal.plot()

使用流程

第三方库安装

  • 1.pip工具安装
1
2
3
pip install asammdf
# GUI version install
pip install asammdf[gui]
  • 2.基础文件读取(mf4)
  1. 查找MDF文件中特定信号最大值;
  2. 计算平均车速和里程,使用matplotlib作图;
  3. 计算两信号差值,进行波峰判断并记录峰值个数,避免信号毛刺干扰,设定差值门限值,使用matplotlib作图(含主次坐标);
  4. 以上信息直接写入word保存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 处理mf4数据
# 20220530 FQF

# 引入库文件
from asammdf import MDF # 用于处理MDF文件
import os # 用于获取文件路径
import numpy as np # 用于数学处理
import matplotlib.pyplot as plt # 用于绘图

# 读取mf4
path = 'C:/fqf'
os.chdir(path) # 更改python的工作空间
files = os.listdir(path)

for f in files:
if '_decoded' in f and f.endswith('.mf4') and f.startswith('df'): # 查找文件名字含有fish且以.png后缀的文件
# print('Find the mf4: ' + f)
# print(f.title())
mf4_name = f.split()
mdf = MDF(f)
# print(mdf.info())

# 读取数据
Prompt_info = mdf.get("InterSysInfoDisp")
Prompt_value = ICA_Prompt_info.samples
Prompt_timestamps = Prompt_info .timestamps

# 搜索工况
i = 1
while i < len(Prompt_timestamps )-1:
if Prompt_value [i] != 9 & Prompt_value [i+1] == 9: # 搜索工况
print( "============================")
print(f)
print(Prompt_timestamps [i])
i = i + 1

# 绘图
# plt.plot(Prompt_timestamps ,Prompt_value )#r表示颜色,v表示下三角线类型
# plt.xlabel('xlabel',fontsize = 25)
# plt.ylabel('ylabel',fontsize = 16)
# plt.grid()
# plt.show()
  • 3.文件读取处理(mdf)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# coding=utf-8
from asammdf import MDF # 用于处理MDF文件
import os # 用于获取文件路径
import pandas as pd # 用于数据分析
import numpy as np # 用于数组处理
from numpy import trapz # 可用于积分求面积
from matplotlib import pyplot as plt # 用于作图
import datetime as dt # 用于调用系统时间
import docx # 调用word文档库
from docx.shared import Inches # 用于设置图片尺寸


def check_MDF(Document_Adress): # 定义check_MDF函数
print('文件名: '+Adress_str+',相关统计信息如下:') # 提取地址中文件名信息,打印

new_Doc = document.add_paragraph\
('文件名: '+Adress_str+',相关统计信息如下:') # 新增段落文本


MDF_File=MDF(Document_Adress) # 使用asammdf第三方包调取.mdf文件内容
TCOil= MDF_File.get("TCOil") # 调用.mdf中零件处油温Signal
TOil=MDF_File.get("TOil") # 调用.mdf中油底壳温度Signal
VehSpd=MDF_File.get("VehSpd") # 调用.mdf中车速Signal

VehSpdCheck(VehSpd) # check_MDF()函数调用VehSpdCheck()函数
Signal_Data_Max(TCOil) # check_MDF()函数调用Signal_Data_Max()函数
T_Subtract_Peak(TCOil, TOil, 20) # check_MDF()函数调用T_Subtract_Peak()函数

document.save(Veh_VIN+'.docx') # 运行完程序后保存word文档

def timeconvert(second): # 将时间秒转化str类型的时分秒
Timestr =str(int(second/3600)) + 'h ' + \
str(int((second%3600)/ 60)) + 'min ' + \
str(int((second%3600) % 60)) + 's'
return Timestr

def Signal_Data_Max(Signal): # 定义Signal_Data_Max()函数
x=Signal.timestamps # 提取Signal中所有时刻值存放列表x()中
y=Signal.samples # 提取Signal中所有实测值存放列表y()中
T_C_Max=np.max(y) # 使用Numpy库中max()函数求y中数据的最大值
Positon=np.where(y==np.max(y)) # 找到y最大值对应的索引值(可理解为序号)
Time=(x[Positon[0][0]]) # 提取索引值对应时刻

print('零件处油温最大值%d' %T_C_Max+\
'时间是'+timeconvert(Time)) # 打印零件处油温最大值及对应的时间(秒转化为分钟)

new_Doc = document.add_paragraph\
('零件处油温最大值为 '+str(T_C_Max)+' ℃,'\
'时间是'+timeconvert(Time),style='List Bullet') # 新增段落文本

def VehSpdCheck(Singal): # 定义Signal_Data_Max()函数
x=Singal.timestamps # 提取Signal中所有时刻值存放列表x()中
#print(Singal.display_name)
y=Singal.samples # 提取Signal中所有实测值存放列表y()中
Average_Vehspd=np.sum(y)/len(y) # 使用Numpy库中sum()函数求和后除以点的个数
Drive_Distance=Average_Vehspd*x[-1]/3600 # 行驶里程通过均值与时间乘积获得
print('该文件平均车速 %.2f km/h,' %Average_Vehspd+\
'行驶里程 %.2f km' %Drive_Distance) # 打印平均车速和行驶里程(使用换行\)

Average_Vehspd_str=str(round(Average_Vehspd, 2)) # 平均速度保留两位,转化为字符串
Drive_Distance_str =str(round( Drive_Distance, 2)) # 行驶里程保留两位,转化为字符串
new_Doc = document.add_paragraph\
('该文件平均车速 '+ Average_Vehspd_str+' km/h,'+\
'行驶里程= '+Drive_Distance_str+' km'+',见下图:',style='List Bullet')
# 新增段落文本

plt.figure(Adress_str+'--'+Singal.display_name) # 图表命名为文件名加信号名称
plt.axis([np.min(x),np.max(x),0,np.max(y)]) # 使用pyplot函数定义x轴和y轴最大及最小值
plt.plot(x,y,label='$VehSpd$') # 使用pyplot作图,图例为VehSpd
plt.xlabel('t (s) ') # 增加X轴-坐标轴标题
plt.ylabel('VehicleSpeed (km/h) ') # 增加y轴-坐标轴标题

plt.fill_between(x,y1=y,y2=0,facecolor='purple',\
alpha=0.2) # 对y与x围成面积着色,展示速度求积分→里程


plt.text(x[-1]/3,np.max(y)-3, 'DriveDistance:%.2f km' \
%Drive_Distance, \
fontdict={'size': 10, 'color': 'red'}) # x轴坐标,y轴坐标,显示内容,字体大小、颜色

plt.title(Adress_str) # 将全局变量信息增加入表头
#plt.title(Adress_str+'__'+'(DriveDistance: %.2f '%Drive_Distance+' km)')
plt.legend(loc='upper right') # 图例选择最佳位置,可选择best
#plt.show() # 显示图例
picture_name=Adress_str +'-'+ \
Singal.display_name+'.png' # 图片保存时名称
plt.savefig(picture_name, dpi=200) # 保存图片

new_Doc = document.add_picture(picture_name, width=Inches(5.0))
# 图片写入word
os.remove(picture_name) # 删除图片文件

def T_Subtract_Peak(T1,T2,threshold):
# T1.plot() # 使用ASAMMDF自带Plot制图
# T2.plot()
x = T1.timestamps # 提取信号T1中所有时刻值的点存放列表x()中
y1 = T1.samples # 提取信号T1中所有信号值的点存放列表y()中
y2 = T2.samples # 提取信号T2中所有信号值的点存放列表y()中
k = len(x) # 统计数据的总个数

a = 0 # 定义数字a
i = 0 # 定义数字i
m = 0 # 定义数字m

T_Up = [] # 定义T_Up为列表
for a in range(0, k):
T_Up.append(y1[a] - y2[a]) # 将所有温差点写入T_Up
y3 = T_Up

while i < k - 1: # 从0到k,使用while循环,条件满足一直执行
if int(T1.samples[i] - T2.samples[i]) >= \
threshold: # T1和T2在某时刻温差大于门限值,执行一次
j = i # 过滤信号毛刺,寻找大于门限值波峰个数
while j < k - 1:
j += 1
if int(T1.samples[j] - \
T2.samples[j]) < threshold - 3: # 设置差值为2作为过滤信号
m = m + 1 # 记录波峰个数
time = int(j / k * T1.timestamps[-1] / 60)
print('该文件零件处油温与油底壳温差大于' + str(threshold) + '的波峰第 %d 处,' % m + \
'时间是 %d s,' % time + '零件处油温 %d ℃' % T1.samples[j])

new_Doc = document.add_paragraph \
('该文件零件处油温与油底壳温差大于'\
+ str(threshold) + '℃的波峰第' +\
str(m) + '处,时间是' + timeconvert(time))
# 新增段落文本

i = j # 局部变量i更新为j

break # 循环退出
i = i + 1 # 以上程序不执行或者退出,执行i自加

plt.figure(Adress_str + '—' + T1.display_name +
'-' + T1.display_name) # 图表命名为文件名加两信号名称

fig = plt.figure(num=1, figsize=(15, 8), dpi=80) # 开启一个窗口,同时设置大小,分辨率
fig, ax1 = plt.subplots() # 作1个图
ax2 = ax1.twinx() # 坐标轴2为次要坐标
ax1.plot(x, y1, '-', color='g', label='TCOil') # 绘制曲线y1,含线型、颜色、图例名称
ax1.plot(x, y2, '-', color='b', label='TOil') # 绘制曲线y2
ax2.plot(x, y3, '-', color='r', label='T_Up') # 绘制曲线y3
ax1.legend(loc='upper left') # 显示图例位置,plt.legend(),左上
ax2.legend(loc='upper right') # 显示图例位置,右上

# ax1.set_xlim(-5,5)
ax1.set_ylim(np.min(y2) - 5, np.max(y1) + 5) # 设定y1轴最大最小值
ax2.set_ylim(np.min(y3) - 5, np.max(y3) + 5) # 同上

ax1.set_title(Adress_str) # 图表表头

ax1.set_xlabel('time(s)') # 设定x轴标题
ax1.set_ylabel("TC_Oil & T_Oil (℃)", color='b') # 设定y1轴标题
ax2.set_ylabel("T_Up(℃)", color='r') # 设定y2轴标题
# plt.show() # 参考之前注释

picture_name = Adress_str + '-' + T1.display_name + '-' + T2.display_name +'.png'
plt.savefig(picture_name, dpi=100) # 保存图片文件到文件夹

new_Doc = document.add_picture(picture_name, width=Inches(6.0))
# 保存图片到word
os.remove(picture_name) # 删除图片文件

file_path = 'D:\SoftApp\Python\HardWay2StudyPython\MDFData' # 文件所在文件夹
now_time=dt.datetime.now().strftime('%F %T') # 调用系统时间
Veh_VIN=input("请在下方输入故障排查车辆VIN数字编号后,回车:") # 提示输入车辆信息
print(Veh_VIN+"车辆"+'数据分析时间:'+now_time) # 打印输出车辆和系统时间

document = docx.Document() # 创建word文档对象
document.add_heading(Veh_VIN+'车排查信息', 0) # 添加文档标题
new_Doc= document.add_paragraph \
('主要排查行驶里程、最高油温、\
零件处与变速箱油温差值出现的峰值数。 ') # 新增段落文本,字符\用于换行
new_Doc=document.add_paragraph\
(Veh_VIN+"车辆"+'数据分析时间:'+ now_time) # 新增段落文本


for root, dirs, files in os.walk(file_path): # os.walk返回三个对象: dirpath(目录路径,string类型) ;
# dirname(多个子目录名,列表); filename(多个文件名,列表)
Documents=[os.path.join(root, name) for name in files] # 遍历文件名存放list(列表)类型的Documets

for Document_Path in Documents: # 遍历文件名对应的地址(假设有3个文件地址)
# print(Document_path.info()) # 打印MDF文件信号,例如可以看到有哪些信号
print('-'*60) # 打印分割行

new_Doc = document.add_paragraph('-'*100) # 新增段落文本,字符\用于换行
new_Doc = document.add_paragraph('-' * 100) # 新增段落文本,字符\用于换行

Adress=Document_Path.split('\\')[-1] # 提取路径中的含后缀的文件名
Adress_str=Adress.split('.')[0] # 提取路径中的文件名

check_MDF(Document_Path) # 调用def check_MDF()函数

常用API

  • 1.基础文件读取
1
2
3
4
5
6
7
from asammdf import MDF

f = r"xxx.mdf"
mdf = MDF(f)
signal = mdf.get('信号名')
data = signal.samples
timestamps = signal.timestamps
  • 2.信号数据获取与数据转换
1
2
3
chn_db = mdf.channels_db
#代码接上
df = mdf.to_dataframe()
  • 3.channel和channelgroup获取

mdf文件一般是用channel和channel group组织的,一个文件可能包含多个chnannel group,一个channel group也可以包含多个channel,channel和signal一一对应,channel保存了一些描述信息,数据和时间戳保存在signal里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for group_index, (virtual_group_index, virtual_group) in enumerate(
mdf_ins.virtual_groups.items()
):
if virtual_group.cycles_nr == 0 and empty_channels == "skip":
continue

channels = [
(None, gp_index, ch_index)
for gp_index, channel_indexes in mdf_ins.included_channels(
virtual_group_index
)[virtual_group_index].items()
for ch_index in channel_indexes
if ch_index != mdf_ins.masters_db.get(gp_index, None)
]
  • 4.基础文件创建(mf4、mdf)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# -*- coding: utf-8 -*-
"""
*asammdf* MDF usage example
"""
import numpy as np

from asammdf import MDF, Signal

# create 3 Signal objects

timestamps = np.array([0.1, 0.2, 0.3, 0.4, 0.5], dtype=np.float32)

# unit8
s_uint8 = Signal(
samples=np.array([0, 1, 2, 3, 4], dtype=np.uint8),
timestamps=timestamps,
name="Uint8_Signal",
unit="u1",
)
# int32
s_int32 = Signal(
samples=np.array([-20, -10, 0, 10, 20], dtype=np.int32),
timestamps=timestamps,
name="Int32_Signal",
unit="i4",
)

# float64
s_float64 = Signal(
samples=np.array([-20, -10, 0, 10, 20], dtype=np.float64),
timestamps=timestamps,
name="Float64_Signal",
unit="f8",
)

# create empty MDf version 4.00 file
with MDF(version="4.10") as mdf4:

# append the 3 signals to the new file
signals = [s_uint8, s_int32, s_float64]
mdf4.append(signals, comment="Created by Python")

# save new file
mdf4.save("my_new_file.mf4", overwrite=True)

# convert new file to mdf version 3.10
mdf3 = mdf4.convert(version="3.10")
print(mdf3.version)

# get the float signal
sig = mdf3.get("Float64_Signal")
print(sig)

# cut measurement from 0.3s to end of measurement
mdf4_cut = mdf4.cut(start=0.3)
mdf4_cut.get("Float64_Signal").plot()

# cut measurement from start of measurement to 0.4s
mdf4_cut = mdf4.cut(stop=0.45)
mdf4_cut.get("Float64_Signal").plot()

# filter some signals from the file
mdf4 = mdf4.filter(["Int32_Signal", "Uint8_Signal"])

# save using zipped transpose deflate blocks
mdf4.save("out.mf4", compression=2, overwrite=True)

参考

Prev:
基于CLion IDE的CPP项目构建与运行
Next:
使用python 读写 ini config 文件