Coverage for lynceus/files/remote/s3.py: 92%
62 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-29 08:46 +0000
1from logging import Logger
2from pathlib import Path
4import s3fs
6from lynceus.core.config import CONFIG_STORAGE_DYNAMIC_TYPE, CONFIG_STORAGE_IS_DYNAMIC, CONFIG_STORAGE_REMOTE_TYPE, LYNCEUS_S3_CONFIG_KEY
7from lynceus.core.config.lynceus_config import LynceusConfig
8from lynceus.core.exchange.lynceus_exchange import LynceusExchange
9from lynceus.core.lynceus import LynceusSession
10from lynceus.core.lynceus_client import LynceusClientClass
11from lynceus.lynceus_exceptions import LynceusFileError
class S3Utils(LynceusClientClass):
    """
    Utility class for S3 filesystem operations and management.

    Provide S3-specific functionality including filesystem initialization,
    cache management, and file listing operations with glob support.
    """

    def __init__(self, *, lynceus_session: LynceusSession, lynceus_exchange: LynceusExchange | None, lynceus_s3_config: dict[str, str]):
        """
        Initialize S3 utilities with configuration.

        Parameters
        ----------
        lynceus_session : LynceusSession
            Active Lynceus session
        lynceus_exchange : LynceusExchange, optional
            Exchange instance for communication (optional)
        lynceus_s3_config : dict
            S3 configuration dictionary with connection details
        """
        super().__init__(lynceus_session=lynceus_session, logger_name='s3', lynceus_exchange=lynceus_exchange)
        self.__lynceus_s3_config = lynceus_s3_config
        # Filesystem is created lazily in get_s3filesystem() so construction stays cheap.
        self.__s3filesystem = None

    def initialize(self):
        """
        Initialize the S3 filesystem with the patched implementation.

        Replace the default s3fs.S3FileSystem with the Lynceus-patched version
        that includes additional configuration handling.
        """
        self._logger.info('Initializing s3fs according to configuration.')
        s3fs.S3FileSystem = S3FileSystemPatched

    def get_s3filesystem(self):
        """
        Get or create the S3 filesystem instance.

        Lazy initialization of the S3 filesystem with Lynceus configuration.
        The filesystem is cached for reuse across operations.

        Returns
        -------
        S3FileSystemPatched
            Configured S3 filesystem instance
        """
        if self.__s3filesystem is None:
            self.__s3filesystem = s3fs.S3FileSystem(**{LYNCEUS_S3_CONFIG_KEY: self.__lynceus_s3_config})

        return self.__s3filesystem

    def force_cache_refresh(self, path: Path | str | None = None):
        """
        Force refresh of the S3 filesystem cache.

        Invalidate cached file information to ensure fresh data is retrieved
        on subsequent operations. Can target a specific path or refresh all cache.

        Parameters
        ----------
        path : Path or str, optional
            Specific path to refresh (None for full cache refresh)
        """
        # Nothing to refresh if the filesystem was never instantiated.
        if self.__s3filesystem:
            self._logger.debug(f'Refreshing S3 fs cache ({path=}) ...')
            self.__s3filesystem.invalidate_cache(path=str(path) if path else None)

    def split_path(self, *, remote_file_path: str):
        """
        Split an S3 path into its components.

        Use the S3 filesystem's split_path method to decompose
        a full S3 path into bucket, key, and other components.

        Parameters
        ----------
        remote_file_path : str
            Full S3 path to split

        Returns
        -------
        tuple
            Path components (bucket, key, version)
        """
        return self.get_s3filesystem().split_path(remote_file_path)

    def list_remote_files(self, *,
                          remote_root_path: Path | str,
                          recursive: bool,
                          pattern: str | None = None,
                          maxdepth: int | None = None,
                          withdirs: bool | None = None,
                          detail: bool = False):
        """
        List files in a remote S3 directory with various filtering options.

        Provide flexible file listing with support for recursive traversal,
        pattern matching, depth control, and detailed metadata retrieval.

        Parameters
        ----------
        remote_root_path : Path or str
            Root path for the search
        recursive : bool
            Whether to search subdirectories recursively
        pattern : str, optional
            Optional glob pattern for file filtering
        maxdepth : int, optional
            Maximum directory depth for traversal
        withdirs : bool, optional
            Whether to include directories in results
        detail : bool, default False
            Whether to return detailed metadata or just paths

        Returns
        -------
        list or dict
            File paths (if detail=False) or detailed metadata (if detail=True)

        Raises
        ------
        LynceusFileError
            If an error occurs during file listing
        """

        def _retrieve_remote_files():
            """
            Internal function to retrieve remote files based on search criteria.

            Returns
            -------
            list or dict
                Raw file listing from S3 filesystem
            """
            if recursive:
                # s3fs Globing feature does not support path with the 's3:/' prefix ...
                return self.get_s3filesystem().glob(str(Path(remote_root_path) / Path(pattern or '**/*')),
                                                    maxdepth=maxdepth, detail=detail)

            # Uses the find method, because ls one is NOT implemented in s3fs.
            # Some options are used only if pattern is NOT defined.
            find_kwargs: dict[str, str | bool | int] = {
                'detail': detail
            }

            if pattern:
                find_kwargs.update({'prefix': pattern})
            else:
                find_kwargs.update(
                    {
                        'maxdepth': maxdepth or 1,
                        'withdirs': withdirs if withdirs is not None else True
                    }
                )

            # s3 find feature does not support path with the 's3:/' prefix ...
            # Stringify to mirror the glob branch: remote_root_path may be a Path.
            return self.get_s3filesystem().find(path=str(remote_root_path), **find_kwargs)

        def _extract_path(remote_file_path):
            """
            Extract the path component from a full S3 file path.

            Parameters
            ----------
            remote_file_path : str
                Full S3 path

            Returns
            -------
            str
                Path component (key) from the S3 path
            """
            return self.split_path(remote_file_path=remote_file_path)[1]

        try:
            self._logger.debug(f'Looking/globbing for remote files ({remote_root_path=}; {pattern=}) ...')
            all_remote_file_metadata = _retrieve_remote_files()

            # Processed result according to requested detail
            # - only path are returned as a list
            if not detail:
                return [_extract_path(remote_file_path) for remote_file_path in all_remote_file_metadata]

            # - path, and lots of metadata are returned as a dict
            return {_extract_path(remote_file_key): remote_file_detailed_metadata
                    for remote_file_key, remote_file_detailed_metadata in all_remote_file_metadata.items()}
        except Exception as exc:  # pylint: disable=broad-except
            # Chain the original exception so the underlying s3fs failure stays in the traceback.
            raise LynceusFileError(f'An error occurred while looking/globbing for remote files ({remote_root_path=}; {pattern=}) ...',
                                   from_exception=exc) from exc
