Function bodies 175 total
DataLoader.copy method · python · L1378-L1386 (9 LOC)ionworksdata/load.py
def copy(self) -> DataLoader:
    """
    Build a new DataLoader from clones of this instance's data.

    ``self._data_pl`` is always cloned; ``self._steps_pl`` is cloned only when
    it is not None. ``initial_voltage`` and the start/end indices are passed
    through unchanged to ``DataLoader.from_processed_data``.
    """
    cloned_data = self._data_pl.clone()
    cloned_steps = None
    if self._steps_pl is not None:
        cloned_steps = self._steps_pl.clone()
    return DataLoader.from_processed_data(
        data=cloned_data,
        steps=cloned_steps,
        initial_voltage=self.initial_voltage,
        start_idx=self._start_idx,
        end_idx=self._end_idx,
    )
# OCPDataLoader.__init__ method · python · L1392-L1413 (22 LOC)ionworksdata/load.py
def __init__(self, data, **kwargs):
    """
    Deprecated constructor that forwards to the parent initializer.

    Emits a DeprecationWarning, merges ``options`` with the remaining keyword
    arguments (keyword arguments win), and moves the legacy flat transform
    keys into the ``transforms`` mapping. A flat key is left where it is when
    ``transforms`` already defines the same key.
    """
    warnings.warn(
        "OCPDataLoader is deprecated. Use DataLoader(data) instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    legacy_transform_keys = (
        "sort",
        "remove_duplicates",
        "remove_extremes",
        "filters",
        "interpolate",
    )
    base_options = kwargs.pop("options", None) or {}
    merged = {**base_options, **kwargs}
    transforms = dict(merged.pop("transforms", None) or {})
    # Map old flat options into transforms
    for legacy_key in legacy_transform_keys:
        if legacy_key not in merged or legacy_key in transforms:
            continue
        transforms[legacy_key] = merged.pop(legacy_key)
    if transforms:
        merged["transforms"] = transforms
    super().__init__(data, steps=None, **merged)
# OCPDataLoader.from_db method · python · L1416-L1424 (9 LOC)ionworksdata/load.py
def from_db(cls, measurement_id, options=None, use_cache=True, timeout=None):
    """
    Deprecated alias for ``DataLoader.from_db``.

    Emits a DeprecationWarning and forwards every argument to
    ``DataLoader.from_db``; ``cls`` itself is not used.
    """
    deprecation_message = (
        "OCPDataLoader.from_db is deprecated. Use DataLoader.from_db instead."
    )
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    forwarded = {"options": options, "use_cache": use_cache, "timeout": timeout}
    return DataLoader.from_db(measurement_id, **forwarded)
# get_log_level_func function · python · L19-L24 (6 LOC)ionworksdata/logger.py
def get_log_level_func(value_to_log: int):
    """
    Build a logger method that logs at the level ``value_to_log``.

    The returned function takes the logger as ``self`` and calls
    ``self._log`` only when ``self.isEnabledFor(value_to_log)`` is true.
    """

    def func(self, message: str, *args, **kws) -> None:
        if not self.isEnabledFor(value_to_log):
            return
        self._log(value_to_log, message, args, **kws)

    return func
# _get_new_logger function · python · L46-L54 (9 LOC)ionworksdata/logger.py
def _get_new_logger(name: str, filename: str | None = None) -> logging.Logger:
    """
    Fetch the logger called ``name`` and attach one formatted handler to it.

    A ``StreamHandler`` is used when ``filename`` is None, otherwise a
    ``FileHandler`` writing to ``filename``. The handler uses ``LOG_FORMATTER``.
    """
    target_logger = logging.getLogger(name)
    handler = (
        logging.StreamHandler()
        if filename is None
        else logging.FileHandler(filename)
    )
    handler.setFormatter(LOG_FORMATTER)
    target_logger.addHandler(handler)
    return target_logger
# _trapezoid function · python · L24-L50 (27 LOC)ionworksdata/piecewise_linear_timeseries.py
def _trapezoid(y, x=None, dx=1.0, axis=-1):
"""
Compatibility wrapper for numpy trapezoid integration.
Uses np.trapezoid for numpy >= 2.0.0, otherwise falls back to np.trapz.
Parameters
----------
y : array_like
Input array to integrate.
x : array_like, optional
The sample points corresponding to the y values.
dx : scalar, optional
The spacing between sample points when x is None.
axis : int, optional
The axis along which to integrate.
Returns
-------
float or ndarray
Definite integral as approximated by trapezoidal rule.
"""
# Try np.trapezoid first (NumPy >= 2.0), fall back to np.trapz (NumPy < 2.0)
try:
return np.trapezoid(y, x=x, dx=dx, axis=axis)
except AttributeError:
return np.trapz(y, x=x, dx=dx, axis=axis)PiecewiseLinearTimeseries.__init__ method · python · L84-L118 (35 LOC)ionworksdata/piecewise_linear_timeseries.py
def __init__(
    self,
    t_data: np.ndarray,
    y_data: np.ndarray,
    atol: float | None = None,
    rtol: float | None = None,
    name: str | None = None,
    options: dict[str, Any] | None = None,
):
    """
    Store the data, tolerances and options, then linearize.

    Missing tolerances come from ``_default_atol()`` / ``_default_rtol()``.
    A falsy ``name`` is replaced by "Piecewise linear timeseries". User
    options are combined with the defaults through
    ``iwutil.check_and_combine_options`` with ``filter_unknown=True``.
    """
    self.atol = _default_atol() if atol is None else atol
    self.rtol = _default_rtol() if rtol is None else rtol
    self.name = name if name else "Piecewise linear timeseries"
    # Set options
    user_options = options or {}
    fallback_options = {
        "solver_max_save_points": None,
        "interactive_preprocessing": False,
        "window_max": _default_window_max(),
    }
    self.options = iwutil.check_and_combine_options(
        fallback_options, user_options, filter_unknown=True
    )
    self.t_data = t_data
    self.y_data = y_data
    self._linearize()
