Skip to content

Filters Math Module Documentation

This module provides functions for filtering data.

Functions:

Name Description
filter_median

Median filter for multi-dimensional data.

filter_savgol

Savitsky-Golay filter for multi-dimensional data.

filter_lowpass

Lowpass filter for multi-dimensional data.

Lowpass filter for multi-dimensional data for a signal with a given sample frequency and cutoff frequency. The filter is applied to each column of the data.

Parameters:

Name Type Description Default
data Union[ndarray, List[float]]

Data to be filtered.

required
sample_freq_hz Union[int, float]

Sample frequency in Hz. By default 1.0.

1.0
cutoff_freq_hz Union[int, float]

Cutoff frequency in Hz. By default 0.5.

0.1
filter_order int

Filter order. By default 5.

5
causality str

Causality of the filter. By default "noncausal".

'noncausal'

Returns:

Name Type Description
data_filtered ndarray

Filtered data.

Raises:

Type Description
TypeError

If any parameter has an invalid type.

ValueError

If numeric parameters are non-positive or if causality is invalid.

Source code in navlib/math/filters.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def filter_lowpass(
    data: Union[np.ndarray, List[float]],
    sample_freq_hz: Union[int, float] = 1.0,
    cutoff_freq_hz: Union[int, float] = 0.1,
    filter_order: int = 5,
    causality: str = "noncausal",
) -> np.ndarray:
    """
    Lowpass filter for multi-dimensional data for a signal with a given sample
    frequency and cutoff frequency. The filter is applied to each column of the
    data.

    Args:
        data (Union[np.ndarray, List[float]]): Data to be filtered.
        sample_freq_hz (Union[int, float], optional): Sample frequency in Hz. By default 1.0.
        cutoff_freq_hz (Union[int, float], optional): Cutoff frequency in Hz. By default 0.5.
        filter_order (int, optional): Filter order. By default 5.
        causality (str, optional): Causality of the filter. By default "noncausal".

    Returns:
        data_filtered (np.ndarray): Filtered data.

    Raises:
        TypeError: If any parameter has an invalid type.
        ValueError: If numeric parameters are non-positive or if causality is
            invalid.
    Examples:
        >>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        >>> filtered_data = filter_lowpass(data)
    """
    # Convert data to numpy array if list
    if isinstance(data, list):
        data = np.array(data)

    # Check the data type
    if not isinstance(data, np.ndarray):
        raise TypeError("Data must be a numpy array or list")
    if not isinstance(sample_freq_hz, (int, float)):
        raise TypeError("Sample frequency must be a float")
    if not isinstance(cutoff_freq_hz, (int, float)):
        raise TypeError("Cutoff frequency must be a float")
    if not isinstance(filter_order, int):
        raise TypeError("Filter order must be an integer")
    if not isinstance(causality, str):
        raise TypeError("Causality must be a string")

    # Make causality lowercase
    causality = causality.lower()

    # Make sure data is a 2D array
    if data.ndim == 1:
        data = data[:, np.newaxis]

    # Check values
    if sample_freq_hz <= 0:
        raise ValueError("Sample frequency must be greater than zero")
    if cutoff_freq_hz <= 0:
        raise ValueError("Cutoff frequency must be greater than zero")
    if filter_order <= 0:
        raise ValueError("Filter order must be greater than zero")
    if causality not in {"causal", "noncausal", "non-causal", "no causal", "acausal"}:
        raise ValueError(
            "Causality must be 'causal', 'noncausal', 'non-causal', 'no causal', or 'acausal'"
        )

    # Filter Design
    causal = True if causality == "causal" else False
    wn = cutoff_freq_hz / (0.5 * sample_freq_hz)

    if wn >= 1 or wn <= 0:
        raise ValueError("The cutoff frequency cannot exceed 1/2 the sample frequency.")

    # Create nth order butterworth filter
    b_matrix, a_matrix = butter(filter_order, wn, btype="low", analog=False)

    # Apply filter to each column of the data
    if causal:
        # Causal filtering. Initialize the filter state using the initial
        # conditions of the filter.
        zi = lfiltic(b_matrix, a_matrix, data[0, :])
        data_filtered, _ = lfilter(
            b_matrix, a_matrix, data, zi=zi[:, np.newaxis], axis=0
        )
    else:
        # Non-causal filtering. Apply the filter forward and backward in time
        # to remove phase shift.
        data_filtered = filtfilt(b_matrix, a_matrix, data, axis=0)

    return data_filtered.squeeze()

Median filter for multi-dimensional data. Removes data points that are above a threshold, replacing them with nan or interpolated data. The filter is applied to each column of the data.

Parameters:

Name Type Description Default
data Union[ndarray, List[float]]

Data to be filtered.

required
time Union[ndarray, List[float]]

Time vector.

None
window int

Median filter length. By default 13.

13
threshold Union[float, int]

Threshold size defined to remove data. If the threashold is not defined STD of the data error is used. By default None.

None
patch bool

