Coverage for lynceus/files/remote/s3.py: 92%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-29 08:46 +0000

1from logging import Logger 

2from pathlib import Path 

3 

4import s3fs 

5 

6from lynceus.core.config import CONFIG_STORAGE_DYNAMIC_TYPE, CONFIG_STORAGE_IS_DYNAMIC, CONFIG_STORAGE_REMOTE_TYPE, LYNCEUS_S3_CONFIG_KEY 

7from lynceus.core.config.lynceus_config import LynceusConfig 

8from lynceus.core.exchange.lynceus_exchange import LynceusExchange 

9from lynceus.core.lynceus import LynceusSession 

10from lynceus.core.lynceus_client import LynceusClientClass 

11from lynceus.lynceus_exceptions import LynceusFileError 

12 

13 

class S3Utils(LynceusClientClass):
    """
    Utility class for S3 filesystem operations and management.

    Provide S3-specific functionality including filesystem initialization,
    cache management, and file listing operations with glob support.
    """

    def __init__(self, *, lynceus_session: LynceusSession, lynceus_exchange: LynceusExchange | None, lynceus_s3_config: dict[str, str]):
        """
        Initialize S3 utilities with configuration.

        The S3 filesystem itself is not created here; it is built lazily by
        :meth:`get_s3filesystem` on first use.

        Parameters
        ----------
        lynceus_session : LynceusSession
            Active Lynceus session
        lynceus_exchange : LynceusExchange, optional
            Exchange instance for communication (optional)
        lynceus_s3_config : dict
            S3 configuration dictionary with connection details
        """
        super().__init__(lynceus_session=lynceus_session, logger_name='s3', lynceus_exchange=lynceus_exchange)
        self.__lynceus_s3_config = lynceus_s3_config
        # Cached filesystem instance; stays None until get_s3filesystem() is first called.
        self.__s3filesystem = None

    def initialize(self):
        """
        Install the Lynceus-patched S3 filesystem class into the s3fs module.

        Rebind the module attribute ``s3fs.S3FileSystem`` to
        :class:`S3FileSystemPatched`. Any later ``s3fs.S3FileSystem(...)`` call that
        looks the class up through the module (including :meth:`get_s3filesystem`)
        therefore builds the patched version.
        """
        self._logger.info('Initializing s3fs according to configuration.')
        s3fs.S3FileSystem = S3FileSystemPatched

    def get_s3filesystem(self):
        """
        Get or create the S3 filesystem instance.

        The filesystem is created on first call from ``s3fs.S3FileSystem``, with the
        Lynceus S3 configuration passed under ``LYNCEUS_S3_CONFIG_KEY``. It is then
        cached on this instance for reuse across operations.

        NOTE(review): this relies on :meth:`initialize` having been called first, so
        that ``s3fs.S3FileSystem`` is :class:`S3FileSystemPatched`. Otherwise the
        stock class receives the Lynceus config keyword. Confirm callers always
        initialize first.

        Returns
        -------
        S3FileSystemPatched
            Configured S3 filesystem instance
        """
        if self.__s3filesystem is None:
            self.__s3filesystem = s3fs.S3FileSystem(**{LYNCEUS_S3_CONFIG_KEY: self.__lynceus_s3_config})

        return self.__s3filesystem

    def force_cache_refresh(self, path: Path | str | None = None):
        """
        Force refresh of the S3 filesystem cache.

        Invalidate cached file information so that fresh data is retrieved on
        subsequent operations. Do nothing if the filesystem has not been created yet.

        Parameters
        ----------
        path : Path or str, optional
            Specific path to refresh. None (or any falsy value) refreshes the whole cache.
        """
        if self.__s3filesystem is not None:
            self._logger.debug(f'Refreshing S3 fs cache ({path=}) ...')
            self.__s3filesystem.invalidate_cache(path=str(path) if path else None)

    def split_path(self, *, remote_file_path: str):
        """
        Split an S3 path into its components.

        Delegate to the S3 filesystem's ``split_path`` method.

        Parameters
        ----------
        remote_file_path : str
            Full S3 path to split

        Returns
        -------
        tuple
            Path components (bucket, key, version)
        """
        return self.get_s3filesystem().split_path(remote_file_path)

    def list_remote_files(self, *,
                          remote_root_path: Path | str,
                          recursive: bool,
                          pattern: str | None = None,
                          maxdepth: int | None = None,
                          withdirs: bool | None = None,
                          detail: bool = False):
        """
        List files in a remote S3 directory with various filtering options.

        Provide flexible file listing with support for recursive traversal,
        pattern matching, depth control, and detailed metadata retrieval.

        - ``recursive=True``: ``glob`` on ``remote_root_path/pattern``, where pattern
          defaults to ``'**/*'``. ``withdirs`` is ignored.
        - ``recursive=False`` with a pattern: ``find`` with ``prefix=pattern``.
          ``maxdepth`` and ``withdirs`` are ignored.
        - ``recursive=False`` without a pattern: ``find`` with ``maxdepth``
          (default 1) and ``withdirs`` (default True).

        Parameters
        ----------
        remote_root_path : Path or str
            Root path for the search
        recursive : bool
            Whether to search subdirectories recursively
        pattern : str, optional
            Optional glob pattern (recursive) or key prefix (non-recursive)
        maxdepth : int, optional
            Maximum directory depth for traversal
        withdirs : bool, optional
            Whether to include directories in results
        detail : bool, default False
            Whether to return detailed metadata or just paths

        Returns
        -------
        list or dict
            Keys without the bucket part (if detail=False), or a dict mapping
            those keys to their metadata (if detail=True)

        Raises
        ------
        LynceusFileError
            If an error occurs during file listing
        """

        def _retrieve_remote_files():
            """
            Retrieve the raw listing from the S3 filesystem according to the search criteria.

            Returns
            -------
            list or dict
                Raw file listing from S3 filesystem
            """
            if recursive:
                # s3fs Globing feature does not support path with the 's3:/' prefix ...
                # (pathlib collapses 's3://' into 's3:/'). as_posix() keeps '/' separators
                # on every platform; str() of a Windows path would use '\\', which is not
                # an S3 key separator.
                glob_path = (Path(remote_root_path) / Path(pattern or '**/*')).as_posix()
                return self.get_s3filesystem().glob(glob_path, maxdepth=maxdepth, detail=detail)

            # Uses the find method, because ls one is NOT implemented in s3fs.
            # Some options are used only if pattern is NOT defined.
            find_kwargs: dict[str, str | bool | int] = {
                'detail': detail
            }

            if pattern:
                find_kwargs.update({'prefix': pattern})
            else:
                find_kwargs.update(
                    {
                        'maxdepth': maxdepth or 1,
                        'withdirs': withdirs if withdirs is not None else True
                    }
                )

            # s3 find feature does not support path with the 's3:/' prefix ...
            return self.get_s3filesystem().find(path=remote_root_path, **find_kwargs)

        def _extract_path(remote_file_path):
            """
            Extract the key component (index 1 of split_path) from a full S3 file path.

            Parameters
            ----------
            remote_file_path : str
                Full S3 path

            Returns
            -------
            str
                Path component (key) from the S3 path
            """
            return self.split_path(remote_file_path=remote_file_path)[1]

        try:
            self._logger.debug(f'Looking/globbing for remote files ({remote_root_path=}; {pattern=}) ...')
            all_remote_file_metadata = _retrieve_remote_files()

            # Processed result according to requested detail
            # - only path are returned as a list
            if not detail:
                return [_extract_path(remote_file_path) for remote_file_path in all_remote_file_metadata]

            # - path, and lots of metadata are returned as a dict
            return {_extract_path(remote_file_key): remote_file_detailed_metadata
                    for remote_file_key, remote_file_detailed_metadata in all_remote_file_metadata.items()}
        except Exception as exc:  # pylint: disable=broad-except
            # Chain explicitly so the original S3 error appears as the direct cause in tracebacks.
            raise LynceusFileError(f'An error occured while looking/globbing for remote files ({remote_root_path=}; {pattern=}) ...', from_exception=exc) from exc

