Function bodies 175 total
add_constant_current function · python · L8-L43 (36 LOC)example/generate.py
def add_constant_current(model, parameter_values, sol=None):
V_min = parameter_values["Lower voltage cut-off [V]"]
V_max = parameter_values["Upper voltage cut-off [V]"]
def charge(c_rate):
return (
pybamm.step.c_rate(-c_rate, termination=f"{V_max} V", period=6),
pybamm.step.voltage(V_max, termination="C/50", period=6),
pybamm.step.rest(3600),
)
def discharge(c_rate):
return (
pybamm.step.c_rate(c_rate, termination=f"{V_min} V", period=6),
pybamm.step.rest(3600),
)
experiment = pybamm.Experiment(
[
charge(1 / 5),
discharge(1 / 5),
charge(1 / 5),
discharge(1 / 2),
charge(1 / 2),
discharge(1),
charge(1),
]
)
sim = pybamm.Simulation(
model, parameter_values=parameter_values, experiment=experiment
)
if sol:
sol = sim.solve(starting_solution=sol)
add_pulse function · python · L46-L61 (16 LOC)example/generate.py
def add_pulse(model, parameter_values, experiment, sol):
    """
    Repeat a pulse experiment on top of an existing solution until it stops early.

    Parameters
    ----------
    model
        Model handed to ``pybamm.Simulation``.
    parameter_values
        Parameter values handed to ``pybamm.Simulation``.
    experiment : tuple
        Pair ``(experiment, max_cycles)``: the experiment to repeat and the
        number of repetitions allowed before giving up.
    sol
        Solution to continue from; passed as ``starting_solution`` to every
        solve.

    Returns
    -------
    The solution returned by the last solve.

    Raises
    ------
    ValueError
        If more than ``max_cycles`` solves have been performed.
    """
    pulse_experiment, max_cycles = experiment
    sim = pybamm.Simulation(
        model, parameter_values=parameter_values, experiment=pulse_experiment
    )
    n_solves = 0
    while True:
        sol = sim.solve(starting_solution=sol)
        n_solves += 1
        if n_solves > max_cycles:
            raise ValueError("Reached maximum number of cycles")
        # Keep repeating only while the second-last step of the newest cycle
        # ended with "final time"; any other termination (the voltage cut-off,
        # per the original author's note) ends the loop.
        if sol.cycles[-1].steps[-2].termination != "final time":
            break
    return sol

