Skip to content

bgc_data_processing.comparison.matching

Data selectors objects.

SelectiveABFileLoader(provider_name, category, exclude, variables, grid_basename)

Bases: ABFileLoader

Load ABFile only on given points.

Parameters:

Name Type Description Default
provider_name str

Data provider name.

required
category str

Category provider belongs to.

required
exclude list[str]

Filenames to exclude from loading.

required
variables LoadingVariablesSet

Storer object containing all variables to consider for this data, both the ones present in the data file and the ones not represented in the file.

required
grid_basename str

Basename of the ab grid grid file for the loader. => files are considered to be loaded over the same grid.

required
Source code in src/bgc_data_processing/comparison/matching.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    provider_name: str,
    category: str,
    exclude: list[str],
    variables: "LoadingVariablesSet",
    grid_basename: str,
) -> None:
    """Initialize the selective loader.

    Parameters
    ----------
    provider_name : str
        Data provider name.
    category : str
        Category the provider belongs to.
    exclude : list[str]
        Filenames to exclude from loading.
    variables : LoadingVariablesSet
        Set of variables to consider for this data.
    grid_basename : str
        Basename of the ab grid file for the loader.
    """
    # Everything is delegated to the parent ABFileLoader.
    parent_kwargs = {
        "provider_name": provider_name,
        "category": category,
        "exclude": exclude,
        "variables": variables,
        "grid_basename": grid_basename,
    }
    super().__init__(**parent_kwargs)

load(filepath, constraints, mask)

Load an abfile from its basename.

Parameters:

Name Type Description Default
filepath Path | str

Path to the basename of the file to load.

required
constraints Constraints

Constraints slicer.

required

Returns:

Type Description
DataFrame

DataFrame corresponding to the file.

Source code in src/bgc_data_processing/comparison/matching.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def load(
    self,
    filepath: Path | str,
    constraints: Constraints,
    mask: "Mask",
) -> pd.DataFrame:
    """Load an abfile from its basename, restricted to the masked points.

    Parameters
    ----------
    filepath : Path | str
        Path to the basename of the file to load.
    constraints : Constraints
        Constraints slicer.
    mask : Mask
        Mask selecting which grid points to load.

    Returns
    -------
    pd.DataFrame
        DataFrame corresponding to the file.
    """
    # The mask's index becomes the row index of everything read below.
    self._index = mask.index
    basename = ABFileLoader.convert_filepath_to_basename(filepath)
    raw_data = self._read(basename=str(basename), mask=mask)
    # transform thickness in depth
    with_depth = self._create_depth_column(raw_data)
    # create date columns
    with_dates = self._set_date_related_columns(with_depth, basename)
    # converts types
    typed = self._convert_types(with_dates)
    # apply corrections
    corrected = self._correct(typed)
    # apply constraints
    constrained = constraints.apply_constraints_to_dataframe(corrected)
    return self.remove_nan_rows(constrained)

from_abloader(loader) classmethod

Create a Selective loader based on an existing loader.

Parameters:

Name Type Description Default
loader ABFileLoader

Loader to use as reference.

required

Returns:

Type Description
SelectiveABFileLoader

Selective Loader.

Source code in src/bgc_data_processing/comparison/matching.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
@classmethod
def from_abloader(
    cls,
    loader: ABFileLoader,
) -> "SelectiveABFileLoader":
    """Create a Selective loader based on an existing loader.

    Parameters
    ----------
    loader : ABFileLoader
        Loader to use as reference.

    Returns
    -------
    SelectiveABFileLoader
        Selective Loader.
    """
    # Use `cls` (not the hard-coded class name) so subclasses get
    # instances of themselves from this alternate constructor.
    return cls(
        provider_name=loader.provider,
        category=loader.category,
        exclude=loader.excluded_filenames,
        variables=loader.variables,
        grid_basename=loader.grid_basename,
    )

NearestNeighborStrategy(**model_kwargs)

Implement a closest point search using NearestNeighbor algorithm.

Parameters:

Name Type Description Default
**model_kwargs

Additional arguments to pass to sklearn.neighbors.NearestNeighbors. The value of 'n_neighbors' will be overridden by 1.