PiecewiseLinearTimeseries._linearize method · python · L120-L194 (75 LOC)ionworksdata/piecewise_linear_timeseries.py
def _linearize(self) -> None:
"""
Linearizes the time series data based on the provided solver and options.
This method processes the time series data (`t_data` and `y_data`) and
linearizes it according to the specified solver and options. The method
also identifies discontinuities in the data and reduces the number of
save points if necessary.
Attributes:
-----------
t_data : array-like
The time data points.
y_data : array-like
The corresponding data values.
atol : float
Absolute tolerance for the solver.
rtol : float
Relative tolerance for the solver.
options : dict
Dictionary containing various options for preprocessing and solver
settings.
"""
t = self.t_data
y = self.y_data
atol = self.atol
rtol = self.rtol
name = self.name
solver_max_save_points PiecewiseLinearTimeseries.interpolant method · python · L196-L226 (31 LOC)ionworksdata/piecewise_linear_timeseries.py
def interpolant(
    self, interpolator: str = "linear", name: str | None = None, **kwargs
) -> pybamm.Interpolant:
    r"""
    Build a ``pybamm.Interpolant`` over the sparse time series.

    Parameters
    ----------
    interpolator : str, optional
        The type of interpolation to use. Default is "linear".
    name : str, optional
        The name of the interpolant. A falsy value falls back to the name of
        the timeseries.
    \*\*kwargs
        Additional keyword arguments to pass to pybamm.Interpolant.

    Returns
    -------
    pybamm.Interpolant
        Interpolant of ``self.y_sparse`` over ``self.t_sparse``, evaluated at
        ``pybamm.t``.
    """
    interpolant_name = name if name else self.name
    return pybamm.Interpolant(
        self.t_sparse,
        self.y_sparse,
        pybamm.t,
        interpolator=interpolator,
        name=interpolant_name,
        **kwargs,
    )
# process_input_data function · python · L229-L315 (87 LOC)ionworksdata/piecewise_linear_timeseries.py
def process_input_data(
t: np.ndarray,
y: np.ndarray,
rtol: float,
atol: float,
window_max: int,
) -> np.ndarray:
"""
Process data to find significant segments and points for efficient representation.
This function identifies key points in the data that represent significant changes
or important features, allowing for a more compact representation of the data
while preserving its essential characteristics.
Parameters
----------
t : array-like
Full time array of the data.
y : array-like
Full value array of the data.
rtol : float
Relative tolerance for identifying significant changes.
atol : float
Absolute tolerance for identifying significant changes.
window_max : int
Maximum window size for removing neighboring points.
Returns
-------
array
An array of indices representing key points in the data.
"""
def atol_check(y: np.ndarray, atol: float):
find_input_discontinuities function · python · L318-L357 (40 LOC)ionworksdata/piecewise_linear_timeseries.py
def find_input_discontinuities(
t: np.ndarray,
y: np.ndarray,
atol: float,
rtol: float,
scale_factor: float | None = None,
) -> np.ndarray:
"""
Find discontinuities in the data based on changes in slope.
This function identifies points where the change in slope exceeds a threshold,
which is determined by both absolute and relative tolerances.
Parameters
----------
t : array-like
Time array of the data.
y : array-like
Value array of the data.
atol : float
Absolute tolerance for identifying discontinuities.
rtol : float
Relative tolerance for identifying discontinuities.
scale_factor : float, optional (default=1)
Factor to scale the threshold for identifying discontinuities.
Returns
-------
array
An array of indices where discontinuities are detected.
"""
if scale_factor is None:
scale_factor = 1
t_diff = np.diff(t)
t_diff[t_diff == 0] = afind_contiguous_segments function · python · L360-L389 (30 LOC)ionworksdata/piecewise_linear_timeseries.py
def find_contiguous_segments(mask: np.ndarray) -> list[list[int]]:
    """
    Find contiguous runs of truthy values in a boolean mask.

    Only runs of at least two truthy values are reported. A run that is ended
    by a falsy value is reported as ``[start, index_of_that_falsy_value]``; a
    run that reaches the end of the mask is reported as
    ``[start, len(mask) - 1]``.

    Parameters
    ----------
    mask : array-like
        Boolean mask to find segments in.

    Returns
    -------
    list
        List of [start, end] indices for each contiguous segment.
    """
    found = []
    run_start = None
    for position, flag in enumerate(mask):
        if flag:
            if run_start is None:
                run_start = position
            continue
        # A falsy value closes the current run, if there is one.
        if run_start is not None and position >= run_start + 2:
            found.append([run_start, position])
        run_start = None
    if run_start is not None and len(mask) >= run_start + 2:
        found.append([run_start, len(mask) - 1])
    return found
