Coverage for lynceus/utils/__init__.py: 96%

103 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-29 08:46 +0000

import importlib
import re
import time
import timeit
from collections.abc import Callable, Iterable
from contextlib import contextmanager
from datetime import datetime, timezone
from logging import Logger
from pathlib import Path
from string import ascii_letters, digits
from typing import Any, Generator

from setuptools import find_packages

# Default allowed characters when cleansing string values (e.g. activity/topic name).
from lynceus.core.config import DATETIME_FORMAT, DATETIME_FORMAT_SHORT

16 

ALLOWED_CHARACTERS = ascii_letters + digits + "ÀÂÄÆÇÈÉÊËÎÏÔŒÙÛÜàâäæçèéêëîïôœùûü"


def cleansed_str_value(
    value: str,
    *,
    to_lower_case: bool = True,
    replacement_character: str = "_",
    allowed_characters: str = ALLOWED_CHARACTERS,
):
    """
    Sanitize a string by substituting every disallowed character.

    Each character of the (stripped) input that is not part of
    ``allowed_characters`` is swapped for ``replacement_character``.
    Commonly used for sanitizing activity/topic names and user input.

    Parameters
    ----------
    value : str
        The string value to cleanse.
    to_lower_case : bool, optional
        Whether to lowercase the value before cleansing. Defaults to True.
    replacement_character : str, optional
        Substitute used for disallowed characters. Defaults to '_'.
    allowed_characters : str, optional
        Characters kept verbatim. Defaults to ALLOWED_CHARACTERS
        (ASCII letters, digits, and common accented characters).

    Returns
    -------
    str
        The cleansed string with disallowed characters replaced.

    Examples
    --------
    >>> cleansed_str_value('Hello World!')
    'hello_world_'
    >>> cleansed_str_value('Test@123', replacement_character='-')
    'test-123'
    """
    # Lowercasing happens before stripping, matching the historical behavior.
    normalized = value.lower() if to_lower_case else value
    kept_characters = [
        character if character in allowed_characters else replacement_character
        for character in normalized.strip()
    ]
    return "".join(kept_characters)

64 

65 

def concatenate_string_with_limit(
    begin: str, extra: str, *, limit: int, truncate_begin: bool = True
) -> str:
    """
    Concatenate two strings while respecting a total length limit.

    Combines two strings ensuring the result doesn't exceed the specified limit.
    When truncation is needed, the `begin` string takes priority and the `extra`
    string is shortened: with truncate_begin=True its beginning is dropped (its
    tail is kept), otherwise its end is dropped (its head is kept).

    Parameters
    ----------
    begin : str
        The beginning string that takes priority.
    extra : str
        The additional string to append.
    limit : int
        Maximum total length of the result.
    truncate_begin : bool, optional
        If True, truncate from the beginning of the extra string
        when it's too long. If False, truncate from the end. Defaults to True.

    Returns
    -------
    str
        The concatenated string, truncated if necessary to fit the limit.

    Examples
    --------
    >>> concatenate_string_with_limit('Hello', ' World!', limit=10)
    'Helloorld!'
    >>> concatenate_string_with_limit('Hello', ' World!', limit=10, truncate_begin=False)
    'Hello Worl'
    """
    # If begin alone reaches the limit, extra is dropped entirely.
    if len(begin) >= limit:
        return begin[:limit]

    remaining_limit: int = limit - len(begin)
    kept_extra: str = extra
    if len(extra) > remaining_limit:
        # Keep the tail (truncate_begin) or the head (otherwise) of extra.
        kept_extra = extra[-remaining_limit:] if truncate_begin else extra[:remaining_limit]

    return begin + kept_extra

112 

113 

def exec_and_return_time(func, /, *args) -> float:
    """
    Measure and return the execution time of a function call.

    Runs `func(*args)` exactly once and reports how long the call took,
    relying on timeit for accurate measurement.

    Parameters
    ----------
    func : callable
        The function to execute and time.
    *args
        Arguments to pass to the function.

    Returns
    -------
    float
        Execution time in seconds.

    Notes
    -----
    - Does NOT support async functions
    - Uses timeit.timeit() with number=1 for accurate measurement
    - Implementation can be easily changed for different timing strategies

    Examples
    --------
    >>> def slow_function(n):
    ...     return sum(range(n))
    >>> execution_time = exec_and_return_time(slow_function, 1000)
    >>> print(f'Function took {execution_time:.3f} seconds')
    """
    def _timed_call():
        # The return value of func is intentionally discarded; only timing matters.
        func(*args)

    return timeit.timeit(_timed_call, number=1)