206# Patch and initialize s3fs once and for all.
207# pylint: disable=useless-super-delegation,abstract-method
class S3FileSystemPatched(s3fs.S3FileSystem):
    """
    s3fs.S3FileSystem subclass carrying Lynceus-specific configuration handling.

    Differences from the stock implementation:
    - credentials and endpoint are read from a Lynceus S3 configuration dict,
    - botocore config is tuned for OVH-like S3-compatible storage providers,
    - unrecognized configuration entries are forwarded as extra client kwargs,
    - ls() delegates to find() for a working one-level directory listing.
    """

    def __init__(self, *k, **kw):
        """
        Build the filesystem from the Lynceus S3 configuration.

        The configuration dict is mandatory and must be supplied under the
        LYNCEUS_S3_CONFIG_KEY keyword; it is popped from **kw before the
        remaining arguments are forwarded to the parent constructor.

        Parameters
        ----------
        *k : tuple
            Positional arguments forwarded to s3fs.S3FileSystem
        **kw : dict
            Keyword arguments, including the mandatory LYNCEUS_S3_CONFIG_KEY entry

        Raises
        ------
        ValueError
            If the LYNCEUS_S3_CONFIG_KEY entry is missing or empty
        """
        # The Lynceus configuration is consumed here and never reaches the parent class.
        self.lynceus_s3_config = kw.pop(LYNCEUS_S3_CONFIG_KEY, None)
        if not self.lynceus_s3_config:
            raise ValueError(f'{LYNCEUS_S3_CONFIG_KEY} is a mandatory parameter when initializing S3FileSystemPatched')

        session: LynceusSession = LynceusSession.get_session(registration_key={'user': 's3filesystem'})
        init_logger: Logger = session.get_logger('s3Init')
        init_logger.info(f'Using S3 config "{LynceusConfig.format_config(self.lynceus_s3_config)}".')

        # Since botocore 1.36+, default checksum behavior breaks some S3-compatible
        # providers (OVH storage): only compute/validate checksums when required.
        config_kwargs = {
            'request_checksum_calculation': 'when_required',
            'response_checksum_validation': 'when_required'
        }
        # Forward the few configuration entries that belong to the botocore Config.
        for config_option in ('signature_version',):
            if config_option in self.lynceus_s3_config:
                config_kwargs[config_option] = self.lynceus_s3_config[config_option]

        # Configuration keys already consumed elsewhere (credentials, endpoint, Lynceus
        # bookkeeping); everything else is passed through verbatim as a client kwarg.
        handled_options = (CONFIG_STORAGE_REMOTE_TYPE, 'endpoint_url', 'bucket_name', 'username',
                           'access_key_id', 'secret_access_key', 's3_endpoint', 'addressing_style',
                           CONFIG_STORAGE_IS_DYNAMIC, CONFIG_STORAGE_DYNAMIC_TYPE)
        client_kwargs = {
            'endpoint_url': self.lynceus_s3_config['s3_endpoint'],
        }
        for option, option_value in self.lynceus_s3_config.items():
            if option not in config_kwargs and option not in handled_options:
                client_kwargs[option] = option_value
        init_logger.info(f'System will use these S3 client kwargs: "{LynceusConfig.format_config(client_kwargs)}".')

        super().__init__(*k,
                         key=self.lynceus_s3_config.get('access_key_id'),
                         secret=self.lynceus_s3_config.get('secret_access_key'),
                         config_kwargs=config_kwargs,
                         client_kwargs=client_kwargs,
                         **kw)

    def ls(self, path, detail=True, **kwargs):
        """
        List the direct children of an S3 path.

        Delegate to find() with maxdepth=1 and withdirs=True, which provides
        the working equivalent of a one-level directory listing in s3fs.

        Parameters
        ----------
        path : str
            S3 path to list
        detail : bool, default True
            Whether to return detailed file information
        **kwargs
            Additional arguments forwarded to the find method

        Returns
        -------
        list
            File and directory information
        """
        # See: s3fs.core.S3FileSystem._find
        return self.find(path, maxdepth=1, withdirs=True, detail=detail, **kwargs)