Coverage for lynceus/utils/__init__.py: 96%
103 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-29 08:46 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-29 08:46 +0000
import re
import time
import timeit
from collections.abc import Callable, Iterable
from contextlib import contextmanager
from datetime import datetime, timezone
from importlib import import_module
from logging import Logger
from pathlib import Path
from string import ascii_letters, digits
from typing import Any, Generator

from setuptools import find_packages

from lynceus.core.config import DATETIME_FORMAT, DATETIME_FORMAT_SHORT

# Default allowed characters when cleansing string values (e.g. activity/topic name).
ALLOWED_CHARACTERS = ascii_letters + digits + "ÀÂÄÆÇÈÉÊËÎÏÔŒÙÛÜàâäæçèéêëîïôœùûü"


def cleansed_str_value(
    value: str,
    *,
    to_lower_case: bool = True,
    replacement_character: str = "_",
    allowed_characters: str = ALLOWED_CHARACTERS,
):
    """
    Sanitize a string value by substituting every disallowed character.

    The value is optionally lower-cased, stripped of surrounding whitespace,
    and then rebuilt character by character: anything outside the allowed set
    is replaced. Commonly used for sanitizing activity/topic names and user input.

    Parameters
    ----------
    value : str
        The string value to cleanse.
    to_lower_case : bool, optional
        Whether to convert to lowercase first. Defaults to True.
    replacement_character : str, optional
        Substitute for disallowed characters. Defaults to '_'.
    allowed_characters : str, optional
        Characters kept as-is. Defaults to ALLOWED_CHARACTERS
        (ASCII letters, digits, and common accented characters).

    Returns
    -------
    str
        The cleansed string with disallowed characters replaced.

    Examples
    --------
    >>> cleansed_str_value('Hello World!')
    'hello_world_'
    >>> cleansed_str_value('Test@123', replacement_character='-')
    'test-123'
    """
    normalized = value.lower() if to_lower_case else value
    sanitized_chars = [
        character if character in allowed_characters else replacement_character
        for character in normalized.strip()
    ]
    return "".join(sanitized_chars)
def concatenate_string_with_limit(
    begin: str, extra: str, *, limit: int, truncate_begin: bool = True
):
    """
    Concatenate two strings while respecting a total length limit.

    Combines two strings ensuring the result doesn't exceed the specified limit.
    When truncation is needed, either the beginning or the end of the extra
    string is dropped, based on the truncate_begin parameter.

    Parameters
    ----------
    begin : str
        The beginning string that takes priority.
    extra : str
        The additional string to append.
    limit : int
        Maximum total length of the result.
    truncate_begin : bool, optional
        If True, truncate from the beginning of the extra string (keeping its
        end) when it's too long. If False, truncate from the end (keeping its
        beginning). Defaults to True.

    Returns
    -------
    str
        The concatenated string, truncated if necessary to fit the limit.

    Examples
    --------
    >>> concatenate_string_with_limit('Hello', ' World!', limit=10)
    'Helloorld!'
    >>> concatenate_string_with_limit('Hello', ' World!', limit=10, truncate_begin=False)
    'Hello Worl'
    """
    # The begin part always has priority; if it alone reaches the limit,
    # extra is dropped entirely.
    if len(begin) >= limit:
        return begin[:limit]

    remaining_limit: int = limit - len(begin)
    kept_extra: str = extra
    if len(extra) > remaining_limit:
        # Keep the tail (truncate_begin=True) or the head of the extra part.
        kept_extra = (
            extra[-remaining_limit:] if truncate_begin else extra[:remaining_limit]
        )

    return begin + kept_extra
def exec_and_return_time(func, /, *args) -> float:
    """
    Execute a function once and return how long the call took.

    Runs the specified callable with the given positional arguments and
    measures the duration via timeit for accurate timing.

    Parameters
    ----------
    func : callable
        The function to execute and time.
    *args
        Arguments to pass to the function.

    Returns
    -------
    float
        Execution time in seconds.

    Notes
    -----
    - Does NOT support async functions
    - Uses timeit.timeit() with number=1 for accurate measurement
    - Implementation can be easily changed for different timing strategies

    Examples
    --------
    >>> def slow_function(n):
    ...     return sum(range(n))
    >>> execution_time = exec_and_return_time(slow_function, 1000)
    >>> print(f'Function took {execution_time:.3f} seconds')
    """

    def _invoke():
        # Wrap the call so timeit can run it without arguments.
        func(*args)

    return timeit.timeit(_invoke, number=1)
