diff --git a/scripts/coercion.py b/scripts/coercion.py deleted file mode 100644 index 4220327..0000000 --- a/scripts/coercion.py +++ /dev/null @@ -1,92 +0,0 @@ -from __future__ import annotations -from typing import List, Tuple, Dict -import argparse, math, numpy as np, os, pickle -import matplotlib.pyplot as plt -import matplotlib.ticker as mtick -from multiprocessing import Pool, cpu_count -from pathlib import Path - -import squish.ordered as order -from squish import Simulation, DomainParams -from squish.common import Energy, OUTPUT_DIR - - -def axis_settings(ax, widths): - ax.invert_xaxis() - ax.grid(zorder=0) - ax.set_xticks([round(w,2) for w in widths[::-2]]) - ax.set_xticklabels(ax.get_xticks(), rotation = 90) - plt.subplots_adjust(.07, .12, .97, .9) - - -def main(): - # Loading arguments. - parser = argparse.ArgumentParser("Outputs ordered equilibria lowest eigenvalues.") - parser.add_argument('n_objects', metavar='N', type=int, - help="folder that contains simulation files, or cached data file.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") - - args = parser.parse_args() - - widths = np.round(np.linspace(3.0, 10.0, 141),2) - - values = [] - store_data = {} - - for i, width in enumerate(widths): - domain = DomainParams(args.n_objects, width, 10, 4.0) - eig_vals = [] - store_data[width] = {} - configs = order.configurations(domain) - for j, config in enumerate(configs): - if config == (1,0): - continue - points = order.sites(domain, config) - - hess = Energy("radial-t").mode(*domain, points).hessian(10e-5) - eigs = np.sort(np.linalg.eig(hess)[0])[::-1] - store_data[width][config] = eigs - - # zero_ind = np.where(np.isclose(eigs, 0))[0][0] - # if zero_ind == 0: - # eig_vals.append(eigs[2]) - # else: - # eig_vals.append(eigs[zero_ind-1]) - - hashes = int(21*j/len(widths)) - print(f'Generating at {width}, {i+1}/{len(widths)}... |{"#"*hashes}{" "*(20-hashes)}|' + \ - f' {j+1}/{len(configs)} configs done.', flush=True, end='\r') - - print(flush=True) - - with open("coercivity_eigs.pkl", "wb") as f: - pickle.dump(store_data, f, pickle.HIGHEST_PROTOCOL) - - return - fig, ax = plt.subplots(figsize=(12, 8)) - plt.subplots_adjust(.07, .12, .97, .9) - - ax.plot(widths, values) - - ax.invert_xaxis() - ax.grid(zorder=0) - ax.set_xticks([round(w,2) for w in widths[::-2]]) - ax.set_xticklabels(ax.get_xticks(), rotation = 90) - - fig.suptitle("Coercivity") - #ax.set_xlim([0, 5]) - ax.legend() - ax.set_xlabel("Width") - ax.set_ylabel("Smallest positive eigenvalue") - - fig.savefig(OUTPUT_DIR / "Coercivity.png") - print(f"Wrote to {OUTPUT_DIR / 'Coercivity.png'}") - - -if __name__ == '__main__': - os.environ["QT_LOGGING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file diff --git a/scripts/coercivity.py b/scripts/coercivity.py new file mode 100644 index 0000000..dc3ea41 --- /dev/null +++ b/scripts/coercivity.py @@ -0,0 +1,113 @@ +from __future__ import annotations +from typing import List, Tuple, Dict +import argparse, math, numpy as np, os, pickle +import matplotlib.pyplot as plt +import matplotlib.ticker as mtick +from multiprocessing import Pool, cpu_count +from pathlib import Path + +import squish.ordered as order +from squish import Simulation, DomainParams +from squish.common import Energy, OUTPUT_DIR + + +def axis_settings(ax, widths): + ax.invert_xaxis() + ax.grid(zorder=0) + ax.set_xticks([round(w, 2) for w in widths[::-2]]) + ax.set_xticklabels(ax.get_xticks(), rotation=90) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) + + +def main(): + # Loading arguments. + parser = argparse.ArgumentParser("Outputs ordered equilibria lowest eigenvalues.") + parser.add_argument( + "n_objects", + metavar="N", + type=int, + help="folder that contains simulation files, or cached data file.", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) + + args = parser.parse_args() + + widths = np.round(np.linspace(3.0, 10.0, 141), 2) + + values = [] + with open("coercivity_eigs.pkl", "rb") as f: + store_data = pickle.load(f) + + for width in widths: + eig_vals = [] + for config, eigs in store_data[width].items(): + zero_ind = np.where(np.isclose(eigs, 0, atol=1e-4))[0][0] + if zero_ind == 0: + eig_vals.append(eigs[2]) + else: + eig_vals.append(eigs[0]) + + values.append(min(eig_vals)) + + # for i, width in enumerate(widths): + # domain = DomainParams(args.n_objects, width, 10, 4.0) + # eig_vals = [] + # store_data[width] = {} + # configs = order.configurations(domain) + # for j, config in enumerate(configs): + # if config == (1,0): + # continue + # points = order.sites(domain, config) + + # hess = Energy("radial-t").mode(*domain, points).hessian(10e-5) + # eigs = np.sort(np.linalg.eig(hess)[0])[::-1] + # store_data[width][config] = eigs + + # zero_ind = np.where(np.isclose(eigs, 0))[0][0] + # if zero_ind == 0: + # eig_vals.append(eigs[2]) + # else: + # eig_vals.append(eigs[0]) + + # hashes = int(21*j/len(widths)) + # print(f'Generating at {width}, {i+1}/{len(widths)}... |{"#"*hashes}{" "*(20-hashes)}|' + \ + # f' {j+1}/{len(configs)} configs done.', flush=True, end='\r') + + # print(flush=True) + + # with open("coercivity_eigs.pkl", "wb") as f: + # pickle.dump(store_data, f, pickle.HIGHEST_PROTOCOL) + + # return + fig, ax = plt.subplots(figsize=(12, 8)) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) + + ax.plot(widths, values) + + ax.invert_xaxis() + ax.grid(zorder=0) + ax.set_xticks([round(w, 2) for w in widths[::-2]]) + ax.set_xticklabels(ax.get_xticks(), rotation=90) + + fig.suptitle("Coercivity") + # ax.set_xlim([0, 5]) + ax.set_xlabel("Width") + ax.set_ylabel("Eigenvalue") + + fig.savefig(OUTPUT_DIR / "Coercivity.png") + print(f"Wrote to {OUTPUT_DIR / 'Coercivity.png'}") + + +if __name__ == "__main__": + os.environ["QT_LOGGING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.") diff --git a/scripts/convergence.py b/scripts/convergence.py index 5b64521..19667a0 100644 --- a/scripts/convergence.py +++ b/scripts/convergence.py @@ -9,50 +9,63 @@ from squish.common import OUTPUT_DIR def main(): - parser = argparse.ArgumentParser("Graphs convergence graphs for a collection of simulations.") - parser.add_argument('sims_path', metavar='path/to/data', - help="folder that contains simulation files at various step sizes.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") + parser = argparse.ArgumentParser( + "Graphs convergence graphs for a collection of simulations." + ) + parser.add_argument( + "sims_path", + metavar="path/to/data", + help="folder that contains simulation files at various step sizes.", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) - args = parser.parse_args() - data = {} + args = parser.parse_args() + data = {} - for file in Path(args.sims_path).iterdir(): - sim, frames = Simulation.load(file / "data.squish") + for file in Path(args.sims_path).iterdir(): + sim, frames = Simulation.load(file) - step = sim.step_size - data[step] = {"times": [], "values": [], "diffs": []} - for i, frame_info in enumerate(frames): - data[step]["times"].append(step*i) - data[step]["values"].append(np.linalg.norm(frame_info["arr"])) - data[step]["diffs"].append(np.linalg.norm(all_info[-1]["arr"] - frame_info["arr"])) + step = sim.step_size + data[step] = {"times": [], "values": [], "diffs": []} + for i, frame_info in enumerate(frames): + data[step]["times"].append(step * i) + data[step]["values"].append(np.linalg.norm(frame_info["arr"])) + data[step]["diffs"].append( + np.linalg.norm(all_info[-1]["arr"] - frame_info["arr"]) + ) - fig, ax = plt.subplots(1, 2, figsize=(16, 8)) - plt.subplots_adjust(.07, .12, .97, .9) + fig, ax = plt.subplots(1, 2, figsize=(16, 8)) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) - for step, d in data.items(): - ax[0].plot(d["times"], d["values"], label=step) - ax[1].plot(d["times"], d["diffs"], label=step) + for step, d in data.items(): + ax[0].plot(d["times"], d["values"], label=step) + ax[1].plot(d["times"], d["diffs"], label=step) - fig.suptitle("Equilibrium Convergence") - ax[0].grid(zorder=0) - ax[0].legend() - ax[0].set_xlabel("Time") - ax[0].set_ylabel("L2 Norm of Sites") + fig.suptitle("Equilibrium Convergence") + ax[0].grid(zorder=0) + ax[0].legend() + ax[0].set_xlabel("Time") + ax[0].set_ylabel("L2 Norm of Sites") - ax[1].grid(zorder=0) - ax[1].legend() - ax[1].set_xlabel("Time") - ax[1].set_ylabel("L2 Norm of Difference") + ax[1].grid(zorder=0) + ax[1].legend() + ax[1].set_xlabel("Time") + ax[1].set_ylabel("L2 Norm of Difference") - fig.savefig(OUTPUT_DIR / "Equilibrium Convergence.png") - print(f"Wrote to {OUTPUT_DIR / 'Equilibrium Convergence.png'}") + fig.savefig(OUTPUT_DIR / "Equilibrium Convergence.png") + print(f"Wrote to {OUTPUT_DIR / 'Equilibrium Convergence.png'}") -if __name__ == '__main__': - os.environ["QT_LOGGING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file +if __name__ == "__main__": + os.environ["QT_LOGGING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.") diff --git a/scripts/defects.py b/scripts/defects.py index a179f31..053948e 100644 --- a/scripts/defects.py +++ b/scripts/defects.py @@ -9,80 +9,100 @@ from squish.common import OUTPUT_DIR def main(): - parser = argparse.ArgumentParser("Graphs average defects at N.") - parser.add_argument('sims_path', metavar='path/to/data', - help="folder that contains simulation files at various Ns.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") + parser = argparse.ArgumentParser("Graphs average defects at N.") + parser.add_argument( + "sims_path", + metavar="path/to/data", + help="folder that contains simulation files at various Ns.", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) - args = parser.parse_args() - data = {} + args = parser.parse_args() + data = {} + + files = list(Path(args.sims_path).iterdir()) + for i, file in enumerate(files): + sim, frames = Simulation.load(file) + avg_defects = 0 + count = 0 + + for frame in frames: + if np.var(frame["stats"]["avg_radius"]) > 1e-8: + avg_defects += np.count_nonzero(frame["stats"]["site_edge_count"] != 6) + + count += 1 + + avg_defects /= 1 if count == 0 else count + data[sim.domain.n] = avg_defects + + hashes = int(21 * i / len(files)) + print( + f'Processed N={sim.domain.n:03} |{"#"*hashes}{" "*(20-hashes)}|' + + f" {i+1}/{len(files)} simulations processed.", + flush=True, + end="\r", + ) + + print(flush=True) + + data = sorted(data.items()) + ns, defects = np.array([x[0] for x in data]), np.array([x[1] for x in data]) + + corrected = [] + for i, x in enumerate(defects): + if x == 0: + corrected.append(defects[i + 1]) + else: + corrected.append(x) + + fig, ax = plt.subplots(1, 2, figsize=(16, 8)) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) + + fig.suptitle("Defects vs. Number of Sites (N)") + + m0, b0 = np.polyfit(ns, defects, 1) + + ax[0].plot(ns, defects) + ax[0].plot(ns, m0 * ns + b0, label=f"Slope: {m0:.5f}") + ax[0].grid(zorder=0) + ax[0].legend() + ax[0].set_xlabel("N") + ax[0].set_ylabel("Average Defects") + + x, y = np.log(ns), np.log(corrected) + m, b = np.polyfit(x, y, 1) + + x2, y2 = x[40:], np.log(defects[40:]) + m2, b2 = np.polyfit(x2, y2, 1) + + ax[1].plot(x, y, linestyle="dotted", color="C0") + ax[1].plot(x, np.log(defects)) + + # ax[1].plot(x, m*x+b, label=f"All N: {m:.5f}") + ax[1].plot(x2, m2 * x2 + b2, label=f"N $\\geq$ 40: {m2:.5f}") + ax[1].grid(zorder=0) + ax[1].legend() + ax[1].set_xticks(np.arange(3.75, 6.25, 0.25)) + ax[1].set_yticks(np.arange(0, 4.5, 0.5)) + ax[1].set_xlim(3.75, 6) + ax[1].set_ylim(0, 4) + ax[1].set_xlabel("$\\ln$ N") + ax[1].set_ylabel("$\\ln$ Average Defects") + + fig.savefig(OUTPUT_DIR / "DefectsN.png") + print(f"Wrote to {OUTPUT_DIR / 'DefectsN.png'}") - for file in Path(args.sims_path).iterdir(): - sim, frames = Simulation.load(file / "data.squish") - avg_defects = 0 - count = 0 - - for frame in frames: - if np.var(frame["stats"]["avg_radius"]) > 1e-8: - avg_defects += np.count_nonzero(frame["stats"]["site_edge_count"] != 6) - count += 1 - - - avg_defects /= (1 if count == 0 else count) - data[sim.domain.n] = avg_defects - - data = sorted(data.items()) - ns, defects = np.array([x[0] for x in data]), np.array([x[1] for x in data]) - - corrected = [] - for i, x in enumerate(defects): - if x == 0: - corrected.append(defects[i+1]) - else: - corrected.append(x) - - fig, ax = plt.subplots(1, 2, figsize=(16, 8)) - plt.subplots_adjust(.07, .12, .97, .9) - - fig.suptitle("Defects at N") - - m0, b0 = np.polyfit(ns, defects, 1) - - ax[0].plot(ns, defects) - ax[0].plot(ns, m0*ns+b0, label=f"Slope: {m0:.5f}") - ax[0].grid(zorder=0) - ax[0].legend() - ax[0].set_xlabel("N") - ax[0].set_ylabel("Average Defects") - - x, y = np.log10(ns), np.log10(corrected) - m, b = np.polyfit(x, y, 1) - - x2, y2 = x[14:], np.log10(defects[14:]) - m2, b2 = np.polyfit(x2, y2, 1) - - ax[1].plot(x, y, linestyle='dotted', color='C0') - ax[1].plot(x, np.log10(defects)) - - ax[1].plot(x, m*x+b, label=f"All N: {m:.5f}") - ax[1].plot(x2, m2*x2+b2, label=f"N $\\geq$ 25: {m2:.5f}") - ax[1].grid(zorder=0) - ax[1].legend() - ax[1].set_xlabel("log10 N") - ax[1].set_ylabel("log10 Average Defects") - - - - - fig.savefig(OUTPUT_DIR / "DefectsN.png") - print(f"Wrote to {OUTPUT_DIR / 'DefectsN.png'}") - - -if __name__ == '__main__': - os.environ["QT_log10GING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file +if __name__ == "__main__": + os.environ["QT_log10GING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.") diff --git a/scripts/heatmap.py b/scripts/heatmap.py index 22962eb..647e3e3 100644 --- a/scripts/heatmap.py +++ b/scripts/heatmap.py @@ -10,110 +10,135 @@ from squish.common import OUTPUT_DIR def eq_file_process(file: Path) -> Tuple[float, List[float], List[float]]: - sim, frames = Simulation.load(file / 'data.squish') + sim, frames = Simulation.load(file) - alls = [] - for frame_info in frames: - alls.append([ - frame_info["energy"], - np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, - np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6) - ]) + alls = [] + for frame_info in frames: + alls.append( + [ + frame_info["energy"], + np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, + np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6), + ] + ) - sim, frames = Simulation.load(file / 'data.squish') - sim.frames = list(frames) - counts = sim.get_distinct() + sim, frames = Simulation.load(file) + sim.frames = list(frames) + counts = sim.get_distinct() - distincts = [] - for j, frame_info in enumerate(sim.frames): - distincts.append([ - frame_info["energy"], - np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, - np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6), - counts[j] - ]) + distincts = [] + for j, frame_info in enumerate(sim.frames): + distincts.append( + [ + frame_info["energy"], + np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, + np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6), + counts[j], + ] + ) - return sim.domain.w, alls, distincts + return sim.domain.w, alls, distincts def get_equilibria_data(filepath: Path) -> Tuple[Dict, numpy.ndarray, DomainParams]: - data = {"all": {}, "distinct": {}} - files = list(Path(filepath).iterdir()) - sim, frames = Simulation.load(files[0] / 'data.squish') + data = {"all": {}, "distinct": {}} + files = list(Path(filepath).iterdir()) + sim, frames = Simulation.load(files[0]) - with Pool(cpu_count()) as pool: - for i, res in enumerate(pool.imap_unordered(eq_file_process, files)): - data["all"][res[0]] = res[1] - data["distinct"][res[0]] = res[2] + with Pool(cpu_count()) as pool: + for i, res in enumerate(pool.imap_unordered(eq_file_process, files)): + data["all"][res[0]] = res[1] + data["distinct"][res[0]] = res[2] - hashes = int(21*i/len(files)) - print(f'Loading simulations for N={sim.domain.n}... |{"#"*hashes}{" "*(20-hashes)}|' + \ - f' {i+1}/{len(files)} simulations loaded.', flush=True, end='\r') - print(flush=True) + hashes = int(21 * i / len(files)) + print( + f'Loading simulations for N={sim.domain.n}... |{"#"*hashes}{" "*(20-hashes)}|' + + f" {i+1}/{len(files)} simulations loaded.", + flush=True, + end="\r", + ) + print(flush=True) - widths = np.asarray(sorted(data["all"])) - domain = DomainParams(sim.domain.n, widths[-1], sim.domain.h, sim.domain.r) - return data, widths, domain + widths = np.asarray(sorted(data["all"])) + domain = DomainParams(sim.domain.n, widths[-1], sim.domain.h, sim.domain.r) + return data, widths, domain def main(): - # Loading arguments. - parser = argparse.ArgumentParser("Outputs width search data into diagrams") - parser.add_argument('sims_path', metavar='path/to/data', - help="folder that contains simulation files of all searches for all N.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") + # Loading arguments. + parser = argparse.ArgumentParser("Outputs width search data into diagrams") + parser.add_argument( + "sims_path", + metavar="path/to/data", + help="folder that contains simulation files of all searches for all N.", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) - args = parser.parse_args() + args = parser.parse_args() - # with open("testing.pkl", "rb") as f: - # disorder_dict = pickle.load(f) - # widths = np.linspace(3.0, 10.0, 141) - # min_n, max_n = 60, 80 + # with open("testing.pkl", "rb") as f: + # disorder_dict = pickle.load(f) + # widths = np.linspace(3.0, 10.0, 141) + # min_n, max_n = 60, 80 - disorder_dict = {} - for file in Path(args.sims_path).iterdir(): - sim_data, widths, domain = get_equilibria_data(file) + disorder_dict = {} + for file in Path(args.sims_path).iterdir(): + sim_data, widths, domain = get_equilibria_data(file) - disorder_count = [] - for width in widths: - equal_shape = list([c[1] for c in sim_data["all"][width]]) - disorder_count.append(100*equal_shape.count(False)/len(sim_data["all"][width])) + disorder_count = [] + for width in widths: + equal_shape = list([c[1] for c in sim_data["all"][width]]) + disorder_count.append( + 100 * equal_shape.count(False) / len(sim_data["all"][width]) + ) - disorder_dict[domain.n] = disorder_count + disorder_dict[domain.n] = disorder_count - min_n, max_n = min(disorder_dict), max(disorder_dict) - filepath = f"Disorder Heatmap N{min_n}-{max_n}" + min_n, max_n = min(disorder_dict), max(disorder_dict) + filepath = f"Disorder Heatmap N{min_n}-{max_n}" - # with open("testing.pkl", "wb") as f: - # pickle.dump(disorder_dict, f, pickle.HIGHEST_PROTOCOL) + # with open("testing.pkl", "wb") as f: + # pickle.dump(disorder_dict, f, pickle.HIGHEST_PROTOCOL) - disorder_arr = np.zeros((max_n-min_n+1, len(widths))) - for key, value in disorder_dict.items(): - disorder_arr[key-min_n] = np.asarray(value) + disorder_arr = np.zeros((max_n - min_n + 1, len(widths))) + for key, value in disorder_dict.items(): + disorder_arr[key - min_n] = np.asarray(value) - fig, ax = plt.subplots(figsize=(12, 8)) + fig, ax = plt.subplots(figsize=(12, 8)) - extent = [min(widths), max(widths), min_n, max_n+1] - ax.imshow(disorder_arr, cmap='plasma', interpolation='nearest', aspect='auto', extent=extent) + extent = [min(widths), max(widths), min_n, max_n + 1] + ax.imshow( + disorder_arr, + cmap="plasma", + interpolation="nearest", + aspect="auto", + extent=extent, + ) - ax.invert_xaxis() - ax.set_xticks([round(w,2) for w in widths[::-2]]) - ax.set_xticklabels(ax.get_xticks(), rotation = 90) - ax.set_yticks(list(range(min_n, max_n+1))) - plt.subplots_adjust(.07, .12, .97, .9) + ax.invert_xaxis() + ax.set_xticks([round(w, 2) for w in widths[::-2]]) + ax.set_xticklabels(ax.get_xticks(), rotation=90) + ax.set_yticks(list(range(min_n, max_n + 1))) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) - ax.title.set_text(filepath) - ax.set_xlabel("Width") - ax.set_ylabel("Number of Sites") - fig.savefig(OUTPUT_DIR / filepath) + ax.title.set_text(filepath) + ax.set_xlabel("Width") + ax.set_ylabel("Number of Sites") + fig.savefig(OUTPUT_DIR / filepath) - print(f"Wrote to {OUTPUT_DIR / filepath}.") + print(f"Wrote to {OUTPUT_DIR / filepath}.") -if __name__ == '__main__': - os.environ["QT_LOGGING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file +if __name__ == "__main__": + os.environ["QT_LOGGING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.") diff --git a/scripts/perturbations.py b/scripts/perturbations.py index ec40ec6..68eb128 100644 --- a/scripts/perturbations.py +++ b/scripts/perturbations.py @@ -9,53 +9,70 @@ from squish.common import OUTPUT_DIR def main(): - parser = argparse.ArgumentParser("Graphs perturbation graphs for a collection of simulations.") - parser.add_argument('sims_path', metavar='path/to/data', - help="folder that contains simulations of perturbations from an equilibrium.") - parser.add_argument('end_path', metavar='path/to/equilbrium', - help="NumPy binary (.npy) file that contains the equilibrium to compare to.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") + parser = argparse.ArgumentParser( + "Graphs perturbation graphs for a collection of simulations." + ) + parser.add_argument( + "sims_path", + metavar="path/to/data", + help="folder that contains simulations of perturbations from an equilibrium.", + ) + parser.add_argument( + "end_path", + metavar="path/to/equilbrium", + help="NumPy binary (.npy) file that contains the equilibrium to compare to.", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) - args = parser.parse_args() + args = parser.parse_args() - end = np.load(args.end_path) + end = np.load(args.end_path) - data = {} - for file in Path(args.sims_path).iterdir(): - k = float(file.name.split('k')[-1]) - delta = 10**k + data = {} + for file in Path(args.sims_path).iterdir(): + k = float(file.name.split("k")[-1]) + delta = 10 ** k - sim, frames = Simulation.load(file / 'data.squish') - data[delta] = {"norm": [], "time": [], "k": k} + sim, frames = Simulation.load(file) + data[delta] = {"norm": [], "time": [], "k": k} - for i, frame in enumerate(frames): - adjusted = frame["arr"] + (end[0] - frame["arr"][0]) + for i, frame in enumerate(frames): + adjusted = frame["arr"] + (end[0] - frame["arr"][0]) - data[delta]["norm"].append(np.linalg.norm(adjusted - end)) - data[delta]["time"].append(sim.step_size * i) + data[delta]["norm"].append(np.linalg.norm(adjusted - end)) + data[delta]["time"].append(sim.step_size * i) - fig, ax = plt.subplots(figsize=(12, 8)) - plt.subplots_adjust(.07, .12, .97, .9) + fig, ax = plt.subplots(figsize=(12, 8)) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) - for delta in sorted(data): - ax.plot(np.log10(np.array(data[delta]["time"])+1), np.log10(data[delta]["norm"]), - label=f"k = {data[delta]['k']}") + for delta in sorted(data): + ax.plot( + np.log10(np.array(data[delta]["time"]) + 1), + np.log10(data[delta]["norm"]), + label=f"k = {data[delta]['k']}", + ) - fig.suptitle("Equilibrium Perturbations") - ax.grid(zorder=0) - #ax.set_xlim([0, 5]) - ax.legend() - ax.set_xlabel("Log Time") - ax.set_ylabel("Log L2 Norm of Difference") + fig.suptitle("Equilibrium Perturbations") + ax.grid(zorder=0) + # ax.set_xlim([0, 5]) + ax.legend() + ax.set_xlabel("$\\log_{10}$ Time") + ax.set_ylabel("$\\log_{10}$ L2 Norm of Difference") - fig.savefig(OUTPUT_DIR / "Equilibrium Perturbations.png") - print(f"Wrote to {OUTPUT_DIR / 'Equilibrium Perturbations.png'}") + fig.savefig(OUTPUT_DIR / "Equilibrium Perturbations.png") + print(f"Wrote to {OUTPUT_DIR / 'Equilibrium Perturbations.png'}") -if __name__ == '__main__': - os.environ["QT_LOGGING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file +if __name__ == "__main__": + os.environ["QT_LOGGING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.") diff --git a/scripts/width_diagrams.py b/scripts/width_diagrams.py index 61398bd..40a7f09 100644 --- a/scripts/width_diagrams.py +++ b/scripts/width_diagrams.py @@ -12,223 +12,265 @@ from squish.common import OUTPUT_DIR def order_process(domain: DomainParams) -> Tuple[float, float, float]: - energies = [] - configs = order.configurations(domain) - for config in configs: - energies.append(2*domain.w*domain.h + \ - 2*math.pi*domain.n*(domain.r**2 - 2*domain.r*order.avg_radius(domain, config))) + energies = [] + configs = order.configurations(domain) + for config in configs: + energies.append( + 2 * domain.w * domain.h + + 2 + * math.pi + * domain.n + * (domain.r ** 2 - 2 * domain.r * order.avg_radius(domain, config)) + ) - return domain.w, min(energies), max(energies) + return domain.w, min(energies), max(energies) def get_ordered_energies(orig_domain: DomainParams, widths: np.ndarray) -> Dict: - data = {} - domains = [DomainParams(orig_domain.n, w, orig_domain.h, orig_domain.r) for w in widths] + data = {} + domains = [ + DomainParams(orig_domain.n, w, orig_domain.h, orig_domain.r) for w in widths + ] - with Pool(cpu_count()) as pool: - mins, maxes = {}, {} - for i, res in enumerate(pool.imap_unordered(order_process, domains)): - mins[res[0]] = res[1] - maxes[res[0]] = res[2] + with Pool(cpu_count()) as pool: + mins, maxes = {}, {} + for i, res in enumerate(pool.imap_unordered(order_process, domains)): + mins[res[0]] = res[1] + maxes[res[0]] = res[2] - hashes = int(21*i/len(widths)) - print(f'Generating at width {res[0]:.02f}... |{"#"*hashes}{" "*(20-hashes)}|' + \ - f' {i+1}/{len(widths)} completed.', flush=True, end='\r') + hashes = int(21 * i / len(widths)) + print( + f'Generating at width {res[0]:.02f}... |{"#"*hashes}{" "*(20-hashes)}|' + + f" {i+1}/{len(widths)} completed.", + flush=True, + end="\r", + ) - print(flush=True) + print(flush=True) - data["min"] = list([x[1] for x in sorted(mins.items())]) - data["max"] = list([x[1] for x in sorted(maxes.items())]) + data["min"] = list([x[1] for x in sorted(mins.items())]) + data["max"] = list([x[1] for x in sorted(maxes.items())]) - return data + return data def eq_file_process(file: Path) -> Tuple[float, List[float], List[float]]: - sim, frames = Simulation.load(file / 'data.squish') + sim, frames = Simulation.load(file) - alls = [] - for frame_info in frames: - alls.append([ - frame_info["energy"], - np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, - np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6) - ]) + alls = [] + for frame_info in frames: + alls.append( + [ + frame_info["energy"], + np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, + np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6), + ] + ) - sim, frames = Simulation.load(file / 'data.squish') - sim.frames = list(frames) - counts = sim.get_distinct() + sim, frames = Simulation.load(file) + sim.frames = list(frames) + counts = sim.get_distinct() - distincts = [] - for j, frame_info in enumerate(sim.frames): - distincts.append([ - frame_info["energy"], - np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, - np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6), - counts[j] - ]) + distincts = [] + for j, frame_info in enumerate(sim.frames): + distincts.append( + [ + frame_info["energy"], + np.var(frame_info["stats"]["avg_radius"]) <= 1e-8, + np.count_nonzero(frame_info["stats"]["site_edge_count"] != 6), + counts[j], + ] + ) - return sim.domain.w, alls, distincts + return sim.domain.w, alls, distincts def get_equilibria_data(filepath: Path) -> Tuple[Dict, numpy.ndarray, DomainParams]: - data = {"all": {}, "distinct": {}} - files = list(Path(filepath).iterdir()) + data = {"all": {}, "distinct": {}} + files = list(Path(filepath).iterdir()) - with Pool(cpu_count()) as pool: - for i, res in enumerate(pool.imap_unordered(eq_file_process, files)): - data["all"][res[0]] = res[1] - data["distinct"][res[0]] = res[2] + with Pool(cpu_count()) as pool: + for i, res in enumerate(pool.imap_unordered(eq_file_process, files)): + data["all"][res[0]] = res[1] + data["distinct"][res[0]] = res[2] - hashes = int(21*i/len(files)) - print(f'Loading simulations... |{"#"*hashes}{" "*(20-hashes)}|' + \ - f' {i+1}/{len(files)} simulations loaded.', flush=True, end='\r') - print(flush=True) + hashes = int(21 * i / len(files)) + print( + f'Loading simulations... |{"#"*hashes}{" "*(20-hashes)}|' + + f" {i+1}/{len(files)} simulations loaded.", + flush=True, + end="\r", + ) + print(flush=True) - sim, frames = Simulation.load(files[0] / 'data.squish') - widths = np.asarray(sorted(data["all"])) - domain = DomainParams(sim.domain.n, widths[-1], sim.domain.h, sim.domain.r) - return data, widths, domain + sim, frames = Simulation.load(files[0]) + widths = np.asarray(sorted(data["all"])) + domain = DomainParams(sim.domain.n, widths[-1], sim.domain.h, sim.domain.r) + return data, widths, domain def axis_settings(ax, widths): - ax.invert_xaxis() - ax.grid(zorder=0) - ax.set_xticks([round(w,2) for w in widths[::-2]]) - ax.set_xticklabels(ax.get_xticks(), rotation = 90) - plt.subplots_adjust(.07, .12, .97, .9) + ax.invert_xaxis() + ax.grid(zorder=0) + ax.set_xticks([round(w, 2) for w in widths[::-2]]) + ax.set_xticklabels(ax.get_xticks(), rotation=90) + plt.subplots_adjust(0.07, 0.12, 0.97, 0.9) def main(): - # Loading arguments. - parser = argparse.ArgumentParser("Outputs width search data into diagrams") - parser.add_argument('sims_path', metavar='path/to/data', - help="folder that contains simulation files, or cached data file.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") + # Loading arguments. + parser = argparse.ArgumentParser("Outputs width search data into diagrams") + parser.add_argument( + "sims_path", + metavar="path/to/data", + help="folder that contains simulation files, or cached data file.", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) - args = parser.parse_args() + args = parser.parse_args() - data, widths, domain = get_equilibria_data(Path(args.sims_path)) - order_data = get_ordered_energies(domain, widths) + data, widths, domain = get_equilibria_data(Path(args.sims_path)) + order_data = get_ordered_energies(domain, widths) - fig_folder = OUTPUT_DIR / Path(f"ShrinkEnergyComparison - N{domain.n}") - fig_folder.mkdir(exist_ok=True) + fig_folder = OUTPUT_DIR / Path(f"ShrinkEnergyComparison - N{domain.n}") + fig_folder.mkdir(exist_ok=True) - # Torus minimum energies used as reference. + # Torus minimum energies used as reference. - # Probability of disorder diagram. - fig, ax = plt.subplots(figsize=(16, 8)) - all_disorder_count = [] - for width in widths: - equal_shape = list([c[1] for c in data["all"][width]]) - all_disorder_count.append(100*equal_shape.count(False)/len(data["all"][width])) + # Probability of disorder diagram. + fig, ax = plt.subplots(figsize=(16, 8)) + all_disorder_count = [] + for width in widths: + equal_shape = list([c[1] for c in data["all"][width]]) + all_disorder_count.append( + 100 * equal_shape.count(False) / len(data["all"][width]) + ) - ax.plot(widths, all_disorder_count) - axis_settings(ax, widths) - ax.yaxis.set_major_formatter(mtick.PercentFormatter()) - ax.title.set_text(f"Probability of Disorder - N{domain.n}") - ax.set_xlabel("Width") - ax.set_ylabel("Disordered Equilibria") - boa_y_min = round(min(all_disorder_count)/20)*20 - 5 - ax.set_yticks(np.arange(boa_y_min, 100.01, 2.5)) - fig.savefig(fig_folder / "Probability of Disorder.png") + ax.plot(widths, all_disorder_count) + axis_settings(ax, widths) + + with open("N83-prob.txt", "w") as f: + f.write(", ".join([str(x) for x in widths]) + "\n") + f.write(", ".join([str(x) for x in all_disorder_count])) + + ax.yaxis.set_major_formatter(mtick.PercentFormatter()) + ax.title.set_text(f"Probability of Disorder - N{domain.n}") + ax.set_xlabel("Width") + ax.set_ylabel("Disordered Equilibria") + boa_y_min = round(min(all_disorder_count) / 20) * 20 - 5 + ax.set_yticks(np.arange(boa_y_min, 100.01, 2.5)) + fig.savefig(fig_folder / "Probability of Disorder.png") + + # Density of States diagram. + fig, ax = plt.subplots(figsize=(16, 8)) + distinct_ordered, distinct_unordered = [], [] + for width in widths: + equal_shape = list([c[1] for c in data["distinct"][width]]) + distinct_ordered.append(equal_shape.count(True)) + distinct_unordered.append(equal_shape.count(False)) + + ax2 = ax.twinx() + ax.plot(widths, distinct_unordered, label="Unordered Equilibria", color="C0") + ax2.plot(widths, distinct_ordered, label="Ordered Equilibria", color="C1") + axis_settings(ax, widths) + ax.title.set_text(f"Density of States - N{domain.n}") + ax.set_xlabel("Width") + ax.set_ylabel("Number of States (Disordered)", color="C0") + ax2.set_ylabel("Number of States (Ordered)", color="C1") + + dos_y_max_unorder = 1.05 * max(distinct_unordered) + dos_y_max_order = 1.05 * max(distinct_ordered) + ax.set_yticks(np.linspace(0, dos_y_max_unorder, 20).astype(int)) + # ax.set_yticks(np.arange(0, dos_y_max_unorder, round(dos_y_max_unorder/200, 1)*10)) + ax2.set_yticks(np.arange(0, dos_y_max_order)) + + fig.savefig(fig_folder / "Density Of States.png") + + # Defect density diagram + fig, ax = plt.subplots(figsize=(16, 8)) + + defects = [] + for width in widths: + defects.append( + sum([c[2] for c in data["all"][width] if not c[1]]) + / len(data["all"][width]) + ) + + ax.plot(widths, defects) + axis_settings(ax, widths) + ax.title.set_text(f"Average Defects - N{domain.n}") + ax.set_xlabel("Width") + ax.set_ylabel("Defects") + ax.set_yticks(np.arange(0, 1 + max(defects), 0.5)) + fig.savefig(fig_folder / "Defects.png") + + # Bifurcation diagram + fig, ax = plt.subplots(figsize=(16, 8)) + + ordered_energies, unordered_energies = [], [] + for width in widths: + ordered_energies.append([c[0] for c in data["distinct"][width] if c[1]]) + unordered_energies.append([c[0] for c in data["distinct"][width] if not c[1]]) + + for i in range(len(order_data["min"])): + ordered_energies[i].append(order_data["min"][i]) + ordered_energies[i].append(order_data["max"][i]) + + null_unorder = [] + for i, energies in enumerate(unordered_energies): + if len(energies) == 0: + null_unorder.append(i) + energies.append(order_data["min"][i]) + + min_order = np.asarray([min(width) for width in ordered_energies]) + max_order = np.asarray([max(width) for width in ordered_energies]) + min_unorder = np.asarray([min(width) for width in unordered_energies]) + max_unorder = np.asarray([max(width) for width in unordered_energies]) + + offset = np.array(order_data["min"]) + # offset = np.array(min_order) + + min_unorder_off = min_unorder - offset + max_unorder_off = max_unorder - offset + ax.plot(widths, min_order - offset, color="C1") + # ax.plot(widths, max_order - offset, color='C1', linestyle='dotted') + ax.plot(widths, min_unorder_off, color="C0") + ax.plot(widths, max_unorder_off, color="C0", linestyle="dotted") + axis_settings(ax, widths) + + with open("N83-od.txt", "w") as f: + f.write(", ".join([str(x) for x in widths]) + "\n") + f.write(", ".join([str(x) for x in min_unorder_off]) + "\n") + f.write(", ".join([str(x) for x in max_unorder_off])) + + for i in null_unorder: + ax.scatter(widths[i], 0, marker="X", color="blue", s=50, zorder=4) + # ax.scatter(widths[i], max_unorder[i] - offset[i], + # marker='X', edgecolors="blue", facecolors='none', s=100, zorder=4) + + ax.title.set_text(f"Reduced Energy vs. Width - N{domain.n}") + ax.set_xlabel("Width") + ax.set_ylabel("Reduced Energy") + bif_y_max = np.max(np.abs(np.concatenate((min_unorder_off, max_unorder_off)))) + bif_top = np.arange( + 0, bif_y_max, round(bif_y_max / 20, -math.floor(math.log10(bif_y_max / 20))) + ) + ax.set_yticks(np.concatenate((-bif_top[1:][::-1], bif_top))) + fig.savefig(fig_folder / "Bifurcation.png") + + print(f"Wrote to {fig_folder}.") - # Density of States diagram. - fig, ax = plt.subplots(figsize=(16, 8)) - distinct_ordered, distinct_unordered = [], [] - for width in widths: - equal_shape = list([c[1] for c in data["distinct"][width]]) - distinct_ordered.append(equal_shape.count(True)) - distinct_unordered.append(equal_shape.count(False)) - - ax2 = ax.twinx() - ax.plot(widths, distinct_unordered, label="Unordered Equilibria", color='C0') - ax2.plot(widths, distinct_ordered, label="Ordered Equilibria", color='C1') - axis_settings(ax, widths) - ax.title.set_text(f"Density of States - N{domain.n}") - ax.set_xlabel("Width") - ax.set_ylabel("Number of States (Disordered)", color='C0') - ax2.set_ylabel("Number of States (Ordered)", color='C1') - - dos_y_max_unorder = 1.05*max(distinct_unordered) - dos_y_max_order = 1.05*max(distinct_ordered) - ax.set_yticks(np.linspace(0, dos_y_max_unorder, 20).astype(int)) - #ax.set_yticks(np.arange(0, dos_y_max_unorder, round(dos_y_max_unorder/200, 1)*10)) - ax2.set_yticks(np.arange(0, dos_y_max_order)) - - - fig.savefig(fig_folder / "Density Of States.png") - - # Defect density diagram - fig, ax = plt.subplots(figsize=(16, 8)) - - defects = [] - for width in widths: - defects.append(sum([c[2] for c in data["all"][width] if not c[1]])/len(data["all"][width])) - - ax.plot(widths, defects) - axis_settings(ax, widths) - ax.title.set_text(f"Average Defects - N{domain.n}") - ax.set_xlabel("Width") - ax.set_ylabel("Defects") - ax.set_yticks(np.arange(0, 1+max(defects), 0.5)) - fig.savefig(fig_folder / "Defects.png") - - - # Bifurcation diagram - fig, ax = plt.subplots(figsize=(16, 8)) - - ordered_energies, unordered_energies = [], [] - for width in widths: - ordered_energies.append([c[0] for c in data["distinct"][width] if c[1]]) - unordered_energies.append([c[0] for c in data["distinct"][width] if not c[1]]) - - for i in range(len(order_data["min"])): - ordered_energies[i].append(order_data["min"][i]) - ordered_energies[i].append(order_data["max"][i]) - - null_unorder = [] - for i, energies in enumerate(unordered_energies): - if len(energies) == 0: - null_unorder.append(i) - energies.append(order_data["min"][i]) - - min_order = np.asarray([min(width) for width in ordered_energies]) - max_order = np.asarray([max(width) for width in ordered_energies]) - min_unorder = np.asarray([min(width) for width in unordered_energies]) - max_unorder = np.asarray([max(width) for width in unordered_energies]) - - offset = np.array(order_data["min"]) - #offset = np.array(min_order) - - min_unorder_off = min_unorder - offset - max_unorder_off = max_unorder - offset - ax.plot(widths, min_order - offset, color='C1') - #ax.plot(widths, max_order - offset, color='C1', linestyle='dotted') - ax.plot(widths, min_unorder_off, color='C0') - ax.plot(widths, max_unorder_off, color='C0', linestyle='dotted') - axis_settings(ax, widths) - - for i in null_unorder: - ax.scatter(widths[i], 0, - marker='X', color="blue", s=50, zorder=4) - # ax.scatter(widths[i], max_unorder[i] - offset[i], - # marker='X', edgecolors="blue", facecolors='none', s=100, zorder=4) - - ax.title.set_text(f"Reduced Energy vs. Width - N{domain.n}") - ax.set_xlabel("Width") - ax.set_ylabel("Reduced Energy") - bif_y_max = np.max(np.abs(np.concatenate((min_unorder_off, max_unorder_off)))) - bif_top = np.arange(0, bif_y_max, round(bif_y_max/20, -math.floor(math.log10(bif_y_max/20)))) - ax.set_yticks(np.concatenate((-bif_top[1:][::-1], bif_top))) - fig.savefig(fig_folder / "Bifurcation.png") - - print(f"Wrote to {fig_folder}.") - -if __name__ == '__main__': - os.environ["QT_LOGGING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file +if __name__ == "__main__": + os.environ["QT_LOGGING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.") diff --git a/squish/common.py b/squish/common.py index 28e3def..e44877b 100644 --- a/squish/common.py +++ b/squish/common.py @@ -8,92 +8,91 @@ from ._squish import VoronoiContainer, AreaEnergy, RadialALEnergy, RadialTEnergy OUTPUT_DIR = Path("squish_output") STR_TO_ENERGY = { - "area": AreaEnergy, - "radial-al": RadialALEnergy, - "radial-t" : RadialTEnergy + "area": AreaEnergy, + "radial-al": RadialALEnergy, + "radial-t": RadialTEnergy, } -def generate_filepath(sim: SimulationMode, fol: Union[str, Path], prec: int = 2) -> Path: - energy = sim.energy.title_str - width, height = round(sim.domain.w, prec), round(sim.domain.h, prec) +def generate_filepath( + sim: SimulationMode, fol: Union[str, Path], prec: int = 2 +) -> Path: + energy = sim.energy.title_str + width, height = round(sim.domain.w, prec), round(sim.domain.h, prec) - base_path = f"{fol}/{energy}{sim.title_str} - " + \ - f"N{sim.domain.n} - {width:.{prec}f}x{height:.{prec}f}" + base_path = ( + f"{fol}/{energy}{sim.title_str} - " + + f"N{sim.domain.n} - {width:.{prec}f}x{height:.{prec}f}" + ) - i = 1 - real_path = Path(base_path) - while real_path.is_dir(): - real_path = Path(f"{base_path}({i})") - i += 1 + i = 1 + real_path = Path(base_path) + while real_path.is_dir(): + real_path = Path(f"{base_path}({i})") + i += 1 - return real_path + return real_path class DomainParams: - """Container for basic domain parameters + """Container for basic domain parameters - Attributes: - n (int): Number of sites in simulation. - w (float): width of the bounding domain. - h (float): height of the bounding domain. - r (float): natural radius of the objects. - dim (np.ndarray): dimensions, w x h. + Attributes: + n (int): Number of sites in simulation. + w (float): width of the bounding domain. + h (float): height of the bounding domain. + r (float): natural radius of the objects. + dim (np.ndarray): dimensions, w x h. - """ + """ - __slots__ = ['n', 'w', 'h', 'r', 'dim'] + __slots__ = ["n", "w", "h", "r", "dim"] + def __init__(self, n: int, w: float, h: float, r: float) -> None: + if n < 2: + raise ValueError("Number of objects should be larger than 2!") - def __init__(self, n: int, w: float, h: float, r: float) -> None: - if n < 2: - raise ValueError("Number of objects should be larger than 2!") + if w <= 0: + raise ValueError("Width needs to be nonzero and positive!") - if w <= 0: - raise ValueError("Width needs to be nonzero and positive!") + if h <= 0: + raise ValueError("Height needs to be nonzero and positive!") - if h <= 0: - raise ValueError("Height needs to be nonzero and positive!") + self.n, self.w, self.h, self.r = int(n), float(w), float(h), float(r) + self.dim = np.array([self.w, self.h]) - self.n, self.w, self.h, self.r = int(n), float(w), float(h), float(r) - self.dim = np.array([self.w, self.h]) + def __iter__(self) -> Iterator: + return iter((self.n, self.w, self.h, self.r)) - - def __iter__(self) -> Iterator: - return iter((self.n, self.w, self.h, self.r)) - - def __str__(self) -> str: - return f"N = {self.n}, R = {self.r}, {self.w} X {self.h}" + def __str__(self) -> str: + return f"N = {self.n}, R = {self.r}, {self.w} X {self.h}" class Energy: - """Generic container for energies. + """Generic container for energies. - Attributes: - mode (VoronoiContainer): VoronoiContainer for the chosen energy. + Attributes: + mode (VoronoiContainer): VoronoiContainer for the chosen energy. - """ + """ - __slots__ = ['mode'] + __slots__ = ["mode"] + def __init__(self, mode: Union[str, VoronoiContainer]) -> None: + if isinstance(mode, str): + try: + self.mode = STR_TO_ENERGY[mode.lower()] + except KeyError: + raise ValueError(f"'{mode}' is not a valid energy!") + else: + if mode is not VoronoiContainer and issubclass(mode, VoronoiContainer): + raise ValueError("Provided class is not a valid energy!") + self.mode = mode - def __init__(self, mode: Union[str, VoronoiContainer]) -> None: - if isinstance(mode, str): - try: - self.mode = STR_TO_ENERGY[mode.lower()] - except KeyError: - raise ValueError(f"\'{mode}\' is not a valid energy!") - else: - if mode is not VoronoiContainer and issubclass(mode, VoronoiContainer): - raise ValueError("Provided class is not a valid energy!") - self.mode = mode + @property + def attr_str(self) -> str: + return self.mode.attr_str - - @property - def attr_str(self) -> str: - return self.mode.attr_str - - - @property - def title_str(self) -> str: - return self.mode.title_str \ No newline at end of file + @property + def title_str(self) -> str: + return self.mode.title_str diff --git a/squish/diagram.py b/squish/diagram.py index 67925fe..93515e2 100644 --- a/squish/diagram.py +++ b/squish/diagram.py @@ -8,356 +8,383 @@ from multiprocessing import Pool, cpu_count from .common import DomainParams, OUTPUT_DIR -SYMM = np.array([[0,0], [1,0], [1,1], [0,1], [-1,1], [-1,0], [-1,-1], [0,-1], [1,-1]]) +SYMM = np.array( + [[0, 0], [1, 0], [1, 1], [0, 1], [-1, 1], [-1, 0], [-1, -1], [0, -1], [1, -1]] +) + class SimData: - """Stores diagram information for a simulation. + """Stores diagram information for a simulation. - Attributes: - path (Path): path to output directory. - domains (List[DomainParams]): domain parameters from simulation frames, - energies (List[float]): energy from simulation frames. - voronois (List[Voronoi]): voronoi information from scipy from simulation frames. - stats (List[numpy.ndarray]): statistics from simulation frames. + Attributes: + path (Path): path to output directory. + domains (List[DomainParams]): domain parameters from simulation frames, + energies (List[float]): energy from simulation frames. + voronois (List[Voronoi]): voronoi information from scipy from simulation frames. + stats (List[numpy.ndarray]): statistics from simulation frames. - """ + """ - __slots__ = ['path', 'domains', 'energies', 'voronois', 'stats'] + __slots__ = ["path", "domains", "energies", "voronois", "stats"] + def __init__(self, sim: Simulation) -> None: + if sim is not None: + self.path = sim.path + self.domains = list([DomainParams(s.n, s.w, s.h, s.r) for s in sim]) + self.energies = list([s.energy for s in sim]) + self.voronois = list([s.vor_data for s in sim]) + self.stats = list([s.stats for s in sim]) - def __init__(self, sim: Simulation) -> None: - if sim is not None: - self.path = sim.path - self.domains = list([DomainParams(s.n, s.w, s.h, s.r) for s in sim]) - self.energies = list([s.energy for s in sim]) - self.voronois = list([s.vor_data for s in sim]) - self.stats = list([s.stats for s in sim]) + def __len__(self) -> int: + return len(self.domains) + def slice(self, indices: List[int]) -> SimData: + new_data = SimData(None) + new_data.path = self.path - def __len__(self) -> int: - return len(self.domains) + new_data.domains = list([self.domains[i] for i in indices]) + new_data.energies = list([self.energies[i] for i in indices]) + new_data.voronois = list([self.voronois[i] for i in indices]) + new_data.stats = list([self.stats[i] for i in indices]) + return new_data - def slice(self, indices: List[int]) -> SimData: - new_data = SimData(None) - new_data.path = self.path + def hist( + self, + stat: str, + i: int, + bins: int = 10, + bounds: Optional[Tuple[float, float]] = None, + cumul: bool = False, + avg: bool = False, + ) -> Tuple[numpy.ndarray, numpy.ndarray]: + """Generates a histogram from the selected data. - new_data.domains = list([self.domains[i] for i in indices]) - new_data.energies = list([self.energies[i] for i in indices]) - new_data.voronois = list([self.voronois[i] for i in indices]) - new_data.stats = list([self.stats[i] for i in indices]) + Arguments: + stat (str): name of data to obtain + i (int): which frame to select from + bins (int): number of bins for the histogram. + bounds (Optional[Tuple[float, float]]): upper and lower bounds of the + histogram. This will automatically take the minimum and maximum value + if not set. + cumul (bool): aggregates all data up to frame i if True. + avg (bool): will average the data based on number of frames if True. - return new_data + Returns: + Tuple[numpy.ndarray, numpy.ndarray]: the histogram and its bins. + """ - def hist(self, stat: str, i: int, bins: int = 10, bounds: Optional[Tuple[float, float]] = None, - cumul: bool = False, avg: bool = False) -> Tuple[numpy.ndarray, numpy.ndarray]: - """Generates a histogram from the selected data. + if cumul: + values = np.concatenate([f[stat] for f in self.stats[: (i + 1)]]) + else: + values = self.stats[i][stat] - Arguments: - stat (str): name of data to obtain - i (int): which frame to select from - bins (int): number of bins for the histogram. - bounds (Optional[Tuple[float, float]]): upper and lower bounds of the histogram. - this will automatically take the minimum and maximum value of not set. - cumul (bool): aggregates all data up to frame i if True. - avg (bool): will average the data based on number of frames if True. + if np.var(values) <= 1e-8: + hist = np.zeros((bins,)) + val = np.average(values) + hist[(bins + 1) // 2 - 1] = len(values) + bin_list = np.linspace(0, val, bins // 2 + 1, endpoint=True) + bin_list = np.concatenate((bin_list, (bin_list + val)[1:])) + return hist, bin_list[not (bins % 2) :] - Returns: - Tuple[numpy.ndarray, numpy.ndarray]: the histogram and its bins. + hist, bin_edges = np.histogram(values, bins=bins, range=bounds) + bin_list = np.array( + [(bin_edges[i] + bin_edges[i + 1]) / 2 for i in range(len(bin_edges) - 1)] + ) - """ - if cumul: - values = np.concatenate([f[stat] for f in self.stats[:(i+1)]]) - else: - values = self.stats[i][stat] + if avg and cumul: + return hist / (i + 1), bin_list - if np.var(values) <= 1e-8: - hist = np.zeros((bins,)) - val = np.average(values) - hist[(bins+1) // 2 - 1] = len(values) - bin_list = np.linspace(0, val, bins//2+1, endpoint=True) - bin_list = np.concatenate((bin_list, (bin_list+val)[1:])) - return hist, bin_list[not (bins%2):] - - hist, bin_edges = np.histogram(values, bins=bins, range=bounds) - bin_list = np.array([(bin_edges[i] + bin_edges[i+1])/2 for i in range(len(bin_edges)-1)]) - - if avg and cumul: - return hist / (i+1), bin_list - - return hist, bin_list + return hist, bin_list class Diagram: - """Class for generating diagrams. + """Class for generating diagrams. - Attributes: - sim (SimData): the simulation data that contains all the frames and information. - diagrams (numpy.ndarray): array that selects which diagrams to show. - cumulative (bool): selects whether or not graph statistics are cumulative. + Attributes: + sim (SimData): the simulation data that contains all the frames and information. + diagrams (numpy.ndarray): array that selects which diagrams to show. + cumulative (bool): selects whether or not graph statistics are cumulative. - """ + """ - __slots__ = ['sim', 'diagrams', 'cumulative'] + __slots__ = ["sim", "diagrams", "cumulative"] + def __init__( + self, sim: Simulation, diagrams: np.ndarray, cumulative: bool = False + ) -> None: + self.sim = SimData(sim) + self.diagrams = np.atleast_2d(diagrams) + self.cumulative = cumulative - def __init__(self, sim: Simulation, diagrams: np.ndarray, cumulative: bool = False) -> None: - self.sim = SimData(sim) - self.diagrams = np.atleast_2d(diagrams) - self.cumulative = cumulative + def generate_frame(self, frame: int, mode: str, fol: str, name: str = None) -> None: + if mode not in ["save", "open"]: + raise ValueError("Not a valid mode for diagrams!") + shape = self.diagrams.shape + fig, axes = plt.subplots(*shape, figsize=(shape[1] * 8, shape[0] * 8)) + if self.diagrams.shape == (1, 1): + getattr(self, str(self.diagrams[0][0]) + "_plot")(frame, axes) + else: + axes = np.atleast_2d(axes) + it = np.nditer(self.diagrams, flags=["multi_index"]) + for diagram in it: + if diagram == "": + continue + getattr(self, str(diagram) + "_plot")(frame, axes[it.multi_index]) - def generate_frame(self, frame: int, mode: str, fol: str, name: str = None) -> None: - if mode not in ["save", "open"]: - raise ValueError("Not a valid mode for diagrams!") + plt.tight_layout() + if name is None: + name = f"img{frame:05}.png" - shape = self.diagrams.shape - fig, axes = plt.subplots(*shape, figsize=(shape[1]*8, shape[0]*8)) - if self.diagrams.shape == (1,1): - getattr(self, str(self.diagrams[0][0]) + '_plot')(frame, axes) - else: - axes = np.atleast_2d(axes) - it = np.nditer(self.diagrams, flags=["multi_index"]) - for diagram in it: - if diagram == "": - continue - getattr(self, str(diagram) + '_plot')(frame, axes[it.multi_index]) + if mode == "save": + plt.savefig(self.sim.path / fol / name) + plt.close(fig) + elif mode == "show": + plt.show() - plt.tight_layout() - if name is None: - name = f"img{frame:05}.png" + def voronoi_plot(self, i: int, ax: AxesSubplot) -> None: + domain = self.sim.domains[i] + n, w, h = domain.n, domain.w, domain.h + scale = 1.5 + area = n <= 60 - if mode == "save": - plt.savefig(self.sim.path / fol / name) - plt.close(fig) - elif mode == "show": - plt.show() + voronoi_plot_2d( + self.sim.voronois[i], ax, show_vertices=False, point_size=7 - n / 100 + ) + ax.plot([-w, 2 * w], [0, 0], "r") + ax.plot([-w, 2 * w], [h, h], "r") + ax.plot([0, 0], [-h, 2 * h], "r") + ax.plot([w, w], [-h, 2 * h], "r") + ax.axis("equal") + ax.set_xlim([(1 - scale) * w / 2, (1 + scale) * w / 2]) + ax.set_ylim([(1 - scale) * h / 2, (1 + scale) * h / 2]) + ax.title.set_text("Voronoi Visualization") + props = dict(boxstyle="round", facecolor="wheat", alpha=0.8) - def voronoi_plot(self, i: int, ax: AxesSubplot) -> None: - domain = self.sim.domains[i] - n, w, h = domain.n, domain.w, domain.h - scale = 1.5 - area = n <= 60 + defects = {5: {"x": [], "y": []}, 7: {"x": [], "y": []}} - voronoi_plot_2d(self.sim.voronois[i], ax, show_vertices=False, point_size = 7-n/100) - ax.plot([-w, 2*w], [0, 0], 'r') - ax.plot([-w, 2*w], [h, h], 'r') - ax.plot([0,0], [-h, 2*h], 'r') - ax.plot([w, w], [-h, 2*h], 'r') - ax.axis('equal') - ax.set_xlim([(1-scale)*w/2, (1+scale)*w/2]) - ax.set_ylim([(1-scale)*h/2, (1+scale)*h/2]) - ax.title.set_text("Voronoi Visualization") + for j in range(n): + for s in SYMM: + vec = self.sim.voronois[i].points[j] + s * self.sim.domains[i].dim - props = dict(boxstyle='round', facecolor='wheat', alpha=0.8) + if area: + txt = ax.text( + *vec, str(round(self.sim.stats[i]["site_areas"][j], 3)) + ) + txt.set_clip_on(True) - defects = {5: {"x": [], "y": []}, 7: {"x": [], "y": []}} + if self.sim.stats[i]["site_edge_count"][j] == 5: + defects[5]["x"].append(vec[0]) + defects[5]["y"].append(vec[1]) + elif self.sim.stats[i]["site_edge_count"][j] == 7: + defects[7]["x"].append(vec[0]) + defects[7]["y"].append(vec[1]) - for j in range(n): - for s in SYMM: - vec = (self.sim.voronois[i].points[j] + s*self.sim.domains[i].dim) + ax.scatter(defects[5]["x"], defects[5]["y"], marker="p", color="red") + ax.scatter(defects[7]["x"], defects[7]["y"], marker="*", color="red") - if area: - txt = ax.text(*vec, str(round(self.sim.stats[i]["site_areas"][j], 3))) - txt.set_clip_on(True) + ax.text( + 0.05, + 0.95, + f"Energy: {self.sim.energies[i]}", + transform=ax.transAxes, + fontsize=14, + verticalalignment="top", + bbox=props, + ) - if self.sim.stats[i]["site_edge_count"][j] == 5: - defects[5]["x"].append(vec[0]) - defects[5]["y"].append(vec[1]) - elif self.sim.stats[i]["site_edge_count"][j] == 7: - defects[7]["x"].append(vec[0]) - defects[7]["y"].append(vec[1]) + def energy_plot(self, i: int, ax: AxesSubplot) -> None: + ax.set_xlim([0, len(self.sim)]) - ax.scatter(defects[5]["x"], defects[5]["y"], marker="p", color="red") - ax.scatter(defects[7]["x"], defects[7]["y"], marker="*", color="red") + energies = self.sim.energies[: (i + 1)] + ax.plot(list(range(i + 1)), energies) + ax.title.set_text("Energy vs. Time") + # max_value = round(self.sim[0].energy) + # min_value = round(self.sim[-1].energy) + # diff = max_value-min_value + # ax.set_yticks(np.arange(int(min_value-diff/5), int(max_value+diff/5), diff/25)) + ax.set_xlabel("Iterations") + ax.set_ylabel("Energy") + ax.grid() + def site_areas_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist("site_areas", i, cumul=self.cumulative, avg=True) - ax.text(0.05, 0.95, f'Energy: {self.sim.energies[i]}', transform=ax.transAxes, fontsize=14, - verticalalignment='top', bbox=props) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Site Areas") + ax.set_xlabel("Area") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.ticklabel_format(useOffset=False) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + # for xtick, color in zip(ax.get_xticklabels(), areas_bar[2]): + # if color != 'C0': + # xtick.set_color(color) + def site_edge_count_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist( + "site_edge_count", i, bounds=(1, 11), cumul=self.cumulative, avg=True + ) - def energy_plot(self, i: int, ax: AxesSubplot) -> None: - ax.set_xlim([0, len(self.sim)]) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Edges per Site") + ax.set_xlabel("Number of Edges") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.set_xticklabels([int(z) for z in x]) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - energies = self.sim.energies[:(i+1)] - ax.plot(list(range(i+1)), energies) - ax.title.set_text('Energy vs. Time') - # max_value = round(self.sim[0].energy) - # min_value = round(self.sim[-1].energy) - #diff = max_value-min_value - #ax.set_yticks(np.arange(int(min_value-diff/5), int(max_value+diff/5), diff/25)) - ax.set_xlabel("Iterations") - ax.set_ylabel("Energy") - ax.grid() + def site_isos_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist( + "site_isos", i, bounds=(0, 1), cumul=self.cumulative, avg=True + ) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Isoparametric Values") + ax.set_xlabel("Isoparametric Value") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.ticklabel_format(useOffset=False) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + # for xtick, color in zip(ax.get_xticklabels(), isoparam_bar[2]): + # if color != 'C0': + # xtick.set_color(color) - def site_areas_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("site_areas", i, cumul=self.cumulative, avg=True) + def site_energies_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist("site_energies", i, cumul=self.cumulative, avg=True) - ax.bar(x, y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Site Areas') - ax.set_xlabel("Area") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.ticklabel_format(useOffset=False) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - # for xtick, color in zip(ax.get_xticklabels(), areas_bar[2]): - # if color != 'C0': - # xtick.set_color(color) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Site Energies") + ax.set_xlabel("Energy") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.ticklabel_format(useOffset=False) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + def avg_radius_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist("avg_radius", i, cumul=self.cumulative, avg=True) - def site_edge_count_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("site_edge_count", i, bounds=(1, 11), cumul=self.cumulative, avg=True) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Site Average Radii") + ax.set_xlabel("Average Radius") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.ticklabel_format(useOffset=False) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - ax.bar(x, y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Edges per Site') - ax.set_xlabel("Number of Edges") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.set_xticklabels([int(z) for z in x]) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + def isoparam_avg_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist("isoparam_avg", i, cumul=self.cumulative, avg=True) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Site Isoperimetric Averages") + ax.set_xlabel("Isoperimetric Average") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.ticklabel_format(useOffset=False) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - def site_isos_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("site_isos", i, bounds=(0,1), cumul=self.cumulative, avg=True) + def edge_lengths_plot(self, i: int, ax: AxesSubplot) -> None: + y, x = self.sim.hist("edge_lengths", i, 30, cumul=self.cumulative, avg=True) - ax.bar(x, y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Isoparametric Values') - ax.set_xlabel("Isoparametric Value") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.ticklabel_format(useOffset=False) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - # for xtick, color in zip(ax.get_xticklabels(), isoparam_bar[2]): - # if color != 'C0': - # xtick.set_color(color) + ax.bar(x, y, width=0.8 * (x[1] - x[0])) + ax.title.set_text("Edge Lengths") + ax.set_xlabel("Length") + ax.set_ylabel("Average Occurances") + ax.set_xticks(x) + ax.set_xticklabels(ax.get_xticks(), rotation=90) + ax.xaxis.set_major_formatter(FormatStrFormatter("%.3f")) + # ax.ticklabel_format(useOffset=False) + ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + # for xtick, color in zip(ax.get_xticklabels(), lengths_bar[2]): + # if color != 'C0': + # xtick.set_color(color) + def eigs_plot(self, i: int, ax: AxesSubplot) -> None: + try: + eigs = self.sim.stats[i]["eigs"] + ax.plot( + list(range(len(eigs))), eigs, marker="o", linestyle="dashed", color="C0" + ) + ax.plot([0, len(eigs)], [0, 0], color="red") + except KeyError: + ax.text(0.5, 0.5, "Not Computed") - def site_energies_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("site_energies", i, cumul=self.cumulative, avg=True) + ax.title.set_text("Hessian Eigenvalues") + ax.set_xlabel("") + ax.set_ylabel("Value") - ax.bar(x, y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Site Energies') - ax.set_xlabel("Energy") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.ticklabel_format(useOffset=False) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + def render_frames(self, frames: List[int], fol: str = "frames") -> None: + OUTPUT_DIR.mkdir(exist_ok=True) + self.sim.path.mkdir(exist_ok=True) + (self.sim.path / fol).mkdir(exist_ok=True) + combo_list = [] + for i in range(cpu_count()): + start, end = ( + int(i * len(frames) / cpu_count()), + int((i + 1) * len(frames) / cpu_count()), + ) + new_dia = Diagram(None, self.diagrams, self.cumulative) + new_dia.sim = self.sim.slice(frames[start:end]) + combo_list.append( + (new_dia, fol, start, len(frames[start:end]), len(frames)) + ) + # Free up memory, since it's already duplicated to other cores. + self.sim = self.sim.slice([]) + with Pool(cpu_count()) as pool: + for _ in pool.imap_unordered(render_frame_range, combo_list): + pass - def avg_radius_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("avg_radius", i, cumul=self.cumulative, avg=True) + print(flush=True) + print(f'Wrote to "{self.sim.path / fol}".', flush=True) - ax.bar(x, y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Site Average Radii') - ax.set_xlabel("Average Radius") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.ticklabel_format(useOffset=False) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + def render_video(self, time: int, mode: str) -> None: + if mode not in ["use_all", "sample"]: + raise ValueError("Not a valid mode for videos!") + if mode == "use_all": + frames = list(range(len(self.sim))) + elif mode == "sample": + fps = 30 + if len(self.sim) < fps * time: + frames = list(range(len(self.sim))) + fps = len(self.sim) / time + else: + frames = list( + np.round(np.linspace(0, len(self.sim) - 1, fps * time)).astype(int) + ) - def isoparam_avg_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("isoparam_avg", i, cumul=self.cumulative, avg=True) + self.render_frames(frames, "temp") + path = self.sim.path / "simulation.mp4" - ax.bar(x,y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Site Isoperimetric Averages') - ax.set_xlabel("Isoperimetric Average") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.ticklabel_format(useOffset=False) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) + print("Assembling MP4...", flush=True) + os.system( + f"ffmpeg -hide_banner -loglevel error -r {fps} -i" + + f' "{self.sim.path}/temp/img%05d.png"' + + f" -c:v libx264 -crf 18 -preset slow -pix_fmt yuv420p -vf" + + f' "scale=trunc(iw/2)*2:trunc(ih/2)*2" -f mp4 "{path}"' + ) + # Remove files. + for i in range(len(frames)): + os.remove(self.sim.path / f"temp/img{i:05}.png") - def edge_lengths_plot(self, i: int, ax: AxesSubplot) -> None: - y, x = self.sim.hist("edge_lengths", i, 30, cumul=self.cumulative, avg=True) - - ax.bar(x, y, width=0.8*(x[1]-x[0])) - ax.title.set_text('Edge Lengths') - ax.set_xlabel("Length") - ax.set_ylabel("Average Occurances") - ax.set_xticks(x) - ax.set_xticklabels(ax.get_xticks(), rotation = 90) - ax.xaxis.set_major_formatter(FormatStrFormatter('%.3f')) - #ax.ticklabel_format(useOffset=False) - ax.yaxis.set_major_locator(MaxNLocator(integer=True)) - # for xtick, color in zip(ax.get_xticklabels(), lengths_bar[2]): - # if color != 'C0': - # xtick.set_color(color) - - - def eigs_plot(self, i: int, ax: AxesSubplot) -> None: - try: - eigs = self.sim.stats[i]["eigs"] - ax.plot(list(range(len(eigs))), eigs, marker='o', linestyle='dashed', color='C0') - ax.plot([0,len(eigs)], [0, 0], color="red") - except KeyError: - ax.text(0.5, 0.5, "Not Computed") - - ax.title.set_text('Hessian Eigenvalues') - ax.set_xlabel("") - ax.set_ylabel("Value") - - - def render_frames(self, frames: List[int], fol: str = 'frames') -> None: - OUTPUT_DIR.mkdir(exist_ok=True) - self.sim.path.mkdir(exist_ok=True) - (self.sim.path / fol).mkdir(exist_ok=True) - combo_list = [] - for i in range(cpu_count()): - start, end = int(i*len(frames)/cpu_count()), int((i+1)*len(frames)/cpu_count()) - new_dia = Diagram(None, self.diagrams, self.cumulative) - new_dia.sim = self.sim.slice(frames[start:end]) - combo_list.append((new_dia, fol, start, len(frames[start:end]), len(frames))) - - # Free up memory, since it's already duplicated to other cores. - self.sim = self.sim.slice([]) - with Pool(cpu_count()) as pool: - for _ in pool.imap_unordered(render_frame_range, combo_list): - pass - - print(flush=True) - print(f'Wrote to \"{self.sim.path / fol}\".', flush=True) - - - def render_video(self, time: int, mode: str) -> None: - if mode not in ["use_all", "sample"]: - raise ValueError("Not a valid mode for videos!") - - if mode == "use_all": - frames = list(range(len(self.sim))) - elif mode == "sample": - fps = 30 - if len(self.sim) < fps*time: - frames = list(range(len(self.sim))) - fps = len(self.sim)/time - else: - frames = list(np.round(np.linspace(0, len(self.sim)-1, fps*time)).astype(int)) - - self.render_frames(frames, 'temp') - path = self.sim.path / 'simulation.mp4' - - print("Assembling MP4...", flush=True) - os.system(f'ffmpeg -hide_banner -loglevel error -r {fps} -i' + \ - f' \"{self.sim.path}/temp/img%05d.png\"' + \ - f' -c:v libx264 -crf 18 -preset slow -pix_fmt yuv420p -vf' + \ - f' "scale=trunc(iw/2)*2:trunc(ih/2)*2" -f mp4 "{path}"') - - # Remove files. - for i in range(len(frames)): - os.remove(self.sim.path / f"temp/img{i:05}.png") - - os.rmdir(self.sim.path / 'temp') - print(f'Wrote to \"{path}\".', flush=True) + os.rmdir(self.sim.path / "temp") + print(f'Wrote to "{path}".', flush=True) def render_frame_range(combo: Tuple[Diagram, str, int, int, int]) -> None: - self, fol, offset, length, num_frames = combo - for i in range(length): - self.generate_frame(i, "save", fol, f"img{i+offset:05}.png") - i = len(list((self.sim.path / fol).iterdir())) - hashes = int(21*i/num_frames) - print(f'Generating frames... |{"#"*hashes}{" "*(20-hashes)}|' + \ - f' {i}/{num_frames} frames rendered.', flush=True, end='\r') + self, fol, offset, length, num_frames = combo + for i in range(length): + self.generate_frame(i, "save", fol, f"img{i+offset:05}.png") + i = len(list((self.sim.path / fol).iterdir())) + hashes = int(21 * i / num_frames) + print( + f'Generating frames... |{"#"*hashes}{" "*(20-hashes)}|' + + f" {i}/{num_frames} frames rendered.", + flush=True, + end="\r", + ) diff --git a/squish/ordered.py b/squish/ordered.py index 962cb91..d74bfed 100644 --- a/squish/ordered.py +++ b/squish/ordered.py @@ -6,82 +6,101 @@ from math import gcd, sqrt, log, tan, atan, pi Config = Tuple[int, int] + def configurations(domain: DomainParams) -> List[Config]: - n, w, h = domain.n, domain.w, domain.h - valid = [] - mults = np.arange(n) - configs = np.dstack((np.repeat(mults,n).T, np.tile(mults, n).T))[0][1:] - for i in range(len(configs)): - eq_x = n if configs[i][0] == 0 else configs[i][0] - eq_y = n if configs[i][1] == 0 else configs[i][1] + n, w, h = domain.n, domain.w, domain.h + valid = [] + mults = np.arange(n) + configs = np.dstack((np.repeat(mults, n).T, np.tile(mults, n).T))[0][1:] + for i in range(len(configs)): + eq_x = n if configs[i][0] == 0 else configs[i][0] + eq_y = n if configs[i][1] == 0 else configs[i][1] - if gcd(eq_x, eq_y) != 1: - continue + if gcd(eq_x, eq_y) != 1: + continue - vecs = configs[i]*np.dstack((w*mults, h*mults)).swapaxes(0,1)/n % domain.dim - vmod2 = np.squeeze(np.matmul(vecs, vecs.transpose(0,2,1))) - vmodv = np.squeeze(vecs).dot(vecs[1].T).T.flatten() + vecs = ( + configs[i] + * np.dstack((w * mults, h * mults)).swapaxes(0, 1) + / n + % domain.dim + ) + vmod2 = np.squeeze(np.matmul(vecs, vecs.transpose(0, 2, 1))) + vmodv = np.squeeze(vecs).dot(vecs[1].T).T.flatten() - if np.all(vmod2 >= vmodv): - valid.append(tuple(configs[i])) + if np.all(vmod2 >= vmodv): + valid.append(tuple(configs[i])) - return valid + return valid -def get_config_generators(domain: DomainParams, config: Config) -> Tuple[Config, Config]: - n, w, h = domain.n, domain.w, domain.h - q1 = sites(domain, config) - v = q1[1] - # Sites to check can ignore 0*v and v itself. - all_sites = np.concatenate((q1, q1-[w,0], q1-[w,h], q1-[0,h]))[2:] +def get_config_generators( + domain: DomainParams, config: Config +) -> Tuple[Config, Config]: + n, w, h = domain.n, domain.w, domain.h + q1 = sites(domain, config) + v = q1[1] + # Sites to check can ignore 0*v and v itself. + all_sites = np.concatenate((q1, q1 - [w, 0], q1 - [w, h], q1 - [0, h]))[2:] - # Checking 0 < ax + by < v*v to make the sites are within the region. - tol = 1e-3 - vdot = np.matmul(all_sites, v) - in_box = all_sites[np.where((-tol <= vdot) & (vdot <= (v.dot(v)+tol)))[0]] - in_box = np.expand_dims(in_box, 0).swapaxes(0,1) # Used for the next step, getting site*site + # Checking 0 < ax + by < v*v to make the sites are within the region. + tol = 1e-3 + vdot = np.matmul(all_sites, v) + in_box = all_sites[np.where((-tol <= vdot) & (vdot <= (v.dot(v) + tol)))[0]] + in_box = np.expand_dims(in_box, 0).swapaxes( + 0, 1 + ) # Used for the next step, getting site*site - w = in_box[np.argmin(np.squeeze(np.matmul(in_box, in_box.transpose(0,2,1))))].flatten() + w = in_box[ + np.argmin(np.squeeze(np.matmul(in_box, in_box.transpose(0, 2, 1)))) + ].flatten() - return tuple(v), tuple(w) + return tuple(v), tuple(w) def sites(domain: DomainParams, config: Config) -> numpy.ndarray: - n, w, h = domain.n, domain.w, domain.h - config, mults = np.array(config), np.arange(domain.n) - return (config*np.dstack((w*mults, h*mults))[0]/n) % domain.dim + n, w, h = domain.n, domain.w, domain.h + config, mults = np.array(config), np.arange(domain.n) + return (config * np.dstack((w * mults, h * mults))[0] / n) % domain.dim def area(domain: DomainParams, config: Config) -> float: - v, w = get_config_generators(domain, config) - v, w = np.array(v), np.array(w) - c = circumcenter(v, w) + v, w = get_config_generators(domain, config) + v, w = np.array(v), np.array(w) + c = circumcenter(v, w) - return mag(v)*mag(v/2 - c) + mag(w)*mag(w/2-c) + mag(v-w)*mag((v+w)/2-c) + return ( + mag(v) * mag(v / 2 - c) + + mag(w) * mag(w / 2 - c) + + mag(v - w) * mag((v + w) / 2 - c) + ) def avg_radius(domain: DomainParams, config: Config) -> float: - v, w = get_config_generators(domain, config) - v, w = np.array(v), np.array(w) - c = circumcenter(v, w) + v, w = get_config_generators(domain, config) + v, w = np.array(v), np.array(w) + c = circumcenter(v, w) - return 2*(avg_rp(mag(v), 2*mag(v/2 - c)) + avg_rp(mag(w), 2*mag(w/2-c)) + \ - avg_rp(mag(v-w),2*mag((v+w)/2-c))) + return 2 * ( + avg_rp(mag(v), 2 * mag(v / 2 - c)) + + avg_rp(mag(w), 2 * mag(w / 2 - c)) + + avg_rp(mag(v - w), 2 * mag((v + w) / 2 - c)) + ) def avg_rp(d: float, l: float) -> float: - return (d/(4*pi))*log(tan(.5*(atan(l/d)+pi/2))**2) + return (d / (4 * pi)) * log(tan(0.5 * (atan(l / d) + pi / 2)) ** 2) def circumcenter(v: numpy.ndarray, w: numpy.ndarray) -> Config: - det = 1/(2*rot(v).dot(w)) - v2, w2 = v.dot(v), w.dot(w) - c = np.empty((2,)) - c[0], c[1] = w[1]*v2 - v[1]*w2,-w[0]*v2 + v[0]*w2 - return det*c + det = 1 / (2 * rot(v).dot(w)) + v2, w2 = v.dot(v), w.dot(w) + c = np.empty((2,)) + c[0], c[1] = w[1] * v2 - v[1] * w2, -w[0] * v2 + v[0] * w2 + return det * c def rot(v: numpy.ndarray) -> numpy.ndarray: - w = np.copy(v) - w[0], w[1] = -w[1], w[0] - return w \ No newline at end of file + w = np.copy(v) + w[0], w[1] = -w[1], w[0] + return w diff --git a/squish/simulation.py b/squish/simulation.py index c384296..fb93f4d 100644 --- a/squish/simulation.py +++ b/squish/simulation.py @@ -11,372 +11,419 @@ from .common import DomainParams, Energy, generate_filepath, OUTPUT_DIR class Simulation: - """Generic container for simulations. + """Generic container for simulations. - Attributes: - domain (DomainParams): Domain Parameters for this simulation. - energy (Energy): energy being used for caluclations. - path (Path): Path to location of where to store simulation files. - frames (List[VoronoiContainer]): Stores frames of the simulation. + Attributes: + domain (DomainParams): Domain Parameters for this simulation. + energy (Energy): energy being used for caluclations. + path (Path): Path to location of where to store simulation files. + frames (List[VoronoiContainer]): Stores frames of the simulation. - """ + """ - __slots__ = ['domain', 'energy', 'path', 'frames'] + __slots__ = ["domain", "energy", "path", "frames"] - def __init__(self, domain: DomainParams, energy: Energy, \ - name: Optional[str, int] = None) -> None: - self.domain, self.energy = domain, energy - self.frames = [] + def __init__( + self, domain: DomainParams, energy: Energy, name: Optional[str, int] = None + ) -> None: + self.domain, self.energy = domain, energy + self.frames = [] - if name is None: - self.path = generate_filepath(self, OUTPUT_DIR) - elif isinstance(name, int): - self.path = generate_filepath(self, OUTPUT_DIR, name) - else: - self.path = OUTPUT_DIR / name + if name is None: + self.path = generate_filepath(self, OUTPUT_DIR) + elif isinstance(name, int): + self.path = generate_filepath(self, OUTPUT_DIR, name) + else: + self.path = OUTPUT_DIR / name + def __iter__(self) -> Iterator: + return iter(self.frames) - def __iter__(self) -> Iterator: - return iter(self.frames) + def __getitem__(self, key: int) -> Energy: + return self.frames[key] - def __getitem__(self, key: int) -> Energy: - return self.frames[key] + def __len__(self) -> int: + return len(self.frames) + def add_frame(self, points: Optional[numpy.ndarray] = None) -> None: + if points is None: + points = np.random.random_sample((self.domain.n, 2)) * self.domain.dim + else: + if points.shape[1] != 2 or len(points.shape) > 2: + raise ValueError("Sites should be 2 dimensional!") - def __len__(self) -> int: - return len(self.frames) + if points.shape[0] != self.domain.n: + raise ValueError("Number of sites provided do not match the array!") + self.frames.append(self.energy.mode(*self.domain, points % self.domain.dim)) - def add_frame(self, points: Optional[numpy.ndarray] = None) -> None: - if points is None: - points = np.random.random_sample((self.domain.n, 2)) * self.domain.dim - else: - if points.shape[1] != 2 or len(points.shape) > 2: - raise ValueError("Sites should be 2 dimensional!") + def get_distinct(self) -> List[int]: + """Gets the distinct configurations based on the average radii of the sites. + and returns the number of configurations for each distinct configuration. + """ - if points.shape[0] != self.domain.n: - raise ValueError("Number of sites provided do not match the array!") + distinct_avg_radii, distinct_count, new_frames = [], [], [] - self.frames.append(self.energy.mode(*self.domain, points % self.domain.dim)) + for frame in self.frames: + try: + stats = frame.stats + except AttributeError: + stats = frame["stats"] # When we have a loaded simulations. + avg_radii = np.sort(stats["avg_radius"]) + is_in = False + for i, dist_radii in enumerate(distinct_avg_radii): + if np.allclose(avg_radii, dist_radii, atol=1e-5): + is_in = True + distinct_count[i] += 1 + break - def get_distinct(self) -> List[int]: - """Gets the distinct configurations based on the average radii of the sites. - and returns the number of configurations for each distinct configuration. - """ + if not is_in: + distinct_avg_radii.append(avg_radii) + distinct_count.append(1) + new_frames.append(frame) - distinct_avg_radii, distinct_count, new_frames = [], [], [] + self.frames = new_frames + return distinct_count + def save(self, info: Dict, overwrite: bool = False) -> None: + OUTPUT_DIR.mkdir(exist_ok=True) + self.path.mkdir(exist_ok=True) + path = self.path / "data.squish" - for frame in self.frames: - try: - stats = frame.stats - except AttributeError: - stats = frame["stats"] # When we have a loaded simulations. + with open(path, "wb" if overwrite else "ab") as out: + pickle.dump(info, out, pickle.HIGHEST_PROTOCOL) - avg_radii = np.sort(stats["avg_radius"]) - is_in = False - for i, dist_radii in enumerate(distinct_avg_radii): - if np.allclose(avg_radii, dist_radii, atol=1e-5): - is_in = True - distinct_count[i] += 1 - break + def save_all(self) -> None: + self.save(self.initial_data, True) + for i in range(len(self.frames)): + self.save(self.frame_data(i)) - if not is_in: - distinct_avg_radii.append(avg_radii) - distinct_count.append(1) - new_frames.append(frame) + def frame_data(self, index: int) -> None: + f = self[index] + info = { + "arr": f.site_arr, + "domain": (f.n, f.w, f.h, f.r), + "energy": f.energy, + "stats": f.stats, + } + return info - self.frames = new_frames - return distinct_count + @staticmethod + def load(path: str) -> Tuple[Simulation, Generator]: + path = Path(path) + def frames() -> Dict: + with open(path / "data.squish", "rb") as infile: + first = True + while True: + try: + if first: + pickle.load(infile) + first = False + continue + yield pickle.load(infile) + except EOFError: + break - def save(self, info: Dict, overwrite: bool = False) -> None: - OUTPUT_DIR.mkdir(exist_ok=True) - self.path.mkdir(exist_ok=True) - path = self.path / 'data.squish' + with open(path / "data.squish", "rb") as infile: + sim_info = pickle.load(infile) - with open(path, 'wb' if overwrite else 'ab') as out: - pickle.dump(info, out, pickle.HIGHEST_PROTOCOL) + domain = DomainParams(*sim_info["domain"]) + energy = Energy(sim_info["energy"]) + sim = STR_TO_SIM[sim_info["mode"]]( + domain, energy, *list(sim_info.values())[3:] + ) + sim.path = path + return sim, frames() - def save_all(self) -> None: - self.save(self.initial_data, True) - for i in range(len(self.frames)): - self.save(self.frame_data(i)) + @staticmethod + def from_file(path: str) -> Simulation: + sim, frames = Simulation.load(path) + for frame in frames: + sim.frames.append(sim.energy.mode(*frame["domain"], frame["arr"])) - - def frame_data(self, index: int) -> None: - f = self[index] - info = { - "arr": f.site_arr, - "domain": (f.n, f.w, f.h, f.r), - "energy": f.energy, - "stats": f.stats - } - return info - - - @staticmethod - def load(path: str) -> Tuple[Simulation, Generator]: - path = Path(path) - def frames() -> Dict: - with open(path / 'data.squish', 'rb') as infile: - first = True - while True: - try: - if first: - pickle.load(infile) - first = False - continue - yield pickle.load(infile) - except EOFError: - break - - with open(path / 'data.squish', 'rb') as infile: - sim_info = pickle.load(infile) - - domain = DomainParams(*sim_info["domain"]) - energy = Energy(sim_info["energy"]) - sim = STR_TO_SIM[sim_info["mode"]](domain, energy, *list(sim_info.values())[3:]) - sim.path = path - - return sim, frames() - - - @staticmethod - def from_file(path: str) -> Simulation: - sim, frames = Simulation.load(path) - for frame in frames: - sim.frames.append(sim.energy.mode(*frame["domain"], frame["arr"])) - - return sim + return sim class Flow(Simulation): - """Finds an equilibrium from initial sites. + """Finds an equilibrium from initial sites. - Attributes: - domain (DomainParams): domain parameters for this simulation. - energy (Energy): energy being used for caluclations. - path (Path): path to the location of where to store simulation files. - frames (List[VoronoiContainer]): stores frames of the simulation. - step_size (float): size fo step by for each iteration. - thres (float): threshold for the stopping condition. - adaptive (bool): set to True if adaptive stepping is desired. + Attributes: + domain (DomainParams): domain parameters for this simulation. + energy (Energy): energy being used for caluclations. + path (Path): path to the location of where to store simulation files. + frames (List[VoronoiContainer]): stores frames of the simulation. + step_size (float): size fo step by for each iteration. + thres (float): threshold for the stopping condition. + adaptive (bool): set to True if adaptive stepping is desired. - """ + """ - __slots__ = ['step_size', 'thres', 'adaptive'] - attr_str = "flow" - title_str = "Flow" + __slots__ = ["step_size", "thres", "adaptive"] + attr_str = "flow" + title_str = "Flow" - def __init__(self, domain: DomainParams, energy: Energy, step_size: float, thres: float, - adaptive: bool, name: Optional[str] = None) -> None: - super().__init__(domain, energy, name=name) - self.step_size, self.thres, self.adaptive = step_size, thres, adaptive + def __init__( + self, + domain: DomainParams, + energy: Energy, + step_size: float, + thres: float, + adaptive: bool, + name: Optional[str] = None, + ) -> None: + super().__init__(domain, energy, name=name) + self.step_size, self.thres, self.adaptive = step_size, thres, adaptive + @property + def initial_data(self) -> Dict: + info = { + "mode": self.attr_str, + "domain": (self.domain.n, self.domain.w, self.domain.h, self.domain.r), + "energy": self.energy.attr_str, + "step_size": self.step_size, + "thres": self.thres, + "adaptive": self.adaptive, + } + return info - @property - def initial_data(self) -> Dict: - info = { - "mode": self.attr_str, - "domain": (self.domain.n, self.domain.w, self.domain.h, self.domain.r), - "energy": self.energy.attr_str, - "step_size": self.step_size, - "thres": self.thres, - "adaptive": self.adaptive - } - return info + def run( + self, + save: bool, + log: bool, + log_steps: int, + new_sites: Optional[numpy.ndarray] = None, + ) -> None: + if log: + print(f"Find - {self.domain}", flush=True) + if save and len(self) == 0: + self.save(self.initial_data, True) + if len(self) == 0: + self.add_frame(new_sites) + i, stop = len(self) - 1, False - def run(self, save: bool, log: bool, log_steps: int, - new_sites: Optional[numpy.ndarray] = None) -> None: - if log: print(f"Find - {self.domain}", flush=True) - if save and len(self) == 0: self.save(self.initial_data, True) - if len(self) == 0: self.add_frame(new_sites) + while not stop: # Get to threshold. + if save: + self.save(self.frame_data(i)) + frame = self[i] + else: + frame = self[0] - i, stop = len(self)-1, False + # Iterate and generate next frame using RK-2 + start = timer() + change, grad = frame.iterate(self.step_size) - while not stop: # Get to threshold. - if save: - self.save(self.frame_data(i)) - frame = self[i] - else: - frame = self[0] + new_frame = self.energy.mode(*self.domain, frame.add_sites(change)) + grad_norm = np.linalg.norm(grad) + end = timer() - # Iterate and generate next frame using RK-2 - start = timer() - change, grad = frame.iterate(self.step_size) + if self.adaptive: + error = change - grad * self.step_size + tol = 10 ** min(-3, -2 + log10(grad_norm)) - new_frame = self.energy.mode(*self.domain, frame.add_sites(change)) - grad_norm = np.linalg.norm(grad) - end = timer() + self.step_size *= (tol / np.linalg.norm(error)) ** 0.5 - if self.adaptive: - error = change - grad*self.step_size - tol = 10**min(-3, -2+log10(grad_norm)) + if not save: + del self.frames[0] - self.step_size *= (tol/np.linalg.norm(error))**.5 - - if not save: - del self.frames[0] - - self.frames.append(new_frame) - stop = grad_norm < self.thres - - i += 1 - if (log and i % log_steps == 0) or stop: - print(f'Iteration: {i:05} | Energy: {frame.energy: .5f}' + \ - f' | Gradient: {grad_norm:.8f} | Step: {self.step_size: .5f} | ' + \ - f'Time: {end-start: .3f} |', flush=True) + self.frames.append(new_frame) + stop = grad_norm < self.thres + i += 1 + if (log and i % log_steps == 0) or stop: + print( + f"Iteration: {i:05} | Energy: {frame.energy: .5f}" + + f" | Gradient: {grad_norm:.8f} | Step: {self.step_size: .5f} | " + + f"Time: {end-start: .3f} |", + flush=True, + ) class Search(Simulation): - """Searches for a given number of equilibria. + """Searches for a given number of equilibria. - Attributes: - domain (DomainParams): domain parameters for this simulation. - energy (Energy): energy being used for caluclations. - path (Path): path to the location of where to store simulation files. - frames (List[VoronoiContainer]): stores frames of the simulation. - step_size (float): size fo step by for each iteration. - thres (float): threshold for the stopping condition. - adaptive (bool): set to True if adaptive stepping is desired. - kernel_step (float): size to step on manifold if nullity of hessian > 2. - count (int): number of equilibria to find. + Attributes: + domain (DomainParams): domain parameters for this simulation. + energy (Energy): energy being used for caluclations. + path (Path): path to the location of where to store simulation files. + frames (List[VoronoiContainer]): stores frames of the simulation. + step_size (float): size fo step by for each iteration. + thres (float): threshold for the stopping condition. + adaptive (bool): set to True if adaptive stepping is desired. + kernel_step (float): size to step on manifold if nullity of hessian > 2. + count (int): number of equilibria to find. - """ + """ - __slots__ = ['step_size', 'thres', 'adaptive', 'kernel_step', 'count'] - attr_str = "search" - title_str = "Search" + __slots__ = ["step_size", "thres", "adaptive", "kernel_step", "count"] + attr_str = "search" + title_str = "Search" - def __init__(self, domain: DomainParams, energy: Energy, step_size: float, thres: float, - adaptive: bool, kernel_step: float, count: int, - name: Optional[str] = None) -> None: - super().__init__(domain, energy, name=name) - self.step_size, self.thres, self.adaptive = step_size, thres, adaptive - self.kernel_step, self.count = kernel_step, count + def __init__( + self, + domain: DomainParams, + energy: Energy, + step_size: float, + thres: float, + adaptive: bool, + kernel_step: float, + count: int, + name: Optional[str] = None, + ) -> None: + super().__init__(domain, energy, name=name) + self.step_size, self.thres, self.adaptive = step_size, thres, adaptive + self.kernel_step, self.count = kernel_step, count + @property + def initial_data(self) -> Dict: + info = { + "mode": self.attr_str, + "domain": (self.domain.n, self.domain.w, self.domain.h, self.domain.r), + "energy": self.energy.attr_str, + "step_size": self.step_size, + "thres": self.thres, + "adaptive": self.adaptive, + "kernel_step": self.kernel_step, + "count": self.count, + } + return info - @property - def initial_data(self) -> Dict: - info = { - "mode": self.attr_str, - "domain": (self.domain.n, self.domain.w, self.domain.h, self.domain.r), - "energy": self.energy.attr_str, - "step_size": self.step_size, - "thres": self.thres, - "adaptive": self.adaptive, - "kernel_step": self.kernel_step, - "count": self.count - } - return info + def run( + self, + save: bool, + log: bool, + log_steps: int, + new_sites: Optional[numpy.ndarray] = None, + ) -> None: + if log: + print(f"Travel - {self.domain}", flush=True) + if save and len(self) == 0: + self.save(self.initial_data, True) + for i in range(len(self), self.count): + # Get to equilibrium. + sim = Flow( + self.domain, self.energy, self.step_size, self.thres, self.adaptive + ) + sim.add_frame(new_sites) + sim.run(False, log, log_steps) - def run(self, save: bool, log: bool, log_steps: int, - new_sites: Optional[numpy.ndarray] = None) -> None: - if log: print(f'Travel - {self.domain}', flush=True) - if save and len(self) == 0: self.save(self.initial_data, True) + self.frames.append(sim[-1]) + if save: + self.save(self.frame_data(i)) + if log: + print(f"Equilibrium: {i:04}\n", flush=True) - for i in range(len(self), self.count): - # Get to equilibrium. - sim = Flow(self.domain, self.energy, self.step_size, self.thres, self.adaptive) - sim.add_frame(new_sites) - sim.run(False, log, log_steps) + # Get Hessian,and check nullity. If > 2, perturb. + hess = self.frames[i].hessian(10e-5) + eigs = np.sort(np.linalg.eig(hess)[0]) + self.frames[i].stats["eigs"] = eigs - self.frames.append(sim[-1]) - if save: self.save(self.frame_data(i)) - if log: print(f'Equilibrium: {i:04}\n', flush=True) + zero_eigs = np.count_nonzero( + np.isclose(eigs, np.zeros((len(eigs),)), atol=1e-4) + ) - # Get Hessian,and check nullity. If > 2, perturb. - hess = self.frames[i].hessian(10e-5) - eigs = np.sort(np.linalg.eig(hess)[0]) - self.frames[i].stats["eigs"] = eigs - - zero_eigs = np.count_nonzero(np.isclose(eigs, np.zeros((len(eigs),)), atol=1e-4)) - - if zero_eigs == 2: - new_sites = None - else: - print("Warning: Nullity > 2. Expected if AreaEnergy.", flush=True) - ns = null_space(hess, 10e-4).T - vec = ns[random.randint(0, len(ns)-1)].reshape((self.domain.n, 2)) # Random vector. - new_sites = self.frames[i].add_sites(self.kernel_step*vec) + if zero_eigs == 2: + new_sites = None + else: + print("Warning: Nullity > 2. Expected if AreaEnergy.", flush=True) + ns = null_space(hess, 10e-4).T + vec = ns[random.randint(0, len(ns) - 1)].reshape( + (self.domain.n, 2) + ) # Random vector. + new_sites = self.frames[i].add_sites(self.kernel_step * vec) class Shrink(Simulation): - """Shrinks width and finds nearest equilibrium. + """Shrinks width and finds nearest equilibrium. - Attributes: - domain (DomainParams): domain parameters for this simulation. - energy (Energy): energy being used for caluclations. - path (Path): path to the location of where to store simulation files. - frames (List[VoronoiContainer]): stores frames of the simulation. - step_size (float): size fo step by for each iteration. - thres (float): threshold for the stopping condition. - adaptive (bool): set to True if adaptive stepping is desired. - delta (float): percent to change w each iteration. - stop_width (float): percent at which to stop iterating. + Attributes: + domain (DomainParams): domain parameters for this simulation. + energy (Energy): energy being used for calcu1lations. + path (Path): path to the location of where to store simulation files. + frames (List[VoronoiContainer]): stores frames of the simulation. + step_size (float): size fo step by for each iteration. + thres (float): threshold for the stopping condition. + adaptive (bool): set to True if adaptive stepping is desired. + delta (float): percent to change w each iteration. + stop_width (float): percent at which to stop iterating. - """ + """ - __slots__ = ['step_size', 'thres', 'adaptive', 'delta', 'stop_width'] - attr_str = "shrink" - title_str = "Shrink" + __slots__ = ["step_size", "thres", "adaptive", "delta", "stop_width"] + attr_str = "shrink" + title_str = "Shrink" + + def __init__( + self, + domain: DomainParams, + energy: Energy, + step_size: float, + thres: float, + adaptive: bool, + delta: float, + stop_width: float, + name: Optional[str] = None, + ) -> None: + super().__init__(domain, energy, name=name) + self.step_size, self.thres, self.adaptive = step_size, thres, adaptive + self.delta, self.stop_width = ( + self.domain.w * delta / 100, + self.domain.w * stop_width, + ) + + @property + def initial_data(self) -> Dict: + info = { + "mode": self.attr_str, + "domain": (self.domain.n, self.domain.w, self.domain.h, self.domain.r), + "energy": self.energy.attr_str, + "step_size": self.step_size, + "thres": self.thres, + "adaptive": self.adaptive, + "delta": self.delta, + "stop_width": self.stop_width, + } + return info + + def run( + self, + save: bool, + log: bool, + log_steps: int, + new_sites: Optional[numpy.ndarray] = None, + ) -> None: + if log: + print(f"Shrink - {self.domain}", flush=True) + if save and len(self) == 0: + self.save(self.initial_data, True) + + width = self.domain.w if len(self.frames) == 0 else self.frames[-1].w + i = 0 + while width >= self.stop_width: + # Get to equilibrium. + new_domain = DomainParams( + self.domain.n, width, self.domain.h, self.domain.r + ) + sim = Flow( + new_domain, self.energy, self.step_size, self.thres, self.adaptive + ) + sim.add_frame(new_sites) + sim.run(False, log, log_steps) + new_sites = sim[-1].site_arr + + self.frames.append(sim[-1]) + if save: + self.save(self.frame_data(i)) + + if log: + print(f"Width: {width:.4f}\n") + + width -= self.delta + i += 1 - def __init__(self, domain: DomainParams, energy: Energy, step_size: float, thres: float, - adaptive: bool, delta: float, stop_width: float, - name: Optional[str] = None) -> None: - super().__init__(domain, energy, name=name) - self.step_size, self.thres, self.adaptive = step_size, thres, adaptive - self.delta, self.stop_width = self.domain.w*delta/100, self.domain.w*stop_width - - - @property - def initial_data(self) -> Dict: - info = { - "mode": self.attr_str, - "domain": (self.domain.n, self.domain.w, self.domain.h, self.domain.r), - "energy": self.energy.attr_str, - "step_size": self.step_size, - "thres": self.thres, - "adaptive": self.adaptive, - "delta": self.delta, - "stop_width": self.stop_width - } - return info - - - def run(self, save: bool, log: bool, log_steps: int, - new_sites: Optional[numpy.ndarray] = None) -> None: - if log: print(f'Shrink - {self.domain}', flush=True) - if save and len(self) == 0: self.save(self.initial_data, True) - - width = self.domain.w if len(self.frames) == 0 else self.frames[-1].w - i = 0 - while width >= self.stop_width: - # Get to equilibrium. - new_domain = DomainParams(self.domain.n, width, self.domain.h, self.domain.r) - sim = Flow(new_domain, self.energy, self.step_size, self.thres, self.adaptive) - sim.add_frame(new_sites) - sim.run(False, log, log_steps) - new_sites = sim[-1].site_arr - - self.frames.append(sim[-1]) - if save: self.save(self.frame_data(i)) - - if log: print(f'Width: {width:.4f}\n') - - width -= self.delta - i += 1 - - -STR_TO_SIM = { - "flow": Flow, - "search": Search, - "shrink": Shrink -} \ No newline at end of file +STR_TO_SIM = {"flow": Flow, "search": Search, "shrink": Shrink} diff --git a/squish/squish.py b/squish/squish.py index 8cae60d..8250485 100644 --- a/squish/squish.py +++ b/squish/squish.py @@ -9,154 +9,224 @@ from .simulation import Simulation, Flow, Search, Shrink from .diagram import Diagram dia_presets = { - "animate": [["voronoi"]], - "energy": [["voronoi", "energy"]], - "stats": [ - ["voronoi", "eigs", "site_edge_count"], - ["site_isos", "site_energies", "edge_lengths"] - ], - "eigs": [["voronoi", "eigs"]] + "animate": [["voronoi"]], + "energy": [["voronoi", "energy"]], + "stats": [ + ["voronoi", "eigs", "site_edge_count"], + ["site_isos", "site_energies", "edge_lengths"], + ], + "eigs": [["voronoi", "eigs"]], } + def check_params(container: Dict, needed: List[str], valid: Dict) -> None: - """Checks container for the necessary items, and raises - an error if the parameter is not found. + """Checks container for the necessary items, and raises + an error if the parameter is not found. - Args: - container (Dict): contains the submitted parameters. - needed (List[str]): contains the needed paramters. - valid: (Dict): if there are specific valid parameters, it will also check for these. + Args: + container (Dict): contains the submitted parameters. + needed (List[str]): contains the needed paramters. + valid: (Dict): if there are specific valid parameters, it will also check these. - """ + """ - for need in needed: - if need not in container: - raise ValueError(f"Parameter \'{need}\' is required.") + for need in needed: + if need not in container: + raise ValueError(f"Parameter '{need}' is required.") - if need in valid: - if type(valid[need]) is list: - if container[need] not in valid[need]: - raise ValueError(f"Parameter \'{need}\' must be one of these values: " + \ - f"{str(valid[need])[1:-1]}.") - elif valid[need] == "positive": - if container[need] < 0: - raise ValueError(f"Parameter \'{need}\' must be positive.") + if need in valid: + if type(valid[need]) is list: + if container[need] not in valid[need]: + raise ValueError( + f"Parameter '{need}' must be one of these values: " + + f"{str(valid[need])[1:-1]}." + ) + elif valid[need] == "positive": + if container[need] < 0: + raise ValueError(f"Parameter '{need}' must be positive.") def main(): - # Loading configuration and settings. - parser = argparse.ArgumentParser("Squish") - parser.add_argument('sim_conf', metavar='/path/to/config.json', - help="configuration file for a simulation") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False, - help="suppress all normal output") - parser.add_argument('-l', '--log', dest='log_steps', default=50, type=int, - help="number of iterations before logging") - parser.add_argument('-i', dest='input_file', metavar='/path/to/sim', - help="folder that contains outputted simulation files.", default=None) + # Loading configuration and settings. + parser = argparse.ArgumentParser("Squish") + parser.add_argument( + "sim_conf", + metavar="/path/to/config.json", + help="configuration file for a simulation", + ) + parser.add_argument( + "-q", + "--quiet", + dest="quiet", + action="store_true", + default=False, + help="suppress all normal output", + ) + parser.add_argument( + "-l", + "--log", + dest="log_steps", + default=50, + type=int, + help="number of iterations before logging", + ) + parser.add_argument( + "-i", + dest="input_file", + metavar="/path/to/sim", + help="folder that contains outputted simulation files.", + default=None, + ) - parser.add_argument('--n_objects', dest='n', type=int, help="objects in domain") - parser.add_argument('--width', dest='w', type=float, help="width of domain") - parser.add_argument('--height', dest='h', type=float, help="height of domain") - parser.add_argument('--natural_radius', dest='r', type=float, help="natural radius of object") - parser.add_argument('--energy', dest='energy', help="energy type of system") + parser.add_argument("--n_objects", dest="n", type=int, help="objects in domain") + parser.add_argument("--width", dest="w", type=float, help="width of domain") + parser.add_argument("--height", dest="h", type=float, help="height of domain") + parser.add_argument( + "--natural_radius", dest="r", type=float, help="natural radius of object" + ) + parser.add_argument("--energy", dest="energy", help="energy type of system") - args = parser.parse_args() - do_sim(args, args.input_file) + args = parser.parse_args() + do_sim(args, args.input_file) def do_sim(args, file): - with open(args.sim_conf) as f: - params = json.load(f) + with open(args.sim_conf) as f: + params = json.load(f) + check_params(params, ["domain", "simulation"], {}) + dmn_params, sim_params = params["domain"], params["simulation"] - check_params(params, ["domain", "simulation"], {}) - dmn_params, sim_params = params["domain"], params["simulation"] + overrides = { + args.n: "n_objects", + args.w: "width", + args.h: "height", + args.r: "natural_radius", + args.energy: "energy", + } + for arg, arg_name in overrides.items(): + if arg is not None: + dmn_params[arg_name] = arg - overrides = {args.n: "n_objects", args.w: "width", args.h: "height", args.r: "natural_radius", - args.energy: "energy"} - for arg, arg_name in overrides.items(): - if arg is not None: - dmn_params[arg_name] = arg + check_params( + dmn_params, + ["n_objects", "width", "height", "natural_radius", "energy"], + { + "n_objects": "positive", + "width": "positive", + "height": "positive", + "natural_radius": "positive", + "energy": ["area", "radial-al", "radial-t"], + }, + ) + domain = DomainParams( + dmn_params["n_objects"], + dmn_params["width"], + dmn_params["height"], + dmn_params["natural_radius"], + ) + energy = Energy(dmn_params["energy"]) - check_params(dmn_params, ["n_objects", "width", "height", "natural_radius", "energy"], { - "n_objects": "positive", "width": "positive", "height": "positive", - "natural_radius": "positive", "energy": ["area", "radial-al", "radial-t"] - }) - domain = DomainParams(dmn_params["n_objects"], dmn_params["width"], \ - dmn_params["height"], dmn_params["natural_radius"]) - energy = Energy(dmn_params["energy"]) + points = None + if "points" in dmn_params: + if type(dmn_params["points"]) is str: + with open(Path(dmn_params["points"]), "rb") as f: + points = np.load(f) + else: + points = np.asarray(dmn_params["points"]) - points = None - if "points" in dmn_params: - if type(dmn_params["points"]) is str: - with open(Path(dmn_params["points"]), 'rb') as f: - points = np.load(f) - else: - points = np.asarray(dmn_params["points"]) + check_params( + sim_params, + ["mode", "step_size", "threshold", "save_sim", "adaptive"], + { + "mode": ["flow", "search", "shrink"], + "step_size": "positive", + "threshold": "positive", + }, + ) + mode, step, thres, adaptive, save_sim = ( + sim_params["mode"], + sim_params["step_size"], + sim_params["threshold"], + sim_params["adaptive"], + sim_params["save_sim"], + ) - check_params(sim_params, ["mode", "step_size", "threshold", "save_sim", "adaptive"], { - "mode": ["flow", "search", "shrink"], "step_size": "positive", "threshold": "positive", - }) - mode, step, thres, adaptive, save_sim = sim_params["mode"], sim_params["step_size"], \ - sim_params["threshold"], sim_params["adaptive"], \ - sim_params["save_sim"] + name = sim_params.get("name") - name = sim_params.get("name") + if file is None: + if mode == "flow": + sim = Flow(domain, energy, step, thres, adaptive, name=name) + elif mode == "search": + check_params( + sim_params, + ["manifold_step_size", "eq_stop_count"], + {"manifold_step_size": "positive", "eq_stop_count": "positive"}, + ) + sim = Search( + domain, + energy, + step, + thres, + adaptive, + sim_params["manifold_step_size"], + sim_params["eq_stop_count"], + name=name, + ) + elif mode == "shrink": + check_params( + sim_params, + ["width_change", "width_stop"], + {"width_change": "positive", "width_stop": "positive"}, + ) + sim = Shrink( + domain, + energy, + step, + thres, + adaptive, + sim_params["width_change"], + sim_params["width_stop"], + name=name, + ) + else: + sim = Simulation.from_file(file) - if file is None: - if mode == "flow": - sim = Flow(domain, energy, step, thres, adaptive, name=name) - elif mode == "search": - check_params(sim_params, ["manifold_step_size", "eq_stop_count"], { - "manifold_step_size": "positive", "eq_stop_count": "positive" - }) - sim = Search(domain, energy, step, thres, adaptive, sim_params["manifold_step_size"], - sim_params["eq_stop_count"], name=name) - elif mode == "shrink": - check_params(sim_params, ["width_change", "width_stop"], { - "width_change": "positive", "width_stop": "positive" - }) - sim = Shrink(domain, energy, step, thres, adaptive, sim_params["width_change"], - sim_params["width_stop"], name=name) - else: - sim = Simulation.from_file(file) + save_diagram = False + if "diagram" in params: + save_diagram = True + dia_params = params["diagram"] + check_params(dia_params, ["filetype", "figures"], {"filetype": ["img", "mp4"]}) + if dia_params["filetype"] == "mp4": + if which("ffmpeg") is None: + raise ValueError( + "The program 'ffmpeg' needs to be installed on your system." + ) - save_diagram = False - if "diagram" in params: - save_diagram = True - dia_params = params["diagram"] - check_params(dia_params, ["filetype", "figures"], { - "filetype": ["img", "mp4"] - }) - if dia_params["filetype"] == "mp4": - if which("ffmpeg") is None: - raise ValueError("The program 'ffmpeg' needs to be installed on your system.") + if type(dia_params["figures"]) is str: + dia_params["figures"] = np.asarray(dia_presets[dia_params["figures"]]) + else: + dia_params["figures"] = np.asarray(dia_params["figures"]) - if type(dia_params["figures"]) is str: - dia_params["figures"] = np.asarray(dia_presets[dia_params["figures"]]) - else: - dia_params["figures"] = np.asarray(dia_params["figures"]) + if "time" not in dia_params: + dia_params["time"] = 30 - if "time" not in dia_params: - dia_params["time"] = 30 + sim.run(save_sim, not args.quiet, args.log_steps, points) - sim.run(save_sim, not args.quiet, args.log_steps, points) - - if save_diagram: - diagram = Diagram(sim, dia_params["figures"]) - if dia_params["filetype"] == "img": - diagram.render_frames(range(len(sim))) - elif dia_params["filetype"] == "mp4": - if mode == "flow": - diagram.render_video(dia_params["time"], "sample") - else: - diagram.render_video(dia_params["time"], "use_all") + if save_diagram: + diagram = Diagram(sim, dia_params["figures"]) + if dia_params["filetype"] == "img": + diagram.render_frames(range(len(sim))) + elif dia_params["filetype"] == "mp4": + if mode == "flow": + diagram.render_video(dia_params["time"], "sample") + else: + diagram.render_video(dia_params["time"], "use_all") def pre(): - os.environ["QT_LOGGING_RULES"] = "*=false" - try: - main() - except KeyboardInterrupt: - print("Program terminated by user.") \ No newline at end of file + os.environ["QT_LOGGING_RULES"] = "*=false" + try: + main() + except KeyboardInterrupt: + print("Program terminated by user.")