import os import numpy as np import matplotlib.pyplot as plt from sklearn.decomposition import PCA

def read_fasta_file(file_path): sequences = {} with open(file_path) as f: sequence_name = '' sequence = '' for line in f: if line.startswith('>'): if sequence_name: sequences[sequence_name] = sequence sequence_name = line.strip()[1:] sequence = '' else: sequence += line.strip() if sequence_name: sequences[sequence_name] = sequence return sequences

def get_transition_matrix(sequences): # 初始化状态转移频率矩阵 transition_matrix = np.zeros((4, 4)) # 遍历所有序列 for sequence in sequences.values(): # 遍历序列中的所有碱基对 for i in range(len(sequence)-1): # 获取当前碱基和下一个碱基的状态 current_state = get_base_state(sequence[i]) next_state = get_base_state(sequence[i+1]) # 更新状态转移频率矩阵 transition_matrix[current_state, next_state] += 1 # 将状态转移频率矩阵转换为状态转移概率矩阵 transition_matrix = transition_matrix / np.sum(transition_matrix, axis=1, keepdims=True) return transition_matrix

def get_base_state(base): if base == 'A': return 0 elif base == 'C': return 1 elif base == 'G': return 2 elif base == 'T': return 3 else: return -1

def count_adjacent_transitions(sequences): # 初始化相邻状态转移计数器 transition_count = { 'AA': 0, 'AC': 0, 'AG': 0, 'AT': 0, 'CA': 0, 'CC': 0, 'CG': 0, 'CT': 0, 'GA': 0, 'GC': 0, 'GG': 0, 'GT': 0, 'TA': 0, 'TC': 0, 'TG': 0, 'TT': 0, } # 遍历所有序列 for sequence in sequences.values(): # 遍历序列中的所有碱基对 for i in range(len(sequence)-1): # 获取当前碱基和下一个碱基的状态 current_state = get_base_state(sequence[i]) next_state = get_base_state(sequence[i+1]) # 判断当前和下一个碱基是否都合法 if current_state != -1 and next_state != -1: # 统计相邻状态转移 transition_key = '{}{}'.format(get_state_base(current_state), get_state_base(next_state)) transition_count[transition_key] += 1 return transition_count

def get_state_base(state): if state == 0: return 'A' elif state == 1: return 'C' elif state == 2: return 'G' elif state == 3: return 'T' else: raise ValueError('Invalid state: {}'.format(state))

def plot_pac(transition_matrix): # 计算PAC pac = np.zeros((4, 4)) for i in range(4): for j in range(4): pac[i, j] = transition_matrix[i, j] * transition_matrix[j, i] / (transition_matrix[i, i] * transition_matrix[j, j]) # 绘制PAC图 fig, ax = plt.subplots() im = ax.imshow(pac, cmap='coolwarm', vmin=0, vmax=1) ax.set_xticks(np.arange(4)) ax.set_yticks(np.arange(4)) ax.set_xticklabels(['A', 'C', 'G', 'T']) ax.set_yticklabels(['A', 'C', 'G', 'T']) ax.set_title('PAC') cbar = ax.figure.colorbar(im, ax=ax) cbar.ax.set_ylabel('PAC value', rotation=-90, va="bottom") plt.show()

遍历文件夹中的所有fasta文件

folder_path = 'fasta_folder' for file_name in os.listdir(folder_path): if file_name.endswith('.fasta'): file_path = os.path.join(folder_path, file_name) print('Processing file: {}'.format(file_path)) # 读取FASTA文件 sequences = read_fasta_file(file_path) # 计算状态转移频率矩阵 transition_matrix = get_transition_matrix(sequences) print('Transition Matrix:') print(transition_matrix) # 统计两个相邻状态转移的次数 transition_count = count_adjacent_transitions(sequences) print('Adjacent Transition Count:') print(transition_count) # 标准化状态转移频率矩阵 standardized_matrix = (transition_matrix - np.mean(transition_matrix)) / np.std(transition_matrix) # 进行PCA分析 pca = PCA(n_components=2) pca.fit(standardized_matrix) transformed_matrix = pca.transform(standardized_matrix) # 绘制散点图 fig, ax = plt.subplots() ax.scatter(transformed_matrix[:, 0], transformed_matrix[:, 1]) for i, txt in enumerate(['A', 'C', 'G', 'T']): ax.annotate(txt, (transformed_matrix[i, 0], transformed_matrix[i, 1])) ax.set_xlabel('PC1 ({:.2f}%)'.format(pca.explained_variance_ratio_[0] * 100)) ax.set_ylabel('PC2 ({:.2f}%)'.format(pca.explained_variance_ratio_[1] * 100)) ax.set_title('PCA') plt.show() # 绘制PAC图 plot_pac(transition_matrix)


原文地址: https://www.cveoy.top/t/topic/lNaj 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录