@contextmanager
def time_catcher() -> Generator[Callable[[], str], Any, None]:
    """
    Context manager for measuring execution time of code blocks.

    Captures both CPU time and elapsed (wall clock) time for performance analysis.
    Yields a zero-argument function that formats the timing information as a string.

    Yields
    ------
    Callable[[], str]
        A function that returns a formatted string with timing information
        (CPU seconds and elapsed seconds since the block was entered).

    Examples
    --------
    >>> with time_catcher() as timer:
    ...     # Some time-consuming operation
    ...     result = heavy_computation()
    >>> print(f'Operation took: {timer()}')
    'Operation took: 0.123 CPU seconds, 0.456 elapsed seconds'
    """
    # Snapshot both clocks on entry; the yielded lambda re-reads them lazily,
    # so it can be called several times and always reflects current deltas.
    start = time.time()
    cpu_start = time.process_time()
    yield lambda: f"{(time.process_time() - cpu_start):0.03f} CPU seconds, {(time.time() - start):0.03f} elapsed seconds"
def parse_string_to_datetime(
    datetime_str: str,
    *,
    datetime_format: str = DATETIME_FORMAT,
    datetime_format_short: str = DATETIME_FORMAT_SHORT,
    override_timezone: timezone | None = timezone.utc,
) -> datetime:
    """
    Parse a datetime string into a datetime object with flexible format support.

    Tries the primary format first and falls back to the short format when
    parsing fails. An optional timezone override is then applied.

    Parameters
    ----------
    datetime_str : str
        The datetime string to parse.
    datetime_format : str, optional
        Primary datetime format. Defaults to DATETIME_FORMAT.
    datetime_format_short : str, optional
        Fallback datetime format. Defaults to DATETIME_FORMAT_SHORT.
    override_timezone : timezone | None, optional
        Timezone to apply to the parsed datetime.
        Defaults to timezone.utc.

    Returns
    -------
    datetime
        The parsed datetime object with timezone applied if specified.

    Raises
    ------
    ValueError
        If the string cannot be parsed with either format.

    Examples
    --------
    >>> parse_string_to_datetime('2023-12-25 15:30:45')
    datetime(2023, 12, 25, 15, 30, 45, tzinfo=timezone.utc)
    """
    try:
        parsed = datetime.strptime(datetime_str, datetime_format)
    except ValueError:
        # Primary format did not match: retry with the short fallback format
        # (a second ValueError propagates to the caller).
        parsed = datetime.strptime(datetime_str, datetime_format_short)

    return parsed.replace(tzinfo=override_timezone) if override_timezone else parsed
def format_exception_human_readable(
    exc: Exception, *, quote_message: bool = False
) -> str:
    """
    Format an exception as a human-readable string.

    Builds a standardized representation of an exception from its class name
    and message, optionally wrapping the message in double quotes.

    Parameters
    ----------
    exc : Exception
        The exception to format.
    quote_message : bool, optional
        Whether to wrap the message in quotes. Defaults to False.

    Returns
    -------
    str
        A formatted string in the format 'ExceptionName: message' or 'ExceptionName: "message"'.

    Examples
    --------
    >>> try:
    ...     raise ValueError('Invalid input')
    ... except Exception as e:
    ...     print(format_exception_human_readable(e))
    'ValueError: Invalid input'
    >>> print(format_exception_human_readable(e, quote_message=True))
    'ValueError: "Invalid input"'
    """
    message = str(exc)
    if quote_message:
        message = f'"{message}"'
    return f"{exc.__class__.__name__}: {message}"
