diff --git a/nmr_processing/__init__.py b/nmr_processing/__init__.py index f295c6c..b55b58f 100644 --- a/nmr_processing/__init__.py +++ b/nmr_processing/__init__.py @@ -8,3 +8,6 @@ TODO: Refactor arguments into dataclasses and reenable pylint too-many checks TODO: Test issue creation and url insertion with todo-issues """ + +# TODO: (Future) Make a class for data bundles (including str representation, plot +#   methods, etc) diff --git a/nmr_processing/plotting.py b/nmr_processing/plotting.py index 8620a7f..b0b0ce5 100644 --- a/nmr_processing/plotting.py +++ b/nmr_processing/plotting.py @@ -24,6 +24,7 @@ get_data_from_folder, get_diff_params, get_pseudo2d_data, + pick_peaks_pseudo2d, ) from nmr_processing.utils import find_gamma, nucleus_label @@ -688,16 +689,28 @@ def plot_t2_relaxation( fig.savefig(save_path, bbox_inches="tight", dpi=300) -def diff_plot(peak_ints_norm, exp_path, *, fig_width=8, fig_height=8, save_path=None): +def plot_diffusion( + exp_path, + *, + peak_pos=None, + prominence=None, + fig_width=8, + fig_height=8, + save_path=None, +): """ Plot diffusion attenuation data against gradient strength. Parameters ---------- - peak_ints_norm : array-like - Normalized peak intensities from the diffusion experiment. exp_path : str Path to the experiment directory containing diff.xml metadata. + peak_pos : array-like, optional + Position(s) in ppm of peaks to extract. If None, peaks are automatically + detected automatically using `scipy.signal.find_peaks`. + prominence : number or ndarray or sequence, default: [0.5, 1] + Prominence range passed to `scipy.signal.find_peaks` when peaks are + auto-detected. Not used if `peak_pos` is provided. fig_width : float, default: 8 Figure width in inches. fig_height : float, default: 8 @@ -709,19 +722,25 @@ def diff_plot(peak_ints_norm, exp_path, *, fig_width=8, fig_height=8, save_path= ------- dict Bundle containing the generated figure, axes, and diffusion data. - - TODO: Add data getting to diff_plot so peak_ints_norm is not needed """ - bundle = get_diff_params(exp_path) + bundle = get_pseudo2d_data(exp_path) + bundle.update(get_diff_params(exp_path)) + bundle.update( + pick_peaks_pseudo2d( + bundle, prominence=prominence, peak_pos=peak_pos, normalize=True + ) + ) fig, ax = plt.subplots(figsize=(fig_width, fig_height)) ax.plot( - bundle["gradient_list"], peak_ints_norm, "o" + bundle["gradient_list"], bundle["peak_ints_norm"], "o" ) # , c='red', mfc='blue', mec='blue') ax.set_xlabel(r"Gradient Strength / G cm$\mathregular{^{-1}}$") ax.set_ylabel("Normalized Intensity") + fig.tight_layout() + if save_path: fig.savefig(save_path, bbox_inches="tight", dpi=300) diff --git a/nmr_processing/process_sir.py b/nmr_processing/process_sir.py index 441964d..bbc11a0 100644 --- a/nmr_processing/process_sir.py +++ b/nmr_processing/process_sir.py @@ -23,8 +23,8 @@ from nmr_processing.processing import ( get_1d_data, get_data_from_folder, - get_peak_slice_intensities, get_pseudo2d_data, + pick_peaks_pseudo2d, ) @@ -160,8 +160,8 @@ def process_sir( This function has three methods for determining peak intensities, listed in order of decreasing priority: - 1. Intensities extracted at the provided `peak_pos` values. - 2. Integrated intensities over the provided `regions`. + 1. Integrated intensities over the provided `regions`. + 2. Intensities extracted at the provided `peak_pos` values. 3. Intensities extracted at the automatically-found positions of peaks. This function should also theoretically work for T1 measurements or other pseudo-2D @@ -182,9 +182,11 @@ def process_sir( Returns ------- - tuple - (delays, intensities) where `delays` are the experimental delays and - `intensities` are extracted intensity values for the selected peaks. + delays : np.ndarray + Experimental delays in seconds, with pulse length offset if `delay_offset` + is True. + intensities : np.ndarray + Normalized intensity values for the selected regions/peaks. """ # Load vdlist and interpret suffixes @@ -192,24 +194,10 @@ def process_sir( delays = load_vdlist(vdlist, delay_offset=delay_offset) bundle = get_pseudo2d_data(exp_path, proc_num=proc_num) - x_vals_ppm = np.array(bundle["x_vals_ppm"]) - y_data = np.array(bundle["y_data"]) - - if peak_pos: - intensities = get_peak_slice_intensities(x_vals_ppm, y_data, peak_pos=peak_pos) - elif regions: - ints = [] - for x_min, x_max in regions: - if x_min > x_max: - x_min, x_max = x_max, x_min - idx_filter = (x_vals_ppm >= x_min) & (x_vals_ppm <= x_max) - ints.append(np.trapz(x_vals_ppm[idx_filter], y_data[idx_filter])) - intensities = np.concatenate(ints, axis=1) - else: - peak_pick_bundle = get_peak_slice_intensities( - x_vals_ppm, y_data, prominence=[0.9, 1] - ) - intensities = peak_pick_bundle["peak_ints_norm"] + bundle.update( + pick_peaks_pseudo2d(bundle, peak_pos=peak_pos, regions=regions, normalize=True) + ) + intensities = bundle["peak_ints_norm"] return delays, intensities @@ -282,7 +270,7 @@ def make_cifit_files( make_mch_file( filename, title=title, - sites=intensities.shape[1], + n_sites=intensities.shape[1], initial_mag_guesses=initial_mag_guesses, final_mag_guesses=final_mag_guesses, r1_guesses=r1_guesses, @@ -295,8 +283,8 @@ def make_cifit_files( def make_mch_file( filename, *, - sites=2, - processes=1, + n_sites=2, + n_procs=1, r1_guesses=None, k_guesses=None, initial_mag_guesses=None, @@ -315,9 +303,9 @@ def make_mch_file( ---------- filename : str Base filename for the output file (without extension). - sites : int, default: 2 + n_sites : int, default: 2 Number of sites being analyzed. - processes : int, default: 1 + n_procs : int, default: 1 Number of exchange processes. r1_guesses : list of float, optional Initial guess of R1 relaxation rates in Hz for each site. The default guesses @@ -343,13 +331,13 @@ def make_mch_file( if matrix: matrix = np.array(matrix) - if matrix.shape != (sites, sites): + if matrix.shape != (n_sites, n_sites): raise ValueError( - "matrix must be a square array of shape `sites` by `sites`!" + "matrix must be a square array of shape `n_sites` by `n_sites`!" ) else: - if processes == 1: - matrix = np.ones((sites, sites)) - np.diag(np.ones(sites)) + if n_procs == 1: + matrix = np.ones((n_sites, n_sites)) - np.diag(np.ones(n_sites)) else: raise NotImplementedError( "This function can't currently handle " @@ -359,53 +347,53 @@ def make_mch_file( mch_lines = [title] - mch_lines.extend(["", f"{sites} {processes}"]) + mch_lines.extend(["", f"{n_sites} {n_procs}"]) if not r1_guesses: r1_guesses = [1 / 0.462, 1 / 2.141] # Example reciprocal of LPSC and LZC T1s - elif len(r1_guesses) != sites: - raise ValueError("`r1_guesses` must be of length `sites`!") + elif len(r1_guesses) != n_sites: + raise ValueError("`r1_guesses` must be of length `n_sites`!") mch_lines.extend(["", " ".join(map(str, r1_guesses))]) if specify_vary: - mch_lines.append(" ".join(map(str, np.ones(sites, dtype=np.int8)))) + mch_lines.append(" ".join(map(str, np.ones(n_sites, dtype=np.int8)))) if final_mag_guesses: - if len(final_mag_guesses) != sites: - raise ValueError("`final_mag_guesses` must be of length `sites`!") + if len(final_mag_guesses) != n_sites: + raise ValueError("`final_mag_guesses` must be of length `n_sites`!") else: - final_mag_guesses = np.ones(sites) # Final intensities are normalized, so 1 + final_mag_guesses = np.ones(n_sites) # Final intensities are normalized, so 1 mch_lines.extend(["", " ".join(map(str, final_mag_guesses))]) if specify_vary: - mch_lines.append(" ".join(map(str, np.ones(sites, dtype=np.int8)))) + mch_lines.append(" ".join(map(str, np.ones(n_sites, dtype=np.int8)))) if initial_mag_guesses: - if len(initial_mag_guesses) != sites: - raise ValueError("`initial_mag_guesses` must be of length `sites`!") + if len(initial_mag_guesses) != n_sites: + raise ValueError("`initial_mag_guesses` must be of length `n_sites`!") else: # FIXME: This is a bad guess, it should be more like 1, -1 - initial_mag_guesses = np.ones(sites) + initial_mag_guesses = np.ones(n_sites) mch_lines.extend(["", " ".join(map(str, initial_mag_guesses))]) if specify_vary: - mch_lines.append(" ".join(map(str, np.ones(sites, dtype=np.int8)))) + mch_lines.append(" ".join(map(str, np.ones(n_sites, dtype=np.int8)))) if k_guesses: - if len(k_guesses) != sites: - raise ValueError("`k_guesses` must be of length `sites`!") + if len(k_guesses) != n_sites: + raise ValueError("`k_guesses` must be of length `n_sites`!") else: - k_guesses = np.ones(processes) # FIXME: This is a bad guess + k_guesses = np.ones(n_procs) # FIXME: This is a bad guess - for i in range(processes): + for i in range(n_procs): if specify_vary: # rate guess for the mechanism, with variation specified mch_lines.extend(["", str(k_guesses[i]) + " 1"]) else: # rate guess for the mechanism mch_lines.extend(["", str(k_guesses[i])]) - mch_lines.append(str(sites * (sites - 1))) # number of off-diagonals + mch_lines.append(str(n_sites * (n_sites - 1))) # number of off-diagonals - for j in range(sites): - for k in range(sites): + for j in range(n_sites): + for k in range(n_sites): if matrix[j][k] != 0: mch_lines.append(f"{j} {k} {matrix[j][k]}") @@ -453,11 +441,11 @@ def make_dat_file(filename, delays, intensities, *, title="TEST", peak_names=Non delays = delays.reshape(numpoints, 1) data = np.concatenate((delays, intensities), axis=1) - def format_array(arr): + def _format_array(arr): return np.array2string(arr, precision=6, suppress_small=True) for line in data: - data_lines.append("\t".join(map(format_array, line))) + data_lines.append("\t".join(map(_format_array, line))) # data_str = np.array_str(data, precision=5, suppress_small=True) # data_lines.extend(data_str.strip('[] ').split(']\n [')) @@ -569,7 +557,7 @@ def exp_to_cifit( def plot_cifit_csv( - file_path, *, site_names=None, data_rows=16, fit_rows=101, save_path=None + file_path, *, site_names=None, n_data_rows=16, n_fit_rows=101, save_path=None ): """ Plot CIFIT result data from the output CSV file. @@ -582,7 +570,7 @@ def plot_cifit_csv( Labels for each site to display in plot legend. n_data_rows : int, default: 16 Number of data rows to read from the CSV file. - fit_rows : int, default: 101 + n_fit_rows : int, default: 101 Number of smooth-fit rows to read from the CSV file. save_path : str, optional Output file path for the saved figure. If not specified, saves a PDF file with @@ -594,7 +582,7 @@ def plot_cifit_csv( with open(file_path, "r", encoding="utf-8") as f: f.readline() # skip header lines line = f.readline() - n_cols = len(re.findall(r'[.\d]+', line)) + n_cols = len(re.findall(r"[.\d]+", line)) n_sites = (n_cols - 1) // 3 cols = ["delay"] @@ -602,7 +590,7 @@ def plot_cifit_csv( data_df = pd.read_csv( file_path, sep=None, - nrows=data_rows, + nrows=n_data_rows, skiprows=1, header=None, index_col=False, @@ -616,8 +604,8 @@ def plot_cifit_csv( fit_df = pd.read_csv( file_path, sep=None, - nrows=fit_rows, - skiprows=3 + data_rows, + nrows=n_fit_rows, + skiprows=3 + n_data_rows, header=None, index_col=False, names=cols, @@ -717,7 +705,7 @@ def get_1d_exsy_data(dir_path, exp_nums): # CHECK: order might be messed up in dict? y_data = [exp_bundle["y_data"] for _, exp_bundle in data_bundle.items()] - proc_bundle = get_peak_slice_intensities(x_vals_ppm=x_vals_ppm, y_data=y_data) + proc_bundle = pick_peaks_pseudo2d(x_vals_ppm=x_vals_ppm, y_data=y_data) peak_ints_norm = proc_bundle["peak_ints_norm"] return d15_vals, peak_ints_norm diff --git a/nmr_processing/processing.py b/nmr_processing/processing.py index 3b0e7d9..8f592c2 100644 --- a/nmr_processing/processing.py +++ b/nmr_processing/processing.py @@ -197,10 +197,10 @@ def get_diff_params(exp_path): tree = ElementTree.parse(xml_path) root = tree.getroot() - little_delta = float(root.find(".//little_delta").text) # [ms] + little_delta = float(root.find(".//delta").text) # [ms] little_delta = little_delta / 1000 # [s] - big_delta = float(root.find(".//big_delta").text) # [ms] + big_delta = float(root.find(".//DELTA").text) # [ms] big_delta = big_delta / 1000 # [s] diff_coeff_estimate = float(root.find(".//exDiffCoff").text) # [m2/s] @@ -275,11 +275,13 @@ def get_pseudo2d_data(exp_path, *, proc_num=1): return bundle -def get_peak_slice_intensities( - x_vals_ppm, - y_data, +def pick_peaks_pseudo2d( + bundle=None, *, + x_vals_ppm=None, + y_data=None, peak_pos=None, + regions=None, prominence=None, normalize=True, ): @@ -287,8 +289,17 @@ def get_peak_slice_intensities( Extract peak intensities from each slice of a pseudo-2D dataset. Peak intensities are normalized by the slice with the largest total intensity. + This function has three methods for determining peak intensities, listed in order of + decreasing priority: + 1. Integrated intensities over the provided `regions`. + 2. Intensities extracted at the provided `peak_pos` values. + 3. Intensities extracted at the automatically-found positions of peaks. + Parameters ---------- + bundle : dict, optional + Data bundle containing keys 'x_vals_ppm' and 'y_data' for the pseudo-2D dataset. + If provided, these values will be used instead of the corresponding parameters. x_vals_ppm : array-like X-axis values in ppm. y_data : array-like @@ -296,9 +307,11 @@ def get_peak_slice_intensities( peak_pos : array-like, optional Position(s) in ppm of peaks to extract. If None, peaks are automatically detected automatically using `scipy.signal.find_peaks`. - prominence : number or ndarray or sequence, default: [0.001, 1] + regions : list of tuple, optional + List of ppm ranges to integrate across to determine peak intensities. + prominence : number or ndarray or sequence, default: [0.5, 1] Prominence range passed to `scipy.signal.find_peaks` when peaks are - auto-detected. + auto-detected. Not used if `peak_pos` is provided. normalize : bool, default: True If True, normalize each spectrum by the maximum intensity of all slices. Otherwise, return raw intensities. @@ -309,38 +322,62 @@ def get_peak_slice_intensities( Bundle containing peak indices, ppm positions, raw intensities, and normalized intensities. - TODO: add bundle input to get_peak_slice_intensities - TODO: allow no xdata input to get_peak_slice_intensities + TODO: allow no xdata input to pick_peaks_pseudo2d """ - # Convert data into nparray if not already - x_vals_ppm = np.array(x_vals_ppm) - y_data = np.array(y_data) - - if peak_pos is None: - # Find peaks using find_peaks if peak_pos not provided - if prominence is None: - prominence = [0.001, 1] - # Choose the slice with the highest total intensity for peak picking - best_slice = max(y_data, key=np.sum) - best_slice = best_slice - min(best_slice) - best_slice = best_slice / max(best_slice) - - peak_idx = signal.find_peaks(best_slice, prominence=prominence)[0] + # Unpack or create bundle depending on args + if bundle: + x_vals_ppm = bundle["x_vals_ppm"] + y_data = bundle["y_data"] else: - peak_idx = [np.abs(x_vals_ppm - ppm).argmin() for ppm in peak_pos] - - # ppm values of picked peaks, might be slightly different from input peak_pos - peak_pos = x_vals_ppm[peak_idx] - - # Get intensities of picked peaks in all slices - peak_ints = np.array([[y_slice[i] for i in peak_idx] for y_slice in y_data]) - - bundle = { - "peak_idx": peak_idx, - "peak_pos_ppm": peak_pos, - "peak_ints": peak_ints - } + bundle = {"x_vals_ppm": np.array(x_vals_ppm), "y_data": np.array(y_data)} + + # If regions are provided, integrate across those regions to get peak intensities. + # Otherwise, find intensities at peak positions. + if regions: + bundle["peak_pick_method"] = "integrate_regions" + ints = [] + for x_min, x_max in regions: + if x_min > x_max: + x_min, x_max = x_max, x_min + idx_filter = (x_vals_ppm >= x_min) & (x_vals_ppm <= x_max) + ints.append(np.trapz(x_vals_ppm[idx_filter], y_data[idx_filter])) + + bundle.update( + { + "peak_pick_method": "integrate_regions", + "integration_regions": regions, + "peak_ints": np.concatenate(ints, axis=1), + } + ) + else: + if peak_pos: + bundle["peak_pick_method"] = "peak_positions" + # Find the indices of the closest x_vals_ppm to each peak_pos + peak_idx = [np.abs(x_vals_ppm - ppm).argmin() for ppm in peak_pos] + else: + bundle["peak_pick_method"] = "find_peaks" + # Find peaks using find_peaks if neither peak_pos nor regions provided + if prominence is None: + prominence = [0.5, 1] + # Choose the slice with the highest total intensity for peak picking + best_slice = max(y_data, key=np.sum) + best_slice = best_slice - min(best_slice) + best_slice = best_slice / max(best_slice) + + peak_idx = signal.find_peaks(best_slice, prominence=prominence)[0] + # ppm values of picked peaks, might be slightly different from input peak_pos + peak_positions = x_vals_ppm[peak_idx] + # Get intensities of picked peaks in all slices + peak_ints = np.array([[y_slice[i] for i in peak_idx] for y_slice in y_data]) + + bundle.update( + { + "peak_idx": peak_idx, + "peak_pos_ppm": peak_positions, + "peak_ints": peak_ints, + } + ) if normalize: # Normalize by max intensity of each peak, and add to bundle