Skip to content

Attitude Estimation Module Documentation

This module provides functions to estimate the attitude of a vehicle using different algorithms and sensors.

Functions:

Name Description
so3_integrator

Integrate the orientation of a frame using the exponential coordinate representation of rotations.

ahrs_raw_rp

Compute the pitch and roll angles from raw accelerometer measurements.

ahrs_raw_hdg

Compute the heading angle from raw magnetic field measurements.

ahrs_raw_rph

Compute the roll, pitch, and heading angles from raw accelerometer and magnetic field measurements.

ahrs_complementary_filter

Estimate the attitude of a vehicle using a complementary filter.

ahrs_mahony_filter

Estimate the attitude of a vehicle using the Mahony filter.

ahrs_hua_filter

Estimate the attitude of a vehicle using the Hua filter.

ahrs_madgwick_filter

Estimate the attitude of a vehicle using the Madgwick filter.

Estimates the attitude of a vehicle using a complementary filter. The filter fuses accelerometer and gyroscope, and optionally magnetometer data to estimate the roll, pitch, and heading angles.

For the pitch and roll estimation, we use the accelerometer data, which is assumed to be aligned with the gravity vector:

\[ \text{roll} = \arctan2(-a_y, -a_z) \quad\quad\quad \text{pitch} = \arctan2(a_x, \sqrt{a_y^2 + a_z^2}) \]

The gyroscope integration is performed in the lie algebra of the special orthogonal group SO(3) using the Rodrigues' formula:

\[ R_{new} = R_{old} \cdot \exp([\omega]_\times \cdot dt) \]

Parameters:

Name Type Description Default
angular_rate Union[ndarray, List[float]]

Angular rate measurements (gyroscope data) in rad/s. Should be a 3xN or Nx3 array where N is the number of samples.

required
acceleration Union[ndarray, List[float]]

Acceleration measurements in m/s^2. Should be a 3xN or Nx3 array where N is the number of samples.

required
time Union[ndarray, List[float]]

Time vector in seconds. Should be a 1D array of length N.

required
magnetic_field Union[ndarray, List[float]]

Magnetic field measurements in microteslas. Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.

None
gain Union[float, int]

Gain for the complementary filter. Should be between 0 and 1. With 1 meaning only accelerometer and magnetometer (if available) data is used, and 0 meaning only gyroscope data is used. Defaults to 0.9.

0.1
rph0 Union[ndarray, List[float]]

Initial roll, pitch, and heading angles in radians.

None

Returns:

Name Type Description
rph_rad ndarray

Estimated roll, pitch, and heading (yaw) angles in radians. The output is an Nx3 array where N is the number of samples.

Raises:

Type Description
TypeError

If any of the inputs are not of the expected type.

ValueError

If any of the inputs do not have the expected dimensions.

Source code in navlib/nav/attitude_estimation.py
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
def ahrs_complementary_filter(
    angular_rate: Union[np.ndarray, List[float]],
    acceleration: Union[np.ndarray, List[float]],
    time: Union[np.ndarray, List[float]],
    magnetic_field: Union[np.ndarray, List[float]] = None,
    gain: Union[float, int] = 0.1,
    rph0: Union[np.ndarray, List[float]] = None,
) -> np.ndarray:
    """
    Estimates the attitude of a vehicle using a complementary filter. The filter fuses accelerometer and gyroscope,
    and optionally magnetometer data to estimate the roll, pitch, and heading angles.

    For the pitch and roll estimation, we use the accelerometer data, which is assumed to be aligned with the
    gravity vector:

    $$ \\text{roll} = \\arctan2(-a_y, -a_z) \\quad\\quad\\quad \\text{pitch} = \\arctan2(a_x, \\sqrt{a_y^2 + a_z^2}) $$

    The gyroscope integration is performed in the lie algebra of the special orthogonal group SO(3) using the
    Rodrigues' formula:

    $$ R_{new} = R_{old} \\cdot \\exp([\\omega]_\\times \\cdot dt) $$

    Args:
        angular_rate (Union[np.ndarray, List[float]]): Angular rate measurements (gyroscope data) in rad/s.
            Should be a 3xN or Nx3 array where N is the number of samples.
        acceleration (Union[np.ndarray, List[float]]): Acceleration measurements in m/s^2.
            Should be a 3xN or Nx3 array where N is the number of samples.
        time (Union[np.ndarray, List[float]]): Time vector in seconds. Should be a 1D array of length N.
        magnetic_field (Union[np.ndarray, List[float]], optional): Magnetic field measurements in microteslas.
            Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.
        gain (Union[float, int], optional): Gain for the complementary filter. Should be between 0 and 1. With 1 meaning
            only accelerometer and magnetometer (if available) data is used, and 0 meaning only gyroscope data is used.
            Defaults to 0.9.
        rph0 (Union[np.ndarray, List[float]], optional): Initial roll, pitch, and heading angles in radians.

    Returns:
        rph_rad (np.ndarray): Estimated roll, pitch, and heading (yaw) angles in radians.
            The output is an Nx3 array where N is the number of samples.

    Raises:
        TypeError: If any of the inputs are not of the expected type.
        ValueError: If any of the inputs do not have the expected dimensions.
    """
    # Convert lists to numpy arrays if necessary
    if isinstance(acceleration, list):
        acceleration = np.array(acceleration)
    if isinstance(angular_rate, list):
        angular_rate = np.array(angular_rate)
    if isinstance(time, list):
        time = np.array(time)
    if magnetic_field is not None and isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if rph0 is not None and isinstance(rph0, list):
        rph0 = np.array(rph0)

    # Validate inputs
    if not isinstance(acceleration, np.ndarray):
        raise TypeError("The acceleration must be a numpy array or a list.")
    if not isinstance(angular_rate, np.ndarray):
        raise TypeError("The angular rate must be a numpy array or a list.")
    if not isinstance(time, np.ndarray):
        raise TypeError("The time must be a numpy array or a list.")
    if magnetic_field is not None and not isinstance(magnetic_field, np.ndarray):
        raise TypeError("The magnetic field must be a numpy array or a list.")
    if rph0 is not None and not isinstance(rph0, np.ndarray):
        raise TypeError(
            "The initial roll, pitch, and heading must be a numpy array or a list."
        )

    # Validate dimensions
    if acceleration.ndim != 2 or (
        acceleration.shape[0] != 3 and acceleration.shape[1] != 3
    ):
        raise ValueError("The acceleration must be a 3xN or Nx3 numpy array.")
    if angular_rate.ndim != 2 or (
        angular_rate.shape[0] != 3 and angular_rate.shape[1] != 3
    ):
        raise ValueError("The angular rate must be a 3xN or Nx3 numpy array.")
    if magnetic_field is not None:
        if magnetic_field.ndim != 2 or (
            magnetic_field.shape[0] != 3 and magnetic_field.shape[1] != 3
        ):
            raise ValueError("The magnetic field must be a 3xN or Nx3 numpy array.")
    if rph0 is not None:
        if rph0.ndim != 1 or rph0.shape[0] != 3:
            raise ValueError(
                "The initial roll, pitch, and heading must be a 1D numpy array with 3 elements."
            )

    # Ensure time is a 1D array
    time = time.squeeze()
    if time.ndim >= 2:
        raise ValueError("The time must be a (n, ), (n, 1) or (1, n) numpy array.")

    # Force Nx3 shape for acceleration, angular_rate, and magnetic_field
    if acceleration.shape[0] == 3 and acceleration.shape[1] != 3:
        acceleration = acceleration.T
    if angular_rate.shape[0] == 3 and angular_rate.shape[1] != 3:
        angular_rate = angular_rate.T
    if (
        magnetic_field is not None
        and magnetic_field.shape[0] == 3
        and magnetic_field.shape[1] != 3
    ):
        magnetic_field = magnetic_field.T

    # Validate initial_heading and gain
    if not isinstance(gain, (float, int)):
        raise TypeError("The gain must be a float or integer.")
    if gain < 0 or gain > 1:
        raise ValueError("The gain must be between 0 and 1.")
    gain = float(gain)

    # Compute the attitude from accelerometer and magnetic field, if available
    if magnetic_field is None:
        rph = np.zeros_like(acceleration)
        rph[:, :2] = ahrs_raw_rp(acceleration)
    else:
        rph = ahrs_raw_rph(magnetic_field, acceleration)

    # If an initial attitude is provided, remove the offset from the computed attitude
    # and add it to the initial attitude.
    if rph0 is not None:
        rph = wrap1pi(remove_offset(unwrap1pi(rph), rph[0] - rph0))

    # If the gain is 0, return the computed attitude
    if gain == 1:
        return rph

    # Complementary Filter - In the SO(3) manifold
    estimated_rph = np.apply_along_axis(rph2rot, 1, rph)
    for ix in range(1, angular_rate.shape[0]):
        # Measurements
        w = angular_rate[ix, :]
        dt = time[ix] - time[ix - 1]

        # Angular rates so(3) Integration based on Rodrigues' formula
        rot_mat_old = estimated_rph[ix - 1]
        rot_mat_new = rot_mat_old @ scipy.linalg.expm(vec_to_so3(w) * dt)

        # Fuse with the accelerometer and magnetometer RPH estimation
        if magnetic_field is None:
            estimated_rph[ix] = rot_mat_new
        else:
            rot_mat_error = rot_mat_new.T @ estimated_rph[ix]
            rot_mat_delta = scipy.linalg.expm(gain * matrix_log3(rot_mat_error))
            estimated_rph[ix] = rot_mat_new @ rot_mat_delta

    return np.array([rot2rph(rot) for rot in estimated_rph])

Corrects the magnetic field measurements for hard-iron and soft-iron distortions.

The magnetic field measurements are corrected using the following formula: magnetic_field_corrected = (soft_iron^-1 @ (magnetic_field - hard_iron)).T

Parameters:

