该函数的作用是将一个 .t1k 格式的文件转换成 .fcd 格式的文件。函数主要分为以下步骤:

  1. 读取 .t1k 文件的版本号,如果版本号大于 3,则跳过 69 个块。
  2. 读取 .t1k 文件的头部信息,包括描述信息和轨道 ID 等。
  3. 将读取的信息写入 .fcd 文件中。
  4. 读取 .t1k 文件中的数据块,并将数据块中的信息写入 .fcd 文件中。
  5. 将转换后的 .fcd 文件保存到指定的路径中。

具体的实现细节如下:

  1. 打开 .t1k 文件和要保存的 .fcd 文件,并读取 .t1k 文件的版本号。
  2. 如果版本号大于 3,则跳过 69 个块。
  3. 读取 .t1k 文件的头部信息,包括描述信息和轨道 ID 等,并将读取的信息写入 .fcd 文件中。
  4. 读取 .t1k 文件中的数据块,并将数据块中的信息写入 .fcd 文件中。主要包括以下几种类型的数据块:
  • 轨道信息块:包括轨道编号、区域、分区等。
  • 位置信息块:包括位置、速度等。
  • 缺陷信息块:包括缺陷类型、位置、深度等。
  • 开关信息块:包括开关编号、位置、状态等。
  • 键盘信息块:包括键盘编号、状态等。
  1. 将转换后的 .fcd 文件保存到指定的路径中。

需要注意的是,函数中使用了一些辅助函数,如 read_block、skip_block、get_string 和 parse_point 等,这些函数的具体实现并没有给出,需要根据实际情况进行补充。

