Function bodies 175 total
set_cumulative_cycle_number function · python · L247-L264 (18 LOC)ionworksdata/transform.py
def set_cumulative_cycle_number(data: pl.DataFrame, **kwargs) -> pl.DataFrame:
    """
    Attach the cumulative cycle number to the data as a "Cycle number" column.

    Parameters
    ----------
    data : pl.DataFrame
        The data the cycle number is computed from and added to.
    kwargs
        Keyword arguments forwarded unchanged to get_cumulative_cycle_number.

    Returns
    -------
    pl.DataFrame
        The data with the "Cycle number" column set.
    """
    cumulative = get_cumulative_cycle_number(data, **kwargs)
    cycle_column = cumulative.alias("Cycle number")
    return data.with_columns(cycle_column)
def set_cycle_count(data: pl.DataFrame) -> pl.DataFrame:
    """
    Add a "Cycle count" column holding a cumulative cycle number for each row.

    The count is derived from the "Cycle from cycler" column via
    get_cumulative_cycle_number. When that column is missing, every row is
    given a count of 0.

    Parameters
    ----------
    data : pl.DataFrame
        The data to assign the cycle count to.

    Returns
    -------
    pl.DataFrame
        The data with the "Cycle count" column set.
    """
    source_column = "Cycle from cycler"
    if source_column not in data.columns:
        # No cycle information available, so every row gets a count of 0
        counts = pl.Series([0] * data.height)
        return data.with_columns(counts.alias("Cycle count"))
    settings = {"method": "cycle column", "cycle column": source_column}
    counts = get_cumulative_cycle_number(data, settings)
    return data.with_columns(counts.alias("Cycle count"))
def reset_time(data: pl.DataFrame) -> pl.DataFrame:
    """
    Reset the time to start at zero by subtracting the first "Time [s]" value
    from every row.

    Parameters
    ----------
    data : pl.DataFrame
        The data to reset the time for. Must have a column "Time [s]".

    Returns
    -------
    pl.DataFrame
        The data with "Time [s]" shifted so that its first value is zero.
        Data with no rows is returned unchanged.
    """
    time_column = data.get_column("Time [s]")
    if len(time_column) == 0:
        # Nothing to shift; indexing the first element would raise IndexError
        return data
    # Extract first value using numpy (simpler than Polars head/item)
    first = time_column.to_numpy()[0]
    return data.with_columns((pl.col("Time [s]") - pl.lit(first)).alias("Time [s]"))
