squish/squish/diagram.py

391 lines
14 KiB
Python

from __future__ import annotations
from typing import Tuple, List, Optional
import matplotlib.pyplot as plt, numpy as np, os
from matplotlib.ticker import MaxNLocator, FormatStrFormatter
from scipy.spatial import Voronoi, voronoi_plot_2d
from multiprocessing import Pool, cpu_count
from .common import DomainParams, OUTPUT_DIR
SYMM = np.array(
[[0, 0], [1, 0], [1, 1], [0, 1], [-1, 1], [-1, 0], [-1, -1], [0, -1], [1, -1]]
)
class SimData:
"""Stores diagram information for a simulation.
Attributes:
path (Path): path to output directory.
domains (List[DomainParams]): domain parameters from simulation frames,
energies (List[float]): energy from simulation frames.
voronois (List[Voronoi]): voronoi information from scipy from simulation frames.
stats (List[numpy.ndarray]): statistics from simulation frames.
"""
__slots__ = ["path", "domains", "energies", "voronois", "stats"]
def __init__(self, sim: Simulation) -> None:
if sim is not None:
self.path = sim.path
self.domains = list([DomainParams(s.n, s.w, s.h, s.r) for s in sim])
self.energies = list([s.energy for s in sim])
self.voronois = list([s.vor_data for s in sim])
self.stats = list([s.stats for s in sim])
def __len__(self) -> int:
return len(self.domains)
def slice(self, indices: List[int]) -> SimData:
new_data = SimData(None)
new_data.path = self.path
new_data.domains = list([self.domains[i] for i in indices])
new_data.energies = list([self.energies[i] for i in indices])
new_data.voronois = list([self.voronois[i] for i in indices])
new_data.stats = list([self.stats[i] for i in indices])
return new_data
def hist(
self,
stat: str,
i: int,
bins: int = 10,
bounds: Optional[Tuple[float, float]] = None,
cumul: bool = False,
avg: bool = False,
) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""Generates a histogram from the selected data.
Arguments:
stat (str): name of data to obtain
i (int): which frame to select from
bins (int): number of bins for the histogram.
bounds (Optional[Tuple[float, float]]): upper and lower bounds of the
histogram. This will automatically take the minimum and maximum value
if not set.
cumul (bool): aggregates all data up to frame i if True.
avg (bool): will average the data based on number of frames if True.
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: the histogram and its bins.
"""
if cumul:
values = np.concatenate([f[stat] for f in self.stats[: (i + 1)]])
else:
values = self.stats[i][stat]
if np.var(values) <= 1e-8:
hist = np.zeros((bins,))
val = np.average(values)
hist[(bins + 1) // 2 - 1] = len(values)
bin_list = np.linspace(0, val, bins // 2 + 1, endpoint=True)
bin_list = np.concatenate((bin_list, (bin_list + val)[1:]))
return hist, bin_list[not (bins % 2) :]
hist, bin_edges = np.histogram(values, bins=bins, range=bounds)
bin_list = np.array(
[(bin_edges[i] + bin_edges[i + 1]) / 2 for i in range(len(bin_edges) - 1)]
)
if avg and cumul:
return hist / (i + 1), bin_list
return hist, bin_list
class Diagram:
"""Class for generating diagrams.
Attributes:
sim (SimData): the simulation data that contains all the frames and information.
diagrams (numpy.ndarray): array that selects which diagrams to show.
cumulative (bool): selects whether or not graph statistics are cumulative.
"""
__slots__ = ["sim", "diagrams", "cumulative"]
def __init__(
self, sim: Simulation, diagrams: np.ndarray, cumulative: bool = False
) -> None:
self.sim = SimData(sim)
self.diagrams = np.atleast_2d(diagrams)
self.cumulative = cumulative
def generate_frame(self, frame: int, mode: str, fol: str, name: str = None) -> None:
if mode not in ["save", "open"]:
raise ValueError("Not a valid mode for diagrams!")
shape = self.diagrams.shape
fig, axes = plt.subplots(*shape, figsize=(shape[1] * 8, shape[0] * 8))
if self.diagrams.shape == (1, 1):
getattr(self, str(self.diagrams[0][0]) + "_plot")(frame, axes)
else:
axes = np.atleast_2d(axes)
it = np.nditer(self.diagrams, flags=["multi_index"])
for diagram in it:
if diagram == "":
continue
getattr(self, str(diagram) + "_plot")(frame, axes[it.multi_index])
plt.tight_layout()
if name is None:
name = f"img{frame:05}.png"
if mode == "save":
plt.savefig(self.sim.path / fol / name)
plt.close(fig)
elif mode == "show":
plt.show()
def voronoi_plot(self, i: int, ax: AxesSubplot) -> None:
domain = self.sim.domains[i]
n, w, h = domain.n, domain.w, domain.h
scale = 1.5
area = n <= 60
voronoi_plot_2d(
self.sim.voronois[i], ax, show_vertices=False, point_size=7 - n / 100
)
ax.plot([-w, 2 * w], [0, 0], "r")
ax.plot([-w, 2 * w], [h, h], "r")
ax.plot([0, 0], [-h, 2 * h], "r")
ax.plot([w, w], [-h, 2 * h], "r")
ax.axis("equal")
ax.set_xlim([(1 - scale) * w / 2, (1 + scale) * w / 2])
ax.set_ylim([(1 - scale) * h / 2, (1 + scale) * h / 2])
ax.title.set_text("Voronoi Visualization")
props = dict(boxstyle="round", facecolor="wheat", alpha=0.8)
defects = {5: {"x": [], "y": []}, 7: {"x": [], "y": []}}
for j in range(n):
for s in SYMM:
vec = self.sim.voronois[i].points[j] + s * self.sim.domains[i].dim
if area:
txt = ax.text(
*vec, str(round(self.sim.stats[i]["site_areas"][j], 3))
)
txt.set_clip_on(True)
if self.sim.stats[i]["site_edge_count"][j] == 5:
defects[5]["x"].append(vec[0])
defects[5]["y"].append(vec[1])
elif self.sim.stats[i]["site_edge_count"][j] == 7:
defects[7]["x"].append(vec[0])
defects[7]["y"].append(vec[1])
ax.scatter(defects[5]["x"], defects[5]["y"], marker="p", color="C0")
ax.scatter(defects[7]["x"], defects[7]["y"], marker="*", color="C0")
ax.text(
0.05,
0.95,
f"Energy: {self.sim.energies[i]}",
transform=ax.transAxes,
fontsize=14,
verticalalignment="top",
bbox=props,
)
def energy_plot(self, i: int, ax: AxesSubplot) -> None:
ax.set_xlim([0, len(self.sim)])
energies = self.sim.energies[: (i + 1)]
ax.plot(list(range(i + 1)), energies)
ax.title.set_text("Energy vs. Time")
# max_value = round(self.sim[0].energy)
# min_value = round(self.sim[-1].energy)
# diff = max_value-min_value
# ax.set_yticks(np.arange(int(min_value-diff/5), int(max_value+diff/5), diff/25))
ax.set_xlabel("Iterations")
ax.set_ylabel("Energy")
ax.grid()
def site_areas_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist("site_areas", i, cumul=self.cumulative, avg=True)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Site Areas")
ax.set_xlabel("Area")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.ticklabel_format(useOffset=False)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
# for xtick, color in zip(ax.get_xticklabels(), areas_bar[2]):
# if color != 'C0':
# xtick.set_color(color)
def site_edge_count_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist(
"site_edge_count", i, bounds=(1, 11), cumul=self.cumulative, avg=True
)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Edges per Site")
ax.set_xlabel("Number of Edges")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.set_xticklabels([int(z) for z in x])
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
def site_isos_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist(
"site_isos", i, bounds=(0, 1), cumul=self.cumulative, avg=True
)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Isoparametric Values")
ax.set_xlabel("Isoparametric Value")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.ticklabel_format(useOffset=False)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
# for xtick, color in zip(ax.get_xticklabels(), isoparam_bar[2]):
# if color != 'C0':
# xtick.set_color(color)
def site_energies_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist("site_energies", i, cumul=self.cumulative, avg=True)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Site Energies")
ax.set_xlabel("Energy")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.ticklabel_format(useOffset=False)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
def avg_radius_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist("avg_radius", i, cumul=self.cumulative, avg=True)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Site Average Radii")
ax.set_xlabel("Average Radius")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.ticklabel_format(useOffset=False)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
def isoparam_avg_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist("isoparam_avg", i, cumul=self.cumulative, avg=True)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Site Isoperimetric Averages")
ax.set_xlabel("Isoperimetric Average")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.ticklabel_format(useOffset=False)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
def edge_lengths_plot(self, i: int, ax: AxesSubplot) -> None:
y, x = self.sim.hist("edge_lengths", i, 30, cumul=self.cumulative, avg=True)
ax.bar(x, y, width=0.8 * (x[1] - x[0]))
ax.title.set_text("Edge Lengths")
ax.set_xlabel("Length")
ax.set_ylabel("Average Occurances")
ax.set_xticks(x)
ax.set_xticklabels(ax.get_xticks(), rotation=90)
ax.xaxis.set_major_formatter(FormatStrFormatter("%.3f"))
# ax.ticklabel_format(useOffset=False)
ax.yaxis.set_major_locator(MaxNLocator(integer=True))
# for xtick, color in zip(ax.get_xticklabels(), lengths_bar[2]):
# if color != 'C0':
# xtick.set_color(color)
def eigs_plot(self, i: int, ax: AxesSubplot) -> None:
try:
eigs = self.sim.stats[i]["eigs"]
ax.plot(
list(range(len(eigs))), eigs, marker="o", linestyle="dashed", color="C0"
)
ax.plot([0, len(eigs)], [0, 0], color="red")
except KeyError:
ax.text(0.5, 0.5, "Not Computed")
ax.title.set_text("Hessian Eigenvalues")
ax.set_xlabel("")
ax.set_ylabel("Value")
def render_frames(self, frames: List[int], fol: str = "frames") -> None:
OUTPUT_DIR.mkdir(exist_ok=True)
self.sim.path.mkdir(exist_ok=True)
(self.sim.path / fol).mkdir(exist_ok=True)
combo_list = []
for i in range(cpu_count()):
start, end = (
int(i * len(frames) / cpu_count()),
int((i + 1) * len(frames) / cpu_count()),
)
new_dia = Diagram(None, self.diagrams, self.cumulative)
new_dia.sim = self.sim.slice(frames[start:end])
combo_list.append(
(new_dia, fol, start, len(frames[start:end]), len(frames))
)
# Free up memory, since it's already duplicated to other cores.
self.sim = self.sim.slice([])
with Pool(cpu_count()) as pool:
for _ in pool.imap_unordered(render_frame_range, combo_list):
pass
print(flush=True)
print(f'Wrote to "{self.sim.path / fol}".', flush=True)
def render_video(self, time: int, mode: str) -> None:
if mode not in ["use_all", "sample"]:
raise ValueError("Not a valid mode for videos!")
if mode == "use_all":
frames = list(range(len(self.sim)))
elif mode == "sample":
fps = 30
if len(self.sim) < fps * time:
frames = list(range(len(self.sim)))
fps = len(self.sim) / time
else:
frames = list(
np.round(np.linspace(0, len(self.sim) - 1, fps * time)).astype(int)
)
self.render_frames(frames, "temp")
path = self.sim.path / "simulation.mp4"
print("Assembling MP4...", flush=True)
os.system(
f"ffmpeg -hide_banner -loglevel error -r {fps} -i"
+ f' "{self.sim.path}/temp/img%05d.png"'
+ f" -c:v libx264 -crf 18 -preset slow -pix_fmt yuv420p -vf"
+ f' "scale=trunc(iw/2)*2:trunc(ih/2)*2" -f mp4 "{path}"'
)
# Remove files.
for i in range(len(frames)):
os.remove(self.sim.path / f"temp/img{i:05}.png")
os.rmdir(self.sim.path / "temp")
print(f'Wrote to "{path}".', flush=True)
def render_frame_range(combo: Tuple[Diagram, str, int, int, int]) -> None:
self, fol, offset, length, num_frames = combo
for i in range(length):
self.generate_frame(i, "save", fol, f"img{i+offset:05}.png")
i = len(list((self.sim.path / fol).iterdir()))
hashes = int(21 * i / num_frames)
print(
f'Generating frames... |{"#"*hashes}{" "*(20-hashes)}|'
+ f" {i}/{num_frames} frames rendered.",
flush=True,
end="\r",
)