diff --git a/doc/source/conf.py b/doc/source/conf.py index 195ee2ff..821fd467 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -62,7 +62,9 @@ # General information about the project. project = "diffpy.snmf" -copyright = "2023-%Y, The Trustees of Columbia University in the City of New York" +copyright = ( + "2023-%Y, The Trustees of Columbia University in the City of New York" +) # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -221,7 +223,13 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - ("index", "diffpy.snmf.tex", "diffpy.snmf Documentation", ab_authors, "manual"), + ( + "index", + "diffpy.snmf.tex", + "diffpy.snmf Documentation", + ab_authors, + "manual", + ), ] # The name of an image file (relative to this directory) to place at the top of @@ -249,7 +257,9 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [("index", "diffpy.snmf", "diffpy.snmf Documentation", ab_authors, 1)] +man_pages = [ + ("index", "diffpy.snmf", "diffpy.snmf Documentation", ab_authors, 1) +] # If true, show URL addresses after external links. # man_show_urls = False diff --git a/pyproject.toml b/pyproject.toml index e7659fd7..08a90b6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,25 +60,23 @@ ignore-words = ".codespell/ignore_words.txt" skip = "*.cif,*.dat" [tool.black] -line-length = 115 +line-length = 79 include = '\.pyi?$' exclude = ''' /( \.git - | \.hg - | \.mypy_cache - | \.tox - | \.venv - | \.rst - | \.txt - | _build - | buck-out - | build - | dist - - # The following are specific to Black, you probably don't want those. - | blib2to3 - | tests/data +| \.hg +| \.mypy_cache +| \.tox +| \.venv +| \.rst +| \.txt +| _build +| buck-out +| build +| dist +| blib2to3 +| tests/data )/ ''' diff --git a/src/diffpy/__init__.py b/src/diffpy/__init__.py index 2baa1eaa..8f8f6bdb 100644 --- a/src/diffpy/__init__.py +++ b/src/diffpy/__init__.py @@ -15,7 +15,6 @@ """Blank namespace package for module diffpy.""" - from pkgutil import extend_path __path__ = extend_path(__path__, __name__) diff --git a/src/diffpy/snmf/main.py b/src/diffpy/snmf/main.py index 40ebb647..25127b44 100644 --- a/src/diffpy/snmf/main.py +++ b/src/diffpy/snmf/main.py @@ -18,6 +18,8 @@ my_model.fit(rho=1e12, eta=610) print("Done") -np.savetxt("my_norm_components.txt", my_model.components_, fmt="%.6g", delimiter=" ") +np.savetxt( + "my_norm_components.txt", my_model.components_, fmt="%.6g", delimiter=" " +) np.savetxt("my_norm_weights.txt", my_model.weights_, fmt="%.6g", delimiter=" ") np.savetxt("my_norm_stretch.txt", my_model.stretch_, fmt="%.6g", delimiter=" ") diff --git a/src/diffpy/snmf/plotter.py b/src/diffpy/snmf/plotter.py index 9d8255ef..ebc2bd7c 100644 --- a/src/diffpy/snmf/plotter.py +++ b/src/diffpy/snmf/plotter.py @@ -6,7 +6,11 @@ class SNMFPlotter: def __init__(self, figsize=(12, 4)): plt.ion() self.fig, self.axes = plt.subplots(1, 3, figsize=figsize) - titles = ["Components", "Weights (rows as series)", "Stretch (rows as series)"] + titles = [ + "Components", + "Weights (rows as series)", + "Stretch (rows as series)", + ] for ax, t in zip(self.axes, titles): ax.set_title(t) self.lines = {"components": [], "weights": [], "stretch": []} diff --git a/src/diffpy/snmf/snmf_class.py b/src/diffpy/snmf/snmf_class.py index 4999168f..13076d3f 100644 --- a/src/diffpy/snmf/snmf_class.py +++ b/src/diffpy/snmf/snmf_class.py @@ -132,14 +132,18 @@ def __init__( # Initialize weights and determine number of components if init_weights is None: self.n_components = n_components - self.weights_ = self._rng.beta(a=2.0, b=2.0, size=(self.n_components, self.n_signals)) + self.weights_ = self._rng.beta( + a=2.0, b=2.0, size=(self.n_components, self.n_signals) + ) else: self.n_components = init_weights.shape[0] self.weights_ = init_weights # Initialize stretching matrix if not provided if init_stretch is None: - self.stretch_ = np.ones((self.n_components, self.n_signals)) + self._rng.normal( + self.stretch_ = np.ones( + (self.n_components, self.n_signals) + ) + self._rng.normal( 0, 1e-3, size=(self.n_components, self.n_signals) ) else: @@ -147,7 +151,9 @@ def __init__( # Initialize component matrix if not provided if init_components is None: - self.components_ = self._rng.random((self.signal_length, self.n_components)) + self.components_ = self._rng.random( + (self.signal_length, self.n_components) + ) else: self.components_ = init_components @@ -162,7 +168,9 @@ def __init__( # Second-order spline: Tridiagonal (-2 on diagonal, 1 on sub/superdiagonals) self._spline_smooth_operator = 0.25 * diags( - [1, -2, 1], offsets=[0, 1, 2], shape=(self.n_signals - 2, self.n_signals) + [1, -2, 1], + offsets=[0, 1, 2], + shape=(self.n_signals - 2, self.n_signals), ) def fit(self, rho=0, eta=0, reset=True): @@ -196,7 +204,11 @@ def fit(self, rho=0, eta=0, reset=True): self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() self.best_objective = self.objective_function - self.best_matrices = [self.components_.copy(), self.weights_.copy(), self.stretch_.copy()] + self.best_matrices = [ + self.components_.copy(), + self.weights_.copy(), + self.stretch_.copy(), + ] self.objective_difference = None self._objective_history = [self.objective_function] @@ -206,9 +218,16 @@ def fit(self, rho=0, eta=0, reset=True): self._prev_grad_components = np.zeros_like(self.components_) regularization_term = ( - 0.5 * self.rho * np.linalg.norm(self._spline_smooth_operator @ self.stretch_.T, "fro") ** 2 + 0.5 + * self.rho + * np.linalg.norm( + self._spline_smooth_operator @ self.stretch_.T, "fro" + ) + ** 2 ) - sparsity_term = self.eta * np.sum(np.sqrt(self.components_)) # Square root penalty + sparsity_term = self.eta * np.sum( + np.sqrt(self.components_) + ) # Square root penalty print( f"Start, Objective function: {self.objective_function:.5e}" f", Obj - reg/sparse: {self.objective_function - regularization_term - sparsity_term:.5e}" @@ -220,9 +239,16 @@ def fit(self, rho=0, eta=0, reset=True): self.outer_loop() # Print diagnostics regularization_term = ( - 0.5 * self.rho * np.linalg.norm(self._spline_smooth_operator @ self.stretch_.T, "fro") ** 2 + 0.5 + * self.rho + * np.linalg.norm( + self._spline_smooth_operator @ self.stretch_.T, "fro" + ) + ** 2 ) - sparsity_term = self.eta * np.sum(np.sqrt(self.components_)) # Square root penalty + sparsity_term = self.eta * np.sum( + np.sqrt(self.components_) + ) # Square root penalty print( f"Obj fun: {self.objective_function:.5e}, " f"Obj - reg/sparse: {self.objective_function - regularization_term - sparsity_term:.5e}, " @@ -230,8 +256,16 @@ def fit(self, rho=0, eta=0, reset=True): ) # Convergence check: Stop if diffun is small and at least min_iter iterations have passed - print("Checking if ", self.objective_difference, " < ", self.objective_function * self.tol) - if self.objective_difference < self.objective_function * self.tol and outiter >= self.min_iter: + print( + "Checking if ", + self.objective_difference, + " < ", + self.objective_function * self.tol, + ) + if ( + self.objective_difference < self.objective_function * self.tol + and outiter >= self.min_iter + ): break self.normalize_results() @@ -251,8 +285,12 @@ def normalize_results(self): self.stretch_ = self.stretch_ / stretch_row_max # effectively just re-running with component updates only vs normalized weights/stretch - self._grad_components = np.zeros_like(self.components_) # Gradient of X (zeros for now) - self._prev_grad_components = np.zeros_like(self.components_) # Previous gradient of X (zeros for now) + self._grad_components = np.zeros_like( + self.components_ + ) # Gradient of X (zeros for now) + self._prev_grad_components = np.zeros_like( + self.components_ + ) # Previous gradient of X (zeros for now) self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() self.objective_difference = None @@ -265,9 +303,13 @@ def normalize_results(self): self.update_components() self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after normalize_components: {self.objective_function:.5e}") + print( + f"Objective function after normalize_components: {self.objective_function:.5e}" + ) self._objective_history.append(self.objective_function) - self.objective_difference = self._objective_history[-2] - self._objective_history[-1] + self.objective_difference = ( + self._objective_history[-2] - self._objective_history[-1] + ) if self.plotter is not None: self.plotter.update( components=self.components_, @@ -275,7 +317,10 @@ def normalize_results(self): stretch=self.stretch_, update_tag="normalize components", ) - if self.objective_difference < self.objective_function * self.tol and outiter >= 7: + if ( + self.objective_difference < self.objective_function * self.tol + and outiter >= 7 + ): break def outer_loop(self): @@ -285,12 +330,20 @@ def outer_loop(self): self.update_components() self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after update_components: {self.objective_function:.5e}") + print( + f"Objective function after update_components: {self.objective_function:.5e}" + ) self._objective_history.append(self.objective_function) - self.objective_difference = self._objective_history[-2] - self._objective_history[-1] + self.objective_difference = ( + self._objective_history[-2] - self._objective_history[-1] + ) if self.objective_function < self.best_objective: self.best_objective = self.objective_function - self.best_matrices = [self.components_.copy(), self.weights_.copy(), self.stretch_.copy()] + self.best_matrices = [ + self.components_.copy(), + self.weights_.copy(), + self.stretch_.copy(), + ] if self.plotter is not None: self.plotter.update( components=self.components_, @@ -302,33 +355,60 @@ def outer_loop(self): self.update_weights() self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after update_weights: {self.objective_function:.5e}") + print( + f"Objective function after update_weights: {self.objective_function:.5e}" + ) self._objective_history.append(self.objective_function) - self.objective_difference = self._objective_history[-2] - self._objective_history[-1] + self.objective_difference = ( + self._objective_history[-2] - self._objective_history[-1] + ) if self.objective_function < self.best_objective: self.best_objective = self.objective_function - self.best_matrices = [self.components_.copy(), self.weights_.copy(), self.stretch_.copy()] + self.best_matrices = [ + self.components_.copy(), + self.weights_.copy(), + self.stretch_.copy(), + ] if self.plotter is not None: self.plotter.update( - components=self.components_, weights=self.weights_, stretch=self.stretch_, update_tag="weights" + components=self.components_, + weights=self.weights_, + stretch=self.stretch_, + update_tag="weights", ) - self.objective_difference = self._objective_history[-2] - self._objective_history[-1] - if self._objective_history[-3] - self.objective_function < self.objective_difference * 1e-3: + self.objective_difference = ( + self._objective_history[-2] - self._objective_history[-1] + ) + if ( + self._objective_history[-3] - self.objective_function + < self.objective_difference * 1e-3 + ): break self.update_stretch() self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after update_stretch: {self.objective_function:.5e}") + print( + f"Objective function after update_stretch: {self.objective_function:.5e}" + ) self._objective_history.append(self.objective_function) - self.objective_difference = self._objective_history[-2] - self._objective_history[-1] + self.objective_difference = ( + self._objective_history[-2] - self._objective_history[-1] + ) if self.objective_function < self.best_objective: self.best_objective = self.objective_function - self.best_matrices = [self.components_.copy(), self.weights_.copy(), self.stretch_.copy()] + self.best_matrices = [ + self.components_.copy(), + self.weights_.copy(), + self.stretch_.copy(), + ] if self.plotter is not None: self.plotter.update( - components=self.components_, weights=self.weights_, stretch=self.stretch_, update_tag="stretch" + components=self.components_, + weights=self.weights_, + stretch=self.stretch_, + update_tag="stretch", ) def get_residual_matrix(self, components=None, weights=None, stretch=None): @@ -384,7 +464,9 @@ def get_objective_function(self, residuals=None, stretch=None): spline_smooth_operator=self._spline_smooth_operator, ) - def compute_stretched_components(self, components=None, weights=None, stretch=None): + def compute_stretched_components( + self, components=None, weights=None, stretch=None + ): """ Interpolates each component along its sample axis according to per-(component, signal) stretch factors, then applies per-(component, signal) weights. Also computes the @@ -429,7 +511,10 @@ def compute_stretched_components(self, components=None, weights=None, stretch=No stretch_inv = 1.0 / stretch # Apply stretching to the original sample indices, represented as a "time-stretch" - t = np.arange(signal_len, dtype=float)[:, None, None] * stretch_inv[None, :, :] + t = ( + np.arange(signal_len, dtype=float)[:, None, None] + * stretch_inv[None, :, :] + ) # has shape (signal_len, n_components, n_signals) # For each stretched coordinate, find its prior integer (original) index and their difference @@ -442,7 +527,9 @@ def compute_stretched_components(self, components=None, weights=None, stretch=No i1 = np.clip(i0 + 1, 0, max_idx) # Gather sample values - comps_3d = components[:, :, None] # expand components by a dimension for broadcasting across n_signals + comps_3d = components[ + :, :, None + ] # expand components by a dimension for broadcasting across n_signals c0 = np.take_along_axis(comps_3d, i0, axis=0) # left sample values c1 = np.take_along_axis(comps_3d, i1, axis=0) # right sample values @@ -452,7 +539,9 @@ def compute_stretched_components(self, components=None, weights=None, stretch=No # Derivatives di = -t * stretch_inv[None, :, :] # first-derivative coefficient - ddi = -di * stretch_inv[None, :, :] * 2.0 # second-derivative coefficient + ddi = ( + -di * stretch_inv[None, :, :] * 2.0 + ) # second-derivative coefficient d_unweighted = c0 * (-di) + c1 * di dd_unweighted = c0 * (-ddi) + c1 * ddi @@ -467,7 +556,9 @@ def compute_stretched_components(self, components=None, weights=None, stretch=No dd_weighted.reshape(signal_len, n_components * n_signals), ) - def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None): + def apply_transformation_matrix( + self, stretch=None, weights=None, residuals=None + ): """ Computes the transformation matrix `stretch_transformed` for residuals, using scaling matrix `stretch` and weight coefficients `weights`. @@ -482,7 +573,9 @@ def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None # Compute scaling matrix stretch_tiled = np.tile( - stretch.reshape(1, self.n_signals * self.n_components, order="F") ** -1, (self.signal_length, 1) + stretch.reshape(1, self.n_signals * self.n_components, order="F") + ** -1, + (self.signal_length, 1), ) # Compute indices @@ -490,7 +583,8 @@ def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None # Weighting coefficients weights_tiled = np.tile( - weights.reshape(1, self.n_signals * self.n_components, order="F"), (self.signal_length, 1) + weights.reshape(1, self.n_signals * self.n_components, order="F"), + (self.signal_length, 1), ) # Compute floor indices @@ -502,7 +596,9 @@ def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None fractional_indices = indices - floor_indices # Expand row indices - repm = np.tile(np.arange(self.n_components), (self.signal_length, self.n_signals)) + repm = np.tile( + np.arange(self.n_components), (self.signal_length, self.n_signals) + ) # Compute transformations kron = np.kron(residuals, np.ones((1, self.n_components))) @@ -511,11 +607,17 @@ def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None # Construct sparse matrices x2 = coo_matrix( - ((-kron * fractional_weights).flatten(), (floor_indices_1.flatten() - 1, repm.flatten())), + ( + (-kron * fractional_weights).flatten(), + (floor_indices_1.flatten() - 1, repm.flatten()), + ), shape=(self.signal_length + 1, self.n_components), ).tocsc() x3 = coo_matrix( - ((fractional_kron * weights_tiled).flatten(), (floor_indices_2.flatten() - 1, repm.flatten())), + ( + (fractional_kron * weights_tiled).flatten(), + (floor_indices_2.flatten() - 1, repm.flatten()), + ), shape=(self.signal_length + 1, self.n_components), ).tocsc() @@ -553,7 +655,9 @@ def solve_quadratic_program(self, t, m): k = q.shape[0] # Number of variables # Regularize q to ensure positive semi-definiteness - reg_factor = 1e-8 * np.linalg.norm(q, ord="fro") # Adaptive regularization, original was fixed + reg_factor = 1e-8 * np.linalg.norm( + q, ord="fro" + ) # Adaptive regularization, original was fixed q += np.eye(k) * reg_factor # Define optimization variable @@ -570,39 +674,51 @@ def solve_quadratic_program(self, t, m): prob.solve(solver=cp.OSQP, verbose=False) # Get the solution - return np.maximum(y.value, 0) # Ensure non-negative values in case of solver tolerance issues + return np.maximum( + y.value, 0 + ) # Ensure non-negative values in case of solver tolerance issues def update_components(self): """ Updates `components` using gradient-based optimization with adaptive step size. """ # Compute stretched components using the interpolation function - stretched_components, _, _ = self.compute_stretched_components() # Discard the derivatives + stretched_components, _, _ = ( + self.compute_stretched_components() + ) # Discard the derivatives # Compute reshaped_stretched_components and component_residuals - intermediate_reshaped = stretched_components.flatten(order="F").reshape( + intermediate_reshaped = stretched_components.flatten( + order="F" + ).reshape( (self.signal_length * self.n_signals, self.n_components), order="F" ) - reshaped_stretched_components = intermediate_reshaped.sum(axis=1).reshape( - (self.signal_length, self.n_signals), order="F" + reshaped_stretched_components = intermediate_reshaped.sum( + axis=1 + ).reshape((self.signal_length, self.n_signals), order="F") + component_residuals = ( + reshaped_stretched_components - self.source_matrix ) - component_residuals = reshaped_stretched_components - self.source_matrix # Compute gradient self._grad_components = self.apply_transformation_matrix( residuals=component_residuals ).toarray() # toarray equivalent of full, make non-sparse # Compute initial step size `initial_step_size` - initial_step_size = np.linalg.eigvalsh(self.weights_.T @ self.weights_).max() * np.max( - [self.stretch_.max(), 1 / self.stretch_.min()] - ) + initial_step_size = np.linalg.eigvalsh( + self.weights_.T @ self.weights_ + ).max() * np.max([self.stretch_.max(), 1 / self.stretch_.min()]) # Compute adaptive step size `step_size` if self.outiter == 0 and self.iter == 0: step_size = initial_step_size else: num = np.sum( - (self._grad_components - self._prev_grad_components) * (self.components_ - self._prev_components) + (self._grad_components - self._prev_grad_components) + * (self.components_ - self._prev_components) ) # Element-wise multiplication - denom = np.linalg.norm(self.components_ - self._prev_components, "fro") ** 2 # Frobenius norm squared + denom = ( + np.linalg.norm(self.components_ - self._prev_components, "fro") + ** 2 + ) # Frobenius norm squared step_size = num / denom if denom > 0 else initial_step_size if step_size <= 0: step_size = initial_step_size @@ -611,9 +727,15 @@ def update_components(self): self._prev_components = self.components_.copy() while True: # iterate updating components - components_step = self._prev_components - self._grad_components / step_size + components_step = ( + self._prev_components - self._grad_components / step_size + ) # Solve x^3 + p*x + q = 0 for the largest real root - self.components_ = np.square(cubic_largest_real_root(-components_step, self.eta / (2 * step_size))) + self.components_ = np.square( + cubic_largest_real_root( + -components_step, self.eta / (2 * step_size) + ) + ) # Mask values that should be set to zero mask = ( self.components_**2 * step_size / 2 @@ -623,7 +745,9 @@ def update_components(self): ) self.components_ = mask * self.components_ - objective_improvement = self._objective_history[-1] - self.get_objective_function( + objective_improvement = self._objective_history[ + -1 + ] - self.get_objective_function( residuals=self.get_residual_matrix() ) @@ -646,7 +770,10 @@ def update_weights(self): # Stretch factors for this signal across components: this_stretch = self.stretch_[:, signal] # Build stretched_comps[:, k] by interpolating component at frac. pos. index / this_stretch[comp] - stretched_comps = np.empty((self.signal_length, self.n_components), dtype=self.components_.dtype) + stretched_comps = np.empty( + (self.signal_length, self.n_components), + dtype=self.components_.dtype, + ) for comp in range(self.n_components): pos = sample_indices / this_stretch[comp] stretched_comps[:, comp] = np.interp( @@ -658,29 +785,40 @@ def update_weights(self): ) # Solve quadratic problem for a given signal and update its weight - new_weight = self.solve_quadratic_program(t=stretched_comps, m=signal) + new_weight = self.solve_quadratic_program( + t=stretched_comps, m=signal + ) self.weights_[:, signal] = new_weight def regularize_function(self, stretch=None): if stretch is None: stretch = self.stretch_ - stretched_components, d_stretch_comps, dd_stretch_comps = self.compute_stretched_components( - stretch=stretch + stretched_components, d_stretch_comps, dd_stretch_comps = ( + self.compute_stretched_components(stretch=stretch) ) intermediate = stretched_components.flatten(order="F").reshape( (self.signal_length * self.n_signals, self.n_components), order="F" ) residuals = ( - intermediate.sum(axis=1).reshape((self.signal_length, self.n_signals), order="F") - self.source_matrix + intermediate.sum(axis=1).reshape( + (self.signal_length, self.n_signals), order="F" + ) + - self.source_matrix ) fun = self.get_objective_function(residuals, stretch) tiled_res = np.tile(residuals, (1, self.n_components)) grad_flat = np.sum(d_stretch_comps * tiled_res, axis=0) - gra = grad_flat.reshape((self.n_signals, self.n_components), order="F").T - gra += self.rho * stretch @ (self._spline_smooth_operator.T @ self._spline_smooth_operator) + gra = grad_flat.reshape( + (self.n_signals, self.n_components), order="F" + ).T + gra += ( + self.rho + * stretch + @ (self._spline_smooth_operator.T @ self._spline_smooth_operator) + ) # Hessian would go here @@ -696,13 +834,17 @@ def update_stretch(self): # Define the optimization function def objective(stretch_vec): - stretch_matrix = stretch_vec.reshape(self.stretch_.shape) # Reshape back to matrix form + stretch_matrix = stretch_vec.reshape( + self.stretch_.shape + ) # Reshape back to matrix form fun, gra = self.regularize_function(stretch_matrix) gra = gra.flatten() return fun, gra # Optimization constraints: lower bound 0.1, no upper bound - bounds = [(0.1, None)] * stretch_flat_initial.size # Equivalent to 0.1 * ones(K, M) + bounds = [ + (0.1, None) + ] * stretch_flat_initial.size # Equivalent to 0.1 * ones(K, M) # Solve optimization problem (equivalent to fmincon) result = minimize( @@ -717,7 +859,9 @@ def objective(stretch_vec): self.stretch_ = result.x.reshape(self.stretch_.shape) @staticmethod - def _compute_objective_function(components, residuals, stretch, rho, eta, spline_smooth_operator): + def _compute_objective_function( + components, residuals, stretch, rho, eta, spline_smooth_operator + ): r""" Computes the objective function used in stretched non-negative matrix factorization. @@ -765,7 +909,11 @@ def _compute_objective_function(components, residuals, stretch, rho, eta, spline """ residual_term = 0.5 * np.linalg.norm(residuals, "fro") ** 2 - regularization_term = 0.5 * rho * np.linalg.norm(spline_smooth_operator @ stretch.T, "fro") ** 2 + regularization_term = ( + 0.5 + * rho + * np.linalg.norm(spline_smooth_operator @ stretch.T, "fro") ** 2 + ) sparsity_term = eta * np.sum(np.sqrt(components)) return residual_term + regularization_term + sparsity_term @@ -775,7 +923,9 @@ def cubic_largest_real_root(p, q): Solves x^3 + p*x + q = 0 element-wise for matrices, returning the largest real root. """ # Handle special case where q == 0 - y = np.where(q == 0, np.maximum(0, -p) ** 0.5, np.zeros_like(p)) # q=0 case + y = np.where( + q == 0, np.maximum(0, -p) ** 0.5, np.zeros_like(p) + ) # q=0 case # Compute discriminant delta = (q / 2) ** 2 + (p / 3) ** 3 @@ -798,7 +948,9 @@ def cubic_largest_real_root(p, q): # Take the largest real root element-wise when delta < 0 real_roots = np.stack([np.real(y1), np.real(y2), np.real(y3)], axis=0) - y = np.max(real_roots, axis=0) * (delta < 0) # Keep only real roots when delta < 0 + y = np.max(real_roots, axis=0) * ( + delta < 0 + ) # Keep only real roots when delta < 0 return y @@ -832,7 +984,10 @@ def reconstruct_matrix(components, weights, stretch): for comp in range(n_components): # loop over components reconstructed_matrix += ( np.interp( - sample_indices[:, None] / stretch[comp][None, :], # fractional positions (signal_len, n_signals) + sample_indices[:, None] + / stretch[comp][ + None, : + ], # fractional positions (signal_len, n_signals) sample_indices, # (signal_len,) components[:, comp], # component profile (signal_len,) left=components[0, comp], diff --git a/tests/test_snmf_optimizer.py b/tests/test_snmf_optimizer.py index 48c0a27f..e85b6e75 100644 --- a/tests/test_snmf_optimizer.py +++ b/tests/test_snmf_optimizer.py @@ -15,13 +15,17 @@ "init_weights.txt", ] _missing = [f for f in _required if not (DATA_DIR / f).exists()] -pytestmark = pytest.mark.skipif(_missing, reason=f"Missing test data files: {_missing}") +pytestmark = pytest.mark.skipif( + _missing, reason=f"Missing test data files: {_missing}" +) @pytest.fixture(scope="module") def inputs(): return { - "components": np.loadtxt(DATA_DIR / "init_components.txt", dtype=float), + "components": np.loadtxt( + DATA_DIR / "init_components.txt", dtype=float + ), "source": np.loadtxt(DATA_DIR / "source_matrix.txt", dtype=float), "stretch": np.loadtxt(DATA_DIR / "init_stretch.txt", dtype=float), "weights": np.loadtxt(DATA_DIR / "init_weights.txt", dtype=float), diff --git a/tests/test_version.py b/tests/test_version.py index eccc3cf3..d3293db5 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -1,5 +1,4 @@ -"""Unit tests for __version__.py -""" +"""Unit tests for __version__.py""" import diffpy.snmf