Skip to content

bgc_data_processing.parsers

Parsing tools to determine date ranges.

TomlParser(filepath, check_types=True)

Parsing class for config.toml.

Parameters:

Name Type Description Default
filepath Path | str

Path to the config file.

required
check_types bool

Whether to check types or not, by default True

True
Source code in src/bgc_data_processing/parsers.py
41
42
43
44
45
46
47
def __init__(self, filepath: Path | str, check_types: bool = True) -> None:
    self.filepath = Path(filepath)
    self._check = check_types
    with self.filepath.open("rb") as f:
        self._elements = tomllib.load(f)
    if check_types:
        self._parsed_types = self._parse_types(filepath=self.filepath)

filepath = Path(filepath) instance-attribute

raise_if_wrong_type_below(keys)

Verify types for all variables 'below' keys level.

Parameters:

Name Type Description Default
keys list[str]

'Root' level after which to start checking types.

required

Raises:

Type Description
TypeError

If self._elements is not a dictionary.

Source code in src/bgc_data_processing/parsers.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def raise_if_wrong_type_below(
    self,
    keys: list[str],
) -> None:
    """Recursively type-check every variable located under a given level.

    Parameters
    ----------
    keys : list[str]
        Path to the level from which the check starts.
        An empty list means the whole document.

    Raises
    ------
    TypeError
        If keys is empty and self._elements is not a dictionary.
    """
    if not self._check:
        # Type checking is disabled for this parser.
        return
    if keys:
        node = self._get(keys)
        if not isinstance(node, dict):
            # Leaf value: check it and stop descending.
            self.raise_if_wrong_type(keys)
            return
    else:
        node = self._elements
        if not isinstance(node, dict):
            error_msg = "Wrong type for toml object, should be a dictionnary"
            raise TypeError(error_msg)
    # node is a table: visit each of its entries in turn.
    for child in node:
        self.raise_if_wrong_type_below(keys=[*keys, child])

raise_if_wrong_type(keys)

Raise a TypeError if the variable type is none of the specified types.

Parameters:

Name Type Description Default
keys list[str]

List path to the variable: ["VAR1", "VAR2", "VAR3"] is the path to the variable VAR1.VAR2.VAR3 in the toml.

required

Raises:

Type Description
TypeError

If the variable doesn't match any of the required types.

Source code in src/bgc_data_processing/parsers.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def raise_if_wrong_type(
    self,
    keys: list[str],
) -> None:
    """Check a single variable against the types accepted for it.

    Parameters
    ----------
    keys : list[str]
        Path to the variable, one key per nesting level:
        ["VAR1", "VAR2", "VAR3"] designates VAR1.VAR2.VAR3 in the toml.

    Raises
    ------
    TypeError
        If the variable matches none of the accepted types.
    """
    value = self._get(keys)
    accepted = self._get_type(keys)
    for candidate in accepted:
        if self._check_type(value, candidate):
            # A single matching type is enough.
            return

    def short_name(type_obj) -> str:
        # Keep what sits between the first pair of quotes:
        # "<class 'int'>" -> "int".
        return str(type_obj).split("'")[1]

    # Tuple entries are rendered as "first[second]", others by their name.
    nested_names = [
        f"{short_name(entry[0])}[{short_name(entry[1])}]"
        for entry in accepted
        if isinstance(entry, tuple)
    ]
    plain_names = [
        short_name(entry) for entry in accepted if not isinstance(entry, tuple)
    ]
    str_types = ", ".join([*plain_names, *nested_names])
    error_msg = (
        f"Type of {'.'.join(keys)} from {self.filepath} is incorrect."
        f" Must be of one of these types: {str_types}."
    )
    raise TypeError(error_msg)

ConfigParser(filepath, check_types=True, dates_vars_keys=None, dirs_vars_keys=None, existing_directory='raise')

Bases: TomlParser

Class to parse toml config scripts.

Parameters:

Name Type Description Default
filepath Path | str

Path to the file.

required
check_types bool

Whether to check types or not, by default True

True
dates_vars_keys list[str | list[str]] | None

Keys to variables defining dates, by default None

None
dirs_vars_keys list[str | list[str]] | None

Keys to variables defining directories, by default None

None
existing_directory str

Behavior for directory creation: 'raise' raises an error if the directory exists and is not empty, 'merge' keeps the directory as is but might replace its content when saving files, and 'clean' erases the directory if it exists.

