T1K to FCD 文件转换:详细代码解析和优化
该函数的作用是将一个 .t1k 格式的文件转换成 .fcd 格式的文件。函数主要分为以下步骤:
- 读取 .t1k 文件的版本号,如果版本号大于 3,则跳过 69 个块。
- 读取 .t1k 文件的头部信息,包括描述信息和轨道 ID 等。
- 将读取的信息写入 .fcd 文件中。
- 读取 .t1k 文件中的数据块,并将数据块中的信息写入 .fcd 文件中。
- 将转换后的 .fcd 文件保存到指定的路径中。
具体的实现细节如下:
- 打开 .t1k 文件和要保存的 .fcd 文件,并读取 .t1k 文件的版本号。
- 如果版本号大于 3,则跳过 69 个块。
- 读取 .t1k 文件的头部信息,包括描述信息和轨道 ID 等,并将读取的信息写入 .fcd 文件中。
- 读取 .t1k 文件中的数据块,并将数据块中的信息写入 .fcd 文件中。主要包括以下几种类型的数据块:
- 轨道信息块:包括轨道编号、区域、分区等。
- 位置信息块:包括位置、速度等。
- 缺陷信息块:包括缺陷类型、位置、深度等。
- 开关信息块:包括开关编号、位置、状态等。
- 键盘信息块:包括键盘编号、状态等。
- 将转换后的 .fcd 文件保存到指定的路径中。
需要注意的是,函数中使用了一些辅助函数,如 read_block、skip_block、get_string 和 parse_point 等,这些函数的具体实现并没有给出,需要根据实际情况进行补充。
def fcd_convert(tmp_fcd_path, save_fcd_path, origin_t1k_name):
try:
with open(tmp_fcd_path, 'rb') as t1k:
with open(save_fcd_path, 'wb') as fcd:
ver = read_block(t1k)[6]
if ver > 3:
[skip_block(t1k) for _ in range(69)]
block_len = t1k.read(1)[0]
header = t1k.read(6)
desc = [get_string(t1k.read(20), ver) for _ in range(6)]
desc += [get_string(t1k.read(15), ver) for _ in range(3)]
t1k.seek(79, 1)
if ver > 3:
for i in range(4):
block = read_block(t1k)
desc[i] = get_string(block[6: -(len(block) % 2)], ver)
railway_name = desc[0] if len(desc[0]) <= 20 else desc[0][:20]
region = desc[1] if len(desc[1]) <= 20 else desc[1][:20]
subdivision = desc[2] if len(desc[2]) <= 20 else desc[2][:20]
track_id = desc[4]
post_direction = desc[6].split(' ')[0]
if 'ascending' in post_direction.lower():
post_direction = 0
else:
post_direction = 1
movement = desc[5]
if 'forward' in movement.lower():
movement = 1
stock_type = 0
else:
movement = 0
stock_type = 1
# try:
fcd.write(struct.pack('i', 50))
fcd.write(struct.pack('20s', '1.0'.encode('gb2312').ljust(20)))
fcd.write(struct.pack('5s', b' '.ljust(5)))
fcd.write(region.replace('—', ' ').encode('gb2312').ljust(30)[:30])
fcd.write(struct.pack('i', post_direction))
fcd.write(struct.pack('i', movement))
fcd.write(struct.pack('i', 0))
fcd.write(struct.pack('20s', b' '.ljust(20)))
fcd.write(struct.pack('i', 0))
fcd.write(struct.pack('d', 0))
fcd.write(struct.pack('d', 0))
fcd.write(subdivision.replace('—', ' ').encode('gb2312').ljust(60)[:60])
fcd.write(struct.pack('16s', b' '.ljust(16)))
fcd.write(struct.pack('i', 0))
fcd.write(struct.pack('i', 0))
fcd.write(struct.pack('i', 0))
fcd.write(struct.pack('i', 0))
fcd.write(struct.pack('i', 1))
fcd.write(struct.pack('i', 1))
fcd.write(struct.pack('20s', b' '.ljust(20)))
fcd.write(railway_name.replace('—', ' ').encode('gb2312').ljust(20)[:20])
fcd.write(track_id.replace('—', ' ').encode('gb2312').ljust(20)[:20])
fcd.write(struct.pack('i', stock_type))
fcd.write(struct.pack('f', 1.6))
fcd.write(struct.pack('i', 4 + 4 + 351))
fcd.write(struct.pack('i', 10))
filename = origin_t1k_name
fcd.write(struct.pack('255s', filename.replace('—', ' ').encode('gb2312').ljust(255))[:255])
fcd.write(struct.pack('20s', b' '.ljust(20)))
fcd.write(struct.pack('20s', b' '.ljust(20)))
fcd.write(struct.pack('I', 0))
fcd.write(struct.pack('I', 0))
fcd.write(struct.pack('f', 0))
fcd.write(struct.pack('f', 0))
fcd.write(struct.pack('f', 0))
fcd.write(struct.pack('f', 0))
for i in range(4):
fcd.write(struct.pack('d', 0))
# except Exception:
# pos = fcd.tell()
# length = 638 - pos
# fcd.write(struct.pack(f'{length}s', b''.ljust(length, b'*')))
post_list = []
bus_pos_list = []
flag_pos = True
min_pos = 0
max_pos = 0
g_bus_pos = 0
change_param = 1
# points = []
while True:
cnt = t1k.read(1)
if len(cnt) == 1:
block_len = cnt[0]
header = t1k.read(5)
if len(header) != 5:
break
if header[: 4] == b'\x00\x00\t\x22' and (header[4] == 33 or header[4] == 34):
ret = t1k.read(1)
if ret == b'\x11':
post = struct.unpack('f', t1k.read(4))[0]
if ver > 3:
post = post * 1.609344
bus_pos = struct.unpack('i', t1k.read(4))[0]
speed = struct.unpack('f', t1k.read(4))[0]
if len(post_list) < 2:
post_list.append(post)
if len(post_list) == 2:
post_list[1] = post
if flag_pos:
min_pos = post
max_pos = post
flag_pos = False
else:
if post >= max_pos:
max_pos = post
if post <= min_pos:
min_pos = post
fcd.write(struct.pack('i', 4 + 4 + 12))
fcd.write(struct.pack('i', 30))
fcd.write(struct.pack('I', bus_pos))
fcd.write(struct.pack('f', post))
fcd.write(struct.pack('f', speed))
elif ret == b'\x18':
t1k.read(26)
else:
pass
elif header[:5] == b'\x00\x00\x07\x22\x23' or header[:5] == b'\x00\x00\x07\x21\x23':
flag = t1k.read(1)[0]
top_rail = flag & 0x040 != 0
code = flag & 0x3f
bus_pos = struct.unpack('i', t1k.read(4))[0]
max_depth = t1k.read(1)[0]
width = struct.unpack('h', t1k.read(2))[0]
min_depth = t1k.read(1)[0]
severity = t1k.read(1)[0]
tag_id = struct.unpack('i', t1k.read(4))
fcd.write(struct.pack('i', 4 + 4 + 28))
fcd.write(struct.pack('i', 110))
fcd.write(struct.pack('I', bus_pos))
fcd.write(struct.pack('i', top_rail))
fcd.write(struct.pack('i', code))
fcd.write(struct.pack('i', min_depth))
fcd.write(struct.pack('i', max_depth))
fcd.write(struct.pack('i', width))
if severity == 125:
severity = 2
elif severity == 126:
severity = 1
elif severity == 128:
severity = 3
else:
severity = 1
fcd.write(struct.pack('i', severity))
elif header[1:4] == b'\x00\x05\x23' and block_len >= 11 and (
header[4] == 36 or header[4] == 100):
top_rail = (header[4] & 0x40 == 0)
start = struct.unpack('i', t1k.read(4))[0]
width = t1k.read(1)[0]
for g_bus_pos in range(start, start + width, 1):
record_count = t1k.read(1)[0]
for j in range(record_count):
depth = t1k.read(1)[0]
channels = struct.unpack('h', t1k.read(2))[0]
point = [g_bus_pos, top_rail, depth, channels]
points = parse_point(point)
# points.append(point)
for point in points:
g_bus_pos, top_rail, depth, channels = point
if len(bus_pos_list) < 2:
bus_pos_list.append(g_bus_pos)
if len(bus_pos_list) == 2:
bus_pos_list[1] = g_bus_pos
fcd.write(struct.pack('i', 4 + 4 + 16))
fcd.write(struct.pack('i', 20))
fcd.write(struct.pack('I', g_bus_pos))
fcd.write(struct.pack('i', top_rail))
fcd.write(struct.pack('i', depth))
type = channels_map.get(channels, 1)
fcd.write(struct.pack('i', type))
elif header[1:] == b'\x00\x36\x22\x21':
if t1k.read(1) == b'\x08':
number = t1k.read(1)[0]
switch = t1k.read(4)[3]
start = int(struct.unpack('h', t1k.read(2))[0] / 10) * 1000
ret = t1k.read(2)
width = int(struct.unpack('h', ret)[0] / 10) * 1000 - start
threshold = int(round(t1k.read(1)[0] * 100 / 255, 2) * 1000)
inhibition = int(round(t1k.read(1)[0] * 100 / 255, 2) * 1000)
gain = int(round(struct.unpack('h', t1k.read(2))[0] * 16 / 31 - 16, 2) * 1000)
pos = g_bus_pos
if change_param < 45:
change_param += 1
fcd.write(struct.pack('i', 4 + 4 + 32))
fcd.write(struct.pack('i', 70))
fcd.write(struct.pack('I', pos))
fcd.write(struct.pack('I', number))
fcd.write(struct.pack('i', switch))
fcd.write(struct.pack('I', start))
fcd.write(struct.pack('I', width))
fcd.write(struct.pack('I', threshold))
fcd.write(struct.pack('I', inhibition))
fcd.write(struct.pack('I', gain))
else:
change_param += 1
fcd.write(struct.pack('i', 4 + 4 + 32))
fcd.write(struct.pack('i', 80))
fcd.write(struct.pack('I', pos))
fcd.write(struct.pack('I', number))
fcd.write(struct.pack('i', switch))
fcd.write(struct.pack('I', start))
fcd.write(struct.pack('I', width))
fcd.write(struct.pack('I', threshold))
fcd.write(struct.pack('I', inhibition))
fcd.write(struct.pack('I', gain))
else:
pass
elif header == b'\x00\x00\x20\x22\x27':
number_type = t1k.read(1)[0]
# number_type = keyboard_map.get(number_type, 25)
fcd.write(struct.pack('i', 4 + 4 + 16))
fcd.write(struct.pack('i', 40))
fcd.write(struct.pack('I', g_bus_pos))
fcd.write(struct.pack('i', number_type))
fcd.write(struct.pack('i', 1))
fcd.write(struct.pack('i', 0))
elif header == b'\x00\x00\x2d\x22\x21':
t1k.read(6)
else:
t1k.seek(-5, 1)
else:
break
if post_list[0] < post_list[1]:
fcd.seek(67)
fcd.write(struct.pack('i', 1))
fcd.seek(187)
kilometer = str(post_list[0]).split('.')[0]
meter = str(post_list[0]).split('.')[1][:3]
fcd.write(struct.pack('i', int(kilometer)))
fcd.write(struct.pack('i', int(meter)))
kilometer = str(post_list[1]).split('.')[0]
meter = str(post_list[1]).split('.')[1][:3]
fcd.write(struct.pack('i', int(kilometer)))
fcd.write(struct.pack('i', int(meter)))
fcd.seek(582)
fcd.write(struct.pack('i', bus_pos_list[0]))
fcd.write(struct.pack('i', bus_pos_list[1]))
fcd.write(struct.pack('f', post_list[0]))
fcd.write(struct.pack('f', post_list[1]))
fcd.write(struct.pack('f', min_pos))
fcd.write(struct.pack('f', max_pos))
# logger.info(f'3--fcd文件生成: {tmp_fcd_path}')
print('222', tmp_fcd_path)
except:
print('111 ', tmp_fcd_path)
# logger.error(f'\n 3--fcd文件生成失败: {tmp_fcd_path}' f'\n' + traceback.format_exc() + f'\n')
原文地址: https://www.cveoy.top/t/topic/mhqU 著作权归作者所有。请勿转载和采集!