Patch data. Replace removed data with linear interpolation of the points. By default False.

False
remove_outliers bool

Remove outliers. By default False.

False

Returns:

Name Type Description
data_filtered ndarray

Filtered data.

time_filtered ndarray

Filtered time.

Raises:

Type Description
TypeError

If any parameter has an invalid type.

ValueError

If numeric parameters are non-positive, time is not 1D, or if data and time lengths mismatch.

Examples:

>>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
>>> filtered_data, filtered_time = filter_median(data, window=3)
Source code in navlib/math/filters.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def filter_median(
    data: Union[np.ndarray, List[float]],
    time: Union[np.ndarray, List[float]] = None,
    window: int = 13,
    threshold: Union[float, int] = None,
    patch: bool = False,
    remove_outliers: bool = False,
) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
    """
    Median filter for multi-dimensional data. Removes data points that are above
    a threshold, replacing them with nan or interpolated data. The filter is
    applied to each column of the data.

    Args:
        data (Union[np.ndarray, List[float]]): Data to be filtered.
        time (Union[np.ndarray, List[float]]): Time vector.
        window (int, optional): Median filter length. By default 13.
        threshold (Union[float, int], optional): Threshold size defined to remove data. If the
            threashold is not defined STD of the data error is used. By default
            None.
        patch (bool, optional): Patch data. Replace removed data with linear
            interpolation of the points. By default False.
        remove_outliers (bool, optional): Remove outliers. By default False.

    Returns:
        data_filtered (np.ndarray): Filtered data.
        time_filtered (np.ndarray): Filtered time.

    Raises:
        TypeError: If any parameter has an invalid type.
        ValueError: If numeric parameters are non-positive, time is not 1D, or if
            data and time lengths mismatch.

    Examples:
        >>> data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        >>> filtered_data, filtered_time = filter_median(data, window=3)
    """
    # Convert data to numpy array if list
    if isinstance(data, list):
        data = np.array(data)
    if time is not None and isinstance(time, list):
        time = np.array(time)

    # Check the data type
    if not isinstance(data, np.ndarray):
        raise TypeError("Data must be a numpy array or list")
    if time is not None and not isinstance(time, np.ndarray):
        raise TypeError("Time must be a numpy array or list")
    if not isinstance(window, int):
        raise TypeError("Window must be an integer")
    if threshold is not None:
        if not isinstance(threshold, (int, float)):
            raise TypeError("Threshold must be a float or an integer")
        threshold = float(threshold)
    if not isinstance(patch, bool):
        raise TypeError("Patch must be a boolean")
    if not isinstance(remove_outliers, bool):
        raise TypeError("Remove outliers must be a boolean")

    # Make sure data is a 2D array
    if data.ndim == 1:
        data = data[:, np.newaxis]
    # Make sure the time is a 1D array
    if time is not None:
        time = np.squeeze(time)

    # Check values
    if window <= 0:
        raise ValueError("Window must be greater than zero")
    if window % 2 == 0:
        window += 1  # Ensure odd window size for medfilt
    if threshold is not None and threshold <= 0:
        raise ValueError("Threshold must be greater than zero")
    if time is not None:
        if time.ndim != 1:
            raise ValueError("Time must be a 1D array")
        if time.size != data.shape[0]:
            raise ValueError(
                "Time and data must have the same length if single"
                "column or same number of rows if multiple columns"
            )

    # Compute the median filter of the data
    data_filtered = np.zeros(data.shape)
    for ix in range(data.shape[1]):
        data_filtered[:, ix] = medfilt(data[:, ix], window)
    time_filtered = time.copy().astype(float) if time is not None else None

    # Compute residuals and threshold
    if threshold is None:
        residuals = np.abs(data - data_filtered)
        threshold = 3 * np.max(np.std(residuals, axis=0))

        # Identify outliers
        is_bad = np.any(residuals > threshold, axis=1)
        is_good = ~is_bad

        # Handle outliers
        if remove_outliers:
            data_filtered_good = data[is_good]
            time_filtered_good = time[is_good] if time is not None else None
        else:
            data_filtered_good = data_filtered.copy().astype(float)
            data_filtered_good[is_bad] = np.nan
            time_filtered_good = time.copy().astype(float) if time is not None else None
            if time_filtered_good is not None:
                time_filtered_good[is_bad] = np.nan

        # Patch removed data if needed
        if time is not None and patch and remove_outliers:
            # Use external resample function (from your internal library)
            data_filtered = resample(time, time_filtered_good, data_filtered_good)
            time_filtered = time
        else:
            data_filtered = data_filtered_good
            time_filtered = time_filtered_good

    return data_filtered.squeeze(), (
        time_filtered.squeeze() if time_filtered is not None else None
    )

Savitzky-Golay filter for multi-dimensional data. Removes data points that are above a threshold, replacing them with NaN or interpolated data.

Parameters:

Name Type Description Default
data Union[ndarray, List[float]]

Data to be filtered. Shape (N,) or (N, D)