147 

148 

@contextmanager
def time_catcher() -> Generator[Callable[[], str], Any, None]:
    """
    Context manager for measuring execution time of code blocks.

    Captures both CPU time and elapsed (wall clock) time for performance analysis.
    Yields a zero-argument callable that formats the timing information as a string.

    Yields
    ------
    Callable[[], str]
        A function that returns a formatted string with timing information
        relative to when the context was entered.

    Examples
    --------
    >>> with time_catcher() as timer:
    ...     # Some time-consuming operation
    ...     result = heavy_computation()
    >>> print(f'Operation took: {timer()}')
    'Operation took: 0.123 CPU seconds, 0.456 elapsed seconds'
    """
    # Wall-clock and CPU reference points, taken when the context is entered.
    start = time.time()
    cpu_start = time.process_time()
    # The yielded lambda re-reads the clocks at call time, so it can be invoked
    # several times (inside or after the `with` block) with up-to-date figures.
    yield lambda: f"{(time.process_time() - cpu_start):0.03f} CPU seconds, {(time.time() - start):0.03f} elapsed seconds"

173 

174 

def parse_string_to_datetime(
    datetime_str: str,
    *,
    datetime_format: str = DATETIME_FORMAT,
    datetime_format_short: str = DATETIME_FORMAT_SHORT,
    override_timezone: timezone | None = timezone.utc,
) -> datetime:
    """
    Parse a datetime string into a datetime object with flexible format support.

    Tries the primary format first and falls back to the short format when
    parsing fails. Optionally forces a timezone on the parsed result.

    Parameters
    ----------
    datetime_str : str
        The datetime string to parse.
    datetime_format : str, optional
        Primary datetime format. Defaults to DATETIME_FORMAT.
    datetime_format_short : str, optional
        Fallback datetime format. Defaults to DATETIME_FORMAT_SHORT.
    override_timezone : timezone | None, optional
        Timezone to apply to the parsed datetime. Defaults to timezone.utc.

    Returns
    -------
    datetime
        The parsed datetime object with timezone applied if specified.

    Raises
    ------
    ValueError
        If the string cannot be parsed with either format.

    Examples
    --------
    >>> parse_string_to_datetime('2023-12-25 15:30:45')
    datetime(2023, 12, 25, 15, 30, 45, tzinfo=timezone.utc)
    """
    try:
        parsed = datetime.strptime(datetime_str, datetime_format)
    except ValueError:
        # Primary format rejected the input: retry with the short format,
        # letting its ValueError propagate if it fails too.
        parsed = datetime.strptime(datetime_str, datetime_format_short)

    if override_timezone:
        return parsed.replace(tzinfo=override_timezone)
    return parsed

224 

225 

def format_exception_human_readable(
    exc: Exception, *, quote_message: bool = False
) -> str:
    """
    Format an exception as a human-readable string.

    Produces a standardized representation made of the exception class name
    followed by its message, optionally wrapping the message in double quotes.

    Parameters
    ----------
    exc : Exception
        The exception to format.
    quote_message : bool, optional
        Whether to wrap the message in quotes. Defaults to False.

    Returns
    -------
    str
        A formatted string in the format 'ExceptionName: message' or 'ExceptionName: "message"'.

    Examples
    --------
    >>> try:
    ...     raise ValueError('Invalid input')
    ... except Exception as e:
    ...     print(format_exception_human_readable(e))
    'ValueError: Invalid input'
    >>> print(format_exception_human_readable(e, quote_message=True))
    'ValueError: "Invalid input"'
    """
    message = str(exc)
    if quote_message:
        message = f'"{message}"'
    return f"{exc.__class__.__name__}: {message}"

261 

262 

263def lookup_root_path( 

264 path_to_search_string: Path | str, 

265 remaining_iteration: int = 3, 

266 root_path: Path = Path().resolve(), 

267) -> Path: 

