← back to ionworks__ionworksdata

Function bodies 175 total

All specs Real LLM only Function bodies
add_constant_current function · python · L8-L43 (36 LOC)
example/generate.py
def add_constant_current(model, parameter_values, sol=None):
    V_min = parameter_values["Lower voltage cut-off [V]"]
    V_max = parameter_values["Upper voltage cut-off [V]"]

    def charge(c_rate):
        return (
            pybamm.step.c_rate(-c_rate, termination=f"{V_max} V", period=6),
            pybamm.step.voltage(V_max, termination="C/50", period=6),
            pybamm.step.rest(3600),
        )

    def discharge(c_rate):
        return (
            pybamm.step.c_rate(c_rate, termination=f"{V_min} V", period=6),
            pybamm.step.rest(3600),
        )

    experiment = pybamm.Experiment(
        [
            charge(1 / 5),
            discharge(1 / 5),
            charge(1 / 5),
            discharge(1 / 2),
            charge(1 / 2),
            discharge(1),
            charge(1),
        ]
    )
    sim = pybamm.Simulation(
        model, parameter_values=parameter_values, experiment=experiment
    )
    if sol:
        sol = sim.solve(starting_solution=sol)
  
add_pulse function · python · L46-L61 (16 LOC)
example/generate.py
def add_pulse(model, parameter_values, experiment, sol):
    """
    Repeat a pulse experiment until its second-last step stops early.

    Parameters
    ----------
    model
        Model passed to ``pybamm.Simulation``.
    parameter_values
        Parameter values passed to ``pybamm.Simulation``.
    experiment : tuple
        A ``(pybamm_experiment, max_cycles)`` pair.
    sol
        Solution to continue from; it is passed as ``starting_solution`` to
        every solve.

    Returns
    -------
    The solution after the last solve.

    Raises
    ------
    ValueError
        If the number of solves exceeds ``max_cycles``.
    """
    pulse_experiment, max_cycles = experiment
    sim = pybamm.Simulation(
        model, parameter_values=parameter_values, experiment=pulse_experiment
    )

    n_solves = 0
    while True:
        sol = sim.solve(starting_solution=sol)
        n_solves += 1
        if n_solves > max_cycles:
            raise ValueError("Reached maximum number of cycles")
        # Keep going only while the second-last step of the latest cycle ran
        # for its full duration, i.e. it was not cut short by its termination
        # condition.
        if sol.cycles[-1].steps[-2].termination != "final time":
            return sol