'raise'
Source code in src/bgc_data_processing/parsers.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def __init__(
    self,
    filepath: Path | str,
    check_types: bool = True,
    dates_vars_keys: list[str | list[str]] | None = None,
    dirs_vars_keys: list[str | list[str]] | None = None,
    existing_directory: str = "raise",
) -> None:
    """Load the config file and register the date and directory variables.

    Parameters
    ----------
    filepath : Path | str
        Path to the file.
    check_types : bool, optional
        Whether to check types or not, by default True
    dates_vars_keys : list[str | list[str]] | None, optional
        Keys to variables defining dates, by default None
    dirs_vars_keys : list[str | list[str]] | None, optional
        Keys to variables defining directories, by default None
    existing_directory : str, optional
        Behavior when a directory already exists, by default "raise"

    Raises
    ------
    TypeError
        If an entry of dirs_vars_keys is neither a string nor a list.
    """
    super().__init__(filepath, check_types)
    self.dates_vars_keys = [] if dates_vars_keys is None else dates_vars_keys
    self.dirs_vars_keys: list[list[str]] = []
    self._parsed = False
    requested_dirs = () if dirs_vars_keys is None else dirs_vars_keys
    for entry in requested_dirs:
        if isinstance(entry, str):
            # Single key: store it as a one-element key path.
            self.dirs_vars_keys.append([entry])
        elif isinstance(entry, list):
            self.dirs_vars_keys.append(entry)
        else:
            error_msg = (
                f"Unsupported type for directory key {entry}: {type(entry)}."
            )
            raise TypeError(error_msg)
    self.existing_dir_behavior = existing_directory
    # One flag per directory variable, keyed by its dash-joined key path.
    self._dir_created = dict.fromkeys(
        ("-".join(key_path) for key_path in self.dirs_vars_keys),
        False,
    )

dates_vars_keys = [] instance-attribute

dirs_vars_keys: list[list[str]] = [] instance-attribute

existing_dir_behavior = existing_directory instance-attribute

parse()

Parse the elements to verify types, convert dates and create directories.

Returns:

Type Description
dict

Transformed dictionary

Source code in src/bgc_data_processing/parsers.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def parse(
    self,
) -> dict:
    """Verify types and convert date variables, once per parser.

    The first call type-checks the whole document and replaces every
    variable listed in self.dates_vars_keys by the datetime parsed from
    its "%Y%m%d" string. Later calls do no work.

    Returns
    -------
    dict
        The elements dictionary, with dates converted.
    """
    if self._parsed:
        # Already done: hand back the same dictionary.
        return self._elements
    # The flag is raised before the work starts, so a later call returns
    # immediately even if this one fails partway.
    self._parsed = True
    self.raise_if_wrong_type_below([])
    for keys in self.dates_vars_keys:
        # A bare string designates a top-level variable.
        all_keys = [keys] if isinstance(keys, str) else keys
        date = dt.datetime.strptime(self._get(all_keys), "%Y%m%d")
        self._set(all_keys, date)
    return self._elements

get(keys)

Get a variable by giving the list of keys to reach the variable.

Parameters:

Name Type Description Default
keys list[str]

Keys to the variable.

required

Returns:

Type Description
Any

The desired variable.

Source code in src/bgc_data_processing/parsers.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
@directory_check
def get(self, keys: list[str]) -> Any:
    """Return the variable found by following a list of keys.

    Parameters
    ----------
    keys : list[str]
        Keys leading to the variable.

    Returns
    -------
    Any
        The variable found at that location.
    """
    # Delegate the actual lookup to the parent class.
    value = super()._get(keys)
    return value

__getitem__(__k)

Return self._elements[__k].

Parameters:

Name Type Description Default
__k str

Key

required

Returns:

Type Description
Any

Value associated to __k.

Source code in src/bgc_data_processing/parsers.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
@directory_check
def __getitem__(self, __k: str) -> Any:
    """Return the top-level element stored under a key.

    Parameters
    ----------
    __k : str
        Key

    Returns
    -------
    Any
        Value associated to __k in self._elements.
    """
    # Run the parsing step before reading the element.
    self.parse()
    elements = self._elements
    return elements[__k]

__repr__()

Represent the object as a string.

Returns:

Type Description
str

self._elements.__repr__()