def fcd_convert(tmp_fcd_path, save_fcd_path, origin_t1k_name):
    try:
        with open(tmp_fcd_path, 'rb') as t1k:
            with open(save_fcd_path, 'wb') as fcd:
                ver = read_block(t1k)[6]
                if ver > 3:
                    [skip_block(t1k) for _ in range(69)]
                block_len = t1k.read(1)[0]
                header = t1k.read(6)
                desc = [get_string(t1k.read(20), ver) for _ in range(6)]
                desc += [get_string(t1k.read(15), ver) for _ in range(3)]
                t1k.seek(79, 1)
                if ver > 3:
                    for i in range(4):
                        block = read_block(t1k)
                        desc[i] = get_string(block[6: -(len(block) % 2)], ver)
                railway_name = desc[0] if len(desc[0]) <= 20 else desc[0][:20]
                region = desc[1] if len(desc[1]) <= 20 else desc[1][:20]
                subdivision = desc[2] if len(desc[2]) <= 20 else desc[2][:20]
                track_id = desc[4]

                post_direction = desc[6].split(' ')[0]
                if 'ascending' in post_direction.lower():
                    post_direction = 0
                else:
                    post_direction = 1

                movement = desc[5]
                if 'forward' in movement.lower():
                    movement = 1
                    stock_type = 0
                else:
                    movement = 0
                    stock_type = 1

                # try:
                fcd.write(struct.pack('i', 50))

                fcd.write(struct.pack('20s', '1.0'.encode('gb2312').ljust(20)))

                fcd.write(struct.pack('5s', b' '.ljust(5)))

                fcd.write(region.replace('—', ' ').encode('gb2312').ljust(30)[:30])

                fcd.write(struct.pack('i', post_direction))

                fcd.write(struct.pack('i', movement))

                fcd.write(struct.pack('i', 0))

                fcd.write(struct.pack('20s', b' '.ljust(20)))

                fcd.write(struct.pack('i', 0))

                fcd.write(struct.pack('d', 0))

                fcd.write(struct.pack('d', 0))

                fcd.write(subdivision.replace('—', ' ').encode('gb2312').ljust(60)[:60])

                fcd.write(struct.pack('16s', b' '.ljust(16)))

                fcd.write(struct.pack('i', 0))

                fcd.write(struct.pack('i', 0))

                fcd.write(struct.pack('i', 0))

                fcd.write(struct.pack('i', 0))

                fcd.write(struct.pack('i', 1))

                fcd.write(struct.pack('i', 1))

                fcd.write(struct.pack('20s', b' '.ljust(20)))

                fcd.write(railway_name.replace('—', ' ').encode('gb2312').ljust(20)[:20])

                fcd.write(track_id.replace('—', ' ').encode('gb2312').ljust(20)[:20])

                fcd.write(struct.pack('i', stock_type))

                fcd.write(struct.pack('f', 1.6))

                fcd.write(struct.pack('i', 4 + 4 + 351))
                fcd.write(struct.pack('i', 10))
                filename = origin_t1k_name

                fcd.write(struct.pack('255s', filename.replace('—', ' ').encode('gb2312').ljust(255))[:255])

                fcd.write(struct.pack('20s', b' '.ljust(20)))

                fcd.write(struct.pack('20s', b' '.ljust(20)))

                fcd.write(struct.pack('I', 0))

                fcd.write(struct.pack('I', 0))

                fcd.write(struct.pack('f', 0))

                fcd.write(struct.pack('f', 0))

                fcd.write(struct.pack('f', 0))

                fcd.write(struct.pack('f', 0))

                for i in range(4):
                    fcd.write(struct.pack('d', 0))

                # except Exception:
                #     pos = fcd.tell()
                #     length = 638 - pos
                #     fcd.write(struct.pack(f'{length}s', b''.ljust(length, b'*')))
                post_list = []
                bus_pos_list = []
                flag_pos = True
                min_pos = 0
                max_pos = 0
                g_bus_pos = 0

                change_param = 1

                # points = []

                while True:
                    cnt = t1k.read(1)
                    if len(cnt) == 1:
                        block_len = cnt[0]
                        header = t1k.read(5)
                        if len(header) != 5:
                            break
                        if header[: 4] == b'\x00\x00\t\x22' and (header[4] == 33 or header[4] == 34):
                            ret = t1k.read(1)
                            if ret == b'\x11':
                                post = struct.unpack('f', t1k.read(4))[0]
                                if ver > 3:
                                    post = post * 1.609344
                                bus_pos = struct.unpack('i', t1k.read(4))[0]
                                speed = struct.unpack('f', t1k.read(4))[0]
                                if len(post_list) < 2:
                                    post_list.append(post)
                                if len(post_list) == 2:
                                    post_list[1] = post
                                if flag_pos:
                                    min_pos = post
                                    max_pos = post
                                    flag_pos = False
                                else:
                                    if post >= max_pos:
                                        max_pos = post
                                    if post <= min_pos:
                                        min_pos = post

                                fcd.write(struct.pack('i', 4 + 4 + 12))
                                fcd.write(struct.pack('i', 30))
                                fcd.write(struct.pack('I', bus_pos))
                                fcd.write(struct.pack('f', post))
                                fcd.write(struct.pack('f', speed))

                            elif ret == b'\x18':
                                t1k.read(26)
                            else:
                                pass

                        elif header[:5] == b'\x00\x00\x07\x22\x23' or header[:5] == b'\x00\x00\x07\x21\x23':
                            flag = t1k.read(1)[0]
                            top_rail = flag & 0x040 != 0
                            code = flag & 0x3f
                            bus_pos = struct.unpack('i', t1k.read(4))[0]
                            max_depth = t1k.read(1)[0]
                            width = struct.unpack('h', t1k.read(2))[0]
                            min_depth = t1k.read(1)[0]
                            severity = t1k.read(1)[0]
                            tag_id = struct.unpack('i', t1k.read(4))

                            fcd.write(struct.pack('i', 4 + 4 + 28))
                            fcd.write(struct.pack('i', 110))
                            fcd.write(struct.pack('I', bus_pos))
                            fcd.write(struct.pack('i', top_rail))
                            fcd.write(struct.pack('i', code))
                            fcd.write(struct.pack('i', min_depth))
                            fcd.write(struct.pack('i', max_depth))
                            fcd.write(struct.pack('i', width))
                            if severity == 125:
                                severity = 2
                            elif severity == 126:
                                severity = 1
                            elif severity == 128:
                                severity = 3
                            else:
                                severity = 1

                            fcd.write(struct.pack('i', severity))

                        elif header[1:4] == b'\x00\x05\x23' and block_len >= 11 and (
                                header[4] == 36 or header[4] == 100):
                            top_rail = (header[4] & 0x40 == 0)
                            start = struct.unpack('i', t1k.read(4))[0]
                            width = t1k.read(1)[0]
                            for g_bus_pos in range(start, start + width, 1):
                                record_count = t1k.read(1)[0]
                                for j in range(record_count):
                                    depth = t1k.read(1)[0]
                                    channels = struct.unpack('h', t1k.read(2))[0]
                                    point = [g_bus_pos, top_rail, depth, channels]

                                    points = parse_point(point)
                                    # points.append(point)
                                    for point in points:
                                        g_bus_pos, top_rail, depth, channels = point
                                        if len(bus_pos_list) < 2:
                                            bus_pos_list.append(g_bus_pos)
                                        if len(bus_pos_list) == 2:
                                            bus_pos_list[1] = g_bus_pos

                                        fcd.write(struct.pack('i', 4 + 4 + 16))
                                        fcd.write(struct.pack('i', 20))
                                        fcd.write(struct.pack('I', g_bus_pos))
                                        fcd.write(struct.pack('i', top_rail))
                                        fcd.write(struct.pack('i', depth))

                                        type = channels_map.get(channels, 1)
                                        fcd.write(struct.pack('i', type))


                        elif header[1:] == b'\x00\x36\x22\x21':
                            if t1k.read(1) == b'\x08':

                                number = t1k.read(1)[0]

                                switch = t1k.read(4)[3]

                                start = int(struct.unpack('h', t1k.read(2))[0] / 10) * 1000

                                ret = t1k.read(2)
                                width = int(struct.unpack('h', ret)[0] / 10) * 1000 - start

                                threshold = int(round(t1k.read(1)[0] * 100 / 255, 2) * 1000)

                                inhibition = int(round(t1k.read(1)[0] * 100 / 255, 2) * 1000)

                                gain = int(round(struct.unpack('h', t1k.read(2))[0] * 16 / 31 - 16, 2) * 1000)

                                pos = g_bus_pos

                                if change_param < 45:
                                    change_param += 1
                                    fcd.write(struct.pack('i', 4 + 4 + 32))
                                    fcd.write(struct.pack('i', 70))
                                    fcd.write(struct.pack('I', pos))
                                    fcd.write(struct.pack('I', number))
                                    fcd.write(struct.pack('i', switch))
                                    fcd.write(struct.pack('I', start))
                                    fcd.write(struct.pack('I', width))
                                    fcd.write(struct.pack('I', threshold))
                                    fcd.write(struct.pack('I', inhibition))
                                    fcd.write(struct.pack('I', gain))
                                else:
                                    change_param += 1
                                    fcd.write(struct.pack('i', 4 + 4 + 32))
                                    fcd.write(struct.pack('i', 80))
                                    fcd.write(struct.pack('I', pos))
                                    fcd.write(struct.pack('I', number))
                                    fcd.write(struct.pack('i', switch))
                                    fcd.write(struct.pack('I', start))
                                    fcd.write(struct.pack('I', width))
                                    fcd.write(struct.pack('I', threshold))
                                    fcd.write(struct.pack('I', inhibition))
                                    fcd.write(struct.pack('I', gain))

                            else:
                                pass

                        elif header == b'\x00\x00\x20\x22\x27':
                            number_type = t1k.read(1)[0]
                            # number_type = keyboard_map.get(number_type, 25)

                            fcd.write(struct.pack('i', 4 + 4 + 16))
                            fcd.write(struct.pack('i', 40))
                            fcd.write(struct.pack('I', g_bus_pos))
                            fcd.write(struct.pack('i', number_type))
                            fcd.write(struct.pack('i', 1))
                            fcd.write(struct.pack('i', 0))


                        elif header == b'\x00\x00\x2d\x22\x21':
                            t1k.read(6)

                        else:
                            t1k.seek(-5, 1)
                    else:
                        break

                if post_list[0] < post_list[1]:
                    fcd.seek(67)
                    fcd.write(struct.pack('i', 1))
                fcd.seek(187)

                kilometer = str(post_list[0]).split('.')[0]
                meter = str(post_list[0]).split('.')[1][:3]
                fcd.write(struct.pack('i', int(kilometer)))
                fcd.write(struct.pack('i', int(meter)))

                kilometer = str(post_list[1]).split('.')[0]
                meter = str(post_list[1]).split('.')[1][:3]
                fcd.write(struct.pack('i', int(kilometer)))
                fcd.write(struct.pack('i', int(meter)))

                fcd.seek(582)
                fcd.write(struct.pack('i', bus_pos_list[0]))
                fcd.write(struct.pack('i', bus_pos_list[1]))
                fcd.write(struct.pack('f', post_list[0]))
                fcd.write(struct.pack('f', post_list[1]))
                fcd.write(struct.pack('f', min_pos))
                fcd.write(struct.pack('f', max_pos))

            # logger.info(f'3--fcd文件生成: {tmp_fcd_path}')
        print('222', tmp_fcd_path)
    except:
        print('111 ', tmp_fcd_path)
        # logger.error(f'\n 3--fcd文件生成失败: {tmp_fcd_path}' f'\n' + traceback.format_exc() + f'\n')
T1K to FCD 文件转换:详细代码解析和优化

原文地址: https://www.cveoy.top/t/topic/mhqU 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录