Coverage for lynceus/utils/storage.py: 86%
71 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-29 08:46 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-29 08:46 +0000
1from logging import Logger
2from pathlib import Path
4from lynceus.core.config import (
5 CONFIG_PROJECT_KEY,
6 CONFIG_PROJECT_ROOT_PATH_HOLDER,
7 CONFIG_STORAGE_LOCAL,
8)
9from lynceus.core.config.lynceus_config import LynceusConfig
10from lynceus.core.exchange.lynceus_exchange import LynceusExchange
11from lynceus.core.lynceus import LynceusSession
12from lynceus.files.file_factory import LynceusFileFactory
13from lynceus.files.lynceus_file import LynceusFile
14from lynceus.files.storage import StorageMetadataBase
15from lynceus.lynceus_exceptions import LynceusConfigError, LynceusFileError
16from lynceus.utils import lookup_root_path
def create_storage_file_factory(
    *,
    name: str,
    lynceus_session: LynceusSession,
    lynceus_config: LynceusConfig,
    logger: Logger,
    log_prefix: str,
    remote_config_section: str | None,
    remote_mode_forced_by_cli: bool = True,
    source_path_format: str = "{target}",
    dest_path_format: str = "{dest_file_name}",
    lynceus_exchange: LynceusExchange | None = None,
    remote_dynamic_type_class_map: dict[str, type[StorageMetadataBase]] | None = None,
    dynamic_storage_mandatory_param_map: dict[str, bool] | None = None,
) -> LynceusFileFactory | None:
    """
    Build the LynceusFileFactory registered under ``name``.

    When a remote configuration section is requested, its presence in
    ``lynceus_config`` is checked first; a missing section is reported through
    ``logger`` and no factory is created. In every other case all arguments
    (except ``logger`` and ``log_prefix``) are forwarded unchanged to the
    ``LynceusFileFactory`` constructor.

    Parameters
    ----------
    name : str
        Name identifier of the storage factory (also quoted in the warning).
    lynceus_session : LynceusSession
        Session instance handed over to the factory.
    lynceus_config : LynceusConfig
        Configuration queried with ``has_section`` and handed over to the factory.
    logger : Logger
        Logger receiving the warning when the section is missing.
    log_prefix : str
        Prefix prepended to the warning message.
    remote_config_section : str | None
        Name of the configuration section describing the remote storage; a
        falsy value disables the existence check.
    remote_mode_forced_by_cli : bool, optional
        Forwarded to the factory. Defaults to True.
    source_path_format : str, optional
        Forwarded to the factory. Defaults to '{target}'.
    dest_path_format : str, optional
        Forwarded to the factory. Defaults to '{dest_file_name}'.
    lynceus_exchange : LynceusExchange | None, optional
        Forwarded to the factory. Defaults to None.
    remote_dynamic_type_class_map : dict[str, type[StorageMetadataBase]] | None, optional
        Forwarded to the factory. Defaults to None.
    dynamic_storage_mandatory_param_map : dict[str, bool] | None, optional
        Forwarded to the factory. Defaults to None.

    Returns
    -------
    LynceusFileFactory | None
        The new factory, or None when ``remote_config_section`` is set but
        absent from ``lynceus_config``.

    Notes
    -----
    A missing section is not an error here: it is only logged at WARNING level
    and signalled to the caller through the None return value.
    """
    # Nominal case: either no section is required, or the required one exists.
    if not remote_config_section or lynceus_config.has_section(remote_config_section):
        return LynceusFileFactory(
            name=name,
            lynceus_session=lynceus_session,
            lynceus_config=lynceus_config,
            remote_config_section=remote_config_section,
            remote_mode_forced_by_cli=remote_mode_forced_by_cli,
            source_path_format=source_path_format,
            dest_path_format=dest_path_format,
            lynceus_exchange=lynceus_exchange,
            remote_dynamic_type_class_map=remote_dynamic_type_class_map,
            dynamic_storage_mandatory_param_map=dynamic_storage_mandatory_param_map,
        )

    # Safe-guard: the requested section is unknown, so the storage cannot be registered.
    logger.warning(
        f'{log_prefix} unable to register storage "{name}", because configuration section'
        f' "{remote_config_section}" does not exist. Fix your configuration.'
    )
    return None
