← back to ionworks__ionworksdata

Function bodies 175 total

All specs Real LLM only Function bodies
set_cumulative_cycle_number function · python · L247-L264 (18 LOC)
ionworksdata/transform.py
def set_cumulative_cycle_number(data: pl.DataFrame, **kwargs) -> pl.DataFrame:
    """
    Attach the cumulative cycle number to the data as a "Cycle number" column.

    Parameters
    ----------
    data : pl.DataFrame
        The data to which the "Cycle number" column is added.
    kwargs
        Keyword arguments forwarded unchanged to get_cumulative_cycle_number.

    Returns
    -------
    pl.DataFrame
        The data with the "Cycle number" column added.
    """
    # The series is computed by get_cumulative_cycle_number and only renamed here
    named_cycles = get_cumulative_cycle_number(data, **kwargs).alias("Cycle number")
    return data.with_columns(named_cycles)
set_cycle_count function · python · L267-L289 (23 LOC)
ionworksdata/transform.py
def set_cycle_count(data: pl.DataFrame) -> pl.DataFrame:
    """
    Add a "Cycle count" column holding a cumulative cycle number for each row.

    When the data has a "Cycle from cycler" column, the count is obtained from
    get_cumulative_cycle_number using the "cycle column" method on that column.
    When the column is absent, every row is given a cycle count of 0.

    Parameters
    ----------
    data : pl.DataFrame
        The data to assign cycle count to.

    Returns
    -------
    pl.DataFrame
        The data with "Cycle count" column added.
    """
    source_column = "Cycle from cycler"

    if source_column not in data.columns:
        # No cycle information available: fall back to a constant zero column
        zero_counts = pl.Series([0] * data.height)
        return data.with_columns(zero_counts.alias("Cycle count"))

    cycle_options = {"method": "cycle column", "cycle column": source_column}
    counts = get_cumulative_cycle_number(data, cycle_options)
    return data.with_columns(counts.alias("Cycle count"))
reset_time function · python · L292-L303 (12 LOC)
ionworksdata/transform.py
def reset_time(data: pl.DataFrame) -> pl.DataFrame:
    """
    Reset the time to start at zero by subtracting the first "Time [s]" value
    from every row.

    Parameters
    ----------
    data : pl.DataFrame
        The data to reset the time for. Must have a "Time [s]" column.

    Returns
    -------
    pl.DataFrame
        The data with "Time [s]" shifted so that its first value is zero. Data
        with no rows is returned unchanged.
    """
    # Fetch the column first so a missing "Time [s]" column still raises,
    # even when the data has no rows
    time_column = data.get_column("Time [s]")

    # Nothing to shift for empty data; indexing element 0 below would raise
    # an IndexError
    if data.height == 0:
        return data

    # Extract first value using numpy (simpler than Polars head/item)
    first = time_column.to_numpy()[0]
    return data.with_columns((pl.col("Time [s]") - pl.lit(first)).alias("Time [s]"))
offset_duplicate_times function · python · L306-L358 (53 LOC)
ionworksdata/transform.py
def offset_duplicate_times(data: pl.DataFrame, offset: float = 1e-6) -> pl.DataFrame:
    """
    Offset duplicate time values by a small amount. This is preferable to
    removing the duplicate time values because removing duplicate time values can
    lead to missing steps in the data.

    Parameters
    ----------
    data : pl.DataFrame
        The data to remove duplicate time values from.
    offset : float, optional
        The amount to offset the duplicate time values by.
    """
    t = data.get_column("Time [s]").to_numpy().astype(float)

    if len(t) <= 1:
        return data

    max_iter = 100
    while True:
        # Sort to make duplicates adjacent (stable sort preserves relative order)
        sort_idx = np.argsort(t, kind="stable")
        sorted_t = t[sort_idx]

        # Find group boundaries (where values change)
        is_new_group = np.concatenate([[True], sorted_t[1:] != sorted_t[:-1]])

        # Check if there are any duplicates (all True means no duplicat
