Coverage for lynceus/files/lynceus_file.py: 100%

158 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-29 08:46 +0000

1import shutil 

2from abc import ABCMeta 

3from logging import Logger 

4from pathlib import Path 

5from typing import (Generic, 

6 TypeVar) 

7 

8import pandas as pd 

9from fsspec.asyn import AsyncFileSystem 

10from pandas import DataFrame 

11 

12from lynceus.core.config import (CONFIG_STORAGE_LOCAL, 

13 LYNCEUS_S3_CONFIG_KEY) 

14from lynceus.core.config.lynceus_config import LynceusConfig 

15from lynceus.files.remote.s3 import (S3FileSystemPatched, 

16 S3Utils) 

17from lynceus.lynceus_exceptions import LynceusFileError 

18 

# pylint: disable=invalid-name
# Generic type variable used by LynceusFile; constrained to fsspec async
# filesystem implementations (e.g. the patched S3 filesystem below).
FileSystemType = TypeVar("FileSystemType", bound=AsyncFileSystem)

21 

22 

class LynceusFile(Generic[FileSystemType], metaclass=ABCMeta):
    """
    Abstract base class for file operations in the Lynceus system.

    Provides a unified interface for handling both local and remote files,
    supporting operations like reading, writing, copying, and listing files.
    Implements the filesystem abstraction pattern for different storage backends.

    Parameters
    ----------
    FileSystemType : type
        Generic type parameter for the underlying filesystem implementation
    """
    S3_PATH_BEGIN = 's3://'

    FILE_STORAGE_PATH_SEPARATOR: str = '|'

    def __init__(self,
                 path: Path,
                 logger: Logger,
                 filesystem: FileSystemType | None = None):
        """
        Initialize a LynceusFile instance.

        Parameters
        ----------
        path : Path
            The file path
        logger : Logger
            Logger instance for operations
        filesystem : FileSystemType, optional
            Optional filesystem implementation for operations
        """
        self._path: Path = path
        self._logger: Logger = logger
        self._filesystem: FileSystemType | None = filesystem

    @staticmethod
    def extract_storage_and_path(file_metadata: str):
        """
        Extract storage type and path from file metadata string.

        Parses file metadata to separate storage identifier from the actual path.
        If no storage separator is found, assumes local storage.

        Parameters
        ----------
        file_metadata : str
            String containing storage info and path separated by FILE_STORAGE_PATH_SEPARATOR

        Returns
        -------
        tuple
            (storage_name, file_path) where storage_name is the storage identifier
            and file_path is the actual path to the file
        """
        # Checks if there is storage information in the metadata.
        if LynceusFile.FILE_STORAGE_PATH_SEPARATOR in file_metadata:
            # maxsplit=1 ensures a path which itself contains the separator
            # character is kept intact instead of being silently truncated.
            storage_name, file_path = file_metadata.split(LynceusFile.FILE_STORAGE_PATH_SEPARATOR, 1)
            return storage_name, file_path

        # There is none, so consider it as a file hosted on Local storage.
        return CONFIG_STORAGE_LOCAL, file_metadata

    @staticmethod
    def build_file_metadata(storage_name: str, file_path: Path | str):
        """
        Build file metadata string from storage name and file path.

        Creates a standardized metadata string by combining storage identifier
        and file path with the appropriate separator.

        Parameters
        ----------
        storage_name : str
            Name of the storage system
        file_path : Path or str
            Path to the file (can be Path object or string)

        Returns
        -------
        str
            Formatted metadata string in format 'storage_name|file_path'
        """
        return f'{storage_name}{LynceusFile.FILE_STORAGE_PATH_SEPARATOR}{str(file_path)}'

    def read_parquet(self, **params) -> DataFrame:
        """
        Read corresponding (local or remote) file, considering it as a parquet file, with optional parameters.

        Parameters
        ----------
        **params : dict
            Optional parameters (usually, it can be columns, to specify which columns
            to read from parquet file).

        Returns
        -------
        DataFrame
            Corresponding DataFrame.
        """
        # Cf. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html
        # Cf. http://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
        self._logger.debug(f'Reading file {self} ...')
        return pd.read_parquet(self.get_path(), storage_options=self.get_storage_options(), **params)

    def write_to_parquet(self, dataframe: DataFrame, **kwargs):
        """
        Write a DataFrame to parquet format at the file location.

        Write the provided DataFrame as a parquet file with standardized settings
        for timestamp handling and storage options. Invalidate filesystem cache
        after writing to ensure consistency.

        Parameters
        ----------
        dataframe : DataFrame
            Pandas DataFrame to write
        **kwargs
            Additional parameters passed to pandas.DataFrame.to_parquet()
            (e.g., compression, index settings)
        """
        # N.B.: **kwargs is the opportunity to provide parameters for internal implementation (e.g. PyArrow),
        # for instances, pyarrow filters param, to better control what is load in memory.
        #
        # Cf. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
        # Cf. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-parquet
        # Cf. https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html
        self._logger.debug(f'Writing specified DataFrame to file {self} ...')
        dataframe.to_parquet(self.get_path(),
                             coerce_timestamps='ms', allow_truncated_timestamps=True,
                             storage_options=self.get_storage_options(),
                             **kwargs)
        # Important: ensures cache is updated straight away after this operation.
        if self._filesystem:
            self._filesystem.invalidate_cache(str(self.get_parent_path()))

    def get_storage_options(self):
        """
        Get storage-specific options for file operations.

        Return configuration options specific to the storage backend.
        Default implementation returns None, subclasses should override
        to provide appropriate options for their storage type.

        Returns
        -------
        dict or None
            Storage options for the specific filesystem implementation
        """
        # pylint: disable=no-self-use
        return None

    def is_local(self):
        """
        Check if this file is stored locally.

        Abstract method that must be implemented by subclasses to determine
        whether the file is stored on the local filesystem.

        Returns
        -------
        bool
            True if file is stored locally, False otherwise

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def is_remote(self):
        """
        Check if this file is stored remotely.

        Convenience method that returns the inverse of is_local().

        Returns
        -------
        bool
            True if file is stored remotely, False if local
        """
        return not self.is_local()

    def delete(self):
        """
        Delete the file from its storage location.

        Remove the file from the filesystem. Log the operation before
        delegating to the implementation-specific delete method.

        Returns
        -------
        object
            Result of the delete operation (implementation-dependent)
        """
        self._logger.debug(f'Deleting file {self} ...')
        return self._do_delete()

    def _do_delete(self):
        """
        Implementation-specific delete operation.

        Abstract method that subclasses must implement to handle
        the actual file deletion for their storage type.

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def download_to(self, destination: Path, *, create_sub_directories: bool = True):
        """
        Download/copy the file to a local destination.

        Retrieve the file content and save it to the specified local path.
        Optionally create parent directories if they don't exist.

        Parameters
        ----------
        destination : Path
            Local path where the file should be saved
        create_sub_directories : bool, default True
            Whether to create parent directories if they don't exist

        Returns
        -------
        object
            Result of the download operation (implementation-dependent)
        """
        self._logger.debug(f'Retrieving/downloading file to "{destination}" from {self} ...')
        return self._do_download_to(destination=destination, create_sub_directories=create_sub_directories)

    def _do_download_to(self, *, destination: Path, create_sub_directories: bool):
        """
        Implementation-specific download operation.

        Abstract method that subclasses must implement to handle
        the actual file download for their storage type.

        Parameters
        ----------
        destination : Path
            Local path where the file should be saved
        create_sub_directories : bool
            Whether to create parent directories

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def exists(self, *, reason: str = None):
        """
        Check if the file exists in its storage location.

        Verify file existence and optionally log the reason for the check.

        Parameters
        ----------
        reason : str, optional
            Optional explanation for why existence is being checked

        Returns
        -------
        bool
            True if file exists, False otherwise
        """
        check_msg: str = f'Checking existence of file {self}'
        if reason:
            check_msg += f' (reason: {reason})'
        self._logger.debug(f'{check_msg} ...')
        return self._do_exists()

    def _do_exists(self):
        """
        Implementation-specific existence check.

        Abstract method that subclasses must implement to check
        file existence for their storage type.

        Returns
        -------
        bool
            True if file exists, False otherwise

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def list_files(self, *, recursive: bool = False, pattern: str | None = None, **kwargs):
        """
        List files in the directory represented by this file path.

        Return a list of files in the directory, with options for recursive
        traversal and pattern matching.

        Parameters
        ----------
        recursive : bool, default False
            Whether to search subdirectories recursively
        pattern : str, optional
            Optional glob pattern to filter files
        **kwargs
            Additional arguments for the listing operation

        Returns
        -------
        Iterable
            Collection of file paths or file objects
        """
        self._logger.debug(f'Listing files from {self}, {pattern=} ...')
        return self._do_list_files(recursive=recursive, pattern=pattern, **kwargs)

    def _do_list_files(self, *, recursive: bool, pattern: str | None = None, **kwargs):
        """
        Implementation-specific file listing operation.

        Abstract method that subclasses must implement to list
        files for their storage type.

        Parameters
        ----------
        recursive : bool
            Whether to search subdirectories recursively
        pattern : str, optional
            Optional glob pattern to filter files
        **kwargs
            Additional arguments for the listing operation

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def copy_to(self, destination: Path, *, create_sub_directories: bool = True) -> 'LynceusFile':
        """
        Copy this file to a new destination.

        Create a copy of the file at the specified destination path.
        Invalidate filesystem cache after the operation for consistency.

        Parameters
        ----------
        destination : Path
            Path where the file should be copied
        create_sub_directories : bool, default True
            Whether to create parent directories if they don't exist

        Returns
        -------
        LynceusFile
            New LynceusFile instance representing the copied file
        """
        self._logger.debug(f"Copying '{self}' to '{destination}' ...")
        copied_lynceus_file: LynceusFile = self._do_copy_to(destination=destination, create_sub_directories=create_sub_directories)
        # Important: ensures cache is updated straight away after this operation.
        if self._filesystem:
            self._filesystem.invalidate_cache(str(destination.parent))
        return copied_lynceus_file

    def _do_copy_to(self, *, destination: Path, create_sub_directories: bool) -> 'LynceusFile':
        """
        Implementation-specific copy operation.

        Abstract method that subclasses must implement to handle
        file copying for their storage type.

        Parameters
        ----------
        destination : Path
            Path where the file should be copied
        create_sub_directories : bool
            Whether to create parent directories

        Returns
        -------
        LynceusFile
            New LynceusFile instance for the copied file

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def get_name(self) -> str:
        """
        Get the filename (without directory path).

        Returns
        -------
        str
            The name portion of the file path
        """
        return self._path.name

    @property
    def path(self) -> Path:
        """
        Get the file path as a Path object.

        Returns
        -------
        Path
            The file path
        """
        return self._path

    def get_path(self) -> str:
        """
        Get the file path as a string.

        Returns
        -------
        str
            String representation of the file path
        """
        return str(self._path)

    def get_raw_path(self) -> str:
        """
        Get the raw path without any protocol prefixes.

        Abstract method that returns the underlying path without
        storage-specific prefixes (e.g., without 's3://' for S3 files).

        Returns
        -------
        str
            Raw path string

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def get_relative_path(self) -> str:
        """
        Get the relative path from remote storage container.

        Returns
        -------
        str
            For remote file: the relative path from remote storage container,
            for local file: same than raw_path.
        """
        raise NotImplementedError()

    def get_parent_path(self) -> Path:
        """
        Get the parent directory path.

        Returns
        -------
        Path
            Path object representing the parent directory
        """
        return self._path.parent

    def parent_exists(self):
        """
        Check if the parent directory exists.

        Verify that the parent directory of this file exists in the storage system.

        Returns
        -------
        bool
            True if parent directory exists, False otherwise
        """
        self._logger.debug(f'Checking existence of parent folder of file {self} ...')
        return self._do_parent_exists()

    def _do_parent_exists(self):
        """
        Implementation-specific parent directory existence check.

        Abstract method that subclasses must implement to check
        parent directory existence for their storage type.

        Returns
        -------
        bool
            True if parent directory exists, False otherwise

        Raises
        ------
        NotImplementedError
            If not implemented by subclass
        """
        raise NotImplementedError()

    def get_extension(self) -> str:
        """
        Get the file extension including the dot.

        Returns
        -------
        str
            File extension (e.g., '.txt', '.parquet') or empty string if no extension
        """
        return self._path.suffix

    def __str__(self):
        """
        Get string representation of the file.

        Returns
        -------
        str
            Human-readable string describing the file
        """
        return f'"{self.__class__.__name__}" with path "{self._path}"'

    def __repr__(self):
        """
        Get detailed string representation for debugging.

        Returns
        -------
        str
            String representation suitable for debugging
        """
        return str(self)

563 

564 

class _LocalLynceusFile(LynceusFile[AsyncFileSystem]):
    """
    Implementation of LynceusFile for local filesystem operations.

    Handle file operations on the local filesystem using standard
    Python pathlib and shutil operations.
    """

    def is_local(self):
        """
        Check if this file is stored locally.

        Returns
        -------
        bool
            Always True for local files
        """
        return True

    def _do_delete(self):
        """
        Delete the local file.

        Use pathlib's unlink() method to remove the file from
        the local filesystem.
        """
        self._path.unlink()

    def _do_download_to(self, *, destination: Path, create_sub_directories: bool):
        """
        Download/copy local file to destination.

        For local files, this is equivalent to copying the file.

        Parameters
        ----------
        destination : Path
            Target path for the copy
        create_sub_directories : bool
            Whether to create parent directories

        Returns
        -------
        object
            Result of the copy operation
        """
        return self._do_copy_to(destination=destination, create_sub_directories=create_sub_directories)

    def _do_exists(self):
        """
        Check if the local file exists.

        Returns
        -------
        bool
            True if file exists on local filesystem, False otherwise
        """
        return self._path.exists()

    def _do_parent_exists(self):
        """
        Check if the parent directory exists locally.

        Returns
        -------
        bool
            True if parent directory exists, False otherwise
        """
        return self.get_parent_path().exists()

    def _do_list_files(self, *, recursive: bool, pattern: str | None = None, **kwargs):
        """
        List files in the local directory.

        Parameters
        ----------
        recursive : bool
            If True, glob recursively (pattern defaults to '**/*');
            if False, list direct children, filtered by pattern when given
        pattern : str, optional
            Glob pattern for file matching
        **kwargs
            Additional arguments (ignored for local implementation)

        Returns
        -------
        Iterator
            File paths matching the criteria
        """
        if recursive:
            return self._path.glob(pattern or '**/*')
        # Fix: honor the pattern in non-recursive mode too (it used to be
        # silently ignored, contradicting the base-class contract).
        if pattern:
            return self._path.glob(pattern)
        return self._path.iterdir()

    def _do_copy_to(self, *, destination: Path, create_sub_directories: bool) -> LynceusFile:
        """
        Copy local file to destination.

        Create parent directories if needed and copy the file using shutil.copyfile.

        Parameters
        ----------
        destination : Path
            Target path for the copy
        create_sub_directories : bool
            Whether to create parent directories if they don't exist

        Returns
        -------
        _LocalLynceusFile
            New instance representing the copied file

        Raises
        ------
        LynceusFileError
            If parent directory doesn't exist and create_sub_directories is False
        """
        if not destination.parent.exists():
            if create_sub_directories:
                destination.parent.mkdir(parents=True, exist_ok=True)
            else:
                raise LynceusFileError(f'Parent directory of specified destination "{destination}" does not exist;' +
                                       ' you should either create it yourself, or use the corresponding option.')

        # Requests the copy.
        shutil.copyfile(self.get_path(), destination)
        return _LocalLynceusFile(path=destination, logger=self._logger)

    def get_raw_path(self) -> str:
        """
        Get the raw local file path.

        For local files, this is the same as the string representation of the path.

        Returns
        -------
        str
            Local file path as string
        """
        return str(self._path)

    def get_relative_path(self) -> str:
        """
        Get the relative path for local files.

        For local files, this returns the same as the raw path.

        Returns
        -------
        str
            Local file path as string
        """
        return self.get_raw_path()

716 

717 

class _RemoteS3LynceusFile(LynceusFile[S3FileSystemPatched]):
    """
    Implementation of LynceusFile for S3-compatible remote storage.

    Handle file operations on S3-compatible storage systems using
    the S3FileSystemPatched filesystem and S3Utils for operations.
    """

    # In addition there is a self.S3_PATH_BEGIN usage in Factory which should be adapted (in case it is NOT S3 !).
    def __init__(self, path: Path, logger: Logger, s3filesystem: S3FileSystemPatched, s3_utils: S3Utils):
        """
        Initialize remote S3 file instance.

        Parameters
        ----------
        path : Path
            S3 file path
        logger : Logger
            Logger instance
        s3filesystem : S3FileSystemPatched
            S3 filesystem implementation
        s3_utils : S3Utils
            S3 utilities for operations
        """
        super().__init__(path, logger, filesystem=s3filesystem)
        # Name-mangled to keep S3 helper private to this implementation.
        self.__s3_utils = s3_utils

    def get_storage_options(self):
        """
        Get S3-specific storage options for file operations.

        Build storage options including authentication and S3 configuration.
        Include special handling for OVH storage providers.

        Returns
        -------
        dict
            Storage options for S3 operations including authentication and ACL settings
        """
        # N.B.: in our Patched remote fs System, we added the needed lynceus_s3_config.
        storage_options = {
            'anon': False,
            LYNCEUS_S3_CONFIG_KEY: self._filesystem.lynceus_s3_config
        }

        # Checks if it is an OVH remote storage (heuristic on the endpoint host name).
        if '.ovh.' in self._filesystem.lynceus_s3_config['s3_endpoint']:
            # Hacks ACL information to workaround OVH Bug, with default ACL specified by s3fs/botocore.
            # Leading to an useless "OSError: [Errno 22] Invalid Argument." ...
            storage_options.update(
                {
                    's3_additional_kwargs': {'ACL': 'private'}
                }
            )

        return storage_options

    def is_local(self):
        """
        Check if this file is stored locally.

        Returns
        -------
        bool
            Always False for remote S3 files
        """
        return False

    def _do_delete(self):
        """
        Delete the remote S3 file.

        Use the S3 filesystem's rm_file method to remove the file
        from remote storage.
        """
        self._filesystem.rm_file(self.get_raw_path())

    # pylint: disable=unused-argument
    def _do_download_to(self, *, destination: Path, create_sub_directories: bool):
        """
        Download remote S3 file to local destination.

        Use the S3 filesystem's get method to download the file.

        Parameters
        ----------
        destination : Path
            Local path where file should be downloaded
        create_sub_directories : bool
            Whether to create parent directories (ignored in this implementation;
            presumably the underlying fs handles it — TODO confirm)

        Returns
        -------
        object
            Result of the S3 filesystem get operation
        """
        return self._filesystem.get(self.get_path(), str(destination))

    def _do_exists(self):
        """
        Check if the remote S3 file exists.

        Returns
        -------
        bool
            True if file exists in S3 storage, False otherwise
        """
        return self._filesystem.exists(self.get_raw_path())

    def _do_parent_exists(self):
        """
        Check if the parent directory exists in S3.

        Returns
        -------
        bool
            True if parent directory exists in S3, False otherwise
        """
        return self._filesystem.exists(self.get_raw_path_from_remote_path(self.get_parent_path()))

    # pylint: disable=arguments-differ
    def _do_list_files(self, *,
                       recursive: bool,
                       pattern: str | None = None,
                       maxdepth: int | None = None,
                       withdirs: bool | None = None,
                       detail: bool = False):
        """
        List files in the remote S3 directory.

        Use S3Utils to list remote files with various filtering options.

        Parameters
        ----------
        recursive : bool
            Whether to search subdirectories recursively
        pattern : str, optional
            Optional glob pattern to filter files
        maxdepth : int, optional
            Maximum depth for directory traversal
        withdirs : bool, optional
            Whether to include directories in results
        detail : bool, default False
            Whether to return detailed metadata

        Returns
        -------
        list or dict
            File paths or detailed file information
        """
        return self.__s3_utils.list_remote_files(remote_root_path=Path(self.get_raw_path()),
                                                 recursive=recursive,
                                                 pattern=pattern,
                                                 maxdepth=maxdepth,
                                                 withdirs=withdirs,
                                                 detail=detail)

    # pylint: disable=unused-argument
    def _do_copy_to(self, *, destination: Path, create_sub_directories: bool) -> LynceusFile:
        """
        Copy remote S3 file to another S3 location.

        Copy the file within S3 storage using the filesystem's copy method.

        Parameters
        ----------
        destination : Path
            S3 destination path (must be relative for remote files;
            it is resolved against this file's bucket)
        create_sub_directories : bool
            Whether to create parent directories (ignored)

        Returns
        -------
        _RemoteS3LynceusFile
            New instance representing the copied file

        Raises
        ------
        LynceusFileError
            If destination is absolute path for remote files
        """
        if self.is_remote() and destination.is_absolute():
            raise LynceusFileError(f'You should use only relative Path with remote file ("{self}"), which is not the case of destination "{destination}"')

        # The destination stays in the same bucket as this file.
        bucket_name, _, _ = self.__s3_utils.split_path(remote_file_path=self.get_path())
        complete_destination_path = Path(bucket_name) / destination
        self._filesystem.copy(self.get_path(), str(complete_destination_path))
        return _RemoteS3LynceusFile(path=Path(LynceusFile.S3_PATH_BEGIN) / complete_destination_path,
                                    logger=self._logger,
                                    s3filesystem=self._filesystem,
                                    s3_utils=self.__s3_utils)

    def get_path(self) -> str:
        """
        Get the full S3 path including the s3:// prefix.

        Returns
        -------
        str
            Complete S3 path with protocol prefix
        """
        # Important: to work, we must ensure the S3 PATH Begin is unaltered here (the double slash is mandatory ...).
        return LynceusFile.S3_PATH_BEGIN + self.get_raw_path()

    @staticmethod
    def get_raw_path_from_remote_path(path: Path | str):
        """
        Convert remote path to raw path without S3 prefix.

        Remove the 's3:/' prefix and ensure proper path formatting
        for S3 operations. Add trailing slash for root paths to avoid
        S3 traversal issues.

        Parameters
        ----------
        path : Path or str
            Remote path with S3 prefix

        Returns
        -------
        str
            Raw path suitable for S3 operations
        """
        # Removes the 's3:/' prefix to get a raw path.
        # N.B.: the slice is len(S3_PATH_BEGIN) - 1 (i.e. 4 chars, 's3:/') because
        # Path normalization collapses the double slash of 's3://' into a single one.
        raw_path_from_remote_path = str(path)[len(LynceusFile.S3_PATH_BEGIN) - 1:]

        # Safe-guard: ensures there is at least one '/' in the final raw path (which is NOT the case for remote 'root path',
        # to avoid issue with s3fs path splitting feature, and avoid 'Could not traverse all s3' issue).
        if '/' not in raw_path_from_remote_path:
            raw_path_from_remote_path += '/'

        return raw_path_from_remote_path

    def get_raw_path(self) -> str:
        """
        Get the raw S3 path without protocol prefix.

        Returns
        -------
        str
            S3 path without the s3:// prefix
        """
        return self.get_raw_path_from_remote_path(self._path)

    def get_relative_path(self) -> str:
        """
        Get the relative path within the S3 bucket.

        Extract the relative path portion from the full S3 path,
        excluding the bucket name.

        Returns
        -------
        str
            Relative path within the bucket
        """
        _, rpath, _ = self.__s3_utils.split_path(remote_file_path=self.get_path())
        return rpath

    def __str__(self):
        """
        Get string representation of the remote S3 file.

        Include the file path and remote storage configuration for debugging.

        Returns
        -------
        str
            Detailed string representation including remote config
        """
        return f'"{self.__class__.__name__}" with path "{self._path}" on remote "{LynceusConfig.format_config(self._filesystem.lynceus_s3_config)}"'