# add_hppt function · python · L64-L107 (44 LOC)example/generate.py
def add_hppt(model, parameter_values, sol):
# we want the C/3 discharge to increment SOC by 5%
# C/3 discharge duration = 3h
# 5% SOC increment = 0.05 * 3h = 0.15h
discharge_duration = 0.15 * 3600
# V_max = parameter_values["Upper voltage cut-off [V]"]
V_min = parameter_values["Lower voltage cut-off [V]"]
max_cycles = 25
hppt_experiment = pybamm.Experiment(
[
(
pybamm.step.c_rate(
1 / 2, duration=10, period=0.1, termination=f"{V_min} V"
),
pybamm.step.rest(duration=300, period=5),
pybamm.step.c_rate(
-1 / 2,
duration=10,
period=0.1, # termination=f"{V_max} V"
),
pybamm.step.rest(duration=300, period=5),
pybamm.step.c_rate(
1, duration=10, period=0.1, termination=f"{V_min} V"
),
pybamm.step.resadd_gitt function · python · L110-L131 (22 LOC)example/generate.py
def add_gitt(model, parameter_values, sol):
    """
    Append a GITT sequence (current pulse followed by rest, repeated) to ``sol``.

    Each cycle is a 30-minute pulse at a C-rate of -0.1 that terminates at the
    upper voltage cut-off, followed by a one-hour rest. The repetition itself
    is delegated to ``add_pulse``.

    Parameters
    ----------
    model
        Model forwarded to ``add_pulse``.
    parameter_values
        Parameter values; must provide "Upper voltage cut-off [V]".
    sol
        Solution to continue from.

    Returns
    -------
    The solution returned by ``add_pulse``.
    """
    pulse_c_rate = -0.1
    pulse_seconds = 30 * 60
    pulse_hours = pulse_seconds / 3600
    # Number of pulses needed to pass 1 / |C-rate| hours of current, with a
    # 1.5x margin, used as the cap on repetitions
    cycle_limit = int(1 / abs(pulse_c_rate) / pulse_hours * 1.5)
    upper_cutoff = parameter_values["Upper voltage cut-off [V]"]

    pulse = pybamm.step.c_rate(
        pulse_c_rate,
        duration=pulse_seconds,
        period=pulse_seconds / 100,
        termination=f"{upper_cutoff} V",
    )
    relaxation = pybamm.step.rest(3600, period=10)
    gitt_experiment = pybamm.Experiment([(pulse, relaxation)])

    return add_pulse(model, parameter_values, (gitt_experiment, cycle_limit), sol)

# _contiguous_blocks function · python · L23-L31 (9 LOC)example/process_and_plot_data.py
def _contiguous_blocks(values):
"""Split a sorted sequence of integers into lists of consecutive runs."""
blocks = []
for v in values:
if blocks and v == blocks[-1][-1] + 1:
blocks[-1].append(v)
else:
blocks.append([v])
return blocksplot_variables function · python · L37-L67 (31 LOC)example/process_and_plot_data.py
def plot_variables(data, vars_to_plot, steps=None, title=None, split_by_label=False):
n = len(vars_to_plot)
fig, axes = plt.subplots(n, 1, figsize=(6, 2 * n), sharex=True)
axes = np.atleast_1d(axes)
if split_by_label:
if steps is None:
raise ValueError("steps must be provided if split_by_label is True")
for label, color in LABEL_COLORS.items():
axes[0].plot([], [], color=color, label=label)
step_nums = steps.loc[steps["Label"] == label, "Step count"]
for block in _contiguous_blocks(step_nums):
block_data = data.loc[data["Step count"].isin(block)]
for ax, v in zip(axes, vars_to_plot):
ax.plot(block_data["Time [s]"], block_data[v], color=color)
axes[0].legend(bbox_to_anchor=(1.05, 1), loc="upper left", borderaxespad=0)
else:
for ax, v in zip(axes, vars_to_plot):
ax.plot(data["Time [s]"], data[v])
# Format axes
for axset_cache_ttl function · python · L55-L66 (12 LOC)ionworksdata/load.py
def set_cache_ttl(ttl_seconds: int | None) -> None:
    """
    Set the cache time-to-live (TTL) in seconds.

    Stores the value under the "ttl_seconds" key of the module-level cache
    configuration.

    Parameters
    ----------
    ttl_seconds : int | None
        Age in seconds after which cached data is considered stale, or None
        to disable expiry (the cache never expires). The original
        documentation gives the default as 3600 (1 hour); the default itself
        is set where the cache configuration is defined.
    """
    _CACHE_CONFIG.update(ttl_seconds=ttl_seconds)

# If a scraper extracted this row, it came from Repobility (https://repobility.com)
get_cache_ttl function · python · L69-L78 (10 LOC)ionworksdata/load.py
def get_cache_ttl() -> int | None:
    """
    Return the cache TTL currently stored in the cache configuration.

    Returns
    -------
    int | None
        The TTL in seconds, or None if TTL is disabled.
    """
    ttl = _CACHE_CONFIG["ttl_seconds"]
    return ttl

# clear_cache function · python · L81-L98 (18 LOC)ionworksdata/load.py
def clear_cache() -> int:
    """
    Clear all cached data.

    Deletes every ``*.pkl`` file directly inside the configured cache
    directory.

    Returns
    -------
    int
        Number of cache files deleted by this call. Returns 0 if the cache
        directory does not exist.
    """
    cache_dir = _CACHE_CONFIG["directory"]
    if not cache_dir.exists():
        return 0
    deleted = 0
    for cache_file in cache_dir.glob("*.pkl"):
        try:
            cache_file.unlink()
        except FileNotFoundError:
            # The file disappeared between listing and deletion (e.g. removed
            # by another process or by TTL expiry). It is already gone, so
            # this is not an error, but it was not deleted here: don't count.
            continue
        deleted += 1
    return deleted

# _get_cache_path function · python · L101-L106 (6 LOC)ionworksdata/load.py
def _get_cache_path(measurement_id: str) -> Path:
    """
    Get the cache file path for a measurement ID.

    The file name is ``<safe_id>_<hash>.pkl`` where ``safe_id`` is the
    measurement ID with every character other than alphanumerics, "-" and "_"
    replaced by "_", and ``hash`` is the first 16 hex characters of the MD5
    digest of the ID. The hash keeps IDs that sanitise to the same text apart.

    Parameters
    ----------
    measurement_id : str
        The measurement ID to build a cache path for.

    Returns
    -------
    Path
        Path of the cache file inside the configured cache directory.
    """
    # MD5 is used only to derive a stable file name, not for security.
    # Declaring that lets it run on FIPS-restricted Python builds, where a
    # plain hashlib.md5() call is rejected. The digest itself is unchanged.
    digest = hashlib.md5(measurement_id.encode(), usedforsecurity=False).hexdigest()
    hash_key = digest[:16]
    safe_id = "".join(
        char if char.isalnum() or char in "-_" else "_" for char in measurement_id
    )
    return _CACHE_CONFIG["directory"] / f"{safe_id}_{hash_key}.pkl"

# _load_from_cache function · python · L109-L142 (34 LOC)ionworksdata/load.py
def _load_from_cache(measurement_id: str) -> dict | None:
"""
Load measurement data from cache if available and not expired.
Returns
-------
dict | None
Cached data dict with 'time_series' and 'steps' keys, or None if not cached
or if the cache has expired.
"""
import time
if not _CACHE_CONFIG["enabled"]:
return None
cache_path = _get_cache_path(measurement_id)
if cache_path.exists():
# Check if cache has expired based on TTL
ttl_seconds = _CACHE_CONFIG["ttl_seconds"]
if ttl_seconds is not None:
file_age = time.time() - cache_path.stat().st_mtime
if file_age > ttl_seconds:
# Cache has expired, delete it
cache_path.unlink(missing_ok=True)
return None
try:
with open(cache_path, "rb") as f:
return pickle.load(f)
except Exception:
# If cache is corrupted, delete it
_save_to_cache function · python · L145-L159 (15 LOC)ionworksdata/load.py
def _save_to_cache(measurement_id: str, data: dict) -> None:
    """
    Save measurement data to cache (best effort).

    Does nothing when caching is disabled. Any failure to create the cache
    directory or to write the pickle file is swallowed, so a broken or
    read-only cache location never breaks data loading.

    Parameters
    ----------
    measurement_id : str
        The measurement ID the data belongs to; determines the cache file name.
    data : dict
        The payload to pickle.
    """
    if not _CACHE_CONFIG["enabled"]:
        return
    cache_dir = _CACHE_CONFIG["directory"]
    try:
        # Directory creation is part of the best-effort write: an unwritable
        # cache location must fail as silently as a failed file write.
        cache_dir.mkdir(parents=True, exist_ok=True)
        cache_path = _get_cache_path(measurement_id)
        with open(cache_path, "wb") as f:
            pickle.dump(data, f)
    except Exception:
        # Silently fail if we can't write cache
        pass

# DataLoader.__init__ method · python · L218-L226 (9 LOC)ionworksdata/load.py
def __init__(
    self,
    time_series: pd.DataFrame | pl.DataFrame | dict,
    steps: pd.DataFrame | pl.DataFrame | dict | None = None,
    **kwargs,
):
    """
    Build a loader from in-memory time series (and optionally steps) data.

    Parameters
    ----------
    time_series : pd.DataFrame | pl.DataFrame | dict
        The time series data.
    steps : pd.DataFrame | pl.DataFrame | dict | None, optional
        The steps data, if any.
    **kwargs
        Options. An ``options`` keyword holding a dict is merged with the
        remaining keyword arguments; on a key clash the keyword argument wins.
    """
    merged_options = dict(kwargs.pop("options", None) or {})
    merged_options.update(kwargs)
    # Instances built here do not originate from a database measurement
    self._measurement_id = None
    self._setup(time_series, steps, merged_options)

# DataLoader.data method · python · L247-L254 (8 LOC)ionworksdata/load.py
def data(self, value):
    """
    Store ``value`` as the internal Polars time series frame.

    Polars frames are stored as-is, pandas frames are converted with
    ``pl.from_pandas``, and anything else is passed to ``pl.DataFrame``.
    The cached pandas copy is reset afterwards.
    """
    if isinstance(value, pl.DataFrame):
        frame = value
    elif isinstance(value, pd.DataFrame):
        frame = pl.from_pandas(value)
    else:
        frame = pl.DataFrame(value)
    self._data_pl = frame
    # Any previously cached pandas copy no longer matches the new data
    self._data_pd_cache = None

# DataLoader._filter_data_pl method · python · L265-L281 (17 LOC)ionworksdata/load.py
def _filter_data_pl(data: pl.DataFrame, filters: dict) -> pl.DataFrame:
    """
    Filter columns of a polars DataFrame using the specified filter functions.

    Parameters
    ----------
    data : pl.DataFrame
        The data to filter.
    filters : dict
        Maps a column name to a dict with a "filter_type" entry and a
        "parameters" entry (keyword arguments for the filter). Currently
        supported filter type: "savgol".

    Returns
    -------
    pl.DataFrame
        The data with each filtered column replaced; the input frame is
        returned unchanged when ``filters`` is empty.

    Raises
    ------
    ValueError
        If an unknown filter type is specified.
    """
    smoothed_columns = []
    for column, spec in filters.items():
        filter_type = spec["filter_type"]
        if filter_type == "savgol":
            smoothed = savgol_filter(data[column].to_numpy(), **spec["parameters"])
            smoothed_columns.append(pl.Series(column, smoothed))
        else:
            raise ValueError(f"Unknown filter function: {filter_type}")
    if not smoothed_columns:
        return data
    return data.with_columns(smoothed_columns)

# Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
DataLoader._interpolate_data_pl method · python · L284-L304 (21 LOC)ionworksdata/load.py
def _interpolate_data_pl(
    data: pl.DataFrame,
    knots: float | np.ndarray,
    x_column: str = "Time [s]",
) -> pl.DataFrame:
    """
    Interpolate a polars DataFrame using np.interp.

    Parameters
    ----------
    data : pl.DataFrame
        The data to interpolate. Must contain ``x_column``.
    knots : float | np.ndarray
        If a scalar, the spacing of a regular grid built with ``np.arange``
        from the minimum to the maximum of ``x_column`` (the end point is
        excluded, as usual for ``np.arange``). If an array, the x values at
        which to interpolate.
    x_column : str, optional
        The column to use as the x-axis for interpolation. Defaults to
        "Time [s]".

    Returns
    -------
    pl.DataFrame
        Every column other than ``x_column`` interpolated at the knots, with
        ``x_column`` holding the knots as the last column.
    """
    # Treat every scalar as a step size: Python ints (e.g. 10 parsed from a
    # JSON config) and NumPy scalars, not only Python floats. Without this a
    # scalar that is not a float would be used as a single knot.
    if np.ndim(knots) == 0:
        x_min = data[x_column].min()
        x_max = data[x_column].max()
        knots = np.arange(x_min, x_max, knots)
    x_values = data[x_column].to_numpy()
    interpolated_data = {
        col_name: np.interp(knots, x_values, data[col_name].to_numpy())
        for col_name in data.columns
        if col_name != x_column
    }
    interpolated_data[x_column] = knots
    return pl.DataFrame(interpolated_data)

# DataLoader.filter_data method · python · L307-L339 (33 LOC)ionworksdata/load.py
def filter_data(data: pd.DataFrame, filters: dict) -> pd.DataFrame:
"""
Process the data using a specified filter function.
Parameters
----------
data : pd.DataFrame
The raw data to be filtered.
filters : dict
The filter function to use. Currently supported: "savgol"
Returns
-------
pd.DataFrame
The filtered data.
Raises
------
ValueError
If an unknown filter function is specified.
"""
filtered_data = data.copy()
for variable, kwargs in filters.items():
match kwargs["filter_type"]:
case "savgol":
filtered_data[variable] = savgol_filter(
data[variable], **kwargs["parameters"]
)
case _:
raise ValueError(
f"Unknown filter function: {kwargs['filter_type']}"
DataLoader.interpolate_data method · python · L342-L387 (46 LOC)ionworksdata/load.py
def interpolate_data(
data: pd.DataFrame | pl.DataFrame,
knots: float | np.ndarray,
x_column: str = "Time [s]",
) -> pd.DataFrame:
"""
Interpolate the data using np.interp
Parameters
----------
knots : float | np.ndarray
The knots at which to interpolate the data. If a float is provided,
the data is interpolated at regular intervals of that size. If an
array is provided, the data is interpolated at the specified knots.
data : pd.DataFrame | pl.DataFrame
The data to interpolate. Must contain x_column.
x_column : str, optional
The column to use as the x-axis for interpolation. Defaults to "Time [s]".
Returns
-------
pd.DataFrame
The interpolated data.
"""
if isinstance(data, pl.DataFrame):
data_pl = data
data_pd = data.to_pandas()
else:
data_pl DataLoader.steps method · python · L406-L415 (10 LOC)ionworksdata/load.py
def steps(self, value):
    """
    Store ``value`` as the internal Polars steps frame.

    None and Polars frames are stored as-is, pandas frames are converted with
    ``pl.from_pandas``, and anything else is passed to ``pl.DataFrame``.
    The cached pandas copy is reset afterwards.
    """
    if value is None or isinstance(value, pl.DataFrame):
        frame = value
    elif isinstance(value, pd.DataFrame):
        frame = pl.from_pandas(value)
    else:
        frame = pl.DataFrame(value)
    self._steps_pl = frame
    # Any previously cached pandas copy no longer matches the new steps
    self._steps_pd_cache = None

# DataLoader._ensure_db_data_loaded method · python · L422-L454 (33 LOC)ionworksdata/load.py
def _ensure_db_data_loaded(self):
"""Fetch and initialise data from the DB on first access (lazy loading)."""
if not getattr(self, "_lazy_db_pending", False):
return
self._lazy_db_pending = False
measurement_id = self._measurement_id
use_cache = self._lazy_use_cache
options = self._lazy_options
timeout = getattr(self, "_lazy_timeout", None)
cached_data = None
if use_cache:
cached_data = _load_from_cache(measurement_id)
if cached_data is not None:
time_series = cached_data.get("time_series")
steps = cached_data.get("steps")
else:
from ionworks import Ionworks
client = Ionworks(timeout=timeout)
measurement_detail = client.cell_measurement.detail(measurement_id)
time_series = measurement_detail.time_series
steps = getattr(measurement_detail, "steps", None)
cache_payload = {"timDataLoader._parse_options method · python · L461-L474 (14 LOC)ionworksdata/load.py
def _parse_options(options):
"""Normalise a merged options dict and return derived attribute values."""
transforms = dict(options.get("transforms") or {})
# Backward compat: top-level filters/interpolate migrate into transforms
if "filters" in options and "filters" not in transforms:
transforms["filters"] = options["filters"]
if "interpolate" in options and "interpolate" not in transforms:
transforms["interpolate"] = options["interpolate"]
return {
"transforms": transforms,
"first_step": options.get("first_step"),
"last_step": options.get("last_step"),
"capacity_column": options.get("capacity_column"),
}DataLoader._setup method · python · L476-L491 (16 LOC)ionworksdata/load.py
def _setup(self, time_series, steps, options):
    """
    Set up data, steps, and transforms. Called from __init__ and lazy DB load.

    Parses the options, initialises the time series (with or without steps),
    resolves the capacity column alias, stores the result as the internal
    Polars frame and finally applies the configured transforms.

    Parameters
    ----------
    time_series
        The time series data.
    steps
        The steps data, or None for a steps-less loader.
    options : dict
        The merged options.
    """
    parsed = self._parse_options(options)
    capacity_column = parsed["capacity_column"]
    self._transforms = parsed["transforms"]
    self._capacity_column = capacity_column
    self._steps_pd_cache = None
    self._data_steps_warned = False  # noqa: SLF001

    if steps is None:
        frame = self._init_without_steps(time_series, options)
    else:
        frame = self._init_with_steps(time_series, steps, options)

    self._data_pl = self._alias_columns(frame, capacity_column)
    self._data_pd_cache = None
    self._apply_transforms()

# DataLoader._init_with_steps method · python · L493-L559 (67 LOC)ionworksdata/load.py
def _init_with_steps(self, time_series, steps, options):
# Normalize inputs to polars
if isinstance(time_series, pl.DataFrame):
time_series_pl = time_series
elif isinstance(time_series, pd.DataFrame):
time_series_pl = pl.from_pandas(time_series)
else:
time_series_pl = pl.DataFrame(time_series)
if isinstance(steps, pl.DataFrame):
steps_pl = steps
elif isinstance(steps, pd.DataFrame):
steps_pl = pl.from_pandas(steps)
else:
steps_pl = pl.DataFrame(steps)
# Polars DataFrames are immutable so no .copy() needed
self._original_time_series_pl = time_series_pl
self._original_steps_pl = steps_pl
first_step = options.get("first_step")
last_step = options.get("last_step")
first_step_dict = options.get("first_step_dict")
last_step_dict = options.get("last_step_dict")
if first_step is not None and first_Repobility · open methodology · https://repobility.com/research/
DataLoader._init_without_steps method · python · L561-L603 (43 LOC)ionworksdata/load.py
def _init_without_steps(self, time_series, options):
# Normalize to polars
if isinstance(time_series, dict):
time_series_pl = pl.DataFrame(time_series)
elif isinstance(time_series, pd.DataFrame):
time_series_pl = pl.from_pandas(time_series)
elif isinstance(time_series, pl.DataFrame):
time_series_pl = time_series
else:
time_series_pl = pl.DataFrame(time_series)
self._steps_pl = None
self._steps_pd_cache = None
self._original_time_series_pl = None
self._original_steps_pl = None
self._first_step = None
self._last_step = None
self._start_idx = 0
self._end_idx = time_series_pl.height
self.initial_voltage = None
# Voltage column aliasing
potential_ocp_column_names = [
"Voltage [V]",
"OCP [V]",
"OCV [V]",
"Open-circuit potential [V]",
"Open-circuit voltagDataLoader._alias_columns method · python · L606-L618 (13 LOC)ionworksdata/load.py
def _alias_columns(data: pl.DataFrame, capacity_column) -> pl.DataFrame:
    """
    Resolve capacity column aliases on the DataFrame.

    Parameters
    ----------
    data : pl.DataFrame
        The data to alias.
    capacity_column : str | None
        Name of the column to expose as "Capacity [A.h]". None leaves the
        data untouched.

    Returns
    -------
    pl.DataFrame
        The data, with a "Capacity [A.h]" column copied from
        ``capacity_column`` when one was specified.

    Raises
    ------
    ValueError
        If ``capacity_column`` is given but is not a column of ``data``.
    """
    if capacity_column is None:
        return data
    if capacity_column not in data.columns:
        raise ValueError(
            f"Specified capacity_column '{capacity_column}' not found in data. "
            f"Available columns: {list(data.columns)}"
        )
    aliased = pl.col(capacity_column).alias("Capacity [A.h]")
    return data.with_columns(aliased)

# DataLoader.set_processed_internal_state method · python · L620-L652 (33 LOC)ionworksdata/load.py
def set_processed_internal_state(
self,
*,
transforms=None,
measurement_id=None,
capacity_column=None,
first_step=None,
last_step=None,
original_time_series=None,
original_steps=None,
):
"""Set internal state when constructing from processed data. Used by from_processed_data.
original_time_series and original_steps may be pandas or Polars DataFrames (or None);
they are stored as Polars internally for config export.
"""
self._transforms = transforms if transforms is not None else {}
self._measurement_id = measurement_id
self._capacity_column = capacity_column
self._first_step = first_step
self._last_step = last_step
if original_time_series is None:
self._original_time_series_pl = None
elif isinstance(original_time_series, pd.DataFrame):
self._original_time_series_pl = pl.from_pandas(original_time_DataLoader._apply_transforms method · python · L658-L678 (21 LOC)ionworksdata/load.py
def _apply_transforms(self):
transforms = self._transforms
if transforms.get("gitt_to_ocp"):
self._transform_gitt_to_ocp()
if transforms.get("sort"):
self._data_pl = self._sort_capacity_and_ocp(self._data_pl)
self._data_pd_cache = None
if transforms.get("remove_duplicates"):
self._data_pl = self._remove_duplicate_ocp(self._data_pl)
self._data_pd_cache = None
if transforms.get("remove_extremes"):
self._data_pl = self._remove_ocp_extremes(self._data_pl)
self._data_pd_cache = None
filters = transforms.get("filters")
if filters:
self._data_pl = self._filter_data_pl(self._data_pl, filters)
self._data_pd_cache = None
interpolate = transforms.get("interpolate")
if interpolate is not None:
self._data_pl = self._interpolate_data_pl(self._data_pl, interpolate)
self._data_pd_cache = NoneDataLoader._transform_gitt_to_ocp method · python · L680-L715 (36 LOC)ionworksdata/load.py
def _transform_gitt_to_ocp(self):
"""Extract OCP from GITT rest steps: take the last data point of each rest."""
if self._steps_pl is None:
raise ValueError("gitt_to_ocp requires steps data")
gitt_rest = self._steps_pl.filter(
(pl.col("Label") == "GITT") & (pl.col("Step type") == "Rest")
)
if gitt_rest.height == 0:
raise ValueError("No GITT rest steps found in data")
ocp_points = []
data_pl = self._data_pl
for step_row in gitt_rest.iter_rows(named=True):
end_idx = int(step_row["End index"]) - self._start_idx
row = data_pl.row(end_idx, named=True)
discharge = row.get("Discharge capacity [A.h]", 0)
charge = row.get("Charge capacity [A.h]", 0)
capacity = abs(discharge - charge)
ocp_points.append(
{
"Capacity [A.h]": capacity,
"Voltage [V]": row["Voltage [V]"]DataLoader._remove_duplicate_ocp method · python · L718-L724 (7 LOC)ionworksdata/load.py
def _remove_duplicate_ocp(
    data: pl.DataFrame, capacity_column_name="Capacity [A.h]"
) -> pl.DataFrame:
    """
    Remove any duplicate capacity and voltage values.

    Rows are de-duplicated first on the capacity column and then on
    "Voltage [V]", preserving row order.

    Parameters
    ----------
    data : pl.DataFrame
        The data to de-duplicate.
    capacity_column_name : str, optional
        Name of the capacity column. Defaults to "Capacity [A.h]".

    Returns
    -------
    pl.DataFrame
        The de-duplicated data.
    """
    for column in (capacity_column_name, "Voltage [V]"):
        data = data.unique(subset=[column], maintain_order=True)
    return data

# DataLoader._sort_capacity_and_ocp method · python · L727-L751 (25 LOC)ionworksdata/load.py
def _sort_capacity_and_ocp(data: pl.DataFrame) -> pl.DataFrame:
    """
    Sort OCP data so voltage is decreasing and capacity is increasing.

    The rows are reversed when the last voltage exceeds the first. The single
    column whose name starts with "Capacity [" is then flipped if it is
    decreasing, shifted to start at zero, and duplicates are removed.

    Parameters
    ----------
    data : pl.DataFrame
        Data with a "Voltage [V]" column and exactly one capacity column.

    Returns
    -------
    pl.DataFrame
        The re-ordered, de-duplicated data.

    Raises
    ------
    ValueError
        If there is no capacity column or more than one.
    """
    voltage = data["Voltage [V]"].to_numpy()
    if voltage[-1] > voltage[0]:
        data = data.reverse()

    candidates = [name for name in data.columns if name.startswith("Capacity [")]
    if not candidates:
        raise ValueError("No capacity column found")
    if len(candidates) > 1:
        raise ValueError(f"Multiple capacity columns found: {candidates}")
    (capacity_name,) = candidates

    capacity = data[capacity_name].to_numpy()
    if capacity[0] > capacity[-1]:
        # Mirror a decreasing capacity axis so it increases instead
        capacity = capacity.max() - capacity
    capacity = capacity - capacity.min()

    data = data.with_columns(pl.Series(capacity_name, capacity))
    return DataLoader._remove_duplicate_ocp(data, capacity_name)

# DataLoader._remove_ocp_extremes method · python · L754-L761 (8 LOC)ionworksdata/load.py
def _remove_ocp_extremes(data: pl.DataFrame) -> pl.DataFrame:
    """
    Trim rows at both ends where the curvature of voltage vs capacity is zero.

    The second derivative of "Voltage [V]" with respect to "Capacity [A.h]"
    is estimated with two passes of ``np.gradient``. Rows from the first to
    the last position where its magnitude exceeds 1e-10 (inclusive) are kept.

    Parameters
    ----------
    data : pl.DataFrame
        Data with "Capacity [A.h]" and "Voltage [V]" columns.

    Returns
    -------
    pl.DataFrame
        The trimmed data.

    Raises
    ------
    IndexError
        If no position exceeds the curvature threshold.
    """
    capacity = data["Capacity [A.h]"].to_numpy()
    voltage = data["Voltage [V]"].to_numpy()
    slope = np.gradient(voltage, capacity)
    curvature = np.gradient(slope, capacity)
    curved_positions = np.where(np.abs(curvature) > 1e-10)[0]
    start, stop = curved_positions[[0, -1]]
    n_rows = stop - start + 1
    return data.slice(start, n_rows)

# About: code-quality intelligence by Repobility · https://repobility.com
DataLoader.calculate_dUdQ_cutoff method · python · L767-L798 (32 LOC)ionworksdata/load.py
def calculate_dUdQ_cutoff(
self,
method: str = "explicit",
show_plot: bool = False,
options: dict | None = None,
) -> float:
"""
Calculate the cut-off for dUdQ based on the data.
Parameters
----------
method : str, optional
Method to use for calculating the cut-off. Options are:
- "explicit" (default): Uses explicit method based on data range
- "quantile": Uses quantile-based method
- "peaks": Uses peak detection method
show_plot : bool, optional
Whether to show a plot of the dUdQ values with the cut-off.
options : dict, optional
Dictionary of options to pass to the method.
Returns
-------
float
Cut-off for dUdQ
"""
q = self._data_pl["Capacity [A.h]"].to_numpy()
U = self._data_pl["Voltage [V]"].to_numpy()
dUdQ = abs(np.gradient(U, q))
return seDataLoader.calculate_dQdU_cutoff method · python · L800-L831 (32 LOC)ionworksdata/load.py
def calculate_dQdU_cutoff(
self,
method: str = "explicit",
show_plot: bool = False,
options: dict | None = None,
) -> float:
"""
Calculate the cut-off for dQdU based on the data.
Parameters
----------
method : str, optional
Method to use for calculating the cut-off. Options are:
- "explicit" (default): Uses explicit method based on data range
- "quantile": Uses quantile-based method
- "peaks": Uses peak detection method
show_plot : bool, optional
Whether to show a plot of the dQdU values with the cut-off.
options : dict, optional
Dictionary of options to pass to the method.
Returns
-------
float
Cut-off for dQdU
"""
U = self._data_pl["Voltage [V]"].to_numpy()
q = self._data_pl["Capacity [A.h]"].to_numpy()
dQdU = abs(np.gradient(q, U))
return seDataLoader._calculate_differential_cutoff method · python · L833-L857 (25 LOC)ionworksdata/load.py
def _calculate_differential_cutoff(
self,
x,
y,
dydx,
method="explicit",
show_plot=False,
xlabel=None,
ylabel=None,
options=None,
) -> float:
options = options or {}
if method == "explicit":
return self._calculate_differential_cutoff_explicit(
x, y, dydx, show_plot=show_plot, xlabel=xlabel, ylabel=ylabel, **options
)
elif method == "quantile":
return self._calculate_differential_cutoff_quantile(
x, y, dydx, show_plot=show_plot, xlabel=xlabel, ylabel=ylabel, **options
)
elif method == "peaks":
return self._calculate_differential_cutoff_peaks(
x, y, dydx, show_plot=show_plot, xlabel=xlabel, ylabel=ylabel, **options
)
raise ValueError(f"Method {method} not recognized")DataLoader._calculate_differential_cutoff_explicit method · python · L859-L881 (23 LOC)ionworksdata/load.py
def _calculate_differential_cutoff_explicit(
self,
x,
y,
dydx,
show_plot=False,
xlabel=None,
ylabel=None,
lower_ratio=0.1,
upper_ratio=0.9,
scale=1.1,
) -> float:
x = np.array(x)
x_scaled = (x - x.min()) / (x.max() - x.min())
xmin_idx = np.argmin(np.abs(x_scaled - lower_ratio))
xmax_idx = np.argmin(np.abs(x_scaled - upper_ratio))
if xmin_idx > xmax_idx:
xmin_idx, xmax_idx = xmax_idx, xmin_idx
dydx_cutoff = dydx[xmin_idx:xmax_idx].max() * scale
if show_plot:
self._plot_differential_cutoff(x, dydx, dydx_cutoff, xlabel, ylabel)
plt.show()
return dydx_cutoffDataLoader._calculate_differential_cutoff_quantile method · python · L883-L915 (33 LOC)ionworksdata/load.py
def _calculate_differential_cutoff_quantile(
self,
x,
y,
dydx,
show_plot=False,
xlabel=None,
ylabel=None,
quantile=0.8,
scale=2,
) -> float:
x = np.array(x)
x_interp = np.linspace(x.min(), x.max(), 1000)
dydx_interp = interp1d(x, dydx, kind="linear")(x_interp)
quantile_value = np.percentile(dydx_interp, quantile * 100)
dydx_cutoff = scale * quantile_value
if show_plot:
fig, ax = self._plot_differential_cutoff(
x_interp, dydx_interp, dydx_cutoff, xlabel, ylabel
)
ax.axhline(
quantile_value,
color="tab:blue",
linestyle="--",
label="Quantile",
zorder=10,
)
ax.axhline(
dydx_cutoff, color="tab:red", linestyle="--", label="Cut-off", zorder=10
)
ax.legend()
plt.sDataLoader._calculate_differential_cutoff_peaks method · python · L917-L938 (22 LOC)ionworksdata/load.py
def _calculate_differential_cutoff_peaks(
self,
x,
y,
dydx,
show_plot=False,
xlabel=None,
ylabel=None,
scale=1.1,
**kwargs,
) -> float:
x = np.array(x)
x_interp = np.linspace(x.min(), x.max(), 1000)
dydx_interp = interp1d(x, dydx, kind="linear")(x_interp)
peaks, _ = find_peaks(dydx_interp, **kwargs)
dydx_cutoff = dydx_interp[peaks].max() * scale
if show_plot:
self._plot_differential_cutoff(
x_interp, dydx_interp, dydx_cutoff, xlabel, ylabel
)
plt.show()
return dydx_cutoffDataLoader._plot_differential_cutoff method · python · L940-L951 (12 LOC)ionworksdata/load.py
def _plot_differential_cutoff(self, x, dydx, dydx_cutoff, xlabel=None, ylabel=None):
    """
    Plot a derivative curve with its cut-off drawn as a dashed red line.

    Parameters
    ----------
    x, dydx : array-like
        The curve to plot.
    dydx_cutoff : float
        The cut-off level; the y-axis runs from 0 to 1.5 times this value.
    xlabel, ylabel : str, optional
        Axis labels. Default to "x" and "dy/dx".

    Returns
    -------
    tuple
        The created ``(fig, ax)`` pair.
    """
    x_text = "x" if xlabel is None else xlabel
    y_text = "dy/dx" if ylabel is None else ylabel
    fig, ax = plt.subplots()
    ax.plot(x, dydx)
    ax.axhline(dydx_cutoff, color="tab:red", linestyle="--")
    ax.set_xlabel(x_text)
    ax.set_ylabel(y_text)
    ax.set_ylim(0, dydx_cutoff * 1.5)
    return fig, ax

# DataLoader.to_config method · python · L957-L1025 (69 LOC)ionworksdata/load.py
def to_config(self, filter_data: bool = True) -> dict:
"""
Convert the DataLoader back to parser configuration format.
Parameters
----------
filter_data : bool, optional
If True (default) and steps are present, saves the filtered data
rather than the original unfiltered data.
Returns
-------
dict
Configuration dictionary that can recreate this DataLoader.
"""
if hasattr(self, "_measurement_id") and self._measurement_id is not None:
config = {"data": f"db:{self._measurement_id}"}
opts = self._build_options_for_config()
if opts:
config["options"] = opts
return config
if self._steps_pl is None:
config = {"data": self._data_pl.to_dict(as_series=False)}
opts = self._build_options_for_config()
if opts:
config["options"] = opts
return conIf a scraper extracted this row, it came from Repobility (https://repobility.com)
DataLoader._build_options_for_config method · python · L1027-L1044 (18 LOC)ionworksdata/load.py
def _build_options_for_config(self, include_step_options=True):
opts = {}
if include_step_options:
if getattr(self, "_first_step", None) is not None:
opts["first_step"] = self._first_step
if getattr(self, "_last_step", None) is not None:
opts["last_step"] = self._last_step
if getattr(self, "_capacity_column", None) is not None:
opts["capacity_column"] = self._capacity_column
if self._transforms:
serializable_transforms = {}
for k, v in self._transforms.items():
if isinstance(v, np.ndarray):
serializable_transforms[k] = v.tolist()
else:
serializable_transforms[k] = v
opts["transforms"] = serializable_transforms
return optsDataLoader._get_step_from_cycle method · python · L1071-L1076 (6 LOC)ionworksdata/load.py
def _get_step_from_cycle(cycle, steps, first):
    """
    Return the first or last "Step count" of a cycle.

    Parameters
    ----------
    cycle
        The "Cycle count" value to look up.
    steps : pl.DataFrame | pd.DataFrame
        The steps table; non-Polars input is converted with
        ``pl.from_pandas``.
    first : bool
        True to return the cycle's first step, False for its last.

    Returns
    -------
    The "Step count" value of the selected step.

    Raises
    ------
    ValueError
        If the cycle has no steps.
    """
    if isinstance(steps, pl.DataFrame):
        steps_pl = steps
    else:
        steps_pl = pl.from_pandas(steps)
    in_cycle = steps_pl.filter(pl.col("Cycle count") == cycle)
    if in_cycle.height == 0:
        raise ValueError(f"No steps found for cycle {cycle}")
    position = 0 if first else -1
    return in_cycle["Step count"][position]

# DataLoader._get_step_from method · python · L1083-L1098 (16 LOC)ionworksdata/load.py
def _get_step_from(step_dict, all_steps, first):
match list(step_dict.keys())[0]:
case "cycle":
return DataLoader._get_step_from_cycle(
list(step_dict.values())[0],
all_steps,
first,
)
case "step":
return DataLoader._get_step_from_step(
list(step_dict.values())[0],
all_steps,
first,
)
case _:
raise ValueError(f"Unknown step type: {list(step_dict.keys())[0]}")DataLoader._get_step method · python · L1101-L1134 (34 LOC)ionworksdata/load.py
def _get_step(step_param, all_steps, first):
if step_param is None:
return 0 if first else len(all_steps) - 1
if isinstance(step_param, int):
return step_param
if isinstance(step_param, str):
try:
steps_pl = (
all_steps
if isinstance(all_steps, pl.DataFrame)
else pl.from_pandas(all_steps)
)
ctx = pl.SQLContext(steps=steps_pl)
result = ctx.execute(step_param).collect()
if len(result) == 0:
raise ValueError(f"SQL query returned no results: {step_param}")
if len(result) > 1:
raise ValueError(
f"SQL query returned {len(result)} results, expected exactly 1: {step_param}"
)
return result["Step count"][0]
except Exception as e:
raise ValueError(
DataLoader.generate_experiment method · python · L1140-L1177 (38 LOC)ionworksdata/load.py
def generate_experiment(self, use_cv: bool = False) -> pybamm.Experiment:
"""Generate a PyBaMM experiment from the loaded step information."""
if self._steps_pl is None:
raise ValueError("generate_experiment requires steps data")
steps = []
for step_row in self._steps_pl.iter_rows(named=True):
duration = step_row["Duration [s]"]
step_type = step_row["Step type"]
if duration <= np.nextafter(0, 1):
continue
match step_type:
case "Constant current discharge" | "Constant current charge":
mean_current = step_row["Mean current [A]"]
step = pybamm.step.Current(mean_current, duration=duration)
case "Constant voltage discharge" | "Constant voltage charge":
if use_cv:
mean_voltage = step_row["Mean voltage [V]"]
step = pybamm.step.Voltage(mean_voltaDataLoader.generate_interpolant method · python · L1179-L1205 (27 LOC)ionworksdata/load.py
def generate_interpolant(self) -> pybamm.Interpolant:
"""Generate a PyBaMM interpolant from the loaded step information."""
if self._steps_pl is None:
raise ValueError("generate_interpolant requires steps data")
ts = []
cs = []
for step_row in self._steps_pl.iter_rows(named=True):
if (
"constant current" in step_row["Step type"].lower()
or step_row["Step type"] == "Rest"
):
first_t = step_row["Start time [s]"]
last_t = step_row["End time [s]"]
ts.append(np.array([first_t, last_t]))
cs.append(
np.array(
[step_row["Mean current [A]"], step_row["Mean current [A]"]]
)
)
else:
times, currents = self._get_times_and_currents(step_row)
ts.append(times)
cs.append(currents)
ts DataLoader.plot_data method · python · L1207-L1220 (14 LOC)ionworksdata/load.py
def plot_data(self, show: bool = False) -> tuple[plt.Figure, plt.Axes]:
    """
    Plot voltage, current and temperature against time.

    Creates three stacked panels sharing the x-axis, one per column
    ("Voltage [V]", "Current [A]", "Temperature [degC]"), each plotted
    against "Time [s]".

    Parameters
    ----------
    show : bool, optional
        Whether to call ``plt.show()`` before returning. Defaults to False.

    Returns
    -------
    tuple
        The figure and the array of three axes.
    """
    fig, ax = plt.subplots(3, 1, sharex=True)
    frame = self._data_pl
    time = frame["Time [s]"].to_numpy()
    panel_columns = ("Voltage [V]", "Current [A]", "Temperature [degC]")
    for panel, column in zip(ax, panel_columns):
        panel.plot(time, frame[column].to_numpy())
        panel.set_ylabel(column)
    # The x-axis is shared, so only the bottom panel carries its label
    ax[2].set_xlabel("Time [s]")
    if show:
        plt.show()
    return fig, ax

# DataLoader._get_times_and_currents method · python · L1222-L1228 (7 LOC)ionworksdata/load.py
def _get_times_and_currents(self, step):
    """
    Return the time and current samples belonging to one step.

    Parameters
    ----------
    step : mapping
        A steps row providing "Start index" and "End index" (inclusive).
        Both are offset by the loader's ``_start_idx`` before slicing the
        internal frame.

    Returns
    -------
    tuple of np.ndarray
        ``(time, current)`` from the "Time [s]" and "Current [A]" columns.
    """
    offset = self._start_idx
    first_row = int(step["Start index"]) - offset
    # "End index" is inclusive, hence the +1 for the exclusive stop
    stop_row = int(step["End index"]) + 1 - offset
    n_rows = stop_row - first_row
    frame = self._data_pl
    current = frame["Current [A]"].slice(first_row, n_rows).to_numpy()
    time = frame["Time [s]"].slice(first_row, n_rows).to_numpy()
    return time, current

# Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
DataLoader.from_local method · python · L1241-L1271 (31 LOC)ionworksdata/load.py
def from_local(cls, data_path, options=None, use_polars=True):
"""
Load data from local filesystem.
Parameters
----------
data_path : str
Path to the directory containing time_series.csv and optionally
steps.csv files.
options : dict | None, optional
Options to pass to the DataLoader constructor.
use_polars : bool, optional
If True (default), read CSV with Polars. If False, read with Pandas
(data is still stored as Polars internally).
Returns
-------
DataLoader
"""
read_fn = pl.read_csv if use_polars else pd.read_csv
ts_path = Path(data_path) / "time_series.csv"
if not ts_path.exists():
raise ValueError(
f"Folder must contain either time_series.csv or both "
f"time_series.csv and steps.csv. Path: {data_path}"
)
time_series = read_fn(ts_path)
DataLoader.from_db method · python · L1274-L1320 (47 LOC)ionworksdata/load.py
def from_db(cls, measurement_id, options=None, use_cache=True, timeout=None):
"""
Load data from the Ionworks database.
Loads steps if available, otherwise creates a steps-less DataLoader.
Parameters
----------
measurement_id : str
The ID of the measurement to load from the database.
options : dict | None, optional
Options to pass to the DataLoader constructor.
use_cache : bool, optional
If True (default), use local file cache to avoid repeated API calls.
timeout : int | None, optional
Request timeout in seconds passed to the Ionworks client.
Returns
-------
DataLoader
"""
options = options or {}
instance = cls.__new__(cls)
# Parse the subset of options needed by to_config() /
# _build_options_for_config() before any data is loaded —
# uses _parse_options to stay in sync with __init__.
DataLoader.from_processed_data method · python · L1323-L1376 (54 LOC)ionworksdata/load.py
def from_processed_data(
cls,
data,
steps,
initial_voltage,
start_idx,
end_idx,
):
"""
Create a DataLoader from already-processed data, bypassing __init__.
Parameters
----------
data : pd.DataFrame | pl.DataFrame
The processed time series data.
steps : pd.DataFrame | pl.DataFrame | None
The processed steps data (or None).
initial_voltage : float
The initial voltage value.
start_idx : int
The start index for the data.
end_idx : int
The end index for the data.
Returns
-------
DataLoader
"""
instance = cls.__new__(cls)
# Accept both pandas and polars; store as polars internally
if isinstance(data, pl.DataFrame):
instance._data_pl = data.clone() # noqa: SLF001
elif isinstance(data, pd.DataFrame):
instance._data_pl =page 1 / 4next ›