def offset_duplicate_times(data: pl.DataFrame, offset: float = 1e-6) -> pl.DataFrame:
"""
Offset duplicate time values by a small amount. This is preferable to
removing the duplicate time values because removing duplicate time values can
lead to missing steps in the data.
Parameters
----------
data : pl.DataFrame
The data to remove duplicate time values from.
offset : float, optional
The amount to offset the duplicate time values by.
"""
t = data.get_column("Time [s]").to_numpy().astype(float)
if len(t) <= 1:
return data
max_iter = 100
while True:
# Sort to make duplicates adjacent (stable sort preserves relative order)
sort_idx = np.argsort(t, kind="stable")
sorted_t = t[sort_idx]
# Find group boundaries (where values change)
is_new_group = np.concatenate([[True], sorted_t[1:] != sorted_t[:-1]])
# Check if there are any duplicates (all True means no duplicat_apply_step_resets function · python · L361-L406 (46 LOC)ionworksdata/transform.py
def _apply_step_resets(
discharge_values: np.ndarray,
charge_values: np.ndarray,
step_numbers: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""
Reset cumulative values to 0 at each step boundary.
Vectorized implementation that subtracts the starting value of each step
from all points within that step, ensuring each step starts at 0.
Parameters
----------
discharge_values : np.ndarray
Cumulative discharge values to reset.
charge_values : np.ndarray
Cumulative charge values to reset.
step_numbers : np.ndarray
Step numbers for each data point.
Returns
-------
tuple[np.ndarray, np.ndarray]
Reset discharge and charge values.
"""
# Detect step boundaries (first row is always a step start)
step_changes = np.concatenate(([True], np.diff(step_numbers) != 0))
# For each row, subtract the cumulative value at the start of its step
step_start_indices = np.where(step_changes)[0]
_split_cumulative_by_direction function · python · L409-L463 (55 LOC)ionworksdata/transform.py
def _split_cumulative_by_direction(
cumulative_values: np.ndarray,
direction_indicator: np.ndarray,
step_numbers: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
Split cumulative values into positive and negative components based on
direction indicator (e.g., current or power).
Vectorized implementation that splits a cumulative metric (capacity/energy)
into discharge and charge components based on the sign of a direction
indicator. Resets accumulation at step boundaries if step_numbers provided.
Parameters
----------
cumulative_values : np.ndarray
Cumulative values to split (e.g., capacity or energy).
direction_indicator : np.ndarray
Direction indicator (e.g., current or power). Positive values indicate
discharge, negative indicate charge.
step_numbers : np.ndarray, optional
Step numbers for each data point. If provided, accumulation resets at
each step boundary.
Ret_calculate_capacity function · python · L466-L534 (69 LOC)ionworksdata/transform.py
def _calculate_capacity(
data: pl.DataFrame, options: dict | None = None
) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate discharge and charge capacity from the data.
First checks if charge/discharge capacity columns exist. If so, uses them
directly. Otherwise checks if a single "Capacity" column exists and splits
it into discharge and charge based on current direction. Finally, calculates
capacities using cumulative trapezoidal integration if no capacity columns
are found. Resets capacity to zero at each step boundary if step information
is available.
Parameters
----------
data : pl.DataFrame
The data to get the capacity columns from.
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
tuple[np.ndarray, np.ndarray]
Discharge capacity and charge capacity as numpy arrays.
"""
current_units, capacity_units = iwdata.util.get_current_and_Repobility — same analyzer, your code, free for public repos · /scan/
set_capacity function · python · L537-L568 (32 LOC)ionworksdata/transform.py
def set_capacity(data: pl.DataFrame, options: dict | None = None) -> pl.DataFrame:
"""
Calculate discharge and charge capacity for the data and assign them to new
columns called "Discharge capacity [A.h]" and "Charge capacity [A.h]"
Drops the single "Capacity [A.h]" column if it exists.
Parameters
----------
data : pl.DataFrame
The data to calculate the capacity for.
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
pl.DataFrame
The data with discharge and charge capacity columns added, and single
capacity column removed if it existed.
"""
_, capacity_units = iwdata.util.get_current_and_capacity_units(options)
discharge_cap, charge_cap = _calculate_capacity(data, options)
result = data.with_columns(
[
pl.Series(f"Discharge capacity [{capacity_units}]", discharge_cap),
pl.Series(f"Charge capacity [{capacity__calculate_energy function · python · L571-L645 (75 LOC)ionworksdata/transform.py
def _calculate_energy(
data: pl.DataFrame, options: dict | None = None
) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate discharge and charge energy from the data.
First checks if charge/discharge energy columns exist. If so, uses them
directly. Otherwise checks if a single "Energy [W.h]" column exists and splits
it into discharge and charge based on power direction. Finally, calculates
energies using cumulative trapezoidal integration if no energy columns are
found. Resets energy to zero at each step boundary if step information is
available.
Parameters
----------
data : pl.DataFrame
The data to get the energy columns from.
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
tuple[np.ndarray, np.ndarray]
Discharge energy and charge energy as numpy arrays.
"""
# Calculate power from current and voltage if Power column doesn't exist
set_energy function · python · L648-L677 (30 LOC)ionworksdata/transform.py
def set_energy(data: pl.DataFrame, options: dict | None = None) -> pl.DataFrame:
"""
Calculate discharge and charge energy for the data and assign them to new
columns called "Discharge energy [W.h]" and "Charge energy [W.h]"
Drops the single "Energy [W.h]" column if it exists.
Parameters
----------
data : pl.DataFrame
The data to calculate the energy for.
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
pl.DataFrame
The data with discharge and charge energy columns added, and single
energy column removed if it existed.
"""
discharge_energy, charge_energy = _calculate_energy(data, options)
result = data.with_columns(
[
pl.Series("Discharge energy [W.h]", discharge_energy),
pl.Series("Charge energy [W.h]", charge_energy),
]
)
# Drop single energy column if it exists
if "Energy [W.h]" in resul_calculate_net_capacity function · python · L680-L713 (34 LOC)ionworksdata/transform.py
def _calculate_net_capacity(
data: pl.DataFrame, options: dict | None = None
) -> np.ndarray:
"""
Calculate net capacity (discharge minus charge) from the data.
Net capacity represents the net amount of charge removed from the battery.
Positive values indicate net discharge, negative values indicate net charge.
Parameters
----------
data : pl.DataFrame
The data to calculate net capacity from.
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
np.ndarray
Net capacity as a numpy array.
"""
_, capacity_units = iwdata.util.get_current_and_capacity_units(options)
discharge_cap_col = f"Discharge capacity [{capacity_units}]"
charge_cap_col = f"Charge capacity [{capacity_units}]"
# Check if columns exist
if discharge_cap_col in data.columns and charge_cap_col in data.columns:
discharge_cap = data.get_column(discharge_cap_col).to_numpset_net_capacity function · python · L716-L738 (23 LOC)ionworksdata/transform.py
def set_net_capacity(data: pl.DataFrame, options: dict | None = None) -> pl.DataFrame:
    """
    Compute the net capacity and store it in a new "Capacity [<units>]" column,
    where the units are those returned by get_current_and_capacity_units for
    the given options.

    Parameters
    ----------
    data : pl.DataFrame
        The data to calculate the net capacity for.
    options : dict, optional
        Options forwarded to the unit lookup and to the net capacity
        calculation. The default is None.

    Returns
    -------
    pl.DataFrame
        The data with the net capacity column added.

    Raises
    ------
    ValueError
        If the capacity column is already present in the data.
    """
    capacity_units = iwdata.util.get_current_and_capacity_units(options)[1]
    target = f"Capacity [{capacity_units}]"
    if target not in data.columns:
        values = _calculate_net_capacity(data, options)
        return data.with_columns(pl.Series(target, values))
    # Refuse to overwrite an existing capacity column
    raise ValueError(f"Column '{target}' already exists in data.")
def set_nominal_soc(
data: pl.DataFrame, cell_metadata: dict, options: dict | None = None
) -> pl.DataFrame:
"""
Calculate the nominal SOC for the data and assign it to a new column called
"Nominal SOC". SOC is calculated based on net capacity (discharge - charge).
Parameters
----------
data : pl.DataFrame
The data to calculate the nominal SOC for. Must have columns
"Discharge capacity [A.h]" and "Charge capacity [A.h]" (or mA.h.cm-2).
If they don't exist, use set_capacity to calculate them first.
cell_metadata : dict
The metadata for the cell. Should have a key "Nominal cell capacity [A.h]" or
"Nominal cell capacity [mA.h.cm-2]"
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
pl.DataFrame
The data with the nominal SOC added.
"""
_, capacity_units = iwdata.util.get_current_and_capacity_units(options)
net_capacityconvert_current_density_to_total_current function · python · L774-L798 (25 LOC)ionworksdata/transform.py
def convert_current_density_to_total_current(
    data: pl.DataFrame, metadata: dict
) -> pl.DataFrame:
    """
    Replace the "Current [mA.cm-2]" column with a "Current [A]" column.

    Parameters
    ----------
    data : pl.DataFrame
        The data to convert. Should have a column "Current [mA.cm-2]".
    metadata : dict
        The metadata for the data. Should have a key "Electrode area [cm2]".

    Returns
    -------
    pl.DataFrame
        The data with "Current [A]" added and "Current [mA.cm-2]" dropped.
    """
    density_column = "Current [mA.cm-2]"
    electrode_area = float(metadata["Electrode area [cm2]"])
    # Multiply by the area first, then scale by 1000, in that order
    total_current = pl.col(density_column) * electrode_area / 1000.0
    converted = data.with_columns(total_current.alias("Current [A]"))
    return converted.drop([density_column])
def convert_total_current_to_current_density(
    data: pl.DataFrame, metadata: dict
) -> pl.DataFrame:
    """
    Replace the "Current [A]" column with a "Current [mA.cm-2]" column.

    Parameters
    ----------
    data : pl.DataFrame
        The data to convert. Should have a column "Current [A]".
    metadata : dict
        The metadata for the data. Should have a key "Electrode area [cm2]".

    Returns
    -------
    pl.DataFrame
        The data with "Current [mA.cm-2]" added and "Current [A]" dropped.
    """
    total_column = "Current [A]"
    electrode_area = float(metadata["Electrode area [cm2]"])
    # Divide by the area first, then scale by 1000, in that order
    current_density = pl.col(total_column) / electrode_area * 1000.0
    converted = data.with_columns(current_density.alias("Current [mA.cm-2]"))
    return converted.drop([total_column])
set_positive_current_for_discharge function · python · L826-L919 (94 LOC)ionworksdata/transform.py
def set_positive_current_for_discharge(
data: pl.DataFrame, options: dict | None = None
) -> pl.DataFrame:
"""
Identify whether positive current is charging or discharging, then make sure that
positive current is discharging and negative current is charging
Parameters
----------
data : pl.DataFrame
The data to set the current direction for.
options : dict, optional
Additional options to pass to the function. The default is None.
Returns
-------
pl.DataFrame
The data with the current direction set to positive current is discharging.
"""
options = options or {}
options["method"] = "current sign"
current_units, _ = iwdata.util.get_current_and_capacity_units(options)
# Create a new dataframe with only the columns we need and set the step count
# based on current sign
new_cols = [
c
for c in [
"Time [s]",
f"Current [{current_units}]",
"Voltaremove_outliers function · python · L922-L968 (47 LOC)ionworksdata/transform.py
def remove_outliers(
data: pl.DataFrame,
column: str,
z_threshold: float = 3,
data_range: slice | None = None,
) -> pl.DataFrame:
"""
Remove outliers from the data based on the z-score of a column
Parameters
----------
data : pl.DataFrame
The data to remove outliers from.
column : str
The column to calculate the z-score for.
z_threshold : float, optional
The z-score threshold to use for removing outliers. The default is 3.
data_range : slice, optional
The range of data points to consider for outlier detection.
If None, all points are used. Use Python's slice notation, e.g.,
slice(0, 100) for first 100 points, slice(-100, None) for last 100 points.
Returns
-------
pl.DataFrame
The data with the outliers removed.
"""
# Get unique step numbers
processed_frames: list[pl.DataFrame] = []
for step in data.get_column("Step number").unique().to_list():
stget_current_and_capacity_units function · python · L9-L16 (8 LOC)ionworksdata/util.py
def get_current_and_capacity_units(options: dict | None) -> tuple[str, str]:
    """
    Return the current and capacity unit strings selected by the options.

    Parameters
    ----------
    options : dict or None
        Options dictionary. The key "current units" may be "total" or
        "density"; it defaults to "total" when missing or when options is
        empty or None.

    Returns
    -------
    tuple[str, str]
        ("A", "A.h") for "total" and ("mA.cm-2", "mA.h.cm-2") for "density".

    Raises
    ------
    ValueError
        If "current units" is neither "total" nor "density".
    """
    settings = options if options else {}
    mode = settings.get("current units", "total")
    if mode not in ("total", "density"):
        raise ValueError("Invalid current units option")
    # One table keeps the current and capacity units for a mode side by side
    units_by_mode = {
        "total": ("A", "A.h"),
        "density": ("mA.cm-2", "mA.h.cm-2"),
    }
    return units_by_mode[mode]
def check_for_duplicates(column_renamings: dict, data: pl.DataFrame) -> None:
    """
    Warn when several columns present in the data map to the same new name.

    Parameters
    ----------
    column_renamings : dict
        Mapping from existing column name to new column name.
    data : pl.DataFrame
        The data whose columns are checked. Renamings for columns that are not
        in the data are ignored.

    Returns
    -------
    None
        A UserWarning is emitted for every column, after the first, that maps
        to an already claimed new name.
    """
    claimed_names = []
    for original, renamed in column_renamings.items():
        if original not in data.columns:
            continue
        if renamed not in claimed_names:
            # First column seen for this new name: record it
            claimed_names.append(renamed)
            continue
        warnings.warn(
            f"Duplicate columns for {renamed} found: {original}",
            category=UserWarning,
            stacklevel=2,
        )
def check_and_convert_datetime(start_datetime: str | datetime) -> datetime:
    """
    Check that the datetime is valid and convert it to a UTC datetime object.

    Parameters
    ----------
    start_datetime : str | datetime
        The start datetime, either as an ISO-format string or as a datetime
        object. It must be timezone aware.

    Returns
    -------
    datetime
        The start datetime converted to UTC.

    Raises
    ------
    ValueError
        If the string is not valid ISO format, if the datetime is not timezone
        aware, if it lies in the future, or if it lies within the last second
        (which suggests datetime.now() was passed).
    """
    if isinstance(start_datetime, str):
        start_datetime = datetime.fromisoformat(start_datetime)
    # error if the datetime is not timezone aware. A tzinfo whose utcoffset()
    # is None is still naive, and astimezone would silently assume local time.
    if start_datetime.tzinfo is None or start_datetime.utcoffset() is None:
        raise ValueError("Start datetime must be timezone aware")
    # convert to UTC
    start_datetime = start_datetime.astimezone(timezone.utc)
    # read the clock once so both checks use the same reference instant
    now = datetime.now(timezone.utc)
    # error if the datetime seems too recent
    if start_datetime > now:
        raise ValueError("Start datetime cannot be in the future")
    if start_datetime > now - timedelta(seconds=1):
        raise ValueError("Do not use datetime.now() as the start datetime.")
    return start_datetime
def monotonic_time_offset(
time_points: np.ndarray,
start_time: float,
offset_initial_time: bool | None = None,
) -> np.ndarray:
"""
Returns a time array that is strictly increasing and greater than start_time.
If offset_initial_time is True, the first time is also greater than start_time.
Parameters
----------
time_points : np.ndarray
The time points to offset.
start_time : float
The time to offset the time points by.
offset_initial_time : bool, optional
Whether to offset the initial time. If None, the initial time is offset if it
is not equal to start_time. Default is `True` if the first time is not equal
to start_time, otherwise `False`.
Returns
-------
np.ndarray
The offset time points.
"""
if offset_initial_time is None:
offset_initial_time = time_points[0] != start_time
time_points = np.asarray(time_points, dtype=np.float64)
start_time_np = np.float64convert_steps function · python · L276-L361 (86 LOC)scripts/convert_to_new_format.py
def convert_steps(steps: pl.DataFrame) -> pl.DataFrame:
"""
Convert steps from old format to new format.
Parameters
----------
steps : pl.DataFrame
Steps data in old format with Delta capacity column.
Returns
-------
pl.DataFrame
Steps data in new format with separate discharge/charge columns.
"""
result = steps.clone()
# Check for capacity columns
capacity_units = "A.h" if "Delta capacity [A.h]" in steps.columns else "mA.h.cm-2"
old_delta_col = f"Delta capacity [{capacity_units}]"
if old_delta_col in steps.columns:
print(f" Converting steps column: {old_delta_col}")
# For steps, we need to determine if each step was discharge or charge
# We can use the "Mean current" or "Type" column if available
delta_capacity = steps.get_column(old_delta_col).to_numpy()
if "Mean current [A]" in steps.columns:
mean_current = steps.get_column("Mean current [A]").to_numpy()convert_files function · python · L364-L428 (65 LOC)scripts/convert_to_new_format.py
def convert_files(
input_time_series: Path,
input_steps: Path,
output_time_series: Path,
output_steps: Path,
):
"""Convert a pair of time series and steps files."""
print("\nConverting files:")
print(f" Input time series: {input_time_series}")
print(f" Input steps: {input_steps}")
# Load data
print("\nLoading data...")
time_series = pl.read_csv(input_time_series)
steps = pl.read_csv(input_steps)
print(f" Time series: {time_series.shape[0]} rows, {time_series.shape[1]} columns")
print(f" Steps: {steps.shape[0]} rows, {steps.shape[1]} columns")
# Convert
print("\nConverting time series...")
new_time_series = convert_time_series(time_series)
print("\nConverting steps...")
new_steps = convert_steps(steps)
# Save
print("\nSaving converted data...")
output_time_series.parent.mkdir(parents=True, exist_ok=True)
output_steps.parent.mkdir(parents=True, exist_ok=True)
new_time_series.write_csHi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
convert_directory function · python · L431-L456 (26 LOC)scripts/convert_to_new_format.py
def convert_directory(input_dir: Path, output_dir: Path):
"""Convert all time_series.csv and steps.csv files in a directory."""
print(f"\nScanning directory: {input_dir}")
# Find all time_series.csv files
time_series_files = list(input_dir.rglob("time_series.csv"))
print(f"Found {len(time_series_files)} time_series.csv files")
for ts_file in time_series_files:
# Find corresponding steps.csv
steps_file = ts_file.parent / "steps.csv"
if not steps_file.exists():
print(f"\n⚠️ Skipping {ts_file} (no corresponding steps.csv)")
continue
# Create output paths maintaining directory structure
rel_path = ts_file.parent.relative_to(input_dir)
output_ts = output_dir / rel_path / "time_series.csv"
output_st = output_dir / rel_path / "steps.csv"
try:
convert_files(ts_file, steps_file, output_ts, output_st)
except Exception as e:
print(f"\n❌ Error convermain function · python · L459-L496 (38 LOC)scripts/convert_to_new_format.py
def main():
"""Main entry point."""
if len(sys.argv) < 3:
print(__doc__)
sys.exit(1)
input_path = Path(sys.argv[1])
output_path = Path(sys.argv[2])
# Check if we're converting directories or files
if input_path.is_dir():
if len(sys.argv) != 3:
print("Error: When converting directories, provide input_dir output_dir")
sys.exit(1)
convert_directory(input_path, output_path)
else:
# Individual files
if len(sys.argv) != 5:
print(
"Error: When converting files, provide:\n"
" input_time_series.csv input_steps.csv "
"output_time_series.csv output_steps.csv"
)
sys.exit(1)
input_ts = Path(sys.argv[1])
input_st = Path(sys.argv[2])
output_ts = Path(sys.argv[3])
output_st = Path(sys.argv[4])
if not input_ts.exists():
print(f"Error: {input_ts} does not exist")
‹ prevpage 4 / 4