{}
Source code in src/bgc_data_processing/comparison/matching.py
309
310
311
def __init__(self, **model_kwargs) -> None:
    """Initialize the strategy.

    Parameters
    ----------
    **model_kwargs
        Additional arguments to pass to sklearn.neighbors.NearestNeighbors.
        The value of 'n_neighbors' is always forced to 1.
    """
    # We only ever want the single closest point, hence n_neighbors=1.
    self.model_kwargs = {**model_kwargs, "n_neighbors": 1}

model_kwargs = model_kwargs instance-attribute

name: str property

Strategy name.

get_closest_indexes(simulations_lat_lon, observations_lat_lon)

Find closest simulation point for each observation point.

Parameters:

Name Type Description Default
simulations_lat_lon DataFrame

DataFrame with longitude and latitude for each simulations point.

required
observations_lat_lon DataFrame

DataFrame with longitude and latitude for each observation point.

required

Returns:

Type Description
Series

Index of closest point for every observation point.

Source code in src/bgc_data_processing/comparison/matching.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
@with_verbose(
    trigger_threshold=2,
    message=f"Closest index selection using {_strategy_name} strategy.",
)
def get_closest_indexes(
    self,
    simulations_lat_lon: pd.DataFrame,
    observations_lat_lon: pd.DataFrame,
) -> pd.Series:
    """Find the closest simulation point for each observation point.

    Parameters
    ----------
    simulations_lat_lon : pd.DataFrame
        Longitude and latitude of every simulation point.
    observations_lat_lon : pd.DataFrame
        Longitude and latitude of every observation point.

    Returns
    -------
    pd.Series
        Index of the closest simulation point for every observation
        point, indexed like the observations.
    """
    nn_model = NearestNeighbors(**self.model_kwargs)
    # The haversine metric expects radians, hence the degree conversion.
    nn_model.fit(X=simulations_lat_lon * np.pi / 180)
    nearest = nn_model.kneighbors(
        observations_lat_lon * np.pi / 180,
        return_distance=False,
    )
    return pd.Series(nearest.flatten(), index=observations_lat_lon.index)

Mask(mask_2d, index_2d)

Mask to apply to ABFiles to filter data while loading.

Parameters:

Name Type Description Default
mask_2d ndarray

2D array to mask layers when loading them.

required
index_2d ndarray

2D array of indexes to use to reindex the filtered array.

required

Raises:

Type Description
ValueError

If the mask and the index have a different shape.

Source code in src/bgc_data_processing/comparison/matching.py
369
370
371
def __init__(self, mask_2d: np.ndarray, index_2d: np.ndarray) -> None:
    """Store the reindexing array, then set the mask.

    Parameters
    ----------
    mask_2d : np.ndarray
        2D boolean array masking layers when loading them.
    index_2d : np.ndarray
        Array of indexes used to reindex the filtered data.
    """
    # _index_2d must be set first: assigning `self.mask` goes through the
    # writable `mask` property, which presumably validates the mask against
    # `_index_2d` (the class raises ValueError on shape mismatch) — confirm.
    self._index_2d = index_2d
    self.mask = mask_2d

mask: np.ndarray property writable

2D boolean mask.

index: pd.Index property

Index for masked data reindexing.

Returns:

Type Description
Index

Data Index.

__call__(data_2d, **kwargs)

Apply mask to 2D data.

Parameters:

Name Type Description Default
data_2d ndarray

Data to apply the mask to.

required
**kwargs

Additional parameters to pass to pd.Series. The value of 'index' will be overridden by self._index.

{}

Returns:

Type Description
Series

Masked data as a pd.Series with self._index as index.

Source code in src/bgc_data_processing/comparison/matching.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
def __call__(self, data_2d: np.ndarray, **kwargs) -> pd.Series:
    """Apply the mask to 2D data.

    Parameters
    ----------
    data_2d : np.ndarray
        Data to apply the mask to.
    **kwargs:
        Additional parameters to pass to pd.Series.
        The value of 'index' will be overridden by self._index.

    Returns
    -------
    pd.Series
        Masked data as a pd.Series with self._index as index.
    """
    masked_values = data_2d[self._mask].flatten()
    # Force the mask's own index, whatever the caller provided.
    series_kwargs = {**kwargs, "index": self._index}
    return pd.Series(masked_values, **series_kwargs)