268 """ 

269 Search for a path by traversing up the directory hierarchy. 

270 

271 Recursively searches for the specified path starting from a root directory 

272 and moving up through parent directories for a limited number of iterations. 

273 Useful for finding project root directories or configuration files. 

274 

275 Parameters 

276 ---------- 

277 path_to_search_string : Path | str 

278 The relative path or file to search for. 

279 remaining_iteration : int, optional 

280 Maximum number of parent directories 

281 to check. Defaults to 3. 

282 root_path : Path, optional 

283 Starting directory for the search. 

284 Defaults to current working directory. 

285 

286 Returns 

287 ------- 

288 Path 

289 The root directory containing the specified path. To get the full 

290 path to the target, concatenate this result with path_to_search_string. 

291 

292 Raises 

293 ------ 

294 FileNotFoundError 

295 If the path is not found after exhausting all iterations. 

296 

297 Examples 

298 -------- 

299 >>> # Search for 'src/main.py' starting from current directory 

300 >>> root = lookup_root_path('src/main.py') 

301 >>> full_path = root / 'src/main.py' 

302 

303 >>> # Search for config file in parent directories 

304 >>> config_root = lookup_root_path('config.ini', remaining_iteration=5) 

305 """ 

306 full_path: Path = root_path / Path(path_to_search_string) 

307 if full_path.exists(): 

308 return root_path 

309 

310 if not remaining_iteration: 

311 raise FileNotFoundError( 

312 f'Unable to find root_path of specified "{path_to_search_string}" path, after several iteration (last check in "{root_path}" directory).' 

313 ) 

314 

315 return lookup_root_path( 

316 path_to_search_string, remaining_iteration - 1, root_path.parent 

317 ) 

318 

319 

320def lookup_files_from_pattern( 

321 root_path: Path, 

322 pattern: str, 

323 *, 

324 min_file_size: float = None, 

325 case_insensitive: bool = True, 

326 logger: Logger = None, 

327): 

328 """ 

329 Find files matching a glob pattern with optional filtering. 

330 

331 Searches for files matching the specified pattern, with options for 

332 case-insensitive matching and minimum file size filtering. 

333 

334 Parameters 

335 ---------- 

336 root_path : Path 

337 The root directory to search in. 

338 pattern : str 

339 Glob pattern to match files against (e.g., '*.txt', '**/*.py'). 

340 min_file_size : float, optional 

341 Minimum file size in bytes. Files smaller 

342 than this will be excluded. Defaults to None (no size filter). 

343 case_insensitive : bool, optional 

344 Whether to perform case-insensitive matching. 

345 Defaults to True. 

346 logger : Logger, optional 

347 Logger for debug information. Defaults to None. 

348 

349 Returns 

350 ------- 

351 list[Path] 

352 List of Path objects for files matching the criteria. 

353 

354 Notes 

355 ----- 

356 Case-insensitive matching is implemented by expanding each alphabetic character 

357 in the pattern to [lower][upper] character classes. 

358 

359 Examples 

360 -------- 

361 >>> files = lookup_files_from_pattern(Path('/project'), '*.py') 

362 >>> large_files = lookup_files_from_pattern( 

363 ... Path('/data'), '*.log', min_file_size=1024 

364 ... ) 

365 """ 

366 if case_insensitive: 

367 # Enhances the pattern to be case-insentitive. 

368 pattern = "".join( 

369 map(lambda c: f"[{c.lower()}{c.upper()}]" if c.isalpha() else c, pattern) 

370 ) 

371 

372 # Uses globbing in any case, according to the way pattern may have been enhanced to manage case. 

373 existing_files_list = list(root_path.glob(pattern)) 

374 

375 # Checks file size if needed. 

376 if min_file_size is not None: 

377 # Keeps only file whose size is greater of equal to specified size. 

378 existing_files_list = list( 

379 filter( 

380 lambda file: file.stat().st_size >= min_file_size, existing_files_list 

381 ) 

382 ) 

383 

384 if logger: 

385 logger.debug( 

386 f'After "file_size>={min_file_size} Bytes" filter, this is the list of files matching pattern "{pattern}": {existing_files_list=}' 

387 ) 

388 

389 return existing_files_list 

390 

391 