def extract_dynamic_remote_storage_params(
    lynceus_config: LynceusConfig,
    *,
    dynamic_storage_mandatory_param_map: dict[str, bool] | None = None,
) -> dict[str, str | int]:
    """
    Extract and validate parameters for dynamic remote storage from configuration.

    Look up, inside the [CONFIG_PROJECT_KEY] section, every parameter named in
    ``dynamic_storage_mandatory_param_map``. Missing optional parameters are
    skipped, missing mandatory ones raise, and string values made only of
    decimal digits are converted to ``int``.

    Parameters
    ----------
    lynceus_config : LynceusConfig
        Configuration object to extract parameters from.
    dynamic_storage_mandatory_param_map : dict[str, bool] | None, optional
        Mapping of parameter names to their mandatory status. If a parameter is
        marked as mandatory (True) and missing, raises an exception. ``None``
        (the default) is treated as an empty mapping, so nothing is extracted.

    Returns
    -------
    dict[str, str | int]
        Extracted parameters, keyed by parameter name. Parameters absent from
        the configuration are not part of the result.

    Raises
    ------
    LynceusConfigError
        If the project configuration section is missing or if a
        mandatory parameter is not found in the configuration.

    Notes
    -----
    Values may be strings (static configuration file) or already integers
    (API, CLI or tests). Only strings for which ``str.isdecimal()`` is true are
    converted, because those are exactly the digit strings ``int()`` can parse;
    ``str.isnumeric()`` also accepts characters such as '²' or '½' which would
    make ``int()`` raise ``ValueError``. Such values are now kept as strings.
    """
    if not lynceus_config.has_section(CONFIG_PROJECT_KEY):
        raise LynceusConfigError(
            f"Unable to find [{CONFIG_PROJECT_KEY}] configuration section in specified configuration file."
        )

    # The map is optional: without it there is simply nothing to extract.
    mandatory_param_map: dict[str, bool] = dynamic_storage_mandatory_param_map or {}

    dynamic_remote_storage_params: dict[str, str | int] = {}
    for param, is_mandatory in mandatory_param_map.items():
        value = lynceus_config.get_config(CONFIG_PROJECT_KEY, param, default=None)
        if value is None:
            if is_mandatory:
                raise LynceusConfigError(
                    f'Unable to find "{param}" option (mandatory for dynamic remote storage) inside'
                    + f" [{CONFIG_PROJECT_KEY}] configuration section in specified configuration file."
                )
            continue

        # Checks the type of value, can be either:
        #  - string if coming from a static configuration file
        #  - int if coming from API, CLI or Tests
        # Only pure decimal-digit strings are safe to hand to int().
        if isinstance(value, str) and value.isdecimal():
            value = int(value)

        dynamic_remote_storage_params[param] = value

    return dynamic_remote_storage_params
def get_lynceus_file_from_metadata(
    *,
    file_metadata: str,
    lynceus_config: LynceusConfig,
    logger: Logger,
    log_prefix: str,
    storage_file_factory_map: dict[str, LynceusFileFactory],
    locally_retrieved_repository_root_path: Path | None,
    must_exist: bool,
    overriden_root_path_if_local: Path | None = None,
    dynamic_storage_mandatory_param_map: dict[str, bool] | None = None,
) -> LynceusFile:
    """
    Create a LynceusFile instance from file metadata string.

    Split ``file_metadata`` into a storage name and a file path (through
    ``LynceusFile.extract_storage_and_path``), pick the matching factory in
    ``storage_file_factory_map`` and ask it for a new file. Dynamic remote
    storages are first fed with the parameters extracted from the project
    configuration; local paths are resolved against a root directory.

    Parameters
    ----------
    file_metadata : str
        Metadata string holding both the storage name and the file path.
    lynceus_config : LynceusConfig
        Configuration read when the storage is a dynamic remote one.
    logger : Logger
        Logger receiving the debug trace emitted before the file is created.
    log_prefix : str
        Prefix string for log and error messages.
    storage_file_factory_map : dict[str, LynceusFileFactory]
        Mapping of storage names to their corresponding file factory instances.
    locally_retrieved_repository_root_path : Path | None
        Root path substituted for CONFIG_PROJECT_ROOT_PATH_HOLDER in local paths.
    must_exist : bool
        Forwarded to the factory ``new_file`` call.
    overriden_root_path_if_local : Path | None, optional
        When set, local paths are resolved through ``lookup_root_path`` starting
        from this directory instead of the repository root. Defaults to None.
    dynamic_storage_mandatory_param_map : dict[str, bool] | None, optional
        Mapping of parameter names to mandatory status for dynamic storage.
        ``None`` (the default) is treated as an empty mapping.

    Returns
    -------
    LynceusFile
        The file instance created by the selected storage factory.

    Raises
    ------
    LynceusConfigError
        If the specified storage is not configured in the factory map.
    LynceusFileError
        If a LynceusConfigError occurs while preparing the dynamic remote
        storage; the original error is attached as ``__cause__``.
    ValueError
        If a local path needs the repository root path but none was provided.

    Notes
    -----
    For local files without an overriding root, a path that neither contains
    CONFIG_PROJECT_ROOT_PATH_HOLDER nor starts with '/' is considered relative
    and gets the holder prepended before substitution.
    """
    # Extracts storage name and file path from metadata.
    storage_name, file_path = LynceusFile.extract_storage_and_path(file_metadata)
    dest_path_format: str | None = None
    override_dest_path_kwargs: dict[str, str | int] | None = None

    storage_file_factory: LynceusFileFactory | None = storage_file_factory_map.get(
        storage_name
    )
    if not storage_file_factory:
        raise LynceusConfigError(
            f'{log_prefix} file option ("{file_metadata}"), is hosted on remote storage "{storage_name}"'
            + ", which is not configured!"
            + f" Available/configured remote storages: {storage_file_factory_map}."
        )

    # Manages dynamic remote storage if needed.
    if storage_file_factory.is_dynamic_remote:
        # TODO: limitation is that ALL remote file coming from a dynamic remote, share the same parameters linked to the parent project.
        #  Thus: atm it is NOT possible to have a reference file on a dynamic remote and the solution file on another dynamic remote, for the same project.
        try:
            dynamic_remote_storage_params: dict[str, str | int] = (
                extract_dynamic_remote_storage_params(
                    lynceus_config,
                    # An omitted map means "no parameter to extract"; forwarding
                    # None would make the extraction fail on a missing .items().
                    dynamic_storage_mandatory_param_map=dynamic_storage_mandatory_param_map or {},
                )
            )
            storage_file_factory.update_dynamic_storage_params(
                dynamic_remote_storage_params
            )

            # Checks if the path must be formatted.
            if "{" in file_path:
                dest_path_format = str(file_path)
                override_dest_path_kwargs = dynamic_remote_storage_params

        except LynceusConfigError as exc:
            # The configuration error stays available both as second argument
            # (as before) and as explicit cause of the raised error.
            raise LynceusFileError(
                f'Unable to prepare system to use dynamic remote storage "{storage_name}"',
                exc,
            ) from exc

    # Manages Local file if needed:
    if storage_name == CONFIG_STORAGE_LOCAL:
        if overriden_root_path_if_local:
            file_path = str(
                lookup_root_path(
                    file_path,
                    remaining_iteration=4,
                    root_path=overriden_root_path_if_local,
                )
                / Path(file_path)
            )
        else:
            # - adds special CONFIG_PROJECT_ROOT_PATH_HOLDER keyword at beginning if relative path
            if CONFIG_PROJECT_ROOT_PATH_HOLDER not in file_path and not file_path.startswith("/"):
                file_path = CONFIG_PROJECT_ROOT_PATH_HOLDER + file_path

            # - replaces CONFIG_PROJECT_ROOT_PATH_HOLDER keyword by retrieved repository root path
            if CONFIG_PROJECT_ROOT_PATH_HOLDER in file_path:
                if not locally_retrieved_repository_root_path:
                    raise ValueError(
                        "Repository should have been locally retrieved for this request."
                    )

                root_dir: str = str(locally_retrieved_repository_root_path)
                file_path = file_path.replace(
                    CONFIG_PROJECT_ROOT_PATH_HOLDER, root_dir + "/"
                )

    # Creates a LynceusFile instance.
    logger.debug(
        f"{log_prefix} creating LynceusFile ({file_path=}; {dest_path_format=}; {override_dest_path_kwargs=}) ..."
    )
    lynceus_file: LynceusFile = storage_file_factory.new_file(
        source_name=None,
        source_file_name=file_path,
        dest_path_format=dest_path_format,
        override_dest_path_kwargs=override_dest_path_kwargs,
        create_sub_directories=False,
        must_exist=must_exist,
    )

    return lynceus_file