intersect(mask_array)

Intersect the mask with another (same-shaped) boolean array.

Parameters:

Name Type Description Default
mask_array ndarray

Array to intersect with.

required

Returns:

Type Description
Mask

New mask with self._mask & mask_array as mask array.

Raises:

Type Description
IncompatibleMaskShapeError

If mask_array has the wrong shape.

Source code in src/bgc_data_processing/comparison/matching.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def intersect(self, mask_array: np.ndarray) -> "Mask":
    """Intersect the mask with another (same-shaped) boolean array.

    Parameters
    ----------
    mask_array : np.ndarray
        Array to intersect with.

    Returns
    -------
    Mask
        New mask with self._mask & mask_array as mask array.

    Raises
    ------
    IncompatibleMaskShapeError
        If mask_array has the wrong shape.
    """
    expected_shape = self.mask.shape
    if mask_array.shape != expected_shape:
        raise IncompatibleMaskShapeError(expected_shape, mask_array.shape)
    combined_mask = self._mask & mask_array
    return Mask(mask_2d=combined_mask, index_2d=self._index_2d)

make_empty(grid) classmethod

Create a Mask with all values True with grid size.

Parameters:

Name Type Description Default
grid ABFileGrid

ABFileGrid to use to have the grid size.

required

Returns:

Type Description
Mask

Mask with only True values.

Source code in src/bgc_data_processing/comparison/matching.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
@classmethod
def make_empty(cls, grid: ABFileGrid) -> "Mask":
    """Create a Mask with all values True with grid size.

    Parameters
    ----------
    grid : ABFileGrid
        ABFileGrid to use to have the grid size.

    Returns
    -------
    Mask
        Mask with only True values.
    """
    # Use `cls` so subclasses build instances of themselves, and
    # np.arange instead of materializing a range into an array.
    # NOTE(review): the index array is 1D here while the mask is 2D —
    # confirm the `mask` property setter accepts this combination.
    return cls(
        mask_2d=np.full((grid.jdm, grid.idm), True),
        index_2d=np.arange(grid.jdm * grid.idm),
    )

Match(obs_closests_indexes)

Match between observation indexes and simulations indexes.

Parameters:

Name Type Description Default
obs_closests_indexes Series

Closest simulated point index Series. The index is supposed to correspond to observations' index.

required
Source code in src/bgc_data_processing/comparison/matching.py
474
475
476
477
478
def __init__(self, obs_closests_indexes: pd.Series) -> None:
    """Build the observation <-> simulation index link table.

    Parameters
    ----------
    obs_closests_indexes : pd.Series
        Closest simulated point index Series; its index is assumed to
        correspond to the observations' index.
    """
    link = obs_closests_indexes.to_frame(name=self.index_simulated)
    link.index.name = self.index_observed
    # Promote the observation index to a regular column.
    self.index_link = link.reset_index()

index_simulated: str = 'sim_index' class-attribute instance-attribute

index_observed: str = 'obs_index' class-attribute instance-attribute

index_loaded: str = 'load_index' class-attribute instance-attribute

match(loaded_df)

Transform the DataFrame index to link it to observations' index.

Parameters:

Name Type Description Default
loaded_df DataFrame

DataFrame to change the index of.

required

Returns:

Type Description
DataFrame

Copy of loaded_df with a modified index, which correspond to observations' index values.

Source code in src/bgc_data_processing/comparison/matching.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
@with_verbose(trigger_threshold=1, message="Matching indexes.")
def match(self, loaded_df: pd.DataFrame) -> pd.DataFrame:
    """Transform the DataFrame index to link it to observations' index.

    Parameters
    ----------
    loaded_df : pd.DataFrame
        DataFrame to change the index of.

    Returns
    -------
    pd.DataFrame
        Copy of loaded_df with a modified index, which correspond to
        observations' index values.
    """
    # Table mapping each loaded row's position to its simulated index.
    loaded_index = pd.Series(loaded_df.index, name=self.index_simulated).to_frame()
    loaded_index.index.name = self.index_loaded
    loaded_index.reset_index(inplace=True)
    # Work on a positionally-indexed copy so rows can be picked by load order.
    loaded_copy = loaded_df.copy()
    loaded_copy.index = loaded_index.index
    # Left-join on the simulated index; rows may be duplicated when several
    # observations share the same closest simulated point.
    merge = pd.merge(
        left=loaded_index,
        right=self.index_link,
        left_on=self.index_simulated,
        right_on=self.index_simulated,
        how="left",
    )
    # Reorder/duplicate loaded rows so they align with the observations,
    # then relabel them with the observations' index values.
    reshaped = loaded_copy.loc[merge[self.index_loaded], :]
    reshaped.index = merge[self.index_observed].values
    return reshaped