def check_file_exist_from_pattern(
    root_path: Path,
    pattern: str,
    *,
    min_file_size: float | None = None,
    case_insensitive: bool = True,
    logger: Logger | None = None,
):
    """
    Check if any files exist matching the specified pattern and criteria.

    Thin convenience wrapper around lookup_files_from_pattern() reporting
    whether at least one file satisfies the pattern and optional size constraint.

    Parameters
    ----------
    root_path : Path
        The root directory to search in.
    pattern : str
        Glob pattern to match files against.
    min_file_size : float | None, optional
        Minimum file size in bytes. Defaults to None.
    case_insensitive : bool, optional
        Whether to perform case-insensitive matching.
        Defaults to True.
    logger : Logger | None, optional
        Logger for debug information. Defaults to None.

    Returns
    -------
    bool
        True if at least one file matches the criteria, False otherwise.

    Examples
    --------
    >>> has_python_files = check_file_exist_from_pattern(Path('/project'), '*.py')
    >>> has_large_logs = check_file_exist_from_pattern(
    ...     Path('/logs'), '*.log', min_file_size=1024
    ... )
    """
    matching_files = lookup_files_from_pattern(
        root_path,
        pattern,
        min_file_size=min_file_size,
        case_insensitive=case_insensitive,
        logger=logger,
    )
    return len(matching_files) > 0

444 

445 

def lookup_available_packages(
    root_dir: Path | str, *, keep_children_packages: bool = False
) -> set[str]:
    """
    Discover Python packages in a directory with optional filtering.

    Uses setuptools.find_packages() to discover packages and optionally filters
    out child packages to return only top-level packages.

    Parameters
    ----------
    root_dir : Path | str
        The root directory to search for packages.
    keep_children_packages : bool, optional
        Whether to include child packages
        (e.g., 'package.subpackage'). If False, only top-level packages are returned.
        Defaults to False.

    Returns
    -------
    set[str]
        Set of package names. If keep_children_packages is False,
        only top-level packages are included.

    Examples
    --------
    >>> packages = lookup_available_packages('/project')
    >>> # Returns {'mypackage', 'tests'} instead of
    >>> # {'mypackage', 'mypackage.utils', 'mypackage.core', 'tests'}

    >>> all_packages = lookup_available_packages('/project', keep_children_packages=True)
    >>> # Returns {'mypackage', 'mypackage.utils', 'mypackage.core', 'tests'}
    """
    packages: set[str] = set(find_packages(root_dir))
    if keep_children_packages:
        return packages

    # Keeps only top-level packages: a package is a child iff another discovered
    # package followed by a '.' separator is a prefix of its dotted name.
    # Comparing with the separator avoids wrongly discarding a sibling package
    # whose name merely shares a prefix (e.g. 'foolib' is NOT a child of 'foo').
    return {
        package
        for package in packages
        if not any(
            package.startswith(f"{parent}.") for parent in packages - {package}
        )
    }

505 

506 

def compute_file_line_count(file_path: Path):
    """
    Count meaningful source code lines in a file, excluding comments and empty lines.

    Counts only lines that contain actual source code, filtering out:
    - One-line comments (starting with #)
    - Empty lines or lines with only whitespace
    - Lines shorter than 4 characters
    - Docstring beginning lines (lines starting with quotes)

    Parameters
    ----------
    file_path : Path
        Path to the file to analyze.

    Returns
    -------
    int
        Number of meaningful source code lines.

    Notes
    -----
    This is a heuristic approach with known limitations:
    - Lines following docstring start lines are still counted
    - Multi-line strings that aren't docstrings may be excluded
    - Complex comment patterns may not be detected perfectly

    Examples
    --------
    >>> line_count = compute_file_line_count(Path('script.py'))
    >>> print(f'Script has {line_count} lines of code')
    """
    # Skips one-line comments, empty/whitespace-only lines and docstring opening
    # lines. Lines inside a docstring after its first line are still counted:
    # this is an accepted limitation of the heuristic.
    meaningful_line = re.compile(r'^\s*[^#"\s\']\S+.*$')
    counted: int = 0
    with open(file_path, encoding="utf8") as source_file:
        for raw_line in source_file:
            if meaningful_line.match(raw_line) and len(raw_line) > 4:
                counted += 1
    return counted

549 

550 