# calculate_linear_fit_error function · python · L392-L413 (22 LOC)ionworksdata/piecewise_linear_timeseries.py
def calculate_linear_fit_error(t: np.ndarray, y: np.ndarray, atol: float) -> float:
    """
    Integrate the absolute deviation of ``y`` from its end-to-end chord.

    The chord is the straight line through the first and last points. The
    result is 0 when there are two or fewer points or when the first and last
    times are equal.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    atol : float
        Absolute tolerance; accepted but not used by this function.

    Returns
    -------
    float
        Error of the linear fit.
    """
    if len(t) <= 2:
        return 0
    t_first, t_last = t[0], t[-1]
    if t_first == t_last:
        return 0
    y_first = y[0]
    chord_slope = (y[-1] - y_first) / (t_last - t_first)
    chord = chord_slope * (t - t_first) + y_first
    return _trapezoid(np.abs(y - chord), t)
# calculate_normalized_linear_fit_error function · python · L416-L439 (24 LOC)ionworksdata/piecewise_linear_timeseries.py
def calculate_normalized_linear_fit_error(
    t: np.ndarray, y: np.ndarray, atol: float
) -> float:
    """
    Linear fit error divided by ``|integral of y over t| + atol``.

    Returns 0 without computing the integral when the unnormalized error is 0.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    atol : float
        Absolute tolerance added to the denominator.

    Returns
    -------
    float
        Normalized error of the linear fit.
    """
    raw_error = calculate_linear_fit_error(t, y, atol)
    if raw_error == 0:
        return 0
    scale = np.abs(_trapezoid(y, t)) + atol
    return raw_error / scale
# find_linear_segment_end function · python · L442-L467 (26 LOC)ionworksdata/piecewise_linear_timeseries.py
def find_linear_segment_end(
    t: np.ndarray, y: np.ndarray, rtol: float, atol: float
) -> int:
    """
    Find the end index of the linear segment starting at index 0.

    Prefixes of growing length (three points and up) are tested; as soon as a
    prefix has a normalized linear fit error above ``rtol``, the index before
    that prefix's last point is returned.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    rtol : float
        Relative tolerance for identifying significant changes.
    atol : float
        Absolute tolerance for calculations.

    Returns
    -------
    int
        Index where the segment stops being linear within tolerance, or the
        last index when every prefix is within tolerance.
    """
    n_points = len(t)
    for stop in range(3, n_points + 1):
        prefix_error = calculate_normalized_linear_fit_error(
            t[:stop], y[:stop], atol
        )
        if prefix_error > rtol:
            return stop - 2
    return n_points - 1
find_linear_segments function · python · L470-L503 (34 LOC)ionworksdata/piecewise_linear_timeseries.py
def find_linear_segments(
    t: np.ndarray, y: np.ndarray, rtol: float, atol: float
) -> np.ndarray:
    """
    Find all linear segments in the data.

    Starting from index 0, the end of the current linear segment is found
    with ``find_linear_segment_end`` and the search restarts from that point,
    until the running index lands on a point already recorded.

    Parameters
    ----------
    t : array-like
        Time array for the data.
    y : array-like
        Value array for the data.
    rtol : float
        Relative tolerance for identifying significant changes.
    atol : float
        Absolute tolerance for calculations.

    Returns
    -------
    array
        Sorted array of indices where linear segments end; it always includes
        0 and ``len(t) - 1``.
    """
    breakpoints = [0, len(t) - 1]
    offset = 0
    remaining_t, remaining_y = t, y
    while True:
        advance = find_linear_segment_end(
            remaining_t, remaining_y, rtol=rtol, atol=atol
        )
        offset += advance
        if offset in breakpoints:
            return np.sort(breakpoints)
        breakpoints.append(offset)
        remaining_t = remaining_t[advance:]
        remaining_y = remaining_y[advance:]