SelectiveDataSource(reference, strategy, provider_name, data_format, dirin, data_category, excluded_files, files_pattern, variable_ensemble, **kwargs)

Bases: DataSource

Selective Data Source.

Parameters:

Name Type Description Default
reference DataFrame

Reference Dataframe (observations).

required
strategy NearestNeighborStrategy

Closer point finding strategy.

required
provider_name str

Name of the data provider.

required
data_format str

Data format.

required
dirin Path | str

Input data directory.

required
data_category str

Category of the data.

required
excluded_files list[str]

Files not to load.

required
files_pattern FileNamePattern

Pattern to match to load files.

required
variable_ensemble SourceVariableSet

Ensembles of variables to consider.

required
Source code in src/bgc_data_processing/comparison/matching.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
def __init__(
    self,
    reference: "Storer",
    strategy: NearestNeighborStrategy,
    provider_name: str,
    data_format: str,
    dirin: Path | str,
    data_category: str,
    excluded_files: list[str],
    files_pattern: "FileNamePattern",
    variable_ensemble: "SourceVariableSet",
    **kwargs,
) -> None:
    """Initialize the selective data source.

    Parameters
    ----------
    reference : Storer
        Reference storer (observations); its `.data` DataFrame is kept.
    strategy : NearestNeighborStrategy
        Closest point finding strategy.
    provider_name : str
        Name of the data provider.
    data_format : str
        Data format.
    dirin : Path | str
        Input data directory.
    data_category : str
        Category of the data.
    excluded_files : list[str]
        Files not to load.
    files_pattern : FileNamePattern
        Pattern to match to load files.
    variable_ensemble : SourceVariableSet
        Ensembles of variables to consider.
    **kwargs
        Additional arguments forwarded to the parent DataSource.
    """
    super().__init__(
        provider_name,
        data_format,
        dirin,
        data_category,
        excluded_files,
        files_pattern,
        variable_ensemble,
        **kwargs,
    )
    self.reference = reference.data
    self.strategy = strategy
    # The grid file comes from the loader set up by the parent class.
    self.grid = self.loader.grid_file

reference = reference.data instance-attribute

strategy = strategy instance-attribute

grid = self.loader.grid_file instance-attribute

get_coord(var_name)

Get a coordinate field from loader.grid_file.

Parameters:

Name Type Description Default
var_name str

Name of the variable to retrieve.

required

Returns:

Type Description
Series

Loaded variable as pd.Series.

Raises:

Type Description
ABFileLoadingError

If the variable doesn't exist in the grid file.

Source code in src/bgc_data_processing/comparison/matching.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
def get_coord(self, var_name: str) -> pd.Series:
    """Get a coordinate field from loader.grid_file.

    Parameters
    ----------
    var_name : str
        Name of the variable to retrieve.

    Returns
    -------
    pd.Series
        Loaded variable as pd.Series.

    Raises
    ------
    ABFileLoadingError
        If the variable doesn't exist in the grid file.
    """
    variable = self.loader.variables.get(var_name)
    # Try every alias until one matches a fieldname of the grid file.
    for alias, _, _ in variable.aliases:
        if alias in self.grid.fieldnames:
            field: np.ma.masked_array = self.grid.read_field(alias)
            break
    else:
        error_msg = (
            f"Grid File has no data for the variable {variable.name}."
            f"Possible fieldnames are {self.grid.fieldnames}."
        )
        raise ABFileLoadingError(error_msg)
    # Masked entries become NaN in the flattened series.
    return pd.Series(field.filled(np.nan).flatten(), name=variable.label)

get_x_y_indexes()