def extract_class_fqn(specified_class: type) -> str:
    """
    Extract the fully qualified name (FQN) of a class.

    Builds the dotted module path plus class name for a given class,
    useful for serialization, logging, and dynamic loading.

    Parameters
    ----------
    specified_class : type
        The class to extract the FQN from.

    Returns
    -------
    str
        The fully qualified name in format 'module.path.ClassName'.

    Examples
    --------
    >>> extract_class_fqn(dict)
    'builtins.dict'
    >>> extract_class_fqn(Path)
    'pathlib.Path'
    """
    module_name = specified_class.__module__
    class_name = specified_class.__name__
    return f"{module_name}.{class_name}"

576 

577 

def dynamically_load_class(module_path: str, class_name: str):
    """
    Dynamically import and return a class from a module path.

    Loads a class by module path and class name, useful for plugin systems,
    configuration-driven class loading, and dynamic instantiation.

    Parameters
    ----------
    module_path : str
        The full module path (e.g., 'package.module').
    class_name : str
        The name of the class to load from the module.

    Returns
    -------
    type
        The loaded class object.

    Raises
    ------
    ImportError
        If the module cannot be imported.
    AttributeError
        If the class doesn't exist in the module.
    """
    # importlib.import_module is the documented programmatic import API and
    # returns the leaf module directly, unlike bare __import__ with fromlist.
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

606 

607 

608def inspect_attrs(obj, logger: Logger, patterns=None): 

609 """ 

610 Debug utility to inspect and log object attributes with optional filtering. 

611 

612 Logs all attributes of an object's __dict__, with optional pattern filtering 

613 to show only attributes containing specific substrings. 

614 

615 Parameters 

616 ---------- 

617 obj 

618 The object to inspect. 

619 logger : Logger 

620 Logger instance for output. 

621 patterns : list[str] | None, optional 

622 List of string patterns to filter 

623 attributes. Only attributes containing any of these patterns will be logged. 

624 Defaults to None (show all attributes). 

625 

626 Examples 

627 -------- 

628 >>> inspect_attrs(my_object, logger) # Show all attributes 

629 >>> inspect_attrs(my_object, logger, patterns=['config', 'setting']) # Filter attributes 

630 """ 

631 pattern_info: str = ( 

632 "with no condition" 

633 if not patterns 

634 else f"matching any of one of these patterns: {patterns}" 

635 ) 

636 logger.debug(f'Checking all Python attributes of instance "{obj}", {pattern_info}') 

637 for attr, value in obj.__dict__.items(): 

638 if not patterns or any(pattern in attr for pattern in patterns): 

639 logger.debug(f"\t{attr=} => {value=}") 

640 

641 

def flatten(collection):
    """
    Recursively flatten a nested collection into a flat generator.

    Walks nested iterables depth-first and yields the leaf items in order.
    Useful for processing nested lists, tuples, or other iterable structures.

    Parameters
    ----------
    collection
        An iterable that may contain nested iterables.

    Yields
    ------
    Any
        Individual items from the flattened collection.

    Notes
    -----
    str and bytes objects are treated as leaves to prevent character-level
    iteration (they are iterable but usually shouldn't be flattened).

    Examples
    --------
    >>> list(flatten([1, [2, 3], [[4, 5], 6]]))
    [1, 2, 3, 4, 5, 6]
    >>> list(flatten((1, (2, [3, 4]), 5)))
    [1, 2, 3, 4, 5]
    """
    for element in collection:
        is_leaf = not isinstance(element, Iterable) or isinstance(element, (str, bytes))
        if is_leaf:
            yield element
        else:
            # Recurse into nested iterables, preserving left-to-right order.
            yield from flatten(element)

676 

677 

def filter_kwargs(*, args_filter: list[str], **kwargs):
    """
    Filter keyword arguments to include only specified keys.

    Builds a new dictionary restricted to the keyword arguments whose
    keys appear in the filter list. Useful for passing only relevant
    arguments to functions that don't accept **kwargs.

    Parameters
    ----------
    args_filter : list[str]
        List of argument names to keep.
    **kwargs
        Keyword arguments to filter.

    Returns
    -------
    dict
        Dictionary containing only the filtered keyword arguments.

    Examples
    --------
    >>> def my_func(a, b): pass
    >>> kwargs = {'a': 1, 'b': 2, 'c': 3, 'd': 4}
    >>> filtered = filter_kwargs(args_filter=['a', 'b'], **kwargs)
    >>> my_func(**filtered)  # Only passes a=1, b=2
    """
    return {name: value for name, value in kwargs.items() if name in args_filter}