
lines = np.unique(np.array([i[0]+i[1] for i in data[:,[1,2]]]))
lines = np.array([[i[0],i[1]] for i in lines])
What do these two lines of code mean?

Posted: 2023-11-09 12:08:57 · Views: 116
These two lines extract the distinct lines (segments) from the dataset and store them in a NumPy array. In the first line, `data[:,[1,2]]` selects the columns at index 1 and 2 (the second and third columns) of every row, i.e. the start and end point of each segment. The list comprehension `[i[0]+i[1] for i in data[:,[1,2]]]` joins each start/end pair into a single value, and `np.unique()` removes the duplicates, so `lines` ends up containing every distinct segment exactly once. In the second line, `[[i[0],i[1]] for i in lines]` splits each of those unique values back into a two-element list, i.e. the start and end point of the segment. The final `lines` array therefore holds the start- and end-point coordinates of all the distinct segments.
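For concreteness, here is a minimal, hypothetical sketch (the sample `data` array is invented; it assumes the second and third columns hold single-character string codes, so that `i[0]+i[1]` concatenates them):

```python
import numpy as np

# hypothetical sample: column 0 = date, columns 1 and 2 = segment endpoints
data = np.array([
    ["2023-01-01", "A", "B"],
    ["2023-01-01", "A", "B"],   # duplicate segment
    ["2023-01-02", "C", "D"],
])

lines = np.unique(np.array([i[0] + i[1] for i in data[:, [1, 2]]]))
# lines -> array(['AB', 'CD'])

lines = np.array([[i[0], i[1]] for i in lines])
# lines -> array([['A', 'B'], ['C', 'D']])
```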
Related questions

lines = np.unique(np.array([i[0]+i[1] for i in data[:,[1,2]]]))
lines = np.array([[i[0],i[1]] for i in lines])
date = np.unique(data[:,0])

What does this code do? Assume `data` is a two-dimensional NumPy array in which each row is a data point and each column is a feature. The code then does the following:

1. Select the second and third columns of `data` and join each row's two values into one string; a list comprehension builds a list of these joined values.
2. Deduplicate that list with `np.unique`, which returns an array containing only the unique values.
3. Split each of these unique strings back into two elements and store them in a new NumPy array `lines`, again using a list comprehension.
4. Select the first column of `data` and deduplicate it with `np.unique`; the unique values represent the different dates.
5. The results are kept in the two variables `lines` and `date`.

In short: the second and third columns of `data` are joined into strings, deduplicated, and split back into two-element pairs stored in `lines`, while the deduplicated first column is stored in `date`.
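Continuing the hypothetical sample `data` from the sketch above, the third statement simply collects the distinct dates in the first column:

```python
date = np.unique(data[:, 0])
# date -> array(['2023-01-01', '2023-01-02'])
```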

What are these two lines of code doing?
lines = np.unique(np.array([i[0]+i[1] for i in data[:,[1,2]]]))
lines = np.array([[i[0],i[1]] for i in lines])

These two lines take the second and third columns of the dataset, remove duplicate pairs, and convert the result into a two-dimensional array. Specifically, in the first line `data[:,[1,2]]` extracts the second and third columns of every row, `[i[0]+i[1] for i in data[:,[1,2]]]` joins each pair into a single value and collects these values in a list, `np.array()` turns that list into an array, and `np.unique()` removes the duplicate entries. In the second line, `[[i[0],i[1]] for i in lines]` splits every unique entry back into a pair, producing a two-dimensional array of the distinct pairs.
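Note that the join-then-split trick only round-trips cleanly when each of the two values is a single character (`i[0]` and `i[1]` index back into the concatenated string). A more direct equivalent, not from the original post but worth mentioning, is row-wise deduplication with `np.unique(..., axis=0)` (NumPy ≥ 1.13), sketched here against the hypothetical `data` array shown earlier:

```python
import numpy as np

# deduplicate the (column 1, column 2) pairs row-wise, without string concatenation
lines = np.unique(data[:, [1, 2]], axis=0)
```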

Related recommendations

#!/usr/bin/env python import numpy as np import linecache from io import StringIO import time import os import mmap import platform import math import argparse def pbc(a, box): return a - box * np.round(a/box) class CellList: """ A class for implementing the link cell list method in molecular simulations. Handles periodic boundary conditions and efficient neighbor searching. Attributes: bounds (np.ndarray): Box boundaries of shape (3, 2) cell_size (float): Requested size of grid cells box_lengths (np.ndarray): Size of the simulation box in each dimension n_cells (np.ndarray): Number of cells in each dimension (nx, ny, nz) actual_cell_size (np.ndarray): Actual cell size after adjusting for box size total_cells (int): Total number of cells in the system particle_cell_indices (np.ndarray): Cell index for each particle cell_to_particles (dict): Mapping from cell index to particle indices particles (np.ndarray): Array of particle coordinates """ def __init__(self, bounds, cell_size): """ Initialize the cell list system. Args: bounds: Box boundaries as [[xmin, xmax], [ymin, ymax], [zmin, zmax]] cell_size: Requested size of grid cells (cubic cells assumed) """ self.bounds = np.array(bounds, dtype=float) self.cell_size = float(cell_size) # Calculate box dimensions self.box_lengths = self.bounds[:, 1] - self.bounds[:, 0] # Calculate number of cells in each dimension (ceiling to cover entire box) self.n_cells = np.ceil(self.box_lengths / self.cell_size).astype(int) self.nx, self.ny, self.nz = self.n_cells self.total_cells = self.nx * self.ny * self.nz # Calculate actual cell size (may be smaller than requested) self.actual_cell_size = self.box_lengths / self.n_cells # Initialize particle storage self.particle_cell_indices = None self.cell_to_particles = None self.particles = None # Validate grid if np.any(self.n_cells < 1): raise ValueError("Cell size too large for box dimensions") if np.any(self.actual_cell_size <= 0): raise ValueError("Invalid cell size resulting in non-positive dimensions") def assign_particles(self, coords): """ Assign particles to cells and build the cell-to-particles mapping. Args: coords: Array of particle coordinates, shape (N, 3) """ coords = np.asarray(coords) if coords.shape[1] != 3: raise ValueError("Particle coordinates must be 3-dimensional") self.particles = coords N = len(coords) self.particle_cell_indices = np.zeros(N, dtype=int) self.cell_to_particles = {} # Assign each particle to a cell for idx, coord in enumerate(coords): cell_idx = self.coord_to_cell_index(coord) self.particle_cell_indices[idx] = cell_idx if cell_idx not in self.cell_to_particles: self.cell_to_particles[cell_idx] = [] self.cell_to_particles[cell_idx].append(idx) def coord_to_cell_index(self, coord): """ Convert a 3D coordinate to a 1D cell index with periodic boundaries. Args: coord: Particle coordinate (x, y, z) Returns: int: 1D cell index """ # Apply periodic boundary conditions rel_pos = coord - self.bounds[:, 0] periodic_pos = rel_pos % self.box_lengths # Calculate grid coordinates i, j, k = (periodic_pos / self.actual_cell_size).astype(int) # Apply periodic wrapping i = i % self.nx j = j % self.ny k = k % self.nz # Convert to 1D index: index = i + j*nx + k*nx*ny return int(i + j * self.nx + k * self.nx * self.ny) def get_neighboring_cells(self, cell_index): """ Get the 26 neighboring cells (plus central cell) for a given cell index. 
Args: cell_index: Central cell index Returns: list: Indices of 27 neighboring cells (including central cell) """ # Convert 1D index to 3D grid coordinates k = cell_index // (self.nx * self.ny) remainder = cell_index % (self.nx * self.ny) j = remainder // self.nx i = remainder % self.nx neighbors = [] # Iterate over 3x3x3 neighborhood for dx in [-1, 0, 1]: for dy in [-1, 0, 1]: for dz in [-1, 0, 1]: # Apply periodic boundaries ni = (i + dx) % self.nx nj = (j + dy) % self.ny nk = (k + dz) % self.nz # Convert back to 1D index neighbor_index = ni + nj * self.nx + nk * (self.nx * self.ny) neighbors.append(neighbor_index) return neighbors def get_particles_in_neighboring_cells(self, coord): """ Get all particle indices in the 27 neighboring cells of a given coordinate. This includes all particles in the same cell as the coordinate. Args: coord: Reference coordinate (x, y, z) Returns: list: Indices of particles in neighboring cells """ if self.cell_to_particles is None: raise RuntimeError("Particles not assigned. Call assign_particles() first.") # Get central cell index center_idx = self.coord_to_cell_index(coord) # Get all neighboring cell indices (27 cells) neighbor_cells = self.get_neighboring_cells(center_idx) # Collect all particles in these cells particles = [] for cell_idx in neighbor_cells: if cell_idx in self.cell_to_particles: particles.extend(self.cell_to_particles[cell_idx]) return particles def get_neighbor_particles_excluding_self(self, particle_index): """ Get neighbor particles excluding the particle itself. Args: particle_index: Index of the particle to find neighbors for Returns: list: Indices of neighboring particles (excluding self) """ if self.particles is None: raise RuntimeError("Particles not assigned. Call assign_particles() first.") # Get particle coordinate coord = self.particles[particle_index] # Get all particles in neighboring cells (including self) all_neighbors = self.get_particles_in_neighboring_cells(coord) # Remove self from the list neighbors_excluding_self = [idx for idx in all_neighbors if idx != particle_index] return neighbors_excluding_self def get_particles_in_same_cell(self, coord): """ Get all particle indices in the same cell as the given coordinate. Args: coord: Reference coordinate (x, y, z) Returns: list: Indices of particles in the same cell """ if self.cell_to_particles is None: raise RuntimeError("Particles not assigned. Call assign_particles() first.") # Get cell index cell_idx = self.coord_to_cell_index(coord) # Return particles in this cell if cell_idx in self.cell_to_particles: return self.cell_to_particles[cell_idx].copy() return [] def get_cell_stats(self): """ Get information about the grid system. 
Returns: dict: Dictionary containing grid statistics """ return { "grid_dimensions": (self.nx, self.ny, self.nz), "total_cells": self.total_cells, "requested_cell_size": self.cell_size, "actual_cell_size": self.actual_cell_size, "box_dimensions": self.box_lengths } class snapshot: def __init__(self): self.nodes = {} class ReaderLammpstrj(): def __init__(self, filename): self.filename = filename self.mmap_scan() # grasp some info from frames list print("The trjectory is successfully loaded!") print("num of frames: %d"%(self.nframes)) def mmap_scan(self): f = open(self.filename, "r") if platform.platform() == 'windows': self.mmap = mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ, tagname='sharemem') else: self.mmap = mmap.mmap( f.fileno(), length=0, flags=mmap.MAP_SHARED, access=mmap.ACCESS_READ) # get all the seek self.Seek = [] index_past = 0 while True: index_current = self.mmap.find(b'ITEM: TIMESTEP', index_past) index_past = index_current + 1 if index_current == -1: break self.Seek.append(index_current) self.Seek = np.asarray(self.Seek, dtype=int) self.nframes = len(self.Seek) def mmap_snap(self, ifr): start = self.Seek[ifr] end = self.Seek[ifr + 1] if ifr != len(self.Seek) - 1 else self.mmap.size() - 1 nchars = end - start self.mmap.seek(start) str_frame = self.mmap.read(nchars).decode('utf-8').strip() return self.parse_snap_fromstr(str_frame) def parse_snap_fromstr(self, snap_str): snap = snapshot() snap_lines = snap_str.split('\n') for i, line in enumerate(snap_lines): sp = line.strip().split() if sp[0] == "ITEM:": if sp[1] == "TIMESTEP": snap.timestep = int(snap_lines[i + 1].strip()) elif sp[1] == "NUMBER": snap.natoms = int(snap_lines[i + 1].strip()) elif sp[1] == "BOX": snap.dimension = len(sp[3:]) edge = np.asarray([[float(l) for l in snap_lines[i + _].strip().split()] for _ in range(1, snap.dimension + 1)]).reshape(-1) snap.edge = edge lx = snap.edge[1] - snap.edge[0] ly = snap.edge[3] - snap.edge[2] lz = snap.edge[5] - snap.edge[4] snap.box = np.array([lx, ly, lz]) snap.bounds = edge.reshape(-1, 2) elif sp[1] == "ATOMS": atom_header = sp[2:] io_str = StringIO("\n".join(snap_lines[i + 1:])) atom_value = np.loadtxt(io_str) # grap info for ah in atom_header: if ah not in snap.nodes: snap.nodes[ah] = [] index = atom_header.index(ah) snap.nodes[ah] = atom_value[:, index] # add some info to snap snap.types = np.unique(snap.nodes['type']) snap.natoms = len(snap.nodes['type']) snap.nodes['type'] = snap.nodes['type'].astype(int) # change xs to x if scaled if 'xs' in snap.nodes: snap.nodes['x'] = snap.nodes['xs'] * \ snap.box[0] + snap.edge[0] snap.nodes['y'] = snap.nodes['ys'] * \ snap.box[1] + snap.edge[2] snap.nodes['z'] = snap.nodes['zs'] * \ snap.box[2] + snap.edge[4] # add pos snap.pos = np.c_[snap.nodes['x'], snap.nodes['y'], snap.nodes['z']] return snap class ReaderLammpsData: def __init__(self, filename): self.filename = filename self.scan() def scan(self): import linecache strLines = np.asarray(linecache.getlines(self.filename)) self.nodes = {} for i, line in enumerate(strLines): sp = line.split() if len(sp) == 2 and sp[-1] in ['atoms', 'bonds', 'angles', 'dihedrals', 'impropers']: key = 'num_' + sp[-1] self.nodes[key] = int(sp[0]) elif len(sp) == 3 and sp[-1] == 'types': key = 'num_' + sp[1] + '_' + sp[-1] self.nodes[key] = int(sp[0]) elif len(sp) == 4 and sp[-1][-2:] == 'hi': key = 'edge_' + sp[-1][0] self.nodes[key] = [float(sp[0]), float(sp[1])] elif len(sp) > 0 and sp[0] == 'Masses': index_start = i + 2 index_end = index_start + self.nodes['num_atom_types'] 
sectionLines = strLines[index_start:index_end] self.nodes['Masses'] = np.loadtxt( StringIO(''.join(sectionLines))) elif len(sp) > 0 and sp[0] == 'Atoms': index_start = i + 2 index_end = index_start + self.nodes['num_atoms'] sectionLines = strLines[index_start:index_end] io_str = StringIO(''.join(sectionLines)) self.nodes['Atoms'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=float) elif len(sp) > 0 and sp[0] == 'Bonds': index_start = i + 2 index_end = index_start + self.nodes['num_bonds'] sectionLines = strLines[index_start:index_end] self.nodes['Bonds'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) elif len(sp) > 0 and sp[0] == 'Angles': index_start = i + 2 index_end = index_start + self.nodes['num_angles'] sectionLines = strLines[index_start:index_end] self.nodes['Angles'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) elif len(sp) > 0 and sp[0] == 'Dihedrals': index_start = i + 2 index_end = index_start + self.nodes['num_dihedrals'] sectionLines = strLines[index_start:index_end] self.nodes['Dihedrals'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) elif len(sp) > 0 and sp[0] == 'Impropers': index_start = i + 2 index_end = index_start + self.nodes['num_impropers'] sectionLines = strLines[index_start:index_end] self.nodes['impropers'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) # get types self.atom_types = self.nodes['Atoms'][:, 2] self.types = np.unique(self.atom_types) # get bonded atom id for all atoms self.bonded = {} for bond in self.nodes['Bonds']: ida, idb = bond[2], bond[3] if ida not in self.bonded: self.bonded[ida] = [] self.bonded[ida].append(idb) else: self.bonded[ida].append(idb) if idb not in self.bonded: self.bonded[idb] = [] self.bonded[idb].append(ida) else: self.bonded[idb].append(ida) class compute_hbonds: def __init__(self, donors_type, hydrogens_type, acceptors_type, dist, alpha): self.donors_type = donors_type self.hydrogens_type = hydrogens_type self.acceptors_type = acceptors_type self.dist = dist self.dist2 = dist * dist self.alpha = alpha self.get_index_fromInitData() self.run() def get_index_fromInitData(self): # get donors id_donors = [] for d in self.donors_type: ret = np.argwhere(D.atom_types == d).reshape(-1) id_donors.extend(D.nodes['Atoms'][ret, 0]) # get hydrogens id_hydrogens = [] for d in self.hydrogens_type: ret = np.argwhere(D.atom_types == d).reshape(-1) id_hydrogens.extend(D.nodes['Atoms'][ret, 0]) # get acceptors id_acceptors = [] for d in self.acceptors_type: ret = np.argwhere(D.atom_types == d).reshape(-1) id_acceptors.extend(D.nodes['Atoms'][ret, 0]) self.id_donors = np.asarray(id_donors, dtype=int) self.id_hydrogens = np.asarray(id_hydrogens, dtype=int) self.id_acceptors = np.asarray(id_acceptors, dtype=int) print("------------------------") print("num of donors: %d" % (self.id_donors.shape[0])) print("num of hydrogens: %d" % (len(self.id_hydrogens))) print("num of acceptors: %d" % (self.id_acceptors.shape[0])) print("------------------------") def run(self): self.frame_hbonds = [] o = open('hbonds_rawdata.csv', 'w') o.write("index,timestep,id_donor,id_hydrogen,id_acceptor,distance,angle,x_donor,y_donor,z_donor,x_h,y_h,z_h,x_acceptor,y_acceptor,z_acceptor\n") for ifr in range(T.nframes): ret_hbonds = [] id_hbonds = [] snap = T.mmap_snap(ifr) print('Tackle Frame Timestep: %d, index: %d' % (snap.timestep, ifr)) hbond_cnt = 0 # link cell list cell_list = CellList(snap.bounds, 3.0) cell_list.assign_particles(snap.pos) for i, don in enumerate(self.id_donors): index = 
np.argwhere(snap.nodes['id'] == don).reshape(-1)[0] pos_don = np.asarray([snap.nodes['x'][index], snap.nodes['y'][index], snap.nodes['z'][index]]) # get neighboring acceptors neighbors = cell_list.get_neighbor_particles_excluding_self(index) id_neigh_acceptors = [] for nei in neighbors: if snap.nodes['type'][nei] in self.acceptors_type: id_neigh_acceptors.append(snap.nodes['id'][nei]) for h in D.bonded[don]: # check if used if h in id_hbonds: continue if h in self.id_hydrogens: index_h = np.argwhere( snap.nodes['id'] == h).reshape(-1)[0] pos_h = np.asarray([snap.nodes['x'][index_h], snap.nodes['y'][index_h], snap.nodes['z'][index_h]]) vec_h = pbc(pos_h-pos_don, snap.box) else: continue # cycle all the acceptors, like O-H...O for acc in id_neigh_acceptors: # check if used if acc == don: continue index_acc = np.argwhere( snap.nodes['id'] == acc).reshape(-1)[0] pos_acc = np.asarray( [snap.nodes['x'][index_acc], snap.nodes['y'][index_acc], snap.nodes['z'][index_acc]]) # check dist vec_acc = pbc(pos_acc-pos_don, snap.box) dist2 = (vec_acc**2).sum() if dist2 < self.dist2: # check angle angle = (vec_h * vec_acc).sum() / np.linalg.norm(vec_h) / np.linalg.norm(vec_acc) angle = np.arccos(angle) / np.pi * 180 if angle < self.alpha: # record this hydrogen bond hbond_cnt += 1 ret_hbonds.append( [don, h, acc, pos_don, pos_h, pos_acc, dist2**0.5, angle]) id_hbonds.extend([don, h, acc]) # wirte to csv o.write("%d,%d,%d,%d,%d,%.4f,%.4f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f\n"%(ifr, snap.timestep, don, h, acc, dist2**0.5, angle, pos_don[0], pos_don[1], pos_don[2], pos_h[0], pos_h[1],pos_h[2],pos_acc[0],pos_acc[1],pos_acc[2])) print('hbonds: %d' % (hbond_cnt)) # append hbonds self.frame_hbonds.append(ret_hbonds) def compute_ditribution_along_vec(self, direction, start, end, dx=5): print('-----------------') if direction == 'x': index = 0 elif direction == 'y': index = 1 else: index = 2 result = [] # get position-direction bins = np.arange(start, end+0.1, dx) for frame in self.frame_hbonds: pos = np.asarray([hb[4][index] for hb in frame]) hist, e = np.histogram(pos, density=False, bins=bins) result.append(hist) rr = bins[:-1] + np.diff(bins) final_result = np.c_[rr, np.asarray(result).T] np.savetxt(outname, final_result, fmt='%.2f') print('calculation finished!') def get_args(): parser = argparse.ArgumentParser( description='Calculate the hydrogen bonds distribution along one direction.') parser.add_argument( '--f', type=str, help='filename of lammps dump file', required=True) parser.add_argument( '--d', type=str, help='filename of lammps data file', required=True) parser.add_argument( '--donor', type=str, help="donor type, seperate by ',', like '7,9'", required=True) parser.add_argument('--hydrogen', type=str, help="hydrogen type, seperate by ',', like '5,6'", required=True) parser.add_argument('--acceptor', type=str, help="acceptor type, seperate by ',', like '3,4'", required=True) parser.add_argument('--axis', type=str, help="distribution along axis", required=True) parser.add_argument('--start', type=float, help="histogram bin start along axis", required=True) parser.add_argument('--end', type=float, help="histogram bin end along axis", required=True) parser.add_argument('--binsize', type=float, help="histogram bin size along axis", required=True) parser.add_argument('--outname', type=str, help="dump filename. 
\nThe fist column: position tick along axis \n \ The other columns: hbonds distribution for all the frames", default='hbond.txt') parser.add_argument('--dist', type=float, help="distance criteria in hydrogen bond, default 3.5", default=3.5) parser.add_argument('--alpha', type=float, help="angle criteria in hydrogen bond, default 30", default=30.0) args = parser.parse_args() return args if __name__ == '__main__': args = get_args() donors = [int(_) for _ in args.donor.split(',')] acceptors = [int(_) for _ in args.acceptor.split(',')] hydrogens = [int(_) for _ in args.hydrogen.split(',')] dist = float(args.dist) alpha = float(args.alpha) axis = args.axis start = args.start end = args.end binsize = float(args.binsize) outname = args.outname # read and calculation T = ReaderLammpstrj(args.f) D = ReaderLammpsData(args.d) # print all types and num print('-----------------') for p in np.unique(D.atom_types): num = len(np.argwhere(D.atom_types == p).reshape(-1)) print('type %d: %d' % (p, num)) C = compute_hbonds(donors, hydrogens, acceptors, dist, alpha) C.compute_ditribution_along_vec(axis, start, end, binsize) 这个脚本怎么运行
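As for how to run it: judging from the `argparse` arguments defined in the script, and from the example invocation embedded in the related post below (which names the script `calculate_hbonds.py`), it is called from the command line roughly as follows; the trajectory and data file names are placeholders taken from that example:

```
python calculate_hbonds.py --f 01traj.lammpstrj --d 01system.data \
    --donor 3 --hydrogen 5 --acceptor 1,4,6 \
    --axis x --start 0 --end 80 --binsize 5.0 \
    --dist 3.5 --alpha 30 --outname 01ret.txt
```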

详细解释这个脚本,逐行 import numpy as np import linecache from io import StringIO import time import os import mmap import platform import math import argparse def pbc(a, box): return a - box * np.round(a/box) class CellList: """ A class for implementing the link cell list method in molecular simulations. Handles periodic boundary conditions and efficient neighbor searching. python calculate_hbonds.py --f 01traj.lammpstrj --d 01system.data --donor 3 --hydrogen 5 --acceptor 1,4,6 --outname 01ret.txt --axis=x --binsize=5.0 --dist 3.5 --alpha 30 --start 0 --end 80 Attributes: bounds (np.ndarray): Box boundaries of shape (3, 2) cell_size (float): Requested size of grid cells box_lengths (np.ndarray): Size of the simulation box in each dimension n_cells (np.ndarray): Number of cells in each dimension (nx, ny, nz) actual_cell_size (np.ndarray): Actual cell size after adjusting for box size total_cells (int): Total number of cells in the system particle_cell_indices (np.ndarray): Cell index for each particle cell_to_particles (dict): Mapping from cell index to particle indices particles (np.ndarray): Array of particle coordinates """ def __init__(self, bounds, cell_size): """ Initialize the cell list system. Args: bounds: Box boundaries as [[xmin, xmax], [ymin, ymax], [zmin, zmax]] cell_size: Requested size of grid cells (cubic cells assumed) """ self.bounds = np.array(bounds, dtype=float) self.cell_size = float(cell_size) # Calculate box dimensions self.box_lengths = self.bounds[:, 1] - self.bounds[:, 0] # Calculate number of cells in each dimension (ceiling to cover entire box) self.n_cells = np.ceil(self.box_lengths / self.cell_size).astype(int) self.nx, self.ny, self.nz = self.n_cells self.total_cells = self.nx * self.ny * self.nz # Calculate actual cell size (may be smaller than requested) self.actual_cell_size = self.box_lengths / self.n_cells # Initialize particle storage self.particle_cell_indices = None self.cell_to_particles = None self.particles = None # Validate grid if np.any(self.n_cells < 1): raise ValueError("Cell size too large for box dimensions") if np.any(self.actual_cell_size <= 0): raise ValueError("Invalid cell size resulting in non-positive dimensions") def assign_particles(self, coords): """ Assign particles to cells and build the cell-to-particles mapping. Args: coords: Array of particle coordinates, shape (N, 3) """ coords = np.asarray(coords) if coords.shape[1] != 3: raise ValueError("Particle coordinates must be 3-dimensional") self.particles = coords N = len(coords) self.particle_cell_indices = np.zeros(N, dtype=int) self.cell_to_particles = {} # Assign each particle to a cell for idx, coord in enumerate(coords): cell_idx = self.coord_to_cell_index(coord) self.particle_cell_indices[idx] = cell_idx if cell_idx not in self.cell_to_particles: self.cell_to_particles[cell_idx] = [] self.cell_to_particles[cell_idx].append(idx) def coord_to_cell_index(self, coord): """ Convert a 3D coordinate to a 1D cell index with periodic boundaries. 
Args: coord: Particle coordinate (x, y, z) Returns: int: 1D cell index """ # Apply periodic boundary conditions rel_pos = coord - self.bounds[:, 0] periodic_pos = rel_pos % self.box_lengths # Calculate grid coordinates i, j, k = (periodic_pos / self.actual_cell_size).astype(int) # Apply periodic wrapping i = i % self.nx j = j % self.ny k = k % self.nz # Convert to 1D index: index = i + j*nx + k*nx*ny return int(i + j * self.nx + k * self.nx * self.ny) def get_neighboring_cells(self, cell_index): """ Get the 26 neighboring cells (plus central cell) for a given cell index. Args: cell_index: Central cell index Returns: list: Indices of 27 neighboring cells (including central cell) """ # Convert 1D index to 3D grid coordinates k = cell_index // (self.nx * self.ny) remainder = cell_index % (self.nx * self.ny) j = remainder // self.nx i = remainder % self.nx neighbors = [] # Iterate over 3x3x3 neighborhood for dx in [-1, 0, 1]: for dy in [-1, 0, 1]: for dz in [-1, 0, 1]: # Apply periodic boundaries ni = (i + dx) % self.nx nj = (j + dy) % self.ny nk = (k + dz) % self.nz # Convert back to 1D index neighbor_index = ni + nj * self.nx + nk * (self.nx * self.ny) neighbors.append(neighbor_index) return neighbors def get_particles_in_neighboring_cells(self, coord): """ Get all particle indices in the 27 neighboring cells of a given coordinate. This includes all particles in the same cell as the coordinate. Args: coord: Reference coordinate (x, y, z) Returns: list: Indices of particles in neighboring cells """ if self.cell_to_particles is None: raise RuntimeError("Particles not assigned. Call assign_particles() first.") # Get central cell index center_idx = self.coord_to_cell_index(coord) # Get all neighboring cell indices (27 cells) neighbor_cells = self.get_neighboring_cells(center_idx) # Collect all particles in these cells particles = [] for cell_idx in neighbor_cells: if cell_idx in self.cell_to_particles: particles.extend(self.cell_to_particles[cell_idx]) return particles def get_neighbor_particles_excluding_self(self, particle_index): """ Get neighbor particles excluding the particle itself. Args: particle_index: Index of the particle to find neighbors for Returns: list: Indices of neighboring particles (excluding self) """ if self.particles is None: raise RuntimeError("Particles not assigned. Call assign_particles() first.") # Get particle coordinate coord = self.particles[particle_index] # Get all particles in neighboring cells (including self) all_neighbors = self.get_particles_in_neighboring_cells(coord) # Remove self from the list neighbors_excluding_self = [idx for idx in all_neighbors if idx != particle_index] return neighbors_excluding_self def get_particles_in_same_cell(self, coord): """ Get all particle indices in the same cell as the given coordinate. Args: coord: Reference coordinate (x, y, z) Returns: list: Indices of particles in the same cell """ if self.cell_to_particles is None: raise RuntimeError("Particles not assigned. Call assign_particles() first.") # Get cell index cell_idx = self.coord_to_cell_index(coord) # Return particles in this cell if cell_idx in self.cell_to_particles: return self.cell_to_particles[cell_idx].copy() return [] def get_cell_stats(self): """ Get information about the grid system. 
Returns: dict: Dictionary containing grid statistics """ return { "grid_dimensions": (self.nx, self.ny, self.nz), "total_cells": self.total_cells, "requested_cell_size": self.cell_size, "actual_cell_size": self.actual_cell_size, "box_dimensions": self.box_lengths } class snapshot: def __init__(self): self.nodes = {} class ReaderLammpstrj(): def __init__(self, filename): self.filename = filename self.mmap_scan() # grasp some info from frames list print("The trjectory is successfully loaded!") print("num of frames: %d"%(self.nframes)) def mmap_scan(self): f = open(self.filename, "r") if os.name == 'nt': # Windows: don't use MAP_SHARED or tagname self.mmap = mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) else: # Linux/Unix: use MAP_SHARED self.mmap = mmap.mmap(f.fileno(), length=0, flags=mmap.MAP_SHARED, access=mmap.ACCESS_READ) # get all the seek self.Seek = [] index_past = 0 while True: index_current = self.mmap.find(b'ITEM: TIMESTEP', index_past) index_past = index_current + 1 if index_current == -1: break self.Seek.append(index_current) self.Seek = np.asarray(self.Seek, dtype=int) self.nframes = len(self.Seek) def mmap_snap(self, ifr): start = self.Seek[ifr] end = self.Seek[ifr + 1] if ifr != len(self.Seek) - 1 else self.mmap.size() - 1 nchars = end - start self.mmap.seek(start) str_frame = self.mmap.read(nchars).decode('utf-8').strip() return self.parse_snap_fromstr(str_frame) def parse_snap_fromstr(self, snap_str): snap = snapshot() snap_lines = snap_str.split('\n') for i, line in enumerate(snap_lines): sp = line.strip().split() if sp[0] == "ITEM:": if sp[1] == "TIMESTEP": snap.timestep = int(snap_lines[i + 1].strip()) elif sp[1] == "NUMBER": snap.natoms = int(snap_lines[i + 1].strip()) elif sp[1] == "BOX": snap.dimension = len(sp[3:]) edge = np.asarray([[float(l) for l in snap_lines[i + _].strip().split()] for _ in range(1, snap.dimension + 1)]).reshape(-1) snap.edge = edge lx = snap.edge[1] - snap.edge[0] ly = snap.edge[3] - snap.edge[2] lz = snap.edge[5] - snap.edge[4] snap.box = np.array([lx, ly, lz]) snap.bounds = edge.reshape(-1, 2) elif sp[1] == "ATOMS": atom_header = sp[2:] io_str = StringIO("\n".join(snap_lines[i + 1:])) atom_value = np.loadtxt(io_str) # grap info for ah in atom_header: if ah not in snap.nodes: snap.nodes[ah] = [] index = atom_header.index(ah) snap.nodes[ah] = atom_value[:, index] # add some info to snap snap.types = np.unique(snap.nodes['type']) snap.natoms = len(snap.nodes['type']) snap.nodes['type'] = snap.nodes['type'].astype(int) # change xs to x if scaled if 'xs' in snap.nodes: snap.nodes['x'] = snap.nodes['xs'] * \ snap.box[0] + snap.edge[0] snap.nodes['y'] = snap.nodes['ys'] * \ snap.box[1] + snap.edge[2] snap.nodes['z'] = snap.nodes['zs'] * \ snap.box[2] + snap.edge[4] # add pos snap.pos = np.c_[snap.nodes['x'], snap.nodes['y'], snap.nodes['z']] return snap class ReaderLammpsData: def __init__(self, filename): self.filename = filename self.scan() def scan(self): import linecache strLines = np.asarray(linecache.getlines(self.filename)) self.nodes = {} for i, line in enumerate(strLines): sp = line.split() if len(sp) == 2 and sp[-1] in ['atoms', 'bonds', 'angles', 'dihedrals', 'impropers']: key = 'num_' + sp[-1] self.nodes[key] = int(sp[0]) elif len(sp) == 3 and sp[-1] == 'types': key = 'num_' + sp[1] + '_' + sp[-1] self.nodes[key] = int(sp[0]) elif len(sp) == 4 and sp[-1][-2:] == 'hi': key = 'edge_' + sp[-1][0] self.nodes[key] = [float(sp[0]), float(sp[1])] elif len(sp) > 0 and sp[0] == 'Masses': index_start = i + 2 index_end = index_start + 
self.nodes['num_atom_types'] sectionLines = strLines[index_start:index_end] self.nodes['Masses'] = np.loadtxt( StringIO(''.join(sectionLines))) elif len(sp) > 0 and sp[0] == 'Atoms': index_start = i + 2 index_end = index_start + self.nodes['num_atoms'] sectionLines = strLines[index_start:index_end] io_str = StringIO(''.join(sectionLines)) self.nodes['Atoms'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=float) elif len(sp) > 0 and sp[0] == 'Bonds': index_start = i + 2 index_end = index_start + self.nodes['num_bonds'] sectionLines = strLines[index_start:index_end] self.nodes['Bonds'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) elif len(sp) > 0 and sp[0] == 'Angles': index_start = i + 2 index_end = index_start + self.nodes['num_angles'] sectionLines = strLines[index_start:index_end] self.nodes['Angles'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) elif len(sp) > 0 and sp[0] == 'Dihedrals': index_start = i + 2 index_end = index_start + self.nodes['num_dihedrals'] sectionLines = strLines[index_start:index_end] self.nodes['Dihedrals'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) elif len(sp) > 0 and sp[0] == 'Impropers': index_start = i + 2 index_end = index_start + self.nodes['num_impropers'] sectionLines = strLines[index_start:index_end] self.nodes['impropers'] = np.loadtxt( StringIO(''.join(sectionLines)), dtype=int) # get types self.atom_types = self.nodes['Atoms'][:, 2] self.types = np.unique(self.atom_types) # get bonded atom id for all atoms self.bonded = {} for bond in self.nodes['Bonds']: ida, idb = bond[2], bond[3] if ida not in self.bonded: self.bonded[ida] = [] self.bonded[ida].append(idb) else: self.bonded[ida].append(idb) if idb not in self.bonded: self.bonded[idb] = [] self.bonded[idb].append(ida) else: self.bonded[idb].append(ida) class compute_hbonds: def __init__(self, donors_type, hydrogens_type, acceptors_type, dist, alpha): self.donors_type = donors_type self.hydrogens_type = hydrogens_type self.acceptors_type = acceptors_type self.dist = dist self.dist2 = dist * dist self.alpha = alpha # 添加初始化 self.timesteps = [] # 存储每帧的时间步 self.frame_hbonds = [] # 存储每帧的氢键信息 self.get_index_fromInitData() self.run() def get_index_fromInitData(self): # get donors id_donors = [] for d in self.donors_type: ret = np.argwhere(D.atom_types == d).reshape(-1) id_donors.extend(D.nodes['Atoms'][ret, 0]) # get hydrogens id_hydrogens = [] for d in self.hydrogens_type: ret = np.argwhere(D.atom_types == d).reshape(-1) id_hydrogens.extend(D.nodes['Atoms'][ret, 0]) # get acceptors id_acceptors = [] for d in self.acceptors_type: ret = np.argwhere(D.atom_types == d).reshape(-1) id_acceptors.extend(D.nodes['Atoms'][ret, 0]) self.id_donors = np.asarray(id_donors, dtype=int) self.id_hydrogens = np.asarray(id_hydrogens, dtype=int) self.id_acceptors = np.asarray(id_acceptors, dtype=int) print("------------------------") print("num of donors: %d" % (self.id_donors.shape[0])) print("num of hydrogens: %d" % (len(self.id_hydrogens))) print("num of acceptors: %d" % (self.id_acceptors.shape[0])) print("------------------------") def run(self): o = open('hbonds_rawdata.csv', 'w') o.write("index,timestep,id_donor,id_hydrogen,id_acceptor,distance,angle,x_donor,y_donor,z_donor,x_h,y_h,z_h,x_acceptor,y_acceptor,z_acceptor\n") for ifr in range(T.nframes): ret_hbonds = [] id_hbonds = [] snap = T.mmap_snap(ifr) print('Tackle Frame Timestep: %d, index: %d' % (snap.timestep, ifr)) # 添加时间步记录 self.timesteps.append(snap.timestep) # 记录当前帧的时间步 hbond_cnt = 0 # link cell list cell_list = 
CellList(snap.bounds, 3.0) cell_list.assign_particles(snap.pos) for i, don in enumerate(self.id_donors): index = np.argwhere(snap.nodes['id'] == don).reshape(-1)[0] pos_don = np.asarray([snap.nodes['x'][index], snap.nodes['y'][index], snap.nodes['z'][index]]) # get neighboring acceptors neighbors = cell_list.get_neighbor_particles_excluding_self(index) id_neigh_acceptors = [] for nei in neighbors: if snap.nodes['type'][nei] in self.acceptors_type: id_neigh_acceptors.append(snap.nodes['id'][nei]) for h in D.bonded[don]: # check if used if h in id_hbonds: continue if h in self.id_hydrogens: index_h = np.argwhere( snap.nodes['id'] == h).reshape(-1)[0] pos_h = np.asarray([snap.nodes['x'][index_h], snap.nodes['y'][index_h], snap.nodes['z'][index_h]]) vec_h = pbc(pos_h-pos_don, snap.box) else: continue # cycle all the acceptors, like O-H...O for acc in id_neigh_acceptors: # check if used if acc == don: continue index_acc = np.argwhere( snap.nodes['id'] == acc).reshape(-1)[0] pos_acc = np.asarray( [snap.nodes['x'][index_acc], snap.nodes['y'][index_acc], snap.nodes['z'][index_acc]]) # check dist vec_acc = pbc(pos_acc-pos_don, snap.box) dist2 = (vec_acc**2).sum() if dist2 < self.dist2: # check angle angle = (vec_h * vec_acc).sum() / np.linalg.norm(vec_h) / np.linalg.norm(vec_acc) angle = np.arccos(angle) / np.pi * 180 if angle < self.alpha: # record this hydrogen bond hbond_cnt += 1 ret_hbonds.append( [don, h, acc, pos_don, pos_h, pos_acc, dist2**0.5, angle]) id_hbonds.extend([don, h, acc]) # wirte to csv o.write("%d,%d,%d,%d,%d,%.4f,%.4f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f,%.7f\n"%(ifr, snap.timestep, don, h, acc, dist2**0.5, angle, pos_don[0], pos_don[1], pos_don[2], pos_h[0], pos_h[1],pos_h[2],pos_acc[0],pos_acc[1],pos_acc[2])) print('hbonds: %d' % (hbond_cnt)) # append hbonds self.frame_hbonds.append(ret_hbonds) o.close() def compute_ditribution_along_vec(self, direction, start, end, dx=5): print('-----------------') if direction == 'x': index = 0 elif direction == 'y': index = 1 else: index = 2 result = [] # get position-direction bins = np.arange(start, end+0.1, dx) for frame in self.frame_hbonds: pos = np.asarray([hb[4][index] for hb in frame]) hist, e = np.histogram(pos, density=False, bins=bins) result.append(hist) rr = bins[:-1] + np.diff(bins) final_result = np.c_[rr, np.asarray(result).T] np.savetxt(outname, final_result, fmt='%.2f') print('calculation finished!') # 添加C(t)计算方法 def compute_Ct(self): """计算C(t)间歇性氢键自相关函数并输出""" # 获取初始帧的氢键集合 initial_hbonds = set() for hbond in self.frame_hbonds[0]: don, h, acc = hbond[0], hbond[1], hbond[2] initial_hbonds.add((don, h, acc)) N0 = len(initial_hbonds) # 初始氢键数量 if N0 == 0: print("Warning: No hydrogen bonds found at initial frame.") return # 初始化C(t)数组 Ct = np.zeros(len(self.frame_hbonds)) timesteps = np.zeros(len(self.frame_hbonds)) # 计算每帧的存活氢键数 for t in range(len(self.frame_hbonds)): current_hbonds = set() for hbond in self.frame_hbonds[t]: don, h, acc = hbond[0], hbond[1], hbond[2] current_hbonds.add((don, h, acc)) # 计算当前帧存活的初始氢键数 surviving = len(initial_hbonds & current_hbonds) Ct[t] = surviving / N0 timesteps[t] = self.timesteps[t] # 输出C(t)结果 np.savetxt('hbond_Ct.txt', np.column_stack((timesteps, Ct)), header='Timestep C(t)', fmt='%d %.6f') print("C(t) saved to hbond_Ct.txt") def get_args(): parser = argparse.ArgumentParser( description='Calculate the hydrogen bonds distribution along one direction.') parser.add_argument( '--f', type=str, help='filename of lammps dump file', required=True) parser.add_argument( '--d', type=str, 
help='filename of lammps data file', required=True) parser.add_argument( '--donor', type=str, help="donor type, seperate by ',', like '7,9'", required=True) parser.add_argument('--hydrogen', type=str, help="hydrogen type, seperate by ',', like '5,6'", required=True) parser.add_argument('--acceptor', type=str, help="acceptor type, seperate by ',', like '3,4'", required=True) parser.add_argument('--axis', type=str, help="distribution along axis", required=True) parser.add_argument('--start', type=float, help="histogram bin start along axis", required=True) parser.add_argument('--end', type=float, help="histogram bin end along axis", required=True) parser.add_argument('--binsize', type=float, help="histogram bin size along axis", required=True) parser.add_argument('--outname', type=str, help="dump filename. \nThe fist column: position tick along axis \n \ The other columns: hbonds distribution for all the frames", default='hbond.txt') parser.add_argument('--dist', type=float, help="distance criteria in hydrogen bond, default 3.5", default=3.5) parser.add_argument('--alpha', type=float, help="angle criteria in hydrogen bond, default 30", default=30.0) args = parser.parse_args() return args if __name__ == '__main__': args = get_args() donors = [int(_) for _ in args.donor.split(',')] acceptors = [int(_) for _ in args.acceptor.split(',')] hydrogens = [int(_) for _ in args.hydrogen.split(',')] dist = float(args.dist) alpha = float(args.alpha) axis = args.axis start = args.start end = args.end binsize = float(args.binsize) outname = args.outname # read and calculation T = ReaderLammpstrj(args.f) D = ReaderLammpsData(args.d) # print all types and num print('-----------------') for p in np.unique(D.atom_types): num = len(np.argwhere(D.atom_types == p).reshape(-1)) print('type %d: %d' % (p, num)) C = compute_hbonds(donors, hydrogens, acceptors, dist, alpha) C.compute_ditribution_along_vec(axis, start, end, binsize) # 添加C(t)计算 C.compute_Ct() # 计算并输出C(t)

import numpy as np import matplotlib.pyplot as plt from pymatgen.io.vasp import Vasprun from pymatgen.core import Structure from scipy.optimize import curve_fit from tqdm import tqdm import matplotlib as mpl from collections import defaultdict from multiprocessing import Pool, cpu_count import time import argparse import os from scipy.spatial import cKDTree from scipy.signal import savgol_filter # ================ 配置区域 ================ # 在这里设置您的 vasprun.xml 文件默认路径 DEFAULT_VASPRUN_PATH = "C:/Users/zheng/vasprun4.xml" # 更改为您的实际路径 # ========================================= # 1. 设置期刊绘图格式 (The Journal of Chemical Physics) mpl.rcParams['font.family'] = 'sans-serif' mpl.rcParams['font.sans-serif'] = ['Arial', 'DejaVu Sans', 'Liberation Sans'] mpl.rcParams['font.size'] = 10 mpl.rcParams['axes.linewidth'] = 1.2 mpl.rcParams['xtick.major.width'] = 1.2 mpl.rcParams['ytick.major.width'] = 1.2 mpl.rcParams['lines.linewidth'] = 2 mpl.rcParams['figure.dpi'] = 300 mpl.rcParams['figure.figsize'] = (3.5, 2.8) # 紧凑尺寸适合期刊 mpl.rcParams['savefig.dpi'] = 600 mpl.rcParams['savefig.bbox'] = 'tight' mpl.rcParams['savefig.pad_inches'] = 0.05 mpl.rcParams['pdf.fonttype'] = 42 mpl.rcParams['mathtext.fontset'] = 'dejavusans' # 2. 增强的原子类型识别函数 - 明确区分所有原子类型 def identify_atom_types(struct): """识别所有关键原子类型并排除自身化学键""" # 磷酸氧分类 p_oxygens = {"P=O": [], "P-O": [], "P-OH": []} phosphate_hydrogens = [] # 仅P-OH基团中的H原子 # 水合氢离子识别 hydronium_oxygens = [] hydronium_hydrogens = [] # H₃O⁺中的H原子 # 普通水分子 water_oxygens = [] water_hydrogens = [] # 普通水中的H原子 # 氟离子 fluoride_atoms = [i for i, site in enumerate(struct) if site.species_string == "F"] # 铝离子 aluminum_atoms = [i for i, site in enumerate(struct) if site.species_string == "Al"] # 创建快速邻居查找表 neighbor_cache = defaultdict(list) for i, site in enumerate(struct): if site.species_string == "O": neighbors = struct.get_neighbors(site, r=1.3) h_neighbors = [n[0] for n in neighbors if n[0].species_string == "H"] neighbor_cache[i] = [h.index for h in h_neighbors] # 识别水合氢离子 (H₃O⁺) if len(h_neighbors) == 3: hydronium_oxygens.append(i) hydronium_hydrogens.extend([h.index for h in h_neighbors]) # 识别磷酸基团 for site in struct: if site.species_string == "P": neighbors = struct.get_neighbors(site, r=2.0) # 扩大搜索半径 # 筛选氧原子邻居 o_neighbors = [(n[0], n[1]) for n in neighbors if n[0].species_string == "O"] if len(o_neighbors) < 4: # 如果找不到4个氧原子,使用旧方法 for neighbor in o_neighbors: nn_site = neighbor[0] if neighbor[1] < 1.55: p_oxygens["P=O"].append(nn_site.index) else: if any(n[0].species_string == "H" for n in struct.get_neighbors(nn_site, r=1.3)): p_oxygens["P-OH"].append(nn_site.index) else: p_oxygens["P-O"].append(nn_site.index) continue # 按距离排序 o_neighbors.sort(key=lambda x: x[1]) # 最近的氧原子为P=O p_double_o = o_neighbors[0][0] p_oxygens["P=O"].append(p_double_o.index) # 其他三个氧原子 for i in range(1, 4): o_site = o_neighbors[i][0] # 检查氧原子上是否有氢 if neighbor_cache.get(o_site.index, []): p_oxygens["P-OH"].append(o_site.index) else: p_oxygens["P-O"].append(o_site.index) # 识别P-OH基团中的H原子 (磷酸中的H) for o_idx in p_oxygens["P-OH"]: # 获取与P-OH氧相连的H原子 h_neighbors = neighbor_cache.get(o_idx, []) phosphate_hydrogens.extend(h_neighbors) # 识别普通水分子 (排除磷酸氧和水合氢离子) for i, site in enumerate(struct): if site.species_string == "O" and i not in hydronium_oxygens: is_phosphate_oxygen = False for cat in p_oxygens.values(): if i in cat: is_phosphate_oxygen = True break if not is_phosphate_oxygen: water_oxygens.append(i) # 识别普通水分子中的H原子 (水中的H) for o_idx in water_oxygens: h_neighbors = neighbor_cache.get(o_idx, []) water_hydrogens.extend(h_neighbors) 
return { "phosphate_oxygens": p_oxygens, "phosphate_hydrogens": phosphate_hydrogens, "water_oxygens": water_oxygens, "water_hydrogens": water_hydrogens, "hydronium_oxygens": hydronium_oxygens, "hydronium_hydrogens": hydronium_hydrogens, "fluoride_atoms": fluoride_atoms, "aluminum_atoms": aluminum_atoms } # 3. 预计算结构特征 def precompute_structural_features(structure): """预计算结构特征减少重复计算""" atom_types = identify_atom_types(structure) features = { 'atom_types': atom_types, 'volume': structure.volume, 'structure': structure } # 预构建KDTree用于快速距离查询 all_coords = np.array([site.coords for site in structure]) features['kdtree'] = cKDTree(all_coords) return features # 4. 氢键识别函数 (基于新分类) def identify_h_bonds_optimized(features, bond_type, r_cut=3.5, angle_cut=130): """使用预计算特征识别氢键""" struct = features['structure'] atom_types = features['atom_types'] kdtree = features['kdtree'] h_bonds = [] # 从原子类型字典中获取各类原子 p_oxy = atom_types["phosphate_oxygens"] po4_o = p_oxy["P=O"] + p_oxy["P-O"] + p_oxy["P-OH"] po4_h = atom_types["phosphate_hydrogens"] water_o = atom_types["water_oxygens"] water_h = atom_types["water_hydrogens"] hydronium_o = atom_types["hydronium_oxygens"] hydronium_h = atom_types["hydronium_hydrogens"] fluoride = atom_types["fluoride_atoms"] # 1. F与水中的H (F...H-O_water) if bond_type == 'F_H2O': for f_idx in fluoride: # 查找水中的H原子 for h_idx in water_h: # 排除化学键 (F-H) if struct.get_distance(f_idx, h_idx) < 1.3: continue # 检查距离 d_fh = struct.get_distance(f_idx, h_idx) if d_fh > r_cut: continue # 获取H所属的O原子 o_idx = next((o for o in water_o if h_idx in atom_types["water_hydrogens"]), None) if o_idx is None: continue # 计算角度 angle = struct.get_angle(o_idx, h_idx, f_idx) if angle > angle_cut: h_bonds.append((o_idx, f_idx)) # 2. F与水合氢离子的H (F...H-H3O) elif bond_type == 'F_H3O': for f_idx in fluoride: for h_idx in hydronium_h: # 排除化学键 if struct.get_distance(f_idx, h_idx) < 1.3: continue d_fh = struct.get_distance(f_idx, h_idx) if d_fh > r_cut: continue # 获取H所属的O原子 (水合氢离子) o_idx = next((o for o in hydronium_o if h_idx in atom_types["hydronium_hydrogens"]), None) if o_idx is None: continue angle = struct.get_angle(o_idx, h_idx, f_idx) if angle > angle_cut: h_bonds.append((o_idx, f_idx)) # 3. F与磷酸上的H (F...H-PO4) elif bond_type == 'F_PO4': for f_idx in fluoride: for h_idx in po4_h: # 排除化学键 if struct.get_distance(f_idx, h_idx) < 1.3: continue d_fh = struct.get_distance(f_idx, h_idx) if d_fh > r_cut: continue # 获取H所属的O原子 (磷酸) o_idx = next((o for o in p_oxy["P-OH"] if h_idx in atom_types["phosphate_hydrogens"]), None) if o_idx is None: continue angle = struct.get_angle(o_idx, h_idx, f_idx) if angle > angle_cut: h_bonds.append((o_idx, f_idx)) # 4. 磷酸与磷酸之间的氢键 elif bond_type == 'PO4_PO4': # 受体: 所有磷酸氧 acceptors = po4_o # 供体: P-OH基团 donors = [(o_idx, h_idx) for o_idx in p_oxy["P-OH"] for h_idx in atom_types["phosphate_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: # 查找附近的H原子 h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in po4_h: continue # 检查是否属于供体 donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue # 排除自身化学键 if struct.get_distance(acc_o, donor_o) < 1.8: continue # 计算角度 angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 5. 
水与水之间的氢键 elif bond_type == 'H2O_H2O': # 受体: 水中的O acceptors = water_o # 供体: 水中的H donors = [(o_idx, h_idx) for o_idx in water_o for h_idx in atom_types["water_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in water_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None or donor_o == acc_o: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 6. 水合氢离子与水合氢离子之间的氢键 elif bond_type == 'H3O_H3O': acceptors = hydronium_o donors = [(o_idx, h_idx) for o_idx in hydronium_o for h_idx in atom_types["hydronium_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in hydronium_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None or donor_o == acc_o: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 7. 磷酸与水之间的氢键 (磷酸作为受体) elif bond_type == 'PO4_H2O_a': acceptors = po4_o donors = [(o_idx, h_idx) for o_idx in water_o for h_idx in atom_types["water_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in water_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 8. 磷酸与水之间的氢键 (磷酸作为供体) elif bond_type == 'PO4_H2O_d': acceptors = water_o donors = [(o_idx, h_idx) for o_idx in p_oxy["P-OH"] for h_idx in atom_types["phosphate_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in po4_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 9. 水与水合氢离子 (水作为受体) elif bond_type == 'H3O_H2O_a': acceptors = water_o donors = [(o_idx, h_idx) for o_idx in hydronium_o for h_idx in atom_types["hydronium_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in hydronium_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 10. 
水与水合氢离子 (水合氢离子作为受体) elif bond_type == 'H3O_H2O_d': acceptors = hydronium_o donors = [(o_idx, h_idx) for o_idx in water_o for h_idx in atom_types["water_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in water_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 11. 磷酸与水合氢离子 (磷酸作为受体) elif bond_type == 'PO4_H3O_a': acceptors = po4_o donors = [(o_idx, h_idx) for o_idx in hydronium_o for h_idx in atom_types["hydronium_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in hydronium_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) # 12. 磷酸与水合氢离子 (磷酸作为供体) elif bond_type == 'PO4_H3O_d': acceptors = hydronium_o donors = [(o_idx, h_idx) for o_idx in p_oxy["P-OH"] for h_idx in atom_types["phosphate_hydrogens"] if struct.get_distance(o_idx, h_idx) < 1.3] for acc_o in acceptors: h_indices = kdtree.query_ball_point(struct[acc_o].coords, r=r_cut) for h_idx in h_indices: if h_idx not in po4_h: continue donor_o = next((o for o, h in donors if h == h_idx), None) if donor_o is None: continue if struct.get_distance(acc_o, donor_o) < 1.8: continue angle = struct.get_angle(donor_o, h_idx, acc_o) if angle > angle_cut: h_bonds.append((donor_o, acc_o)) return h_bonds # 5. 并行处理框架 (保持原样) def process_frame(args): """处理单个帧的并行函数""" i, structure, bond_types = args features = precompute_structural_features(structure) frame_results = {} for bt in bond_types: bonds = identify_h_bonds_optimized(features, bt) density = len(bonds) / features['volume'] * 1e24 / 6.022e23 frame_results[bt] = (bonds, density) return i, frame_results def calculate_tcf_parallel(h_bond_list, max_dt, n_workers): """并行计算时间关联函数""" total_bonds = len(h_bond_list[0]) if len(h_bond_list) > 0 else 0 tcf = np.zeros(max_dt) counts = np.zeros(max_dt, dtype=int) total_frames = len(h_bond_list) if total_frames - max_dt <= 0: print(f"Warning: Not enough frames for TCF calculation (total_frames={total_frames}, max_dt={max_dt})") return np.zeros(max_dt) tasks = [(t0, h_bond_list, max_dt) for t0 in range(total_frames - max_dt)] with Pool(n_workers) as pool: results = list(tqdm(pool.imap(process_tcf_task, tasks), total=len(tasks), desc="Parallel TCF Calculation")) for tcf_part, counts_part in results: tcf += tcf_part counts += counts_part valid_counts = np.where(counts > 0, counts, 1) tcf = tcf / valid_counts return tcf / total_bonds if total_bonds > 0 else np.zeros(max_dt) def process_tcf_task(args): """处理单个TCF任务的并行函数""" t0, h_bond_list, max_dt = args ref_bonds = set(h_bond_list[t0]) local_tcf = np.zeros(max_dt) local_counts = np.zeros(max_dt, dtype=int) for dt in range(max_dt): t = t0 + dt if t >= len(h_bond_list): break current_bonds = set(h_bond_list[t]) persistent_bonds = ref_bonds & current_bonds local_tcf[dt] = len(persistent_bonds) local_counts[dt] = 1 return local_tcf, local_counts # 6. 
主函数 (优化版) def main_optimized(workers, vasprun_path): print(f"Using {workers} workers for parallel processing") print(f"Using vasprun file: {vasprun_path}") # 时间参数定义 time_per_step = 0.2e-3 skip_steps = 5 frame_interval = time_per_step * skip_steps print(f"Time per frame: {frame_interval:.6f} ps") # 读取VASP轨迹 print(f"Reading vasprun.xml...") try: vr = Vasprun(vasprun_path, ionic_step_skip=skip_steps) structures = vr.structures total_frames = len(structures) print(f"Loaded {total_frames} frames") except Exception as e: print(f"Error loading vasprun file: {str(e)}") return # 计算总模拟时间 total_sim_time = total_frames * frame_interval print(f"Total simulation time: {total_sim_time:.3f} ps") # 设置时间窗口 max_time_window = min(2.0, total_sim_time * 0.5) max_dt_frames = int(max_time_window / frame_interval) print(f"Setting TCF time window: {max_time_window:.3f} ps ({max_dt_frames} frames)") # 定义分析的氢键类型 (12种) bond_types = [ 'F_H2O', # F...H-O_water 'F_H3O', # F...H-H3O 'F_PO4', # F...H-PO4 'PO4_PO4', # PO4...PO4 (磷酸间) 'H2O_H2O', # O_water-H...O_water 'H3O_H3O', # H3O...H3O 'PO4_H2O_a', # PO4...H-O_water (磷酸氧作为受体) 'PO4_H2O_d', # PO4-H...O_water (磷酸H作为供体) 'H3O_H2O_a', # H3O...H-O_water (水合氢离子作为受体) 'H3O_H2O_d', # H3O-H...O_water (水合氢离子作为供体) 'PO4_H3O_a', # PO4...H-H3O (磷酸氧作为受体) 'PO4_H3O_d' # PO4-H...H3O (磷酸H作为供体) ] # 友好的标签名称 bond_labels = { 'F_H2O': 'F···H-O$_{water}$', 'F_H3O': 'F···H-H$_3$O$^+$', 'F_PO4': 'F···H-PO$_4$', 'PO4_PO4': 'PO$_4$···PO$_4$', 'H2O_H2O': 'O$_{water}$-H···O$_{water}$', 'H3O_H3O': 'H$_3$O$^+$···H$_3$O$^+$', 'PO4_H2O_a': 'PO$_4$···H-O$_{water}$', 'PO4_H2O_d': 'PO$_4$-H···O$_{water}$', 'H3O_H2O_a': 'H$_3$O$^+$···H-O$_{water}$', 'H3O_H2O_d': 'H$_3$O$^+$-H···O$_{water}$', 'PO4_H3O_a': 'PO$_4$···H-H$_3$O$^+$', 'PO4_H3O_d': 'PO$_4$-H···H$_3$O$^+$' } # 并行计算氢键密度 print(f"Parallel hydrogen bond calculation with {workers} workers...") start_time = time.time() # 创建任务列表 tasks = [(i, structure, bond_types) for i, structure in enumerate(structures)] # 使用进程池并行处理 densities = {bt: np.zeros(total_frames) for bt in bond_types} h_bond_lists = {bt: [None] * total_frames for bt in bond_types} try: with Pool(workers) as pool: results = list(tqdm(pool.imap(process_frame, tasks), total=total_frames, desc="Processing Frames")) # 整理结果 for i, frame_results in results: for bt in bond_types: bonds, density = frame_results[bt] densities[bt][i] = density h_bond_lists[bt][i] = bonds except Exception as e: print(f"Error during parallel processing: {str(e)}") return # 计算平均密度 avg_densities = {bt: np.mean(densities[bt]) for bt in bond_types} hb_time = time.time() - start_time print(f"H-bond calculation completed in {hb_time:.2f} seconds") # 并行计算TCF print("Parallel TCF calculation...") tcf_results = {} lifetimes = {} reliability = {} for bt in bond_types: print(f"\nProcessing bond type: {bt}") h_bond_list = h_bond_lists[bt] start_time = time.time() try: tcf = calculate_tcf_parallel(h_bond_list, max_dt_frames, workers) tcf_results[bt] = tcf except Exception as e: print(f"Error calculating TCF for {bt}: {str(e)}") tcf_results[bt] = np.zeros(max_dt_frames) continue # 创建时间轴 (转换为ps) t_frames = np.arange(len(tcf)) t_ps = t_frames * frame_interval # 指数拟合 mask = (tcf > 0.01) & (t_ps < max_time_window * 0.8) if np.sum(mask) > 3: try: popt, _ = curve_fit(exp_decay, t_ps[mask], tcf[mask], p0=[1.0, 1.0], maxfev=5000) lifetimes[bt] = popt[0] if lifetimes[bt] > max_time_window * 0.8: reliability[bt] = "unreliable (τ > 80% time window)" else: reliability[bt] = "reliable" print(f"Fit successful: τ = {lifetimes[bt]:.3f} ps 
({reliability[bt]})") except Exception as e: print(f"Fit failed for {bt}: {str(e)}") lifetimes[bt] = np.nan reliability[bt] = "fit failed" else: print(f"Not enough data points for fitting {bt}") lifetimes[bt] = np.nan reliability[bt] = "insufficient data" tcf_time = time.time() - start_time print(f"TCF calculation for {bt} completed in {tcf_time:.2f} seconds") # 绘图 - 优化期刊格式 # 使用颜色循环,避免太多颜色 colors = plt.cm.tab20(np.linspace(0, 1, len(bond_types))) # 1. 氢键密度图 fig1, ax1 = plt.subplots(figsize=(3.5, 2.8)) time_axis = np.arange(len(structures)) * frame_interval # 选择最重要的6种氢键类型显示 selected_types = ['F_H2O', 'F_H3O', 'F_PO4', 'PO4_PO4', 'H2O_H2O', 'PO4_H2O_a'] for i, bt in enumerate(bond_types): if bt in selected_types: ax1.plot(time_axis, densities[bt], color=colors[i], label=bond_labels[bt], alpha=0.8, linewidth=1.5) ax1.set_xlabel('Time (ps)', fontsize=10) ax1.set_ylabel('Concentration (mol/L)', fontsize=10) ax1.legend(frameon=False, fontsize=8, loc='upper center', bbox_to_anchor=(0.5, -0.25), ncol=2) fig1.tight_layout(pad=0.5) fig1.subplots_adjust(bottom=0.35) # 为图例留出空间 fig1.savefig('hbond_densities.pdf') print("Saved hbond_densities.pdf") # 2. 时间关联函数图 (对数坐标) fig2, ax2 = plt.subplots(figsize=(3.5, 2.8)) # 选择最重要的6种氢键类型显示 for i, bt in enumerate(bond_types): if bt in selected_types: t_ps = np.arange(len(tcf_results[bt])) * frame_interval # 创建标签 label = bond_labels[bt] if bt in lifetimes and not np.isnan(lifetimes[bt]): label += f' (τ={lifetimes[bt]:.2f} ps)' if reliability.get(bt, "").startswith("unreliable"): label += '*' ax2.semilogy(t_ps, tcf_results[bt], color=colors[i], label=label, linewidth=1.5) # 标记时间窗口上限 ax2.axvline(x=max_time_window, color='gray', linestyle='--', alpha=0.7, linewidth=1) ax2.text(max_time_window*1.02, 0.5, 'Time window', rotation=90, va='center', ha='left', fontsize=7) ax2.set_xlabel('Time (ps)', fontsize=10) ax2.set_ylabel('Hydrogen Bond TCF', fontsize=10) ax2.set_xlim(0, max_time_window * 1.1) ax2.set_ylim(1e-3, 1.1) ax2.legend(frameon=False, fontsize=7, loc='upper right', bbox_to_anchor=(1.0, 1.0)) # 添加可靠性说明 if any("unreliable" in rel for bt, rel in reliability.items() if isinstance(rel, str)): ax2.text(0.95, 0.05, "* Unreliable fit (τ > 80% time window)", transform=ax2.transAxes, ha='right', fontsize=6) fig2.tight_layout(pad=0.5) fig2.savefig('hbond_tcf.pdf') print("Saved hbond_tcf.pdf") # 3. 
氢键寿命比较图 fig3, ax3 = plt.subplots(figsize=(3.5, 2.8)) # 收集可靠寿命数据 valid_lifetimes = [] valid_labels = [] for bt in selected_types: if bt in lifetimes and not np.isnan(lifetimes[bt]) and "reliable" in reliability.get(bt, ""): valid_lifetimes.append(lifetimes[bt]) valid_labels.append(bond_labels[bt]) if valid_lifetimes: y_pos = np.arange(len(valid_lifetimes)) ax3.barh(y_pos, valid_lifetimes, color=colors[:len(valid_lifetimes)], alpha=0.8) ax3.set_yticks(y_pos) ax3.set_yticklabels(valid_labels, fontsize=8) ax3.set_xlabel('Lifetime (ps)', fontsize=10) ax3.set_title('Hydrogen Bond Lifetimes', fontsize=10) fig3.tight_layout(pad=0.5) fig3.savefig('hbond_lifetimes.pdf') print("Saved hbond_lifetimes.pdf") # 打印结果 print("\nResults Summary:") print("="*85) print("{:<20} {:<15} {:<15} {:<20}".format( "Bond Type", "Avg Density", "Lifetime (ps)", "Reliability")) print("-"*85) for bt in bond_types: lifetime = lifetimes.get(bt, np.nan) rel = reliability.get(bt, "N/A") print("{:<20} {:<15.4f} {:<15.3f} {:<20}".format( bond_labels[bt], avg_densities[bt], lifetime, rel )) print("="*85) def exp_decay(t, tau, A): return A * np.exp(-t / tau) if __name__ == "__main__": # 命令行参数解析 parser = argparse.ArgumentParser(description='Hydrogen Bond Lifetime Analysis') parser.add_argument('--workers', type=int, default=max(1, cpu_count() - 2), help='Number of parallel workers (default: CPU cores - 2)') parser.add_argument('--vasprun', type=str, default=DEFAULT_VASPRUN_PATH, help='Path to vasprun.xml file') args = parser.parse_args() print(f"Starting analysis with {args.workers} workers") print(f"Using vasprun file: {args.vasprun}") main_optimized(workers=args.workers, vasprun_path=args.vasprun)以上为VASP计算氢键寿命,但是现在我需要计算LAMMPS,在计算LAMMPS的时候可以参考一下代码import MDAnalysis as mda import numpy as np import numba as nb from MDAnalysis.lib.distances import capped_distance, calc_angles, distance_array import multiprocessing as mp from tqdm.auto import tqdm import os import argparse import matplotlib.pyplot as plt import pandas as pd import zarr import time import gc import sys from scipy.spatial import cKDTree # ====================== 优化参数设置 ====================== DEFAULT_MAX_FRAMES = 1000 # 只处理前100ps (1000帧) DEFAULT_MEM_LIMIT = 100 # GB (126GB内存中保留6GB给系统) DEFAULT_N_WORKERS = 58 # 默认使用64个核心 BOX_SIZE = 80.0 CALC_REGION = np.array([0, 80, 0, 80, 0, 80], dtype=np.float32) HB_DIST_CUTOFF = 3.5 HB_ANGLE_CUTOFF = 130 FRAME_DT = 0.1 # ps TAU_MAX = 20.0 # ps DT_ORIGIN = 2.0 # ps MEMORY_LIMIT = 100 # GB (126GB内存中保留6GB给系统) # 原子类型常量 - 扩展包含氟原子 O_WATER, O_HYDRONIUM, O_PHYDROXYL, O_PDOUBLE, O_PBRIDGE = 0, 1, 2, 3, 4 H_WATER, H_HYDRONIUM, H_PHOSPHORIC, P, F_FLUORIDE, H_FLUORIDE = 5, 6, 7, 8, 9, 10 UNKNOWN = 99 # 原子类型反向映射 ATOM_TYPE_NAMES = { O_WATER: 'O_water', O_HYDRONIUM: 'O_hydronium', O_PHYDROXYL: 'O_phydroxyl', O_PDOUBLE: 'O_pdouble', O_PBRIDGE: 'O_pbridge', H_WATER: 'H_water', H_HYDRONIUM: 'H_hydronium', H_PHOSPHORIC: 'H_phosphoric', P: 'P', F_FLUORIDE: 'F', H_FLUORIDE: 'H_fluoride', UNKNOWN: 'UNK' } # ====================== 核心优化函数 ====================== @nb.njit(nogil=True, fastmath=True, cache=True, parallel=True) def classify_atoms_vectorized(positions, types, box): """使用Numba加速的向量化原子分类函数,包含氟原子处理""" n_atoms = len(types) atom_labels = np.full(n_atoms, UNKNOWN, dtype=np.int8) # 提取原子索引 o_indices = np.where(types == 2)[0] # 氧原子 h_indices = np.where(types == 1)[0] # 氢原子 p_indices = np.where(types == 7)[0] # 磷原子 f_indices = np.where(types == 3)[0] # 氟原子 # 标记氟原子 for f_idx in f_indices: atom_labels[f_idx] = F_FLUORIDE # 预计算O-H距离矩阵 oh_distances = 
np.empty((len(o_indices), len(h_indices)), dtype=np.float32) for i in nb.prange(len(o_indices)): o_idx = o_indices[i] for j in range(len(h_indices)): h_idx = h_indices[j] d = positions[o_idx] - positions[h_idx] d -= np.round(d / box) * box # PBC校正 oh_distances[i, j] = np.sqrt(np.sum(d**2)) # 预计算P-O距离矩阵 po_distances = np.empty((len(p_indices), len(o_indices)), dtype=np.float32) for i in nb.prange(len(p_indices)): p_idx = p_indices[i] for j in range(len(o_indices)): o_idx = o_indices[j] d = positions[p_idx] - positions[o_idx] d -= np.round(d / box) * box po_distances[i, j] = np.sqrt(np.sum(d**2)) # 预计算F-H距离矩阵 fh_distances = np.empty((len(f_indices), len(h_indices)), dtype=np.float32) for i in nb.prange(len(f_indices)): f_idx = f_indices[i] for j in range(len(h_indices)): h_idx = h_indices[j] d = positions[f_idx] - positions[h_idx] d -= np.round(d / box) * box fh_distances[i, j] = np.sqrt(np.sum(d**2)) # 分类氧原子 (并行) for i in nb.prange(len(o_indices)): o_idx = o_indices[i] h_count = np.sum(oh_distances[i] < 1.2) # O-H键长阈值 p_count = 0 for j in range(len(p_indices)): if po_distances[j, i] < 1.8: # P-O键长阈值 p_count += 1 if h_count == 3: atom_labels[o_idx] = O_HYDRONIUM elif h_count == 2: atom_labels[o_idx] = O_WATER elif h_count == 1 and p_count == 1: atom_labels[o_idx] = O_PHYDROXYL elif p_count == 1: atom_labels[o_idx] = O_PDOUBLE elif p_count == 2: atom_labels[o_idx] = O_PBRIDGE # 分类氢原子 (并行) for j in nb.prange(len(h_indices)): h_idx = h_indices[j] min_dist = 10.0 min_o_idx = -1 for i in range(len(o_indices)): if oh_distances[i, j] < min_dist: min_dist = oh_distances[i, j] min_o_idx = o_indices[i] # 检查是否连接到氟原子 min_f_dist = 10.0 min_f_idx = -1 for i in range(len(f_indices)): if fh_distances[i, j] < min_f_dist: min_f_dist = fh_distances[i, j] min_f_idx = f_indices[i] # 优先判断氟键 if min_f_dist < 1.2: # F-H键长阈值 atom_labels[h_idx] = H_FLUORIDE elif min_dist < 1.2: # O-H键长阈值 o_type = atom_labels[min_o_idx] if o_type == O_HYDRONIUM: atom_labels[h_idx] = H_HYDRONIUM elif o_type == O_WATER: atom_labels[h_idx] = H_WATER elif o_type == O_PHYDROXYL: atom_labels[h_idx] = H_PHOSPHORIC # 分类磷原子 for p_idx in p_indices: atom_labels[p_idx] = P return atom_labels def find_hbonds_kdtree(positions, atom_labels, box, kdtree=None): """使用KD树加速的氢键检测,包含氟键处理""" # 识别供体和受体 donor_mask = np.isin(atom_labels, [O_WATER, O_HYDRONIUM, O_PHYDROXYL, F_FLUORIDE]) acceptor_mask = np.isin(atom_labels, [O_WATER, O_HYDRONIUM, O_PDOUBLE, O_PBRIDGE, O_PHYDROXYL, F_FLUORIDE]) donor_indices = np.where(donor_mask)[0] acceptor_indices = np.where(acceptor_mask)[0] if len(donor_indices) == 0 or len(acceptor_indices) == 0: return [], kdtree # 创建或复用KD树 if kdtree is None: kdtree = cKDTree(positions[acceptor_indices], boxsize=box[:3]) # 找到供体氢原子 h_mask = ((atom_labels == H_WATER) | (atom_labels == H_HYDRONIUM) | (atom_labels == H_PHOSPHORIC) | (atom_labels == H_FLUORIDE)) h_positions = positions[h_mask] h_indices = np.where(h_mask)[0] hbonds = [] processed_pairs = set() # 避免重复处理 # 处理每个供体 for donor_idx in donor_indices: # 找到与供体相连的氢原子 donor_pos = positions[donor_idx] d_h_dist = distance_array(donor_pos.reshape(1, -1), h_positions, box=box)[0] connected_h = np.where(d_h_dist < 1.2)[0] for h_idx in connected_h: hydrogen_idx = h_indices[h_idx] hydrogen_pos = positions[hydrogen_idx] # 使用KD树查找附近受体 neighbors = kdtree.query_ball_point(hydrogen_pos, HB_DIST_CUTOFF) for j in neighbors: acceptor_idx = acceptor_indices[j] # 跳过自相互作用 if donor_idx == acceptor_idx: continue # 检查是否已处理 pair_id = tuple(sorted((donor_idx, acceptor_idx))) if pair_id in processed_pairs: 
continue processed_pairs.add(pair_id) # 计算角度 angle = calc_angles(donor_pos, hydrogen_pos, positions[acceptor_idx], box=box) angle_deg = np.degrees(angle) if angle_deg > HB_ANGLE_CUTOFF: dist = np.linalg.norm(hydrogen_pos - positions[acceptor_idx]) # 确定氢键类型 donor_type = ATOM_TYPE_NAMES.get(atom_labels[donor_idx], 'UNK').split('_')[0] acceptor_type = ATOM_TYPE_NAMES.get(atom_labels[acceptor_idx], 'UNK').split('_')[0] # 简化类型命名 if donor_type == 'O' and acceptor_type == 'O': hbond_type = 'O-O' elif donor_type == 'O' and acceptor_type == 'P': hbond_type = 'O-P' elif donor_type == 'P' and acceptor_type == 'O': hbond_type = 'P-O' elif donor_type == 'P' and acceptor_type == 'P': hbond_type = 'P-P' elif donor_type == 'H3O' and acceptor_type == 'O': hbond_type = 'H3O-O' elif donor_type == 'H3O' and acceptor_type == 'P': hbond_type = 'H3O-P' elif donor_type == 'F' or acceptor_type == 'F': hbond_type = f"{donor_type}-F" if acceptor_type == 'F' else f"F-{acceptor_type}" else: hbond_type = f"{donor_type}-{acceptor_type}" hbonds.append(( donor_idx, hydrogen_idx, acceptor_idx, dist, angle_deg, hbond_type )) return hbonds, kdtree # ====================== 并行处理框架 ====================== def process_frame_chunk(args): """处理一批帧的优化函数,包含详细调试输出""" start_idx, end_idx, top_file, traj_file, debug = args try: universe = mda.Universe(top_file, traj_file, topology_format="DATA", format="LAMMPSDUMP", atom_style="id type charge x y z") # 预加载区域掩码 universe.trajectory[0] positions = universe.atoms.positions mask = ((positions[:, 0] >= CALC_REGION[0]) & (positions[:, 0] <= CALC_REGION[1]) & (positions[:, 1] >= CALC_REGION[2]) & (positions[:, 1] <= CALC_REGION[3]) & (positions[:, 2] >= CALC_REGION[4]) & (positions[:, 2] <= CALC_REGION[5])) filtered_indices = np.where(mask)[0] types_all = universe.atoms.types.astype(int) if debug: print(f"区域筛选: 原始原子数 {len(positions)} -> 筛选后 {len(filtered_indices)}") results = [] kdtree = None for frame_idx in range(start_idx, end_idx): if frame_idx >= len(universe.trajectory): break universe.trajectory[frame_idx] positions = universe.atoms.positions # 应用区域筛选 filtered_pos = positions[filtered_indices] filtered_types = types_all[filtered_indices] # 分类原子 atom_labels = classify_atoms_vectorized( filtered_pos, filtered_types, universe.dimensions ) if debug: unique, counts = np.unique(atom_labels, return_counts=True) print(f"\n帧 {frame_idx} 原子类型统计:") for u, c in zip(unique, counts): print(f" {ATOM_TYPE_NAMES.get(u, 'UNKNOWN')}: {c}") # 识别氢键 hbonds, kdtree = find_hbonds_kdtree( filtered_pos, atom_labels, universe.dimensions, kdtree ) if debug: print(f"检测到 {len(hbonds)} 个氢键") if len(hbonds) > 0: for i, hb in enumerate(hbonds[:min(5, len(hbonds))]): donor_name = ATOM_TYPE_NAMES.get(atom_labels[hb[0]], "UNK") acceptor_name = ATOM_TYPE_NAMES.get(atom_labels[hb[2]], "UNK") print(f" 氢键 {i+1}: {donor_name}-{acceptor_name} " f"距离={hb[3]:.2f}Å, 角度={hb[4]:.1f}°") # 存储压缩的氢键信息 hbond_data = [] for hb in hbonds: # 简化类型映射 type_code = { 'O-O': 0, 'O-P': 1, 'P-O': 2, 'P-P': 3, 'H3O-O': 4, 'H3O-P': 5, 'O-F': 6, 'P-F': 7, 'F-O': 8, 'F-P': 9, 'F-F': 10, 'H3O-F': 11, 'F-H3O': 12 }.get(hb[5], 13) # 默认其他类型 hbond_data.append((hb[0], hb[1], hb[2], type_code)) results.append((frame_idx, np.array(hbond_data, dtype=np.int32))) return results except Exception as e: import traceback print(f"处理帧块 [{start_idx}-{end_idx}] 时出错: {str(e)}") print(traceback.format_exc()) # 返回空结果但保持结构 error_results = [] for frame_idx in range(start_idx, min(end_idx, len(universe.trajectory))): error_results.append((frame_idx, np.array([], dtype=np.int32))) 
return error_results # ====================== 氢键跟踪优化 ====================== @nb.njit(nogil=True, fastmath=True, parallel=True) def track_hbonds_batch(origin_frame, positions_cache, hbond_data, tau_max_frames, box): """批量跟踪氢键寿命的Numba优化函数""" n_hbonds = len(hbond_data) survival_matrix = np.zeros((n_hbonds, tau_max_frames), dtype=np.uint8) # 为所有氢键并行处理 for i in nb.prange(n_hbonds): donor, hydrogen, acceptor, _ = hbond_data[i] # 检查初始帧中的氢键是否存在 if donor >= positions_cache[0].shape[0] or \ hydrogen >= positions_cache[0].shape[0] or \ acceptor >= positions_cache[0].shape[0]: continue # 初始距离和角度 d0 = positions_cache[0][donor] h0 = positions_cache[0][hydrogen] a0 = positions_cache[0][acceptor] # 检查初始条件 dist0 = np.sqrt(np.sum((h0 - a0)**2)) angle_rad = calc_angles(d0, h0, a0, box=box) angle0 = np.degrees(angle_rad) if dist0 > HB_DIST_CUTOFF or angle0 < HB_ANGLE_CUTOFF: continue survival_matrix[i, 0] = 1 # 跟踪后续帧 for t in range(1, tau_max_frames): if t >= len(positions_cache): break pos_t = positions_cache[t] if donor >= pos_t.shape[0] or hydrogen >= pos_t.shape[0] or acceptor >= pos_t.shape[0]: break dt = pos_t[donor] ht = pos_t[hydrogen] at = pos_t[acceptor] # 应用PBC校正 d_vec = ht - dt d_vec -= np.round(d_vec / box) * box a_vec = at - ht a_vec -= np.round(a_vec / box) * box dist = np.sqrt(np.sum((ht - at)**2)) angle_rad = calc_angles(dt, ht, at, box=box) angle = np.degrees(angle_rad) if dist <= HB_DIST_CUTOFF and angle >= HB_ANGLE_CUTOFF: survival_matrix[i, t] = 1 else: break return survival_matrix # ====================== 氢键跟踪全局函数 ====================== def process_origin(origin, all_hbonds, traj_cache, n_frames, tau_max_frames, box): """处理单个时间原点的氢键寿命跟踪""" # 获取当前原点帧的氢键 origin_hbonds = all_hbonds[origin] if origin_hbonds is None or len(origin_hbonds) == 0: return np.zeros(tau_max_frames), np.zeros(tau_max_frames) # 准备帧数据缓存 frame_cache = [] for t in range(tau_max_frames): frame_idx = origin + t if frame_idx < n_frames: frame_cache.append(traj_cache.get_frame(frame_idx)) else: break # 批量跟踪氢键 survival_matrix = track_hbonds_batch( origin, frame_cache, origin_hbonds, tau_max_frames, box ) # 计算SCF scf = np.sum(survival_matrix, axis=0) count = np.array([np.sum(survival_matrix[:, :t+1].any(axis=1)) for t in range(tau_max_frames)]) return scf, count # 全局包装函数 def process_origin_wrapper(args): """包装器函数,用于传递多个参数""" return process_origin(*args) # ====================== 内存管理优化 ====================== class TrajectoryCache: """高效轨迹缓存管理器""" def __init__(self, universe, max_frames, mem_limit_gb): self.universe = universe self.n_atoms = len(universe.atoms) self.frame_size = self.n_atoms * 3 * 4 # 每帧大小 (bytes) self.max_frames = min(max_frames, len(universe.trajectory)) self.mem_limit = mem_limit_gb * 1e9 # 计算内存中可以缓存的帧数 self.cache_capacity = max(1, int(self.mem_limit * 0.7 / self.frame_size)) self.cache = {} self.access_counter = {} self.counter = 0 # 创建Zarr磁盘缓存 cache_dir = f'temp_positions_cache_{os.getpid()}' os.makedirs(cache_dir, exist_ok=True) self.store = zarr.DirectoryStore(cache_dir) self.root = zarr.group(store=self.store, overwrite=True) self.disk_cache = self.root.zeros( 'positions', shape=(self.max_frames, self.n_atoms, 3), chunks=(1, self.n_atoms, 3), dtype='float32' ) # 预填充磁盘缓存 for frame_idx in tqdm(range(self.max_frames), desc="预缓存轨迹"): self.universe.trajectory[frame_idx] self.disk_cache[frame_idx] = universe.atoms.positions.astype(np.float32) def get_frame(self, frame_idx): """获取帧数据,使用内存+磁盘混合缓存""" if frame_idx not in self.cache: # 如果缓存已满,移除最久未使用的 if len(self.cache) >= self.cache_capacity: least_used = 
min(self.access_counter, key=self.access_counter.get) del self.cache[least_used] del self.access_counter[least_used] # 从磁盘加载到内存 self.cache[frame_idx] = self.disk_cache[frame_idx][:] # 更新访问计数 self.counter += 1 self.access_counter[frame_idx] = self.counter return self.cache[frame_idx] def cleanup(self): """清理缓存""" self.cache.clear() self.access_counter.clear() gc.collect() # 清理磁盘缓存 if os.path.exists(self.store.path): import shutil shutil.rmtree(self.store.path) # ====================== 主程序优化 ====================== def main(top_file, traj_file, output_prefix, max_frames=DEFAULT_MAX_FRAMES, mem_limit=DEFAULT_MEM_LIMIT, workers=DEFAULT_N_WORKERS, debug=False): print(f"加载拓扑文件: {top_file}") print(f"加载轨迹文件: {traj_file}") # 创建Universe universe = mda.Universe(top_file, traj_file, topology_format="DATA", format="LAMMPSDUMP", atom_style="id type charge x y z") n_frames = min(len(universe.trajectory), max_frames) n_atoms = len(universe.atoms) print(f"系统信息: {n_atoms} 个原子, {n_frames} 帧 ({n_frames*FRAME_DT}ps)") # 打印原子类型分布 (调试) if debug: print("\n原子类型分布 (第一帧):") unique_types, counts = np.unique(universe.atoms.types.astype(int), return_counts=True) for t, c in zip(unique_types, counts): print(f" 类型 {t}: {c} 个原子") # 初始化轨迹缓存 traj_cache = TrajectoryCache(universe, n_frames, mem_limit) # ===== 第一阶段: 氢键检测 ===== print("\n===== 阶段1: 氢键检测 =====") start_time = time.time() # 计算批处理大小 frame_size = n_atoms * 3 * 4 # 位置数据大小 (bytes) batch_size = max(1, int(mem_limit * 0.3 * 1e9 / (frame_size * workers))) batches = [(i, min(i+batch_size, n_frames)) for i in range(0, n_frames, batch_size)] print(f"使用 {len(batches)} 个批次, 每批 {batch_size} 帧") print(f"启动 {workers} 个工作进程...") # 并行处理帧 all_hbonds = [None] * n_frames with mp.Pool(processes=workers) as pool: args = [(start, end, top_file, traj_file, debug) for start, end in batches] results = list(tqdm(pool.imap_unordered(process_frame_chunk, args, chunksize=1), total=len(batches), desc="氢键检测")) # 整合结果 for batch_result in results: for frame_idx, hbonds in batch_result: all_hbonds[frame_idx] = hbonds hbond_detect_time = time.time() - start_time print(f"氢键检测完成! 
耗时: {hbond_detect_time:.2f} 秒") # ===== 第二阶段: 氢键寿命跟踪 ===== print("\n===== 阶段2: 氢键寿命跟踪 =====") start_time = time.time() tau_max_frames = int(TAU_MAX / FRAME_DT) dt_origin_frames = int(DT_ORIGIN / FRAME_DT) origins = list(range(0, n_frames - tau_max_frames, dt_origin_frames)) print(f"跟踪 {len(origins)} 个时间原点的氢键寿命...") print(f"时间窗口: {TAU_MAX}ps ({tau_max_frames}帧)") # 准备结果数组 scf_total = np.zeros(tau_max_frames) count_total = np.zeros(tau_max_frames) # 准备参数列表 args_list = [(origin, all_hbonds, traj_cache, n_frames, tau_max_frames, universe.dimensions) for origin in origins] # 并行处理时间原点 with mp.Pool(processes=workers) as pool: results = list(tqdm( pool.imap(process_origin_wrapper, args_list, chunksize=10), total=len(origins), desc="跟踪氢键" )) for scf, count in results: valid_len = min(len(scf), len(scf_total)) scf_total[:valid_len] += scf[:valid_len] count_total[:valid_len] += count[:valid_len] # 清理缓存 traj_cache.cleanup() # ===== 结果处理 ===== print("\n===== 阶段3: 结果分析 =====") # 计算平均SCF scf_avg = np.zeros_like(scf_total) valid_mask = count_total > 0 scf_avg[valid_mask] = scf_total[valid_mask] / count_total[valid_mask] # 计算弛豫时间 time_axis = np.arange(tau_max_frames) * FRAME_DT tau_relax = np.trapz(scf_avg, time_axis) # 氢键统计 hbond_counts = {} for frame_hbonds in all_hbonds: if frame_hbonds is None: continue for hb in frame_hbonds: hb_type = hb[3] hbond_counts[hb_type] = hbond_counts.get(hb_type, 0) + 1 total_hbonds = sum(hbond_counts.values()) avg_hbonds = total_hbonds / n_frames if n_frames > 0 else 0 # 计算氢键密度 calc_volume = ((CALC_REGION[1]-CALC_REGION[0]) * (CALC_REGION[3]-CALC_REGION[2]) * (CALC_REGION[5]-CALC_REGION[4])) * 1e-27 # m³ hbond_density = avg_hbonds / (calc_volume * 6.022e23) # mol/L # 保存结果 scf_df = pd.DataFrame({ 'Time (ps)': time_axis, 'SCF': scf_avg, 'Count': count_total }) scf_df.to_csv(f"{output_prefix}_scf.csv", index=False) stats_data = { 'Avg_HBonds_per_frame': avg_hbonds, 'HBond_Density_mol/L': hbond_density, 'Relaxation_Time_ps': tau_relax, 'Total_Frames': n_frames, 'HBond_Detection_Time_s': hbond_detect_time, 'Lifetime_Tracking_Time_s': time.time() - start_time } # 添加按类型的氢键统计 type_names = { 0: 'O-O', 1: 'O-P', 2: 'P-O', 3: 'P-P', 4: 'H3O-O', 5: 'H3O-P', 6: 'O-F', 7: 'P-F', 8: 'F-O', 9: 'F-P', 10: 'F-F', 11: 'H3O-F', 12: 'F-H3O', 13: 'Other' } for type_code, count in hbond_counts.items(): stats_data[f"HBond_{type_names.get(type_code, 'Unknown')}"] = count / n_frames stats_df = pd.DataFrame([stats_data]) stats_df.to_csv(f"{output_prefix}_stats.csv", index=False) # 绘图 plt.figure(figsize=(10, 7), dpi=150) plt.plot(time_axis, scf_avg, 'b-', linewidth=2.5, label='SCF') plt.fill_between(time_axis, scf_avg, alpha=0.2, color='blue') # 添加指数拟合 try: from scipy.optimize import curve_fit def exp_decay(t, a, tau): return a * np.exp(-t / tau) popt, _ = curve_fit(exp_decay, time_axis, scf_avg, p0=[1.0, 5.0]) plt.plot(time_axis, exp_decay(time_axis, *popt), 'r--', label=f'Exp Fit: τ={popt[1]:.2f}ps') except: pass plt.xlabel('Time (ps)', fontsize=14, fontfamily='serif') plt.ylabel('Survival Probability', fontsize=14, fontfamily='serif') plt.title(f'H-Bond Lifetime in Phosphoric Acid Solution\nτ_relax = {tau_relax:.2f} ps', fontsize=16, fontfamily='serif') plt.grid(alpha=0.3, linestyle='--') plt.xlim(0, TAU_MAX) plt.ylim(0, 1.05) plt.legend(fontsize=12) plt.tight_layout() # 添加统计信息框 stats_text = (f"Avg HBonds/frame: {avg_hbonds:.1f}\n" f"HBond density: {hbond_density:.2f} mol/L\n" f"Frames analyzed: {n_frames}") plt.figtext(0.75, 0.25, stats_text, bbox=dict(facecolor='white', alpha=0.8), fontsize=10, 
fontfamily='monospace') plt.savefig(f"{output_prefix}_lifetime.png", dpi=300, bbox_inches='tight') total_time = time.time() - start_time print(f"\n分析完成! 总耗时: {total_time/60:.2f} 分钟") print(f"结果保存到: {output_prefix}_*.csv/png") if __name__ == "__main__": # 简化命令行参数解析 parser = argparse.ArgumentParser( description='优化版氢键寿命分析 - 湿法磷酸体系专用', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) # 必需参数 parser.add_argument('top_file', type=str, help='拓扑文件路径 (.data)') parser.add_argument('traj_file', type=str, help='轨迹文件路径 (.lammpstrj)') parser.add_argument('output_prefix', type=str, help='输出文件前缀') # 可选参数(设置默认值) parser.add_argument('--workers', type=int, default=DEFAULT_N_WORKERS, help=f'并行工作进程数 (默认: {DEFAULT_N_WORKERS})') parser.add_argument('--max_frames', type=int, default=DEFAULT_MAX_FRAMES, help=f'处理的最大帧数 (默认: {DEFAULT_MAX_FRAMES})') parser.add_argument('--mem_limit', type=int, default=DEFAULT_MEM_LIMIT, help=f'内存限制 (GB) (默认: {DEFAULT_MEM_LIMIT})') parser.add_argument('--debug', action='store_true', help='启用详细调试输出') args = parser.parse_args() # 打印配置信息 print("="*60) print(f"氢键寿命分析配置:") print(f" 拓扑文件: {args.top_file}") print(f" 轨迹文件: {args.traj_file}") print(f" 输出前缀: {args.output_prefix}") print(f" 工作进程: {args.workers}") print(f" 处理帧数: {args.max_frames} (前 {args.max_frames * FRAME_DT}ps)") print(f" 内存限制: {args.mem_limit}GB") print(f" 调试模式: {'开启' if args.debug else '关闭'}") print("="*60) # 检查文件是否存在 if not os.path.exists(args.top_file): print(f"错误: 拓扑文件不存在 - {args.top_file}") sys.exit(1) if not os.path.exists(args.traj_file): print(f"错误: 轨迹文件不存在 - {args.traj_file}") sys.exit(1) # 确保输出目录存在 output_dir = os.path.dirname(args.output_prefix) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) try: main( args.top_file, args.traj_file, args.output_prefix, max_frames=args.max_frames, mem_limit=args.mem_limit, workers=args.workers, debug=args.debug ) except Exception as e: print(f"程序执行出错: {str(e)}") import traceback traceback.print_exc() sys.exit(1) 后面这部分的代码为LAMMPS计算的代码,但是LAMMPS计算的代码显示结果为空而报错,可能是识别氢键为0也可能是有空表格导致,我也不清楚,但是上面VASP的部分已经能够成功运行,我希望你能够参考vasp的代码来优化LAMMPS的代码,针对LAMMPS的代码首先需要加速计算,在这里注意pickle问题,执行命令如LAMMPS的参考,出图部分需要符合The Journal of Chemical Physics期刊要求,注意int字符,主要字体情况,本电脑有64核126g内存,使用58核和100g内存。不要因为时间问题省略代码,输出的代码不要有错别字以及常规错误
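Two quick checks are worth making before any deeper optimization of the LAMMPS script above, because they are the most common reasons such an analysis ends with empty results or crashes on empty arrays: (1) the atom-type numbers hard-coded in classify_atoms_vectorized (H = 1, O = 2, F = 3, P = 7) must match the types actually assigned in the .data file, otherwise every frame yields zero hydrogen bonds; (2) frames with no detected bonds should still return a well-shaped (0, 4) array so that the aggregation and fitting stages never receive a ragged or empty object. The sketch below illustrates both guards plus journal-style matplotlib defaults; the helper names are illustrative and not part of the original script, and the 3.37 in single-column width is an assumption to be checked against the journal's artwork guidelines.

```python
import numpy as np
import matplotlib.pyplot as plt

def check_type_mapping(universe, expected=None):
    """Warn if the atom types assumed by classify_atoms_vectorized are absent from the dump."""
    expected = expected or {'H': 1, 'O': 2, 'F': 3, 'P': 7}  # type numbers hard-coded in the script
    present = set(np.unique(universe.atoms.types.astype(int)))
    missing = {name: t for name, t in expected.items() if t not in present}
    if missing:
        print(f"WARNING: atom types {missing} not found in the trajectory; "
              "hydrogen-bond counts will stay at zero until the type mapping is corrected.")

def safe_frame_result(frame_idx, hbond_data):
    """Always return an (n, 4) int32 array so aggregation never sees an empty, ragged object."""
    arr = np.asarray(hbond_data, dtype=np.int32)
    if arr.size == 0:
        arr = np.empty((0, 4), dtype=np.int32)
    return frame_idx, arr.reshape(-1, 4)

# Journal-style figure defaults (single-column width assumed ~3.37 in; verify against the journal guide).
plt.rcParams.update({
    "font.family": "serif",
    "font.size": 8,
    "axes.linewidth": 0.8,
    "figure.figsize": (3.37, 2.6),
    "savefig.dpi": 600,
})
```

Downstream, the exponential fit of the survival function should be skipped whenever only a handful of non-zero SCF points remain, mirroring the "insufficient data" branch of the VASP script. One likely pickle pain point is that process_origin currently receives the whole TrajectoryCache (which holds the Universe and the zarr handles) through mp.Pool; passing only the zarr directory path and reopening it inside each worker is the safer pattern.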

import jieba import numpy as np import torch import sqlalchemy from sqlalchemy import create_engine, text, Column, Integer, String from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.dialects.mssql import VARBINARY from transformers import AutoModel, AutoTokenizer import re # 配置参数(保持不变) MODEL_PATH = r"C:\Users\Lenovo\nepu_qa_project\local_models\bge-small-zh" DB_CONN_STR = "mssql+pyodbc://sa:123456@LAPTOP-6KK5U1P0/nepu_qa_db?driver=ODBC+Driver+17+for+SQL+Server" # 初始化数据库(保持不变) Base = declarative_base() class NepuData(Base): __tablename__ = 'nepudata' id = Column(Integer, primary_key=True) url = Column(String) full_content = Column(String) scraped_at = Column(String) vector = Column(VARBINARY(8000)) engine = create_engine(DB_CONN_STR) Session = sessionmaker(bind=engine) # 加载模型 print("加载模型中...") tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH) model = AutoModel.from_pretrained(MODEL_PATH) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = model.to(device) # 改进的预处理函数 def preprocess_text(text): """增强版文本预处理""" if not text: return "" # 1. 移除HTML标签 text = re.sub(r'<[^>]+>', '', text) # 2. 移除固定前缀 text = re.sub(r'标题\s*:\s*', '', text) text = re.sub(r'-\s*东北石油大学', '', text) text = re.sub(r'当前位置\s*:.*?-\s*', '', text) # 3. 移除系统提示类内容 if "系统提示" in text and "抱歉" in text: return "系统提示内容已移除" # 4. 分割为行并去重 lines = text.split('\n') unique_lines = [] seen = set() for line in lines: clean_line = line.strip() if clean_line and len(clean_line) > 10 and clean_line not in seen: seen.add(clean_line) unique_lines.append(clean_line) # 5. 保留核心内容 return " ".join(unique_lines[:15]) # 改进的向量化函数 def text_to_vector(text: str) -> bytes: """加权池化向量化方法""" # 预处理 cleaned_text = preprocess_text(text) # 空内容处理 if not cleaned_text or "已移除" in cleaned_text: return np.zeros(768).astype(np.float32).tobytes() # 分词 words = jieba.lcut(cleaned_text) processed_text = " ".join(words) # 向量化 inputs = tokenizer( processed_text, padding=True, truncation=True, max_length=512, return_tensors="pt" ).to(device) with torch.no_grad(): outputs = model(**inputs) # 加权平均池化 weights = inputs['attention_mask'].float().to(device) embeddings = (outputs.last_hidden_state * weights.unsqueeze(-1)).sum(dim=1) / weights.sum(dim=1, keepdim=True) return embeddings[0].cpu().numpy().astype(np.float32).tobytes() def final_vector_test(): """最终向量质量测试""" session = Session() try: records = session.execute(text( "SELECT id, full_content, vector FROM nepudata" )).fetchall() print("\n最终向量质量报告:") print(f"共 {len(records)} 条记录") # 分类统计 content_types = {} for r in records: content = r[1] or "" if "新闻" in content: content_types.setdefault("新闻", []).append(r[0]) elif "通知" in content: content_types.setdefault("通知", []).append(r[0]) elif "公告" in content: content_types.setdefault("公告", []).append(r[0]) else: content_types.setdefault("其他", []).append(r[0]) print("\n内容类型分布:") for ctype, ids in content_types.items(): print(f" {ctype}: {len(ids)}条") # 提取向量 vectors = [] for r in records: if r[2]: vectors.append(np.frombuffer(r[2], dtype=np.float32)) # 随机相似度测试 print("\n随机样本相似度:") for i in range(5): idx1, idx2 = np.random.choice(len(vectors), np.random.choice(len(vectors))) sim = np.dot(vectors[idx1], vectors[idx2]) / (np.linalg.norm(vectors[idx1]) * np.linalg.norm(vectors[idx2])) print(f" 记录 {records[idx1][0]} 和 {records[idx2][0]}: {sim:.4f}") # 类内类间相似度 print("\n类内类间相似度对比:") for ctype1, ids1 in content_types.items(): for ctype2, ids2 in content_types.items(): if len(ids1) > 1 and len(ids2) > 1: vecs1 = [vectors[records.index(r)] for r in 
records if r[0] in ids1] vecs2 = [vectors[records.index(r)] for r in records if r[0] in ids2] # compute the average similarity similarities = [] for v1 in vecs1: for v2 in vecs2: sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)) similarities.append(sim) avg_sim = np.mean(similarities) print(f" {ctype1}({len(ids1)}) vs {ctype2}({len(ids2)}): {avg_sim:.4f}") finally: session.close() if __name__ == "__main__": print("运行最终向量质量测试...") final_vector_test() This is the code.
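One detail worth flagging in the script above: `idx1, idx2 = np.random.choice(len(vectors), np.random.choice(len(vectors)))` asks for a random *number* of indices rather than a pair, so the unpacking will usually raise a ValueError. Below is a minimal sketch of the intended pair sampling (the helper name is illustrative). Note also that the zero-vector fallback `np.zeros(768)` should match the model's real hidden size — for bge-small-zh that is reportedly 512, which is easy to confirm via `model.config.hidden_size`.

```python
import numpy as np

def random_pair_similarities(vectors, n_pairs=5, seed=0):
    """Report cosine similarity for a few randomly chosen, distinct vector pairs."""
    rng = np.random.default_rng(seed)
    vectors = np.asarray(vectors, dtype=np.float32)
    results = []
    for _ in range(n_pairs):
        idx1, idx2 = rng.choice(len(vectors), size=2, replace=False)  # exactly two distinct indices
        v1, v2 = vectors[idx1], vectors[idx2]
        denom = np.linalg.norm(v1) * np.linalg.norm(v2)
        sim = float(np.dot(v1, v2) / denom) if denom > 0 else 0.0  # guard against all-zero vectors
        results.append((int(idx1), int(idx2), sim))
    return results
```

Because some records store an all-zero vector for removed content, the denominator guard above also avoids NaN similarities in the report.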

Encoding: ascii, confidence: 1.00
Training until validation scores don't improve for 20 rounds
[10]  training's auc: 0.999999  valid_1's auc: 0.999999
[20]  training's auc: 0.999999  valid_1's auc: 0.999999
Early stopping, best iteration is:
[1]  training's auc: 0.999999  valid_1's auc: 0.999999
Validation AUC: 1.0000
---------------------------------------------------------------------------
InvalidIndexError                         Traceback (most recent call last)
Cell In[16], line 188
    186 samples = prepare_samples(all_see, all_click, all_play)
    187 model, features, auc_score = train_model(samples)
--> 188 result = predict_new_data(model, features, 'testA_did_show.csv')

Cell In[16], line 164, in predict_new_data(model, feature_columns, test_file)
    161 user_click_rate = pd.read_csv('user_click_rate.csv', encoding='gbk').set_index('did')['user_click_rate']
    162 video_popularity = pd.read_csv('video_popularity.csv', encoding='gbk').set_index('vid')['video_popularity']
--> 164 test_data['user_click_rate'] = test_data['did'].map(user_click_rate).fillna(0).astype(np.float32)
    165 test_data['video_popularity'] = test_data['vid'].map(video_popularity).fillna(0).astype(np.int32)
    167 test_data[feature_columns] = test_data[feature_columns].fillna(0)

File ~\ANA\Lib\site-packages\pandas\core\series.py:4544, in Series.map(self, arg, na_action)
   4464 def map(
   4465     self,
   4466     arg: Callable | Mapping | Series,
   4467     na_action: Literal["ignore"] | None = None,
   4468 ) -> Series:
   4469     """
   4470     Map values of Series according to an input mapping or function.
   4471     (...)
   4542     dtype: object
   4543     """
-> 4544 new_values = self._map_values(arg, na_action=na_action)
   4545 return self._constructor(new_values, index=self.index, copy=False).__finalize__(
   4546     self, method="map"
   4547 )

File ~\ANA\Lib\site-packages\pandas\core\base.py:919, in IndexOpsMixin._map_values(self, mapper, na_action, convert)
    916 arr = self._values
    918 if isinstance(arr, ExtensionArray):
--> 919     return arr.map(mapper, na_action=na_action)
    921 return algorithms.map_array(arr, mapper, na_action=na_action, convert=convert)

File ~\ANA\Lib\site-packages\pandas\core\arrays\categorical.py:1530, in Categorical.map(self, mapper, na_action)
   1526     na_action = "ignore"
   1528 assert callable(mapper) or is_dict_like(mapper)
-> 1530 new_categories = self.categories.map(mapper)
   1532 has_nans = np.any(self._codes == -1)
   1534 na_val = np.nan

File ~\ANA\Lib\site-packages\pandas\core\indexes\base.py:6419, in Index.map(self, mapper, na_action)
   6383     """
   6384     Map values using an input mapping or function.
   6385     (...)
   6415     Index(['A', 'B', 'C'], dtype='object')
   6416     """
   6417     from pandas.core.indexes.multi import MultiIndex
-> 6419 new_values = self._map_values(mapper, na_action=na_action)
   6421 # we can return a MultiIndex
   6422 if new_values.size and isinstance(new_values[0], tuple):

File ~\ANA\Lib\site-packages\pandas\core\base.py:921, in IndexOpsMixin._map_values(self, mapper, na_action, convert)
    918 if isinstance(arr, ExtensionArray):
    919     return arr.map(mapper, na_action=na_action)
--> 921 return algorithms.map_array(arr, mapper, na_action=na_action, convert=convert)

File ~\ANA\Lib\site-packages\pandas\core\algorithms.py:1803, in map_array(arr, mapper, na_action, convert)
   1799     mapper = mapper[mapper.index.notna()]
   1801 # Since values were input this means we came from either
   1802 # a dict or a series and mapper should be an index
-> 1803 indexer = mapper.index.get_indexer(arr)
   1804 new_values = take_nd(mapper._values, indexer)
   1806 return new_values

File ~\ANA\Lib\site-packages\pandas\core\indexes\base.py:3875, in Index.get_indexer(self, target, method, limit, tolerance)
   3872 self._check_indexing_method(method, limit, tolerance)
   3874 if not self._index_as_unique:
-> 3875     raise InvalidIndexError(self._requires_unique_msg)
   3877 if len(target) == 0:
   3878     return np.array([], dtype=np.intp)

InvalidIndexError: Reindexing only valid with uniquely valued Index objects

Please help me locate and fix this problem.
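The last frame of the traceback is the clue: `Index.get_indexer` insists on a unique index, and it is reached because the lookup Series built with `.set_index('did')['user_click_rate']` (and likewise for `vid`) has duplicate keys — `user_click_rate.csv` / `video_popularity.csv` evidently contain repeated `did` / `vid` rows. The categorical dtype of `test_data['did']` only changes the route (through `Categorical.map`); the fix is to make the mapping index unique before calling `.map()`. A minimal sketch, assuming duplicates should be collapsed by averaging (use `drop_duplicates` instead if the rows are exact copies):

```python
import numpy as np
import pandas as pd

# Collapse duplicate keys so each lookup Series has a unique index.
user_click_rate = (
    pd.read_csv('user_click_rate.csv', encoding='gbk')
      .groupby('did')['user_click_rate'].mean()
)
video_popularity = (
    pd.read_csv('video_popularity.csv', encoding='gbk')
      .groupby('vid')['video_popularity'].mean()
)

# test_data as loaded inside predict_new_data; with unique indices,
# .map() works for plain and categorical key columns alike.
test_data['user_click_rate'] = test_data['did'].map(user_click_rate).fillna(0).astype(np.float32)
test_data['video_popularity'] = test_data['vid'].map(video_popularity).fillna(0).astype(np.int32)
```

If the duplicated rows disagree with each other, it is worth checking how the two CSVs were generated before choosing between mean(), first(), or a full recomputation of the rates.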

>>> splice_data, spliced_ptms, altered_flanks = project.project_ptms_onto_MATS(SE_events = SE_data, MXE_events = MXE_data, A5SS_events = A5SS_data, A3SS_events = A3SS_data, RI_events = RI_data, coordinate_type = 'hg38', identify_flanking_sequences = True)
Projecting PTMs onto MATS splice events using hg38 coordinates.
Skipped Exon events: 0%| | 0/3635 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "", line 1, in <module>
    splice_data, spliced_ptms, altered_flanks = project.project_ptms_onto_MATS(SE_events = SE_data, MXE_events = MXE_data, A5SS_events = A5SS_data, A3SS_events = A3SS_data, RI_events = RI_data, coordinate_type = 'hg38', identify_flanking_sequences = True)
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/ptm_pose/project.py", line 416, in project_ptms_onto_MATS
    spliced_events['SE'], SE_ptms = project_ptms_onto_splice_events(SE_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based', taskbar_label = "Skipped Exon events", separate_modification_types=separate_modification_types, PROCESSES = SE_processes)
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/ptm_pose/project.py", line 327, in project_ptms_onto_splice_events
    splice_data, spliced_ptm_info = find_ptms_in_many_regions(splice_data, ptm_coordinates, chromosome_col = chromosome_col, strand_col = strand_col, region_start_col = region_start_col, region_end_col = region_end_col, dPSI_col = dPSI_col, sig_col = sig_col, event_id_col = event_id_col, gene_col = gene_col, extra_cols = extra_cols, annotate_original_df = annotate_original_df, coordinate_type = coordinate_type,start_coordinate_system=start_coordinate_system, end_coordinate_system=end_coordinate_system, taskbar_label = taskbar_label, separate_modification_types=separate_modification_types)
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/ptm_pose/project.py", line 212, in find_ptms_in_many_regions
    if annotate_original_df:
  File
"/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/pandas/core/generic.py", line 1577, in __nonzero__ raise ValueError( ...<2 lines>... ) ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). 脚本project.py: import numpy as np import pandas as pd import multiprocessing import datetime from ptm_pose import pose_config, helpers from ptm_pose import flanking_sequences as fs from tqdm import tqdm def find_ptms_in_region(ptm_coordinates, chromosome, strand, start, end, gene = None, coordinate_type = 'hg38'): """ Given an genomic region in either hg38 or hg19 coordinates (such as the region encoding an exon of interest), identify PTMs that are mapped to that region. If so, return the exon number. If none are found, return np.nan. Parameters ---------- chromosome: str chromosome where region is located strand: int DNA strand for region is found on (1 for forward, -1 for reverse) start: int start position of region on the chromosome/strand (should always be less than end) end: int end position of region on the chromosome/strand (should always be greater than start) coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. Returns ------- ptms_in_region: pandas.DataFrame dataframe containing all PTMs found in the region. If no PTMs are found, returns np.nan. """ #restrict to PTMs on the same chromosome and strand ptms_in_region = ptm_coordinates[(ptm_coordinates['Chromosome/scaffold name'] == chromosome) & (ptm_coordinates['Strand'] == strand)].copy() if coordinate_type in ['hg18', 'hg19','hg38']: loc_col = f'Gene Location ({coordinate_type})' else: raise ValueError('Coordinate type must be hg38 or hg19') #check to make sure the start value is less than the end coordinate. If it is not, treat the end coordinate as the start and the start coordinate as the end if start < end: ptms_in_region = ptms_in_region[(ptms_in_region[loc_col] >= start) & (ptms_in_region[loc_col] <= end)] else: ptms_in_region = ptms_in_region[(ptms_in_region[loc_col] <= start) & (ptms_in_region[loc_col] >= end)] #extract only PTM information from dataframe and return that and list (if not ptms, return empty dataframe) if not ptms_in_region.empty: #grab uniprot id and residue ptms_in_region = ptms_in_region[['Source of PTM', 'UniProtKB Accession','Isoform ID', 'Isoform Type', 'Residue', 'PTM Position in Isoform', loc_col, 'Modification', 'Modification Class', 'Canonical Flanking Sequence', 'Constitutive', 'MS_LIT', 'MS_CST', 'LT_LIT', 'Compendia', 'Number of Compendia']] #check if ptm is associated with the same gene (if info is provided). 
if not, do not add if gene is not None: for i, row in ptms_in_region.iterrows(): #if ';' in row['UniProtKB Accession']: # uni_ids = row['UniProtKB Accession'].split(';') # remove = True # for uni in uni_ids: # if row['UniProtKB Accession'] in pose_config.uniprot_to_genename: # if gene in pose_config.uniprot_to_genename[uni.split('-')[0]].split(' '): # remove = False # break # if remove: # ptms_in_region.drop(i) if row['UniProtKB Accession'] in pose_config.uniprot_to_genename: if gene not in pose_config.uniprot_to_genename[row['UniProtKB Accession']].split(' '): ptms_in_region = ptms_in_region.drop(i) else: ptms_in_region = ptms_in_region.drop(i) #make sure ptms still are present after filtering if ptms_in_region.empty: return pd.DataFrame() else: ptms_in_region.insert(0, 'Gene', gene) #calculate proximity to region start and end ptms_in_region['Proximity to Region Start (bp)'] = (ptms_in_region[loc_col] - start).abs() ptms_in_region['Proximity to Region End (bp)'] = (ptms_in_region[loc_col] - end).abs() ptms_in_region['Proximity to Splice Boundary (bp)'] = ptms_in_region.apply(lambda x: min(x['Proximity to Region Start (bp)'], x['Proximity to Region End (bp)']), axis = 1) return ptms_in_region else: return pd.DataFrame() def convert_strand_symbol(strand): """ Given DNA strand information, make sure the strand information is in integer format (1 for forward, -1 for reverse). This is intended to convert from string format ('+' or '-') to integer format (1 or -1), but will return the input if it is already in integer format. Parameters ---------- strand: str or int DNA strand information, either as a string ('+' or '-') or an integer (1 or -1) Returns ------- int DNA strand information as an integer (1 for forward, -1 for reverse) """ if isinstance(strand, str): if strand == '+' or strand == '1': return 1 elif strand == '-' or strand == '-1': return -1 else: return strand def find_ptms_in_many_regions(region_data, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', gene_col = None, dPSI_col = None, sig_col = None, event_id_col = None, extra_cols = None, annotate_original_df = True, coordinate_type = 'hg38', start_coordinate_system = '1-based', end_coordinate_system = '1-based', separate_modification_types = False, taskbar_label = None): """ Given a dataframe with a unique region in each row, project PTMs onto the regions. Assumes that the region data will have chromosome, strand, and genomic start/end positions, and each row corresponds to a unique region. Parameters ---------- ptm_coordinates: pandas.DataFrame dataframe containing PTM information, including chromosome, strand, and genomic location of PTMs region_data: pandas.DataFrame dataframe containing region information, including chromosome, strand, and genomic location of regions of interest chromosome_col: str column name in splice_data that contains chromosome information. Default is 'chr'. Expects it to be a str with only the chromosome number: 'Y', '1', '2', etc. strand_col: str column name in splice_data that contains strand information. Default is 'strand'. Expects it to be a str with '+' or '-', or integers as 1 or -1. Will convert to integers automatically if string format is provided. region_start_col: str column name in splice_data that contains the start position of the region of interest. Default is 'exonStart_0base'. region_end_col: str column name in splice_data that contains the end position of the region of interest. Default is 'exonEnd'. 
gene_col: str column name in splice_data that contains the gene name. If provided, will be used to make sure the projected PTMs stem from the same gene (some cases where genomic coordiantes overlap between distinct genes). Default is None. event_id_col: str column name in splice_data that contains the unique identifier for the splice event. If provided, will be used to annotate the ptm information with the specific splice event ID. Default is None. coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. separate_modification_types: bool Indicate whether to store PTM sites with multiple modification types as multiple rows. For example, if a site at K100 was both an acetylation and methylation site, these will be separated into unique rows with the same site number but different modification types. Default is True. taskbar_label: str Label to display in the tqdm progress bar. Default is None, which will automatically state "Projecting PTMs onto regions using ----- coordinates". Returns ------- spliced_ptm_info: pandas.DataFrame Contains the PTMs identified across the different splice events splice_data: pandas.DataFrame dataframe containing the original splice data with an additional column 'PTMs' that contains the PTMs found in the region of interest, in the format of 'SiteNumber(ModificationType)'. If no PTMs are found, the value will be np.nan. """ if taskbar_label is None: taskbar_label = 'Projecting PTMs onto regions using ' + coordinate_type + ' coordinates.' if region_data[chromosome_col].str.contains('chr').any(): region_data[chromosome_col] = region_data[chromosome_col].str.strip('chr') spliced_ptm_info = [] spliced_ptms_list = [] num_ptms_affected = [] num_unique_ptm_sites = [] #copy region_data = region_data.copy() #iterate through each row of the splice data and find PTMs in the region for index, row in tqdm(region_data.iterrows(), total = len(region_data), desc = taskbar_label): #grab region information from row chromosome = row[chromosome_col] strand = convert_strand_symbol(row[strand_col]) start = row[region_start_col] end = row[region_end_col] #only provide these if column is given gene = row[gene_col] if gene_col is not None else None #adjust region coordinates if needed (make sure in 1-based coordinate system) if start_coordinate_system == '0-based': start += 1 elif start_coordinate_system != '1-based': raise ValueError("Start coordinate system must be either '0-based' or '1-based'") if end_coordinate_system == '0-based': end += 1 elif end_coordinate_system != '1-based': raise ValueError("End coordinate system must be either '0-based' or '1-based'") #project ptms onto region ptms_in_region = find_ptms_in_region(ptm_coordinates, chromosome, strand, start, end, gene = gene, coordinate_type = coordinate_type) extra_info = {} #add additional context from splice data, if indicated extra_info = {} if event_id_col is not None: extra_info['Region ID'] = row[event_id_col] if dPSI_col is not None: extra_info['dPSI'] = row[dPSI_col] if sig_col is not None: extra_info['Significance'] = row[sig_col] if extra_cols is not None: for col in extra_cols: extra_info[col] = row[col] #add extra info to ptms_in_region ptms_in_region = pd.concat([pd.DataFrame(extra_info, index = ptms_in_region.index), ptms_in_region], axis = 1) #if desired, add ptm information to the original splice event dataframe if annotate_original_df: if not ptms_in_region.empty: #split and separate unique modification types if 
separate_modification_types: ptms_in_region['Modification Class'] = ptms_in_region['Modification Class'].str.split(';') ptms_in_region = ptms_in_region.explode('Modification Class') ptms_info = ptms_in_region.apply(lambda x: x['UniProtKB Accession'] + '_' + x['Residue'] + str(x['PTM Position in Isoform']) + ' (' + x['Modification Class'] + ')', axis = 1) ptms_str = '/'.join(ptms_info.values) spliced_ptms_list.append(ptms_str) num_ptms_affected.append(ptms_in_region.shape[0]) num_unique_ptm_sites.append(ptms_in_region.groupby(['UniProtKB Accession', 'Residue', 'PTM Position in Isoform']).size().shape[0]) else: spliced_ptms_list.append(np.nan) num_ptms_affected.append(0) num_unique_ptm_sites.append(0) spliced_ptm_info.append(ptms_in_region.copy()) #combine all PTM information spliced_ptm_info = pd.concat(spliced_ptm_info, ignore_index = True) #convert ptm position to float if spliced_ptm_info.shape[0] > 0: spliced_ptm_info['PTM Position in Isoform'] = spliced_ptm_info['PTM Position in Isoform'].astype(float) #add ptm info to original splice event dataframe if annotate_original_df: region_data['PTMs'] = spliced_ptms_list region_data['Number of PTMs Affected'] = num_ptms_affected region_data['Number of Unique PTM Sites by Position'] = num_unique_ptm_sites region_data['Event Length'] = (region_data[region_end_col] - region_data[region_start_col]).abs() region_data['PTM Density (PTMs/bp)'] = (region_data['Number of Unique PTM Sites by Position']*3)/region_data['Event Length'] #multiply by 3 to convert aa to bp (3 bp per codon) return region_data, spliced_ptm_info def project_ptms_onto_splice_events(splice_data,annotate_original_df = True, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', dPSI_col = None, sig_col = None, event_id_col = None, gene_col = None, extra_cols = None, separate_modification_types = False, coordinate_type = 'hg38', start_coordinate_system = '1-based', end_coordinate_system = '1-based', taskbar_label = None, ptm_coordinates = None,PROCESSES = 1, **kwargs): """ Given splice event quantification data, project PTMs onto the regions impacted by the splice events. Assumes that the splice event data will have chromosome, strand, and genomic start/end positions for the regions of interest, and each row of the splice_event_data corresponds to a unique region. Important note: PTM-POSE relies on Ensembl based coordinates (1-based), so if the coordinates are 0-based, make sure to indicate using the start_coordinate_system and end_coordinate_system parameters. For example, rMATS uses 0-based for the start coordinates, but 1-based for the end coordinates. In this case, set start_coordinate_system = '0-based' and end_coordinate_system = '1-based'. Parameters ---------- splice_data: pandas.DataFrame dataframe containing splice event information, including chromosome, strand, and genomic location of regions of interest ptm_coordinates: pandas.DataFrame dataframe containing PTM information, including chromosome, strand, and genomic location of PTMs. If none, it will pull from the config file. chromosome_col: str column name in splice_data that contains chromosome information. Default is 'chr'. Expects it to be a str with only the chromosome number: 'Y', '1', '2', etc. strand_col: str column name in splice_data that contains strand information. Default is 'strand'. Expects it to be a str with '+' or '-', or integers as 1 or -1. Will convert to integers automatically if string format is provided. 
region_start_col: str column name in splice_data that contains the start position of the region of interest. Default is 'exonStart_0base'. region_end_col: str column name in splice_data that contains the end position of the region of interest. Default is 'exonEnd'. event_id_col: str column name in splice_data that contains the unique identifier for the splice event. If provided, will be used to annotate the ptm information with the specific splice event ID. Default is None. gene_col: str column name in splice_data that contains the gene name. If provided, will be used to make sure the projected PTMs stem from the same gene (some cases where genomic coordiantes overlap between distinct genes). Default is None. dPSI_col: str column name in splice_data that contains the delta PSI value for the splice event. Default is None, which will not include this information in the output sig_col: str column name in splice_data that contains the significance value for the splice event. Default is None, which will not include this information in the output. extra_cols: list list of additional columns to include in the output dataframe. Default is None, which will not include any additional columns. coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. start_coordinate_system: str indicates the coordinate system used for the start position. Either '0-based' or '1-based'. Default is '1-based'. end_coordinate_system: str indicates the coordinate system used for the end position. Either '0-based' or '1-based'. Default is '1-based'. separate_modification_types: bool Indicate whether to store PTM sites with multiple modification types as multiple rows. For example, if a site at K100 was both an acetylation and methylation site, these will be separated into unique rows with the same site number but different modification types. Default is True. taskbar_label: str Label to display in the tqdm progress bar. Default is None, which will automatically state "Projecting PTMs onto regions using ----- coordinates". PROCESSES: int Number of processes to use for multiprocessing. Default is 1 (single processing) **kwargs: additional keyword arguments Additional keyword arguments to pass to the find_ptms_in_many_regions function, which will be fed into the filter_ptms() function from the helper module. These will be used to filter ptms with lower evidence. For example, if you want to filter PTMs based on the number of MS observations, you can add 'min_MS_observations = 2' to the kwargs. This will filter out any PTMs that have less than 2 MS observations. See the filter_ptms() function for more options. Returns ------- spliced_ptm_info: pandas.DataFrame Contains the PTMs identified across the different splice events splice_data: pandas.DataFrame dataframe containing the original splice data with an additional column 'PTMs' that contains the PTMs found in the region of interest, in the format of 'SiteNumber(ModificationType)'. If no PTMs are found, the value will be np.nan. 
""" #load ptm data from config if not provided if ptm_coordinates is None: ptm_coordinates = pose_config.ptm_coordinates.copy() #check for any keyword arguments to use for filtering if kwargs: filter_arguments = helpers.extract_filter_kwargs(**kwargs) #check any excess unused keyword arguments, report them helpers.check_filter_kwargs(filter_arguments) #filter ptm coordinates file to include only ptms with desired evidence ptm_coordinates = helpers.filter_ptms(ptm_coordinates, **filter_arguments) if taskbar_label is None: taskbar_label = 'Projecting PTMs onto splice events using ' + coordinate_type + ' coordinates.' #copy splice_data = splice_data.copy() #check columns to make sure they are present and correct data type check_columns(splice_data, chromosome_col=chromosome_col, strand_col=strand_col, region_start_col=region_start_col, region_end_col=region_end_col, dPSI_col=dPSI_col, sig_col=sig_col, event_id_col=event_id_col, gene_col=gene_col, extra_cols=extra_cols) if PROCESSES == 1: splice_data, spliced_ptm_info = find_ptms_in_many_regions(splice_data, ptm_coordinates, chromosome_col = chromosome_col, strand_col = strand_col, region_start_col = region_start_col, region_end_col = region_end_col, dPSI_col = dPSI_col, sig_col = sig_col, event_id_col = event_id_col, gene_col = gene_col, extra_cols = extra_cols, annotate_original_df = annotate_original_df, coordinate_type = coordinate_type,start_coordinate_system=start_coordinate_system, end_coordinate_system=end_coordinate_system, taskbar_label = taskbar_label, separate_modification_types=separate_modification_types) elif PROCESSES > 1: #check num_cpus available, if greater than number of cores - 1 (to avoid freezing machine), then set to PROCESSES to 1 less than total number of cores num_cores = multiprocessing.cpu_count() if PROCESSES > num_cores - 1: PROCESSES = num_cores - 1 #split dataframe into chunks equal to PROCESSES splice_data_split = np.array_split(splice_data, PROCESSES) pool = multiprocessing.Pool(PROCESSES) #run with multiprocessing results = pool.starmap(find_ptms_in_many_regions, [(splice_data_split[i], ptm_coordinates, chromosome_col, strand_col, region_start_col, region_end_col, gene_col, dPSI_col, sig_col, event_id_col, extra_cols, annotate_original_df, coordinate_type, start_coordinate_system, end_coordinate_system, separate_modification_types, taskbar_label) for i in range(PROCESSES)]) splice_data = pd.concat([res[0] for res in results]) spliced_ptm_info = pd.concat([res[1] for res in results]) #raise ValueError('Multiprocessing not yet functional. Please set PROCESSES = 1.') print(f'PTMs projection successful ({spliced_ptm_info.shape[0]} identified).\n') return splice_data, spliced_ptm_info def project_ptms_onto_MATS(SE_events = None, A5SS_events = None, A3SS_events = None, RI_events = None, MXE_events = None, coordinate_type = 'hg38', identify_flanking_sequences = False, dPSI_col = 'meanDeltaPSI', sig_col = 'FDR', extra_cols = None, separate_modification_types = False, PROCESSES = 1,ptm_coordinates = None, **kwargs): """ Given splice quantification from the MATS algorithm, annotate with PTMs that are found in the differentially included regions. 
Parameters ---------- ptm_coordinates: pandas.DataFrame dataframe containing PTM information, including chromosome, strand, and genomic location of PTMs SE_events: pandas.DataFrame dataframe containing skipped exon event information from MATS A5SS_events: pandas.DataFrame dataframe containing 5' alternative splice site event information from MATS A3SS_events: pandas.DataFrame dataframe containing 3' alternative splice site event information from MATS RI_events: pandas.DataFrame dataframe containing retained intron event information from MATS MXE_events: pandas.DataFrame dataframe containing mutually exclusive exon event information from MATS coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. dPSI_col: str Column name indicating delta PSI value. Default is 'meanDeltaPSI'. sig_col: str Column name indicating significance of the event. Default is 'FDR'. extra_cols: list List of column names for additional information to add to the results. Default is None. separate_modification_types: bool Indicate whether residues with multiple modifications (i.e. phosphorylation and acetylation) should be treated as separate PTMs and be placed in unique rows of the output dataframe. Default is False. PROCESSES: int Number of processes to use for multiprocessing. Default is 1. **kwargs: additional keyword arguments Additional keyword arguments to pass to the find_ptms_in_many_regions function, which will be fed into the filter_ptms() function from the helper module. These will be used to filter ptms with lower evidence. For example, if you want to filter PTMs based on the number of MS observations, you can add 'min_MS_observations = 2' to the kwargs. This will filter out any PTMs that have less than 2 MS observations. See the filter_ptms() function for more options. 
""" #load ptm data from config if not provided if ptm_coordinates is None: ptm_coordinates = pose_config.ptm_coordinates.copy() #check for any keyword arguments to use for filtering if kwargs: filter_arguments = helpers.extract_filter_kwargs(**kwargs) #check any excess unused keyword arguments, report them helpers.check_filter_kwargs(filter_arguments) #filter ptm coordinates file to include only ptms with desired evidence ptm_coordinates = helpers.filter_ptms(ptm_coordinates, **filter_arguments) print(f'Projecting PTMs onto MATS splice events using {coordinate_type} coordinates.') #reformat chromosome name format spliced_events = {} spliced_flanks = [] spliced_ptms = [] if SE_events is not None: if SE_events['chr'].str.contains('chr').any(): SE_events['chr'] = SE_events['chr'].apply(lambda x: x[3:]) SE_events['AS ID'] = "SE_" + SE_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > SE_events.shape[0]: SE_processes = 1 else: SE_processes = PROCESSES spliced_events['SE'], SE_ptms = project_ptms_onto_splice_events(SE_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based', taskbar_label = "Skipped Exon events", separate_modification_types=separate_modification_types, PROCESSES = SE_processes) SE_ptms['Event Type'] = 'SE' spliced_ptms.append(SE_ptms) if identify_flanking_sequences: print('Identifying flanking sequences for skipped exon events.') if 'upstreamES' in SE_events.columns: first_flank_start_col = 'upstreamES' first_flank_end_col = 'upstreamEE' second_flank_start_col = 'downstreamES' second_flank_end_col = 'downstreamEE' elif 'firstFlankingES' in SE_events.columns: first_flank_start_col = 'firstFlankingES' first_flank_end_col = 'firstFlankingEE' second_flank_start_col = 'secondFlankingES' second_flank_end_col = 'secondFlankingEE' else: raise ValueError('Could not find flanking sequence columns in skipped exon event data, based on what is typically outputted by MATS. 
Please check column names and provide the appropriate columns for the first and second flanking sequences') SE_flanks = fs.get_flanking_changes_from_splice_data(SE_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'exonStart_0base', spliced_region_end_col = 'exonEnd', first_flank_start_col = first_flank_start_col, first_flank_end_col = first_flank_end_col, second_flank_start_col = second_flank_start_col, second_flank_end_col = second_flank_end_col, dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') SE_flanks['Event Type'] = 'SE' spliced_flanks.append(SE_flanks) else: print('Skipped exon event data (SE_events) not provided, skipping') if A5SS_events is not None: if A5SS_events['chr'].str.contains('chr').any(): A5SS_events['chr'] = A5SS['chr'].apply(lambda x: x[3:]) #set the relevent start and end regions of the spliced out region, which are different depending on the strand region_start = [] region_end = [] first_flank_start = [] first_flank_end = [] second_flank_end = [] second_flank_start = [] for i, row in A5SS_events.iterrows(): strand = row['strand'] if strand == '+': region_start.append(row['shortEE']) region_end.append(row['longExonEnd']) if identify_flanking_sequences: first_flank_start.append(row['shortES']) first_flank_end.append(row['shortEE']) second_flank_start.append(row['flankingES']) second_flank_end.append(row['flankingEE']) else: region_start.append(row['longExonStart_0base']) region_end.append(row['shortES']) if identify_flanking_sequences: second_flank_start.append(row['shortES']) second_flank_end.append(row['shortEE']) first_flank_start.append(row['flankingES']) first_flank_end.append(row['flankingEE']) A5SS_events['event_start'] = region_start A5SS_events['event_end'] = region_end if identify_flanking_sequences: A5SS_events['first_flank_start'] = first_flank_start A5SS_events['first_flank_end'] = first_flank_end A5SS_events['second_flank_start'] = second_flank_start A5SS_events['second_flank_end'] = second_flank_end #set specific as id A5SS_events['AS ID'] = "5ASS_" + A5SS_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > A5SS_events.shape[0]: fiveASS_processes = 1 else: fiveASS_processes = PROCESSES #identify PTMs found within spliced regions spliced_events['5ASS'], fiveASS_ptms = project_ptms_onto_splice_events(A5SS_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'event_start', region_end_col = 'event_end', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', coordinate_type=coordinate_type, start_coordinate_system = '0-based', extra_cols = extra_cols, taskbar_label = "5' ASS events", separate_modification_types=separate_modification_types, PROCESSES = fiveASS_processes) fiveASS_ptms['Event Type'] = '5ASS' spliced_ptms.append(fiveASS_ptms) #identify ptms with altered flanking sequences if identify_flanking_sequences: print("Identifying flanking sequences for 5'ASS events.") fiveASS_flanks = fs.get_flanking_changes_from_splice_data(A5SS_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'event_start', spliced_region_end_col = 'event_end', first_flank_start_col = 'first_flank_start', first_flank_end_col = 'first_flank_end', second_flank_start_col = 'second_flank_start', second_flank_end_col = 
'second_flank_end',dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') fiveASS_flanks['Event Type'] = '5ASS' spliced_flanks.append(fiveASS_flanks) else: print("5' ASS event data (A5SS_events) not provided, skipping.") if A3SS_events is not None: if RI_events['chr'].str.contains('chr').any(): RI_events['chr'] = RI_events['chr'].apply(lambda x: x[3:]) if A3SS_events['chr'].str.contains('chr').any(): A3SS_events['chr'] = A3SS_events['chr'].apply(lambda x: x[3:]) #set the relevent start and end regions of the spliced out region, which are different depending on the strand region_start = [] region_end = [] first_flank_start = [] first_flank_end = [] second_flank_end = [] second_flank_start = [] for i, row in A3SS_events.iterrows(): strand = row['strand'] if strand == '+': region_start.append(row['longExonStart_0base']) region_end.append(row['shortES']) if identify_flanking_sequences: second_flank_start.append(row['flankingES']) second_flank_end.append(row['flankingEE']) first_flank_start.append(row['shortES']) first_flank_end.append(row['shortEE']) else: region_start.append(row['shortEE']) region_end.append(row['longExonEnd']) if identify_flanking_sequences: second_flank_start.append(row['flankingES']) second_flank_end.append(row['flankingEE']) first_flank_start.append(row['shortES']) first_flank_end.append(row['shortEE']) #save region info A3SS_events['event_start'] = region_start A3SS_events['event_end'] = region_end if identify_flanking_sequences: A3SS_events['first_flank_start'] = first_flank_start A3SS_events['first_flank_end'] = first_flank_end A3SS_events['second_flank_start'] = second_flank_start A3SS_events['second_flank_end'] = second_flank_end #add event ids A3SS_events['AS ID'] = "3ASS_" + A3SS_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > A3SS_events.shape[0]: threeASS_processes = 1 else: threeASS_processes = PROCESSES spliced_events['3ASS'], threeASS_ptms = project_ptms_onto_splice_events(A3SS_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'event_start', region_end_col = 'event_end', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system = '0-based', taskbar_label = "3' ASS events", separate_modification_types=separate_modification_types, PROCESSES = threeASS_processes) threeASS_ptms['Event Type'] = '3ASS' spliced_ptms.append(threeASS_ptms) #identify ptms with altered flanking sequences if identify_flanking_sequences: print("Identifying flanking sequences for 3' ASS events.") threeASS_flanks = fs.get_flanking_changes_from_splice_data(A3SS_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'event_start', spliced_region_end_col = 'event_end', first_flank_start_col = 'first_flank_start', first_flank_end_col = 'first_flank_end', second_flank_start_col = 'second_flank_start', second_flank_end_col = 'second_flank_end', dPSI_col=dPSI_col, sig_col = dPSI_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') threeASS_flanks['Event Type'] = '3ASS' spliced_flanks.append(threeASS_flanks) else: print("3' ASS event data (A3SS_events) not provided, skipping") if RI_events is not None: if 
RI_events['chr'].str.contains('chr').any(): RI_events['chr'] = RI_events['chr'].apply(lambda x: x[3:]) #add event id RI_events['AS ID'] = "RI_" + RI_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > RI_events.shape[0]: RI_processes = 1 else: RI_processes = PROCESSES spliced_events['RI'], RI_ptms = project_ptms_onto_splice_events(RI_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'upstreamEE', region_end_col = 'downstreamES', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', coordinate_type=coordinate_type, start_coordinate_system='0-based', extra_cols = extra_cols, taskbar_label = 'Retained Intron Events', separate_modification_types=separate_modification_types, PROCESSES = RI_processes) RI_ptms['Event Type'] = 'RI' spliced_ptms.append(RI_ptms) #identify ptms with altered flanking sequences if identify_flanking_sequences: print('Identifying flanking sequences for retained intron events.') RI_flanks = fs.get_flanking_changes_from_splice_data(RI_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'upstreamEE', spliced_region_end_col = 'downstreamES', first_flank_start_col = 'upstreamES', first_flank_end_col = 'upstreamEE', second_flank_start_col = 'downstreamES', second_flank_end_col = 'downstreamEE', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') RI_flanks['Event Type'] = 'RI' spliced_flanks.append(RI_flanks) if MXE_events is not None: if MXE_events['chr'].str.contains('chr').any(): MXE_events['chr'] = MXE_events['chr'].apply(lambda x: x[3:]) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > MXE_events.shape[0]: MXE_processes = 1 else: MXE_processes = PROCESSES #add AS ID MXE_events['AS ID'] = "MXE_" + MXE_events.index.astype(str) mxe_ptms = [] #first mxe exon spliced_events['MXE_Exon1'], MXE_Exon1_ptms = project_ptms_onto_splice_events(MXE_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = '1stExonStart_0base', region_end_col = '1stExonEnd', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', coordinate_type=coordinate_type, start_coordinate_system = '0-based', taskbar_label = 'MXE, First Exon', extra_cols=extra_cols, separate_modification_types=separate_modification_types, PROCESSES = MXE_processes) MXE_Exon1_ptms['Event Type'] = 'MXE (First Exon)' mxe_ptms.append(MXE_Exon1_ptms) #second mxe exon spliced_events['MXE_Exon2'], MXE_Exon2_ptms = project_ptms_onto_splice_events(MXE_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = '2ndExonStart_0base', region_end_col = '2ndExonEnd', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', extra_cols=extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based', taskbar_label = 'MXE, Second Exon', separate_modification_types=separate_modification_types, PROCESSES = MXE_processes) MXE_Exon2_ptms['Event Type'] = 'MXE (Second Exon)' mxe_ptms.append(MXE_Exon2_ptms) #combine mxe ptms, and then drop any PTMs that were found in both MXE's mxe_ptms = pd.concat([MXE_Exon1_ptms, MXE_Exon2_ptms]) columns_to_check = ['UniProtKB Accession', 'Source of PTM', 'Residue', 'PTM Position in Isoform', 
'Modification', 'Modification Class', 'Gene'] if dPSI_col is not None: columns_to_check.append('dPSI') if sig_col is not None: columns_to_check.append('Significance') if extra_cols is not None: columns_to_check += extra_cols mxe_ptms = mxe_ptms.drop_duplicates(subset = columns_to_check, keep = False) #flip dPSI values for second exon if dPSI_col is not None: mxe_ptms['dPSI'] = mxe_ptms.apply(lambda x: x['dPSI']* -1 if x['Event Type'] == 'MXE (Second Exon)' else x['dPSI'], axis = 1) #add mxe ptms to spliced_ptms spliced_ptms.append(mxe_ptms) spliced_ptms = pd.concat(spliced_ptms) if identify_flanking_sequences: spliced_flanks = pd.concat(spliced_flanks) return spliced_events, spliced_ptms, spliced_flanks else: return spliced_events, spliced_ptms #def project_ptms_onto_MAJIQ_dPSI(majiq_data, ptm_coordinates = None, coordinate_type = 'hg38', identify_flanking_sequences = False, dPSI_col = 'dPSI', sig_col = 'FDR', separate_modification_types = False, PROCESSES = 1): # print('in progress') # pass def add_splicegraph_info(psi_data, splicegraph, purpose = 'inclusion'): psi_data = psi_data[psi_data['splice_type'] != 'ME'].copy() if purpose == 'inclusion': #split exons into individual exons psi_data['Individual exon'] = psi_data['exons'].apply(lambda x: x.split(':')) psi_data = psi_data.explode('Individual exon').drop_duplicates() psi_data['Individual exon'] = psi_data['Individual exon'].astype(float) #add gene location information to psi data from spliceseq psi_data = psi_data.merge(splicegraph, left_on = ['symbol', 'Individual exon'], right_on = ['Symbol', 'Exon'], how = 'left') psi_data = psi_data.rename(columns = {'Chr_Start': 'spliced_region_start', 'Chr_Stop': 'spliced_region_end'}) return psi_data elif purpose == 'flanking': print('Not yet active. Please check back later.') else: raise ValueError('Purpose must be either inclusion or flanking. Please provide the correct purpose for the splicegraph information.') def project_ptms_onto_SpliceSeq(psi_data, splicegraph, gene_col ='symbol', dPSI_col = None, sig_col = None, extra_cols = None, coordinate_type = 'hg19', separate_modification_types = False, identify_flanking_sequences = False, flank_size = 5, ptm_coordinates = None, PROCESSES = 1, **kwargs): """ Given splice event quantification from SpliceSeq (such as what can be downloaded from TCGASpliceSeq), annotate with PTMs that are found in the differentially included regions. Parameters ---------- psi_data: pandas.DataFrame dataframe containing splice event quantification from SpliceSeq. Must contain the following columns: 'symbol', 'exons', 'splice_type'. splicegraph: pandas.DataFrame dataframe containing exon information from the splicegraph used during splice event quantification. Must contain the following columns: 'Symbol', 'Exon', 'Chr_Start', 'Chr_Stop'. gene_col: str column name in psi_data that contains the gene name. Default is 'symbol'. dPSI_col: str column name in psi_data that contains the delta PSI value for the splice event. Default is None, which will not include this information in the output. sig_col: str column name in psi_data that contains the significance value for the splice event. Default is None, which will not include this information in the output. extra_cols: list list of additional columns to include in the output dataframe. Default is None, which will not include any additional columns. coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg19'. 
separate_modification_types: bool Indicate whether to store PTM sites with multiple modification types as multiple rows. For example, if a site at K100 was both an acetylation and methylation site, these will be separated into unique rows with the same site number but different modification types. Default is True. identify_flanking_sequences: bool Indicate whether to identify and return the flanking sequences for the splice events. Default is False. flank_size: int Size of the flanking sequence to extract from the splice event. Default is 5, which will extract 5 bases upstream and downstream of the splice event. Only relevant if identify_flanking_sequences is True. PROCESSES: int Number of processes to use for multiprocessing. Default is 1 (single processing). **kwargs: additional keyword arguments Additional keyword arguments to pass to the find_ptms_in_many_regions function, which will be fed into the filter_ptms() function from the helper module. These will be used to filter ptms with lower evidence. For example, if you want to filter PTMs based on the number of MS observations, you can add 'min_MS_observations = 2' to the kwargs. This will filter out any PTMs that have less than 2 MS observations. See the filter_ptms() function for more options. """ #load ptm data from config if not provided if ptm_coordinates is None: ptm_coordinates = pose_config.ptm_coordinates.copy() #check for any keyword arguments to use for filtering if kwargs: filter_arguments = helpers.extract_filter_kwargs(**kwargs) #check any excess unused keyword arguments, report them helpers.check_filter_kwargs(filter_arguments) #filter ptm coordinates file to include only ptms with desired evidence ptm_coordinates = helpers.filter_ptms(ptm_coordinates, **filter_arguments) #remove ME events from this analysis overlapping_columns = set(psi_data.columns).intersection({'Chromosome', 'Strand', 'Chr_Start', 'Chr_Stop'}) if len(overlapping_columns) > 0: #drop columns that will be added from splicegraph psi_data = psi_data.drop(columns=overlapping_columns) print('Removing ME events from analysis') spliced_data = psi_data.copy() spliced_data = spliced_data[spliced_data['splice_type'] != 'ME'].copy() #split exons into individual exons spliced_data['Individual exon'] = spliced_data['exons'].apply(lambda x: x.split(':')) spliced_data = spliced_data.explode('Individual exon').drop_duplicates() spliced_data['Individual exon'] = spliced_data['Individual exon'].astype(float) #add gene location information to psi data from spliceseq spliced_data = spliced_data.merge(splicegraph.copy(), left_on = ['symbol', 'Individual exon'], right_on = ['Symbol', 'Exon'], how = 'left') spliced_data = spliced_data.rename(columns = {'Chr_Start': 'spliced_region_start', 'Chr_Stop': 'spliced_region_end'}) print('Projecting PTMs onto SpliceSeq data') spliced_data, spliced_ptms = project_ptms_onto_splice_events(spliced_data, chromosome_col = 'Chromosome', strand_col = 'Strand', gene_col = 'symbol', region_start_col = 'spliced_region_start', region_end_col = 'spliced_region_end', event_id_col = 'as_id',dPSI_col = dPSI_col, sig_col = sig_col, extra_cols = extra_cols, separate_modification_types = separate_modification_types, coordinate_type = coordinate_type, PROCESSES = PROCESSES) ## add code for extracting flanking sequences (to do) if identify_flanking_sequences: altered_flanks = fs.get_flanking_changes_from_splicegraph(psi_data, splicegraph, dPSI_col = dPSI_col, sig_col = sig_col, extra_cols = extra_cols, gene_col = gene_col, 
coordinate_type=coordinate_type, flank_size = flank_size) return spliced_data, spliced_ptms, altered_flanks else: return spliced_data, spliced_ptms #def project_ptms_onto_TCGA_SpliceSeq(tcga_cancer = 'PRAD'): # """ # In progress. Will download and process TCGA SpliceSeq data for a specific cancer type, and project PTMs onto the spliced regions. # """ # print('Not yet active. Please check back later.') # pass def check_columns(splice_data, chromosome_col = None, strand_col = None, region_start_col = None, region_end_col = None, first_flank_start_col = None, first_flank_end_col = None, second_flank_start_col = None, second_flank_end_col = None, gene_col = None, dPSI_col = None, sig_col = None, event_id_col = None, extra_cols = None): """ Function to quickly check if the provided column names exist in the dataset and if they are the correct type of data """ expected_cols = [chromosome_col, strand_col, region_start_col, region_end_col, first_flank_start_col, first_flank_end_col, second_flank_start_col, second_flank_end_col, gene_col, dPSI_col, sig_col, event_id_col] expected_dtypes = [[str, object], [str,int, object], [int,float], [int,float], [int,float], [int,float], [int,float], [int,float], [str, object], float, float, None] #remove cols with None and the corresponding dtype entry expected_dtypes = [dtype for col, dtype in zip(expected_cols, expected_dtypes) if col is not None] expected_cols = [col for col in expected_cols if col is not None] #add extra columns to the expected columns list if extra_cols is not None: expected_cols += extra_cols expected_dtypes += [None]*len(extra_cols) #extra columns do not have dtype requirement #check to make sure columns exist in the dataframe if not all([x in splice_data.columns for x in expected_cols]): raise ValueError('Not all expected columns are present in the splice data. Please check the column names and provide the correct names for the following columns: {}'.format([x for x in expected_cols if x not in splice_data.columns])) #check to make sure columns are the correct data type for col, data_type in zip(expected_cols, expected_dtypes): if data_type is None: continue elif isinstance(data_type, list): if splice_data[col].dtype not in data_type: #try converting to the expected data type try: splice_data[col] = splice_data[col].astype(data_type[0]) except: raise ValueError('Column {} is not the expected data type. Expected data type is one of {}, but found data type {}'.format(col, data_type, splice_data[col].dtype)) else: if splice_data[col].dtype != data_type: #try converting to the expected data type try: splice_data[col] = splice_data[col].astype(data_type) except: raise ValueError('Column {} is not the expected data type. Expected data type is {}, but found data type {}'.format(col, data_type, splice_data[col].dtype))
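For orientation, the MATS functions above read fixed rMATS-style column names from each event table, so "column not found" style failures usually trace back to the input dataframes. Below is a minimal sketch of the skipped-exon input that the SE branch expects; the column names are taken from the code above, while the values are placeholders only (not real splice events):

```python
import pandas as pd

# Placeholder SE table; real rMATS output contains many more columns.
SE_data = pd.DataFrame({
    'chr': ['chr1'],                 # 'chr' prefix is stripped inside the function
    'strand': ['+'],
    'exonStart_0base': [1000],       # rMATS starts are 0-based; ends are 1-based
    'exonEnd': [1200],
    'upstreamES': [500], 'upstreamEE': [600],        # used when identify_flanking_sequences=True
    'downstreamES': [1500], 'downstreamEE': [1600],
    'geneSymbol': ['GENE1'],
    'meanDeltaPSI': [-0.3],          # default dPSI_col
    'FDR': [0.01],                   # default sig_col
})
```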

Modified code:

import numpy as np import pandas as pd import multiprocessing import datetime from ptm_pose import pose_config, helpers from ptm_pose import flanking_sequences as fs from tqdm import tqdm def find_ptms_in_region(ptm_coordinates, chromosome, strand, start, end, gene = None, coordinate_type = 'hg38'): """ Given a genomic region in either hg38 or hg19 coordinates (such as the region encoding an exon of interest), identify PTMs that are mapped to that region. If any are found, return them; otherwise, return an empty dataframe. Parameters ---------- chromosome: str chromosome where region is located strand: int DNA strand for region is found on (1 for forward, -1 for reverse) start: int start position of region on the chromosome/strand (should always be less than end) end: int end position of region on the chromosome/strand (should always be greater than start) coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. Returns ------- ptms_in_region: pandas.DataFrame dataframe containing all PTMs found in the region. If no PTMs are found, returns an empty dataframe. """ #restrict to PTMs on the same chromosome and strand ptms_in_region = ptm_coordinates[(ptm_coordinates['Chromosome/scaffold name'] == chromosome) & (ptm_coordinates['Strand'] == strand)].copy() if coordinate_type in ['hg18', 'hg19','hg38']: loc_col = f'Gene Location ({coordinate_type})' else: raise ValueError('Coordinate type must be hg38 or hg19') #check to make sure the start value is less than the end coordinate. If it is not, treat the end coordinate as the start and the start coordinate as the end if start < end: ptms_in_region = ptms_in_region[(ptms_in_region[loc_col] >= start) & (ptms_in_region[loc_col] <= end)] else: ptms_in_region = ptms_in_region[(ptms_in_region[loc_col] <= start) & (ptms_in_region[loc_col] >= end)] #extract only PTM information from dataframe and return that and list (if not ptms, return empty dataframe) if not ptms_in_region.empty: #grab uniprot id and residue ptms_in_region = ptms_in_region[['Source of PTM', 'UniProtKB Accession','Isoform ID', 'Isoform Type', 'Residue', 'PTM Position in Isoform', loc_col, 'Modification', 'Modification Class', 'Canonical Flanking Sequence', 'Constitutive', 'MS_LIT', 'MS_CST', 'LT_LIT', 'Compendia', 'Number of Compendia']] #check if ptm is associated with the same gene (if info is provided).
if not, do not add if gene is not None: for i, row in ptms_in_region.iterrows(): #if ';' in row['UniProtKB Accession']: # uni_ids = row['UniProtKB Accession'].split(';') # remove = True # for uni in uni_ids: # if row['UniProtKB Accession'] in pose_config.uniprot_to_genename: # if gene in pose_config.uniprot_to_genename[uni.split('-')[0]].split(' '): # remove = False # break # if remove: # ptms_in_region.drop(i) if row['UniProtKB Accession'] in pose_config.uniprot_to_genename: if gene not in pose_config.uniprot_to_genename[row['UniProtKB Accession']].split(' '): ptms_in_region = ptms_in_region.drop(i) else: ptms_in_region = ptms_in_region.drop(i) #make sure ptms still are present after filtering if ptms_in_region.empty: return pd.DataFrame() else: ptms_in_region.insert(0, 'Gene', gene) #calculate proximity to region start and end ptms_in_region['Proximity to Region Start (bp)'] = (ptms_in_region[loc_col] - start).abs() ptms_in_region['Proximity to Region End (bp)'] = (ptms_in_region[loc_col] - end).abs() ptms_in_region['Proximity to Splice Boundary (bp)'] = ptms_in_region.apply(lambda x: min(x['Proximity to Region Start (bp)'], x['Proximity to Region End (bp)']), axis = 1) return ptms_in_region else: return pd.DataFrame() def convert_strand_symbol(strand): """ Given DNA strand information, make sure the strand information is in integer format (1 for forward, -1 for reverse). This is intended to convert from string format ('+' or '-') to integer format (1 or -1), but will return the input if it is already in integer format. Parameters ---------- strand: str or int DNA strand information, either as a string ('+' or '-') or an integer (1 or -1) Returns ------- int DNA strand information as an integer (1 for forward, -1 for reverse) """ if isinstance(strand, str): if strand == '+' or strand == '1': return 1 elif strand == '-' or strand == '-1': return -1 else: return strand def find_ptms_in_many_regions(region_data, ptm_coordinates, annotate_original_df = True, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', gene_col = None, dPSI_col = None, sig_col = None, event_id_col = None, extra_cols = None, coordinate_type = 'hg38', start_coordinate_system = '1-based', end_coordinate_system = '1-based', separate_modification_types = False, taskbar_label = None): """ Given a dataframe with a unique region in each row, project PTMs onto the regions. Assumes that the region data will have chromosome, strand, and genomic start/end positions, and each row corresponds to a unique region. Parameters ---------- ptm_coordinates: pandas.DataFrame dataframe containing PTM information, including chromosome, strand, and genomic location of PTMs region_data: pandas.DataFrame dataframe containing region information, including chromosome, strand, and genomic location of regions of interest chromosome_col: str column name in splice_data that contains chromosome information. Default is 'chr'. Expects it to be a str with only the chromosome number: 'Y', '1', '2', etc. strand_col: str column name in splice_data that contains strand information. Default is 'strand'. Expects it to be a str with '+' or '-', or integers as 1 or -1. Will convert to integers automatically if string format is provided. region_start_col: str column name in splice_data that contains the start position of the region of interest. Default is 'exonStart_0base'. region_end_col: str column name in splice_data that contains the end position of the region of interest. Default is 'exonEnd'. 
gene_col: str column name in splice_data that contains the gene name. If provided, will be used to make sure the projected PTMs stem from the same gene (some cases where genomic coordiantes overlap between distinct genes). Default is None. event_id_col: str column name in splice_data that contains the unique identifier for the splice event. If provided, will be used to annotate the ptm information with the specific splice event ID. Default is None. coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. separate_modification_types: bool Indicate whether to store PTM sites with multiple modification types as multiple rows. For example, if a site at K100 was both an acetylation and methylation site, these will be separated into unique rows with the same site number but different modification types. Default is True. taskbar_label: str Label to display in the tqdm progress bar. Default is None, which will automatically state "Projecting PTMs onto regions using ----- coordinates". Returns ------- spliced_ptm_info: pandas.DataFrame Contains the PTMs identified across the different splice events splice_data: pandas.DataFrame dataframe containing the original splice data with an additional column 'PTMs' that contains the PTMs found in the region of interest, in the format of 'SiteNumber(ModificationType)'. If no PTMs are found, the value will be np.nan. """ if taskbar_label is None: taskbar_label = 'Projecting PTMs onto regions using ' + coordinate_type + ' coordinates.' if region_data[chromosome_col].str.contains('chr').any(): region_data[chromosome_col] = region_data[chromosome_col].str.strip('chr') spliced_ptm_info = [] spliced_ptms_list = [] num_ptms_affected = [] num_unique_ptm_sites = [] #copy region_data = region_data.copy() #iterate through each row of the splice data and find PTMs in the region for index, row in tqdm(region_data.iterrows(), total = len(region_data), desc = taskbar_label): #grab region information from row chromosome = row[chromosome_col] strand = convert_strand_symbol(row[strand_col]) start = row[region_start_col] end = row[region_end_col] #only provide these if column is given gene = row[gene_col] if gene_col is not None else None #adjust region coordinates if needed (make sure in 1-based coordinate system) if start_coordinate_system == '0-based': start += 1 elif start_coordinate_system != '1-based': raise ValueError("Start coordinate system must be either '0-based' or '1-based'") if end_coordinate_system == '0-based': end += 1 elif end_coordinate_system != '1-based': raise ValueError("End coordinate system must be either '0-based' or '1-based'") #project ptms onto region ptms_in_region = find_ptms_in_region(ptm_coordinates, chromosome, strand, start, end, gene = gene, coordinate_type = coordinate_type) extra_info = {} #add additional context from splice data, if indicated extra_info = {} if event_id_col is not None: extra_info['Region ID'] = row[event_id_col] if dPSI_col is not None: extra_info['dPSI'] = row[dPSI_col] if sig_col is not None: extra_info['Significance'] = row[sig_col] if extra_cols is not None: for col in extra_cols: extra_info[col] = row[col] #add extra info to ptms_in_region ptms_in_region = pd.concat([pd.DataFrame(extra_info, index = ptms_in_region.index), ptms_in_region], axis = 1) #if desired, add ptm information to the original splice event dataframe if annotate_original_df: if not ptms_in_region.empty: #split and separate unique modification types if 
separate_modification_types: ptms_in_region['Modification Class'] = ptms_in_region['Modification Class'].str.split(';') ptms_in_region = ptms_in_region.explode('Modification Class') ptms_info = ptms_in_region.apply(lambda x: x['UniProtKB Accession'] + '_' + x['Residue'] + str(x['PTM Position in Isoform']) + ' (' + x['Modification Class'] + ')', axis = 1) ptms_str = '/'.join(ptms_info.values) spliced_ptms_list.append(ptms_str) num_ptms_affected.append(ptms_in_region.shape[0]) num_unique_ptm_sites.append(ptms_in_region.groupby(['UniProtKB Accession', 'Residue', 'PTM Position in Isoform']).size().shape[0]) else: spliced_ptms_list.append(np.nan) num_ptms_affected.append(0) num_unique_ptm_sites.append(0) spliced_ptm_info.append(ptms_in_region.copy()) #combine all PTM information spliced_ptm_info = pd.concat(spliced_ptm_info, ignore_index = True) #convert ptm position to float if spliced_ptm_info.shape[0] > 0: spliced_ptm_info['PTM Position in Isoform'] = spliced_ptm_info['PTM Position in Isoform'].astype(float) #add ptm info to original splice event dataframe if annotate_original_df: region_data['PTMs'] = spliced_ptms_list region_data['Number of PTMs Affected'] = num_ptms_affected region_data['Number of Unique PTM Sites by Position'] = num_unique_ptm_sites region_data['Event Length'] = (region_data[region_end_col] - region_data[region_start_col]).abs() region_data['PTM Density (PTMs/bp)'] = (region_data['Number of Unique PTM Sites by Position']*3)/region_data['Event Length'] #multiply by 3 to convert aa to bp (3 bp per codon) return region_data, spliced_ptm_info def project_ptms_onto_splice_events(splice_data, annotate_original_df = True, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', dPSI_col = None, sig_col = None, event_id_col = None, gene_col = None, extra_cols = None, separate_modification_types = False, coordinate_type = 'hg38', start_coordinate_system = '1-based', end_coordinate_system = '1-based', taskbar_label = None, ptm_coordinates = None,PROCESSES = 1, **kwargs): """ Given splice event quantification data, project PTMs onto the regions impacted by the splice events. Assumes that the splice event data will have chromosome, strand, and genomic start/end positions for the regions of interest, and each row of the splice_event_data corresponds to a unique region. Important note: PTM-POSE relies on Ensembl based coordinates (1-based), so if the coordinates are 0-based, make sure to indicate using the start_coordinate_system and end_coordinate_system parameters. For example, rMATS uses 0-based for the start coordinates, but 1-based for the end coordinates. In this case, set start_coordinate_system = '0-based' and end_coordinate_system = '1-based'. Parameters ---------- splice_data: pandas.DataFrame dataframe containing splice event information, including chromosome, strand, and genomic location of regions of interest ptm_coordinates: pandas.DataFrame dataframe containing PTM information, including chromosome, strand, and genomic location of PTMs. If none, it will pull from the config file. chromosome_col: str column name in splice_data that contains chromosome information. Default is 'chr'. Expects it to be a str with only the chromosome number: 'Y', '1', '2', etc. strand_col: str column name in splice_data that contains strand information. Default is 'strand'. Expects it to be a str with '+' or '-', or integers as 1 or -1. Will convert to integers automatically if string format is provided. 
region_start_col: str column name in splice_data that contains the start position of the region of interest. Default is 'exonStart_0base'. region_end_col: str column name in splice_data that contains the end position of the region of interest. Default is 'exonEnd'. event_id_col: str column name in splice_data that contains the unique identifier for the splice event. If provided, will be used to annotate the ptm information with the specific splice event ID. Default is None. gene_col: str column name in splice_data that contains the gene name. If provided, will be used to make sure the projected PTMs stem from the same gene (some cases where genomic coordiantes overlap between distinct genes). Default is None. dPSI_col: str column name in splice_data that contains the delta PSI value for the splice event. Default is None, which will not include this information in the output sig_col: str column name in splice_data that contains the significance value for the splice event. Default is None, which will not include this information in the output. extra_cols: list list of additional columns to include in the output dataframe. Default is None, which will not include any additional columns. coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. start_coordinate_system: str indicates the coordinate system used for the start position. Either '0-based' or '1-based'. Default is '1-based'. end_coordinate_system: str indicates the coordinate system used for the end position. Either '0-based' or '1-based'. Default is '1-based'. separate_modification_types: bool Indicate whether to store PTM sites with multiple modification types as multiple rows. For example, if a site at K100 was both an acetylation and methylation site, these will be separated into unique rows with the same site number but different modification types. Default is True. taskbar_label: str Label to display in the tqdm progress bar. Default is None, which will automatically state "Projecting PTMs onto regions using ----- coordinates". PROCESSES: int Number of processes to use for multiprocessing. Default is 1 (single processing) **kwargs: additional keyword arguments Additional keyword arguments to pass to the find_ptms_in_many_regions function, which will be fed into the filter_ptms() function from the helper module. These will be used to filter ptms with lower evidence. For example, if you want to filter PTMs based on the number of MS observations, you can add 'min_MS_observations = 2' to the kwargs. This will filter out any PTMs that have less than 2 MS observations. See the filter_ptms() function for more options. Returns ------- spliced_ptm_info: pandas.DataFrame Contains the PTMs identified across the different splice events splice_data: pandas.DataFrame dataframe containing the original splice data with an additional column 'PTMs' that contains the PTMs found in the region of interest, in the format of 'SiteNumber(ModificationType)'. If no PTMs are found, the value will be np.nan. 
""" #load ptm data from config if not provided if ptm_coordinates is None: ptm_coordinates = pose_config.ptm_coordinates.copy() #check for any keyword arguments to use for filtering if kwargs: filter_arguments = helpers.extract_filter_kwargs(**kwargs) #check any excess unused keyword arguments, report them helpers.check_filter_kwargs(filter_arguments) #filter ptm coordinates file to include only ptms with desired evidence ptm_coordinates = helpers.filter_ptms(ptm_coordinates, **filter_arguments) if taskbar_label is None: taskbar_label = 'Projecting PTMs onto splice events using ' + coordinate_type + ' coordinates.' #copy splice_data = splice_data.copy() #check columns to make sure they are present and correct data type check_columns(splice_data, chromosome_col=chromosome_col, strand_col=strand_col, region_start_col=region_start_col, region_end_col=region_end_col, dPSI_col=dPSI_col, sig_col=sig_col, event_id_col=event_id_col, gene_col=gene_col, extra_cols=extra_cols) if PROCESSES == 1: splice_data, spliced_ptm_info = find_ptms_in_many_regions(splice_data, ptm_coordinates, chromosome_col = chromosome_col, strand_col = strand_col, region_start_col = region_start_col, region_end_col = region_end_col, dPSI_col = dPSI_col, sig_col = sig_col, event_id_col = event_id_col, gene_col = gene_col, extra_cols = extra_cols, annotate_original_df = annotate_original_df, coordinate_type = coordinate_type,start_coordinate_system=start_coordinate_system, end_coordinate_system=end_coordinate_system, taskbar_label = taskbar_label, separate_modification_types=separate_modification_types) elif PROCESSES > 1: #check num_cpus available, if greater than number of cores - 1 (to avoid freezing machine), then set to PROCESSES to 1 less than total number of cores num_cores = multiprocessing.cpu_count() if PROCESSES > num_cores - 1: PROCESSES = num_cores - 1 #split dataframe into chunks equal to PROCESSES splice_data_split = np.array_split(splice_data, PROCESSES) pool = multiprocessing.Pool(PROCESSES) #run with multiprocessing results = pool.starmap(find_ptms_in_many_regions, [(splice_data_split[i], ptm_coordinates, chromosome_col, strand_col, region_start_col, region_end_col, gene_col, dPSI_col, sig_col, event_id_col, extra_cols, annotate_original_df, coordinate_type, start_coordinate_system, end_coordinate_system, separate_modification_types, taskbar_label) for i in range(PROCESSES)]) splice_data = pd.concat([res[0] for res in results]) spliced_ptm_info = pd.concat([res[1] for res in results]) #raise ValueError('Multiprocessing not yet functional. Please set PROCESSES = 1.') print(f'PTMs projection successful ({spliced_ptm_info.shape[0]} identified).\n') return splice_data, spliced_ptm_info def project_ptms_onto_MATS(SE_events = None, A5SS_events = None, A3SS_events = None, RI_events = None, MXE_events = None, coordinate_type = 'hg38', identify_flanking_sequences = False, dPSI_col = 'meanDeltaPSI', sig_col = 'FDR', extra_cols = None, separate_modification_types = False, PROCESSES = 1,ptm_coordinates = None, **kwargs): """ Given splice quantification from the MATS algorithm, annotate with PTMs that are found in the differentially included regions. 
Parameters ---------- ptm_coordinates: pandas.DataFrame dataframe containing PTM information, including chromosome, strand, and genomic location of PTMs SE_events: pandas.DataFrame dataframe containing skipped exon event information from MATS A5SS_events: pandas.DataFrame dataframe containing 5' alternative splice site event information from MATS A3SS_events: pandas.DataFrame dataframe containing 3' alternative splice site event information from MATS RI_events: pandas.DataFrame dataframe containing retained intron event information from MATS MXE_events: pandas.DataFrame dataframe containing mutually exclusive exon event information from MATS coordinate_type: str indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg38'. dPSI_col: str Column name indicating delta PSI value. Default is 'meanDeltaPSI'. sig_col: str Column name indicating significance of the event. Default is 'FDR'. extra_cols: list List of column names for additional information to add to the results. Default is None. separate_modification_types: bool Indicate whether residues with multiple modifications (i.e. phosphorylation and acetylation) should be treated as separate PTMs and be placed in unique rows of the output dataframe. Default is False. PROCESSES: int Number of processes to use for multiprocessing. Default is 1. **kwargs: additional keyword arguments Additional keyword arguments to pass to the find_ptms_in_many_regions function, which will be fed into the filter_ptms() function from the helper module. These will be used to filter ptms with lower evidence. For example, if you want to filter PTMs based on the number of MS observations, you can add 'min_MS_observations = 2' to the kwargs. This will filter out any PTMs that have less than 2 MS observations. See the filter_ptms() function for more options. 
""" #load ptm data from config if not provided if ptm_coordinates is None: ptm_coordinates = pose_config.ptm_coordinates.copy() #check for any keyword arguments to use for filtering if kwargs: filter_arguments = helpers.extract_filter_kwargs(**kwargs) #check any excess unused keyword arguments, report them helpers.check_filter_kwargs(filter_arguments) #filter ptm coordinates file to include only ptms with desired evidence ptm_coordinates = helpers.filter_ptms(ptm_coordinates, **filter_arguments) print(f'Projecting PTMs onto MATS splice events using {coordinate_type} coordinates.') #reformat chromosome name format spliced_events = {} spliced_flanks = [] spliced_ptms = [] if SE_events is not None: if SE_events['chr'].str.contains('chr').any(): SE_events['chr'] = SE_events['chr'].apply(lambda x: x[3:]) SE_events['AS ID'] = "SE_" + SE_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > SE_events.shape[0]: SE_processes = 1 else: SE_processes = PROCESSES spliced_events['SE'], SE_ptms = project_ptms_onto_splice_events(SE_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based', taskbar_label = "Skipped Exon events", separate_modification_types=separate_modification_types, PROCESSES = SE_processes) SE_ptms['Event Type'] = 'SE' spliced_ptms.append(SE_ptms) if identify_flanking_sequences: print('Identifying flanking sequences for skipped exon events.') if 'upstreamES' in SE_events.columns: first_flank_start_col = 'upstreamES' first_flank_end_col = 'upstreamEE' second_flank_start_col = 'downstreamES' second_flank_end_col = 'downstreamEE' elif 'firstFlankingES' in SE_events.columns: first_flank_start_col = 'firstFlankingES' first_flank_end_col = 'firstFlankingEE' second_flank_start_col = 'secondFlankingES' second_flank_end_col = 'secondFlankingEE' else: raise ValueError('Could not find flanking sequence columns in skipped exon event data, based on what is typically outputted by MATS. 
Please check column names and provide the appropriate columns for the first and second flanking sequences') SE_flanks = fs.get_flanking_changes_from_splice_data(SE_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'exonStart_0base', spliced_region_end_col = 'exonEnd', first_flank_start_col = first_flank_start_col, first_flank_end_col = first_flank_end_col, second_flank_start_col = second_flank_start_col, second_flank_end_col = second_flank_end_col, dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') SE_flanks['Event Type'] = 'SE' spliced_flanks.append(SE_flanks) else: print('Skipped exon event data (SE_events) not provided, skipping') if A5SS_events is not None: if A5SS_events['chr'].str.contains('chr').any(): A5SS_events['chr'] = A5SS['chr'].apply(lambda x: x[3:]) #set the relevent start and end regions of the spliced out region, which are different depending on the strand region_start = [] region_end = [] first_flank_start = [] first_flank_end = [] second_flank_end = [] second_flank_start = [] for i, row in A5SS_events.iterrows(): strand = row['strand'] if strand == '+': region_start.append(row['shortEE']) region_end.append(row['longExonEnd']) if identify_flanking_sequences: first_flank_start.append(row['shortES']) first_flank_end.append(row['shortEE']) second_flank_start.append(row['flankingES']) second_flank_end.append(row['flankingEE']) else: region_start.append(row['longExonStart_0base']) region_end.append(row['shortES']) if identify_flanking_sequences: second_flank_start.append(row['shortES']) second_flank_end.append(row['shortEE']) first_flank_start.append(row['flankingES']) first_flank_end.append(row['flankingEE']) A5SS_events['event_start'] = region_start A5SS_events['event_end'] = region_end if identify_flanking_sequences: A5SS_events['first_flank_start'] = first_flank_start A5SS_events['first_flank_end'] = first_flank_end A5SS_events['second_flank_start'] = second_flank_start A5SS_events['second_flank_end'] = second_flank_end #set specific as id A5SS_events['AS ID'] = "5ASS_" + A5SS_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > A5SS_events.shape[0]: fiveASS_processes = 1 else: fiveASS_processes = PROCESSES #identify PTMs found within spliced regions spliced_events['5ASS'], fiveASS_ptms = project_ptms_onto_splice_events(A5SS_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'event_start', region_end_col = 'event_end', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', coordinate_type=coordinate_type, start_coordinate_system = '0-based', extra_cols = extra_cols, taskbar_label = "5' ASS events", separate_modification_types=separate_modification_types, PROCESSES = fiveASS_processes) fiveASS_ptms['Event Type'] = '5ASS' spliced_ptms.append(fiveASS_ptms) #identify ptms with altered flanking sequences if identify_flanking_sequences: print("Identifying flanking sequences for 5'ASS events.") fiveASS_flanks = fs.get_flanking_changes_from_splice_data(A5SS_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'event_start', spliced_region_end_col = 'event_end', first_flank_start_col = 'first_flank_start', first_flank_end_col = 'first_flank_end', second_flank_start_col = 
'second_flank_start', second_flank_end_col = 'second_flank_end',dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') fiveASS_flanks['Event Type'] = '5ASS' spliced_flanks.append(fiveASS_flanks) else: print("5' ASS event data (A5SS_events) not provided, skipping.") if A3SS_events is not None: if RI_events['chr'].str.contains('chr').any(): RI_events['chr'] = RI_events['chr'].apply(lambda x: x[3:]) if A3SS_events['chr'].str.contains('chr').any(): A3SS_events['chr'] = A3SS_events['chr'].apply(lambda x: x[3:]) #set the relevent start and end regions of the spliced out region, which are different depending on the strand region_start = [] region_end = [] first_flank_start = [] first_flank_end = [] second_flank_end = [] second_flank_start = [] for i, row in A3SS_events.iterrows(): strand = row['strand'] if strand == '+': region_start.append(row['longExonStart_0base']) region_end.append(row['shortES']) if identify_flanking_sequences: second_flank_start.append(row['flankingES']) second_flank_end.append(row['flankingEE']) first_flank_start.append(row['shortES']) first_flank_end.append(row['shortEE']) else: region_start.append(row['shortEE']) region_end.append(row['longExonEnd']) if identify_flanking_sequences: second_flank_start.append(row['flankingES']) second_flank_end.append(row['flankingEE']) first_flank_start.append(row['shortES']) first_flank_end.append(row['shortEE']) #save region info A3SS_events['event_start'] = region_start A3SS_events['event_end'] = region_end if identify_flanking_sequences: A3SS_events['first_flank_start'] = first_flank_start A3SS_events['first_flank_end'] = first_flank_end A3SS_events['second_flank_start'] = second_flank_start A3SS_events['second_flank_end'] = second_flank_end #add event ids A3SS_events['AS ID'] = "3ASS_" + A3SS_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > A3SS_events.shape[0]: threeASS_processes = 1 else: threeASS_processes = PROCESSES spliced_events['3ASS'], threeASS_ptms = project_ptms_onto_splice_events(A3SS_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'event_start', region_end_col = 'event_end', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system = '0-based', taskbar_label = "3' ASS events", separate_modification_types=separate_modification_types, PROCESSES = threeASS_processes) threeASS_ptms['Event Type'] = '3ASS' spliced_ptms.append(threeASS_ptms) #identify ptms with altered flanking sequences if identify_flanking_sequences: print("Identifying flanking sequences for 3' ASS events.") threeASS_flanks = fs.get_flanking_changes_from_splice_data(A3SS_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'event_start', spliced_region_end_col = 'event_end', first_flank_start_col = 'first_flank_start', first_flank_end_col = 'first_flank_end', second_flank_start_col = 'second_flank_start', second_flank_end_col = 'second_flank_end', dPSI_col=dPSI_col, sig_col = dPSI_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') threeASS_flanks['Event Type'] = '3ASS' spliced_flanks.append(threeASS_flanks) else: 
print("3' ASS event data (A3SS_events) not provided, skipping") if RI_events is not None: if RI_events['chr'].str.contains('chr').any(): RI_events['chr'] = RI_events['chr'].apply(lambda x: x[3:]) #add event id RI_events['AS ID'] = "RI_" + RI_events.index.astype(str) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > RI_events.shape[0]: RI_processes = 1 else: RI_processes = PROCESSES spliced_events['RI'], RI_ptms = project_ptms_onto_splice_events(RI_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'upstreamEE', region_end_col = 'downstreamES', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', coordinate_type=coordinate_type, start_coordinate_system='0-based', extra_cols = extra_cols, taskbar_label = 'Retained Intron Events', separate_modification_types=separate_modification_types, PROCESSES = RI_processes) RI_ptms['Event Type'] = 'RI' spliced_ptms.append(RI_ptms) #identify ptms with altered flanking sequences if identify_flanking_sequences: print('Identifying flanking sequences for retained intron events.') RI_flanks = fs.get_flanking_changes_from_splice_data(RI_events, ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', spliced_region_start_col = 'upstreamEE', spliced_region_end_col = 'downstreamES', first_flank_start_col = 'upstreamES', first_flank_end_col = 'upstreamEE', second_flank_start_col = 'downstreamES', second_flank_end_col = 'downstreamEE', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based') RI_flanks['Event Type'] = 'RI' spliced_flanks.append(RI_flanks) if MXE_events is not None: if MXE_events['chr'].str.contains('chr').any(): MXE_events['chr'] = MXE_events['chr'].apply(lambda x: x[3:]) #check to make sure there is enough information to do multiprocessing if that is desired if PROCESSES*4 > MXE_events.shape[0]: MXE_processes = 1 else: MXE_processes = PROCESSES #add AS ID MXE_events['AS ID'] = "MXE_" + MXE_events.index.astype(str) mxe_ptms = [] #first mxe exon spliced_events['MXE_Exon1'], MXE_Exon1_ptms = project_ptms_onto_splice_events(MXE_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = '1stExonStart_0base', region_end_col = '1stExonEnd', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', coordinate_type=coordinate_type, start_coordinate_system = '0-based', taskbar_label = 'MXE, First Exon', extra_cols=extra_cols, separate_modification_types=separate_modification_types, PROCESSES = MXE_processes) MXE_Exon1_ptms['Event Type'] = 'MXE (First Exon)' mxe_ptms.append(MXE_Exon1_ptms) #second mxe exon spliced_events['MXE_Exon2'], MXE_Exon2_ptms = project_ptms_onto_splice_events(MXE_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = '2ndExonStart_0base', region_end_col = '2ndExonEnd', event_id_col = 'AS ID', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', extra_cols=extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based', taskbar_label = 'MXE, Second Exon', separate_modification_types=separate_modification_types, PROCESSES = MXE_processes) MXE_Exon2_ptms['Event Type'] = 'MXE (Second Exon)' mxe_ptms.append(MXE_Exon2_ptms) #combine 
mxe ptms, and then drop any PTMs that were found in both MXE's mxe_ptms = pd.concat([MXE_Exon1_ptms, MXE_Exon2_ptms]) columns_to_check = ['UniProtKB Accession', 'Source of PTM', 'Residue', 'PTM Position in Isoform', 'Modification', 'Modification Class', 'Gene'] if dPSI_col is not None: columns_to_check.append('dPSI') if sig_col is not None: columns_to_check.append('Significance') if extra_cols is not None: columns_to_check += extra_cols mxe_ptms = mxe_ptms.drop_duplicates(subset = columns_to_check, keep = False) #flip dPSI values for second exon if dPSI_col is not None: mxe_ptms['dPSI'] = mxe_ptms.apply(lambda x: x['dPSI']* -1 if x['Event Type'] == 'MXE (Second Exon)' else x['dPSI'], axis = 1) #add mxe ptms to spliced_ptms spliced_ptms.append(mxe_ptms) spliced_ptms = pd.concat(spliced_ptms) if identify_flanking_sequences: spliced_flanks = pd.concat(spliced_flanks) return spliced_events, spliced_ptms, spliced_flanks else: return spliced_events, spliced_ptms #def project_ptms_onto_MAJIQ_dPSI(majiq_data, ptm_coordinates = None, coordinate_type = 'hg38', identify_flanking_sequences = False, dPSI_col = 'dPSI', sig_col = 'FDR', separate_modification_types = False, PROCESSES = 1): # print('in progress') # pass def add_splicegraph_info(psi_data, splicegraph, purpose = 'inclusion'): psi_data = psi_data[psi_data['splice_type'] != 'ME'].copy() if purpose == 'inclusion': #split exons into individual exons psi_data['Individual exon'] = psi_data['exons'].apply(lambda x: x.split(':')) psi_data = psi_data.explode('Individual exon').drop_duplicates() psi_data['Individual exon'] = psi_data['Individual exon'].astype(float) #add gene location information to psi data from spliceseq psi_data = psi_data.merge(splicegraph, left_on = ['symbol', 'Individual exon'], right_on = ['Symbol', 'Exon'], how = 'left') psi_data = psi_data.rename(columns = {'Chr_Start': 'spliced_region_start', 'Chr_Stop': 'spliced_region_end'}) return psi_data elif purpose == 'flanking': print('Not yet active. Please check back later.') else: raise ValueError('Purpose must be either inclusion or flanking. Please provide the correct purpose for the splicegraph information.') def project_ptms_onto_SpliceSeq(psi_data, splicegraph, gene_col ='symbol', dPSI_col = None, sig_col = None, extra_cols = None, coordinate_type = 'hg19', separate_modification_types = False, identify_flanking_sequences = False, flank_size = 5, ptm_coordinates = None, PROCESSES = 1, **kwargs): """ Given splice event quantification from SpliceSeq (such as what can be downloaded from TCGASpliceSeq), annotate with PTMs that are found in the differentially included regions. Parameters ---------- psi_data: pandas.DataFrame dataframe containing splice event quantification from SpliceSeq. Must contain the following columns: 'symbol', 'exons', 'splice_type'. splicegraph: pandas.DataFrame dataframe containing exon information from the splicegraph used during splice event quantification. Must contain the following columns: 'Symbol', 'Exon', 'Chr_Start', 'Chr_Stop'. gene_col: str column name in psi_data that contains the gene name. Default is 'symbol'. dPSI_col: str column name in psi_data that contains the delta PSI value for the splice event. Default is None, which will not include this information in the output. sig_col: str column name in psi_data that contains the significance value for the splice event. Default is None, which will not include this information in the output. extra_cols: list list of additional columns to include in the output dataframe. 
        Default is None, which will not include any additional columns.
    coordinate_type: str
        indicates the coordinate system used for the start and end positions. Either hg38 or hg19. Default is 'hg19'.
    separate_modification_types: bool
        Indicate whether to store PTM sites with multiple modification types as multiple rows. For example, if a site at K100 was both an acetylation and methylation site, these will be separated into unique rows with the same site number but different modification types. Default is True.
    identify_flanking_sequences: bool
        Indicate whether to identify and return the flanking sequences for the splice events. Default is False.
    flank_size: int
        Size of the flanking sequence to extract from the splice event. Default is 5, which will extract 5 bases upstream and downstream of the splice event. Only relevant if identify_flanking_sequences is True.
    PROCESSES: int
        Number of processes to use for multiprocessing. Default is 1 (single processing).
    **kwargs: additional keyword arguments
        Additional keyword arguments to pass to the find_ptms_in_many_regions function, which will be fed into the filter_ptms() function from the helper module. These will be used to filter ptms with lower evidence. For example, if you want to filter PTMs based on the number of MS observations, you can add 'min_MS_observations = 2' to the kwargs. This will filter out any PTMs that have less than 2 MS observations. See the filter_ptms() function for more options.
    """
    #load ptm data from config if not provided
    if ptm_coordinates is None:
        ptm_coordinates = pose_config.ptm_coordinates.copy()

    #check for any keyword arguments to use for filtering
    if kwargs:
        filter_arguments = helpers.extract_filter_kwargs(**kwargs)
        #check any excess unused keyword arguments, report them
        helpers.check_filter_kwargs(filter_arguments)
        #filter ptm coordinates file to include only ptms with desired evidence
        ptm_coordinates = helpers.filter_ptms(ptm_coordinates, **filter_arguments)

    #remove ME events from this analysis
    overlapping_columns = set(psi_data.columns).intersection({'Chromosome', 'Strand', 'Chr_Start', 'Chr_Stop'})
    if len(overlapping_columns) > 0:
        #drop columns that will be added from splicegraph
        psi_data = psi_data.drop(columns=overlapping_columns)

    print('Removing ME events from analysis')
    spliced_data = psi_data.copy()
    spliced_data = spliced_data[spliced_data['splice_type'] != 'ME'].copy()

    #split exons into individual exons
    spliced_data['Individual exon'] = spliced_data['exons'].apply(lambda x: x.split(':'))
    spliced_data = spliced_data.explode('Individual exon').drop_duplicates()
    spliced_data['Individual exon'] = spliced_data['Individual exon'].astype(float)

    #add gene location information to psi data from spliceseq
    spliced_data = spliced_data.merge(splicegraph.copy(), left_on = ['symbol', 'Individual exon'], right_on = ['Symbol', 'Exon'], how = 'left')
    spliced_data = spliced_data.rename(columns = {'Chr_Start': 'spliced_region_start', 'Chr_Stop': 'spliced_region_end'})

    print('Projecting PTMs onto SpliceSeq data')
    spliced_data, spliced_ptms = project_ptms_onto_splice_events(spliced_data, chromosome_col = 'Chromosome', strand_col = 'Strand', gene_col = 'symbol', region_start_col = 'spliced_region_start', region_end_col = 'spliced_region_end', event_id_col = 'as_id', dPSI_col = dPSI_col, sig_col = sig_col, extra_cols = extra_cols, separate_modification_types = separate_modification_types, coordinate_type = coordinate_type, PROCESSES = PROCESSES)

    ## add code for extracting flanking sequences (to do)
    if identify_flanking_sequences:
        altered_flanks = fs.get_flanking_changes_from_splicegraph(psi_data, splicegraph, dPSI_col = dPSI_col, sig_col = sig_col, extra_cols = extra_cols, gene_col = gene_col, coordinate_type=coordinate_type, flank_size = flank_size)
        return spliced_data, spliced_ptms, altered_flanks
    else:
        return spliced_data, spliced_ptms

#def project_ptms_onto_TCGA_SpliceSeq(tcga_cancer = 'PRAD'):
#    """
#    In progress. Will download and process TCGA SpliceSeq data for a specific cancer type, and project PTMs onto the spliced regions.
#    """
#    print('Not yet active. Please check back later.')
#    pass

def check_columns(splice_data, chromosome_col = None, strand_col = None, region_start_col = None, region_end_col = None, first_flank_start_col = None, first_flank_end_col = None, second_flank_start_col = None, second_flank_end_col = None, gene_col = None, dPSI_col = None, sig_col = None, event_id_col = None, extra_cols = None):
    """
    Function to quickly check if the provided column names exist in the dataset and if they are the correct type of data
    """
    expected_cols = [chromosome_col, strand_col, region_start_col, region_end_col, first_flank_start_col, first_flank_end_col, second_flank_start_col, second_flank_end_col, gene_col, dPSI_col, sig_col, event_id_col]
    expected_dtypes = [[str, object], [str, int, object], [int, float], [int, float], [int, float], [int, float], [int, float], [int, float], [str, object], float, float, None]

    #remove cols with None and the corresponding dtype entry
    expected_dtypes = [dtype for col, dtype in zip(expected_cols, expected_dtypes) if col is not None]
    expected_cols = [col for col in expected_cols if col is not None]

    #add extra columns to the expected columns list
    if extra_cols is not None:
        expected_cols += extra_cols
        expected_dtypes += [None]*len(extra_cols) #extra columns do not have dtype requirement

    #check to make sure columns exist in the dataframe
    if not all([x in splice_data.columns for x in expected_cols]):
        raise ValueError('Not all expected columns are present in the splice data. Please check the column names and provide the correct names for the following columns: {}'.format([x for x in expected_cols if x not in splice_data.columns]))

    #check to make sure columns are the correct data type
    for col, data_type in zip(expected_cols, expected_dtypes):
        if data_type is None:
            continue
        elif isinstance(data_type, list):
            if splice_data[col].dtype not in data_type:
                #try converting to the expected data type
                try:
                    splice_data[col] = splice_data[col].astype(data_type[0])
                except:
                    raise ValueError('Column {} is not the expected data type. Expected data type is one of {}, but found data type {}'.format(col, data_type, splice_data[col].dtype))
        else:
            if splice_data[col].dtype != data_type:
                #try converting to the expected data type
                try:
                    splice_data[col] = splice_data[col].astype(data_type)
                except:
                    raise ValueError('Column {} is not the expected data type. Expected data type is {}, but found data type {}'.format(col, data_type, splice_data[col].dtype))

Still getting the same error:

>>> splice_data, spliced_ptms, altered_flanks = project.project_ptms_onto_MATS(SE_events = SE_data, MXE_events = MXE_data, A5SS_events = A5SS_data, A3SS_events = A3SS_data, RI_events = RI_data, coordinate_type = 'hg38', identify_flanking_sequences = True)
Projecting PTMs onto MATS splice events using hg38 coordinates.
Skipped Exon events:   0%|          | 0/3635 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
    splice_data, spliced_ptms, altered_flanks = project.project_ptms_onto_MATS(SE_events = SE_data, MXE_events = MXE_data, A5SS_events = A5SS_data, A3SS_events = A3SS_data, RI_events = RI_data, coordinate_type = 'hg38', identify_flanking_sequences = True)
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/ptm_pose/project.py", line 416, in project_ptms_onto_MATS
    spliced_events['SE'], SE_ptms = project_ptms_onto_splice_events(SE_events, annotate_original_df=True, ptm_coordinates = ptm_coordinates, chromosome_col = 'chr', strand_col = 'strand', region_start_col = 'exonStart_0base', region_end_col = 'exonEnd', dPSI_col=dPSI_col, sig_col = sig_col, gene_col = 'geneSymbol', event_id_col = 'AS ID', extra_cols = extra_cols, coordinate_type=coordinate_type, start_coordinate_system='0-based', taskbar_label = "Skipped Exon events", separate_modification_types=separate_modification_types, PROCESSES = SE_processes)
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/ptm_pose/project.py", line 327, in project_ptms_onto_splice_events
    splice_data, spliced_ptm_info = find_ptms_in_many_regions(splice_data, ptm_coordinates, chromosome_col = chromosome_col, strand_col = strand_col, region_start_col = region_start_col, region_end_col = region_end_col, dPSI_col = dPSI_col, sig_col = sig_col, event_id_col = event_id_col, gene_col = gene_col, extra_cols = extra_cols, annotate_original_df = annotate_original_df, coordinate_type = coordinate_type, start_coordinate_system=start_coordinate_system, end_coordinate_system=end_coordinate_system, taskbar_label = taskbar_label, separate_modification_types=separate_modification_types)
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/ptm_pose/project.py", line 212, in find_ptms_in_many_regions
    if annotate_original_df:
  File "/mnt/940660EA0660CEB4/PTM-POSE/venv/lib/python3.13/site-packages/pandas/core/generic.py", line 1577, in __nonzero__
    raise ValueError(
    ...<2 lines>...
    )
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
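The ValueError at the bottom of this traceback is generic pandas behaviour rather than anything specific to the MATS input: putting a whole DataFrame into a boolean context (which is what happens at `if annotate_original_df:` once that name is bound to a DataFrame instead of a bool) always raises "The truth value of a DataFrame is ambiguous". Below is a minimal, self-contained sketch of the failure and the usual explicit alternatives; it is illustrative only and not code from the ptm_pose package.

import pandas as pd

df = pd.DataFrame({"a": [1, 2]})

# Evaluating a DataFrame directly as a condition raises the same ValueError seen above
try:
    if df:
        pass
except ValueError as err:
    print(err)

# Explicit checks; pick whichever matches the intent
if not df.empty:             # "is the table non-empty?"
    print("df has rows")
if (df["a"] > 1).any():      # "does any row satisfy the condition?"
    print("some a > 1")

# If the variable is meant to be a simple on/off switch, guard against it
# accidentally holding a DataFrame (parameter name mirrors the traceback and is used only for illustration)
annotate_original_df = True
if isinstance(annotate_original_df, bool) and annotate_original_df:
    print("treat as a boolean flag")

Since the call above passes `annotate_original_df=True` as a keyword argument, hitting this error inside `find_ptms_in_many_regions` suggests the installed ptm_pose version rebinds that parameter to a DataFrame internally; that would be a library-side issue (worth checking for a newer release or reporting upstream) rather than a problem with the SE/MXE/A5SS/A3SS/RI input tables.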

Traceback (most recent call last):
  File "/home/sever/PycharmProjects/pythonProject1/5m vs 6m.py", line 8, in <module>
    data = pd.read_csv(file_path)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/io/parsers/readers.py", line 1026, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/io/parsers/readers.py", line 626, in _read
    return parser.read(nrows)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/io/parsers/readers.py", line 1968, in read
    df = DataFrame(
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/frame.py", line 778, in __init__
    mgr = dict_to_mgr(data, index, columns, dtype=dtype, copy=copy, typ=manager)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/internals/construction.py", line 443, in dict_to_mgr
    arrays = Series(data, index=columns, dtype=object)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/series.py", line 490, in __init__
    index = ensure_index(index)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/indexes/base.py", line 7647, in ensure_index
    return Index(index_like, copy=copy, tupleize_cols=False)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/indexes/base.py", line 565, in __new__
    arr = sanitize_array(data, None, dtype=dtype, copy=copy)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/construction.py", line 654, in sanitize_array
    subarr = maybe_convert_platform(data)
  File "/home/sever/anaconda3/envs/seaborn/lib/python3.11/site-packages/pandas/core/dtypes/cast.py", line 138, in maybe_convert_platform
    arr = lib.maybe_convert_objects(arr)
  File "lib.pyx", line 2538, in pandas._libs.lib.maybe_convert_objects
TypeError: Cannot convert numpy.ndarray to numpy.ndarray
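This second traceback is a different problem: `pd.read_csv` fails while constructing the DataFrame with `TypeError: Cannot convert numpy.ndarray to numpy.ndarray`, an error that usually points to a binary incompatibility between the installed NumPy and the NumPy build that pandas was compiled against (for example after mixing pip and conda packages in the same environment), not to anything wrong with the CSV itself. A quick way to inspect the environment is sketched below; exact versions and remedies will differ per setup.

import numpy as np
import pandas as pd

# If NumPy was up- or downgraded after pandas was installed, these two can disagree
# with what pandas expects; reinstalling both from the same source typically clears
# the "Cannot convert numpy.ndarray to numpy.ndarray" TypeError.
print("numpy  :", np.__version__)
print("pandas :", pd.__version__)
pd.show_versions()  # prints the full dependency report, including build information

If the versions look mismatched, recreating the environment (or force-reinstalling numpy and pandas together through the same package manager) is the usual remedy.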