_apply_step_resets function · python · L361-L406 (46 LOC)
ionworksdata/transform.py
def _apply_step_resets(
    discharge_values: np.ndarray,
    charge_values: np.ndarray,
    step_numbers: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Reset cumulative values to 0 at each step boundary.

    Vectorized implementation that subtracts the starting value of each step
    from all points within that step, ensuring each step starts at 0.

    Parameters
    ----------
    discharge_values : np.ndarray
        Cumulative discharge values to reset.
    charge_values : np.ndarray
        Cumulative charge values to reset.
    step_numbers : np.ndarray
        Step numbers for each data point.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        Reset discharge and charge values.
    """
    # Detect step boundaries (first row is always a step start)
    step_changes = np.concatenate(([True], np.diff(step_numbers) != 0))

    # For each row, subtract the cumulative value at the start of its step
    step_start_indices = np.where(step_changes)[0]
   
_split_cumulative_by_direction function · python · L409-L463 (55 LOC)
ionworksdata/transform.py
def _split_cumulative_by_direction(
    cumulative_values: np.ndarray,
    direction_indicator: np.ndarray,
    step_numbers: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Split cumulative values into positive and negative components based on
    direction indicator (e.g., current or power).

    Vectorized implementation that splits a cumulative metric (capacity/energy)
    into discharge and charge components based on the sign of a direction
    indicator. Resets accumulation at step boundaries if step_numbers provided.

    Parameters
    ----------
    cumulative_values : np.ndarray
        Cumulative values to split (e.g., capacity or energy).
    direction_indicator : np.ndarray
        Direction indicator (e.g., current or power). Positive values indicate
        discharge, negative indicate charge.
    step_numbers : np.ndarray, optional
        Step numbers for each data point. If provided, accumulation resets at
        each step boundary.

    Ret
_calculate_capacity function · python · L466-L534 (69 LOC)
ionworksdata/transform.py
def _calculate_capacity(
    data: pl.DataFrame, options: dict | None = None
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate discharge and charge capacity from the data.

    First checks if charge/discharge capacity columns exist. If so, uses them
    directly. Otherwise checks if a single "Capacity" column exists and splits
    it into discharge and charge based on current direction. Finally, calculates
    capacities using cumulative trapezoidal integration if no capacity columns
    are found. Resets capacity to zero at each step boundary if step information
    is available.

    Parameters
    ----------
    data : pl.DataFrame
        The data to get the capacity columns from.
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        Discharge capacity and charge capacity as numpy arrays.
    """
    current_units, capacity_units = iwdata.util.get_current_and_
Repobility — same analyzer, your code, free for public repos · /scan/
set_capacity function · python · L537-L568 (32 LOC)
ionworksdata/transform.py
def set_capacity(data: pl.DataFrame, options: dict | None = None) -> pl.DataFrame:
    """
    Calculate discharge and charge capacity for the data and assign them to new
    columns called "Discharge capacity [A.h]" and "Charge capacity [A.h]"
    Drops the single "Capacity [A.h]" column if it exists.

    Parameters
    ----------
    data : pl.DataFrame
        The data to calculate the capacity for.
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    pl.DataFrame
        The data with discharge and charge capacity columns added, and single
        capacity column removed if it existed.
    """
    _, capacity_units = iwdata.util.get_current_and_capacity_units(options)
    discharge_cap, charge_cap = _calculate_capacity(data, options)
    result = data.with_columns(
        [
            pl.Series(f"Discharge capacity [{capacity_units}]", discharge_cap),
            pl.Series(f"Charge capacity [{capacity_
_calculate_energy function · python · L571-L645 (75 LOC)
ionworksdata/transform.py
def _calculate_energy(
    data: pl.DataFrame, options: dict | None = None
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate discharge and charge energy from the data.

    First checks if charge/discharge energy columns exist. If so, uses them
    directly. Otherwise checks if a single "Energy [W.h]" column exists and splits
    it into discharge and charge based on power direction. Finally, calculates
    energies using cumulative trapezoidal integration if no energy columns are
    found. Resets energy to zero at each step boundary if step information is
    available.

    Parameters
    ----------
    data : pl.DataFrame
        The data to get the energy columns from.
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        Discharge energy and charge energy as numpy arrays.
    """
    # Calculate power from current and voltage if Power column doesn't exist
   
set_energy function · python · L648-L677 (30 LOC)
ionworksdata/transform.py
def set_energy(data: pl.DataFrame, options: dict | None = None) -> pl.DataFrame:
    """
    Calculate discharge and charge energy for the data and assign them to new
    columns called "Discharge energy [W.h]" and "Charge energy [W.h]"
    Drops the single "Energy [W.h]" column if it exists.

    Parameters
    ----------
    data : pl.DataFrame
        The data to calculate the energy for.
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    pl.DataFrame
        The data with discharge and charge energy columns added, and single
        energy column removed if it existed.
    """
    discharge_energy, charge_energy = _calculate_energy(data, options)
    result = data.with_columns(
        [
            pl.Series("Discharge energy [W.h]", discharge_energy),
            pl.Series("Charge energy [W.h]", charge_energy),
        ]
    )
    # Drop single energy column if it exists
    if "Energy [W.h]" in resul
_calculate_net_capacity function · python · L680-L713 (34 LOC)
ionworksdata/transform.py
def _calculate_net_capacity(
    data: pl.DataFrame, options: dict | None = None
) -> np.ndarray:
    """
    Calculate net capacity (discharge minus charge) from the data.

    Net capacity represents the net amount of charge removed from the battery.
    Positive values indicate net discharge, negative values indicate net charge.

    Parameters
    ----------
    data : pl.DataFrame
        The data to calculate net capacity from.
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    np.ndarray
        Net capacity as a numpy array.
    """
    _, capacity_units = iwdata.util.get_current_and_capacity_units(options)
    discharge_cap_col = f"Discharge capacity [{capacity_units}]"
    charge_cap_col = f"Charge capacity [{capacity_units}]"

    # Check if columns exist
    if discharge_cap_col in data.columns and charge_cap_col in data.columns:
        discharge_cap = data.get_column(discharge_cap_col).to_nump
set_net_capacity function · python · L716-L738 (23 LOC)
ionworksdata/transform.py
def set_net_capacity(data: pl.DataFrame, options: dict | None = None) -> pl.DataFrame:
    """
    Compute the net capacity and store it in a new "Capacity [<units>]" column,
    where <units> are the capacity units selected by the options (for example
    "Capacity [A.h]").

    Parameters
    ----------
    data : pl.DataFrame
        The data to calculate the net capacity for.
    options : dict, optional
        Options forwarded to the unit lookup and to the net capacity
        calculation. The default is None.

    Returns
    -------
    pl.DataFrame
        The data with the net capacity column added.

    Raises
    ------
    ValueError
        If the net capacity column already exists in the data.
    """
    capacity_units = iwdata.util.get_current_and_capacity_units(options)[1]
    net_column = f"Capacity [{capacity_units}]"

    # Refuse to overwrite an existing single capacity column
    if net_column in data.columns:
        raise ValueError(f"Column '{net_column}' already exists in data.")

    net_values = _calculate_net_capacity(data, options)
    net_series = pl.Series(net_column, net_values)
    return data.with_columns(net_series)
set_nominal_soc function · python · L741-L771 (31 LOC)
ionworksdata/transform.py
def set_nominal_soc(
    data: pl.DataFrame, cell_metadata: dict, options: dict | None = None
) -> pl.DataFrame:
    """
    Calculate the nominal SOC for the data and assign it to a new column called
    "Nominal SOC". SOC is calculated based on net capacity (discharge - charge).

    Parameters
    ----------
    data : pl.DataFrame
        The data to calculate the nominal SOC for. Must have columns
        "Discharge capacity [A.h]" and "Charge capacity [A.h]" (or mA.h.cm-2).
        If they don't exist, use set_capacity to calculate them first.
    cell_metadata : dict
        The metadata for the cell. Should have a key "Nominal cell capacity [A.h]" or
        "Nominal cell capacity [mA.h.cm-2]"
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    pl.DataFrame
        The data with the nominal SOC added.
    """
    _, capacity_units = iwdata.util.get_current_and_capacity_units(options)
    net_capacity
convert_current_density_to_total_current function · python · L774-L798 (25 LOC)
ionworksdata/transform.py
def convert_current_density_to_total_current(
    data: pl.DataFrame, metadata: dict
) -> pl.DataFrame:
    """
    Replace the "Current [mA.cm-2]" column with a "Current [A]" column.

    The total current is the current density multiplied by the electrode area
    and divided by 1000 (mA to A).

    Parameters
    ----------
    data : pl.DataFrame
        The data to convert. Should have a column "Current [mA.cm-2]".
    metadata : dict
        The metadata for the data. Should have a key "Electrode area [cm2]";
        its value is converted with float().

    Returns
    -------
    pl.DataFrame
        The data with "Current [A]" added and "Current [mA.cm-2]" dropped.
    """
    density_column = "Current [mA.cm-2]"
    electrode_area = float(metadata["Electrode area [cm2]"])

    # Multiply before dividing, matching density * area / 1000
    total_current = (pl.col(density_column) * electrode_area / 1000.0).alias(
        "Current [A]"
    )
    converted = data.with_columns(total_current)
    return converted.drop([density_column])
convert_total_current_to_current_density function · python · L801-L823 (23 LOC)
ionworksdata/transform.py
def convert_total_current_to_current_density(
    data: pl.DataFrame, metadata: dict
) -> pl.DataFrame:
    """
    Replace the "Current [A]" column with a "Current [mA.cm-2]" column.

    The current density is the total current divided by the electrode area
    and multiplied by 1000 (A to mA).

    Parameters
    ----------
    data : pl.DataFrame
        The data to convert. Should have a column "Current [A]".
    metadata : dict
        The metadata for the data. Should have a key "Electrode area [cm2]";
        its value is converted with float().

    Returns
    -------
    pl.DataFrame
        The data with "Current [mA.cm-2]" added and "Current [A]" dropped.
    """
    total_column = "Current [A]"
    electrode_area = float(metadata["Electrode area [cm2]"])

    # Divide before multiplying, matching current / area * 1000
    current_density = (pl.col(total_column) / electrode_area * 1000.0).alias(
        "Current [mA.cm-2]"
    )
    converted = data.with_columns(current_density)
    return converted.drop([total_column])
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
set_positive_current_for_discharge function · python · L826-L919 (94 LOC)
ionworksdata/transform.py
def set_positive_current_for_discharge(
    data: pl.DataFrame, options: dict | None = None
) -> pl.DataFrame:
    """
    Identify whether positive current is charging or discharging, then make sure that
    positive current is discharging and negative current is charging

    Parameters
    ----------
    data : pl.DataFrame
        The data to set the current direction for.
    options : dict, optional
        Additional options to pass to the function. The default is None.

    Returns
    -------
    pl.DataFrame
        The data with the current direction set to positive current is discharging.
    """
    options = options or {}
    options["method"] = "current sign"

    current_units, _ = iwdata.util.get_current_and_capacity_units(options)

    # Create a new dataframe with only the columns we need and set the step count
    # based on current sign
    new_cols = [
        c
        for c in [
            "Time [s]",
            f"Current [{current_units}]",
            "Volta
remove_outliers function · python · L922-L968 (47 LOC)
ionworksdata/transform.py
def remove_outliers(
    data: pl.DataFrame,
    column: str,
    z_threshold: float = 3,
    data_range: slice | None = None,
) -> pl.DataFrame:
    """
    Remove outliers from the data based on the z-score of a column

    Parameters
    ----------
    data : pl.DataFrame
        The data to remove outliers from.
    column : str
        The column to calculate the z-score for.
    z_threshold : float, optional
        The z-score threshold to use for removing outliers. The default is 3.
    data_range : slice, optional
        The range of data points to consider for outlier detection.
        If None, all points are used. Use Python's slice notation, e.g.,
        slice(0, 100) for first 100 points, slice(-100, None) for last 100 points.

    Returns
    -------
    pl.DataFrame
        The data with the outliers removed.
    """
    # Get unique step numbers
    processed_frames: list[pl.DataFrame] = []
    for step in data.get_column("Step number").unique().to_list():
        st
get_current_and_capacity_units function · python · L9-L16 (8 LOC)
ionworksdata/util.py
def get_current_and_capacity_units(options: dict | None) -> tuple[str, str]:
    """
    Return the current and capacity unit strings selected by the options.

    Parameters
    ----------
    options : dict or None
        Options dictionary. The "current units" entry selects the format and
        defaults to "total" when missing or when options is None/empty.

    Returns
    -------
    tuple[str, str]
        ("A", "A.h") for "total" and ("mA.cm-2", "mA.h.cm-2") for "density".

    Raises
    ------
    ValueError
        If "current units" is neither "total" nor "density".
    """
    settings = options if options else {}
    unit_format = settings.get("current units", "total")

    if unit_format == "total":
        return "A", "A.h"
    if unit_format == "density":
        return "mA.cm-2", "mA.h.cm-2"
    raise ValueError("Invalid current units option")
check_for_duplicates function · python · L19-L27 (9 LOC)
ionworksdata/util.py
def check_for_duplicates(column_renamings: dict, data: pl.DataFrame) -> None:
    """
    Warn when several columns present in the data would be renamed to the same
    new name.

    Parameters
    ----------
    column_renamings : dict
        Mapping from original column name to new column name. Entries whose
        original column is not in ``data.columns`` are ignored.
    data : pl.DataFrame
        The data whose columns are checked.

    Warns
    -----
    UserWarning
        Once for each present column whose new name was already claimed by an
        earlier present column in ``column_renamings``.
    """
    # Sets give constant-time membership checks for both lookups
    present_columns = set(data.columns)
    claimed_names = set()

    for original, new_name in column_renamings.items():
        if original not in present_columns:
            continue
        if new_name in claimed_names:
            message = f"Duplicate columns for {new_name} found: {original}"
            # stacklevel=2 attributes the warning to the caller of this function
            warnings.warn(message, category=UserWarning, stacklevel=2)
        else:
            claimed_names.add(new_name)
check_and_convert_datetime function · python · L30-L47 (18 LOC)
ionworksdata/util.py
def check_and_convert_datetime(start_datetime: str | datetime) -> datetime:
    """
    Validate a start datetime and return it converted to UTC.

    Parameters
    ----------
    start_datetime : str or datetime
        The start datetime. Strings are parsed with datetime.fromisoformat.

    Returns
    -------
    datetime
        The start datetime expressed in UTC.

    Raises
    ------
    ValueError
        If the datetime is not timezone aware, lies in the future, or is less
        than one second older than the current time.
    """
    parsed = (
        datetime.fromisoformat(start_datetime)
        if isinstance(start_datetime, str)
        else start_datetime
    )

    # A naive datetime cannot be converted to UTC unambiguously
    if parsed.tzinfo is None:
        raise ValueError("Start datetime must be timezone aware")

    utc_start = parsed.astimezone(timezone.utc)

    # Reject datetimes that are in the future
    if utc_start > datetime.now(timezone.utc):
        raise ValueError("Start datetime cannot be in the future")

    # Reject datetimes that are too recent (within the last second)
    one_second_ago = datetime.now(timezone.utc) - timedelta(seconds=1)
    if utc_start > one_second_ago:
        raise ValueError("Do not use datetime.now() as the start datetime.")

    return utc_start
monotonic_time_offset function · python · L50-L96 (47 LOC)
ionworksdata/util.py
def monotonic_time_offset(
    time_points: np.ndarray,
    start_time: float,
    offset_initial_time: bool | None = None,
) -> np.ndarray:
    """
    Returns a time array that is strictly increasing and greater than start_time.
    If offset_initial_time is True, the first time is also greater than start_time.

    Parameters
    ----------
    time_points : np.ndarray
        The time points to offset.
    start_time : float
        The time to offset the time points by.
    offset_initial_time : bool, optional
        Whether to offset the initial time. If None, the initial time is offset if it
        is not equal to start_time. Default is `True` if the first time is not equal
        to start_time, otherwise `False`.

    Returns
    -------
    np.ndarray
        The offset time points.
    """
    if offset_initial_time is None:
        offset_initial_time = time_points[0] != start_time

    time_points = np.asarray(time_points, dtype=np.float64)
    start_time_np = np.float64
convert_steps function · python · L276-L361 (86 LOC)
scripts/convert_to_new_format.py
def convert_steps(steps: pl.DataFrame) -> pl.DataFrame:
    """
    Convert steps from old format to new format.

    Parameters
    ----------
    steps : pl.DataFrame
        Steps data in old format with Delta capacity column.

    Returns
    -------
    pl.DataFrame
        Steps data in new format with separate discharge/charge columns.
    """
    result = steps.clone()

    # Check for capacity columns
    capacity_units = "A.h" if "Delta capacity [A.h]" in steps.columns else "mA.h.cm-2"
    old_delta_col = f"Delta capacity [{capacity_units}]"

    if old_delta_col in steps.columns:
        print(f"  Converting steps column: {old_delta_col}")

        # For steps, we need to determine if each step was discharge or charge
        # We can use the "Mean current" or "Type" column if available
        delta_capacity = steps.get_column(old_delta_col).to_numpy()

        if "Mean current [A]" in steps.columns:
            mean_current = steps.get_column("Mean current [A]").to_numpy()
convert_files function · python · L364-L428 (65 LOC)
scripts/convert_to_new_format.py
def convert_files(
    input_time_series: Path,
    input_steps: Path,
    output_time_series: Path,
    output_steps: Path,
):
    """Convert a pair of time series and steps files."""
    print("\nConverting files:")
    print(f"  Input time series: {input_time_series}")
    print(f"  Input steps: {input_steps}")

    # Load data
    print("\nLoading data...")
    time_series = pl.read_csv(input_time_series)
    steps = pl.read_csv(input_steps)

    print(f"  Time series: {time_series.shape[0]} rows, {time_series.shape[1]} columns")
    print(f"  Steps: {steps.shape[0]} rows, {steps.shape[1]} columns")

    # Convert
    print("\nConverting time series...")
    new_time_series = convert_time_series(time_series)

    print("\nConverting steps...")
    new_steps = convert_steps(steps)

    # Save
    print("\nSaving converted data...")
    output_time_series.parent.mkdir(parents=True, exist_ok=True)
    output_steps.parent.mkdir(parents=True, exist_ok=True)

    new_time_series.write_cs
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
convert_directory function · python · L431-L456 (26 LOC)
scripts/convert_to_new_format.py
def convert_directory(input_dir: Path, output_dir: Path):
    """Convert all time_series.csv and steps.csv files in a directory."""
    print(f"\nScanning directory: {input_dir}")

    # Find all time_series.csv files
    time_series_files = list(input_dir.rglob("time_series.csv"))
    print(f"Found {len(time_series_files)} time_series.csv files")

    for ts_file in time_series_files:
        # Find corresponding steps.csv
        steps_file = ts_file.parent / "steps.csv"

        if not steps_file.exists():
            print(f"\n⚠️  Skipping {ts_file} (no corresponding steps.csv)")
            continue

        # Create output paths maintaining directory structure
        rel_path = ts_file.parent.relative_to(input_dir)
        output_ts = output_dir / rel_path / "time_series.csv"
        output_st = output_dir / rel_path / "steps.csv"

        try:
            convert_files(ts_file, steps_file, output_ts, output_st)
        except Exception as e:
            print(f"\n❌ Error conver
main function · python · L459-L496 (38 LOC)
scripts/convert_to_new_format.py
def main():
    """Main entry point."""
    if len(sys.argv) < 3:
        print(__doc__)
        sys.exit(1)

    input_path = Path(sys.argv[1])
    output_path = Path(sys.argv[2])

    # Check if we're converting directories or files
    if input_path.is_dir():
        if len(sys.argv) != 3:
            print("Error: When converting directories, provide input_dir output_dir")
            sys.exit(1)
        convert_directory(input_path, output_path)
    else:
        # Individual files
        if len(sys.argv) != 5:
            print(
                "Error: When converting files, provide:\n"
                "  input_time_series.csv input_steps.csv "
                "output_time_series.csv output_steps.csv"
            )
            sys.exit(1)

        input_ts = Path(sys.argv[1])
        input_st = Path(sys.argv[2])
        output_ts = Path(sys.argv[3])
        output_st = Path(sys.argv[4])

        if not input_ts.exists():
            print(f"Error: {input_ts} does not exist")
   
‹ prevpage 4 / 4