263def lookup_root_path(
264 path_to_search_string: Path | str,
265 remaining_iteration: int = 3,
266 root_path: Path = Path().resolve(),
267) -> Path:
268 """
269 Search for a path by traversing up the directory hierarchy.
271 Recursively searches for the specified path starting from a root directory
272 and moving up through parent directories for a limited number of iterations.
273 Useful for finding project root directories or configuration files.
275 Parameters
276 ----------
277 path_to_search_string : Path | str
278 The relative path or file to search for.
279 remaining_iteration : int, optional
280 Maximum number of parent directories
281 to check. Defaults to 3.
282 root_path : Path, optional
283 Starting directory for the search.
284 Defaults to current working directory.
286 Returns
287 -------
288 Path
289 The root directory containing the specified path. To get the full
290 path to the target, concatenate this result with path_to_search_string.
292 Raises
293 ------
294 FileNotFoundError
295 If the path is not found after exhausting all iterations.
297 Examples
298 --------
299 >>> # Search for 'src/main.py' starting from current directory
300 >>> root = lookup_root_path('src/main.py')
301 >>> full_path = root / 'src/main.py'
303 >>> # Search for config file in parent directories
304 >>> config_root = lookup_root_path('config.ini', remaining_iteration=5)
305 """
306 full_path: Path = root_path / Path(path_to_search_string)
307 if full_path.exists():
308 return root_path
310 if not remaining_iteration:
311 raise FileNotFoundError(
312 f'Unable to find root_path of specified "{path_to_search_string}" path, after several iteration (last check in "{root_path}" directory).'
313 )
315 return lookup_root_path(
316 path_to_search_string, remaining_iteration - 1, root_path.parent
317 )
320def lookup_files_from_pattern(
321 root_path: Path,
322 pattern: str,
323 *,
324 min_file_size: float = None,
325 case_insensitive: bool = True,
326 logger: Logger = None,
327):
328 """
329 Find files matching a glob pattern with optional filtering.
331 Searches for files matching the specified pattern, with options for
332 case-insensitive matching and minimum file size filtering.
334 Parameters
335 ----------
336 root_path : Path
337 The root directory to search in.
338 pattern : str
339 Glob pattern to match files against (e.g., '*.txt', '**/*.py').
340 min_file_size : float, optional
341 Minimum file size in bytes. Files smaller
342 than this will be excluded. Defaults to None (no size filter).
343 case_insensitive : bool, optional
344 Whether to perform case-insensitive matching.
345 Defaults to True.
346 logger : Logger, optional
347 Logger for debug information. Defaults to None.
349 Returns
350 -------
351 list[Path]
352 List of Path objects for files matching the criteria.
354 Notes
355 -----
356 Case-insensitive matching is implemented by expanding each alphabetic character
357 in the pattern to [lower][upper] character classes.
359 Examples
360 --------
361 >>> files = lookup_files_from_pattern(Path('/project'), '*.py')
362 >>> large_files = lookup_files_from_pattern(
363 ... Path('/data'), '*.log', min_file_size=1024
364 ... )
365 """
366 if case_insensitive:
367 # Enhances the pattern to be case-insentitive.
368 pattern = "".join(
369 map(lambda c: f"[{c.lower()}{c.upper()}]" if c.isalpha() else c, pattern)
370 )
372 # Uses globbing in any case, according to the way pattern may have been enhanced to manage case.
373 existing_files_list = list(root_path.glob(pattern))
375 # Checks file size if needed.
376 if min_file_size is not None:
377 # Keeps only file whose size is greater of equal to specified size.
378 existing_files_list = list(
379 filter(
380 lambda file: file.stat().st_size >= min_file_size, existing_files_list
381 )
382 )
384 if logger:
385 logger.debug(
386 f'After "file_size>={min_file_size} Bytes" filter, this is the list of files matching pattern "{pattern}": {existing_files_list=}'
387 )
389 return existing_files_list
def check_file_exist_from_pattern(
    root_path: Path,
    pattern: str,
    *,
    min_file_size: float | None = None,
    case_insensitive: bool = True,
    logger: Logger | None = None,
):
    """
    Check if any files exist matching the specified pattern and criteria.

    A convenience wrapper around lookup_files_from_pattern() that reports
    whether at least one file matches the given pattern and optional size constraint.

    Parameters
    ----------
    root_path : Path
        The root directory to search in.
    pattern : str
        Glob pattern to match files against.
    min_file_size : float | None, optional
        Minimum file size in bytes. Defaults to None.
    case_insensitive : bool, optional
        Whether to perform case-insensitive matching.
        Defaults to True.
    logger : Logger | None, optional
        Logger for debug information. Defaults to None.

    Returns
    -------
    bool
        True if at least one file matches the criteria, False otherwise.

    Examples
    --------
    >>> has_python_files = check_file_exist_from_pattern(Path('/project'), '*.py')
    >>> has_large_logs = check_file_exist_from_pattern(
    ...     Path('/logs'), '*.log', min_file_size=1024
    ... )
    """
    matching_files = lookup_files_from_pattern(
        root_path,
        pattern,
        min_file_size=min_file_size,
        case_insensitive=case_insensitive,
        logger=logger,
    )
    return len(matching_files) > 0