204 

205 

# Patch and initialize s3fs once for all.
# pylint: disable=useless-super-delegation,abstract-method
class S3FileSystemPatched(s3fs.S3FileSystem):
    """
    Patched version of s3fs.S3FileSystem with Lynceus-specific enhancements.

    Extend the base S3FileSystem to include:
     - Lynceus configuration integration
     - Enhanced OVH storage compatibility
     - Custom client configuration handling
     - A find()-based ls() method implementation
    """

    def __init__(self, *k, **kw):
        """
        Initialize the patched S3 filesystem with Lynceus configuration.

        Extract the Lynceus S3 configuration from the keyword arguments and split
        it into botocore config kwargs and client kwargs. Credentials are passed to
        the parent class as ``key``/``secret``.

        Parameters
        ----------
        *k : tuple
            Positional arguments passed to parent class
        **kw : dict
            Keyword arguments including LYNCEUS_S3_CONFIG_KEY. Remaining keywords are
            forwarded to the parent class.

        Raises
        ------
        ValueError
            If LYNCEUS_S3_CONFIG_KEY is not provided (or is empty)
        KeyError
            If the Lynceus S3 config has no 's3_endpoint' entry
        """
        # Extracts s3 config key from parameters (removed so the parent class never sees it).
        self.lynceus_s3_config = kw.pop(LYNCEUS_S3_CONFIG_KEY, None)
        if not self.lynceus_s3_config:
            raise ValueError(f'{LYNCEUS_S3_CONFIG_KEY} is a mandatory parameter when initializing S3FileSystemPatched')

        lynceus: LynceusSession = LynceusSession.get_session(registration_key={'user': 's3filesystem'})
        logger: Logger = lynceus.get_logger('s3Init')
        # NOTE(review): the whole config (possibly including 'secret_access_key') is logged here;
        # confirm LynceusConfig.format_config masks secrets.
        logger.info(f'Using S3 config "{LynceusConfig.format_config(self.lynceus_s3_config)}".')

        # Builds config kwargs. The defaults are required by OVH storage from botocore 1.36+.
        # Any of these keys set explicitly in the Lynceus S3 config overrides the default.
        # Previously only 'signature_version' was honored, and explicit checksum settings were
        # silently dropped (excluded from client kwargs below, but never copied here).
        overridable_config_keys = ('signature_version', 'request_checksum_calculation', 'response_checksum_validation')
        config_kwargs = {
            'request_checksum_calculation': 'when_required',
            'response_checksum_validation': 'when_required'
        }
        config_kwargs.update({key: value for key, value in self.lynceus_s3_config.items()
                              if key in overridable_config_keys})

        # Builds client kwargs.
        client_kwargs = {
            'endpoint_url': self.lynceus_s3_config['s3_endpoint'],
        }

        # Adds any additional/extra parameters to client kwargs. Keys already routed to config
        # kwargs, credentials, and Lynceus-only storage settings are skipped.
        # NOTE(review): 'addressing_style' is excluded here but not forwarded anywhere
        # (botocore expects it under config 's3'); confirm it is consumed elsewhere.
        client_kwargs.update(
            {key: value for key, value in self.lynceus_s3_config.items()
             if key not in config_kwargs
             and key not in (CONFIG_STORAGE_REMOTE_TYPE, 'endpoint_url', 'bucket_name', 'username',
                             'access_key_id', 'secret_access_key', 's3_endpoint', 'addressing_style',
                             CONFIG_STORAGE_IS_DYNAMIC, CONFIG_STORAGE_DYNAMIC_TYPE)}
        )
        logger.info(f'System will use these S3 client kwargs: "{LynceusConfig.format_config(client_kwargs)}".')

        super().__init__(*k,
                         key=self.lynceus_s3_config.get('access_key_id'),
                         secret=self.lynceus_s3_config.get('secret_access_key'),
                         config_kwargs=config_kwargs,
                         client_kwargs=client_kwargs,
                         **kw)

    def ls(self, path, detail=True, **kwargs):
        """
        List files and directories at the specified S3 path.

        Implemented on top of ``find`` with ``maxdepth=1`` and ``withdirs=True``.

        NOTE(review): with ``detail=True``, ``find`` returns a dict keyed by path (the
        same shape ``list_remote_files`` consumes). fsspec's generic ``ls`` contract
        is a list of dicts. Confirm callers expect the dict.

        Parameters
        ----------
        path : str
            S3 path to list
        detail : bool, default True
            Whether to return detailed file information
        **kwargs
            Additional arguments passed to find method

        Returns
        -------
        dict or list
            {path: info} mapping if detail=True, otherwise a list of paths
        """
        # See: s3fs.core.S3FileSystem._find
        return self.find(path, maxdepth=1, withdirs=True, detail=detail, **kwargs)