add_hppt function · python · L64-L107 (44 LOC)
example/generate.py
def add_hppt(model, parameter_values, sol):
    # we want the C/3 discharge to increment SOC by 5%
    # C/3 discharge duration = 3h
    # 5% SOC increment = 0.05 * 3h = 0.15h
    discharge_duration = 0.15 * 3600
    # V_max = parameter_values["Upper voltage cut-off [V]"]
    V_min = parameter_values["Lower voltage cut-off [V]"]
    max_cycles = 25

    hppt_experiment = pybamm.Experiment(
        [
            (
                pybamm.step.c_rate(
                    1 / 2, duration=10, period=0.1, termination=f"{V_min} V"
                ),
                pybamm.step.rest(duration=300, period=5),
                pybamm.step.c_rate(
                    -1 / 2,
                    duration=10,
                    period=0.1,  # termination=f"{V_max} V"
                ),
                pybamm.step.rest(duration=300, period=5),
                pybamm.step.c_rate(
                    1, duration=10, period=0.1, termination=f"{V_min} V"
                ),
                pybamm.step.res
add_gitt function · python · L110-L131 (22 LOC)
example/generate.py
def add_gitt(model, parameter_values, sol):
    """
    Append a GITT-style sequence of current pulses and rests to ``sol``.

    Each cycle is a 30-minute pulse at a C-rate of -0.1 (terminated early at
    the upper voltage cut-off) followed by a one-hour rest. The cycle is
    repeated through ``add_pulse``.

    Parameters
    ----------
    model
        Model forwarded to ``add_pulse``.
    parameter_values
        Parameter values; must contain "Upper voltage cut-off [V]".
    sol
        Solution to continue from.

    Returns
    -------
    The solution returned by ``add_pulse``.
    """
    c_rate = -0.1
    pulse_seconds = 30 * 60
    pulse_hours = pulse_seconds / 3600
    # Number of pulses needed to pass one full capacity at this C-rate,
    # with a 50% margin.
    max_cycles = int(1 / abs(c_rate) / pulse_hours * 1.5)
    upper_cutoff = parameter_values["Upper voltage cut-off [V]"]

    pulse = pybamm.step.c_rate(
        c_rate,
        duration=pulse_seconds,
        period=pulse_seconds / 100,
        termination=f"{upper_cutoff} V",
    )
    relaxation = pybamm.step.rest(3600, period=10)
    gitt_experiment = pybamm.Experiment([(pulse, relaxation)])

    return add_pulse(model, parameter_values, (gitt_experiment, max_cycles), sol)
_contiguous_blocks function · python · L23-L31 (9 LOC)
example/process_and_plot_data.py
def _contiguous_blocks(values):
    """Split a sorted sequence of integers into lists of consecutive runs."""
    blocks = []
    for v in values:
        if blocks and v == blocks[-1][-1] + 1:
            blocks[-1].append(v)
        else:
            blocks.append([v])
    return blocks
plot_variables function · python · L37-L67 (31 LOC)
example/process_and_plot_data.py
def plot_variables(data, vars_to_plot, steps=None, title=None, split_by_label=False):
    n = len(vars_to_plot)
    fig, axes = plt.subplots(n, 1, figsize=(6, 2 * n), sharex=True)
    axes = np.atleast_1d(axes)

    if split_by_label:
        if steps is None:
            raise ValueError("steps must be provided if split_by_label is True")
        for label, color in LABEL_COLORS.items():
            axes[0].plot([], [], color=color, label=label)
            step_nums = steps.loc[steps["Label"] == label, "Step count"]
            for block in _contiguous_blocks(step_nums):
                block_data = data.loc[data["Step count"].isin(block)]
                for ax, v in zip(axes, vars_to_plot):
                    ax.plot(block_data["Time [s]"], block_data[v], color=color)
        axes[0].legend(bbox_to_anchor=(1.05, 1), loc="upper left", borderaxespad=0)
    else:
        for ax, v in zip(axes, vars_to_plot):
            ax.plot(data["Time [s]"], data[v])

    # Format axes
    for ax
set_cache_ttl function · python · L55-L66 (12 LOC)
ionworksdata/load.py
def set_cache_ttl(ttl_seconds: int | None) -> None:
    """
    Configure how long cached data is treated as fresh.

    Parameters
    ----------
    ttl_seconds : int | None
        Age in seconds after which cached data is considered stale.
        Pass None to disable expiry so cached data never expires.
        The default is 3600 (1 hour).
    """
    _CACHE_CONFIG.update({"ttl_seconds": ttl_seconds})
If a scraper extracted this row, it came from Repobility (https://repobility.com)
get_cache_ttl function · python · L69-L78 (10 LOC)
ionworksdata/load.py
def get_cache_ttl() -> int | None:
    """
    Return the cache time-to-live currently stored in the cache config.

    Returns
    -------
    int | None
        TTL in seconds, or None when expiry is disabled.
    """
    ttl_seconds = _CACHE_CONFIG["ttl_seconds"]
    return ttl_seconds
clear_cache function · python · L81-L98 (18 LOC)
ionworksdata/load.py
def clear_cache() -> int:
    """
    Delete every ``*.pkl`` file in the configured cache directory.

    Returns
    -------
    int
        How many cache files were removed (0 if the directory does not exist).
    """
    cache_dir = _CACHE_CONFIG["directory"]
    deleted = 0
    if cache_dir.exists():
        for pickle_file in cache_dir.glob("*.pkl"):
            pickle_file.unlink()
            deleted += 1
    return deleted
_get_cache_path function · python · L101-L106 (6 LOC)
ionworksdata/load.py
def _get_cache_path(measurement_id: str) -> Path:
    """
    Build the cache file path for a measurement ID.

    The file name is a sanitised copy of the ID (anything other than
    alphanumerics, "-" and "_" becomes "_") followed by the first 16 hex
    characters of the ID's MD5 digest. The digest keeps distinct IDs that
    sanitise to the same text from colliding.

    Parameters
    ----------
    measurement_id : str
        The measurement identifier.

    Returns
    -------
    Path
        ``<cache directory>/<safe_id>_<hash>.pkl``
    """
    # MD5 is used purely as a file-name fingerprint, not for security.
    # Declaring that keeps this working on FIPS-restricted Python builds,
    # where a plain hashlib.md5() call raises. The digest is unchanged.
    digest = hashlib.md5(measurement_id.encode(), usedforsecurity=False)
    hash_key = digest.hexdigest()[:16]
    safe_id = "".join(c if c.isalnum() or c in "-_" else "_" for c in measurement_id)
    return _CACHE_CONFIG["directory"] / f"{safe_id}_{hash_key}.pkl"
_load_from_cache function · python · L109-L142 (34 LOC)
ionworksdata/load.py
def _load_from_cache(measurement_id: str) -> dict | None:
    """
    Load measurement data from cache if available and not expired.

    Returns
    -------
    dict | None
        Cached data dict with 'time_series' and 'steps' keys, or None if not cached
        or if the cache has expired.
    """
    import time

    if not _CACHE_CONFIG["enabled"]:
        return None

    cache_path = _get_cache_path(measurement_id)
    if cache_path.exists():
        # Check if cache has expired based on TTL
        ttl_seconds = _CACHE_CONFIG["ttl_seconds"]
        if ttl_seconds is not None:
            file_age = time.time() - cache_path.stat().st_mtime
            if file_age > ttl_seconds:
                # Cache has expired, delete it
                cache_path.unlink(missing_ok=True)
                return None

        try:
            with open(cache_path, "rb") as f:
                return pickle.load(f)
        except Exception:
            # If cache is corrupted, delete it
        
_save_to_cache function · python · L145-L159 (15 LOC)
ionworksdata/load.py
def _save_to_cache(measurement_id: str, data: dict) -> None:
    """
    Save measurement data to the cache on a best-effort basis.

    Does nothing when caching is disabled. Any failure (creating the cache
    directory, opening the file, pickling) is swallowed so that a caching
    problem never breaks the caller; it is only reported at debug level.

    Parameters
    ----------
    measurement_id : str
        The measurement identifier used to derive the cache file name.
    data : dict
        The payload to pickle.
    """
    if not _CACHE_CONFIG["enabled"]:
        return

    import logging

    cache_path = _get_cache_path(measurement_id)
    try:
        # Directory creation is part of the best-effort write: an unwritable
        # cache location must not raise into the caller.
        _CACHE_CONFIG["directory"].mkdir(parents=True, exist_ok=True)
        with open(cache_path, "wb") as f:
            pickle.dump(data, f)
    except Exception:
        logging.getLogger(__name__).debug(
            "Could not write cache file %s", cache_path, exc_info=True
        )
        # A failed dump can leave a truncated pickle behind; remove it so a
        # later read does not pick up a corrupted entry.
        try:
            cache_path.unlink(missing_ok=True)
        except OSError:
            pass
DataLoader.__init__ method · python · L218-L226 (9 LOC)
ionworksdata/load.py
    def __init__(
        self,
        time_series: pd.DataFrame | pl.DataFrame | dict,
        steps: pd.DataFrame | pl.DataFrame | dict | None = None,
        **kwargs,
    ):
        """
        Build a loader from in-memory time-series data and optional steps.

        Options may be given as an ``options`` dict and/or as individual
        keyword arguments. Both are merged into one dict, with individual
        keyword arguments overriding same-named entries of ``options``.
        """
        explicit_options = kwargs.pop("options", None) or {}
        merged_options = {**explicit_options, **kwargs}
        self._measurement_id = None
        self._setup(time_series, steps, merged_options)
DataLoader.data method · python · L247-L254 (8 LOC)
ionworksdata/load.py
    def data(self, value):
        """Store ``value`` as the polars data frame and drop the pandas cache.

        Polars frames are stored as-is, pandas frames are converted with
        ``pl.from_pandas``, and anything else is passed to ``pl.DataFrame``.
        """
        if isinstance(value, pd.DataFrame):
            frame = pl.from_pandas(value)
        elif isinstance(value, pl.DataFrame):
            frame = value
        else:
            frame = pl.DataFrame(value)
        self._data_pl = frame
        # The cached pandas view no longer matches the new data.
        self._data_pd_cache = None
DataLoader._filter_data_pl method · python · L265-L281 (17 LOC)
ionworksdata/load.py
    def _filter_data_pl(data: pl.DataFrame, filters: dict) -> pl.DataFrame:
        """Apply the configured filters to columns of a polars DataFrame.

        ``filters`` maps a column name to a dict with a "filter_type" and a
        "parameters" dict. Only "savgol" is supported; its parameters are
        forwarded to ``savgol_filter``. Any other type raises ValueError.
        """
        smoothed_columns = []
        for column, spec in filters.items():
            filter_type = spec["filter_type"]
            if filter_type != "savgol":
                raise ValueError(f"Unknown filter function: {filter_type}")
            smoothed = savgol_filter(data[column].to_numpy(), **spec["parameters"])
            smoothed_columns.append(pl.Series(column, smoothed))
        if not smoothed_columns:
            return data
        return data.with_columns(smoothed_columns)
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
DataLoader._interpolate_data_pl method · python · L284-L304 (21 LOC)
ionworksdata/load.py
    def _interpolate_data_pl(
        data: pl.DataFrame,
        knots: float | np.ndarray,
        x_column: str = "Time [s]",
    ) -> pl.DataFrame:
        """Resample every column of a polars DataFrame with ``np.interp``.

        A float ``knots`` is treated as a spacing: the knots become
        ``np.arange(min, max, knots)`` over ``x_column``. Anything else is used
        directly as the knot positions. The result holds all other columns
        interpolated at the knots, followed by ``x_column`` set to the knots.
        """
        if isinstance(knots, float):
            knots = np.arange(data[x_column].min(), data[x_column].max(), knots)

        x_values = data[x_column].to_numpy()
        resampled = {
            name: np.interp(knots, x_values, data[name].to_numpy())
            for name in data.columns
            if name != x_column
        }
        resampled[x_column] = knots
        return pl.DataFrame(resampled)
DataLoader.filter_data method · python · L307-L339 (33 LOC)
ionworksdata/load.py
    def filter_data(data: pd.DataFrame, filters: dict) -> pd.DataFrame:
        """
        Process the data using a specified filter function.

        Parameters
        ----------
        data : pd.DataFrame
            The raw data to be filtered.
        filters : dict
            The filter function to use. Currently supported: "savgol"

        Returns
        -------
        pd.DataFrame
            The filtered data.

        Raises
        ------
        ValueError
            If an unknown filter function is specified.
        """
        filtered_data = data.copy()
        for variable, kwargs in filters.items():
            match kwargs["filter_type"]:
                case "savgol":
                    filtered_data[variable] = savgol_filter(
                        data[variable], **kwargs["parameters"]
                    )
                case _:
                    raise ValueError(
                        f"Unknown filter function: {kwargs['filter_type']}"
           
DataLoader.interpolate_data method · python · L342-L387 (46 LOC)
ionworksdata/load.py
    def interpolate_data(
        data: pd.DataFrame | pl.DataFrame,
        knots: float | np.ndarray,
        x_column: str = "Time [s]",
    ) -> pd.DataFrame:
        """
        Interpolate the data using np.interp

        Parameters
        ----------
        knots : float | np.ndarray
            The knots at which to interpolate the data. If a float is provided,
            the data is interpolated at regular intervals of that size. If an
            array is provided, the data is interpolated at the specified knots.

        data : pd.DataFrame | pl.DataFrame
            The data to interpolate. Must contain x_column.
        x_column : str, optional
            The column to use as the x-axis for interpolation. Defaults to "Time [s]".

        Returns
        -------
        pd.DataFrame
            The interpolated data.
        """
        if isinstance(data, pl.DataFrame):
            data_pl = data
            data_pd = data.to_pandas()
        else:
            data_pl 
DataLoader.steps method · python · L406-L415 (10 LOC)
ionworksdata/load.py
    def steps(self, value):
        """Store ``value`` as the polars steps frame and drop the pandas cache.

        None is stored as None, polars frames are stored as-is, pandas frames
        are converted with ``pl.from_pandas``, and anything else is passed to
        ``pl.DataFrame``.
        """
        if value is None or isinstance(value, pl.DataFrame):
            steps_pl = value
        elif isinstance(value, pd.DataFrame):
            steps_pl = pl.from_pandas(value)
        else:
            steps_pl = pl.DataFrame(value)
        self._steps_pl = steps_pl
        # The cached pandas view no longer matches the new steps.
        self._steps_pd_cache = None
DataLoader._ensure_db_data_loaded method · python · L422-L454 (33 LOC)
ionworksdata/load.py
    def _ensure_db_data_loaded(self):
        """Fetch and initialise data from the DB on first access (lazy loading)."""
        if not getattr(self, "_lazy_db_pending", False):
            return
        self._lazy_db_pending = False
        measurement_id = self._measurement_id
        use_cache = self._lazy_use_cache
        options = self._lazy_options
        timeout = getattr(self, "_lazy_timeout", None)

        cached_data = None
        if use_cache:
            cached_data = _load_from_cache(measurement_id)

        if cached_data is not None:
            time_series = cached_data.get("time_series")
            steps = cached_data.get("steps")
        else:
            from ionworks import Ionworks

            client = Ionworks(timeout=timeout)
            measurement_detail = client.cell_measurement.detail(measurement_id)
            time_series = measurement_detail.time_series
            steps = getattr(measurement_detail, "steps", None)
            cache_payload = {"tim
DataLoader._parse_options method · python · L461-L474 (14 LOC)
ionworksdata/load.py
    def _parse_options(options):
        """Normalise a merged options dict and return derived attribute values."""
        transforms = dict(options.get("transforms") or {})
        # Backward compat: top-level filters/interpolate migrate into transforms
        if "filters" in options and "filters" not in transforms:
            transforms["filters"] = options["filters"]
        if "interpolate" in options and "interpolate" not in transforms:
            transforms["interpolate"] = options["interpolate"]
        return {
            "transforms": transforms,
            "first_step": options.get("first_step"),
            "last_step": options.get("last_step"),
            "capacity_column": options.get("capacity_column"),
        }
DataLoader._setup method · python · L476-L491 (16 LOC)
ionworksdata/load.py
    def _setup(self, time_series, steps, options):
        """Initialise data, steps and transforms from raw inputs.

        Parses the options, initialises the data through the with-steps or
        without-steps path depending on whether ``steps`` is given, resolves
        the capacity column alias and finally applies the configured
        transforms.
        """
        parsed = self._parse_options(options)
        capacity_column = parsed["capacity_column"]
        self._transforms = parsed["transforms"]
        self._capacity_column = capacity_column
        self._steps_pd_cache = None
        self._data_steps_warned = False  # noqa: SLF001

        if steps is None:
            frame = self._init_without_steps(time_series, options)
        else:
            frame = self._init_with_steps(time_series, steps, options)

        self._data_pl = self._alias_columns(frame, capacity_column)
        self._data_pd_cache = None
        self._apply_transforms()
DataLoader._init_with_steps method · python · L493-L559 (67 LOC)
ionworksdata/load.py
    def _init_with_steps(self, time_series, steps, options):
        # Normalize inputs to polars
        if isinstance(time_series, pl.DataFrame):
            time_series_pl = time_series
        elif isinstance(time_series, pd.DataFrame):
            time_series_pl = pl.from_pandas(time_series)
        else:
            time_series_pl = pl.DataFrame(time_series)
        if isinstance(steps, pl.DataFrame):
            steps_pl = steps
        elif isinstance(steps, pd.DataFrame):
            steps_pl = pl.from_pandas(steps)
        else:
            steps_pl = pl.DataFrame(steps)

        # Polars DataFrames are immutable so no .copy() needed
        self._original_time_series_pl = time_series_pl
        self._original_steps_pl = steps_pl

        first_step = options.get("first_step")
        last_step = options.get("last_step")
        first_step_dict = options.get("first_step_dict")
        last_step_dict = options.get("last_step_dict")

        if first_step is not None and first_
Repobility · open methodology · https://repobility.com/research/
DataLoader._init_without_steps method · python · L561-L603 (43 LOC)
ionworksdata/load.py
    def _init_without_steps(self, time_series, options):
        # Normalize to polars
        if isinstance(time_series, dict):
            time_series_pl = pl.DataFrame(time_series)
        elif isinstance(time_series, pd.DataFrame):
            time_series_pl = pl.from_pandas(time_series)
        elif isinstance(time_series, pl.DataFrame):
            time_series_pl = time_series
        else:
            time_series_pl = pl.DataFrame(time_series)

        self._steps_pl = None
        self._steps_pd_cache = None
        self._original_time_series_pl = None
        self._original_steps_pl = None
        self._first_step = None
        self._last_step = None
        self._start_idx = 0
        self._end_idx = time_series_pl.height
        self.initial_voltage = None

        # Voltage column aliasing
        potential_ocp_column_names = [
            "Voltage [V]",
            "OCP [V]",
            "OCV [V]",
            "Open-circuit potential [V]",
            "Open-circuit voltag
DataLoader._alias_columns method · python · L606-L618 (13 LOC)
ionworksdata/load.py
    def _alias_columns(data: pl.DataFrame, capacity_column) -> pl.DataFrame:
        """Expose the chosen capacity column under the name "Capacity [A.h]".

        Returns ``data`` unchanged when ``capacity_column`` is None and raises
        ValueError when the named column is not present in ``data``.
        """
        if capacity_column is None:
            return data
        if capacity_column not in data.columns:
            raise ValueError(
                f"Specified capacity_column '{capacity_column}' not found in data. "
                f"Available columns: {list(data.columns)}"
            )
        return data.with_columns(pl.col(capacity_column).alias("Capacity [A.h]"))
DataLoader.set_processed_internal_state method · python · L620-L652 (33 LOC)
ionworksdata/load.py
    def set_processed_internal_state(
        self,
        *,
        transforms=None,
        measurement_id=None,
        capacity_column=None,
        first_step=None,
        last_step=None,
        original_time_series=None,
        original_steps=None,
    ):
        """Set internal state when constructing from processed data. Used by from_processed_data.

        original_time_series and original_steps may be pandas or Polars DataFrames (or None);
        they are stored as Polars internally for config export.
        """
        self._transforms = transforms if transforms is not None else {}
        self._measurement_id = measurement_id
        self._capacity_column = capacity_column
        self._first_step = first_step
        self._last_step = last_step
        if original_time_series is None:
            self._original_time_series_pl = None
        elif isinstance(original_time_series, pd.DataFrame):
            self._original_time_series_pl = pl.from_pandas(original_time_
DataLoader._apply_transforms method · python · L658-L678 (21 LOC)
ionworksdata/load.py
    def _apply_transforms(self):
        """Apply the configured transforms to the data in a fixed order.

        Order: gitt_to_ocp, sort, remove_duplicates, remove_extremes, filters,
        interpolate. The pandas cache is reset after every step that replaces
        the polars data.
        """
        transforms = self._transforms
        if transforms.get("gitt_to_ocp"):
            self._transform_gitt_to_ocp()

        # Flag-driven steps that map the current frame to a new frame.
        flagged_steps = (
            ("sort", "_sort_capacity_and_ocp"),
            ("remove_duplicates", "_remove_duplicate_ocp"),
            ("remove_extremes", "_remove_ocp_extremes"),
        )
        for flag, method_name in flagged_steps:
            if transforms.get(flag):
                self._data_pl = getattr(self, method_name)(self._data_pl)
                self._data_pd_cache = None

        filters = transforms.get("filters")
        if filters:
            self._data_pl = self._filter_data_pl(self._data_pl, filters)
            self._data_pd_cache = None

        # Interpolation is applied for any non-None value, including falsy ones.
        interpolate = transforms.get("interpolate")
        if interpolate is not None:
            self._data_pl = self._interpolate_data_pl(self._data_pl, interpolate)
            self._data_pd_cache = None
DataLoader._transform_gitt_to_ocp method · python · L680-L715 (36 LOC)
ionworksdata/load.py
    def _transform_gitt_to_ocp(self):
        """Extract OCP from GITT rest steps: take the last data point of each rest."""
        if self._steps_pl is None:
            raise ValueError("gitt_to_ocp requires steps data")

        gitt_rest = self._steps_pl.filter(
            (pl.col("Label") == "GITT") & (pl.col("Step type") == "Rest")
        )
        if gitt_rest.height == 0:
            raise ValueError("No GITT rest steps found in data")

        ocp_points = []
        data_pl = self._data_pl
        for step_row in gitt_rest.iter_rows(named=True):
            end_idx = int(step_row["End index"]) - self._start_idx
            row = data_pl.row(end_idx, named=True)
            discharge = row.get("Discharge capacity [A.h]", 0)
            charge = row.get("Charge capacity [A.h]", 0)
            capacity = abs(discharge - charge)
            ocp_points.append(
                {
                    "Capacity [A.h]": capacity,
                    "Voltage [V]": row["Voltage [V]"]
DataLoader._remove_duplicate_ocp method · python · L718-L724 (7 LOC)
ionworksdata/load.py
    def _remove_duplicate_ocp(
        data: pl.DataFrame, capacity_column_name="Capacity [A.h]"
    ) -> pl.DataFrame:
        """Drop rows with repeated capacity values, then repeated voltage values.

        Row order is preserved in both passes.
        """
        for column in (capacity_column_name, "Voltage [V]"):
            data = data.unique(subset=[column], maintain_order=True)
        return data
DataLoader._sort_capacity_and_ocp method · python · L727-L751 (25 LOC)
ionworksdata/load.py
    def _sort_capacity_and_ocp(data: pl.DataFrame) -> pl.DataFrame:
        """Orient OCP data so voltage decreases and capacity increases from zero.

        The frame is reversed when the last voltage exceeds the first. The
        single "Capacity [..." column is flipped if it runs high-to-low and
        shifted so its minimum is zero; duplicates are then removed.

        Raises ValueError when there is no capacity column or more than one.
        """
        voltage = data["Voltage [V]"].to_numpy()
        if voltage[0] < voltage[-1]:
            data = data.reverse()

        candidates = [name for name in data.columns if name.startswith("Capacity [")]
        if not candidates:
            raise ValueError("No capacity column found")
        if len(candidates) > 1:
            raise ValueError(f"Multiple capacity columns found: {candidates}")
        (capacity_name,) = candidates

        # Work on a copy so the array backing the frame is never modified.
        capacity = data[capacity_name].to_numpy().copy()
        if capacity[0] > capacity[-1]:
            capacity = capacity.max() - capacity
        capacity -= capacity.min()

        data = data.with_columns(pl.Series(capacity_name, capacity))
        return DataLoader._remove_duplicate_ocp(data, capacity_name)
DataLoader._remove_ocp_extremes method · python · L754-L761 (8 LOC)
ionworksdata/load.py
    def _remove_ocp_extremes(data: pl.DataFrame) -> pl.DataFrame:
        """Trim leading and trailing rows where d2U/dQ2 is numerically zero.

        Keeps the rows from the first to the last point whose second
        derivative of voltage with respect to capacity exceeds 1e-10 in
        magnitude.
        """
        capacity = data["Capacity [A.h]"].to_numpy()
        voltage = data["Voltage [V]"].to_numpy()
        slope = np.gradient(voltage, capacity)
        curvature = np.gradient(slope, capacity)
        nonzero_rows = np.where(abs(curvature) > 1e-10)[0]
        start, stop = nonzero_rows[[0, -1]]
        return data.slice(start, stop - start + 1)
About: code-quality intelligence by Repobility · https://repobility.com
DataLoader.calculate_dUdQ_cutoff method · python · L767-L798 (32 LOC)
ionworksdata/load.py
    def calculate_dUdQ_cutoff(
        self,
        method: str = "explicit",
        show_plot: bool = False,
        options: dict | None = None,
    ) -> float:
        """
        Calculate the cut-off for dUdQ based on the data.

        Parameters
        ----------
        method : str, optional
            Method to use for calculating the cut-off. Options are:
            - "explicit" (default): Uses explicit method based on data range
            - "quantile": Uses quantile-based method
            - "peaks": Uses peak detection method
        show_plot : bool, optional
            Whether to show a plot of the dUdQ values with the cut-off.
        options : dict, optional
            Dictionary of options to pass to the method.

        Returns
        -------
        float
            Cut-off for dUdQ
        """
        q = self._data_pl["Capacity [A.h]"].to_numpy()
        U = self._data_pl["Voltage [V]"].to_numpy()
        dUdQ = abs(np.gradient(U, q))
        return se
DataLoader.calculate_dQdU_cutoff method · python · L800-L831 (32 LOC)
ionworksdata/load.py
    def calculate_dQdU_cutoff(
        self,
        method: str = "explicit",
        show_plot: bool = False,
        options: dict | None = None,
    ) -> float:
        """
        Calculate the cut-off for dQdU based on the data.

        Parameters
        ----------
        method : str, optional
            Method to use for calculating the cut-off. Options are:
            - "explicit" (default): Uses explicit method based on data range
            - "quantile": Uses quantile-based method
            - "peaks": Uses peak detection method
        show_plot : bool, optional
            Whether to show a plot of the dQdU values with the cut-off.
        options : dict, optional
            Dictionary of options to pass to the method.

        Returns
        -------
        float
            Cut-off for dQdU
        """
        U = self._data_pl["Voltage [V]"].to_numpy()
        q = self._data_pl["Capacity [A.h]"].to_numpy()
        dQdU = abs(np.gradient(q, U))
        return se
DataLoader._calculate_differential_cutoff method · python · L833-L857 (25 LOC)
ionworksdata/load.py
    def _calculate_differential_cutoff(
        self,
        x,
        y,
        dydx,
        method="explicit",
        show_plot=False,
        xlabel=None,
        ylabel=None,
        options=None,
    ) -> float:
        options = options or {}
        if method == "explicit":
            return self._calculate_differential_cutoff_explicit(
                x, y, dydx, show_plot=show_plot, xlabel=xlabel, ylabel=ylabel, **options
            )
        elif method == "quantile":
            return self._calculate_differential_cutoff_quantile(
                x, y, dydx, show_plot=show_plot, xlabel=xlabel, ylabel=ylabel, **options
            )
        elif method == "peaks":
            return self._calculate_differential_cutoff_peaks(
                x, y, dydx, show_plot=show_plot, xlabel=xlabel, ylabel=ylabel, **options
            )
        raise ValueError(f"Method {method} not recognized")
DataLoader._calculate_differential_cutoff_explicit method · python · L859-L881 (23 LOC)
ionworksdata/load.py
    def _calculate_differential_cutoff_explicit(
        self,
        x,
        y,
        dydx,
        show_plot=False,
        xlabel=None,
        ylabel=None,
        lower_ratio=0.1,
        upper_ratio=0.9,
        scale=1.1,
    ) -> float:
        x = np.array(x)
        x_scaled = (x - x.min()) / (x.max() - x.min())
        xmin_idx = np.argmin(np.abs(x_scaled - lower_ratio))
        xmax_idx = np.argmin(np.abs(x_scaled - upper_ratio))
        if xmin_idx > xmax_idx:
            xmin_idx, xmax_idx = xmax_idx, xmin_idx
        dydx_cutoff = dydx[xmin_idx:xmax_idx].max() * scale
        if show_plot:
            self._plot_differential_cutoff(x, dydx, dydx_cutoff, xlabel, ylabel)
            plt.show()
        return dydx_cutoff
DataLoader._calculate_differential_cutoff_quantile method · python · L883-L915 (33 LOC)
ionworksdata/load.py
    def _calculate_differential_cutoff_quantile(
        self,
        x,
        y,
        dydx,
        show_plot=False,
        xlabel=None,
        ylabel=None,
        quantile=0.8,
        scale=2,
    ) -> float:
        x = np.array(x)
        x_interp = np.linspace(x.min(), x.max(), 1000)
        dydx_interp = interp1d(x, dydx, kind="linear")(x_interp)
        quantile_value = np.percentile(dydx_interp, quantile * 100)
        dydx_cutoff = scale * quantile_value
        if show_plot:
            fig, ax = self._plot_differential_cutoff(
                x_interp, dydx_interp, dydx_cutoff, xlabel, ylabel
            )
            ax.axhline(
                quantile_value,
                color="tab:blue",
                linestyle="--",
                label="Quantile",
                zorder=10,
            )
            ax.axhline(
                dydx_cutoff, color="tab:red", linestyle="--", label="Cut-off", zorder=10
            )
            ax.legend()
            plt.s
DataLoader._calculate_differential_cutoff_peaks method · python · L917-L938 (22 LOC)
ionworksdata/load.py
    def _calculate_differential_cutoff_peaks(
        self,
        x,
        y,
        dydx,
        show_plot=False,
        xlabel=None,
        ylabel=None,
        scale=1.1,
        **kwargs,
    ) -> float:
        x = np.array(x)
        x_interp = np.linspace(x.min(), x.max(), 1000)
        dydx_interp = interp1d(x, dydx, kind="linear")(x_interp)
        peaks, _ = find_peaks(dydx_interp, **kwargs)
        dydx_cutoff = dydx_interp[peaks].max() * scale
        if show_plot:
            self._plot_differential_cutoff(
                x_interp, dydx_interp, dydx_cutoff, xlabel, ylabel
            )
            plt.show()
        return dydx_cutoff
DataLoader._plot_differential_cutoff method · python · L940-L951 (12 LOC)
ionworksdata/load.py
    def _plot_differential_cutoff(self, x, dydx, dydx_cutoff, xlabel=None, ylabel=None):
        """Plot ``dydx`` against ``x`` with the cut-off as a dashed red line.

        Missing labels default to "x" and "dy/dx"; the y-axis runs from 0 to
        1.5 times the cut-off. Returns the created ``(fig, ax)`` pair.
        """
        xlabel = "x" if xlabel is None else xlabel
        ylabel = "dy/dx" if ylabel is None else ylabel

        fig, ax = plt.subplots()
        ax.plot(x, dydx)
        ax.axhline(dydx_cutoff, color="tab:red", linestyle="--")
        ax.set_xlabel(xlabel)
        ax.set_ylabel(ylabel)
        ax.set_ylim(0, dydx_cutoff * 1.5)
        return fig, ax
DataLoader.to_config method · python · L957-L1025 (69 LOC)
ionworksdata/load.py
    def to_config(self, filter_data: bool = True) -> dict:
        """
        Convert the DataLoader back to parser configuration format.

        Parameters
        ----------
        filter_data : bool, optional
            If True (default) and steps are present, saves the filtered data
            rather than the original unfiltered data.

        Returns
        -------
        dict
            Configuration dictionary that can recreate this DataLoader.
        """
        if hasattr(self, "_measurement_id") and self._measurement_id is not None:
            config = {"data": f"db:{self._measurement_id}"}
            opts = self._build_options_for_config()
            if opts:
                config["options"] = opts
            return config

        if self._steps_pl is None:
            config = {"data": self._data_pl.to_dict(as_series=False)}
            opts = self._build_options_for_config()
            if opts:
                config["options"] = opts
            return con
If a scraper extracted this row, it came from Repobility (https://repobility.com)
DataLoader._build_options_for_config method · python · L1027-L1044 (18 LOC)
ionworksdata/load.py
    def _build_options_for_config(self, include_step_options=True):
        opts = {}
        if include_step_options:
            if getattr(self, "_first_step", None) is not None:
                opts["first_step"] = self._first_step
            if getattr(self, "_last_step", None) is not None:
                opts["last_step"] = self._last_step
        if getattr(self, "_capacity_column", None) is not None:
            opts["capacity_column"] = self._capacity_column
        if self._transforms:
            serializable_transforms = {}
            for k, v in self._transforms.items():
                if isinstance(v, np.ndarray):
                    serializable_transforms[k] = v.tolist()
                else:
                    serializable_transforms[k] = v
            opts["transforms"] = serializable_transforms
        return opts
DataLoader._get_step_from_cycle method · python · L1071-L1076 (6 LOC)
ionworksdata/load.py
    def _get_step_from_cycle(cycle, steps, first):
        """
        Return the first or last "Step count" belonging to a cycle.

        Parameters
        ----------
        cycle
            Value matched against the "Cycle count" column.
        steps : pl.DataFrame | pd.DataFrame
            Steps table; anything that is not a Polars frame is converted
            with ``pl.from_pandas``.
        first : bool
            If truthy, take the first matching row, otherwise the last.

        Raises
        ------
        ValueError
            If no row has the requested cycle.
        """
        if isinstance(steps, pl.DataFrame):
            frame = steps
        else:
            frame = pl.from_pandas(steps)

        matching = frame.filter(pl.col("Cycle count") == cycle)
        if matching.height == 0:
            raise ValueError(f"No steps found for cycle {cycle}")

        position = 0 if first else -1
        return matching["Step count"][position]
DataLoader._get_step_from method · python · L1083-L1098 (16 LOC)
ionworksdata/load.py
    def _get_step_from(step_dict, all_steps, first):
        match list(step_dict.keys())[0]:
            case "cycle":
                return DataLoader._get_step_from_cycle(
                    list(step_dict.values())[0],
                    all_steps,
                    first,
                )
            case "step":
                return DataLoader._get_step_from_step(
                    list(step_dict.values())[0],
                    all_steps,
                    first,
                )
            case _:
                raise ValueError(f"Unknown step type: {list(step_dict.keys())[0]}")
DataLoader._get_step method · python · L1101-L1134 (34 LOC)
ionworksdata/load.py
    def _get_step(step_param, all_steps, first):
        if step_param is None:
            return 0 if first else len(all_steps) - 1
        if isinstance(step_param, int):
            return step_param
        if isinstance(step_param, str):
            try:
                steps_pl = (
                    all_steps
                    if isinstance(all_steps, pl.DataFrame)
                    else pl.from_pandas(all_steps)
                )
                ctx = pl.SQLContext(steps=steps_pl)
                result = ctx.execute(step_param).collect()
                if len(result) == 0:
                    raise ValueError(f"SQL query returned no results: {step_param}")
                if len(result) > 1:
                    raise ValueError(
                        f"SQL query returned {len(result)} results, expected exactly 1: {step_param}"
                    )
                return result["Step count"][0]
            except Exception as e:
                raise ValueError(
     
DataLoader.generate_experiment method · python · L1140-L1177 (38 LOC)
ionworksdata/load.py
    def generate_experiment(self, use_cv: bool = False) -> pybamm.Experiment:
        """Generate a PyBaMM experiment from the loaded step information."""
        if self._steps_pl is None:
            raise ValueError("generate_experiment requires steps data")
        steps = []
        for step_row in self._steps_pl.iter_rows(named=True):
            duration = step_row["Duration [s]"]
            step_type = step_row["Step type"]
            if duration <= np.nextafter(0, 1):
                continue
            match step_type:
                case "Constant current discharge" | "Constant current charge":
                    mean_current = step_row["Mean current [A]"]
                    step = pybamm.step.Current(mean_current, duration=duration)
                case "Constant voltage discharge" | "Constant voltage charge":
                    if use_cv:
                        mean_voltage = step_row["Mean voltage [V]"]
                        step = pybamm.step.Voltage(mean_volta
DataLoader.generate_interpolant method · python · L1179-L1205 (27 LOC)
ionworksdata/load.py
    def generate_interpolant(self) -> pybamm.Interpolant:
        """Generate a PyBaMM interpolant from the loaded step information."""
        if self._steps_pl is None:
            raise ValueError("generate_interpolant requires steps data")
        ts = []
        cs = []
        for step_row in self._steps_pl.iter_rows(named=True):
            if (
                "constant current" in step_row["Step type"].lower()
                or step_row["Step type"] == "Rest"
            ):
                first_t = step_row["Start time [s]"]
                last_t = step_row["End time [s]"]
                ts.append(np.array([first_t, last_t]))
                cs.append(
                    np.array(
                        [step_row["Mean current [A]"], step_row["Mean current [A]"]]
                    )
                )
            else:
                times, currents = self._get_times_and_currents(step_row)
                ts.append(times)
                cs.append(currents)
        ts 
DataLoader.plot_data method · python · L1207-L1220 (14 LOC)
ionworksdata/load.py
    def plot_data(self, show: bool = False) -> tuple[plt.Figure, plt.Axes]:
        """
        Plot voltage, current and temperature against time.

        Three vertically stacked panels sharing the x-axis are created, one
        per column of the loaded data.

        Parameters
        ----------
        show : bool, optional
            If True, call ``plt.show()`` before returning.

        Returns
        -------
        tuple
            The figure and the axes object returned by
            ``plt.subplots(3, 1, sharex=True)``.
        """
        fig, ax = plt.subplots(3, 1, sharex=True)
        time = self._data_pl["Time [s]"].to_numpy()

        # Each column name doubles as the y-axis label of its panel.
        columns = ("Voltage [V]", "Current [A]", "Temperature [degC]")
        for panel, column in zip(ax, columns):
            panel.plot(time, self._data_pl[column].to_numpy())
            panel.set_ylabel(column)

        # Only the bottom panel carries the shared x-axis label.
        ax[2].set_xlabel("Time [s]")
        if show:
            plt.show()
        return fig, ax
DataLoader._get_times_and_currents method · python · L1222-L1228 (7 LOC)
ionworksdata/load.py
    def _get_times_and_currents(self, step):
        """
        Return the time and current samples covered by one step.

        Parameters
        ----------
        step : mapping
            Must provide "Start index" and "End index"; both are shifted by
            ``self._start_idx`` and the end index is treated as inclusive.

        Returns
        -------
        tuple
            ``(time, current)`` as NumPy arrays.
        """
        begin = int(step["Start index"]) - self._start_idx
        # +1 makes the "End index" row part of the slice.
        stop = int(step["End index"]) + 1 - self._start_idx
        count = stop - begin

        current, time = (
            self._data_pl[column].slice(begin, count).to_numpy()
            for column in ("Current [A]", "Time [s]")
        )
        return time, current
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
DataLoader.from_local method · python · L1241-L1271 (31 LOC)
ionworksdata/load.py
    def from_local(cls, data_path, options=None, use_polars=True):
        """
        Load data from local filesystem.

        Parameters
        ----------
        data_path : str
            Path to the directory containing time_series.csv and optionally
            steps.csv files.
        options : dict | None, optional
            Options to pass to the DataLoader constructor.
        use_polars : bool, optional
            If True (default), read CSV with Polars. If False, read with Pandas
            (data is still stored as Polars internally).

        Returns
        -------
        DataLoader
        """
        read_fn = pl.read_csv if use_polars else pd.read_csv
        ts_path = Path(data_path) / "time_series.csv"
        if not ts_path.exists():
            raise ValueError(
                f"Folder must contain either time_series.csv or both "
                f"time_series.csv and steps.csv. Path: {data_path}"
            )
        time_series = read_fn(ts_path)
      
DataLoader.from_db method · python · L1274-L1320 (47 LOC)
ionworksdata/load.py
    def from_db(cls, measurement_id, options=None, use_cache=True, timeout=None):
        """
        Load data from the Ionworks database.

        Loads steps if available, otherwise creates a steps-less DataLoader.

        Parameters
        ----------
        measurement_id : str
            The ID of the measurement to load from the database.
        options : dict | None, optional
            Options to pass to the DataLoader constructor.
        use_cache : bool, optional
            If True (default), use local file cache to avoid repeated API calls.
        timeout : int | None, optional
            Request timeout in seconds passed to the Ionworks client.

        Returns
        -------
        DataLoader
        """
        options = options or {}
        instance = cls.__new__(cls)

        # Parse the subset of options needed by to_config() /
        # _build_options_for_config() before any data is loaded —
        # uses _parse_options to stay in sync with __init__.
   
DataLoader.from_processed_data method · python · L1323-L1376 (54 LOC)
ionworksdata/load.py
    def from_processed_data(
        cls,
        data,
        steps,
        initial_voltage,
        start_idx,
        end_idx,
    ):
        """
        Create a DataLoader from already-processed data, bypassing __init__.

        Parameters
        ----------
        data : pd.DataFrame | pl.DataFrame
            The processed time series data.
        steps : pd.DataFrame | pl.DataFrame | None
            The processed steps data (or None).
        initial_voltage : float
            The initial voltage value.
        start_idx : int
            The start index for the data.
        end_idx : int
            The end index for the data.

        Returns
        -------
        DataLoader
        """
        instance = cls.__new__(cls)
        # Accept both pandas and polars; store as polars internally
        if isinstance(data, pl.DataFrame):
            instance._data_pl = data.clone()  # noqa: SLF001
        elif isinstance(data, pd.DataFrame):
            instance._data_pl =
page 1 / 4next ›