446def lookup_available_packages(
447 root_dir: Path | str, *, keep_children_packages: bool = False
448) -> set[str]:
449 """
450 Discover Python packages in a directory with optional filtering.
452 Uses setuptools.find_packages() to discover packages and optionally filters
453 out child packages to return only top-level packages.
455 Parameters
456 ----------
457 root_dir : Path | str
458 The root directory to search for packages.
459 keep_children_packages : bool, optional
460 Whether to include child packages
461 (e.g., 'package.subpackage'). If False, only top-level packages are returned.
462 Defaults to False.
464 Returns
465 -------
466 set[str]
467 Set of package names. If keep_children_packages is False,
468 only top-level packages are included.
470 Examples
471 --------
472 >>> packages = lookup_available_packages('/project')
473 >>> # Returns {'mypackage', 'tests'} instead of
474 >>> # {'mypackage', 'mypackage.utils', 'mypackage.core', 'tests'}
476 >>> all_packages = lookup_available_packages('/project', keep_children_packages=True)
477 >>> # Returns {'mypackage', 'mypackage.utils', 'mypackage.core', 'tests'}
478 """
479 packages: list[str | bytes] = find_packages(root_dir)
480 packages: set[str] = set(packages)
481 if keep_children_packages:
482 return packages
484 # Removes all packages children.
485 something_change: bool = True
486 filtered_packages = packages
487 while something_change:
488 merged_children = set()
489 for element in filtered_packages:
490 # Merges all package children in the same set.
491 merged_children ^= {
492 child
493 for child in filtered_packages - {element}
494 if child.startswith(element)
495 }
497 # Updates filtered packages set if needed.
498 filtered_packages -= merged_children
500 # Registers something change.
501 something_change = len(merged_children) > 0
503 # Returns the final filtered packages set.
504 return filtered_packages
def compute_file_line_count(file_path: Path):
    """
    Count meaningful source code lines in a file, excluding comments and empty lines.

    Counts only lines that contain actual source code, filtering out:
    - One-line comments (starting with #)
    - Empty lines or lines with only whitespace
    - Lines shorter than 4 characters
    - Docstring beginning lines (lines starting with quotes)

    Parameters
    ----------
    file_path : Path
        Path to the file to analyze.

    Returns
    -------
    int
        Number of meaningful source code lines.

    Notes
    -----
    This is a heuristic approach with known limitations:
    - Lines following docstring start lines are still counted
    - Multi-line strings that aren't docstrings may be excluded
    - Complex comment patterns may not be detected perfectly

    Examples
    --------
    >>> line_count = compute_file_line_count(Path('script.py'))
    >>> print(f'Script has {line_count} lines of code')
    """
    # Does not count one-line comment, empty line, line with only spaces characters, and docstring begin lines.
    # But following lines of docstring will unfortunately be counted, and it is an accepted limitation.
    meaningful_line_pattern = re.compile(r'^\s*[^#"\s\']\S+.*$')
    with open(file_path, encoding="utf8") as source_file:
        return sum(
            1
            for line in source_file
            if meaningful_line_pattern.match(line) and len(line) > 4
        )
def extract_class_fqn(specified_class: type) -> str:
    """
    Extract the fully qualified name (FQN) of a class.

    Joins the class's defining module path with its class name,
    useful for serialization, logging, and dynamic loading.

    Parameters
    ----------
    specified_class : type
        The class to extract the FQN from.

    Returns
    -------
    str
        The fully qualified name in format 'module.path.ClassName'.

    Examples
    --------
    >>> extract_class_fqn(dict)
    'builtins.dict'
    >>> extract_class_fqn(Path)
    'pathlib.Path'
    """
    module_name = specified_class.__module__
    class_name = specified_class.__name__
    return ".".join((module_name, class_name))
