dapper.xp_launch

Tools (notably xpList) for setup and running of experiments (known as xps).

See dapper.da_methods.da_method for the strict definition of xps.

  1"""Tools (notably `xpList`) for setup and running of experiments (known as `xp`s).
  2
  3See `dapper.da_methods.da_method` for the strict definition of `xp`s.
  4"""
  5
  6import copy
  7import dataclasses as dcs
  8import os
  9import re
 10import shutil
 11import sys
 12from functools import wraps
 13from pathlib import Path
 14from textwrap import dedent
 15
 16import dill
 17import numpy as np
 18import struct_tools
 19import tabulate as _tabulate
 20from tabulate import tabulate
 21from tqdm.auto import tqdm
 22
 23import dapper.stats
 24import dapper.tools.progressbar as pb
 25from dapper.tools.colors import stripe
 26from dapper.tools.datafiles import create_run_dir
 27from dapper.tools.remote.uplink import submit_job_GCP
 28from dapper.tools.seeding import set_seed
 29from dapper.tools.viz import collapse_str
 30
 31_tabulate.MIN_PADDING = 0
 32
 33
def seed_and_simulate(HMM, xp):
    """Default experiment setup (sets seed and simulates truth and obs).

    Used by `xpList.launch` via `run_experiment`.

    Parameters
    ----------
    HMM: HiddenMarkovModel
        Container defining the system.
    xp: object
        Type: a `dapper.da_methods.da_method`-decorated class.

        .. warning:: `xp.seed` should be set (and `int`).

            Without `xp.seed` the seed does not get set,
            and different `xp`s will use different seeds
            (unless you do some funky hacking).
            Reproducibility for a script as a whole can still be achieved
            by setting the seed at the outset of the script.
            To avoid even that, set `xp.seed` to `None` or `"clock"`.

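        For example, to give each `xp` a distinct yet reproducible seed,
        one may do something like the following before launching
        (a minimal sketch; the offset 3000 is arbitrary):

            for ixp, xp in enumerate(xps):
                xp.seed = 3000 + ixp
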
    Returns
    -------
    tuple (HMM, xx, yy)
        The `HMM` to be used by the DA method,
        and the simulated truth and observations.
    """
    set_seed(getattr(xp, 'seed', False))
    xx, yy = HMM.simulate()
    return HMM, xx, yy


def run_experiment(xp, label, savedir, HMM, setup=seed_and_simulate, free=True,
                   statkeys=False, fail_gently=False, **stat_kwargs):
    """Used by `xpList.launch` to run each single (DA) experiment ("xp").

    This involves steps similar to `examples/basic_1.py`, i.e.:

    - `setup`                    : Initialize experiment.
    - `xp.assimilate`            : run DA, pass on exception if fail_gently
    - `xp.stats.average_in_time` : result averaging
    - `xp.avrgs.tabulate`        : result printing
    - `dill.dump`                : result storage

    Parameters
    ----------
    xp: object
        Type: a `dapper.da_methods.da_method`-decorated class.
    label: str
        Name attached to progressbar during assimilation.
    savedir: str
        Path of folder wherein to store the experiment data.
    HMM: HiddenMarkovModel
        Container defining the system.
    free: bool
        Whether (or not) to `del xp.stats` after the experiment is done,
        so as to free up memory and/or not save this data
        (just keeping `xp.avrgs`).
    statkeys: list
        A list of names (possibly in the form of abbreviations) of the
        statistical averages that should be printed immediately after
        this xp.
    fail_gently: bool
        Whether (or not) to propagate exceptions.
    setup: function
        This function must take two arguments: `HMM` and `xp`, and return the `HMM` to
        be used by the DA methods (typically the same as the input `HMM`, but could be
        modified), and the (typically synthetic) truth and obs time series.

        This gives you the ability to customize almost any aspect of the individual
        experiments within a batch launch of experiments (i.e. not just the parameters
        of the DA method).  Typically you will grab one or more parameter values stored
        in the `xp` (see `dapper.da_methods.da_method`) and act on them, usually by
        assigning them to some object that impacts the experiment.  Thus, by generating
        a new `xp` for each such parameter value, you can investigate the
        impact/sensitivity of the results to this parameter.  Examples include:

        - Setting the seed. See the default `setup`, namely `seed_and_simulate`,
          for how this is, or should be, done.
        - Setting some aspect of the `HMM`, such as the observation noise
            or the interval between observations. This could be achieved for example by:

                def setup(hmm, xp):
                    hmm.Obs.noise = GaussRV(M=hmm.Nx, C=xp.obs_noise)
                    hmm.tseq.dkObs = xp.time_between_obs
                    import dapper as dpr
                    return dpr.seed_and_simulate(hmm, xp)

            This process could involve more steps, for example loading a full covariance
            matrix from a data file, as specified by the `obs_noise` parameter, before
            assigning it to `C`. Also note that the import statement is not strictly
            necessary (assuming `dapper` was already imported in the outer scope,
            typically the main script), **except** when running the experiments on a
            remote server.

            Sometimes, the parameter you want to set is not accessible as one of the
            conventional attributes of the `HMM`. For example, the `Force` in the
            Lorenz-96 model. In that case you can add these lines to the setup function:

                import dapper.mods.Lorenz96 as core
                core.Force = xp.the_force_parameter

            However, if your model is an OOP instance, the import approach will not
            work, because it will serve you the original model instance, while `setup()`
            deals with a copy of it. Instead, you could re-initialize the entire model
            in `setup()` and overwrite `HMM.Dyn`. However, it is probably easier to just
            assign the instance to some custom attribute before launching the
            experiments, e.g. `HMM.Dyn.object = the_model_instance`, enabling you to set
            parameters on `HMM.Dyn.object` in `setup()` (see the sketch at the end of
            this list). Note that this approach won't work for modules (e.g., combining
            the above examples, `HMM.Dyn.object = core`), because modules are not
            serializable.

        - Using a different `HMM` entirely for the truth/obs (`xx`/`yy`) generation
          than the one that will be used by the DA. Or loading the truth/obs
          time series from file. In both cases, you might also have to do some
          cropping or slicing of `xx` and `yy` before returning them.
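
        As a concrete illustration of the `HMM.Dyn.object` approach mentioned
        above (a minimal sketch; `the_model_instance` and its `Force` attribute
        are hypothetical stand-ins for your own model object):

            HMM.Dyn.object = the_model_instance  # do this before launching

            def setup(hmm, xp):
                # `hmm` is a copy, so this only affects the current experiment.
                hmm.Dyn.object.Force = xp.the_force_parameter
                import dapper as dpr
                return dpr.seed_and_simulate(hmm, xp)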
    """
    # Copy HMM to prevent changes made by setup from affecting subsequent experiments.
    # Thus, experiments run in sequence behave the same as experiments run via
    # multiprocessing (which serializes, i.e. copies, the HMM) or on a cluster.
    hmm = copy.deepcopy(HMM)
    # Note that "implicitly referenced" objects do not get copied. For example,
    # if the model `step` function uses parameters defined in its module or object,
    # these will be obtained by re-importing that model, unless it has been serialized.
    # Serialization happens if the model instance (does not work for modules)
    # is explicitly referenced, e.g. if you've done `HMM.Dyn.underlying_model = model`.

    # GENERATE TRUTH/OBS
    hmm, xx, yy = setup(hmm, xp)

    # ASSIMILATE
    xp.assimilate(hmm, xx, yy, label, fail_gently=fail_gently, **stat_kwargs)

    # Clear references to mpl (for pickling purposes)
    if hasattr(xp.stats, "LP_instance"):
        del xp.stats.LP_instance

    # AVERAGE
    xp.stats.average_in_time(free=free)

    # PRINT
    if statkeys:
        statkeys = () if statkeys is True else statkeys
        print(xp.avrgs.tabulate(statkeys))

    # SAVE
    if savedir:
        with open(Path(savedir)/"xp", "wb") as FILE:
            dill.dump({'xp': xp}, FILE)


class xpList(list):
    """Subclass of `list` specialized for experiment ("xp") objects.

    Main use: administrate experiment launches.

    Modifications to `list`:

    - `xpList.append` supports `unique` to enable lazy `xp` declaration.
    - `__iadd__` (`+=`) supports adding single `xp`s.
      This is hacky, but convenience is king.
    - `__getitem__` supports lists, similar to `np.ndarray`.
    - `__repr__`: prints the list as rows of a table,
      where the columns represent attributes whose value is not shared among all `xp`s.
      Refer to `xpList.prep_table` for more information.

    Add-ons:

    - `xpList.launch`: run the experiments in the current list.
    - `xpList.prep_table`: find all attributes of the `xp`s in the list;
      classify as distinct, redundant, or common.
    - `xpList.gen_names`: use `xpList.prep_table` to generate
      a short & unique name for each `xp` in the list.
    - `xpList.tabulate_avrgs`: tabulate time-averaged results.
    - `xpList.inds`: search by kw-attrs.

    Parameters
    ----------
    args: entries
        Nothing, or a list of `xp`s.

    unique: bool
        Duplicates won't get appended. Makes `append` (and `__iadd__`) relatively slow.
        Use `extend` or `__add__` or `combinator` to bypass this validation.

    Also see
    --------
    - Examples: `examples/basic_2`, `examples/basic_3`
    - `dapper.xp_process.xpSpace`, which is used for experiment result **presentation**,
      as opposed to this class (`xpList`), which handles **launching** experiments.
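
    For example, a minimal usage sketch (in the style of `examples/basic_2`;
    the DA-method arguments are illustrative):

        import dapper.da_methods as da

        xps = xpList()
        xps += da.OptInterp()
        xps += da.EnKF("Sqrt", N=10, infl=1.02, rot=True)
        # save_as = xps.launch(HMM)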
    """

    def __init__(self, *args, unique=False):
        self.unique = unique
        super().__init__(*args)

    def __iadd__(self, xp):
        if not hasattr(xp, '__iter__'):
            xp = [xp]
        for item in xp:
            self.append(item)
        return self

    def append(self, xp):
        """Append `xp`, unless `self.unique` and `xp` is already present."""
        if not (self.unique and xp in self):
            super().append(xp)

    def __getitem__(self, keys):
        """Indexing, also by a list."""
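        # For example (illustrative): `xps[[0, 2]]` returns a new `xpList`
        # with elements 0 and 2, akin to fancy indexing of `np.ndarray`.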
        try:
            B = [self[k] for k in keys]    # if keys is list
        except TypeError:
            B = super().__getitem__(keys)  # if keys is int, slice
        if hasattr(B, '__len__'):
            B = xpList(B)                  # Cast
        return B

    def inds(self, strict=True, missingval="NONSENSE", **kws):
        """Find (all) indices of `xp`s whose attributes match `kws`.

        If `strict`, then `xp`s lacking a requested attribute will not match,
        unless `missingval` matches the required value.
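
        For example (illustrative; assumes `xps` contains some `EnKF` experiments):

            indices = xps.inds(da_method="EnKF", N=10)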
        """
        def match(xp):
            def missing(v): return missingval if strict else v
            matches = [getattr(xp, k, missing(v)) == v for k, v in kws.items()]
            return all(matches)

        return [i for i, xp in enumerate(self) if match(xp)]

    @property
    def da_methods(self):
        """List `da_method` attributes in this list."""
        return [xp.da_method for xp in self]

    def prep_table(self, nomerge=()):
        """Classify all attrs. of all `xp`s as `distinct`, `redundant`, or `common`.

        An attribute of the `xp`s is inserted in one of the 3 dicts as follows:
        The attribute names become dict keys. If the values of an attribute
        (collected from all of the `xp`s) are all **equal**, then the attribute
        is inserted in `common`, but only with **a single value**.
        If they are all the same **or missing**, then it is inserted in `redundant`
        **with a single value**. Otherwise, it is inserted in `distinct`,
        with **its full list of values** (filling with `None` where the attribute
        was missing in the corresponding `xp`).

        The attrs in `distinct` are sufficient (but not generally necessary,
        since a subset of them might suffice) to uniquely identify each `xp`
        in the list (the `redundant` and `common` ones can be "squeezed" out).
        Thus, a table of the `xp`s does not need to list all of the attributes.
        This function also does the heavy lifting for `xpSpace.squeeze`.

        Parameters
        ----------
        nomerge: list
            Attributes that should always be seen as distinct.
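
        For example (an illustrative sketch): given two `EnKF` `xp`s that differ
        only in `N` (10 vs. 20),

            distinct, redundant, common = xps.prep_table()

        would place `{'N': [10, 20]}` in `distinct`, while `da_method` and the
        other shared attributes land in `common` with single values.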
        """
        def _aggregate_keys():
            """Aggregate keys from all `xp`s."""
            if len(self) == 0:
                return []

            # Start with da_method
            aggregate = ['da_method']

            # Aggregate all other keys
            for xp in self:

                # Get dataclass fields
                try:
                    dc_fields = dcs.fields(xp.__class__)
                    dc_names = [F.name for F in dc_fields]
                    keys = xp.__dict__.keys()
                except TypeError:
                    # Assume namedtuple
                    dc_names = []
                    keys = xp._fields

                # For all potential keys:
                for k in keys:
                    # If not already present:
                    if k not in aggregate:

                        # If dataclass, check repr:
                        if k in dc_names:
                            if dc_fields[dc_names.index(k)].repr:
                                aggregate.append(k)
                        # Else, just append
                        else:
                            aggregate.append(k)

            # Remove unwanted
            excluded  = [re.compile('^_'), 'avrgs', 'stats', 'HMM', 'duration']
            aggregate = struct_tools.complement(aggregate, excluded)
            return aggregate

        def _getattr_safe(xp, key):
            # Don't use None, to avoid mixing with actual None's
            # TODO 4: use an object yet more likely to be unique.
            missing = "N/A"
            a = getattr(xp, key, missing)

            # Replace ndarray by its id, since otherwise it will
            # complain that you must use all().
            # Alternative: replace all == (and !=) below by "is".
            #     Tabulation with multi-line params actually works
            #     (though it's still likely to take up too much space,
            #     unless we set np.printoptions...).
            #     However, then python (since 3.8) will complain about
            #     comparison to literal.
            if isinstance(a, np.ndarray):
                shorten = 6
                a = f"arr(<id {id(a)//10**shorten}>)"
            # TODO 3: leave formatting to sub() below?
            # TODO 4: do similar formatting for other non-trivial params?
            # TODO 4: document alternative way to specify non-trivial params:
            #         use key to be looked up in some globally accessible dct.
            #         Advantage: names are meaningful, rather than ids.
            return a

        def replace_NA_by_None(vals):
            """Supports different types of `vals`."""
            def sub(v):
                return None if v == "N/A" else v

            if isinstance(vals, str):
                vals = sub(vals)
            else:
                try:
                    vals = [sub(v) for v in vals]
                except TypeError:
                    vals = sub(vals)
            return vals

        # Main
        distinct, redundant, common = {}, {}, {}
        for key in _aggregate_keys():
            vals = [_getattr_safe(xp, key) for xp in self]

            if struct_tools.flexcomp(key, *nomerge):
                dct, vals = distinct, vals

            elif all(vals[0] == v for v in vals):
                dct, vals = common, vals[0]

            else:
                nonNA = next(v for v in vals if "N/A" != v)
                if all(v == "N/A" or v == nonNA for v in vals):
                    dct, vals = redundant, nonNA

                else:
                    dct, vals = distinct, vals

            dct[key] = replace_NA_by_None(vals)

        return distinct, redundant, common

    def __repr__(self):
        distinct, redundant, common = self.prep_table()
        s = '<xpList> of length %d with attributes:\n' % len(self)
        s += tabulate(distinct, headers="keys", showindex=True)
        s += "\nOther attributes:\n"
        s += str(struct_tools.AlignedDict({**redundant, **common}))
        return s

    def gen_names(self, abbrev=6, tab=False):
        """Similar to `self.__repr__()`, but:

        - returns a *list* of names
        - tabulation is optional
        - attaches (abbreviated) labels to each attribute
        """
        distinct, redundant, common = self.prep_table(nomerge=["da_method"])
        labels = distinct.keys()
        values = distinct.values()

        # Label abbreviation
        labels = [collapse_str(k, abbrev) for k in labels]

        # Make label columns: insert None or lbl+":", depending on value
        def column(lbl, vals):
            return [None if v is None else lbl+":" for v in vals]
        labels = [column(lbl, vals) for lbl, vals in zip(labels, values)]

        # Interlace labels and values
        table = [x for (a, b) in zip(labels, values) for x in (a, b)]

        # Rm da_method label (but keep value)
        table.pop(0)

        # Transpose
        table = list(map(list, zip(*table)))

        # Tabulate
        table = tabulate(table, tablefmt="plain")

        # Rm space between lbls/vals
        table = re.sub(':  +', ':', table)

        # Rm alignment
        if not tab:
            table = re.sub(r' +', r' ', table)

        return table.splitlines()

    @wraps(dapper.stats.tabulate_avrgs)
    def tabulate_avrgs(self, *args, colorize=True, **kwargs):
        distinct, redundant, common = self.prep_table()
        averages = dapper.stats.tabulate_avrgs([C.avrgs for C in self], *args, **kwargs)
        columns = {**distinct, '|': ['|']*len(self), **averages}  # merge
        table = tabulate(columns, headers="keys", showindex=True).replace('␣', ' ')
        if colorize:
            table = stripe(table)
        return table

    def launch(self, HMM, save_as="noname", mp=False, fail_gently=None, **kwargs):
        """Essentially: `for xp in self: run_experiment(xp, ..., **kwargs)`.

        See `run_experiment` for documentation on the `kwargs` and `fail_gently`.
        See `dapper.tools.datafiles.create_run_dir` for documentation on `save_as`.

        Depending on `mp`, `run_experiment` is delegated as follows:

        - `False`: caller process (no parallelisation)
        - `True` or `"MP"` or an `int`: multiprocessing on this host
        - `"GCP"` or `"Google"` or `dict(server="GCP")`: the DAPPER server
          (Google Cloud Computing with HTCondor).
            - Specify a list of files as `mp["files"]` to include them
              in the working directory of the server workers.
            - In order to use absolute paths, the list should consist
              of tuples, where the first item is relative to the second
              (which is an absolute path). The root is then not included
              in the working directory of the server.
            - If this dict field is empty, then all python files
              in `sys.path[0]` are uploaded.

        See `examples/basic_2.py` and `examples/basic_3.py` for example use.
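
        For example (illustrative values):

            save_as = xps.launch(HMM, "my_run")        # run sequentially
            save_as = xps.launch(HMM, "my_run", mp=4)  # 4 local processes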
        """
        # Parse mp option
        if not mp:
            mp = dict()
        elif mp in [True, "MP"]:
            mp = dict(server="local")
        elif isinstance(mp, int):
            mp = dict(server="local", NPROC=mp)
        elif mp in ["GCP", "Google"]:
            mp = dict(server="GCP", files=[], code="")

        # Parse fail_gently
        if fail_gently is None:
            if mp and mp["server"] == "GCP":
                fail_gently = False
                # because cloud processing is entirely de-coupled anyway
            else:
                fail_gently = True
                # True unless otherwise requested
        kwargs["fail_gently"] = fail_gently

        # Bundle HMM with kwargs
        kwargs['HMM'] = HMM

        # Data path
        save_as, xpi_dir = create_run_dir(save_as, mp)

        # No parallelization
        if not mp:
            for ixp, (xp, label) in enumerate(zip(self, self.gen_names())):
                run_experiment(xp, label, xpi_dir(ixp), **kwargs)

        # Local multiprocessing
        elif mp["server"].lower() == "local":
            def run_with_kwargs(arg):
                xp, ixp = arg
                run_experiment(xp, None, xpi_dir(ixp), **kwargs)
            args = zip(self, range(len(self)))

            pb.disable_progbar          = True
            pb.disable_user_interaction = True
            NPROC = mp.get("NPROC", None)  # None => mp.cpu_count()
            from dapper.tools.multiproc import mpd  # will fail on GCP
            with mpd.Pool(NPROC) as pool:
                list(tqdm(
                    pool.imap(
                        run_with_kwargs, args),
                    total=len(self),
                    desc="Parallel experim's",
                    smoothing=0.1))
            pb.disable_progbar          = False
            pb.disable_user_interaction = False

        # Google cloud platform, multiprocessing
        elif mp["server"] == "GCP":
            for ixp, xp in enumerate(self):
                with open(xpi_dir(ixp)/"xp.var", "wb") as f:
                    dill.dump(dict(xp=xp), f)

            with open(save_as/"xp.com", "wb") as f:
                dill.dump(kwargs, f)

            # mkdir extra_files
            extra_files = save_as / "extra_files"
            os.mkdir(extra_files)
            # Default extra_files: .py files in sys.path[0] (main script's path)
            if not mp.get("files", []):
                mp["files"] = [f.relative_to(sys.path[0]) for f in
                               Path(sys.path[0]).glob("**/*.py")]
                assert len(mp["files"]) < 1000, (
                    "Too many files staged for upload to server."
                    " This is the result of trying to include all files"
                    f" under sys.path[0]: ({sys.path[0]})."
                    " Consider moving your script to a project directory,"
                    " or explicitly listing the files to be uploaded."
                )

            # Copy into extra_files
            for f in mp["files"]:
                if isinstance(f, (str, Path)):
                    # Example: f = "A.py"
                    path = Path(sys.path[0]) / f
                    dst = f
                else:  # instance of tuple(path, root)
                    # Example: f = ("~/E/G/A.py", "G")
                    path, root = f
                    dst = Path(path).relative_to(root)
                dst = extra_files / dst
                os.makedirs(dst.parent, exist_ok=True)
                try:
                    shutil.copytree(path, dst)  # dir -r
                except OSError:
                    shutil.copy2(path, dst)  # file

            with open(extra_files/"dpr_config.yaml", "w") as f:
                f.write("\n".join([
                    "data_root: '$cwd'",
                    "liveplotting: no",
                    "welcome_message: no"]))

            # Loads PWD/xp_{var,com} and calls run_experiment()
            with open(extra_files/"load_and_run.py", "w") as f:
                f.write(dedent("""\
                import dill
                from dapper.xp_launch import run_experiment

                # Load
                with open("xp.com", "rb") as f: com = dill.load(f)
                with open("xp.var", "rb") as f: var = dill.load(f)

                # User-defined code
                %s

                # Run
                try:
                    result = run_experiment(var['xp'], None, ".", **com)
                except SystemError as err:
                    if err.args and "opcode" in err.args[0]:
                        err.args += ("It seems your local python version"
                                     " is incompatible with that of the cluster.",)
                    raise
                """) % dedent(mp["code"]))

            submit_job_GCP(save_as)

        return save_as


def combinator(param_dict, **glob_dict):
    """Mass creation of `xp`s by combining the value lists in `param_dict`.

    Returns a function (`for_params`) that creates all possible combinations
    of parameters (from their value lists) for a given `dapper.da_methods.da_method`.
    This is a good deal more efficient than relying on `xpList`'s `unique`. Parameters

    - not found among the args of the given DA method are ignored by `for_params`.
    - specified as keywords to `for_params` get fixed to that value,
      preventing the use of the corresponding (if any) value list in `param_dict`.

    .. warning::
        Beware! If, e.g., `infl` or `rot` are in `param_dict`, aimed at the `EnKF`,
        but you forget that they are also attributes of some method where you don't
        actually want to use them (e.g. `SVGDF`),
        then you'll create many more `xp`s than you intend.
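
    For example, a minimal sketch (the `da` alias and parameter values are
    illustrative):

        import dapper.da_methods as da

        param_dict = dict(N=[10, 20], infl=[1.01, 1.02])
        for_params = combinator(param_dict, seed=3000 + np.arange(4))
        xps = xpList(for_params(da.EnKF, upd_a="Sqrt"))  # 2*2*4 = 16 xps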
    """
    def for_params(method, **fixed_params):
        dc_fields = [f.name for f in dcs.fields(method)]
        params = struct_tools.intersect(param_dict, dc_fields)
        params = struct_tools.complement(params, fixed_params)
        params = {**glob_dict, **params}  # glob_dict 1st

        def xp1(dct):
            xp = method(**struct_tools.intersect(dct, dc_fields), **fixed_params)
            for key, v in struct_tools.intersect(dct, glob_dict).items():
                setattr(xp, key, v)
            return xp

        return [xp1(dct) for dct in struct_tools.prodct(params)]
    return for_params
def seed_and_simulate(HMM, xp):
35def seed_and_simulate(HMM, xp):
36    """Default experiment setup (sets seed and simulates truth and obs).
37
38    Used by `xpList.launch` via `run_experiment`.
39
40    Parameters
41    ----------
42    HMM: HiddenMarkovModel
43        Container defining the system.
44    xp: object
45        Type: a `dapper.da_methods.da_method`-decorated class.
46
47        .. warning:: `xp.seed` should be set (and `int`).
48
49            Without `xp.seed` the seed does not get set,
50            and different `xp`s will use different seeds
51            (unless you do some funky hacking).
52            Reproducibility for a script as a whole can still be achieved
53            by setting the seed at the outset of the script.
54            To avoid even that, set `xp.seed` to `None` or `"clock"`.
55
56    Returns
57    -------
58    tuple (xx, yy)
59        The simulated truth and observations.
60    """
61    set_seed(getattr(xp, 'seed', False))
62    xx, yy = HMM.simulate()
63    return HMM, xx, yy

Default experiment setup (sets seed and simulates truth and obs).

Used by xpList.launch via run_experiment.

Parameters
  • HMM (HiddenMarkovModel): Container defining the system.
  • xp (object): Type: a dapper.da_methods.da_method-decorated class.

    xp.seed should be set (and int).

    Without xp.seed the seed does not get set, and different xps will use different seeds (unless you do some funky hacking). Reproducibility for a script as a whole can still be achieved by setting the seed at the outset of the script. To avoid even that, set xp.seed to None or "clock".

Returns
  • tuple (xx, yy): The simulated truth and observations.
def run_experiment( xp, label, savedir, HMM, setup=<function seed_and_simulate>, free=True, statkeys=False, fail_gently=False, **stat_kwargs):
 66def run_experiment(xp, label, savedir, HMM, setup=seed_and_simulate, free=True,
 67                   statkeys=False, fail_gently=False, **stat_kwargs):
 68    """Used by `xpList.launch` to run each single (DA) experiment ("xp").
 69
 70    This involves steps similar to `examples/basic_1.py`, i.e.:
 71
 72    - `setup`                    : Initialize experiment.
 73    - `xp.assimilate`            : run DA, pass on exception if fail_gently
 74    - `xp.stats.average_in_time` : result averaging
 75    - `xp.avrgs.tabulate`        : result printing
 76    - `dill.dump`                : result storage
 77
 78    Parameters
 79    ----------
 80    xp: object
 81        Type: a `dapper.da_methods.da_method`-decorated class.
 82    label: str
 83        Name attached to progressbar during assimilation.
 84    savedir: str
 85        Path of folder wherein to store the experiment data.
 86    HMM: HiddenMarkovModel
 87        Container defining the system.
 88    free: bool
 89        Whether (or not) to `del xp.stats` after the experiment is done,
 90        so as to free up memory and/or not save this data
 91        (just keeping `xp.avrgs`).
 92    statkeys: list
 93        A list of names (possibly in the form of abbreviations) of the
 94        statistical averages that should be printed immediately afther
 95        this xp.
 96    fail_gently: bool
 97        Whether (or not) to propagate exceptions.
 98    setup: function
 99        This function must take two arguments: `HMM` and `xp`, and return the `HMM` to
100        be used by the DA methods (typically the same as the input `HMM`, but could be
101        modified), and the (typically synthetic) truth and obs time series.
102
103        This gives you the ability to customize almost any aspect of the individual
104        experiments within a batch launch of experiments (i.e. not just the parameters
105        of the DA. method).  Typically you will grab one or more parameter values stored
106        in the `xp` (see `dapper.da_methods.da_method`) and act on them, usually by
107        assigning them to some object that impacts the experiment.  Thus, by generating
108        a new `xp` for each such parameter value you can investigate the
109        impact/sensitivity of the results to this parameter.  Examples include:
110
111        - Setting the seed. See the default `setup`, namely `seed_and_simulate`,
112          for how this is, or should be, done.
113        - Setting some aspect of the `HMM` such as the observation noise,
114            or the interval between observations. This could be achieved for example by:
115
116                def setup(hmm, xp):
117                    hmm.Obs.noise = GaussRV(M=hmm.Nx, C=xp.obs_noise)
118                    hmm.tseq.dkObs = xp.time_between_obs
119                    import dapper as dpr
120                    return dpr.seed_and_simulate(hmm, xp)
121
122            This process could involve more steps, for example loading a full covariance
123            matrix from a data file, as specified by the `obs_noise` parameter, before
124            assigning it to `C`. Also note that the import statement is not strictly
125            necessary (assuming `dapper` was already imported in the outer scope,
126            typically the main script), **except** when running the experiments on a
127            remote server.
128
129            Sometimes, the parameter you want to set is not accessible as one of the
130            conventional attributes of the `HMM`. For example, the `Force` in the
131            Lorenz-96 model. In that case you can add these lines to the setup function:
132
133                import dapper.mods.Lorenz96 as core
134                core.Force = xp.the_force_parameter
135
136            However, if your model is an OOP instance, the import approach will not work
137            because it will serve you the original model instance, while `setup()` deals
138            with a copy of it. Instead, you could re-initialize the entire model in
139            `setup()` and overwrite `HMM.Dyn`. However, it is probably easier to just
140            assign the instance to some custom attribute before launching the
141            experiments, e.g. `HMM.Dyn.object = the_model_instance`, enabling you to set
142            parameters on `HMM.Dyn.object` in `setup()`. Note that this approach won't
143            work for modules (for ex., combining the above examples, `HMM.Dyn.object =
144            core`) because modules are not serializable.
145
146        - Using a different `HMM` entirely for the truth/obs (`xx`/`yy`) generation,
147          than the one that will be used by the DA. Or loading the truth/obs
148          time series from file. In both cases, you might also have to do some
149          cropping or slicing of `xx` and `yy` before returning them.
150    """
151    # Copy HMM to avoid changes made by setup affect subsequent experiments.
152    # Thus, experiments run in sequence behave the same as experiments run via
153    # multiprocessing (which serialize (i.e. copy) the HMM) or on a cluster.
154    hmm = copy.deepcopy(HMM)
155    # Note that "implicitly referenced" objects do not get copied. For example,
156    # if the model `step` function uses parameters defined in its module or object,
157    # these will be obtained by re-importing that model, unless it has been serialized.
158    # Serialization happens if the model instance (does not work for modules)
159    # is expliclity referenced, e.g. if you've done `HMM.Dyn.underlying_model = model`.
160
161    # GENERATE TRUTH/OBS
162    hmm, xx, yy = setup(hmm, xp)
163
164    # ASSIMILATE
165    xp.assimilate(hmm, xx, yy, label, fail_gently=fail_gently, **stat_kwargs)
166
167    # Clear references to mpl (for pickling purposes)
168    if hasattr(xp.stats, "LP_instance"):
169        del xp.stats.LP_instance
170
171    # AVERAGE
172    xp.stats.average_in_time(free=free)
173
174    # PRINT
175    if statkeys:
176        statkeys = () if statkeys is True else statkeys
177        print(xp.avrgs.tabulate(statkeys))
178
179    # SAVE
180    if savedir:
181        with open(Path(savedir)/"xp", "wb") as FILE:
182            dill.dump({'xp': xp}, FILE)

Used by xpList.launch to run each single (DA) experiment ("xp").

This involves steps similar to examples/basic_1.py, i.e.:

  • setup : Initialize experiment.
  • xp.assimilate : run DA, pass on exception if fail_gently
  • xp.stats.average_in_time : result averaging
  • xp.avrgs.tabulate : result printing
  • dill.dump : result storage
Parameters
  • xp (object): Type: a dapper.da_methods.da_method-decorated class.
  • label (str): Name attached to progressbar during assimilation.
  • savedir (str): Path of folder wherein to store the experiment data.
  • HMM (HiddenMarkovModel): Container defining the system.
  • free (bool): Whether (or not) to del xp.stats after the experiment is done, so as to free up memory and/or not save this data (just keeping xp.avrgs).
  • statkeys (list): A list of names (possibly in the form of abbreviations) of the statistical averages that should be printed immediately afther this xp.
  • fail_gently (bool): Whether (or not) to propagate exceptions.
  • setup (function): This function must take two arguments: HMM and xp, and return the HMM to be used by the DA methods (typically the same as the input HMM, but could be modified), and the (typically synthetic) truth and obs time series.

    This gives you the ability to customize almost any aspect of the individual experiments within a batch launch of experiments (i.e. not just the parameters of the DA. method). Typically you will grab one or more parameter values stored in the xp (see dapper.da_methods.da_method) and act on them, usually by assigning them to some object that impacts the experiment. Thus, by generating a new xp for each such parameter value you can investigate the impact/sensitivity of the results to this parameter. Examples include:

    • Setting the seed. See the default setup, namely seed_and_simulate, for how this is, or should be, done.
    • Setting some aspect of the HMM such as the observation noise, or the interval between observations. This could be achieved for example by:

      def setup(hmm, xp): hmm.Obs.noise = GaussRV(M=hmm.Nx, C=xp.obs_noise) hmm.tseq.dkObs = xp.time_between_obs import dapper as dpr return dpr.seed_and_simulate(hmm, xp)

      This process could involve more steps, for example loading a full covariance matrix from a data file, as specified by the obs_noise parameter, before assigning it to C. Also note that the import statement is not strictly necessary (assuming dapper was already imported in the outer scope, typically the main script), except when running the experiments on a remote server.

      Sometimes, the parameter you want to set is not accessible as one of the conventional attributes of the HMM. For example, the Force in the Lorenz-96 model. In that case you can add these lines to the setup function:

      import dapper.mods.Lorenz96 as core core.Force = xp.the_force_parameter

      However, if your model is an OOP instance, the import approach will not work because it will serve you the original model instance, while setup() deals with a copy of it. Instead, you could re-initialize the entire model in setup() and overwrite HMM.Dyn. However, it is probably easier to just assign the instance to some custom attribute before launching the experiments, e.g. HMM.Dyn.object = the_model_instance, enabling you to set parameters on HMM.Dyn.object in setup(). Note that this approach won't work for modules (for ex., combining the above examples, HMM.Dyn.object = core) because modules are not serializable.

    • Using a different HMM entirely for the truth/obs (xx/yy) generation, than the one that will be used by the DA. Or loading the truth/obs time series from file. In both cases, you might also have to do some cropping or slicing of xx and yy before returning them.

class xpList(builtins.list):
185class xpList(list):
186    """Subclass of `list` specialized for experiment ("xp") objects.
187
188    Main use: administrate experiment launches.
189
190    Modifications to `list`:
191
192    - `xpList.append` supports `unique` to enable lazy `xp` declaration.
193    - `__iadd__` (`+=`) supports adding single `xp`s.
194      this is hackey, but convenience is king.
195    - `__getitem__` supports lists, similar to `np.ndarray`
196    - `__repr__`: prints the list as rows of a table,
197      where the columns represent attributes whose value is not shared among all `xp`s.
198      Refer to `xpList.prep_table` for more information.
199
200    Add-ons:
201
202    - `xpList.launch`: run the experiments in current list.
203    - `xpList.prep_table`: find all attributes of the `xp`s in the list;
204      classify as distinct, redundant, or common.
205    - `xpList.gen_names`: use `xpList.prep_table` to generate
206      a short & unique name for each `xp` in the list.
207    - `xpList.tabulate_avrgs`: tabulate time-averaged results.
208    - `xpList.inds` to search by kw-attrs.
209
210    Parameters
211    ----------
212    args: entries
213        Nothing, or a list of `xp`s.
214
215    unique: bool
216        Duplicates won't get appended. Makes `append` (and `__iadd__`) relatively slow.
217        Use `extend` or `__add__` or `combinator` to bypass this validation.
218
219    Also see
220    --------
221    - Examples: `examples/basic_2`, `examples/basic_3`
222    - `dapper.xp_process.xpSpace`, which is used for experient result **presentation**,
223      as opposed to this class (`xpList`), which handles **launching** experiments.
224    """
225
226    def __init__(self, *args, unique=False):
227        self.unique = unique
228        super().__init__(*args)
229
230    def __iadd__(self, xp):
231        if not hasattr(xp, '__iter__'):
232            xp = [xp]
233        for item in xp:
234            self.append(item)
235        return self
236
237    def append(self, xp):
238        """Append **if** not `self.unique` & present."""
239        if not (self.unique and xp in self):
240            super().append(xp)
241
242    def __getitem__(self, keys):
243        """Indexing, also by a list"""
244        try:
245            B = [self[k] for k in keys]    # if keys is list
246        except TypeError:
247            B = super().__getitem__(keys)  # if keys is int, slice
248        if hasattr(B, '__len__'):
249            B = xpList(B)                  # Cast
250        return B
251
252    def inds(self, strict=True, missingval="NONSENSE", **kws):
253        """Find (all) indices of `xps` whose attributes match kws.
254
255        If strict, then `xp`s lacking a requested attr. will not match,
256        unless the `missingval` matches the required value.
257        """
258        def match(xp):
259            def missing(v): return missingval if strict else v
260            matches = [getattr(xp, k, missing(v)) == v for k, v in kws.items()]
261            return all(matches)
262
263        return [i for i, xp in enumerate(self) if match(xp)]
264
265    @property
266    def da_methods(self):
267        """List `da_method` attributes in this list."""
268        return [xp.da_method for xp in self]
269
270    def prep_table(self, nomerge=()):
271        """Classify all attrs. of all `xp`s as `distinct`, `redundant`, or `common`.
272
273        An attribute of the `xp`s is inserted in one of the 3 dicts as follows:
274        The attribute names become dict keys. If the values of an attribute
275        (collected from all of the `xp`s) are all __equal__, then the attribute
276        is inserted in `common`, but only with **a single value**.
277        If they are all the same **or missing**, then it is inserted in `redundant`
278        **with a single value**. Otherwise, it is inserted in `distinct`,
279        with **its full list of values** (filling with `None` where the attribute
280        was missing in the corresponding `xp`).
281
282        The attrs in `distinct` are sufficient to (but not generally necessary,
283        since there might exist a subset of attributes that) uniquely identify each `xp`
284        in the list (the `redundant` and `common` can be "squeezed" out).
285        Thus, a table of the `xp`s does not need to list all of the attributes.
286        This function also does the heavy lifting for `xpSpace.squeeze`.
287
288        Parameters
289        ----------
290        nomerge: list
291            Attributes that should always be seen as distinct.
292        """
293        def _aggregate_keys():
294            """Aggregate keys from all `xp`"""
295            if len(self) == 0:
296                return []
297
298            # Start with da_method
299            aggregate = ['da_method']
300
301            # Aggregate all other keys
302            for xp in self:
303
304                # Get dataclass fields
305                try:
306                    dc_fields = dcs.fields(xp.__class__)
307                    dc_names = [F.name for F in dc_fields]
308                    keys = xp.__dict__.keys()
309                except TypeError:
310                    # Assume namedtuple
311                    dc_names = []
312                    keys = xp._fields
313
314                # For all potential keys:
315                for k in keys:
316                    # If not already present:
317                    if k not in aggregate:
318
319                        # If dataclass, check repr:
320                        if k in dc_names:
321                            if dc_fields[dc_names.index(k)].repr:
322                                aggregate.append(k)
323                        # Else, just append
324                        else:
325                            aggregate.append(k)
326
327            # Remove unwanted
328            excluded  = [re.compile('^_'), 'avrgs', 'stats', 'HMM', 'duration']
329            aggregate = struct_tools.complement(aggregate, excluded)
330            return aggregate
331
332        def _getattr_safe(xp, key):
333            # Don't use None, to avoid mixing with actual None's
334            # TODO 4: use an object yet more likely to be unique.
335            missing = "N/A"
336            a = getattr(xp, key, missing)
337
338            # Replace ndarray by its id, since o/w it will
339            # complain that you must use all().
340            # Alternative: replace all == (and !=) below by "is".
341            #     Tabulation with multi-line params actually works,
342            #     (though it's still likely to take up too much space,
343            #     unless we set np.printoptions...).
344            #     However, then python (since 3.8) will complain about
345            #     comparison to literal.
346            if isinstance(a, np.ndarray):
347                shorten = 6
348                a = f"arr(<id {id(a)//10**shorten}>)"
349            # TODO 3: leave formatting to sub() below?
350            # TODO 4: do similar formatting for other non-trivial params?
351            # TODO 4: document alternative way to specify non-trivial params:
352            #         use key to be looked up in some globally accessible dct.
353            #         Advantage: names are meaningful, rather than ids.
354            return a
355
356        def replace_NA_by_None(vals):
357            """Supports different types of `vals`."""
358            def sub(v):
359                return None if v == "N/A" else v
360
361            if isinstance(vals, str):
362                vals = sub(vals)
363            else:
364                try:
365                    vals = [sub(v) for v in vals]
366                except TypeError:
367                    vals = sub(vals)
368            return vals
369
370        # Main
371        distinct, redundant, common = {}, {}, {}
372        for key in _aggregate_keys():
373            vals = [_getattr_safe(xp, key) for xp in self]
374
375            if struct_tools.flexcomp(key, *nomerge):
376                dct, vals = distinct, vals
377
378            elif all(vals[0] == v for v in vals):
379                dct, vals = common, vals[0]
380
381            else:
382                nonNA = next(v for v in vals if "N/A" != v)
383                if all(v == "N/A" or v == nonNA for v in vals):
384                    dct, vals = redundant, nonNA
385
386                else:
387                    dct, vals = distinct, vals
388
389            dct[key] = replace_NA_by_None(vals)
390
391        return distinct, redundant, common
392
393    def __repr__(self):
394        distinct, redundant, common = self.prep_table()
395        s = '<xpList> of length %d with attributes:\n' % len(self)
396        s += tabulate(distinct, headers="keys", showindex=True)
397        s += "\nOther attributes:\n"
398        s += str(struct_tools.AlignedDict({**redundant, **common}))
399        return s
400
401    def gen_names(self, abbrev=6, tab=False):
402        """Similiar to `self.__repr__()`, but:
403
404        - returns *list* of names
405        - tabulation is optional
406        - attaches (abbreviated) labels to each attribute
407        """
408        distinct, redundant, common = self.prep_table(nomerge=["da_method"])
409        labels = distinct.keys()
410        values = distinct.values()
411
412        # Label abbreviation
413        labels = [collapse_str(k, abbrev) for k in labels]
414
415        # Make label columns: insert None or lbl+":", depending on value
416        def column(lbl, vals):
417            return [None if v is None else lbl+":" for v in vals]
418        labels = [column(lbl, vals) for lbl, vals in zip(labels, values)]
419
420        # Interlace labels and values
421        table = [x for (a, b) in zip(labels, values) for x in (a, b)]
422
423        # Rm da_method label (but keep value)
424        table.pop(0)
425
426        # Transpose
427        table = list(map(list, zip(*table)))
428
429        # Tabulate
430        table = tabulate(table, tablefmt="plain")
431
432        # Rm space between lbls/vals
433        table = re.sub(':  +', ':', table)
434
435        # Rm alignment
436        if not tab:
437            table = re.sub(r' +', r' ', table)
438
439        return table.splitlines()
440
441    @wraps(dapper.stats.tabulate_avrgs)
442    def tabulate_avrgs(self, *args, colorize=True, **kwargs):
443        distinct, redundant, common = self.prep_table()
444        averages = dapper.stats.tabulate_avrgs([C.avrgs for C in self], *args, **kwargs)
445        columns = {**distinct, '|': ['|']*len(self), **averages}  # merge
446        table = tabulate(columns, headers="keys", showindex=True).replace('␣', ' ')
447        if colorize:
448            table = stripe(table)
449        return table
450
451    def launch(self, HMM, save_as="noname", mp=False, fail_gently=None, **kwargs):
452        """Essentially: `for xp in self: run_experiment(xp, ..., **kwargs)`.
453
454        See `run_experiment` for documentation on the `kwargs` and `fail_gently`.
455        See `dapper.tools.datafiles.create_run_dir` for documentation `save_as`.
456
457        Depending on `mp`, `run_experiment` is delegated as follows:
458
459        - `False`: caller process (no parallelisation)
460        - `True` or `"MP"` or an `int`: multiprocessing on this host
461        - `"GCP"` or `"Google"` or `dict(server="GCP")`: the DAPPER server
462          (Google Cloud Computing with HTCondor).
463            - Specify a list of files as `mp["files"]` to include them
464              in working directory of the server workers.
465            - In order to use absolute paths, the list should cosist
466              of tuples, where the first item is relative to the second
467              (which is an absolute path). The root is then not included
468              in the working directory of the server.
469            - If this dict field is empty, then all python files
470              in `sys.path[0]` are uploaded.
471
472        See `examples/basic_2.py` and `examples/basic_3.py` for example use.
473        """
474        # Parse mp option
475        if not mp:
476            mp = dict()
477        elif mp in [True, "MP"]:
478            mp = dict(server="local")
479        elif isinstance(mp, int):
480            mp = dict(server="local", NPROC=mp)
481        elif mp in ["GCP", "Google"]:
482            mp = dict(server="GCP", files=[], code="")
483
484        # Parse fail_gently
485        if fail_gently is None:
486            if mp and mp["server"] == "GCP":
487                fail_gently = False
488                # coz cloud processing is entirely de-coupled anyways
489            else:
490                fail_gently = True
491                # True unless otherwise requested
492        kwargs["fail_gently"] = fail_gently
493
494        # Bundle HMM with kwargs
495        kwargs['HMM'] = HMM
496
497        # Data path
498        save_as, xpi_dir = create_run_dir(save_as, mp)
499
500        # No parallelization
501        if not mp:
502            for ixp, (xp, label) in enumerate(zip(self, self.gen_names())):
503                run_experiment(xp, label, xpi_dir(ixp), **kwargs)
504
505        # Local multiprocessing
506        elif mp["server"].lower() == "local":
507            def run_with_kwargs(arg):
508                xp, ixp = arg
509                run_experiment(xp, None, xpi_dir(ixp), **kwargs)
510            args = zip(self, range(len(self)))
511
512            pb.disable_progbar          = True
513            pb.disable_user_interaction = True
514            NPROC = mp.get("NPROC", None)  # None => mp.cpu_count()
515            from dapper.tools.multiproc import mpd  # will fail on GCP
516            with mpd.Pool(NPROC) as pool:
517                list(tqdm(
518                    pool.imap(
519                        run_with_kwargs, args),
520                    total=len(self),
521                    desc="Parallel experim's",
522                    smoothing=0.1))
523            pb.disable_progbar          = False
524            pb.disable_user_interaction = False
525
526        # Google cloud platform, multiprocessing
527        elif mp["server"] == "GCP":
528            for ixp, xp in enumerate(self):
529                with open(xpi_dir(ixp)/"xp.var", "wb") as f:
530                    dill.dump(dict(xp=xp), f)
531
532            with open(save_as/"xp.com", "wb") as f:
533                dill.dump(kwargs, f)
534
535            # mkdir extra_files
536            extra_files = save_as / "extra_files"
537            os.mkdir(extra_files)
538            # Default extra_files: .py files in sys.path[0] (main script's path)
539            if not mp.get("files", []):
540                mp["files"] = [f.relative_to(sys.path[0]) for f in
541                               Path(sys.path[0]).glob("**/*.py")]
542                assert len(mp["files"]) < 1000, (
543                    "Too many files staged for upload to server."
544                    " This is the result of trying to include all files"
545                    f" under sys.path[0]: ({sys.path[0]})."
546                    " Consider moving your script to a project directory,"
547                    " or expliclity listing the files to be uploaded."
548                )
549
550            # Copy into extra_files
551            for f in mp["files"]:
552                if isinstance(f, (str, Path)):
553                    # Example: f = "A.py"
554                    path = Path(sys.path[0]) / f
555                    dst = f
556                else:  # instance of tuple(path, root)
557                    # Example: f = ("~/E/G/A.py", "G")
558                    path, root = f
559                    dst = Path(path).relative_to(root)
560                dst = extra_files / dst
561                os.makedirs(dst.parent, exist_ok=True)
562                try:
563                    shutil.copytree(path, dst)  # dir -r
564                except OSError:
565                    shutil.copy2(path, dst)  # file
566
567            with open(extra_files/"dpr_config.yaml", "w") as f:
568                f.write("\n".join([
569                    "data_root: '$cwd'",
570                    "liveplotting: no",
571                    "welcome_message: no"]))
572
573            # Loads PWD/xp_{var,com} and calls run_experiment()
574            with open(extra_files/"load_and_run.py", "w") as f:
575                f.write(dedent("""\
576                import dill
577                from dapper.xp_launch import run_experiment
578
579                # Load
580                with open("xp.com", "rb") as f: com = dill.load(f)
581                with open("xp.var", "rb") as f: var = dill.load(f)
582
583                # User-defined code
584                %s
585
586                # Run
587                try:
588                    result = run_experiment(var['xp'], None, ".", **com)
589                except SystemError as err:
590                    if err.args and "opcode" in err.args[0]:
591                        err.args += ("It seems your local python version"
592                                     " is incompatible with that of the cluster.",)
593                    raise
594                """) % dedent(mp["code"]))
595
596            submit_job_GCP(save_as)
597
598        return save_as

Subclass of list specialized for experiment ("xp") objects.

Main use: administrate experiment launches.

Modifications to list:

  • xpList.append supports unique to enable lazy xp declaration.
  • __iadd__ (+=) supports adding single xps. this is hackey, but convenience is king.
  • __getitem__ supports lists, similar to np.ndarray
  • __repr__: prints the list as rows of a table, where the columns represent attributes whose value is not shared among all xps. Refer to xpList.prep_table for more information.

Add-ons:

Parameters
  • args (entries): Nothing, or a list of xps.
  • unique (bool): Duplicates won't get appended. Makes append (and __iadd__) relatively slow. Use extend or __add__ or combinator to bypass this validation.
Also see
  • Examples: examples/basic_2, examples/basic_3
  • dapper.xp_process.xpSpace, which is used for experiment result presentation, as opposed to this class (xpList), which handles launching experiments.
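
For concreteness, here is a minimal usage sketch (hypothetical values; assumes a standard DAPPER install, with da methods imported as in the examples):

    import dapper.da_methods as da
    from dapper.xp_launch import xpList

    xps = xpList(unique=True)
    for N in [10, 20, 20]:           # note the duplicate N=20
        xps += da.EnKF("Sqrt", N)    # __iadd__ accepts a single xp
    len(xps)                         # ==> 2, since append skips duplicates
    print(xps)                       # table of the attrs not shared by all xps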
xpList(*args, unique=False)
226    def __init__(self, *args, unique=False):
227        self.unique = unique
228        super().__init__(*args)
unique
def append(self, xp):
237    def append(self, xp):
238        """Append, unless `self.unique` and `xp` is already present."""
239        if not (self.unique and xp in self):
240            super().append(xp)

Append, unless self.unique and xp is already present.

def inds(self, strict=True, missingval='NONSENSE', **kws):
252    def inds(self, strict=True, missingval="NONSENSE", **kws):
253        """Find (all) indices of `xps` whose attributes match kws.
254
255        If strict, then `xp`s lacking a requested attr. will not match,
256        unless the `missingval` matches the required value.
257        """
258        def match(xp):
259            def missing(v): return missingval if strict else v
260            matches = [getattr(xp, k, missing(v)) == v for k, v in kws.items()]
261            return all(matches)
262
263        return [i for i, xp in enumerate(self) if match(xp)]

Find (all) indices of xps whose attributes match kws.

If strict, then xps lacking a requested attr. will not match, unless the missingval matches the required value.
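
For example (continuing the hedged sketch above; the attribute values are hypothetical):

    ii = xps.inds(da_method="EnKF", N=20)   # ==> indices of the matching xps
    # With strict=False, an xp lacking the attribute N would also match,
    # since a missing attribute then falls back to the requested value.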

da_methods
265    @property
266    def da_methods(self):
267        """List `da_method` attributes in this list."""
268        return [xp.da_method for xp in self]

List da_method attributes in this list.

def prep_table(self, nomerge=()):
270    def prep_table(self, nomerge=()):
271        """Classify all attrs. of all `xp`s as `distinct`, `redundant`, or `common`.
272
273        An attribute of the `xp`s is inserted in one of the 3 dicts as follows:
274        The attribute names become dict keys. If the values of an attribute
275        (collected from all of the `xp`s) are all __equal__, then the attribute
276        is inserted in `common`, but only with **a single value**.
277        If they are all the same **or missing**, then it is inserted in `redundant`
278        **with a single value**. Otherwise, it is inserted in `distinct`,
279        with **its full list of values** (filling with `None` where the attribute
280        was missing in the corresponding `xp`).
281
282        The attrs in `distinct` are sufficient to (but not generally necessary,
283        since there might exist a subset of attributes that) uniquely identify each `xp`
284        in the list (the `redundant` and `common` can be "squeezed" out).
285        Thus, a table of the `xp`s does not need to list all of the attributes.
286        This function also does the heavy lifting for `xpSpace.squeeze`.
287
288        Parameters
289        ----------
290        nomerge: list
291            Attributes that should always be seen as distinct.
292        """
293        def _aggregate_keys():
294            """Aggregate keys from all `xp`"""
295            if len(self) == 0:
296                return []
297
298            # Start with da_method
299            aggregate = ['da_method']
300
301            # Aggregate all other keys
302            for xp in self:
303
304                # Get dataclass fields
305                try:
306                    dc_fields = dcs.fields(xp.__class__)
307                    dc_names = [F.name for F in dc_fields]
308                    keys = xp.__dict__.keys()
309                except TypeError:
310                    # Assume namedtuple
311                    dc_names = []
312                    keys = xp._fields
313
314                # For all potential keys:
315                for k in keys:
316                    # If not already present:
317                    if k not in aggregate:
318
319                        # If dataclass, check repr:
320                        if k in dc_names:
321                            if dc_fields[dc_names.index(k)].repr:
322                                aggregate.append(k)
323                        # Else, just append
324                        else:
325                            aggregate.append(k)
326
327            # Remove unwanted
328            excluded  = [re.compile('^_'), 'avrgs', 'stats', 'HMM', 'duration']
329            aggregate = struct_tools.complement(aggregate, excluded)
330            return aggregate
331
332        def _getattr_safe(xp, key):
333            # Don't use None, to avoid mixing with actual None's
334            # TODO 4: use an object yet more likely to be unique.
335            missing = "N/A"
336            a = getattr(xp, key, missing)
337
338            # Replace ndarray by its id, since o/w it will
339            # complain that you must use all().
340            # Alternative: replace all == (and !=) below by "is".
341            #     Tabulation with multi-line params actually works,
342            #     (though it's still likely to take up too much space,
343            #     unless we set np.printoptions...).
344            #     However, then python (since 3.8) will complain about
345            #     comparison to literal.
346            if isinstance(a, np.ndarray):
347                shorten = 6
348                a = f"arr(<id {id(a)//10**shorten}>)"
349            # TODO 3: leave formatting to sub() below?
350            # TODO 4: do similar formatting for other non-trivial params?
351            # TODO 4: document alternative way to specify non-trivial params:
352            #         use key to be looked up in some globally accessible dct.
353            #         Advantage: names are meaningful, rather than ids.
354            return a
355
356        def replace_NA_by_None(vals):
357            """Supports different types of `vals`."""
358            def sub(v):
359                return None if v == "N/A" else v
360
361            if isinstance(vals, str):
362                vals = sub(vals)
363            else:
364                try:
365                    vals = [sub(v) for v in vals]
366                except TypeError:
367                    vals = sub(vals)
368            return vals
369
370        # Main
371        distinct, redundant, common = {}, {}, {}
372        for key in _aggregate_keys():
373            vals = [_getattr_safe(xp, key) for xp in self]
374
375            if struct_tools.flexcomp(key, *nomerge):
376                dct, vals = distinct, vals
377
378            elif all(vals[0] == v for v in vals):
379                dct, vals = common, vals[0]
380
381            else:
382                nonNA = next(v for v in vals if "N/A" != v)
383                if all(v == "N/A" or v == nonNA for v in vals):
384                    dct, vals = redundant, nonNA
385
386                else:
387                    dct, vals = distinct, vals
388
389            dct[key] = replace_NA_by_None(vals)
390
391        return distinct, redundant, common

Classify all attrs. of all xps as distinct, redundant, or common.

An attribute of the xps is inserted in one of the 3 dicts as follows: The attribute names become dict keys. If the values of an attribute (collected from all of the xps) are all equal, then the attribute is inserted in common, but only with a single value. If they are all the same or missing, then it is inserted in redundant with a single value. Otherwise, it is inserted in distinct, with its full list of values (filling with None where the attribute was missing in the corresponding xp).

The attrs in distinct are sufficient to (but not generally necessary, since there might exist a subset of attributes that) uniquely identify each xp in the list (the redundant and common can be "squeezed" out). Thus, a table of the xps does not need to list all of the attributes. This function also does the heavy lifting for xpSpace.squeeze.

Parameters
  • nomerge (list): Attributes that should always be seen as distinct.
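
A hedged illustration (attribute names as per the EnKF dataclass; the exact contents of common depend on the method's defaults):

    xps = xpList([da.EnKF("Sqrt", N=10),
                  da.EnKF("Sqrt", N=20),
                  da.EnKF("PertObs", N=20)])
    distinct, redundant, common = xps.prep_table()
    # distinct ==> {'upd_a': ['Sqrt', 'Sqrt', 'PertObs'], 'N': [10, 20, 20]}
    # common   ==> includes 'da_method': 'EnKF', plus any attr equal in all xps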
def gen_names(self, abbrev=6, tab=False):
401    def gen_names(self, abbrev=6, tab=False):
402        """Similar to `self.__repr__()`, but:
403
404        - returns *list* of names
405        - tabulation is optional
406        - attaches (abbreviated) labels to each attribute
407        """
408        distinct, redundant, common = self.prep_table(nomerge=["da_method"])
409        labels = distinct.keys()
410        values = distinct.values()
411
412        # Label abbreviation
413        labels = [collapse_str(k, abbrev) for k in labels]
414
415        # Make label columns: insert None or lbl+":", depending on value
416        def column(lbl, vals):
417            return [None if v is None else lbl+":" for v in vals]
418        labels = [column(lbl, vals) for lbl, vals in zip(labels, values)]
419
420        # Interlace labels and values
421        table = [x for (a, b) in zip(labels, values) for x in (a, b)]
422
423        # Rm da_method label (but keep value)
424        table.pop(0)
425
426        # Transpose
427        table = list(map(list, zip(*table)))
428
429        # Tabulate
430        table = tabulate(table, tablefmt="plain")
431
432        # Rm space between lbls/vals
433        table = re.sub(':  +', ':', table)
434
435        # Rm alignment
436        if not tab:
437            table = re.sub(r' +', r' ', table)
438
439        return table.splitlines()

Similar to self.__repr__(), but:

  • returns list of names
  • tabulation is optional
  • attaches (abbreviated) labels to each attribute
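
Sketch, for the same hypothetical list as above:

    for name in xps.gen_names():
        print(name)
    # e.g. ==>
    # EnKF upd_a:Sqrt N:10
    # EnKF upd_a:Sqrt N:20
    # EnKF upd_a:PertObs N:20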
def tabulate_avrgs(avrgs_list, statkeys=(), decimals=None):
666def tabulate_avrgs(avrgs_list, statkeys=(), decimals=None):
667    """Tabulate avrgs (val±prec)."""
668    if not statkeys:
669        statkeys = ['rmse.a', 'rmv.a', 'rmse.f']
670
671    columns = {}
672    for stat in statkeys:
673        column = [getattr(a, stat, None) for a in avrgs_list]
674        column = unpack_uqs(column, decimals)
675        if not column:
676            raise ValueError(f"The stat. key '{stat}' was not"
677                             " found among any of the averages.")
678        vals  = align_col([stat] + column["val"])
679        precs = align_col(['1σ'] + column["prec"], just="<")
680        headr = vals[0]+'  '+precs[0]
681        mattr = [f"{v} ±{c}" for v, c in zip(vals, precs)][1:]
682        columns[headr] = mattr
683
684    return columns

Tabulate avrgs (val±prec).
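A hedged sketch of its use after a serial launch, assuming each xp still holds its avrgs attribute (as set by run_experiment):

    from tabulate import tabulate
    from dapper.xp_launch import tabulate_avrgs

    columns = tabulate_avrgs([xp.avrgs for xp in xps],
                             statkeys=["rmse.a", "rmv.a"])
    print(tabulate(columns, headers="keys"))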

def launch(self, HMM, save_as='noname', mp=False, fail_gently=None, **kwargs):
451    def launch(self, HMM, save_as="noname", mp=False, fail_gently=None, **kwargs):
452        """Essentially: `for xp in self: run_experiment(xp, ..., **kwargs)`.
453
454        See `run_experiment` for documentation on the `kwargs` and `fail_gently`.
455        See `dapper.tools.datafiles.create_run_dir` for documentation on `save_as`.
456
457        Depending on `mp`, `run_experiment` is delegated as follows:
458
459        - `False`: caller process (no parallelisation)
460        - `True` or `"MP"` or an `int`: multiprocessing on this host
461        - `"GCP"` or `"Google"` or `dict(server="GCP")`: the DAPPER server
462          (Google Cloud Computing with HTCondor).
463            - Specify a list of files as `mp["files"]` to include them
464              in the working directory of the server workers.
465            - In order to use absolute paths, the list should consist
466              of `(path, root)` tuples; only the part of `path` relative
467              to `root` is reproduced in the working directory
468              of the server.
469            - If this dict field is empty, then all python files
470              in `sys.path[0]` are uploaded.
471
472        See `examples/basic_2.py` and `examples/basic_3.py` for example use.
473        """
474        # Parse mp option
475        if not mp:
476            mp = dict()
477        elif mp in [True, "MP"]:
478            mp = dict(server="local")
479        elif isinstance(mp, int):
480            mp = dict(server="local", NPROC=mp)
481        elif mp in ["GCP", "Google"]:
482            mp = dict(server="GCP", files=[], code="")
483
484        # Parse fail_gently
485        if fail_gently is None:
486            if mp and mp["server"] == "GCP":
487                fail_gently = False
488                # coz cloud processing is entirely de-coupled anyways
489            else:
490                fail_gently = True
491                # True unless otherwise requested
492        kwargs["fail_gently"] = fail_gently
493
494        # Bundle HMM with kwargs
495        kwargs['HMM'] = HMM
496
497        # Data path
498        save_as, xpi_dir = create_run_dir(save_as, mp)
499
500        # No parallelization
501        if not mp:
502            for ixp, (xp, label) in enumerate(zip(self, self.gen_names())):
503                run_experiment(xp, label, xpi_dir(ixp), **kwargs)
504
505        # Local multiprocessing
506        elif mp["server"].lower() == "local":
507            def run_with_kwargs(arg):
508                xp, ixp = arg
509                run_experiment(xp, None, xpi_dir(ixp), **kwargs)
510            args = zip(self, range(len(self)))
511
512            pb.disable_progbar          = True
513            pb.disable_user_interaction = True
514            NPROC = mp.get("NPROC", None)  # None => mp.cpu_count()
515            from dapper.tools.multiproc import mpd  # will fail on GCP
516            with mpd.Pool(NPROC) as pool:
517                list(tqdm(
518                    pool.imap(
519                        run_with_kwargs, args),
520                    total=len(self),
521                    desc="Parallel experim's",
522                    smoothing=0.1))
523            pb.disable_progbar          = False
524            pb.disable_user_interaction = False
525
526        # Google cloud platform, multiprocessing
527        elif mp["server"] == "GCP":
528            for ixp, xp in enumerate(self):
529                with open(xpi_dir(ixp)/"xp.var", "wb") as f:
530                    dill.dump(dict(xp=xp), f)
531
532            with open(save_as/"xp.com", "wb") as f:
533                dill.dump(kwargs, f)
534
535            # mkdir extra_files
536            extra_files = save_as / "extra_files"
537            os.mkdir(extra_files)
538            # Default extra_files: .py files in sys.path[0] (main script's path)
539            if not mp.get("files", []):
540                mp["files"] = [f.relative_to(sys.path[0]) for f in
541                               Path(sys.path[0]).glob("**/*.py")]
542                assert len(mp["files"]) < 1000, (
543                    "Too many files staged for upload to server."
544                    " This is the result of trying to include all files"
545                    f" under sys.path[0]: ({sys.path[0]})."
546                    " Consider moving your script to a project directory,"
547                    " or explicitly listing the files to be uploaded."
548                )
549
550            # Copy into extra_files
551            for f in mp["files"]:
552                if isinstance(f, (str, Path)):
553                    # Example: f = "A.py"
554                    path = Path(sys.path[0]) / f
555                    dst = f
556                else:  # instance of tuple(path, root)
557                    # Example: f = ("~/E/G/A.py", "G")
558                    path, root = f
559                    dst = Path(path).relative_to(root)
560                dst = extra_files / dst
561                os.makedirs(dst.parent, exist_ok=True)
562                try:
563                    shutil.copytree(path, dst)  # dir -r
564                except OSError:
565                    shutil.copy2(path, dst)  # file
566
567            with open(extra_files/"dpr_config.yaml", "w") as f:
568                f.write("\n".join([
569                    "data_root: '$cwd'",
570                    "liveplotting: no",
571                    "welcome_message: no"]))
572
573            # Loads PWD/xp.{var,com} and calls run_experiment()
574            with open(extra_files/"load_and_run.py", "w") as f:
575                f.write(dedent("""\
576                import dill
577                from dapper.xp_launch import run_experiment
578
579                # Load
580                with open("xp.com", "rb") as f: com = dill.load(f)
581                with open("xp.var", "rb") as f: var = dill.load(f)
582
583                # User-defined code
584                %s
585
586                # Run
587                try:
588                    result = run_experiment(var['xp'], None, ".", **com)
589                except SystemError as err:
590                    if err.args and "opcode" in err.args[0]:
591                        err.args += ("It seems your local python version"
592                                     " is incompatible with that of the cluster.",)
593                    raise
594                """) % dedent(mp["code"]))
595
596            submit_job_GCP(save_as)
597
598        return save_as

Essentially: for xp in self: run_experiment(xp, ..., **kwargs).

See run_experiment for documentation on the kwargs and fail_gently. See dapper.tools.datafiles.create_run_dir for documentation on save_as.

Depending on mp, run_experiment is delegated as follows:

  • False: caller process (no parallelisation)
  • True or "MP" or an int: multiprocessing on this host
  • "GCP" or "Google" or dict(server="GCP"): the DAPPER server (Google Cloud Computing with HTCondor).
    • Specify a list of files as mp["files"] to include them in the working directory of the server workers.
    • In order to use absolute paths, the list should consist of (path, root) tuples; only the part of path relative to root is reproduced in the working directory of the server.
    • If this dict field is empty, then all python files in sys.path[0] are uploaded.

See examples/basic_2.py and examples/basic_3.py for example use.
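
Hedged invocation sketches (HMM being some HiddenMarkovModel, e.g. from dapper.mods; the file paths are hypothetical):

    xps.launch(HMM, save_as="my_expt")          # serial, in the caller process
    xps.launch(HMM, save_as="my_expt", mp=4)    # 4 local processes
    xps.launch(HMM, save_as="my_expt",          # remote (requires GCP setup)
               mp=dict(server="GCP", code="",
                       files=[("/home/me/common/tools.py", "/home/me/common")]))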

Inherited Members
  • builtins.list: clear, copy, insert, extend, pop, remove, index, count, reverse, sort
def combinator(param_dict, **glob_dict):
601def combinator(param_dict, **glob_dict):
602    """Mass creation of `xp`'s by combining the value lists in the `param_dict`.
603
604    Returns a function (`for_params`) that creates all possible combinations
605    of parameters (from their value list) for a given `dapper.da_methods.da_method`.
606        This is a good deal more efficient than relying on `xpList`'s `unique`.
607
608        - Parameters not found among the args of the given DA method are ignored.
609        - Parameters specified as keywords to `for_params` are fixed to that value,
610          overriding the corresponding (if any) value list in the `param_dict`.
611
612    .. warning::
613        Beware! If, e.g., `infl` or `rot` are in `param_dict`, aimed at the `EnKF`,
614        but you forget that they are also attributes of some other method where you
615        don't actually want to use them (e.g. `SVGDF`),
616        then you'll create many more `xp`s than you intend.
617    """
618    def for_params(method, **fixed_params):
619        dc_fields = [f.name for f in dcs.fields(method)]
620        params = struct_tools.intersect(param_dict, dc_fields)
621        params = struct_tools.complement(params, fixed_params)
622        params = {**glob_dict, **params}  # glob_dict 1st
623
624        def xp1(dct):
625            xp = method(**struct_tools.intersect(dct, dc_fields), **fixed_params)
626            for key, v in struct_tools.intersect(dct, glob_dict).items():
627                setattr(xp, key, v)
628            return xp
629
630        return [xp1(dct) for dct in struct_tools.prodct(params)]
631    return for_params

Mass creation of xp's by combining the value lists in the param_dict.

Returns a function (for_params) that creates all possible combinations of parameters (from their value list) for a given dapper.da_methods.da_method. This is a good deal more efficient than relying on xpList's unique.

  • Parameters not found among the args of the given DA method are ignored by for_params.
  • Parameters specified as keywords to for_params are fixed to that value, overriding the corresponding (if any) value list in the param_dict.

Beware! If, e.g., infl or rot are in param_dict, aimed at the EnKF, but you forget that they are also attributes of some other method where you don't actually want to use them (e.g. SVGDF), then you'll create many more xps than you intend.
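
A hedged sketch of its use (hypothetical parameter grid; see examples/basic_3.py for a real one):

    import dapper.da_methods as da
    from dapper.xp_launch import combinator, xpList

    param_dict = dict(N=[10, 20, 40], infl=[1.0, 1.02], rot=[True])
    for_params = combinator(param_dict, seed=[3000])  # seed attached to every xp

    xps = xpList()
    xps += for_params(da.EnKF, upd_a="Sqrt")  # 3*2*1 xps, with upd_a fixed
    xps += for_params(da.Climatology)         # N, infl, rot not among its args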