838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039 | def magyc_ifg(
magnetic_field: Union[np.ndarray, list],
angular_rate: Union[np.ndarray, list],
time: Union[np.ndarray, list],
measurements_window: int = 25
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict[str, np.ndarray]]:
"""
Proposed method for the full calibration of a three-axis magnetometer
using magnetic field and angular rate measurements. This particular approach
is based on a factor graph processing all the data in an incremental manner.
In particular MAGYC-IFG embeds the volume constraint for the soft-iron into
a reparametrization for the Cholesky decomposition of the soft-iron matrix,
allowing for the use of half the factors.
Args:
magnetic_field (numpy.ndarray or list): Magnetic field measurements in a
3xN or Nx3 numpy array or list.
angular_rate (numpy.ndarray or list): Angular rate measurements in a 3xN or
Nx3 numpy array or list.
time (numpy.ndarray or list): Time stamps of the measurements.
measurements_window (int): Window size for the measurements.
Returns:
hard_iron (numpy.ndarray): Hard iron bias.
soft_iron (numpy.ndarray): Soft iron matrix.
gyro_bias (numpy.ndarray): Gyroscope bias.
calibrated_magnetic_field (numpy.ndarray): Calibrated magnetic field measurements.
calibrated_angular_rate (numpy.ndarray): Calibrated angular rate measurements.
optimization_status (Dict[str, np.ndarray]): Dictionary with the SI, HI
and Wb for each iteartions. The keys are: "soft_iron", "hard_iron",
"gyro_bias".
Raises:
TypeError: If the magnetic field, angular rate, and time are not numpy arrays or lists.
ValueError: If the magnetic field and angular rate are not 3xN or Nx3 numpy
arrays, or if the time is not a 1D numpy array.
ValueError: If the measurements window is not a positive integer
"""
# Check if the magnetic_field, angular_rate, and time are lists and convert them to numpy arrays
if isinstance(magnetic_field, list):
magnetic_field = np.array(magnetic_field)
if isinstance(angular_rate, list):
angular_rate = np.array(angular_rate)
if isinstance(time, list):
time = np.array(time)
# Check if the magnetic_field, angular_rate, and time are numpy arrays
if not isinstance(magnetic_field, np.ndarray):
raise TypeError("The magnetic field must be a numpy array or a list.")
if not isinstance(angular_rate, np.ndarray):
raise TypeError("The angular rate must be a numpy array or a list.")
if not isinstance(time, np.ndarray):
raise TypeError("The time must be a numpy array or a list.")
# Check if the magnetic_field and angular_rate are 3xN or Nx3 numpy arrays
if magnetic_field.ndim != 2 or (magnetic_field.shape[0] != 3 and magnetic_field.shape[1] != 3):
raise ValueError("The magnetic field must be a 3xN or Nx3 numpy array.")
if angular_rate.ndim != 2 or (angular_rate.shape[0] != 3 and angular_rate.shape[1] != 3):
raise ValueError("The angular rate must be a 3xN or Nx3 numpy array.")
# Check if the time is a 1D numpy array
time = time.flatten()
if time.ndim != 1:
raise ValueError("The time must be a (n, ), (n, 1) or (1, n) numpy array.")
# Force the magnetic_field and angular_rate to be Nx3 numpy arrays
if magnetic_field.shape[0] == 3:
magnetic_field = magnetic_field.T
if angular_rate.shape[0] == 3:
angular_rate = angular_rate.T
# Check if the magnetic_field, angular_rate, and time have the same number of samples
if magnetic_field.shape[0] != angular_rate.shape[0] or magnetic_field.shape[0] != time.shape[0]:
raise ValueError("The magnetic field, angular rate, and time must have the same number of samples.")
# Check that the measurements window is a positive integer
if not isinstance(measurements_window, int) or measurements_window <= 0:
raise ValueError("The measurements window must be a positive integer.")
# Compute the magnetic calibration
# Smoothing and Mapping Factor Graph
# 1. Create the non-linear graph
graph = gtsam.NonlinearFactorGraph()
# 2. Set iSAM2 parameters and create iSAM2 object
isam_parameters = gtsam.ISAM2Params()
dogleg_parameters = gtsam.ISAM2DoglegParams()
dogleg_parameters.setInitialDelta(0.5)
dogleg_parameters.setAdaptationMode("ONE_STEP_PER_ITERATION")
isam_parameters.setOptimizationParams(dogleg_parameters)
isam = gtsam.ISAM2(isam_parameters)
# 3. noise model for each factor.
residual_noise = gtsam.noiseModel.Isotropic.Sigma(3, 1e-6)
# 4. Creates values structure with initial values
initial = gtsam.Values()
initial.insert(S(0), np.array([0.0, 0.0, 0.0, 0.0, 0.0]))
initial.insert(B(0), gtsam.Point3(0, 0, 0))
initial.insert(W(0), gtsam.Point3(0, 0, 0))
keys = [S(0), B(0), W(0)]
# Dictionary to save the progress of parameters during optimization
optimization_status = {"S": [], "B": [], "W": [], "T": []}
# 5. Add factor for each measurement accumulates in the measurements window into a single node
measurements_window = int(measurements_window)
m_window = np.empty((measurements_window, 3))
w_window = np.empty((measurements_window, 3))
t_window = np.empty((measurements_window, ))
# 6. Add factors to the graph
for i in range(magnetic_field.shape[0]):
# Get sensor measurements and estimated magnetic field derivative
t_window[i % measurements_window] = time[i]
m_window[i % measurements_window, :] = magnetic_field[i, :]
w_window[i % measurements_window, :] = angular_rate[i, :]
if (i % measurements_window == 0 and i != 0):
# Compute the derivative of the magnetic field for the window
m_dot_window = np.diff(m_window, axis=0) / np.diff(t_window).reshape(-1, 1)
# Average measurements by the measurements window size.
m_dot_meadian = np.median(m_dot_window, axis=0).reshape(3, 1)
m_median = np.median(m_window, axis=0).reshape(3, 1)
w_median = np.median(w_window, axis=0).reshape(3, 1)
# 6.1 Residual factor
rf = gtsam.CustomFactor(residual_noise, keys, partial(_residual_factor, m_dot_meadian, m_median, w_median))
graph.push_back(rf)
# 6.2 Perform incremental update to iSAM2's internal Bayes tree, optimizing only the affected variables.
# Set iterations to start optimization, otherwise the optimizations starts as a ill-posed problem.
# TODO: Fix this
# try:
if (i // measurements_window) % 10 == 0:
isam.update(graph, initial)
current = isam.calculateEstimate()
# Save the current parameters
for key, variable in zip([S(0), B(0), W(0)], "SBW"):
vector = current.atVector(key).reshape(1, -1)
optimization_status[variable].append(vector)
# Save the time as a unix timestamp in microseconds
optimization_status["T"].append(int(datetime.now().timestamp() * 1e6))
# except RuntimeError:
# warnings.warn("Skipping graph optimization due to indetermined system.")
# finally:
graph = gtsam.NonlinearFactorGraph()
initial = gtsam.Values()
# 6.5 Reset the measurements window
t_window = np.empty((measurements_window, ))
m_window = np.empty((measurements_window, 3))
w_window = np.empty((measurements_window, 3))
# 7. Process Results
# Update optimization status to have the actual matrices instead of the keys
optimization_steps = len(optimization_status["S"])
optimization_status_final = {
"soft_iron": np.empty((optimization_steps, 9)),
"hard_iron": np.empty((optimization_steps, 3)),
"gyro_bias": np.empty((optimization_steps, 3)),
"time": np.empty((optimization_steps, ))
}
for i in range(optimization_steps):
# Get parameters
l_params = optimization_status["S"][i].flatten()
b = optimization_status["B"][i]
d = optimization_status["W"][i]
# Compute soft-iron, hard-iron and gyroscope bias
lower_triangular_matrix = np.array([[exp(l_params[0]), 0, 0],
[l_params[1], exp(l_params[2]), 0],
[l_params[3], l_params[4], 1 / exp(l_params[0] + l_params[2])]])
soft_iron_i = np.linalg.inv(lower_triangular_matrix @ lower_triangular_matrix.T)
hard_iron_i = soft_iron_i @ b.reshape(3, 1)
gyro_bias_i = d.reshape(3, 1)
# Fill the new optimization status dictionary
optimization_status_final["soft_iron"][i, :] = soft_iron_i.flatten()
optimization_status_final["hard_iron"][i, :] = hard_iron_i.flatten()
optimization_status_final["gyro_bias"][i, :] = gyro_bias_i.flatten()
optimization_status_final["time"][i] = optimization_status["T"][i]
# Average the last 20% of the optimization steps to get the final calibration
optimization_steps = int(0.2 * optimization_steps)
soft_iron = np.mean(optimization_status_final["soft_iron"][-optimization_steps:], axis=0).reshape(3, 3)
hard_iron = np.mean(optimization_status_final["hard_iron"][-optimization_steps:], axis=0)
gyro_bias = np.mean(optimization_status_final["gyro_bias"][-optimization_steps:], axis=0)
# Calibrate magnetic field
calibrated_magnetic_field = (np.linalg.inv(soft_iron) @ (magnetic_field.reshape(3, -1) - hard_iron.reshape(3, 1))).T
# Calibrated gyroscope measurements
calibrated_angular_rate = angular_rate - gyro_bias.flatten()
return (hard_iron.flatten(), soft_iron, gyro_bias.flatten(), calibrated_magnetic_field, calibrated_angular_rate,
optimization_status_final)
|