Name Type Description Default
magnetic_field Union[ndarray, List[float]]

Magnetic field measurements.

required
hard_iron Union[ndarray, List[float]]

Hard-iron distortion vector.

required
soft_iron Union[ndarray, List[float]]

Soft-iron distortion matrix.

eye(3, dtype=float64)

Returns:

Name Type Description
corrected_magfield ndarray

Corrected magnetic field measurements.

Source code in navlib/nav/attitude_estimation.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def ahrs_correct_magfield(
    magnetic_field: Union[np.ndarray, List[float]],
    hard_iron: Union[np.ndarray, List[float]],
    soft_iron: Union[np.ndarray, List[float]] = np.eye(3, dtype=np.float64),
):
    """
    Corrects the magnetic field measurements for hard-iron and soft-iron distortions.

    The magnetic field measurements are corrected using the following formula:
    magnetic_field_corrected = (soft_iron^-1 @ (magnetic_field - hard_iron)).T

    Args:
        magnetic_field (Union[np.ndarray, List[float]]): Magnetic field measurements.
        hard_iron (Union[np.ndarray, List[float]]): Hard-iron distortion vector.
        soft_iron (Union[np.ndarray, List[float]], optional): Soft-iron distortion matrix.

    Returns:
        corrected_magfield (np.ndarray): Corrected magnetic field measurements.
    """
    # Convert to numpy arrays
    if isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if isinstance(hard_iron, list):
        hard_iron = np.array(hard_iron)
    if isinstance(soft_iron, list):
        soft_iron = np.array(soft_iron)

    # Check inputs
    if not isinstance(magnetic_field, np.ndarray):
        raise TypeError("The magnetic field must be a numpy array or list.")
    if not isinstance(hard_iron, np.ndarray):
        raise TypeError("The hard iron must be a numpy array or list.")
    if not isinstance(soft_iron, np.ndarray):
        raise TypeError("The soft iron must be a numpy array or list.")

    # Check shape
    if magnetic_field.ndim != 2 or (
        magnetic_field.shape[0] != 3 and magnetic_field.shape[1] != 3
    ):
        raise ValueError("The magnetic field must be a 3xN or Nx3 numpy array.")
    hard_iron = hard_iron.squeeze()
    if hard_iron.ndim != 1 or hard_iron.shape[0] != 3:
        raise ValueError("The hard iron must be a 1x3 numpy array.")
    if soft_iron.ndim != 2 or (soft_iron.shape[0] != 3 and soft_iron.shape[1] != 3):
        raise ValueError("The soft iron must be a 3x3 numpy array.")

    # Check if the magnetic field and hard iron are 3xN or Nx3 matrices
    if magnetic_field.shape[0] == 3 and magnetic_field.shape[1] != 3:
        magnetic_field = magnetic_field.T

    # Check if the soft iron is a symmetric matrix
    if not np.allclose(soft_iron, soft_iron.T, atol=1e-8):
        raise ValueError("The soft iron matrix must be symmetric.")

    # Check if the soft-iron is a positive definite matrix
    try:
        np.linalg.cholesky(soft_iron)
    except np.linalg.LinAlgError:
        raise ValueError("The soft iron matrix must be positive definite.")

    # Correct the magnetic field
    magnetic_field_corrected = magnetic_field.copy()[..., np.newaxis]
    N = magnetic_field.shape[0]
    magnetic_field_corrected = magnetic_field_corrected - np.tile(
        hard_iron.reshape(3, 1), (N, 1, 1)
    )
    magnetic_field_corrected = (
        np.tile(np.linalg.inv(soft_iron), (N, 1, 1)) @ magnetic_field_corrected
    ).squeeze()

    return magnetic_field_corrected

Estimates the attitude of a vehicle using the Hua filter.

This estimator proposed by Robert Mahony et al. [Mahony2008] is formulated as a deterministic kinematic observer on the Special Orthogonal group SO(3) driven by an instantaneous attitude and angular velocity measurements. The implementation includes modifications by Hua et al. [Hua2014] for measurement decoupling and anti-windup gyro-bias compensation.

k1, k2, k3, k4, and kb tuning: Obtained from values proposed in [Hua2014].

Parameters:

Name Type Description Default
angular_rate Union[ndarray, List[float]]

Angular rate measurements (gyroscope data) in rad/s. Should be a 3xN or Nx3 array where N is the number of samples.

required
acceleration Union[ndarray, List[float]]

Acceleration measurements in m/s^2. Should be a 3xN or Nx3 array where N is the number of samples.

required
time Union[ndarray, List[float]]

Time vector in seconds. Should be a 1D array of length N.

required
magnetic_field Union[ndarray, List[float]]

Magnetic field measurements in microteslas. Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.

None
reference_magnetic_field Union[ndarray, List[float]]

Reference magnetic field vector in any units. Should be a 1D array with 3 elements. Defaults to None.

None
rph0 Union[ndarray, List[float]]

Initial roll, pitch, and heading angles in radians.

None
k1 Union[float, int]

Gain for the accelerometer measurements on the rotation estimation. As recommended in [Hua2014], defaults to 1.0.

1.0
k2 Union[float, int]

Gain for the magnetic field measurements on the rotation estimation. As recommended in [Hua2014], defaults to 1/5 of k1, which is 0.2. However, if the magnetic field is is highly affected by external disturbances, it is recommended to increase the ratio to 1/10 or 1/20.

0.2
k3 Union[float, int]

Gain for the accelerometer measurements on the gyroscope bias estimation. As recommended in [Hua2014], defaults to k1/32, which is 0.03.

0.03
k4 Union[float, int]

Gain for the magnetic field measurements on the gyroscope bias estimation. As recommended in [Hua2014], defaults to k2/32, which is 0.006. If k2 is increased for robustness against disturbances, this value should be updated accordingly.

0.006
kb Union[float, int]

Anti-windup gain. As recommended in [Hua2014], defaults to 16.0.

16.0
delta Union[float, int]

Saturation limit for the gyro biases. As recommended in [Hua2014], to bound the gyro biases to 1 deg/s, defaults to 0.003.

0.03
max_dt_factor_s Union[float, int]

Factor to determine the maximum allowed time step between samples for a soft-reset in seconds. The threshold will be computed as max_dt_factor_s * median(dt). Defaults to 5.0.

5.0
max_dt_reinit_s Union[float, int]

Maximum allowed time step between samples for a hard-reset in seconds. Defaults to 60.0.

60.0

Returns:

Name Type Description
rph_filtered ndarray

Estimated roll, pitch, and heading (yaw) angles in radians. The output is an Nx3 array.

angrate_filtered ndarray

Bias compensated angular rates in rad/s. The output is an Nx3 array.

Raises:

Type Description
TypeError

If any of the inputs are not of the expected type.

ValueError

If any of the inputs do not have the expected dimensions.

Warns:

Type Description
UserWarning

Issued when time jump handling is triggered: - Non-increasing time steps (dt <= 0): State held constant - Moderate time gaps (soft reset): Attitude and biases held for re-convergence - Large time gaps (hard reset): Attitude and biases reinitialized from sensor data

References

Mahony, R., Hamel, T., & Pflimlin, J. M. (2008). Nonlinear complementary filters on the special orthogonal group. IEEE Transactions on automatic control, 53(5), 1203-1218. Hua, M. D., Ducard, G., Hamel, T., Mahony, R., & Rudin, K. (2014). Implementation of Nonlinear attitude estimator for Aerial Robotic Vehicles. IEEE Transactions on Control Systems Technology, Volumes, 22(1), 2972-2978. January 2014.