required
time Union[ndarray, List[float]]

Optional time vector (N,)

None
window int

Smoothing window size. If time is provided, interpreted as seconds and converted to samples.

10
polynomial int

Polynomial order for SG filter. Must be less than window.

2
threshold Union[float, int]

Threshold to detect outliers (absolute difference from smoothed). If None, defaults to 3*STD of residuals.

None
patch bool

Whether to patch removed data using interpolation.

True
remove_outliers bool

Whether to remove outliers based on the residual threshold.

False

Returns:

Name Type Description
data_filtered ndarray

Filtered data.

time_filtered ndarray

Filtered time.

Raises:

Type Description
TypeError / ValueError

On invalid input types or shapes.

Source code in navlib/math/filters.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def filter_savgol(
    data: Union[np.ndarray, List[float]],
    time: Union[np.ndarray, List[float]] = None,
    window: int = 10,
    polynomial: int = 2,
    threshold: Union[float, int] = None,
    patch: bool = True,
    remove_outliers: bool = False,
) -> Tuple[np.ndarray, Union[np.ndarray, None]]:
    """
    Savitzky-Golay filter for multi-dimensional data. Removes data points that
    are above a threshold, replacing them with NaN or interpolated data.

    Args:
        data: Data to be filtered. Shape (N,) or (N, D)
        time: Optional time vector (N,)
        window: Smoothing window size. If `time` is provided, interpreted as seconds and converted to samples.
        polynomial: Polynomial order for SG filter. Must be less than window.
        threshold: Threshold to detect outliers (absolute difference from smoothed). If None, defaults to 3*STD of residuals.
        patch: Whether to patch removed data using interpolation.
        remove_outliers: Whether to remove outliers based on the residual threshold.

    Returns:
        data_filtered (np.ndarray): Filtered data.
        time_filtered (np.ndarray): Filtered time.

    Raises:
        TypeError / ValueError: On invalid input types or shapes.
    """
    # Convert to numpy
    if isinstance(data, list):
        data = np.array(data, dtype=float)
    if time is not None and isinstance(time, list):
        time = np.array(time, dtype=float)

    if not isinstance(data, np.ndarray):
        raise TypeError("Data must be a numpy array or list.")
    if time is not None and not isinstance(time, np.ndarray):
        raise TypeError("Time must be a numpy array or list.")
    if not isinstance(window, int) and window <= 0:
        raise ValueError("Window must be a positive integer.")
    if not isinstance(polynomial, int) and polynomial <= 0:
        raise ValueError("Polynomial must be a positive integer.")
    if threshold is not None and (
        not isinstance(threshold, (int, float)) and threshold <= 0
    ):
        raise ValueError("Threshold must be a positive float.")
    if not isinstance(patch, bool):
        raise TypeError("Patch must be a boolean.")
    if not isinstance(remove_outliers, bool):
        raise TypeError("Remove outliers must be a boolean.")

    # Ensure 2D data
    if data.ndim == 1:
        data = data[:, np.newaxis]
    N, D = data.shape

    # Check time shape and match
    if time is not None:
        time = np.squeeze(time)
        if time.ndim != 1:
            raise ValueError("Time must be a 1D array.")
        if time.shape[0] != N:
            raise ValueError("Time and data must have the same number of samples.")

        # Convert smoothing window in seconds to samples
        dt = np.median(np.diff(time))
        window_samples = int(window / dt)
        if window_samples % 2 == 0:
            window_samples += 1
        window = window_samples

    # Final window checks
    if window > N:
        raise ValueError("Window must not exceed number of samples.")
    if window <= polynomial:
        raise ValueError("Window must be greater than polynomial order.")

    # Apply Savitzky-Golay filter
    data_filtered = savgol_filter(
        data, window_length=window, polyorder=polynomial, axis=0
    )
    time_filtered = time.copy().astype(float) if time is not None else None

    # Compute residuals and threshold
    if threshold is not None:
        residuals = np.abs(data - data_filtered)
        threshold = 3 * np.max(np.std(residuals, axis=0))

        # Identify outliers
        is_bad = np.any(residuals > threshold, axis=1)
        is_good = ~is_bad

        # Handle outliers
        if remove_outliers:
            data_filtered_good = data[is_good]
            time_filtered_good = time[is_good] if time is not None else None
        else:
            data_filtered_good = data_filtered.copy().astype(float)
            data_filtered_good[is_bad] = np.nan
            time_filtered_good = time.copy().astype(float) if time is not None else None
            if time_filtered_good is not None:
                time_filtered_good[is_bad] = np.nan

        # Patch removed data if needed
        if time is not None and patch and remove_outliers:
            # Use external resample function (from your internal library)
            data_filtered = resample(time, time_filtered_good, data_filtered_good)
            time_filtered = time
        else:
            data_filtered = data_filtered_good
            time_filtered = time_filtered_good

    return data_filtered.squeeze(), (
        time_filtered.squeeze() if time_filtered is not None else None
    )