
LCM Logs Parser Module Documentation

This module contains functions to parse LCMLogs to a nav_structtype object.

Functions:

| Name | Description |
| --- | --- |
| dict_to_struct | Convert a dictionary to a nav_structtype object. |
| struct_to_dict | Convert a nav_structtype object to a dictionary. |
| filter_channels_from_data | Filter channels from an existing nav_structtype object based on channel patterns. |
| merge_lcm_logs | Merge multiple LCM log files into a single log file, with optional channel filtering. |
| read_log | Read an LCMLog file and return a nav_structtype object with optional channel filtering. |
| read_logs | Read multiple LCMLog files from a directory and return a merged nav_structtype object. |

Base class to store data as a structure with numpy arrays.

dict_to_struct takes a flat or nested dictionary of data and returns a nav_structtype object holding that data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ddata | Dict[str, Any] | Dictionary with data | required |
| verbose | bool | If True, print warnings for non-homogeneous arrays. | False |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| nav_structtype | nav_structtype | Object with data as a structure with numpy arrays |

Source code in navlib/lcmlog/parse_lcmlog.py
def dict_to_struct(ddata: Dict[str, Any], verbose: bool = False) -> nav_structtype:
    """
    dict_to_struct takes a flat or nested dictionary of data and returns
    a nav_structtype object holding that data.

    Args:
        ddata (Dict[str, Any]): Dictionary with data
        verbose (bool): If True, print warnings for non-homogeneous arrays. Default is False.

    Returns:
        nav_structtype: Object with data as a structure with numpy arrays
    """
    # Create new object
    sdata = nav_structtype()

    # Iterate each element of the dictionary
    for k, v in ddata.items():
        # Fix dictionary key name (remove '.')
        k = k.replace(".", "_")
        k = k.replace("$", "S")

        # Recursive function to deal with nested dictionaries
        if isinstance(v, dict):
            exec(
                compile(
                    "sdata." + k + " = dict_to_struct(v, verbose=verbose)",
                    "<string>",
                    "exec",
                )
            )
        else:
            try:
                exec(compile("sdata." + k + " = np.array(v)", "<string>", "exec"))
            except ValueError:
                # Non-homogeneous arrays (e.g., variable-length data per message)
                # are kept as Python lists, which is expected behavior
                if verbose:
                    print(
                        f"Info: Field '{k}' contains variable-length data, kept as list instead of numpy array"
                    )
                exec(compile("sdata." + k + " = v", "<string>", "exec"))
    return sdata
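
The key clean-up and recursion in the listing above can be tried without navlib. The sketch below is only an illustration: `to_namespace` is a hypothetical stand-in, it uses `types.SimpleNamespace` in place of nav_structtype, it swaps the `exec(compile(...))` assignments for `setattr`, and it skips the `np.array` conversion entirely.

```python
from types import SimpleNamespace
from typing import Any, Dict


def to_namespace(ddata: Dict[str, Any]) -> SimpleNamespace:
    """Hypothetical stand-in for dict_to_struct (no NumPy conversion)."""
    out = SimpleNamespace()
    for key, value in ddata.items():
        # Same key clean-up as the listing: '.' becomes '_' and '$' becomes 'S'
        name = key.replace(".", "_").replace("$", "S")
        if isinstance(value, dict):
            # Nested dictionaries become nested namespaces
            setattr(out, name, to_namespace(value))
        else:
            # The real function would wrap this value in np.array
            setattr(out, name, value)
    return out


log = {"IMU.RAW": {"utime": [1, 2, 3], "accel$x": [0.1, 0.2, 0.3]}}
data = to_namespace(log)
print(data.IMU_RAW.utime)
print(data.IMU_RAW.accelSx)
```

One difference worth keeping in mind: `setattr` accepts any string as an attribute name, whereas the `exec`-based assignment in the listing only works when the cleaned key is a valid Python identifier.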

Filter channels from an existing nav_structtype object based on channel patterns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | nav_structtype | Object with data as a structure with numpy arrays | required |
| channels_to_process | str | Regular expression to filter the channels to process (".*" selects all channels). | '.*' |
| channels_to_ignore | str | Regular expression to filter the channels to ignore ("" ignores no channels). | '' |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| nav_structtype | nav_structtype | Filtered object with data as a structure with numpy arrays |

Source code in navlib/lcmlog/parse_lcmlog.py
def filter_channels_from_data(
    data: nav_structtype, channels_to_process: str = ".*", channels_to_ignore: str = ""
) -> nav_structtype:
    """
    Filter channels from an existing nav_structtype object based on channel patterns.

    Args:
        data (nav_structtype): Object with data as a structure with numpy arrays
        channels_to_process (str): Regular expression to filter the channels to process. Default is ".*" (all channels).
        channels_to_ignore (str): Regular expression to filter the channels to ignore. Default is "" (no channels ignored).

    Returns:
        nav_structtype: Filtered object with data as a structure with numpy arrays
    """
    if channels_to_process == ".*" and channels_to_ignore == "":
        # No filtering needed
        return data

    # Create regular expressions for channel filtering
    channels_to_process_re = re.compile(channels_to_process)
    channels_to_ignore_re = (
        re.compile(channels_to_ignore) if channels_to_ignore else None
    )

    # Create new filtered object
    filtered_data = nav_structtype()

    # Iterate through all attributes (channels) in the data
    for channel_name in data.__dict__.keys():
        # Check if channel should be processed
        if channels_to_process_re.match(channel_name):
            # Check if channel should not be ignored
            if channels_to_ignore_re is None or not channels_to_ignore_re.match(
                channel_name
            ):
                # Keep this channel
                setattr(filtered_data, channel_name, getattr(data, channel_name))

    return filtered_data
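
Because the listing above uses `re.match`, each pattern is anchored at the start of the channel name, so a bare substring such as "IMU" only selects names that begin with it. The sketch below reproduces that selection logic on plain strings; `select_channels` and the channel names are hypothetical and not part of navlib.

```python
import re
from typing import List


def select_channels(
    names: List[str], to_process: str = ".*", to_ignore: str = ""
) -> List[str]:
    """Keep names matching to_process and not matching to_ignore."""
    process_re = re.compile(to_process)
    ignore_re = re.compile(to_ignore) if to_ignore else None
    kept = []
    for name in names:
        # re.match only succeeds if the pattern matches from position 0
        if not process_re.match(name):
            continue
        if ignore_re is not None and ignore_re.match(name):
            continue
        kept.append(name)
    return kept


channels = ["IMU_RAW", "GPS_FIX", "DEBUG_IMU", "NAV_IMU_FILTERED"]
anchored = select_channels(channels, to_process="IMU")
anywhere = select_channels(channels, to_process=".*IMU.*", to_ignore="DEBUG.*")
print(anchored)  # ['IMU_RAW']
print(anywhere)  # ['IMU_RAW', 'NAV_IMU_FILTERED']
```

To match a substring anywhere in the channel name, wrap it as ".*IMU.*", as the read_logs examples do.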

Merge multiple LCM log files into a single LCM log file, chronologically sorted by timestamp. Optionally filter channels during the merge to create a lighter output file.

This function reads raw LCM log events from multiple files, filters them by channel, sorts them chronologically, and writes them to a new merged log file. This approach is more efficient than merging decoded data structures.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| log_files | List[Path] | List of LCM log file paths to merge | required |
| output_file | Path | Output path for the merged LCM log file | required |
| channels_to_process | str | Regular expression to filter the channels to process (".*" selects all channels). | '.*' |
| channels_to_ignore | str | Regular expression to filter the channels to ignore ("" ignores no channels). | '' |
| verbose | bool | Verbose flag. | True |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| None | None | Writes merged log to output_file |

Source code in navlib/lcmlog/parse_lcmlog.py
def merge_lcm_logs(
    log_files: List[Path],
    output_file: Path,
    channels_to_process: str = ".*",
    channels_to_ignore: str = "",
    verbose: bool = True,
) -> None:
    """
    Merge multiple LCM log files into a single LCM log file, chronologically sorted by timestamp.
    Optionally filter channels during the merge to create a lighter output file.

    This function reads raw LCM log events from multiple files, filters them by channel,
    sorts them chronologically, and writes them to a new merged log file. This approach
    is more efficient than merging decoded data structures.

    Args:
        log_files (List[Path]): List of LCM log file paths to merge
        output_file (Path): Output path for the merged LCM log file
        channels_to_process (str): Regular expression to filter the channels to process. Default is ".*" (all channels).
        channels_to_ignore (str): Regular expression to filter the channels to ignore. Default is "" (no channels ignored).
        verbose (bool): Verbose flag. Default is True.

    Returns:
        None: Writes merged log to output_file
    """
    # Create regular expressions for channel filtering
    channels_to_process_re = re.compile(channels_to_process)
    channels_to_ignore_re = (
        re.compile(channels_to_ignore) if channels_to_ignore else None
    )

    # Collect all events from all log files
    all_events = []
    ignored_channels = set()

    if verbose:
        print("Reading and filtering events from log files...")

    for log_file in log_files:
        print(f"  Reading {log_file.name}...")

        try:
            log = EventLog(str(log_file), "r")

            for event in log:
                # Check if channel should be ignored
                if event.channel in ignored_channels:
                    continue

                # Apply channel filtering
                if channels_to_ignore_re and channels_to_ignore_re.match(event.channel):
                    if verbose and event.channel not in ignored_channels:
                        print(f"    Ignoring channel: {event.channel}")
                    ignored_channels.add(event.channel)
                    continue

                if not channels_to_process_re.match(event.channel):
                    if verbose and event.channel not in ignored_channels:
                        print(f"    Skipping channel: {event.channel}")
                    ignored_channels.add(event.channel)
                    continue

                # Store event for merging
                all_events.append(event)

        except Exception as e:
            print(f"  Error reading {log_file.name}: {e}")
            continue

    if not all_events:
        raise ValueError("No events found in any log files after filtering")

    # Sort all events by timestamp
    if verbose:
        print(f"Sorting {len(all_events)} events chronologically...")

    all_events.sort(key=lambda e: e.timestamp)

    # Write merged events to output file
    if verbose:
        print(f"Writing merged log to {output_file}...")

    try:
        output_log = EventLog(str(output_file), "w")

        for event in all_events:
            output_log.write_event(event.timestamp, event.channel, event.data)

        if verbose:
            print(f"Successfully wrote {len(all_events)} events to merged log")

    except Exception as e:
        raise RuntimeError(f"Error writing merged log file: {e}")
    finally:
        if "output_log" in locals():
            del output_log

read_log takes a file name and returns the data as a nav_structtype object. If no .pkl file with the same stem exists, the LCM log is first parsed and saved as a pickle file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_name | str | File name of the log file | required |
| verbose | bool | Verbose flag. | True |
| channels_to_process | str | Regular expression to filter the channels to process (".*" selects all channels). | '.*' |
| channels_to_ignore | str | Regular expression to filter the channels to ignore ("" ignores no channels). | '' |
| lcm_packages | List[str] | List of directories to search for LCM types in addition to the directories in the python path. | [] |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| nav_structtype | nav_structtype | Object with data as a structure with numpy arrays |

Source code in navlib/lcmlog/parse_lcmlog.py
def read_log(
    file_name: str,
    verbose=True,
    channels_to_process: str = ".*",
    channels_to_ignore: str = "",
    lcm_packages: List[str] = [],
) -> nav_structtype:
    """
    read_log takes a file name and returns the data as a nav_structtype
    object.
    If no .pkl file with the same stem exists, the LCM log is first parsed
    and saved as a pickle file.

    Args:
        file_name (str): File name of the log file
        verbose (bool): Verbose flag. Default is True.
        channels_to_process (str): Regular expression to filter the channels to process. Default is ".*" (all channels).
        channels_to_ignore (str): Regular expression to filter the channels to ignore. Default is "" (no channels ignored).
        lcm_packages (List[str]): List of directories to search for LCM types in addition to the directories in the
            python path.

    Returns:
        nav_structtype: Object with data as a structure with numpy arrays
    """
    fname = Path(file_name)
    parent, stem = Path(fname.parent), Path(fname.stem)
    pkl_file_exists = os.path.isfile(str(parent / stem) + ".pkl")

    # Check if file is not already parsed to a pickle file
    if not pkl_file_exists:
        # If it is not a file, replace "." and "-" by "_"
        stem_alt = str(stem).replace(".", "_").replace("-", "_")
        try:
            if not os.path.isfile("/".join([str(parent), ".".join([stem_alt, "pkl"])])):
                if verbose:
                    print("Pickle file not found, looking for lcmlog with same stem")
                parse_and_save(
                    str(fname),
                    output_file_name=str(parent / stem) + ".pkl",
                    channels_to_process=channels_to_process,
                    channels_to_ignore=channels_to_ignore,
                    verbose=verbose,
                    lcm_packages=lcm_packages,
                )
                if verbose:
                    print(f"LCMLogs {str(fname)} parsed to .pkl format")
                fname_alt = str(parent / stem)
            else:
                # A pickle saved under the sanitized stem already exists
                fname_alt = str(parent / stem_alt)
        except FileNotFoundError:
            print(f"{fname} not found neither as lcmlog nor as .pkl file.")
            return
    else:
        fname_alt = str(parent / stem)

    # Load Pickle file
    print(f"Loading pickle logfile:  {fname_alt}.pkl")
    # Use a context manager so the file handle is closed after loading
    with open(fname_alt + ".pkl", "rb") as pkl_file:
        ddata = pickle.load(pkl_file)

    # Convert dictionary to a structure with numpy arrays
    data = dict_to_struct(ddata, verbose=False)

    # Apply channel filtering if loading from existing pickle file
    # (when parsing from LCM, filtering is already applied in parse_and_save)
    if pkl_file_exists and (channels_to_process != ".*" or channels_to_ignore != ""):
        if verbose:
            print("Applying channel filtering to loaded data")
        data = filter_channels_from_data(data, channels_to_process, channels_to_ignore)

    if verbose:
        print("LCMLog successfully transformed to nav_structtype object")
    return data
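
The pickle cache location in the listing above is derived from the parent directory and the stem of file_name. The sketch below shows that derivation in isolation; `pickle_path_for` is a hypothetical helper, the paths are made up, and `PurePosixPath` is used instead of `Path` only so the result does not depend on the operating system.

```python
from pathlib import PurePosixPath


def pickle_path_for(file_name: str) -> str:
    """Mirror the parent/stem + '.pkl' rule used by read_log."""
    fname = PurePosixPath(file_name)
    # .stem drops only the last suffix, e.g. 'lcmlog.00' -> 'lcmlog'
    return str(fname.parent / fname.stem) + ".pkl"


first = pickle_path_for("/data/dive1/lcmlog.00")
second = pickle_path_for("/data/dive1/lcmlog.01")
print(first)
print(second)
```

Under this rule, numbered logs in the same directory (lcmlog.00, lcmlog.01, ...) resolve to the same pickle path, so it may be worth checking which log a cached lcmlog.pkl actually came from before relying on it.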

Read multiple LCM log files from a directory, merge them chronologically, and return a combined nav_structtype object.

By default, this function looks for LCM log files using the pattern "lcmlog.*" which matches the common naming convention: lcmlog.00, lcmlog.01, lcmlog.02, etc.

The chronological merging is performed at the raw LCM log level by sorting events by their timestamps, then parsing the merged log once. This approach is more efficient and robust than merging already-decoded data structures. Channel filtering is applied during the merge to reduce the size of intermediate files and speed up parsing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| directory | str | Directory path containing LCM log files | required |
| file_pattern | str | Glob-style file pattern to match. Default is "lcmlog.*" for standard LCM log naming (lcmlog.00, lcmlog.01, etc.). Other examples: "*.lcm", "sensor_*.lcm". | 'lcmlog.*' |
| ignore_pattern | str | Regular expression pattern for files to ignore ("" ignores none). | '' |
| verbose | bool | Verbose flag. | False |
| channels_to_process | str | Regular expression to filter the channels to process (".*" selects all channels). | '.*' |
| channels_to_ignore | str | Regular expression to filter the channels to ignore ("" ignores no channels). | '' |
| lcm_packages | List[str] | List of directories to search for LCM types in addition to the directories in the python path. | [] |
| output_file_name | str | Optional output file name for the merged pickle file. If not provided, a default name is generated from the directory name. | '' |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| nav_structtype | nav_structtype | Combined object with data from all logs as a structure with numpy arrays |

Examples:

>>> # Read standard LCM log sequence (lcmlog.00, lcmlog.01, lcmlog.02, etc.)
>>> data = read_logs("/path/to/logs")
>>> # Read all .lcm files in a directory
>>> data = read_logs("/path/to/logs", file_pattern="*.lcm")
>>> # Read specific numbered LCM log sequence
>>> data = read_logs("/path/to/logs", file_pattern="lcmlog.0[0-5]")
>>> # Read files and ignore some
>>> data = read_logs("/path/to/logs",
                ignore_pattern=".*debug.*")
>>> # Filter channels during merge
>>> data = read_logs("/path/to/logs",
                channels_to_process=".*IMU.*",
                channels_to_ignore=".*DEBUG.*")
Source code in navlib/lcmlog/parse_lcmlog.py
def read_logs(
    directory: str,
    file_pattern: str = "lcmlog.*",
    ignore_pattern: str = "",
    verbose: bool = False,
    channels_to_process: str = ".*",
    channels_to_ignore: str = "",
    lcm_packages: List[str] = [],
    output_file_name: str = "",
) -> nav_structtype:
    """
    Read multiple LCM log files from a directory, merge them chronologically,
    and return a combined nav_structtype object.

    By default, this function looks for LCM log files using the pattern "lcmlog.*"
    which matches the common naming convention: lcmlog.00, lcmlog.01, lcmlog.02, etc.

    The chronological merging is performed at the raw LCM log level by sorting events
    by their timestamps, then parsing the merged log once. This approach is more efficient
    and robust than merging already-decoded data structures. Channel filtering is applied
    during the merge to reduce the size of intermediate files and speed up parsing.

    Args:
        directory (str): Directory path containing LCM log files
        file_pattern (str): File pattern to match. Default is "lcmlog.*" for standard
            LCM log naming (lcmlog.00, lcmlog.01, etc.). Other examples: "*.lcm", "sensor_*.lcm".
        ignore_pattern (str): Regular expression pattern for files to ignore. Default is "" (ignore none).
        verbose (bool): Verbose flag. Default is False.
        channels_to_process (str): Regular expression to filter the channels to process. Default is ".*" (all channels).
        channels_to_ignore (str): Regular expression to filter the channels to ignore. Default is "" (no channels ignored).
        lcm_packages (List[str]): List of directories to search for LCM types in addition to the directories in the
            python path.
        output_file_name (str): Optional output file name for the merged pickle file. If not provided,
            a default name will be generated based on the directory name.

    Returns:
        nav_structtype: Combined object with data from all logs as a structure with numpy arrays

    Examples:
        >>> # Read standard LCM log sequence (lcmlog.00, lcmlog.01, lcmlog.02, etc.)
        >>> data = read_logs("/path/to/logs")

        >>> # Read all .lcm files in a directory
        >>> data = read_logs("/path/to/logs", file_pattern="*.lcm")

        >>> # Read specific numbered LCM log sequence
        >>> data = read_logs("/path/to/logs", file_pattern="lcmlog.0[0-5]")

        >>> # Read files and ignore some
        >>> data = read_logs("/path/to/logs",
                        ignore_pattern=".*debug.*")

        >>> # Filter channels during merge
        >>> data = read_logs("/path/to/logs",
                        channels_to_process=".*IMU.*",
                        channels_to_ignore=".*DEBUG.*")
    """
    directory_path = Path(directory)
    if not directory_path.exists():
        raise FileNotFoundError(f"Directory {directory} does not exist")

    if not directory_path.is_dir():
        raise NotADirectoryError(f"{directory} is not a directory")

    # Find all matching log files
    log_files = list(directory_path.glob(file_pattern))

    # Filter out ignored files
    if ignore_pattern:
        ignore_re = re.compile(ignore_pattern)
        log_files = [f for f in log_files if not ignore_re.match(f.name)]

    if not log_files:
        raise FileNotFoundError(
            f"No log files found matching pattern '{file_pattern}' in {directory}"
        )

    # Sort files by name for consistent processing order
    log_files.sort()

    print(f"Found {len(log_files)} log files to process:")
    for log_file in log_files:
        print(f"  - {log_file.name}")

    # Determine output file paths
    if output_file_name:
        merged_pkl_path = Path(output_file_name)
    else:
        merged_pkl_path = directory_path / f"{directory_path.name}_merged.pkl"

    # If merged pickle exists and no filtering is requested, load it directly
    if (
        merged_pkl_path.exists()
        and channels_to_process == ".*"
        and channels_to_ignore == ""
    ):
        if verbose:
            print(f"Loading existing merged pickle file: {merged_pkl_path}")

        with open(merged_pkl_path, "rb") as f:
            merged_data_dict = pickle.load(f)

        # Convert to nav_structtype and return
        return dict_to_struct(merged_data_dict, verbose=False)

    # Special case: if only one log file, use it directly without merging
    if len(log_files) == 1:
        if verbose:
            print("Single log file detected, parsing directly...")

        merged_data_dict = lcmlog_to_dict(
            str(log_files[0]),
            channels_to_process=channels_to_process,
            channels_to_ignore=channels_to_ignore,
            verbose=verbose,
            lcm_packages=lcm_packages,
        )
    else:
        # Create merged LCM log file path
        merged_lcm_path = directory_path / f"{directory_path.name}_merged.lcm"

        # Merge LCM log files at the raw binary level with channel filtering
        if verbose:
            print(f"Merging {len(log_files)} log files...")

        merge_lcm_logs(
            log_files,
            merged_lcm_path,
            channels_to_process=channels_to_process,
            channels_to_ignore=channels_to_ignore,
            verbose=verbose,
        )

        # Parse the merged LCM log file once
        if verbose:
            print("Parsing merged log file...")

        merged_data_dict = lcmlog_to_dict(
            str(merged_lcm_path),
            channels_to_process=".*",  # Already filtered during merge
            channels_to_ignore="",
            verbose=verbose,
            lcm_packages=lcm_packages,
        )

        # Clean up temporary merged LCM log file
        if merged_lcm_path.exists():
            merged_lcm_path.unlink()
            if verbose:
                print("Cleaned up temporary merged log file")

    # Save merged data to pickle file
    if verbose:
        print(f"Saving merged data to: {merged_pkl_path}")

    with open(merged_pkl_path, "wb") as f:
        pickle.dump(merged_data_dict, f, -1)

    # Convert merged dictionary to nav_structtype
    merged_data = dict_to_struct(merged_data_dict, verbose=False)

    if verbose:
        channels = [attr for attr in dir(merged_data) if not attr.startswith("_")]
        print(f"Successfully processed {len(log_files)} log files")
        print(f"Found {len(channels)} channels in merged data")

    return merged_data
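
Note that the listing above selects files with two different pattern syntaxes: file_pattern is a glob passed to `Path.glob`, while ignore_pattern is a regular expression applied with `re.match` to each file name. The sketch below imitates that selection on a list of names; `select_files` is a hypothetical helper, and `fnmatch.fnmatchcase` is used here as an approximation of glob matching so that no real directory is needed.

```python
import fnmatch
import re
from typing import List


def select_files(
    names: List[str], file_pattern: str = "lcmlog.*", ignore_pattern: str = ""
) -> List[str]:
    """Glob-style include, regex-style exclude, then sort by name."""
    # Glob syntax: '*' matches any run of characters, '.' is literal
    matched = [n for n in names if fnmatch.fnmatchcase(n, file_pattern)]
    if ignore_pattern:
        # Regex syntax, anchored at the start of the name by re.match
        ignore_re = re.compile(ignore_pattern)
        matched = [n for n in matched if not ignore_re.match(n)]
    return sorted(matched)


names = ["lcmlog.01", "lcmlog.00", "lcmlog.debug", "notes.txt"]
selected = select_files(names, ignore_pattern=".*debug.*")
print(selected)  # ['lcmlog.00', 'lcmlog.01']
```

A glob such as "*.lcm" would not be a valid value for ignore_pattern, and a regex such as ".*debug.*" would not behave as intended as file_pattern.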

Convert a nav_structtype object to a dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sdata | nav_structtype | Object with data as a structure with numpy arrays | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dictionary with data |

Source code in navlib/lcmlog/parse_lcmlog.py
def struct_to_dict(sdata: nav_structtype) -> Dict[str, Any]:
    """
    Convert a nav_structtype object to a dictionary.

    Args:
        sdata (nav_structtype): Object with data as a structure with numpy arrays

    Returns:
        Dict[str, Any]: Dictionary with data
    """
    result = {}

    for key, value in sdata.__dict__.items():
        if isinstance(value, nav_structtype):
            # Recursively convert nested nav_structtype objects
            result[key] = struct_to_dict(value)
        else:
            # Keep the value as is (including numpy arrays)
            result[key] = value

    return result
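
The recursion above can be illustrated without navlib by walking a nested `types.SimpleNamespace`. This is a sketch under that assumption: `namespace_to_dict` is a hypothetical stand-in for struct_to_dict, and `vars(obj)` is used as the equivalent of `obj.__dict__`.

```python
from types import SimpleNamespace
from typing import Any, Dict


def namespace_to_dict(sdata: SimpleNamespace) -> Dict[str, Any]:
    """Hypothetical stand-in for struct_to_dict."""
    result: Dict[str, Any] = {}
    for key, value in vars(sdata).items():
        if isinstance(value, SimpleNamespace):
            # Nested objects are converted recursively
            result[key] = namespace_to_dict(value)
        else:
            # Leaf values are passed through unchanged
            result[key] = value
    return result


data = SimpleNamespace(
    IMU_RAW=SimpleNamespace(utime=[1, 2], accel=[0.1, 0.2]),
    count=2,
)
as_dict = namespace_to_dict(data)
print(as_dict)
```

The conversion does not undo the key clean-up done by dict_to_struct: a key that was originally "IMU.RAW" comes back as "IMU_RAW", so a dictionary round trip is not guaranteed to reproduce the original keys.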