Get x and y indexes.

Returns:

Type Description
tuple[Series, Series]

X indexes series, Y indexes series.

Source code in src/bgc_data_processing/comparison/matching.py
635
636
637
638
639
640
641
642
643
644
645
646
647
@with_verbose(trigger_threshold=2, message="Collecting grid file's indexes.")
def get_x_y_indexes(self) -> tuple[pd.Series, pd.Series]:
    """Get x and y indexes.

    Returns
    -------
    tuple[pd.Series, pd.Series]
        X indexes series, Y indexes series.
    """
    # Row index (over jdm) and column index (over idm) of every grid
    # point, flattened in C order.
    row_index, col_index = np.indices((self.grid.jdm, self.grid.idm))
    return pd.Series(row_index.flatten()), pd.Series(col_index.flatten())

select(data_slice)

Select closest points in an abfile using self.strategy.

Parameters:

Name Type Description Default
data_slice DataFrame

Slice of data to select from.

required

Returns:

Type Description
tuple[Mask, Match]

Mask to use for loader, Match to link observations to simulations.

Source code in src/bgc_data_processing/comparison/matching.py
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
@with_verbose(trigger_threshold=1, message="Selecting Data.")
def select(
    self,
    data_slice: pd.DataFrame,
) -> tuple["Mask", "Match"]:
    """Select closest points in an abfile using self.strategy.

    Parameters
    ----------
    data_slice: pd.DataFrame
        Slice of data to select from.

    Returns
    -------
    tuple[Mask, Match]
        Mask to use for loader, Match to link observations to simulations.
    """
    # Simulated grid coordinates, one row per (flattened) grid point.
    lat_series = self.get_coord(self.loader.variables.latitude_var_name)
    lon_series = self.get_coord(self.loader.variables.longitude_var_name)
    sims = pd.concat([lat_series, lon_series], axis=1)
    x_coords_series, y_coords_series = self.get_x_y_indexes()
    # Closest grid point (flattened index) for every observation.
    index = self.strategy.get_closest_indexes(
        simulations_lat_lon=sims,
        observations_lat_lon=data_slice[sims.columns],
    )
    indexes = np.array(range(self.grid.jdm * self.grid.idm))
    indexes_2d = indexes.reshape((self.grid.jdm, self.grid.idm))
    # 2D coordinates of the selected points (x: row over jdm,
    # y: column over idm — matching get_x_y_indexes' convention).
    selected_xs = x_coords_series.loc[index.values]
    selected_ys = y_coords_series.loc[index.values]
    to_keep = np.full(shape=(self.grid.jdm, self.grid.idm), fill_value=False)
    to_keep[selected_xs, selected_ys] = True
    return Mask(to_keep, indexes_2d), Match(index)

parse_date_from_filepath(filepath) staticmethod

Parse date from abfile basename.

Parameters:

Name Type Description Default
filepath Path | str

File path.

required

Returns:

Type Description
date

Corresponding date.

Source code in src/bgc_data_processing/comparison/matching.py
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
@staticmethod
@with_verbose(trigger_threshold=0, message="Loading data from [filepath].")
def parse_date_from_filepath(filepath: Path | str) -> dt.date:
    """Parse date from abfile basename.

    Parameters
    ----------
    filepath : Path | str
        File path.

    Returns
    -------
    dt.date
        Corresponding date.
    """
    basename = ABFileLoader.convert_filepath_to_basename(filepath)
    # The date is encoded as 'year_dayofyear_hour' after the final dot.
    raw_date = Path(basename).name.split(".")[-1]
    parsed = dt.datetime.strptime(raw_date, "%Y_%j_%H")
    return parsed.date()

get_basenames(constraints)

Return basenames of files matching constraints.

Parameters:

Name Type Description Default
constraints Constraints

Data constraints, only year constraint is used.

required

Returns:

Type Description
list[Path]

List of basenames matching constraints.