# find_optimal_midpoint function · python · L506-L556 (51 LOC)ionworksdata/piecewise_linear_timeseries.py
def find_optimal_midpoint(t: np.ndarray, y: np.ndarray, N_max: int, atol: float) -> int:
"""
Find the optimal midpoint in a segment that minimizes linear fit error.
Parameters
----------
t : array-like
Time array for the segment.
y : array-like
Value array for the segment.
N_max : int
Maximum number of segments to consider in optimization steps.
atol : float
Absolute tolerance for calculations.
Returns
-------
int
Index of the optimal midpoint.
"""
err_best = np.inf
idx_best = 0
idx_left = 0
idx_right = 0
reduced_segments = len(t) > N_max
if reduced_segments:
segments = np.linspace(0, len(t) - 1, N_max, dtype=int)
else:
segments = np.arange(len(t))
for idx, i in enumerate(segments):
t_vec1 = t[: i + 1]
y_vec1 = y[: i + 1]
t_vec2 = t[i:]
y_vec2 = y[i:]
err = calculate_linear_fit_error(
t_vec1, y_vecalc_r2 function · python · L559-L584 (26 LOC)ionworksdata/piecewise_linear_timeseries.py
def calc_r2(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Compute the R^2 (coefficient of determination).

    Parameters
    ----------
    y_true : array-like, shape (n,)
        Observed (true) values.
    y_pred : array-like, shape (n,)
        Predicted values.

    Returns
    -------
    float
        The R^2 value.
    """
    residuals = y_true - y_pred
    deviations = y_true - np.average(y_true)
    # Sum of squares of residuals
    ss_res = np.sum(residuals**2)
    # Total sum of squares about the mean
    ss_tot = np.sum(deviations**2)
    # A small number is added to the denominator to avoid division by zero.
    return 1 - ss_res / (ss_tot + 1e-100)
# interactive_time_stepping_fit function · python · L587-L657 (71 LOC)ionworksdata/piecewise_linear_timeseries.py
def interactive_time_stepping_fit(
t: np.ndarray, y: np.ndarray, atol: float, rtol: float, window_max: int, name: str
) -> tuple[float, float]:
# Create the plot
fig, ax = plt.subplots()
plt.subplots_adjust(bottom=0.35)
# Plot the raw data (which doesn't change)
ax.plot(t, y, "o", label="Raw data", color="gray", alpha=0.5, ms=4)
# Initial plot for the piecewise linear fit
segments = process_input_data(t, y, atol=atol, rtol=rtol, window_max=window_max)
t_sparse = t[segments]
y_sparse = y[segments]
[line_fit] = ax.plot(
t_sparse, y_sparse, "x-", label="Piecewise linear fit", color="red", lw=2
)
ax.set_xlabel("Time")
ax.set_ylabel("Value")
def make_title(t_sparse: np.ndarray, y_sparse: np.ndarray) -> str:
y_linear = np.interp(t, t_sparse, y_sparse)
R2 = calc_r2(y, y_linear)
title = f"Piecewise linear fit - {100 * (1 - len(t_sparse) / len(t)):.2f}% reduction, {R2:.5f} R²"
return title
_default_atol function · python · L660-L665 (6 LOC)ionworksdata/piecewise_linear_timeseries.py
def _default_atol() -> float:
"""
Default absolute tolerance for the solver. Matches
the `IDAKLUSolver` default.
"""
return 1e-6_default_rtol function · python · L668-L673 (6 LOC)ionworksdata/piecewise_linear_timeseries.py
def _default_rtol() -> float:
"""
Default relative tolerance for the solver. Matches
the `IDAKLUSolver` default.
"""
return 1e-4Biologic._get_file_args method · python · L34-L78 (45 LOC)ionworksdata/read/biologic.py
def _get_file_args(
filename: str | Path, options: dict[str, str] | None = None
) -> tuple[int, str]:
"""
Get file arguments for reading a Biologic file.
Parameters
----------
filename : str | Path
Path to the Biologic file.
options : dict[str, str] | None
Options dict with file_encoding key.
Returns
-------
tuple[int, str]
Tuple of (skiprows, sep).
"""
encoding = options["file_encoding"]
ext = Path(filename).suffix.lower()
with open(filename, encoding=encoding) as f:
lines = f.readlines()
# Determine separator based on file type
if ext == ".mpt":
sep = "\t"
else:
# Auto-detect for other file types
sep = "\t" if any("\t" in line for line in lines[:20]) else ","
# Determine skiprows
# Try to find line with "Nb header lines : int" and extract Biologic._get_column_renamings method · python · L81-L100 (20 LOC)ionworksdata/read/biologic.py
def _get_column_renamings() -> dict[str, str]:
"""
Get standard column renaming mappings for Biologic files.
Returns
-------
dict[str, str]
Dictionary mapping original column names to standardized names.
"""
return {
"Ecell/V": "Voltage [V]",
"Ewe/V": "Voltage [V]",
"<Ewe>/V": "Voltage [V]",
"I/mA": "Current [mA]",
"<I>/mA": "Current [mA]",
"time/s": "Time [s]",
"Ns": "Step from cycler",
"Cycle number": "Cycle from cycler",
"cycle number": "Cycle from cycler",
}Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
Biologic.run method · python · L102-L207 (106 LOC)ionworksdata/read/biologic.py
def run(
self,
filename: str | Path,
extra_column_mappings: dict[str, str] | None = None,
options: dict[str, str] | None = None,
) -> pl.DataFrame:
"""
Read and process data from a BioLogic file.
The following column mappings are applied by default:
- "Ecell/V" -> "Voltage [V]"
- "Ewe/V" -> "Voltage [V]"
- "I/mA" -> "Current [mA]"
- "<I>/mA" -> "Current [mA]"
- "time/s" -> "Time [s]"
- "Ns" -> "Step from cycler"
- "Cycle number" -> "Cycle from cycler"
Additional column mappings can be provided via extra_column_mappings.
Parameters
----------
filename : str
Path to the BioLogic file to be read (.mpt or .txt).
extra_column_mappings : dict of str to str, optional
Dictionary of additional column mappings. Keys are original column
names, values are the new column namBiologic.read_start_time method · python · L209-L251 (43 LOC)ionworksdata/read/biologic.py
def read_start_time(
self,
filename: str | Path,
extra_column_mappings: dict[str, str] | None = None,
options: dict[str, str] | None = None,
) -> datetime | None:
"""
Read the start time from a BioLogic file.
Parameters
----------
filename : str
Path to the BioLogic file to be read (.mpt or .txt).
options : dict of str to str, optional
Dictionary of options for reading the BioLogic file.
Returns
-------
datetime | None
The start time of the BioLogic file, or None if not found.
"""
options = iwutil.check_and_combine_options(self.default_options, options)
skiprows, sep = self._get_file_args(filename, options)
# Try to load the date column
data = pl.read_csv(
filename,
encoding=options["file_encoding"],
separator=sep,
skip_rows=skiprows,
trunbiologic function · python · L265-L272 (8 LOC)ionworksdata/read/biologic.py
def biologic(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read ``filename`` with a new ``Biologic`` reader.

    All arguments are forwarded to ``Biologic().run``; its result is returned.
    """
    reader = Biologic()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
# biologic_mpt function · python · L275-L282 (8 LOC)ionworksdata/read/biologic.py
def biologic_mpt(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read ``filename`` with a new ``BiologicMPT`` reader.

    All arguments are forwarded to ``BiologicMPT().run``; its result is
    returned.
    """
    reader = BiologicMPT()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
# _find_column function · python · L113-L124 (12 LOC)ionworksdata/read/csv.py
def _find_column(
data_columns: list[str], options: list[dict]
) -> tuple[str, float, float]:
"""
Find the first column in a list of options that is present in a DataFrame.
"""
for values_scale_shift in options:
for column in values_scale_shift["values"]:
if column in data_columns:
return column, values_scale_shift["scale"], values_scale_shift["shift"]
else:
raise ValueError(f"Could not find appropriate column out of {options}")CSV.run method · python · L133-L245 (113 LOC)ionworksdata/read/csv.py
def run(
self,
filename: str | Path,
extra_column_mappings: dict[str, str] | None = None,
options: dict[str, str] | None = None,
) -> pl.DataFrame:
"""
Read a CSV file and return a Polars DataFrame with appropriate column names.
Parameters
----------
filename : str | Path
Path to the CSV file to be read.
extra_column_mappings : dict[str, str] | None, optional
Dictionary of additional column mappings to use when reading the CSV file.
The keys are the original column names and the values are the new column
names. Default is None.
options : dict[str, str] | None, optional
Dictionary of options to use when reading the CSV file.
Options are:
- cell_metadata: dict, optional
Additional metadata about the cell. Default is empty dict.
Returns
-------
pl.DataFrame
CSV.read_start_time method · python · L247-L257 (11 LOC)ionworksdata/read/csv.py
def read_start_time(
    self,
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> None:
    """
    Warn that the CSV reader cannot read a start time and return None.

    None of the arguments are used.
    """
    unsupported_message = "CSV reader does not support reading start time from file"
    warnings.warn(unsupported_message, stacklevel=2)
    return None
# csv function · python · L260-L267 (8 LOC)ionworksdata/read/csv.py
def csv(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read ``filename`` with a new ``CSV`` reader.

    All arguments are forwarded to ``CSV().run``; its result is returned.
    """
    reader = CSV()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
_is_maccor_excel function · python · L65-L98 (34 LOC)ionworksdata/read/detect.py
def _is_maccor_excel(filename: Path) -> bool:
"""
Check if an Excel file is a Maccor file by examining column headers.
Assumes the first row is always the header row.
Parameters
----------
filename : Path
Path to the Excel file to check.
Returns
-------
bool
True if the file appears to be a Maccor file, False otherwise.
"""
try:
# Import here to avoid circular dependency
df, column_names = read_excel_and_get_column_names(filename)
# Check column headers for Maccor signature
has_step = any("step" in col for col in column_names)
has_time = any(_has_maccor_time_col(col) for col in column_names)
if has_step and has_time:
return True
maccor_col_count = sum(
1 for col in column_names if any(mc in col for mc in MACCOR_COLUMNS)
)
if maccor_col_count >= 3:
return True
return False
except Exception:
return_read_first_lines function · python · L101-L109 (9 LOC)ionworksdata/read/detect.py
def _read_first_lines(filename: Path, num_lines: int = 10) -> list[str]:
"""Read first lines from file, trying multiple encodings."""
for encoding in ["utf-8", "latin1", "ISO-8859-1"]:
try:
with open(filename, encoding=encoding) as f:
return [f.readline() for _ in range(num_lines)]
except UnicodeDecodeError:
continue
return []detect_reader function · python · L112-L184 (73 LOC)ionworksdata/read/detect.py
def detect_reader(filename: str | Path) -> str:
"""
Automatically detect the reader type based on file content.
Parameters
----------
filename : str | Path
Path to the file to detect the reader for.
Returns
-------
str
The detected reader name (e.g., "novonix", "maccor", "neware",
"repower").
Raises
------
ValueError
If the reader type cannot be determined from the file.
"""
filename = Path(filename)
ext = filename.suffix.lower()
# Check for Excel files (Neware or Maccor)
if ext in [".xls", ".xlsx"]:
if _is_neware_excel(filename):
return "neware"
if _is_maccor_excel(filename):
return "maccor"
# Read first few lines to check file signatures
first_lines = _read_first_lines(filename)
first_line = first_lines[0] if first_lines else ""
first_10_lines = "".join(first_lines)
# Check for Novonix: starts with [Summary] and contains "No_is_neware_excel function · python · L187-L224 (38 LOC)ionworksdata/read/detect.py
def _is_neware_excel(filename: Path) -> bool:
"""
Check if an Excel file is a Neware file by examining column headers.
Parameters
----------
filename : Path
Path to the Excel file to check.
Returns
-------
bool
True if the file appears to be a Neware file, False otherwise.
"""
try:
xl_reader = fastexcel.read_excel(filename)
# Check each sheet for Neware column signatures
for sheet_name in xl_reader.sheet_names:
try:
with suppress_excel_dtype_warnings():
df = pl.read_excel(filename, sheet_name=sheet_name)
if df is None or df.height == 0:
continue
columns = df.columns
has_timestamp = any(col in columns for col in NEWARE_TIMESTAMP_COLS)
has_current = any(col in columns for col in NEWARE_CURRENT_COLS)
has_voltage = any(col in columns for col in NEWARE_VOLTAGE_COLMaccor._get_file_args method · python · L36-L92 (57 LOC)ionworksdata/read/maccor.py
def _get_file_args(
filename: str | Path, options: dict[str, str] | None = None
) -> tuple[str, list[int], str, str | None, str | None, bool]:
# Find how many header rows to skip and set the read kwargs based on the file extension
encoding = options["file_encoding"]
thousands = None
is_excel = False
ext = Path(filename).suffix.lower()
if ext in [".xls", ".xlsx"]:
# Excel files - return special flag
is_excel = True
# For Excel, we'll handle header detection separately
return encoding, [], ",", None, None, is_excel
with open(filename, encoding=encoding) as f:
if ext == ".csv":
lines = f.readlines()
# Detect delimiter: some Maccor .csv files are tab-separated (e.g. export)
skiprows = None
sep = ","
units_row = True
comment = "#"
for i, line in enuMaccor.run method · python · L94-L286 (193 LOC)ionworksdata/read/maccor.py
def run(
self,
filename: str | Path,
extra_column_mappings: dict[str, str] | None = None,
options: dict[str, str] | None = None,
) -> pl.DataFrame:
"""
Read and process data from a Maccor file. The following column mappings are applied by default:
- "Voltage", "Volts", "Voltage (V)" -> "Voltage [V]"
- "Current", "Amps", "Current (A)" -> "Current [A]"
- "Prog Time", "Test (Sec)", "Test Time (sec)" -> "Time [s]"
- "Test Time (Hr)" -> "Time [h]"
- "Cycle", "Cyc#", "Cycle ID", "Cycle P" -> "Cycle from cycler"
- "Step", "Step ID" -> "Step from cycler"
- "LogTemp001", "Temperature (°C)", "EVTemp (C)" -> "Temperature [degC]"
- "Status", "State", "MD" -> "Status"
- "Capacity (Ah)", "Capacity (AHr)", "Cap. (Ah)" -> "Capacity [A.h]"
- "Energy (Wh)", "Energy (WHr)" -> "Energy [W.h]"
- "Chg Capacity (Ah)", "Chg Maccor._parse_timestamp_column method · python · L289-L334 (46 LOC)ionworksdata/read/maccor.py
def _parse_timestamp_column(data: pl.DataFrame) -> pl.DataFrame:
"""
Parse Timestamp column and compute Time [s] if needed.
Parameters
----------
data : pl.DataFrame
Input dataframe with potential "Timestamp" column.
Returns
-------
pl.DataFrame
Dataframe with parsed timestamps and computed Time [s] if applicable.
"""
if "Timestamp" not in data.columns:
return data
# Parse datetime with multiple format attempts
data = data.with_columns(
pl.coalesce(
# Try MM/DD/YYYY HH:MM:SS format (common for Maccor DPT)
pl.col("Timestamp").str.strptime(
pl.Datetime, format="%m/%d/%Y %H:%M:%S", strict=False
),
# Try YYYY-MM-DD HH:MM:SS format
pl.col("Timestamp").str.strptime(
pl.Datetime, format="%Y-%m-%d %H:%M:%S", strict=False
Maccor._validate_and_fix_time method · python · L336-L404 (69 LOC)ionworksdata/read/maccor.py
def _validate_and_fix_time(
self, data: pl.DataFrame, time_offset_fix: float
) -> pl.DataFrame:
"""
Validate that time is strictly increasing and optionally fix it.
Parameters
----------
data : pl.DataFrame
Input dataframe with "Time [s]" column.
time_offset_fix : float
Minimum time difference to enforce when fixing.
If -1, raises ValueError. If >= 0, ensures all time differences are at least this value.
Returns
-------
pl.DataFrame
Dataframe with validated or fixed time.
Raises
------
ValueError
If time is not strictly increasing and time_offset_fix is -1.
"""
if "Time [s]" not in data.columns:
return data
# Vectorized check: compute differences between consecutive times
time_col = data["Time [s]"]
time_diff = time_col.diff() # time[i] - time[i-1]
# CAll rows scored by the Repobility analyzer (https://repobility.com)
Maccor._fix_unsigned_current method · python · L406-L454 (49 LOC)ionworksdata/read/maccor.py
def _fix_unsigned_current(self, data: pl.DataFrame) -> pl.DataFrame:
"""
Fix unsigned current by flipping sign during charge if needed.
If both "D" (discharge) and "C" (charge) are in the "Status" column
and the current is always positive, then the current isn't signed,
so we need to flip it during charge.
Parameters
----------
data : pl.DataFrame
Input dataframe with potential "Status" and "Current [A]" columns.
Returns
-------
pl.DataFrame
Dataframe with current sign corrected if needed.
"""
if "Status" not in data.columns or "Current [A]" not in data.columns:
return data
statuses = set(data.select(pl.col("Status").unique()).to_series().to_list())
if "D" not in statuses or "C" not in statuses:
return data
# Ensure numeric current
data = self._coerce_numeric(data, "Current [A]")
c_min =Maccor._get_column_renamings method · python · L457-L519 (63 LOC)ionworksdata/read/maccor.py
def _get_column_renamings(options: dict[str, Any] | None = None) -> dict[str, str]:
"""
Get standard column renaming mappings for Maccor files.
Parameters
----------
options : dict, optional
Options dict. If options["skip_capacity_columns"] is True,
capacity and energy column mappings are excluded, forcing
ionworksdata to compute them from current/power integration.
Returns
-------
dict[str, str]
Dictionary mapping original column names to standardized names.
"""
renamings = {
"Voltage": "Voltage [V]",
"Volts": "Voltage [V]",
"Voltage (V)": "Voltage [V]",
"Current": "Current [A]",
"Amps": "Current [A]",
"Current (A)": "Current [A]",
"Prog Time": "Time [s]",
"Test (Sec)": "Time [s]",
"Test Time (sec)": "Time [s]",
"Test Time (Hr)": "TimeMaccor._parse_excel_duration method · python · L522-L545 (24 LOC)ionworksdata/read/maccor.py
def _parse_excel_duration(duration_str: str) -> float | None:
"""
Parse Excel duration format :D:HH:MM:SS to total seconds.
Parameters
----------
duration_str : str
Duration string in format ":D:HH:MM:SS"
Returns
-------
float | None
Total seconds, or None if parsing fails.
"""
if not duration_str.startswith(":"):
return None
parts = duration_str[1:].split(":")
if len(parts) != 4:
return None
try:
days, hours, minutes, seconds = map(int, parts)
return float(days * 86400 + hours * 3600 + minutes * 60 + seconds)
except (ValueError, TypeError):
return NoneMaccor._process_test_time_column method · python · L547-L598 (52 LOC)ionworksdata/read/maccor.py
def _process_test_time_column(
self, data: pl.DataFrame, column_renamings: dict[str, str]
) -> tuple[pl.DataFrame, dict[str, str]]:
"""
Process "Test Time" column and determine its format.
Handles three formats:
1. Excel duration (":D:HH:MM:SS") -> converts to seconds
2. Datetime strings (contains "/" or "-") -> maps to Timestamp
3. Numeric values -> leaves as-is
Parameters
----------
data : pl.DataFrame
Input dataframe with potential "Test Time" column.
column_renamings : dict[str, str]
Column renaming dictionary to update.
Returns
-------
tuple[pl.DataFrame, dict[str, str]]
Updated dataframe and column_renamings dict.
"""
if "Test Time" not in data.columns:
return data, column_renamings
# Sample first non-null value to determine type
sample = (
data.select(pl.col("Test TiMaccor._read_excel_file method · python · L600-L627 (28 LOC)ionworksdata/read/maccor.py
def _read_excel_file(self, filename: str | Path, encoding: str) -> pl.DataFrame:
    """
    Read Maccor data from the first sheet of an Excel file (.xls or .xlsx).

    Parameters
    ----------
    filename : str | Path
        Path to the Excel file.
    encoding : str
        File encoding (not used for Excel but kept for consistency).

    Returns
    -------
    pl.DataFrame
        The data returned by ``read_excel_and_get_column_names`` for the first
        sheet; the column names it also returns are discarded.
    """
    # fastexcel is used only to discover the sheet names.
    workbook = fastexcel.read_excel(filename)
    available_sheets = workbook.sheet_names
    # The first row is treated as the header; dtype warnings from the read
    # are suppressed.
    with suppress_excel_dtype_warnings():
        frame, _column_names = read_excel_and_get_column_names(
            filename, sheet_name=available_sheets[0]
        )
    return frame
# Maccor.read_header method · python · L629-L656 (28 LOC)ionworksdata/read/maccor.py
def read_header(
self, filename: str | Path, options: dict[str, str] | None = None
) -> str:
"""
Read the header from a Maccor file.
"""
options = iwutil.check_and_combine_options(self.default_options, options)
encoding, skiprows, _, _, _, is_excel = self._get_file_args(filename, options)
if is_excel:
# For Excel files, first row is always the header
xl_reader = fastexcel.read_excel(filename)
sheet_names = xl_reader.sheet_names
# Suppress pandas dtype warning when reading Excel (printed to stderr)
with suppress_excel_dtype_warnings():
df_raw = pl.read_excel(filename, sheet_name=sheet_names[0])
# Return header row as string (column names are the header)
return "\t".join(str(col) for col in df_raw.columns)
else:
with open(filename, encoding=encoding) as f:
if len(skiprows) == 1:
Maccor.read_start_time method · python · L658-L721 (64 LOC)ionworksdata/read/maccor.py
def read_start_time(
self,
filename: str | Path,
extra_column_mappings: dict[str, str] | None = None,
options: dict[str, str] | None = None,
) -> datetime | None:
"""
Read the start time from a Maccor file.
Parameters
----------
filename : str | Path
Path to the Maccor file to be read. Supports:
- .txt files (tab-separated)
- .csv files (comma-separated with units row)
- .xls/.xlsx files (Excel format)
- Files with .+3digits extension (e.g., .123, .456)
options : dict of str to str, optional
See :func:`ionworksdata.read.Maccor.run`.
Returns
-------
datetime | None
The start time of the Maccor file, or None if not found.
"""
options = iwutil.check_and_combine_options(self.default_options, options)
# Load the header row
start_datetime = None
header_textmaccor function · python · L724-L731 (8 LOC)ionworksdata/read/maccor.py
def maccor(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read ``filename`` with a new ``Maccor`` reader.

    All arguments are forwarded to ``Maccor().run``; its result is returned.
    """
    reader = Maccor()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
Neware._read_file_data method · python · L33-L95 (63 LOC)ionworksdata/read/neware.py
def _read_file_data(
self, filename: str | Path, sheets: dict | None = None
) -> pl.DataFrame:
"""Read data from CSV or Excel with Polars, optional sheet filtering."""
filename = Path(filename)
if filename.suffix.lower() in [".xls", ".xlsx"]:
# Read Excel file with Polars
if sheets is None:
# No sheet specification - read the first sheet
df_pl = pl.read_excel(filename)
# Cast raw numeric columns to Float64 to handle type inference issues
df_pl = self._coerce_numeric_columns(
df_pl, columns=self._raw_numeric_columns
)
return df_pl
# Get all sheet names in the Excel file (using fastexcel for sheet discovery)
xl_reader = fastexcel.read_excel(filename)
available_sheets = xl_reader.sheet_names
# Determine which sheets to read based on specification
sheNeware._get_sheets_to_read method · python · L97-L164 (68 LOC)ionworksdata/read/neware.py
def _get_sheets_to_read(
self, sheets: dict, available_sheets: list[str]
) -> list[str]:
"""Parse sheet specification and return list of sheet names to read."""
if not isinstance(sheets, dict):
raise ValueError(
"'sheets' must be a dictionary with 'type' and 'value' keys"
)
if "type" not in sheets:
raise ValueError("'sheets' dict must contain 'type' key")
sheet_type = sheets["type"]
sheet_value = sheets.get("value")
if sheet_type == "name":
if sheet_value is None:
raise ValueError(
"For 'name' type, 'value' must be a sheet name or list of sheet names"
)
# Convert single string to list for uniform processing
if isinstance(sheet_value, str):
sheet_names = [sheet_value]
elif isinstance(sheet_value, list):
sheet_names = sheet_value
Neware._apply_column_renamings method · python · L166-L194 (29 LOC)ionworksdata/read/neware.py
def _apply_column_renamings(
self, data: pl.DataFrame, extra_column_mappings: dict[str, str] | None = None
) -> tuple[pl.DataFrame, dict[str, str]]:
"""Apply column renamings to Neware files data."""
column_renamings = {
"Current (mA)": "Current [mA]",
"Cur(mA)": "Current [mA]",
"Current (A)": "Current [A]",
"Current(A)": "Current [A]",
"Voltage (V)": "Voltage [V]",
"Voltage(V)": "Voltage [V]",
"Temperature 1 (degC)": "Temperature [degC]",
"Step ID": "Step from cycler",
"Step": "Step from cycler",
"Cycle ID": "Cycle from cycler",
"Cycle": "Cycle from cycler",
"Status": "Status",
"DateTime": "Timestamp",
"Absolute Time": "Timestamp",
"Date(h:min:s.ms)": "Timestamp",
}
column_renamings.update(extra_column_mappings or {})
# Validate duplicates (check_for_du