def retrieve_remote_file_locally(
    *,
    lynceus_file: LynceusFile,
    logger: Logger,
    log_prefix: str,
    dest_dir_path: Path,
    extension_if_none: str | None = None,
) -> Path | None:
    """
    Make sure a file is reachable on the local filesystem.

    A file that is already local is returned as-is (its ``path`` attribute).
    Otherwise it is downloaded into ``dest_dir_path`` under its own name,
    optionally completed with a default extension, and the resulting local
    path is returned.

    Parameters
    ----------
    lynceus_file : LynceusFile
        The file to make available locally. None is accepted and yields None.
    logger : Logger
        Logger receiving a debug trace once the download is done.
    log_prefix : str
        Prefix string for log messages.
    dest_dir_path : Path
        Local directory in which the downloaded file is stored.
    extension_if_none : str | None, optional
        Extension given to the local copy when the file reports no extension.
        The leading dot is optional ('.txt' and 'txt' are equivalent).
        Defaults to None.

    Returns
    -------
    Path | None
        Path of the local file (pre-existing or freshly downloaded), or None
        when ``lynceus_file`` is None.

    Notes
    -----
    Typical use cases:
     - a scoring engine which cannot work with remote files
     - third-party tool configuration files overridden on a remote storage

    Examples
    --------
    >>> local_path = retrieve_remote_file_locally(
    ...     lynceus_file=remote_config_file,
    ...     logger=logger,
    ...     log_prefix='[Config]',
    ...     dest_dir_path=Path('/tmp/config'),
    ...     extension_if_none='.json'
    ... )
    """
    # Nothing to retrieve at all.
    if lynceus_file is None:
        return None

    # Already on the local filesystem: no copy is needed.
    if lynceus_file.is_local():
        return lynceus_file.path

    # Remote file: the local copy keeps the remote file name.
    local_path: Path = dest_dir_path / Path(lynceus_file.get_name())

    # Completes the name with the fallback extension when the file has none.
    if not lynceus_file.get_extension() and extension_if_none:
        suffix: str = extension_if_none
        if not suffix.startswith("."):
            suffix = f".{suffix}"
        local_path = local_path.with_suffix(suffix)

    lynceus_file.download_to(local_path)
    logger.debug(f"{log_prefix} saved {lynceus_file=} to {local_path=} to ease usage.")
    return local_path