Source code in src/bgc_data_processing/comparison/matching.py
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
def get_basenames(
    self,
    constraints: "Constraints",
) -> list[Path]:
    """Return basenames of files matching constraints.

    Parameters
    ----------
    constraints : Constraints
        Data constraints, only year constraint is used.

    Returns
    -------
    list[Path]
        List of basenames matching constraints.
    """
    variables = self.loader.variables
    date_label = variables.get(variables.date_var_name).label
    date_constraint = constraints.get_constraint_parameters(date_label)
    matcher = self._files_pattern.build_from_constraint(date_constraint)
    # Only keep files the loader itself considers valid.
    matcher.validate = self.loader.is_file_valid
    return matcher.select_matching_filepath(research_directory=self.dirin)

load_all(constraints)

Load all files for the loader.

Parameters:

Name Type Description Default
constraints Constraints

Constraints slicer.

required

Returns:

Type Description
Storer

Storer for the loaded data.

Source code in src/bgc_data_processing/comparison/matching.py
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
def load_all(self, constraints: "Constraints") -> "Storer":
    """Load all files for the loader.

    Parameters
    ----------
    constraints : Constraints
        Constraints slicer.

    Returns
    -------
    Storer
        Storer for the loaded data.
    """
    date_var_name = self.loader.variables.date_var_name
    date_var_label = self.loader.variables.get(date_var_name).label
    filepaths = self.get_basenames(
        constraints,
    )
    datas: list[pd.DataFrame] = []
    for filepath in filepaths:
        date = self.parse_date_from_filepath(filepath=filepath)
        # Only match observations sharing the file's date.
        data_slice = self.reference[self.reference[date_var_label].dt.date == date]
        if data_slice.empty:
            continue
        mask, match = self.select(data_slice)
        sim_data = self.loader.load(
            filepath,
            constraints=constraints,
            mask=mask,
        )
        datas.append(match.match(sim_data))
    # NOTE(review): pd.concat raises ValueError when `datas` is empty
    # (no file matched any observation date) — confirm callers expect that.
    concatenated = pd.concat(datas, axis=0)
    storer = Storer(
        data=concatenated,
        category=self.loader.category,
        providers=[self.loader.provider],
        variables=self._store_vars,
    )
    self._insert_all_features(storer)
    self._remove_temporary_variables(storer)
    return storer

load_and_save(saving_directory, dateranges_gen, constraints)

Load all the data, then save it in the saving directory.

Parameters:

Name Type Description Default
saving_directory Path | str

Path to the directory to save in.

required
dateranges_gen DateRangeGenerator

Generator to use to retrieve dateranges.

required
constraints Constraints

Constraints to apply on data.

required
Source code in src/bgc_data_processing/comparison/matching.py
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
def load_and_save(
    self,
    saving_directory: Path | str,
    dateranges_gen: "DateRangeGenerator",
    constraints: "Constraints",
) -> None:
    """Load all the data, then save it in the saving directory.

    Parameters
    ----------
    saving_directory : Path | str
        Path to the directory to save in.
    dateranges_gen : DateRangeGenerator
        Generator to use to retrieve dateranges.
    constraints : Constraints
        Constraints to apply on data.
    """
    saver = StorerSaver(self.load_all(constraints=constraints))
    saver.save_from_daterange(
        dateranges_gen=dateranges_gen,
        saving_directory=Path(saving_directory),
    )

from_data_source(reference, strategy, dsource) classmethod

Create the selective data source from an existing data source.

Parameters:

Name Type Description Default
reference Storer

Reference Dataframe (observations).

required
strategy NearestNeighborStrategy

Closer point finding strategy.

required
dsource DataSource

Template DataSource

required

Returns:

Type Description
SelectiveDataSource

Selective datasource from Template.

Source code in src/bgc_data_processing/comparison/matching.py
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
@classmethod
def from_data_source(
    cls,
    reference: Storer,
    strategy: NearestNeighborStrategy,
    dsource: DataSource,
) -> "SelectiveDataSource":
    """Create the selective data source from an existing data source.

    Parameters
    ----------
    reference : Storer
        Reference Storer (observations).
    strategy : NearestNeighborStrategy
        Closest point finding strategy.
    dsource : DataSource
        Template DataSource.

    Returns
    -------
    SelectiveDataSource
        Selective datasource from Template.
    """
    # `as_template` presumably exposes the constructor kwargs of the
    # template data source — confirm against DataSource.
    return cls(
        reference=reference,
        strategy=strategy,
        **dsource.as_template,
    )