Source code in src/bgc_data_processing/parsers.py
438
439
440
441
442
443
444
445
446
def __repr__(self) -> str:
    """Represent the parser through the elements it holds.

    Returns
    -------
    str
        repr(self._elements)
    """
    return repr(self._elements)

DefaultTemplatesParser

Bases: TomlParser

Parser for variables.toml to create Template Variables.

variables: dict[str, TemplateVar] property

Return the dictionary with all created variables.

Returns:

Type Description
dict[str, TemplateVar]

Dictionary mapping variable names to variable templates.

to_list()

Return the variable ensemble as a list.

Returns:

Type Description
list[TemplateVar]

List of all templates.

Source code in src/bgc_data_processing/parsers.py
452
453
454
455
456
457
458
459
460
def to_list(self) -> list[TemplateVar]:
    """Collect every value of self.variables in a list.

    Returns
    -------
    list[TemplateVar]
        List of all templates, in the iteration order of self.variables.
    """
    templates = self.variables
    return [*templates.values()]

__getitem__(__k)

Return self.variables[__k].

Parameters:

Name Type Description Default
__k str

Variable name as defined in variables.toml.

required

Returns:

Type Description
TemplateVar

Template Variable associated to __k.

Source code in src/bgc_data_processing/parsers.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def __getitem__(self, __k: str) -> TemplateVar:
    """Look up a template variable by name in self.variables.

    Parameters
    ----------
    __k : str
        Variable name as defined in variables.toml.

    Returns
    -------
    TemplateVar
        Template Variable associated to __k.
    """
    templates = self.variables
    return templates[__k]

WaterMassesParser

Bases: TomlParser

Parser for water_masses.toml to create WaterMass objects.

variables: dict[str, WaterMass] property

Return the dictionary with all created WaterMass objects.

Returns:

Type Description
dict[str, WaterMass]

Dictionary mapping WaterMass acronyms to WaterMass objects.

__getitem__(__k)

Return self.variables[__k].

Parameters:

Name Type Description Default
__k str

WaterMass acronym as defined in water_masses.toml.

required

Returns:

Type Description
WaterMass

WaterMass associated to __k.

Source code in src/bgc_data_processing/parsers.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def __getitem__(self, __k: str) -> WaterMass:
    """Look up a water mass by acronym in self.variables.

    Parameters
    ----------
    __k : str
        WaterMass acronym as defined in water_masses.toml.

    Returns
    -------
    WaterMass
        WaterMass associated to __k.
    """
    water_masses = self.variables
    return water_masses[__k]

directory_check(get_variable)

Use as decorator to create directories only when needed.

Parameters:

Name Type Description Default
get_variable Callable

get or __getitem__ function.

required

Returns:

Type Description
Callable

Wrapper function.

Raises:

Type Description
IsADirectoryError

If the directory exists and is not empty while existing_directory is 'raise'.

Source code in src/bgc_data_processing/parsers.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def directory_check(get_variable: Callable) -> Callable:
    """Use as decorator to create directories only when needed.

    Parameters
    ----------
    get_variable : Callable
        get of __getitem__ function.

    Returns
    -------
    Callable
        Wrapper function.

    Raises
    ------
    IsADirectoryError
        If the directory exists
    """

    @wraps(get_variable)
    def wrapper_func(self: "ConfigParser", keys: str | list[str]):
        keys_dirs = [keys] if isinstance(keys, str) else keys
        if (
            keys_dirs in self.dirs_vars_keys
            and not self._dir_created["-".join(keys_dirs)]
        ):
            directory = Path(get_variable(self, keys))
            if directory.is_dir():
                if [p for p in directory.glob("*.*") if p.name != ".gitignore"]:
                    if self.existing_dir_behavior == "raise":
                        error_msg = (
                            f"Directory {directory} already exists and is not empty."
                        )
                        raise IsADirectoryError(error_msg)
                    if self.existing_dir_behavior == "merge":
                        pass
                    elif self.existing_dir_behavior == "clean":
                        shutil.rmtree(directory)
                        directory.mkdir()
            else:
                directory.mkdir()
                gitignore = directory.joinpath(".gitignore")
                with gitignore.open("w") as file:
                    file.write("*")
            self._dir_created["-".join(keys_dirs)] = True
            return directory
        return get_variable(self, keys)

    return wrapper_func