Source code in navlib/nav/attitude_estimation.py
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
def ahrs_hua_filter(
    angular_rate: Union[np.ndarray, List[float]],
    acceleration: Union[np.ndarray, List[float]],
    time: Union[np.ndarray, List[float]],
    magnetic_field: Union[np.ndarray, List[float]] = None,
    reference_magnetic_field: Union[np.ndarray, List[float]] = None,
    rph0: Union[np.ndarray, List[float]] = None,
    k1: Union[float, int] = 1.0,
    k2: Union[float, int] = 0.2,
    k3: Union[float, int] = 0.03,
    k4: Union[float, int] = 0.006,
    kb: Union[float, int] = 16.0,
    delta: Union[float, int] = 0.03,
    max_dt_factor_s: Union[float, int] = 5.0,
    max_dt_reinit_s: Union[float, int] = 60.0,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimates the attitude of a vehicle using the Hua filter.

    This estimator proposed by Robert Mahony et al. [Mahony2008] is formulated as a deterministic kinematic observer
    on the Special Orthogonal group SO(3) driven by an instantaneous attitude and angular velocity measurements.
    The implementation includes modifications by Hua et al. [Hua2014] for measurement decoupling and anti-windup
    gyro-bias compensation.

    k1, k2, k3, k4, and kb tuning: Obtained from values proposed in [Hua2014].

    Args:
        angular_rate (Union[np.ndarray, List[float]]): Angular rate measurements (gyroscope data) in rad/s.
            Should be a 3xN or Nx3 array where N is the number of samples.
        acceleration (Union[np.ndarray, List[float]]): Acceleration measurements in m/s^2.
            Should be a 3xN or Nx3 array where N is the number of samples.
        time (Union[np.ndarray, List[float]]): Time vector in seconds. Should be a 1D array of length N.
        magnetic_field (Union[np.ndarray, List[float]], optional): Magnetic field measurements in microteslas.
            Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.
        reference_magnetic_field (Union[np.ndarray, List[float]], optional): Reference magnetic field vector in any units.
            Should be a 1D array with 3 elements. Defaults to None.
        rph0 (Union[np.ndarray, List[float]], optional): Initial roll, pitch, and heading angles in radians.
        k1 (Union[float, int], optional): Gain for the accelerometer measurements on the rotation estimation. As recommended
            in [Hua2014], defaults to 1.0.
        k2 (Union[float, int], optional): Gain for the magnetic field measurements on the rotation estimation. As recommended
            in [Hua2014], defaults to 1/5 of k1, which is 0.2. However, if the magnetic field is is highly affected by
            external disturbances, it is recommended to increase the ratio to 1/10 or 1/20.
        k3 (Union[float, int], optional): Gain for the accelerometer measurements on the gyroscope bias estimation. As recommended
            in [Hua2014], defaults to k1/32, which is 0.03.
        k4 (Union[float, int], optional): Gain for the magnetic field measurements on the gyroscope bias estimation. As recommended
            in [Hua2014], defaults to k2/32, which is 0.006. If k2 is increased for robustness against disturbances,
            this value should be updated accordingly.
        kb (Union[float, int], optional): Anti-windup gain. As recommended in [Hua2014], defaults to 16.0.
        delta (Union[float, int], optional): Saturation limit for the gyro biases. As recommended in [Hua2014], to bound
            the gyro biases to 1 deg/s, defaults to 0.003.
        max_dt_factor_s (Union[float, int], optional): Factor to determine the maximum allowed time step between samples for
            a soft-reset in seconds. The threshold will be computed as max_dt_factor_s * median(dt). Defaults to 5.0.
        max_dt_reinit_s (Union[float, int], optional): Maximum allowed time step between samples for a hard-reset in seconds.
            Defaults to 60.0.

    Returns:
        rph_filtered (np.ndarray): Estimated roll, pitch, and heading (yaw) angles in radians. The output is an
            Nx3 array.
        angrate_filtered (np.ndarray): Bias compensated angular rates in rad/s. The output is an Nx3 array.

    Raises:
        TypeError: If any of the inputs are not of the expected type.
        ValueError: If any of the inputs do not have the expected dimensions.

    Warnings:
        UserWarning: Issued when time jump handling is triggered:
            - Non-increasing time steps (dt <= 0): State held constant
            - Moderate time gaps (soft reset): Attitude and biases held for re-convergence
            - Large time gaps (hard reset): Attitude and biases reinitialized from sensor data
        Warnings can be suppressed using standard Python warning control mechanisms.

    References:
        Mahony, R., Hamel, T., & Pflimlin, J. M. (2008). Nonlinear complementary filters on the special orthogonal group.
        IEEE Transactions on automatic control, 53(5), 1203-1218.
        Hua, M. D., Ducard, G., Hamel, T., Mahony, R., & Rudin, K. (2014). Implementation of Nonlinear attitude estimator
        for Aerial Robotic Vehicles. IEEE Transactions on Control Systems Technology, Volumes, 22(1), 2972-2978. January
        2014.
    """
    # Convert lists to numpy arrays if necessary
    if isinstance(acceleration, list):
        acceleration = np.array(acceleration)
    if isinstance(angular_rate, list):
        angular_rate = np.array(angular_rate)
    if isinstance(time, list):
        time = np.array(time)
    if magnetic_field is not None and isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if reference_magnetic_field is not None and isinstance(
        reference_magnetic_field, list
    ):
        reference_magnetic_field = np.array(reference_magnetic_field)
    if rph0 is not None and isinstance(rph0, list):
        rph0 = np.array(rph0)

    # Validate inputs
    if not isinstance(acceleration, np.ndarray):
        raise TypeError("The acceleration must be a numpy array or a list.")
    if not isinstance(angular_rate, np.ndarray):
        raise TypeError("The angular rate must be a numpy array or a list.")
    if not isinstance(time, np.ndarray):
        raise TypeError("The time must be a numpy array or a list.")
    if magnetic_field is not None and not isinstance(magnetic_field, np.ndarray):
        raise TypeError("The magnetic field must be a numpy array or a list.")
    if reference_magnetic_field is not None and not isinstance(
        reference_magnetic_field, np.ndarray
    ):
        raise TypeError("The reference magnetic field must be a numpy array or a list.")
    if rph0 is not None and not isinstance(rph0, np.ndarray):
        raise TypeError(
            "The initial roll, pitch, and heading must be a numpy array or a list."
        )

    # Validate dimensions
    if acceleration.ndim != 2 or (
        acceleration.shape[0] != 3 and acceleration.shape[1] != 3
    ):
        raise ValueError("The acceleration must be a 3xN or Nx3 numpy array.")
    if angular_rate.ndim != 2 or (
        angular_rate.shape[0] != 3 and angular_rate.shape[1] != 3
    ):
        raise ValueError("The angular rate must be a 3xN or Nx3 numpy array.")
    if magnetic_field is not None:
        if magnetic_field.ndim != 2 or (
            magnetic_field.shape[0] != 3 and magnetic_field.shape[1] != 3
        ):
            raise ValueError("The magnetic field must be a 3xN or Nx3 numpy array.")
    if reference_magnetic_field is not None:
        reference_magnetic_field = reference_magnetic_field.squeeze()
        if reference_magnetic_field.ndim >= 2 or reference_magnetic_field.size != 3:
            raise ValueError(
                "The reference magnetic field must be a 1D numpy array with 3 elements."
            )
    if rph0 is not None:
        if rph0.ndim != 1 or rph0.shape[0] != 3:
            raise ValueError(
                "The initial roll, pitch, and heading must be a 1D numpy array with 3 elements."
            )

    # Ensure time is a 1D array
    time = time.squeeze()
    if time.ndim >= 2:
        raise ValueError("The time must be a (n, ), (n, 1) or (1, n) numpy array.")

    # Force Nx3 shape for acceleration, angular_rate, and magnetic_field
    if acceleration.shape[0] == 3 and acceleration.shape[1] != 3:
        acceleration = acceleration.T
    if angular_rate.shape[0] == 3 and angular_rate.shape[1] != 3:
        angular_rate = angular_rate.T
    if (
        magnetic_field is not None
        and magnetic_field.shape[0] == 3
        and magnetic_field.shape[1] != 3
    ):
        magnetic_field = magnetic_field.T

    # If magnetic field is provided, a reference magnetic field is required
    if magnetic_field is not None and reference_magnetic_field is None:
        raise ValueError(
            "If magnetic field is provided, a reference magnetic field is required."
        )

    # Validate gains
    if not isinstance(k1, (float, int)):
        raise TypeError("The k1 gain must be a float or integer.")
    k1 = float(k1)
    if not isinstance(k2, (float, int)):
        raise TypeError("The k2 gain must be a float or integer.")
    k2 = float(k2)
    if not isinstance(k3, (float, int)):
        raise TypeError("The k3 gain must be a float or integer.")
    k3 = float(k3)
    if not isinstance(k4, (float, int)):
        raise TypeError("The k4 gain must be a float or integer.")
    k4 = float(k4)
    if not isinstance(kb, (float, int)):
        raise TypeError("The kb gain must be a float or integer.")
    kb = float(kb)
    if not isinstance(delta, (float, int)):
        raise TypeError("The delta gain must be a float or integer.")
    delta = float(delta)

    # Validate max_dt_factor_s and max_dt_reinit_s
    if not isinstance(max_dt_factor_s, (float, int)):
        raise TypeError("The max_dt_factor_s must be a float or integer.")
    max_dt_factor_s = float(max_dt_factor_s)
    if not isinstance(max_dt_reinit_s, (float, int)):
        raise TypeError("The max_dt_reinit_s must be a float or integer.")
    max_dt_reinit_s = float(max_dt_reinit_s)

    # Ensure time is float
    time = time.astype(np.float64)

    # Precompute dt and nominal dt
    dt_array = np.diff(time)
    positive_dt = dt_array[dt_array > 0.0]
    if positive_dt.size == 0:
        raise ValueError("Cannot infer sampling period: all time differences are <= 0.")

    nominal_dt = np.median(positive_dt)
    max_dt_soft = max_dt_factor_s * nominal_dt

    # Create arrays to store the results
    samples = time.shape[0]
    rot_mat_filtered = np.tile(np.eye(3), (samples, 1, 1))
    gyro_biases = np.zeros([samples, 3])

    # Initialize the attitude if an initial attitude is provided
    rot_mat_filtered[0] = (
        rph2rot(rph0) if rph0 is not None else np.eye(3, dtype=np.float64)
    )

    # Reference gravity vector [Eq. 8 in [Hua2014]]
    u_i = np.array([0.0, 0.0, 1.0], dtype=np.float64).reshape(3, 1)

    # Reference magnetic field vector [Eq. 8 in [Hua2014]]. It includes the decoupling of the magnetic field
    # from the gravity vector, as proposed by Hua et al. [Hua2014], to avoid the influence of the magnetic field
    # perturbations on the roll and pitch estimation.
    if reference_magnetic_field is not None:
        v_i = _ahrs_hua_orthogonal_projection(u_i) @ reference_magnetic_field.reshape(
            3, 1
        )
        v_i /= np.linalg.norm(v_i)

    # Run the filter
    for ix in range(1, samples):
        # Time step
        dt = time[ix] - time[ix - 1]

        # Handler for negative time steps
        if dt <= 0.0:
            # Non-increasing time: hold state
            warnings.warn(
                f"AHRS Hua filter: Non-increasing time step detected at sample {ix} "
                f"(dt={dt:.6f}s <= 0). State held constant.",
                UserWarning,
                stacklevel=2,
            )
            rot_mat_filtered[ix] = rot_mat_filtered[ix - 1]
            gyro_biases[ix] = gyro_biases[ix - 1]
            continue

        if dt > max_dt_reinit_s:
            # Very long gap: treat as a new run, reinit attitude and biases
            warnings.warn(
                f"AHRS Hua filter: Hard reset performed at sample {ix} due to large time gap "
                f"(dt={dt:.3f}s > max_dt_reinit_s={max_dt_reinit_s:.3f}s). "
                "Attitude and gyro biases reinitialized from sensor measurements.",
                UserWarning,
                stacklevel=2,
            )
            # (you can write a helper accel/mag -> R_world_to_body)
            if magnetic_field is not None:
                R0 = rph2rot(ahrs_raw_rph(magnetic_field[ix], acceleration[ix]))
            else:
                R0 = rph2rot(
                    np.concatenate(
                        [ahrs_raw_rp(acceleration[ix]), np.zeros((1, 1))], axis=1
                    )[0]
                )

            rot_mat_filtered[ix] = R0
            gyro_biases[ix] = np.zeros(3)
            continue

        if dt > max_dt_soft:
            # Medium gap: hold attitude and bias, let them re-lock softly
            warnings.warn(
                f"AHRS Hua filter: Soft reset performed at sample {ix} due to moderate time gap "
                f"(dt={dt:.3f}s > max_dt_soft={max_dt_soft:.3f}s). "
                "Attitude and gyro biases held constant to allow soft re-convergence.",
                UserWarning,
                stacklevel=2,
            )
            rot_mat_filtered[ix] = rot_mat_filtered[ix - 1]
            gyro_biases[ix] = gyro_biases[ix - 1]
            continue

        # Get attitude from previous step
        rot_mat_old = rot_mat_filtered[ix - 1]

        # Get angular rate
        w = angular_rate[ix - 1].reshape(3, 1)

        # Compute u_b and v_b [Eq. 8 in [Hua2014]]
        u_b = -1 * normalize(acceleration[ix].reshape(3, 1))
        u_b_hat = rot_mat_old.T @ u_i
        if magnetic_field is not None:
            v_b_numerator = _ahrs_hua_orthogonal_projection(u_b) @ magnetic_field[
                ix
            ].reshape(3, 1)
            v_b_denominator = np.linalg.norm(
                _ahrs_hua_orthogonal_projection(u_i)
                @ reference_magnetic_field.reshape(3, 1)
            )
            v_b = v_b_numerator / v_b_denominator
            v_b_hat = rot_mat_old.T @ v_i

        # sigma_r [Eq. 12 in [Hua2014]]
        sigma_r = k1 * (vec_to_so3(u_b) @ (u_b_hat))
        if magnetic_field is not None:
            sigma_r += k2 * (u_b_hat @ u_b_hat.T) @ (vec_to_so3(v_b) @ v_b_hat)

        # sigma_b [Eq. 12 in [Hua2014]]
        sigma_b = -k3 * (vec_to_so3(u_b) @ (u_b_hat))
        if magnetic_field is not None:
            sigma_b += -k4 * (vec_to_so3(v_b) @ v_b_hat)

        # Corrected angular rate [Eq. 12 in [Hua2014]]
        gyro_bias_old = gyro_biases[ix - 1].reshape(3, 1)
        w_corrected = w - gyro_bias_old

        # Gyros bias dot [Eq. 12 in [Hua2014]]
        gyro_bias_dot = -kb * gyro_bias_old
        gyro_bias_dot += kb * _ahrs_hua_saturation(gyro_bias_old, delta)
        gyro_bias_dot += sigma_b

        # Rotation dot [Eq. 12 in [Hua2014]]
        rot_mat_dot = rot_mat_old @ vec_to_so3(w_corrected + sigma_r)

        # Updated gyro bias and attitude
        gyro_biases[ix] = (gyro_bias_old + dt * gyro_bias_dot).flatten()
        rot_mat_filtered[ix] = so3_integrator(rot_mat_old, rot_mat_dot, dt)

    return (
        np.array([rot2rph(matrix) for matrix in rot_mat_filtered]),
        angular_rate - gyro_biases,
    )

Estimates the attitude of a vehicle using the Madgwick filter.

Madgwick Filter: The complemenetary filter proposed by Sebastian Madgwick. The algorithm calculates the orientation as the integration of the gyroscope summed with a feedback term. The feedback term is equal to the error in the current measurement of orientation as determined by the other sensors, multiplied by a gain. The algorithm therefore functions as a complementary filter that combines high-pass filtered gyroscope measurements with low-pass filtered measurements from other sensors with a corner frequency determined by the gain. A low gain will 'trust' the gyroscope more and so be more susceptible to drift. A high gain will increase the influence of other sensors and the errors that result from accelerations and magnetic distortions. A gain of zero will ignore the other sensors so that the measurement of orientation is determined by only the gyroscope.

Parameters:

Name Type Description Default
angular_rate Union[ndarray, List[float]]

Angular rate measurements (gyroscope data) in rad/s. Should be a 3xN or Nx3 array where N is the number of samples.

required
acceleration Union[ndarray, List[float]]

Acceleration measurements in m/s^2. Should be a 3xN or Nx3 array where N is the number of samples.

required
time Union[ndarray, List[float]]

Time vector in seconds. Should be a 1D array of length N.

required
magnetic_field Union[ndarray, List[float]]

Magnetic field measurements in microteslas. Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.

None
rph0 Union[ndarray, List[float]]

Initial roll, pitch, and heading angles in radians.

None
gain Union[float, int]

Determines the influence of the gyroscope relative to other sensors. A value of zero will disable initialisation and the acceleration and magnetic rejection features. A value of 0.5 is appropriate for most applications. Defaults to 0.5.

0.5
gyro_range Union[float, int]

Gyroscope range (in degrees per second). Angular rate recovery will activate if the gyroscope measurement exceeds 98% of this value. A value of zero will disable this feature. The value should be set to the range specified in the gyroscope datasheet. Defaults to 2000.0.

2000.0
accel_rejection Union[float, int]

Acceleration Rejection: Threshold (in degrees) used by the acceleration rejection feature. A value of zero will disable this feature. A value of 10 degrees is appropriate for most applications. Defaults to 10.0.

10.0
magnetic_rejection Union[float, int]

Magnetic Rejection: Threshold (in degrees) used by the magnetic rejection feature. A value of zero will disable the feature. A value of 10 degrees is appropriate for most applications. Defaults to 10.0.

10.0
recovery_trigger Union[float, int]

Acceleration and magnetic recovery trigger period (in seconds). A value of zero will disable the acceleration and magnetic rejection features. A period of 5 seconds is appropriate for most applications. Defaults to 5.0.

5.0

Returns:

Name Type Description
rph_filtered ndarray

Estimated roll, pitch, and heading angles in radians. The output is an Nx3 array.

angrate_filtered ndarray

Offset-corrected angular rate measurements in rad/s (not the original input). The output is an Nx3 array.

flags ndarray

Flags indicating the status of the filter. The output is a dictionary with the following keys: - "initializing": Indicates if the filter is initializing. - "angular_rate_recovery": Indicates if the angular rate recovery feature is active. - "acceleration_recovery": Indicates if the acceleration recovery feature is active. - "magnetic_recovery": Indicates if the magnetic recovery feature is active.

Raises:

Type Description
TypeError

If any of the inputs are not of the expected type.

ValueError

If any of the inputs do not have the expected dimensions.

References

Madgwick, S. O. (2010). An efficient orientation filter for inertial and inertial/magnetic sensor arrays.

Source code in navlib/nav/attitude_estimation.py
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
def ahrs_madgwick_filter(
    angular_rate: Union[np.ndarray, List[float]],
    acceleration: Union[np.ndarray, List[float]],
    time: Union[np.ndarray, List[float]],
    magnetic_field: Union[np.ndarray, List[float]] = None,
    rph0: Union[np.ndarray, List[float]] = None,
    gain: Union[float, int] = 0.5,
    gyro_range: Union[float, int] = 2000.0,
    accel_rejection: Union[float, int] = 10.0,
    magnetic_rejection: Union[float, int] = 10.0,
    recovery_trigger: Union[float, int] = 5.0,
) -> Tuple[np.ndarray, np.ndarray, Dict[str, np.ndarray]]:
    """
    Estimates the attitude of a vehicle using the Madgwick filter.

    Madgwick Filter: The complemenetary filter proposed by Sebastian Madgwick. The algorithm calculates the orientation
    as the integration of the gyroscope summed with a feedback term. The feedback term is equal to the error in the
    current measurement of orientation as determined by the other sensors, multiplied by a gain. The algorithm therefore
    functions as a complementary filter that combines high-pass filtered gyroscope measurements with low-pass
    filtered measurements from other sensors with a corner frequency determined by the gain. A low gain will 'trust' the
    gyroscope more and so be more susceptible to drift. A high gain will increase the influence of other sensors and the
    errors that result from accelerations and magnetic distortions. A gain of zero will ignore the other sensors so that
    the measurement of orientation is determined by only the gyroscope.

    Args:
        angular_rate (Union[np.ndarray, List[float]]): Angular rate measurements (gyroscope data) in rad/s.
            Should be a 3xN or Nx3 array where N is the number of samples.
        acceleration (Union[np.ndarray, List[float]]): Acceleration measurements in m/s^2.
            Should be a 3xN or Nx3 array where N is the number of samples.
        time (Union[np.ndarray, List[float]]): Time vector in seconds. Should be a 1D array of length N.
        magnetic_field (Union[np.ndarray, List[float]], optional): Magnetic field measurements in microteslas.
            Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.
        rph0 (Union[np.ndarray, List[float]], optional): Initial roll, pitch, and heading angles in radians.
        gain (Union[float, int], optional): Determines the influence of the gyroscope relative to other sensors. A value
            of zero will disable initialisation and the acceleration and magnetic rejection features. A value of 0.5 is
            appropriate for most applications. Defaults to 0.5.
        gyro_range (Union[float, int], optional): Gyroscope range (in degrees per second). Angular rate recovery will
            activate if the gyroscope measurement exceeds 98% of this value. A value of zero will disable this feature.
            The value should be set to the range specified in the gyroscope datasheet. Defaults to 2000.0.
        accel_rejection (Union[float, int], optional): Acceleration Rejection: Threshold (in degrees) used by the
            acceleration rejection feature. A value of zero will disable this feature. A value of 10 degrees is
            appropriate for most applications. Defaults to 10.0.
        magnetic_rejection (Union[float, int], optional): Magnetic Rejection: Threshold (in degrees) used by the
            magnetic rejection feature. A value of zero will disable the feature. A value of 10 degrees is appropriate
            for most applications. Defaults to 10.0.
        recovery_trigger (Union[float, int], optional): Acceleration and magnetic recovery trigger period (in seconds).
            A value of zero will disable the acceleration and magnetic rejection features. A period of 5 seconds is
            appropriate for most applications. Defaults to 5.0.

    Returns:
        rph_filtered (np.ndarray): Estimated roll, pitch, and heading angles in radians. The output is an Nx3 array.
        angrate_filtered (np.ndarray): Offset-corrected angular rate measurements in rad/s (not the original input). The output is an Nx3 array.
        flags (np.ndarray): Flags indicating the status of the filter. The output is a dictionary with the following
            keys:
            - "initializing": Indicates if the filter is initializing.
            - "angular_rate_recovery": Indicates if the angular rate recovery feature is active.
            - "acceleration_recovery": Indicates if the acceleration recovery feature is active.
            - "magnetic_recovery": Indicates if the magnetic recovery feature is active.

    Raises:
        TypeError: If any of the inputs are not of the expected type.
        ValueError: If any of the inputs do not have the expected dimensions.

    References:
        Madgwick, S. O. (2010). An efficient orientation filter for inertial and inertial/magnetic sensor arrays.
    """
    # Convert lists to numpy arrays if necessary
    if isinstance(acceleration, list):
        acceleration = np.array(acceleration)
    if isinstance(angular_rate, list):
        angular_rate = np.array(angular_rate)
    if isinstance(time, list):
        time = np.array(time)
    if magnetic_field is not None and isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if rph0 is not None and isinstance(rph0, list):
        rph0 = np.array(rph0)

    # Validate inputs
    if not isinstance(acceleration, np.ndarray):
        raise TypeError("The acceleration must be a numpy array or a list.")
    if not isinstance(angular_rate, np.ndarray):
        raise TypeError("The angular rate must be a numpy array or a list.")
    if not isinstance(time, np.ndarray):
        raise TypeError("The time must be a numpy array or a list.")
    if magnetic_field is not None and not isinstance(magnetic_field, np.ndarray):
        raise TypeError("The magnetic field must be a numpy array or a list.")
    if rph0 is not None and not isinstance(rph0, np.ndarray):
        raise TypeError(
            "The initial roll, pitch, and heading must be a numpy array or a list."
        )

    # Validate dimensions
    if acceleration.ndim != 2 or (
        acceleration.shape[0] != 3 and acceleration.shape[1] != 3
    ):
        raise ValueError("The acceleration must be a 3xN or Nx3 numpy array.")
    if angular_rate.ndim != 2 or (
        angular_rate.shape[0] != 3 and angular_rate.shape[1] != 3
    ):
        raise ValueError("The angular rate must be a 3xN or Nx3 numpy array.")
    if magnetic_field is not None:
        if magnetic_field.ndim != 2 or (
            magnetic_field.shape[0] != 3 and magnetic_field.shape[1] != 3
        ):
            raise ValueError("The magnetic field must be a 3xN or Nx3 numpy array.")
    if rph0 is not None:
        if rph0.ndim != 1 or rph0.shape[0] != 3:
            raise ValueError(
                "The initial roll, pitch, and heading must be a 1D numpy array with 3 elements."
            )

    # Ensure time is a 1D array
    time = time.squeeze()
    if time.ndim >= 2:
        raise ValueError("The time must be a (n, ), (n, 1) or (1, n) numpy array.")

    # Force Nx3 shape for acceleration, angular_rate, and magnetic_field
    if acceleration.shape[0] == 3 and acceleration.shape[1] != 3:
        acceleration = acceleration.T
    if angular_rate.shape[0] == 3 and angular_rate.shape[1] != 3:
        angular_rate = angular_rate.T
    if (
        magnetic_field is not None
        and magnetic_field.shape[0] == 3
        and magnetic_field.shape[1] != 3
    ):
        magnetic_field = magnetic_field.T

    # Validate Madgwick filter parameters
    if not isinstance(gain, (float, int)):
        raise TypeError("The gain must be a float or integer.")
    if gain < 0 or gain > 1:
        raise ValueError("The gain must be between 0 and 1.")
    gain = float(gain)
    if not isinstance(gyro_range, (float, int)):
        raise TypeError("The gyro range must be a float or integer.")
    if gyro_range <= 0:
        raise ValueError("The gyro range must be greater than 0.")
    gyro_range = float(gyro_range)
    if not isinstance(accel_rejection, (float, int)):
        raise TypeError("The accel rejection must be a float or integer.")
    if accel_rejection < 0:
        raise ValueError("The accel rejection must be greater than or equal to 0.")
    accel_rejection = float(accel_rejection)
    if not isinstance(magnetic_rejection, (float, int)):
        raise TypeError("The magnetic rejection must be a float or integer.")
    if magnetic_rejection < 0:
        raise ValueError("The magnetic rejection must be greater than or equal to 0.")
    magnetic_rejection = float(magnetic_rejection)
    if not isinstance(recovery_trigger, (float, int)):
        raise TypeError("The recovery trigger must be a float or integer.")
    if recovery_trigger < 0:
        raise ValueError("The recovery trigger must be greater than or equal to 0.")
    recovery_trigger = float(recovery_trigger)

    # Compute sample rate
    sample_rate = int(1.0 / median(difference(time)))

    # Match units required by the imufusion library
    # Acceleration in g and angular rate in degrees per second
    acceleration = acceleration / GRAVITY
    angular_rate = np.rad2deg(angular_rate)

    # Instantiate algorithms
    offset = imufusion.Offset(sample_rate)
    ahrs = imufusion.Ahrs()

    ahrs.settings = imufusion.Settings(
        imufusion.CONVENTION_NED,
        gain,
        gyro_range,
        accel_rejection,
        magnetic_rejection,
        int(
            recovery_trigger * sample_rate,
        ),
    )

    # Process sensor data
    delta_time = np.concatenate([np.zeros((1,)), difference(time)], axis=0)
    rph_filtered = np.empty((time.shape[0], 3))
    flags = {
        "initializing": [],
        "angular_rate_recovery": [],
        "acceleration_recovery": [],
        "magnetic_recovery": [],
        "acceleration_error": [],
        "accelerometer_ignored": [],
        "magnetic_error": [],
        "magnetometer_ignored": [],
    }

    for idx in range(time.shape[0]):
        # Apply gyroscope bias compensation
        angular_rate[idx] = offset.update(angular_rate[idx])

        # Update the Madgwick filter
        if magnetic_field is None:
            ahrs.update_no_magnetometer(
                angular_rate[idx], acceleration[idx], delta_time[idx]
            )
        else:
            ahrs.update(
                angular_rate[idx],
                acceleration[idx],
                magnetic_field[idx],
                delta_time[idx],
            )

        if ahrs.flags.initialising and rph0 is not None:
            rph_filtered[idx] = rph0
        else:
            rph_filtered[idx] = np.deg2rad(ahrs.quaternion.to_euler())

        # Flags for initialization or recovery trigger
        ahrs_flags = ahrs.flags
        flags["initializing"].append(ahrs_flags.initialising)
        flags["angular_rate_recovery"].append(ahrs_flags.angular_rate_recovery)
        flags["acceleration_recovery"].append(ahrs_flags.acceleration_recovery)
        flags["magnetic_recovery"].append(ahrs_flags.magnetic_recovery)

        ahrs_internal_state = ahrs.internal_states
        flags["acceleration_error"].append(ahrs_internal_state.acceleration_error)
        flags["accelerometer_ignored"].append(ahrs_internal_state.accelerometer_ignored)
        flags["magnetic_error"].append(ahrs_internal_state.magnetic_error)
        flags["magnetometer_ignored"].append(ahrs_internal_state.magnetometer_ignored)

    # Convert flags to numpy arrays
    for key in flags:
        flags[key] = np.array(flags[key])

    # Wrap angles to [-pi, pi]
    rph_filtered = wrap1pi(rph_filtered)

    return rph_filtered, np.deg2rad(angular_rate), flags

Estimates the attitude of a vehicle using the Mahony filter.

This estimator proposed by Robert Mahony et al. [Mahony2008] is formulated as a deterministic kinematic observer on the Special Orthogonal group SO(3) driven by an instantaneous attitude and angular velocity measurements.

k1 and k2 tunning: The weights k1 and k2 are introduced to weight the confidence in each measure. In situations where the IMU is subject to high magnitude accelerations (such as during takeoff or landing manoeuvres) it may be wise to reduce the relative weighting of the accelerometer data (k1 << k2) compared to the magnetometer data. Conversely, in many applications the IMU is mounted in the proximity to powerful electric motors and their power supply busses leading to low confidence in the magnetometer readings (choose k1 >> k2). This is a very common situation in the case of mini aerial vehicles with electric motors. In extreme cases the magnetometer data is unusable and provides motivation for a filter based solely on accelerometer data.

Parameters:

Name Type Description Default
angular_rate Union[ndarray, List[float]]

Angular rate measurements (gyroscope data) in rad/s. Should be a 3xN or Nx3 array where N is the number of samples.

required
acceleration Union[ndarray, List[float]]

Acceleration measurements in m/s^2. Should be a 3xN or Nx3 array where N is the number of samples.

required
time Union[ndarray, List[float]]

Time vector in seconds. Should be a 1D array of length N.

required
magnetic_field Union[ndarray, List[float]]

Magnetic field measurements in microteslas. Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.

None
reference_magnetic_field Union[ndarray, List[float]]

Reference magnetic field vector in any units. Should be a 1D array with 3 elements. Defaults to None.

None
rph0 Union[ndarray, List[float]]

Initial roll, pitch, and heading angles in radians.

None
k1 Union[float, int]

Gain for the accelerometer measurements. Defaults to 50.0.

50.0
k2 Union[float, int]

Gain for the magnetic field measurements. Defaults to 1.0.

1.0
kp Union[float, int]

Proportional gain. Defaults to 1.0.

1.0
ki Union[float, int]

Integral gain. Defaults to 0.01.

0.01

Returns:

Name Type Description
rph_filtered ndarray

Estimated roll, pitch, and heading (yaw) angles in radians. The output is an Nx3 array.

angrate_filtered ndarray

Bias compensated angular rates in rad/s. The output is an Nx3 array.

Raises:

Type Description
TypeError

If any of the inputs are not of the expected type.

ValueError

If any of the inputs do not have the expected dimensions.

References

Mahony, R., Hamel, T., & Pflimlin, J. M. (2008). Nonlinear complementary filters on the special orthogonal group. IEEE Transactions on automatic control, 53(5), 1203-1218.

Source code in navlib/nav/attitude_estimation.py
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
def ahrs_mahony_filter(
    angular_rate: Union[np.ndarray, List[float]],
    acceleration: Union[np.ndarray, List[float]],
    time: Union[np.ndarray, List[float]],
    magnetic_field: Union[np.ndarray, List[float]] = None,
    reference_magnetic_field: Union[np.ndarray, List[float]] = None,
    rph0: Union[np.ndarray, List[float]] = None,
    k1: Union[float, int] = 50.0,
    k2: Union[float, int] = 1.0,
    kp: Union[float, int] = 1.0,
    ki: Union[float, int] = 0.01,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimates the attitude of a vehicle using the Mahony filter.

    This estimator proposed by Robert Mahony et al. [Mahony2008] is formulated as a deterministic kinematic observer
    on the Special Orthogonal group SO(3) driven by an instantaneous attitude and angular velocity measurements.

    k1 and k2 tunning: The weights k1 and k2 are introduced to weight the confidence in each measure. In situations where
    the IMU is subject to high magnitude accelerations (such as during takeoff or landing manoeuvres) it may be wise to
    reduce the relative weighting of the accelerometer data (k1 << k2) compared to the magnetometer data. Conversely, in
    many applications the IMU is mounted in the proximity to powerful electric motors and their power supply busses
    leading to low confidence in the magnetometer readings (choose k1 >> k2). This is a very common situation in the case
    of mini aerial vehicles with electric motors. In extreme cases the magnetometer data is unusable and provides
    motivation for a filter based solely on accelerometer data.

    Args:
        angular_rate (Union[np.ndarray, List[float]]): Angular rate measurements (gyroscope data) in rad/s.
            Should be a 3xN or Nx3 array where N is the number of samples.
        acceleration (Union[np.ndarray, List[float]]): Acceleration measurements in m/s^2.
            Should be a 3xN or Nx3 array where N is the number of samples.
        time (Union[np.ndarray, List[float]]): Time vector in seconds. Should be a 1D array of length N.
        magnetic_field (Union[np.ndarray, List[float]], optional): Magnetic field measurements in microteslas.
            Should be a 3xN or Nx3 array where N is the number of samples. Defaults to None.
        reference_magnetic_field (Union[np.ndarray, List[float]], optional): Reference magnetic field vector in any units.
            Should be a 1D array with 3 elements. Defaults to None.
        rph0 (Union[np.ndarray, List[float]], optional): Initial roll, pitch, and heading angles in radians.
        k1 (Union[float, int], optional): Gain for the accelerometer measurements. Defaults to 50.0.
        k2 (Union[float, int], optional): Gain for the magnetic field measurements. Defaults to 1.0.
        kp (Union[float, int], optional): Proportional gain. Defaults to 1.0.
        ki (Union[float, int], optional): Integral gain. Defaults to 0.01.

    Returns:
        rph_filtered (np.ndarray): Estimated roll, pitch, and heading (yaw) angles in radians. The output is an
            Nx3 array.
        angrate_filtered (np.ndarray): Bias compensated angular rates in rad/s. The output is an Nx3 array.

    Raises:
        TypeError: If any of the inputs are not of the expected type.
        ValueError: If any of the inputs do not have the expected dimensions.

    References:
        Mahony, R., Hamel, T., & Pflimlin, J. M. (2008). Nonlinear complementary filters on the special orthogonal group.
        IEEE Transactions on automatic control, 53(5), 1203-1218.
    """
    # Convert lists to numpy arrays if necessary
    if isinstance(acceleration, list):
        acceleration = np.array(acceleration)
    if isinstance(angular_rate, list):
        angular_rate = np.array(angular_rate)
    if isinstance(time, list):
        time = np.array(time)
    if magnetic_field is not None and isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if reference_magnetic_field is not None and isinstance(
        reference_magnetic_field, list
    ):
        reference_magnetic_field = np.array(reference_magnetic_field)
    if rph0 is not None and isinstance(rph0, list):
        rph0 = np.array(rph0)

    # Validate inputs
    if not isinstance(acceleration, np.ndarray):
        raise TypeError("The acceleration must be a numpy array or a list.")
    if not isinstance(angular_rate, np.ndarray):
        raise TypeError("The angular rate must be a numpy array or a list.")
    if not isinstance(time, np.ndarray):
        raise TypeError("The time must be a numpy array or a list.")
    if magnetic_field is not None and not isinstance(magnetic_field, np.ndarray):
        raise TypeError("The magnetic field must be a numpy array or a list.")
    if reference_magnetic_field is not None and not isinstance(
        reference_magnetic_field, np.ndarray
    ):
        raise TypeError("The reference magnetic field must be a numpy array or a list.")
    if rph0 is not None and not isinstance(rph0, np.ndarray):
        raise TypeError(
            "The initial roll, pitch, and heading must be a numpy array or a list."
        )

    # Validate dimensions
    if acceleration.ndim != 2 or (
        acceleration.shape[0] != 3 and acceleration.shape[1] != 3
    ):
        raise ValueError("The acceleration must be a 3xN or Nx3 numpy array.")
    if angular_rate.ndim != 2 or (
        angular_rate.shape[0] != 3 and angular_rate.shape[1] != 3
    ):
        raise ValueError("The angular rate must be a 3xN or Nx3 numpy array.")
    if magnetic_field is not None:
        if magnetic_field.ndim != 2 or (
            magnetic_field.shape[0] != 3 and magnetic_field.shape[1] != 3
        ):
            raise ValueError("The magnetic field must be a 3xN or Nx3 numpy array.")
    if reference_magnetic_field is not None:
        reference_magnetic_field = reference_magnetic_field.squeeze()
        if reference_magnetic_field.ndim >= 2 or reference_magnetic_field.shape[0] != 3:
            raise ValueError(
                "The reference magnetic field must be a 1D numpy array with 3 elements."
            )
    if rph0 is not None:
        if rph0.ndim != 1 or rph0.shape[0] != 3:
            raise ValueError(
                "The initial roll, pitch, and heading must be a 1D numpy array with 3 elements."
            )

    # Ensure time is a 1D array
    time = time.squeeze()
    if time.ndim >= 2:
        raise ValueError("The time must be a (n, ), (n, 1) or (1, n) numpy array.")

    # Force Nx3 shape for acceleration, angular_rate, and magnetic_field
    if acceleration.shape[0] == 3 and acceleration.shape[1] != 3:
        acceleration = acceleration.T
    if angular_rate.shape[0] == 3 and angular_rate.shape[1] != 3:
        angular_rate = angular_rate.T
    if (
        magnetic_field is not None
        and magnetic_field.shape[0] == 3
        and magnetic_field.shape[1] != 3
    ):
        magnetic_field = magnetic_field.T

    # If magnetic field is provided, a reference magnetic field is required
    if magnetic_field is not None and reference_magnetic_field is None:
        raise ValueError(
            "If magnetic field is provided, a reference magnetic field is required."
        )

    # Validate gains
    if not isinstance(k1, (float, int)):
        raise TypeError("The k1 gain must be a float or integer.")
    k1 = float(k1)
    if not isinstance(k2, (float, int)):
        raise TypeError("The k2 gain must be a float or integer.")
    k2 = float(k2)
    if not isinstance(kp, (float, int)):
        raise TypeError("The kp gain must be a float or integer.")
    kp = float(kp)
    if not isinstance(ki, (float, int)):
        raise TypeError("The ki gain must be a float or integer.")
    ki = float(ki)

    # Create arrays to store the results
    samples = time.shape[0]
    rot_mat_filtered = np.tile(np.eye(3), (samples, 1, 1))
    gyro_biases = np.zeros([samples, 3])

    # Initialize the attitude if an initial attitude is provided
    rot_mat_filtered[0] = (
        rph2rot(rph0) if rph0 is not None else np.eye(3, dtype=np.float64)
    )

    # Reference gravity vector
    g_vec = np.array([0.0, 0.0, -1.0])
    g_vec = (g_vec / norm(g_vec)).reshape(3, 1)

    # Reference magnetic field vector
    if reference_magnetic_field is not None:
        m_vec = reference_magnetic_field
        m_vec = (m_vec / norm(m_vec)).reshape(3, 1)

    # Run the filter
    for ix in range(1, samples):
        # Gyroscope, accelerometer, and magnetic field measurements
        w = angular_rate[ix]
        a = acceleration[ix] / norm(acceleration[ix])
        if magnetic_field is not None:
            m = magnetic_field[ix] / norm(magnetic_field[ix])

        rot_mat_old = rot_mat_filtered[ix - 1]

        # Innovation term computation (Eq. 32c in [Mahony2008])
        wmes = k1 * (vec_to_so3(a) @ (rot_mat_old.T @ g_vec)).flatten()
        if magnetic_field is not None:
            wmes += k2 * (vec_to_so3(m) @ (rot_mat_old.T @ m_vec)).flatten()

        # Rotation change in time and gyroscope bias change in time (Eq. 32a and 32b in [Mahony2008])
        rot_mat_dot = rot_mat_old @ (
            vec_to_so3(w - gyro_biases[ix - 1]) + kp * vec_to_so3(wmes)
        )
        gyro_biases_dot = -ki * wmes

        # Time step
        dt = time[ix] - time[ix - 1]

        # Update the gyro bias and the filtered roll, pitch, and heading
        gyro_biases[ix] = gyro_biases[ix - 1] + dt * gyro_biases_dot
        rot_mat_filtered[ix] = so3_integrator(rot_mat_old, rot_mat_dot, dt)

    return (
        np.array([rot2rph(matrix) for matrix in rot_mat_filtered]),
        angular_rate - gyro_biases,
    )

raw_hdg computes the heading from magnetic field measurements and rph data.

If rph is a parameter, the using roll and pitch the corresponding rotation matrices are computed and the magnetic field measuremnts are transformated to measurements in the xy plane. With the planar magnetic field measuremnts, the heading is computed as: heading = np.arcant2(-my, mx)

Parameters:

Name Type Description Default
magnetic_field Union[ndarray, List[float]]

Magnetic field raw data in three dimensions. Can be: - 1D array or list with 3 elements: [mx, my, mz] (single measurement) - 2D array with shape 3xN or Nx3 (multiple measurements)

required
rph Union[ndarray, List[float]]

Roll, pitch and heading data. Can be: - 1D array or list with 3 elements: [roll, pitch, heading] (single measurement) - 2D array with shape 3xN or Nx3 (multiple measurements)

None

Returns:

Name Type Description
heading_rad ndarray

Heading angle in radians. Shape is (N, 1) where N is the number of measurements.

Raises:

Type Description
TypeError

If magnetic_field or rph are not numpy arrays or lists

ValueError

If magnetic_field or rph don't have the correct shape or dimensions

Source code in navlib/nav/attitude_estimation.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def ahrs_raw_hdg(
    magnetic_field: Union[np.ndarray, List[float]],
    rph: Union[np.ndarray, List[float]] = None,
) -> np.ndarray:
    """
    raw_hdg computes the heading from magnetic field measurements and rph data.

    If rph is a parameter, the using roll and pitch the corresponding rotation
    matrices are computed and the magnetic field measuremnts are transformated
    to measurements in the xy plane. With the planar magnetic field measuremnts,
    the heading is computed as: heading = np.arcant2(-my, mx)

    Args:
        magnetic_field (Union[np.ndarray, List[float]]): Magnetic field raw data in three
            dimensions. Can be:
            - 1D array or list with 3 elements: [mx, my, mz] (single measurement)
            - 2D array with shape 3xN or Nx3 (multiple measurements)
        rph (Union[np.ndarray, List[float]], optional): Roll, pitch and heading data.
            Can be:
            - 1D array or list with 3 elements: [roll, pitch, heading] (single measurement)
            - 2D array with shape 3xN or Nx3 (multiple measurements)

    Returns:
        heading_rad (np.ndarray): Heading angle in radians. Shape is (N, 1)
            where N is the number of measurements.

    Raises:
        TypeError: If magnetic_field or rph are not numpy arrays or lists
        ValueError: If magnetic_field or rph don't have the correct shape or dimensions
    """
    # Convert to numpy array
    if isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if rph is not None and isinstance(rph, list):
        rph = np.array(rph)

    # Check inputs
    if not isinstance(magnetic_field, np.ndarray):
        raise TypeError("magnetic_field must be a numpy array or list")
    if rph is not None and not isinstance(rph, np.ndarray):
        raise TypeError("rph must be a numpy array or list")

    # Handle different input shapes for magnetic_field
    if magnetic_field.ndim == 1:
        # Handle flat array of 3 elements
        if magnetic_field.shape[0] != 3:
            raise ValueError(
                "For 1D arrays, magnetic_field must have exactly 3 elements"
            )
        # Reshape to 1x3 (single measurement)
        magnetic_field = magnetic_field.reshape(1, 3)
    elif magnetic_field.ndim == 2:
        # Handle 2D arrays (Nx3 or 3xN)
        if not (magnetic_field.shape[0] == 3 or magnetic_field.shape[1] == 3):
            raise ValueError("For 2D arrays, magnetic_field must be 3xN or Nx3")
        # Check if magnetic_field is a 3xN matrix and transpose to Nx3
        if magnetic_field.shape[0] == 3 and magnetic_field.shape[1] != 3:
            magnetic_field = magnetic_field.T
    else:
        raise ValueError(
            "magnetic_field must be a 1D array (3 elements) or 2D array (3xN or Nx3)"
        )

    # Handle different input shapes for rph (if provided)
    if rph is not None:
        if rph.ndim == 1:
            # Handle flat array of 3 elements
            if rph.shape[0] != 3:
                raise ValueError("For 1D arrays, rph must have exactly 3 elements")
            # Reshape to 1x3 (single measurement)
            rph = rph.reshape(1, 3)
        elif rph.ndim == 2:
            # Handle 2D arrays (Nx3 or 3xN)
            if not (rph.shape[0] == 3 or rph.shape[1] == 3):
                raise ValueError("For 2D arrays, rph must be 3xN or Nx3")
            # Check if rph is a 3xN matrix and transpose to Nx3
            if rph.shape[0] == 3 and rph.shape[1] != 3:
                rph = rph.T
        else:
            raise ValueError(
                "rph must be a 1D array (3 elements) or 2D array (3xN or Nx3)"
            )

        # Ensure magnetic_field and rph have the same number of measurements
        if magnetic_field.shape[0] != rph.shape[0]:
            raise ValueError(
                "magnetic_field and rph must have the same number of measurements"
            )

    # Flatten Magnetic Field if the RPH is provided
    if rph is not None:
        rot_mat_flat = np.apply_along_axis(
            rph2rot, 1, np.concatenate([rph[:, [0, 1]], rph[:, [2]] * 0], axis=1)
        )
        mf = np.einsum(
            "ijk->ikj", rot_mat_flat @ magnetic_field.reshape(-1, 3, 1)
        ).squeeze()
        # Ensure mf is 2D (N, 3) even for single measurements
        if mf.ndim == 1:
            mf = mf.reshape(1, 3)
    else:
        mf = magnetic_field

    # Calculate HDG
    heading = np.arctan2(-mf[:, 1], mf[:, 0]).reshape(-1, 1)

    return heading

ahrs_raw_rp computes pitch and roll from raw accelerometer measurements.

The computations is based in the following formulas: * roll = np.arctan2(-ay, -az) * pitch = np.arctan2(ax, np.sqrt(ay^2 + az^2))

Parameters:

Name Type Description Default
acceleration Union[ndarray, List[float]]

Accelerometer raw data in three dimensions. Can be: - 1D array or list with 3 elements: [ax, ay, az] (single measurement) - 2D array with shape 3xN or Nx3 (multiple measurements)

required

Returns:

Name Type Description
rp_rad ndarray

Roll and pitch angles in radians. Shape is (N, 2) where N is the number of measurements.

Raises:

Type Description
TypeError

If acceleration is not a numpy array or list

ValueError

If acceleration doesn't have the correct shape or dimensions

Source code in navlib/nav/attitude_estimation.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def ahrs_raw_rp(acceleration: Union[np.ndarray, List[float]]) -> np.ndarray:
    """
    ahrs_raw_rp computes pitch and roll from raw accelerometer measurements.

    The computations is based in the following formulas:
    * roll  = np.arctan2(-ay, -az)
    * pitch = np.arctan2(ax, np.sqrt(ay^2 + az^2))

    Args:
        acceleration (Union[np.ndarray, List[float]]): Accelerometer raw data in three
            dimensions. Can be:
            - 1D array or list with 3 elements: [ax, ay, az] (single measurement)
            - 2D array with shape 3xN or Nx3 (multiple measurements)

    Returns:
        rp_rad (np.ndarray): Roll and pitch angles in radians. Shape is (N, 2)
            where N is the number of measurements.

    Raises:
        TypeError: If acceleration is not a numpy array or list
        ValueError: If acceleration doesn't have the correct shape or dimensions
    """
    # Convert to numpy array
    if isinstance(acceleration, list):
        acceleration = np.array(acceleration)

    # Check inputs
    if not isinstance(acceleration, np.ndarray):
        raise TypeError("Acceleration must be a numpy array or list")

    # Handle different input shapes
    if acceleration.ndim == 1:
        # Handle flat array of 3 elements
        if acceleration.shape[0] != 3:
            raise ValueError("For 1D arrays, acceleration must have exactly 3 elements")
        # Reshape to 1x3 (single measurement)
        acceleration = acceleration.reshape(1, 3)
    elif acceleration.ndim == 2:
        # Handle 2D arrays (Nx3 or 3xN)
        if not (acceleration.shape[0] == 3 or acceleration.shape[1] == 3):
            raise ValueError("For 2D arrays, acceleration must be 3xN or Nx3")
        # Check if acceleration is a 3xN matrix and transpose to Nx3
        if acceleration.shape[0] == 3 and acceleration.shape[1] != 3:
            acceleration = acceleration.T
    else:
        raise ValueError(
            "Acceleration must be a 1D array (3 elements) or 2D array (3xN or Nx3)"
        )

    # Normalize Accelerations
    acc = normalize(acceleration)

    # Calculating Roll and Pitch (base on gravity vector)
    roll = np.arctan2(-acc[:, 1], -acc[:, 2]).reshape(-1, 1)
    pitch = np.arctan2(acc[:, 0], np.sqrt(acc[:, 1] ** 2 + acc[:, 2] ** 2)).reshape(
        -1, 1
    )

    return np.concatenate([roll, pitch], axis=1)

ahrs_raw_rph computes the roll, pitch and heading from magnetic field measurements and accelerometer raw data.

The computations is based in the following formulas: * roll = np.arctan2(-ay, -az) * pitch = np.arctan2(ax, np.sqrt(ay^2 + az^2)) * heading = np.arctan2(-my, mx)

Parameters:

Name Type Description Default
magnetic_field Union[ndarray, List[float]]

Magnetic field raw data in three dimensions. Can be: - 1D array or list with 3 elements: [mx, my, mz] (single measurement) - 2D array with shape 3xN or Nx3 (multiple measurements)

required
accelerometer Union[ndarray, List[float]]

Accelerometer raw data in three dimensions. Can be: - 1D array or list with 3 elements: [ax, ay, az] (single measurement) - 2D array with shape 3xN or Nx3 (multiple measurements)

required

Returns:

Name Type Description
rph_rad ndarray

Roll, pitch and heading angles in radians. Shape is (N, 3) where N is the number of measurements.

Raises:

Type Description
TypeError

If magnetic_field or accelerometer are not numpy arrays or lists

ValueError

If magnetic_field or accelerometer don't have the correct shape or dimensions

ValueError

If magnetic_field and accelerometer don't have the same number of measurements

Source code in navlib/nav/attitude_estimation.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def ahrs_raw_rph(
    magnetic_field: Union[np.ndarray, List[float]],
    accelerometer: Union[np.ndarray, List[float]],
) -> np.ndarray:
    """
    ahrs_raw_rph computes the roll, pitch and heading from magnetic field
    measurements and accelerometer raw data.

    The computations is based in the following formulas:
    * roll  = np.arctan2(-ay, -az)
    * pitch = np.arctan2(ax, np.sqrt(ay^2 + az^2))
    * heading = np.arctan2(-my, mx)

    Args:
        magnetic_field (Union[np.ndarray, List[float]]): Magnetic field raw data in three
            dimensions. Can be:
            - 1D array or list with 3 elements: [mx, my, mz] (single measurement)
            - 2D array with shape 3xN or Nx3 (multiple measurements)
        accelerometer (Union[np.ndarray, List[float]]): Accelerometer raw data in three
            dimensions. Can be:
            - 1D array or list with 3 elements: [ax, ay, az] (single measurement)
            - 2D array with shape 3xN or Nx3 (multiple measurements)

    Returns:
        rph_rad (np.ndarray): Roll, pitch and heading angles in radians. Shape is (N, 3)
            where N is the number of measurements.

    Raises:
        TypeError: If magnetic_field or accelerometer are not numpy arrays or lists
        ValueError: If magnetic_field or accelerometer don't have the correct shape or dimensions
        ValueError: If magnetic_field and accelerometer don't have the same number of measurements
    """
    # Convert to numpy arrays
    if isinstance(magnetic_field, list):
        magnetic_field = np.array(magnetic_field)
    if isinstance(accelerometer, list):
        accelerometer = np.array(accelerometer)

    # Check inputs
    if not isinstance(magnetic_field, np.ndarray):
        raise TypeError("magnetic_field must be a numpy array or list")
    if not isinstance(accelerometer, np.ndarray):
        raise TypeError("accelerometer must be a numpy array or list")

    # Handle different input shapes for magnetic_field
    if magnetic_field.ndim == 1:
        # Handle flat array of 3 elements
        if magnetic_field.shape[0] != 3:
            raise ValueError(
                "For 1D arrays, magnetic_field must have exactly 3 elements"
            )
        # Reshape to 1x3 (single measurement)
        magnetic_field = magnetic_field.reshape(1, 3)
    elif magnetic_field.ndim == 2:
        # Handle 2D arrays (Nx3 or 3xN)
        if not (magnetic_field.shape[0] == 3 or magnetic_field.shape[1] == 3):
            raise ValueError("For 2D arrays, magnetic_field must be 3xN or Nx3")
        # Check if magnetic_field is a 3xN matrix and transpose to Nx3
        if magnetic_field.shape[0] == 3 and magnetic_field.shape[1] != 3:
            magnetic_field = magnetic_field.T
    else:
        raise ValueError(
            "magnetic_field must be a 1D array (3 elements) or 2D array (3xN or Nx3)"
        )

    # Handle different input shapes for accelerometer
    if accelerometer.ndim == 1:
        # Handle flat array of 3 elements
        if accelerometer.shape[0] != 3:
            raise ValueError(
                "For 1D arrays, accelerometer must have exactly 3 elements"
            )
        # Reshape to 1x3 (single measurement)
        accelerometer = accelerometer.reshape(1, 3)
    elif accelerometer.ndim == 2:
        # Handle 2D arrays (Nx3 or 3xN)
        if not (accelerometer.shape[0] == 3 or accelerometer.shape[1] == 3):
            raise ValueError("For 2D arrays, accelerometer must be 3xN or Nx3")
        # Check if accelerometer is a 3xN matrix and transpose to Nx3
        if accelerometer.shape[0] == 3 and accelerometer.shape[1] != 3:
            accelerometer = accelerometer.T
    else:
        raise ValueError(
            "accelerometer must be a 1D array (3 elements) or 2D array (3xN or Nx3)"
        )

    # Ensure magnetic_field and accelerometer have the same number of measurements
    if magnetic_field.shape[0] != accelerometer.shape[0]:
        raise ValueError(
            "magnetic_field and accelerometer must have the same number of measurements"
        )
    # Roll and Pitch
    roll_pitch = ahrs_raw_rp(accelerometer)

    # Heading
    heading = ahrs_raw_hdg(
        magnetic_field, np.concatenate([roll_pitch, roll_pitch[:, [0]] * 0], axis=1)
    )

    # RPH
    rph = np.concatenate([roll_pitch, heading], axis=1)

    return rph

so3_integrator integrates the final orientation R from a frame that was initially coincident with Rold followed a rotation described by Rdot by a time dt. This computation is based in the exponential coordinate representation of rotations.

The Rodrigues' formula states that R = exp(skew(w)*dt). If we define the world frame w and the vehicle frame v, we have a initiall pose R_{wv} that we will denote as Rold. If we know the rotation w in the frame v and then the skew-symmetric matrix [w] we can compute Rdot as: R_dot = Rold @ [w] and equivalentlly: [w] = Rold.T @ Rdot, both inputs of the function. Then:

        Rnew = Rold @ exp((Rold.T @ Rdot) * dt)

Source: Modern Robotics - Chapter 3.2.3 Exponential Coordinate Representation of Rotation (Linch & Park, 2017)

Note: scipy.linalg.expm compute the matrix exponential using Pade approximation

Parameters:

Name Type Description Default
Rold Union[ndarray, List[float]]

Initial Rotation Matrix

required
Rdot Union[ndarray, List[float]]

Rotation Derivative computed from angular velocity skew-symmetric matrix

required
dt float

Time step between frames

required

Returns:

Name Type Description
Rnew Union[ndarray, List[float]]

New frame rotation matrix

Raises:

Type Description
ValueError

If Rold or Rdot are not numpy arrays or lists

ValueError

If Rold or Rdot are not 3x3 matrices

Source code in navlib/nav/attitude_estimation.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def so3_integrator(
    Rold: Union[np.ndarray, List[float]],
    Rdot: Union[np.ndarray, List[float]],
    dt: float,
) -> np.ndarray:
    """
    so3_integrator integrates the final orientation R from a frame
    that was initially coincident with Rold followed a rotation described
    by Rdot by a time dt. This computation is based in the exponential
    coordinate representation of rotations.

    The Rodrigues' formula states that R = exp(skew(w)*dt). If we define
    the world frame w and the vehicle frame v, we have a initiall pose R_{wv}
    that we will denote as Rold. If we know the rotation w in the frame v and
    then the skew-symmetric matrix [w] we can compute Rdot as: R_dot = Rold @ [w]
    and equivalentlly: [w] = Rold.T @ Rdot, both inputs of the function. Then:

                Rnew = Rold @ exp((Rold.T @ Rdot) * dt)

    Source: Modern Robotics - Chapter 3.2.3 Exponential Coordinate Representation
    of Rotation (Linch & Park, 2017)

    Note: scipy.linalg.expm compute the matrix exponential using Pade approximation

    Args:
        Rold (Union[np.ndarray, List[float]]): Initial Rotation Matrix
        Rdot (Union[np.ndarray, List[float]]): Rotation Derivative computed from
            angular velocity skew-symmetric matrix
        dt (float): Time step between frames

    Returns:
        Rnew (Union[np.ndarray, List[float]]): New frame rotation matrix

    Raises:
        ValueError: If Rold or Rdot are not numpy arrays or lists
        ValueError: If Rold or Rdot are not 3x3 matrices
    """
    # Convert to numpy arrays
    if isinstance(Rold, list):
        Rold = np.array(Rold)
    if isinstance(Rdot, list):
        Rdot = np.array(Rdot)

    # Check inputs
    if not isinstance(Rold, np.ndarray):
        raise TypeError("Rold must be a numpy array or list")
    if not isinstance(Rdot, np.ndarray):
        raise TypeError("Rdot must be a numpy array or list")
    if not isinstance(dt, (float, int)):
        raise TypeError("dt must be a float or integer")
    dt = float(dt)
    if dt <= 0:
        raise ValueError("dt must be a positive value")

    # Check if Rold and Rdot are 3x3 matrices
    if Rold.shape != (3, 3):
        raise ValueError("Rold must be a 3x3 matrix")
    if Rdot.shape != (3, 3):
        raise ValueError("Rdot must be a 3x3 matrix")

    # Convert to numpy array
    Rold = np.asanyarray(Rold)
    Rdot = np.asanyarray(Rdot)

    # Check if Rold and Rdot are 3x3 matrices
    if Rold.shape != (3, 3):
        raise ValueError("Rold must be a 3x3 matrix")
    if Rdot.shape != (3, 3):
        raise ValueError("Rdot must be a 3x3 matrix")

    return np.dot(Rold, scipy.linalg.expm(np.dot(Rold.T, Rdot) * dt))