def dynamically_load_class(module_path: str, class_name: str):
    """
    Dynamically import and return a class from a module path.

    Loads a class by module path and class name, useful for plugin systems,
    configuration-driven class loading, and dynamic instantiation.

    Parameters
    ----------
    module_path : str
        The full module path (e.g., 'package.module').
    class_name : str
        The name of the class to load from the module.

    Returns
    -------
    type
        The loaded class object.

    Raises
    ------
    ImportError
        If the module cannot be imported.
    AttributeError
        If the class doesn't exist in the module.
    """
    # importlib.import_module is the documented, supported API for programmatic
    # imports (the Python docs discourage calling __import__ directly).
    module = import_module(module_path)
    return getattr(module, class_name)
def inspect_attrs(obj, logger: Logger, patterns=None):
    """
    Debug utility to inspect and log object attributes with optional filtering.

    Walks the object's __dict__ and logs each attribute/value pair at debug
    level, optionally restricted to attributes whose name contains one of
    the given substrings.

    Parameters
    ----------
    obj
        The object to inspect.
    logger : Logger
        Logger instance for output.
    patterns : list[str] | None, optional
        List of string patterns to filter
        attributes. Only attributes containing any of these patterns will be logged.
        Defaults to None (show all attributes).

    Examples
    --------
    >>> inspect_attrs(my_object, logger)  # Show all attributes
    >>> inspect_attrs(my_object, logger, patterns=['config', 'setting'])  # Filter attributes
    """
    if patterns:
        filtering_description = f"matching any of one of these patterns: {patterns}"
    else:
        filtering_description = "with no condition"
    logger.debug(
        f'Checking all Python attributes of instance "{obj}", {filtering_description}'
    )
    for attr, value in obj.__dict__.items():
        # Log everything when no filter is given; otherwise require at least
        # one pattern to appear in the attribute name.
        should_log = not patterns or any(pattern in attr for pattern in patterns)
        if should_log:
            logger.debug(f"\t{attr=} => {value=}")
def flatten(collection):
    """
    Recursively flatten a nested collection into a flat generator.

    Walks nested iterables depth-first and yields individual items in a flat
    sequence. Useful for processing nested lists, tuples, or other iterable
    structures.

    Parameters
    ----------
    collection
        An iterable that may contain nested iterables.

    Yields
    ------
    Any
        Individual items from the flattened collection.

    Notes
    -----
    str and bytes objects are treated as atomic values to prevent
    character-level iteration (they are iterable but usually shouldn't be
    flattened).

    Examples
    --------
    >>> list(flatten([1, [2, 3], [[4, 5], 6]]))
    [1, 2, 3, 4, 5, 6]
    >>> list(flatten((1, (2, [3, 4]), 5)))
    [1, 2, 3, 4, 5]
    """
    for element in collection:
        is_nested = isinstance(element, Iterable) and not isinstance(
            element, (str, bytes)
        )
        if not is_nested:
            yield element
        else:
            yield from flatten(element)
def filter_kwargs(*, args_filter: list[str], **kwargs):
    """
    Filter keyword arguments to include only specified keys.

    Builds a new dictionary restricted to the keyword arguments whose
    keys appear in the filter list. Useful for passing only relevant
    arguments to functions that don't accept **kwargs.

    Parameters
    ----------
    args_filter : list[str]
        List of argument names to keep.
    **kwargs
        Keyword arguments to filter.

    Returns
    -------
    dict
        Dictionary containing only the filtered keyword arguments.

    Examples
    --------
    >>> def my_func(a, b): pass
    >>> kwargs = {'a': 1, 'b': 2, 'c': 3, 'd': 4}
    >>> filtered = filter_kwargs(args_filter=['a', 'b'], **kwargs)
    >>> my_func(**filtered)  # Only passes a=1, b=2
    """
    return {key: value for key, value in kwargs.items() if key in args_filter}