dapper.xp_launch

Tools (notably `xpList`) for setup and running of experiments (known as `xp`s).

See `dapper.da_methods.da_method` for the strict definition of `xp`s.
1"""Tools (notably `xpList`) for setup and running of experiments (known as `xp`s). 2 3See `dapper.da_methods.da_method` for the strict definition of `xp`s. 4""" 5 6import copy 7import dataclasses as dcs 8import os 9import re 10import shutil 11import sys 12from functools import wraps 13from pathlib import Path 14from textwrap import dedent 15 16import dill 17import numpy as np 18import struct_tools 19import tabulate as _tabulate 20from tabulate import tabulate 21from tqdm.auto import tqdm 22 23import dapper.stats 24import dapper.tools.progressbar as pb 25from dapper.tools.colors import stripe 26from dapper.tools.datafiles import create_run_dir 27from dapper.tools.remote.uplink import submit_job_GCP 28from dapper.tools.seeding import set_seed 29from dapper.tools.viz import collapse_str 30 31_tabulate.MIN_PADDING = 0 32 33 34def seed_and_simulate(HMM, xp): 35 """Default experiment setup (sets seed and simulates truth and obs). 36 37 Used by `xpList.launch` via `run_experiment`. 38 39 Parameters 40 ---------- 41 HMM: HiddenMarkovModel 42 Container defining the system. 43 xp: object 44 Type: a `dapper.da_methods.da_method`-decorated class. 45 46 .. warning:: `xp.seed` should be set (and `int`). 47 48 Without `xp.seed` the seed does not get set, 49 and different `xp`s will use different seeds 50 (unless you do some funky hacking). 51 Reproducibility for a script as a whole can still be achieved 52 by setting the seed at the outset of the script. 53 To avoid even that, set `xp.seed` to `None` or `"clock"`. 54 55 Returns 56 ------- 57 tuple (xx, yy) 58 The simulated truth and observations. 59 """ 60 set_seed(getattr(xp, 'seed', False)) 61 xx, yy = HMM.simulate() 62 return HMM, xx, yy 63 64 65def run_experiment(xp, label, savedir, HMM, setup=seed_and_simulate, free=True, 66 statkeys=False, fail_gently=False, **stat_kwargs): 67 """Used by `xpList.launch` to run each single (DA) experiment ("xp"). 68 69 This involves steps similar to `examples/basic_1.py`, i.e.: 70 71 - `setup` : Initialize experiment. 72 - `xp.assimilate` : run DA, pass on exception if fail_gently 73 - `xp.stats.average_in_time` : result averaging 74 - `xp.avrgs.tabulate` : result printing 75 - `dill.dump` : result storage 76 77 Parameters 78 ---------- 79 xp: object 80 Type: a `dapper.da_methods.da_method`-decorated class. 81 label: str 82 Name attached to progressbar during assimilation. 83 savedir: str 84 Path of folder wherein to store the experiment data. 85 HMM: HiddenMarkovModel 86 Container defining the system. 87 free: bool 88 Whether (or not) to `del xp.stats` after the experiment is done, 89 so as to free up memory and/or not save this data 90 (just keeping `xp.avrgs`). 91 statkeys: list 92 A list of names (possibly in the form of abbreviations) of the 93 statistical averages that should be printed immediately afther 94 this xp. 95 fail_gently: bool 96 Whether (or not) to propagate exceptions. 97 setup: function 98 This function must take two arguments: `HMM` and `xp`, and return the `HMM` to 99 be used by the DA methods (typically the same as the input `HMM`, but could be 100 modified), and the (typically synthetic) truth and obs time series. 101 102 This gives you the ability to customize almost any aspect of the individual 103 experiments within a batch launch of experiments (i.e. not just the parameters 104 of the DA. method). 
Typically you will grab one or more parameter values stored 105 in the `xp` (see `dapper.da_methods.da_method`) and act on them, usually by 106 assigning them to some object that impacts the experiment. Thus, by generating 107 a new `xp` for each such parameter value you can investigate the 108 impact/sensitivity of the results to this parameter. Examples include: 109 110 - Setting the seed. See the default `setup`, namely `seed_and_simulate`, 111 for how this is, or should be, done. 112 - Setting some aspect of the `HMM` such as the observation noise, 113 or the interval between observations. This could be achieved for example by: 114 115 def setup(hmm, xp): 116 hmm.Obs.noise = GaussRV(M=hmm.Nx, C=xp.obs_noise) 117 hmm.tseq.dkObs = xp.time_between_obs 118 import dapper as dpr 119 return dpr.seed_and_simulate(hmm, xp) 120 121 This process could involve more steps, for example loading a full covariance 122 matrix from a data file, as specified by the `obs_noise` parameter, before 123 assigning it to `C`. Also note that the import statement is not strictly 124 necessary (assuming `dapper` was already imported in the outer scope, 125 typically the main script), **except** when running the experiments on a 126 remote server. 127 128 Sometimes, the parameter you want to set is not accessible as one of the 129 conventional attributes of the `HMM`. For example, the `Force` in the 130 Lorenz-96 model. In that case you can add these lines to the setup function: 131 132 import dapper.mods.Lorenz96 as core 133 core.Force = xp.the_force_parameter 134 135 However, if your model is an OOP instance, the import approach will not work 136 because it will serve you the original model instance, while `setup()` deals 137 with a copy of it. Instead, you could re-initialize the entire model in 138 `setup()` and overwrite `HMM.Dyn`. However, it is probably easier to just 139 assign the instance to some custom attribute before launching the 140 experiments, e.g. `HMM.Dyn.object = the_model_instance`, enabling you to set 141 parameters on `HMM.Dyn.object` in `setup()`. Note that this approach won't 142 work for modules (for ex., combining the above examples, `HMM.Dyn.object = 143 core`) because modules are not serializable. 144 145 - Using a different `HMM` entirely for the truth/obs (`xx`/`yy`) generation, 146 than the one that will be used by the DA. Or loading the truth/obs 147 time series from file. In both cases, you might also have to do some 148 cropping or slicing of `xx` and `yy` before returning them. 149 """ 150 # Copy HMM to avoid changes made by setup affect subsequent experiments. 151 # Thus, experiments run in sequence behave the same as experiments run via 152 # multiprocessing (which serialize (i.e. copy) the HMM) or on a cluster. 153 hmm = copy.deepcopy(HMM) 154 # Note that "implicitly referenced" objects do not get copied. For example, 155 # if the model `step` function uses parameters defined in its module or object, 156 # these will be obtained by re-importing that model, unless it has been serialized. 157 # Serialization happens if the model instance (does not work for modules) 158 # is expliclity referenced, e.g. if you've done `HMM.Dyn.underlying_model = model`. 
159 160 # GENERATE TRUTH/OBS 161 hmm, xx, yy = setup(hmm, xp) 162 163 # ASSIMILATE 164 xp.assimilate(hmm, xx, yy, label, fail_gently=fail_gently, **stat_kwargs) 165 166 # Clear references to mpl (for pickling purposes) 167 if hasattr(xp.stats, "LP_instance"): 168 del xp.stats.LP_instance 169 170 # AVERAGE 171 xp.stats.average_in_time(free=free) 172 173 # PRINT 174 if statkeys: 175 statkeys = () if statkeys is True else statkeys 176 print(xp.avrgs.tabulate(statkeys)) 177 178 # SAVE 179 if savedir: 180 with open(Path(savedir)/"xp", "wb") as FILE: 181 dill.dump({'xp': xp}, FILE) 182 183 184class xpList(list): 185 """Subclass of `list` specialized for experiment ("xp") objects. 186 187 Main use: administrate experiment launches. 188 189 Modifications to `list`: 190 191 - `xpList.append` supports `unique` to enable lazy `xp` declaration. 192 - `__iadd__` (`+=`) supports adding single `xp`s. 193 this is hackey, but convenience is king. 194 - `__getitem__` supports lists, similar to `np.ndarray` 195 - `__repr__`: prints the list as rows of a table, 196 where the columns represent attributes whose value is not shared among all `xp`s. 197 Refer to `xpList.prep_table` for more information. 198 199 Add-ons: 200 201 - `xpList.launch`: run the experiments in current list. 202 - `xpList.prep_table`: find all attributes of the `xp`s in the list; 203 classify as distinct, redundant, or common. 204 - `xpList.gen_names`: use `xpList.prep_table` to generate 205 a short & unique name for each `xp` in the list. 206 - `xpList.tabulate_avrgs`: tabulate time-averaged results. 207 - `xpList.inds` to search by kw-attrs. 208 209 Parameters 210 ---------- 211 args: entries 212 Nothing, or a list of `xp`s. 213 214 unique: bool 215 Duplicates won't get appended. Makes `append` (and `__iadd__`) relatively slow. 216 Use `extend` or `__add__` or `combinator` to bypass this validation. 217 218 Also see 219 -------- 220 - Examples: `examples/basic_2`, `examples/basic_3` 221 - `dapper.xp_process.xpSpace`, which is used for experient result **presentation**, 222 as opposed to this class (`xpList`), which handles **launching** experiments. 223 """ 224 225 def __init__(self, *args, unique=False): 226 self.unique = unique 227 super().__init__(*args) 228 229 def __iadd__(self, xp): 230 if not hasattr(xp, '__iter__'): 231 xp = [xp] 232 for item in xp: 233 self.append(item) 234 return self 235 236 def append(self, xp): 237 """Append **if** not `self.unique` & present.""" 238 if not (self.unique and xp in self): 239 super().append(xp) 240 241 def __getitem__(self, keys): 242 """Indexing, also by a list""" 243 try: 244 B = [self[k] for k in keys] # if keys is list 245 except TypeError: 246 B = super().__getitem__(keys) # if keys is int, slice 247 if hasattr(B, '__len__'): 248 B = xpList(B) # Cast 249 return B 250 251 def inds(self, strict=True, missingval="NONSENSE", **kws): 252 """Find (all) indices of `xps` whose attributes match kws. 253 254 If strict, then `xp`s lacking a requested attr. will not match, 255 unless the `missingval` matches the required value. 256 """ 257 def match(xp): 258 def missing(v): return missingval if strict else v 259 matches = [getattr(xp, k, missing(v)) == v for k, v in kws.items()] 260 return all(matches) 261 262 return [i for i, xp in enumerate(self) if match(xp)] 263 264 @property 265 def da_methods(self): 266 """List `da_method` attributes in this list.""" 267 return [xp.da_method for xp in self] 268 269 def prep_table(self, nomerge=()): 270 """Classify all attrs. 
of all `xp`s as `distinct`, `redundant`, or `common`. 271 272 An attribute of the `xp`s is inserted in one of the 3 dicts as follows: 273 The attribute names become dict keys. If the values of an attribute 274 (collected from all of the `xp`s) are all __equal__, then the attribute 275 is inserted in `common`, but only with **a single value**. 276 If they are all the same **or missing**, then it is inserted in `redundant` 277 **with a single value**. Otherwise, it is inserted in `distinct`, 278 with **its full list of values** (filling with `None` where the attribute 279 was missing in the corresponding `xp`). 280 281 The attrs in `distinct` are sufficient to (but not generally necessary, 282 since there might exist a subset of attributes that) uniquely identify each `xp` 283 in the list (the `redundant` and `common` can be "squeezed" out). 284 Thus, a table of the `xp`s does not need to list all of the attributes. 285 This function also does the heavy lifting for `xpSpace.squeeze`. 286 287 Parameters 288 ---------- 289 nomerge: list 290 Attributes that should always be seen as distinct. 291 """ 292 def _aggregate_keys(): 293 """Aggregate keys from all `xp`""" 294 if len(self) == 0: 295 return [] 296 297 # Start with da_method 298 aggregate = ['da_method'] 299 300 # Aggregate all other keys 301 for xp in self: 302 303 # Get dataclass fields 304 try: 305 dc_fields = dcs.fields(xp.__class__) 306 dc_names = [F.name for F in dc_fields] 307 keys = xp.__dict__.keys() 308 except TypeError: 309 # Assume namedtuple 310 dc_names = [] 311 keys = xp._fields 312 313 # For all potential keys: 314 for k in keys: 315 # If not already present: 316 if k not in aggregate: 317 318 # If dataclass, check repr: 319 if k in dc_names: 320 if dc_fields[dc_names.index(k)].repr: 321 aggregate.append(k) 322 # Else, just append 323 else: 324 aggregate.append(k) 325 326 # Remove unwanted 327 excluded = [re.compile('^_'), 'avrgs', 'stats', 'HMM', 'duration'] 328 aggregate = struct_tools.complement(aggregate, excluded) 329 return aggregate 330 331 def _getattr_safe(xp, key): 332 # Don't use None, to avoid mixing with actual None's 333 # TODO 4: use an object yet more likely to be unique. 334 missing = "N/A" 335 a = getattr(xp, key, missing) 336 337 # Replace ndarray by its id, since o/w it will 338 # complain that you must use all(). 339 # Alternative: replace all == (and !=) below by "is". 340 # Tabulation with multi-line params actually works, 341 # (though it's still likely to take up too much space, 342 # unless we set np.printoptions...). 343 # However, then python (since 3.8) will complain about 344 # comparison to literal. 345 if isinstance(a, np.ndarray): 346 shorten = 6 347 a = f"arr(<id {id(a)//10**shorten}>)" 348 # TODO 3: leave formatting to sub() below? 349 # TODO 4: do similar formatting for other non-trivial params? 350 # TODO 4: document alternative way to specify non-trivial params: 351 # use key to be looked up in some globally accessible dct. 352 # Advantage: names are meaningful, rather than ids. 
353 return a 354 355 def replace_NA_by_None(vals): 356 """Supports different types of `vals`.""" 357 def sub(v): 358 return None if v == "N/A" else v 359 360 if isinstance(vals, str): 361 vals = sub(vals) 362 else: 363 try: 364 vals = [sub(v) for v in vals] 365 except TypeError: 366 vals = sub(vals) 367 return vals 368 369 # Main 370 distinct, redundant, common = {}, {}, {} 371 for key in _aggregate_keys(): 372 vals = [_getattr_safe(xp, key) for xp in self] 373 374 if struct_tools.flexcomp(key, *nomerge): 375 dct, vals = distinct, vals 376 377 elif all(vals[0] == v for v in vals): 378 dct, vals = common, vals[0] 379 380 else: 381 nonNA = next(v for v in vals if "N/A" != v) 382 if all(v == "N/A" or v == nonNA for v in vals): 383 dct, vals = redundant, nonNA 384 385 else: 386 dct, vals = distinct, vals 387 388 dct[key] = replace_NA_by_None(vals) 389 390 return distinct, redundant, common 391 392 def __repr__(self): 393 distinct, redundant, common = self.prep_table() 394 s = '<xpList> of length %d with attributes:\n' % len(self) 395 s += tabulate(distinct, headers="keys", showindex=True) 396 s += "\nOther attributes:\n" 397 s += str(struct_tools.AlignedDict({**redundant, **common})) 398 return s 399 400 def gen_names(self, abbrev=6, tab=False): 401 """Similiar to `self.__repr__()`, but: 402 403 - returns *list* of names 404 - tabulation is optional 405 - attaches (abbreviated) labels to each attribute 406 """ 407 distinct, redundant, common = self.prep_table(nomerge=["da_method"]) 408 labels = distinct.keys() 409 values = distinct.values() 410 411 # Label abbreviation 412 labels = [collapse_str(k, abbrev) for k in labels] 413 414 # Make label columns: insert None or lbl+":", depending on value 415 def column(lbl, vals): 416 return [None if v is None else lbl+":" for v in vals] 417 labels = [column(lbl, vals) for lbl, vals in zip(labels, values)] 418 419 # Interlace labels and values 420 table = [x for (a, b) in zip(labels, values) for x in (a, b)] 421 422 # Rm da_method label (but keep value) 423 table.pop(0) 424 425 # Transpose 426 table = list(map(list, zip(*table))) 427 428 # Tabulate 429 table = tabulate(table, tablefmt="plain") 430 431 # Rm space between lbls/vals 432 table = re.sub(': +', ':', table) 433 434 # Rm alignment 435 if not tab: 436 table = re.sub(r' +', r' ', table) 437 438 return table.splitlines() 439 440 @wraps(dapper.stats.tabulate_avrgs) 441 def tabulate_avrgs(self, *args, colorize=True, **kwargs): 442 distinct, redundant, common = self.prep_table() 443 averages = dapper.stats.tabulate_avrgs([C.avrgs for C in self], *args, **kwargs) 444 columns = {**distinct, '|': ['|']*len(self), **averages} # merge 445 table = tabulate(columns, headers="keys", showindex=True).replace('␣', ' ') 446 if colorize: 447 table = stripe(table) 448 return table 449 450 def launch(self, HMM, save_as="noname", mp=False, fail_gently=None, **kwargs): 451 """Essentially: `for xp in self: run_experiment(xp, ..., **kwargs)`. 452 453 See `run_experiment` for documentation on the `kwargs` and `fail_gently`. 454 See `dapper.tools.datafiles.create_run_dir` for documentation `save_as`. 455 456 Depending on `mp`, `run_experiment` is delegated as follows: 457 458 - `False`: caller process (no parallelisation) 459 - `True` or `"MP"` or an `int`: multiprocessing on this host 460 - `"GCP"` or `"Google"` or `dict(server="GCP")`: the DAPPER server 461 (Google Cloud Computing with HTCondor). 462 - Specify a list of files as `mp["files"]` to include them 463 in working directory of the server workers. 
464 - In order to use absolute paths, the list should cosist 465 of tuples, where the first item is relative to the second 466 (which is an absolute path). The root is then not included 467 in the working directory of the server. 468 - If this dict field is empty, then all python files 469 in `sys.path[0]` are uploaded. 470 471 See `examples/basic_2.py` and `examples/basic_3.py` for example use. 472 """ 473 # Parse mp option 474 if not mp: 475 mp = dict() 476 elif mp in [True, "MP"]: 477 mp = dict(server="local") 478 elif isinstance(mp, int): 479 mp = dict(server="local", NPROC=mp) 480 elif mp in ["GCP", "Google"]: 481 mp = dict(server="GCP", files=[], code="") 482 483 # Parse fail_gently 484 if fail_gently is None: 485 if mp and mp["server"] == "GCP": 486 fail_gently = False 487 # coz cloud processing is entirely de-coupled anyways 488 else: 489 fail_gently = True 490 # True unless otherwise requested 491 kwargs["fail_gently"] = fail_gently 492 493 # Bundle HMM with kwargs 494 kwargs['HMM'] = HMM 495 496 # Data path 497 save_as, xpi_dir = create_run_dir(save_as, mp) 498 499 # No parallelization 500 if not mp: 501 for ixp, (xp, label) in enumerate(zip(self, self.gen_names())): 502 run_experiment(xp, label, xpi_dir(ixp), **kwargs) 503 504 # Local multiprocessing 505 elif mp["server"].lower() == "local": 506 def run_with_kwargs(arg): 507 xp, ixp = arg 508 run_experiment(xp, None, xpi_dir(ixp), **kwargs) 509 args = zip(self, range(len(self))) 510 511 pb.disable_progbar = True 512 pb.disable_user_interaction = True 513 NPROC = mp.get("NPROC", None) # None => mp.cpu_count() 514 from dapper.tools.multiproc import mpd # will fail on GCP 515 with mpd.Pool(NPROC) as pool: 516 list(tqdm( 517 pool.imap( 518 run_with_kwargs, args), 519 total=len(self), 520 desc="Parallel experim's", 521 smoothing=0.1)) 522 pb.disable_progbar = False 523 pb.disable_user_interaction = False 524 525 # Google cloud platform, multiprocessing 526 elif mp["server"] == "GCP": 527 for ixp, xp in enumerate(self): 528 with open(xpi_dir(ixp)/"xp.var", "wb") as f: 529 dill.dump(dict(xp=xp), f) 530 531 with open(save_as/"xp.com", "wb") as f: 532 dill.dump(kwargs, f) 533 534 # mkdir extra_files 535 extra_files = save_as / "extra_files" 536 os.mkdir(extra_files) 537 # Default extra_files: .py files in sys.path[0] (main script's path) 538 if not mp.get("files", []): 539 mp["files"] = [f.relative_to(sys.path[0]) for f in 540 Path(sys.path[0]).glob("**/*.py")] 541 assert len(mp["files"]) < 1000, ( 542 "Too many files staged for upload to server." 543 " This is the result of trying to include all files" 544 f" under sys.path[0]: ({sys.path[0]})." 545 " Consider moving your script to a project directory," 546 " or expliclity listing the files to be uploaded." 
547 ) 548 549 # Copy into extra_files 550 for f in mp["files"]: 551 if isinstance(f, (str, Path)): 552 # Example: f = "A.py" 553 path = Path(sys.path[0]) / f 554 dst = f 555 else: # instance of tuple(path, root) 556 # Example: f = ("~/E/G/A.py", "G") 557 path, root = f 558 dst = Path(path).relative_to(root) 559 dst = extra_files / dst 560 os.makedirs(dst.parent, exist_ok=True) 561 try: 562 shutil.copytree(path, dst) # dir -r 563 except OSError: 564 shutil.copy2(path, dst) # file 565 566 with open(extra_files/"dpr_config.yaml", "w") as f: 567 f.write("\n".join([ 568 "data_root: '$cwd'", 569 "liveplotting: no", 570 "welcome_message: no"])) 571 572 # Loads PWD/xp_{var,com} and calls run_experiment() 573 with open(extra_files/"load_and_run.py", "w") as f: 574 f.write(dedent("""\ 575 import dill 576 from dapper.xp_launch import run_experiment 577 578 # Load 579 with open("xp.com", "rb") as f: com = dill.load(f) 580 with open("xp.var", "rb") as f: var = dill.load(f) 581 582 # User-defined code 583 %s 584 585 # Run 586 try: 587 result = run_experiment(var['xp'], None, ".", **com) 588 except SystemError as err: 589 if err.args and "opcode" in err.args[0]: 590 err.args += ("It seems your local python version" 591 " is incompatible with that of the cluster.",) 592 raise 593 """) % dedent(mp["code"])) 594 595 submit_job_GCP(save_as) 596 597 return save_as 598 599 600def combinator(param_dict, **glob_dict): 601 """Mass creation of `xp`'s by combining the value lists in the `param_dict`. 602 603 Returns a function (`for_params`) that creates all possible combinations 604 of parameters (from their value list) for a given `dapper.da_methods.da_method`. 605 This is a good deal more efficient than relying on `xpList`'s `unique`. Parameters 606 607 - not found among the args of the given DA method are ignored by `for_params`. 608 - specified as keywords to the `for_params` fix the value 609 preventing using the corresponding (if any) value list in the `param_dict`. 610 611 .. warning:: 612 Beware! If, eg., `infl` or `rot` are in `param_dict`, aimed at the `EnKF`, 613 but you forget that they are also attributes some method where you don't 614 actually want to use them (eg. `SVGDF`), 615 then you'll create many more than you intend. 616 """ 617 def for_params(method, **fixed_params): 618 dc_fields = [f.name for f in dcs.fields(method)] 619 params = struct_tools.intersect(param_dict, dc_fields) 620 params = struct_tools.complement(params, fixed_params) 621 params = {**glob_dict, **params} # glob_dict 1st 622 623 def xp1(dct): 624 xp = method(**struct_tools.intersect(dct, dc_fields), **fixed_params) 625 for key, v in struct_tools.intersect(dct, glob_dict).items(): 626 setattr(xp, key, v) 627 return xp 628 629 return [xp1(dct) for dct in struct_tools.prodct(params)] 630 return for_params
def seed_and_simulate(HMM, xp):
    """Default experiment setup (sets seed and simulates truth and obs).

    Used by `xpList.launch` via `run_experiment`.

    Parameters
    ----------
    HMM: HiddenMarkovModel
        Container defining the system.
    xp: object
        Type: a `dapper.da_methods.da_method`-decorated class.

        .. warning:: `xp.seed` should be set (and `int`).

        Without `xp.seed` the seed does not get set,
        and different `xp`s will use different seeds
        (unless you do some funky hacking).
        Reproducibility for a script as a whole can still be achieved
        by setting the seed at the outset of the script.
        To avoid even that, set `xp.seed` to `None` or `"clock"`.

    Returns
    -------
    tuple (HMM, xx, yy)
        The `HMM` to use, and the simulated truth and observations.
    """
    set_seed(getattr(xp, 'seed', False))
    xx, yy = HMM.simulate()
    return HMM, xx, yy
Default experiment setup (sets seed and simulates truth and obs).

Used by `xpList.launch` via `run_experiment`.

Parameters

- HMM (HiddenMarkovModel): Container defining the system.
- xp (object): Type: a `dapper.da_methods.da_method`-decorated class.

  .. warning:: `xp.seed` should be set (and `int`).

  Without `xp.seed` the seed does not get set, and different `xp`s will use
  different seeds (unless you do some funky hacking). Reproducibility for a
  script as a whole can still be achieved by setting the seed at the outset
  of the script. To avoid even that, set `xp.seed` to `None` or `"clock"`.
  (See the sketch below.)

Returns

- tuple (HMM, xx, yy): The `HMM` to use, and the simulated truth and observations.
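For instance, a minimal sketch of how a seed is attached to an `xp` so that `seed_and_simulate` picks it up. Here `HMM` stands for any `HiddenMarkovModel` (e.g. one of those under `dapper.mods`), and the method and parameter values are arbitrary:

    import dapper as dpr
    import dapper.da_methods as da

    xp = da.EnKF('Sqrt', N=20, infl=1.02)
    xp.seed = 3000  # read by seed_and_simulate via getattr(xp, 'seed', False)

    xps = dpr.xpList()
    xps += xp
    # launch() uses setup=seed_and_simulate by default, so set_seed(xp.seed)
    # is called before HMM.simulate() for each xp.
    save_as = xps.launch(HMM, save_as="seed_demo", statkeys=["rmse.a"])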
def run_experiment(xp, label, savedir, HMM, setup=seed_and_simulate, free=True,
                   statkeys=False, fail_gently=False, **stat_kwargs):
    """Used by `xpList.launch` to run each single (DA) experiment ("xp").

    This involves steps similar to `examples/basic_1.py`, i.e.:

    - `setup`                    : Initialize experiment.
    - `xp.assimilate`            : run DA, pass on exception if fail_gently.
    - `xp.stats.average_in_time` : result averaging.
    - `xp.avrgs.tabulate`        : result printing.
    - `dill.dump`                : result storage.

    Parameters
    ----------
    xp: object
        Type: a `dapper.da_methods.da_method`-decorated class.
    label: str
        Name attached to progressbar during assimilation.
    savedir: str
        Path of folder wherein to store the experiment data.
    HMM: HiddenMarkovModel
        Container defining the system.
    free: bool
        Whether (or not) to `del xp.stats` after the experiment is done,
        so as to free up memory and/or not save this data
        (just keeping `xp.avrgs`).
    statkeys: list
        A list of names (possibly in the form of abbreviations) of the
        statistical averages that should be printed immediately after
        this xp.
    fail_gently: bool
        Whether (or not) to propagate exceptions.
    setup: function
        This function must take two arguments: `HMM` and `xp`, and return the `HMM` to
        be used by the DA methods (typically the same as the input `HMM`, but could be
        modified), and the (typically synthetic) truth and obs time series.

        This gives you the ability to customize almost any aspect of the individual
        experiments within a batch launch of experiments (i.e. not just the parameters
        of the DA method). Typically you will grab one or more parameter values stored
        in the `xp` (see `dapper.da_methods.da_method`) and act on them, usually by
        assigning them to some object that impacts the experiment. Thus, by generating
        a new `xp` for each such parameter value you can investigate the
        impact/sensitivity of the results to this parameter. Examples include:

        - Setting the seed. See the default `setup`, namely `seed_and_simulate`,
          for how this is, or should be, done.
        - Setting some aspect of the `HMM` such as the observation noise,
          or the interval between observations. This could be achieved for example by:

              def setup(hmm, xp):
                  hmm.Obs.noise = GaussRV(M=hmm.Nx, C=xp.obs_noise)
                  hmm.tseq.dkObs = xp.time_between_obs
                  import dapper as dpr
                  return dpr.seed_and_simulate(hmm, xp)

          This process could involve more steps, for example loading a full covariance
          matrix from a data file, as specified by the `obs_noise` parameter, before
          assigning it to `C`. Also note that the import statement is not strictly
          necessary (assuming `dapper` was already imported in the outer scope,
          typically the main script), **except** when running the experiments on a
          remote server.

          Sometimes, the parameter you want to set is not accessible as one of the
          conventional attributes of the `HMM`. For example, the `Force` in the
          Lorenz-96 model. In that case you can add these lines to the setup function:

              import dapper.mods.Lorenz96 as core
              core.Force = xp.the_force_parameter

          However, if your model is an OOP instance, the import approach will not work
          because it will serve you the original model instance, while `setup()` deals
          with a copy of it. Instead, you could re-initialize the entire model in
          `setup()` and overwrite `HMM.Dyn`. However, it is probably easier to just
          assign the instance to some custom attribute before launching the
          experiments, e.g. `HMM.Dyn.object = the_model_instance`, enabling you to set
          parameters on `HMM.Dyn.object` in `setup()`. Note that this approach won't
          work for modules (for ex., combining the above examples, `HMM.Dyn.object =
          core`) because modules are not serializable.

        - Using a different `HMM` entirely for the truth/obs (`xx`/`yy`) generation
          than the one that will be used by the DA, or loading the truth/obs
          time series from file. In both cases, you might also have to do some
          cropping or slicing of `xx` and `yy` before returning them.
    """
    # Copy HMM to avoid changes made by setup affecting subsequent experiments.
    # Thus, experiments run in sequence behave the same as experiments run via
    # multiprocessing (which serialize (i.e. copy) the HMM) or on a cluster.
    hmm = copy.deepcopy(HMM)
    # Note that "implicitly referenced" objects do not get copied. For example,
    # if the model `step` function uses parameters defined in its module or object,
    # these will be obtained by re-importing that model, unless it has been serialized.
    # Serialization happens if the model instance (does not work for modules)
    # is explicitly referenced, e.g. if you've done `HMM.Dyn.underlying_model = model`.

    # GENERATE TRUTH/OBS
    hmm, xx, yy = setup(hmm, xp)

    # ASSIMILATE
    xp.assimilate(hmm, xx, yy, label, fail_gently=fail_gently, **stat_kwargs)

    # Clear references to mpl (for pickling purposes)
    if hasattr(xp.stats, "LP_instance"):
        del xp.stats.LP_instance

    # AVERAGE
    xp.stats.average_in_time(free=free)

    # PRINT
    if statkeys:
        statkeys = () if statkeys is True else statkeys
        print(xp.avrgs.tabulate(statkeys))

    # SAVE
    if savedir:
        with open(Path(savedir)/"xp", "wb") as FILE:
            dill.dump({'xp': xp}, FILE)
Used by `xpList.launch` to run each single (DA) experiment ("xp").

This involves steps similar to `examples/basic_1.py`, i.e.:

- `setup`                    : Initialize experiment.
- `xp.assimilate`            : run DA, pass on exception if fail_gently.
- `xp.stats.average_in_time` : result averaging.
- `xp.avrgs.tabulate`        : result printing.
- `dill.dump`                : result storage.

Parameters

- xp (object): Type: a `dapper.da_methods.da_method`-decorated class.
- label (str): Name attached to progressbar during assimilation.
- savedir (str): Path of folder wherein to store the experiment data.
- HMM (HiddenMarkovModel): Container defining the system.
- free (bool): Whether (or not) to `del xp.stats` after the experiment is done,
  so as to free up memory and/or not save this data (just keeping `xp.avrgs`).
- statkeys (list): A list of names (possibly in the form of abbreviations) of the
  statistical averages that should be printed immediately after this xp.
- fail_gently (bool): Whether (or not) to propagate exceptions.
- setup (function): This function must take two arguments: `HMM` and `xp`, and return
  the `HMM` to be used by the DA methods (typically the same as the input `HMM`, but
  could be modified), and the (typically synthetic) truth and obs time series.

  This gives you the ability to customize almost any aspect of the individual
  experiments within a batch launch of experiments (i.e. not just the parameters of
  the DA method). Typically you will grab one or more parameter values stored in the
  `xp` (see `dapper.da_methods.da_method`) and act on them, usually by assigning them
  to some object that impacts the experiment. Thus, by generating a new `xp` for each
  such parameter value you can investigate the impact/sensitivity of the results to
  this parameter. Examples include:

  - Setting the seed. See the default `setup`, namely `seed_and_simulate`,
    for how this is, or should be, done.
  - Setting some aspect of the `HMM` such as the observation noise,
    or the interval between observations. This could be achieved for example by:

        def setup(hmm, xp):
            hmm.Obs.noise = GaussRV(M=hmm.Nx, C=xp.obs_noise)
            hmm.tseq.dkObs = xp.time_between_obs
            import dapper as dpr
            return dpr.seed_and_simulate(hmm, xp)

    This process could involve more steps, for example loading a full covariance
    matrix from a data file, as specified by the `obs_noise` parameter, before
    assigning it to `C`. Also note that the import statement is not strictly
    necessary (assuming `dapper` was already imported in the outer scope, typically
    the main script), **except** when running the experiments on a remote server.

    Sometimes, the parameter you want to set is not accessible as one of the
    conventional attributes of the `HMM`. For example, the `Force` in the Lorenz-96
    model. In that case you can add these lines to the setup function:

        import dapper.mods.Lorenz96 as core
        core.Force = xp.the_force_parameter

    However, if your model is an OOP instance, the import approach will not work
    because it will serve you the original model instance, while `setup()` deals with
    a copy of it. Instead, you could re-initialize the entire model in `setup()` and
    overwrite `HMM.Dyn`. However, it is probably easier to just assign the instance
    to some custom attribute before launching the experiments, e.g.
    `HMM.Dyn.object = the_model_instance`, enabling you to set parameters on
    `HMM.Dyn.object` in `setup()`. Note that this approach won't work for modules
    (for ex., combining the above examples, `HMM.Dyn.object = core`) because modules
    are not serializable.

  - Using a different `HMM` entirely for the truth/obs (`xx`/`yy`) generation than
    the one that will be used by the DA, or loading the truth/obs time series from
    file. In both cases, you might also have to do some cropping or slicing of `xx`
    and `yy` before returning them. (See the sketch after this list.)
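As a concrete illustration of the last bullet, here is a sketch of a `setup` that loads pre-computed truth/obs from a file instead of simulating them. The file name, its keys, and the cropping step are hypothetical:

    import numpy as np
    from dapper.tools.seeding import set_seed

    def setup_from_file(hmm, xp):
        """Load pre-computed truth/obs instead of simulating them."""
        set_seed(getattr(xp, 'seed', False))  # still seed the DA method itself
        data = np.load("truth_and_obs.npz")   # hypothetical file with keys "xx", "yy"
        xx, yy = data["xx"], data["yy"]
        # If the stored series cover a longer window than hmm.tseq,
        # crop/slice xx and yy here before returning them.
        return hmm, xx, yy

    # Used via: xps.launch(HMM, setup=setup_from_file)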
class xpList(list):
    """Subclass of `list` specialized for experiment ("xp") objects.

    Main use: administrate experiment launches.

    Modifications to `list`:

    - `xpList.append` supports `unique` to enable lazy `xp` declaration.
    - `__iadd__` (`+=`) supports adding single `xp`s.
      This is hacky, but convenience is king.
    - `__getitem__` supports lists, similar to `np.ndarray`.
    - `__repr__`: prints the list as rows of a table,
      where the columns represent attributes whose value is not shared among all `xp`s.
      Refer to `xpList.prep_table` for more information.

    Add-ons:

    - `xpList.launch`: run the experiments in the current list.
    - `xpList.prep_table`: find all attributes of the `xp`s in the list;
      classify as distinct, redundant, or common.
    - `xpList.gen_names`: use `xpList.prep_table` to generate
      a short & unique name for each `xp` in the list.
    - `xpList.tabulate_avrgs`: tabulate time-averaged results.
    - `xpList.inds`: search by kw-attrs.

    Parameters
    ----------
    args: entries
        Nothing, or a list of `xp`s.

    unique: bool
        Duplicates won't get appended. Makes `append` (and `__iadd__`) relatively slow.
        Use `extend` or `__add__` or `combinator` to bypass this validation.

    Also see
    --------
    - Examples: `examples/basic_2`, `examples/basic_3`
    - `dapper.xp_process.xpSpace`, which is used for experiment result **presentation**,
      as opposed to this class (`xpList`), which handles **launching** experiments.
    """

    def __init__(self, *args, unique=False):
        self.unique = unique
        super().__init__(*args)

    def __iadd__(self, xp):
        if not hasattr(xp, '__iter__'):
            xp = [xp]
        for item in xp:
            self.append(item)
        return self

    def append(self, xp):
        """Append, **unless** `self.unique` and already present."""
        if not (self.unique and xp in self):
            super().append(xp)

    def __getitem__(self, keys):
        """Indexing, also by a list"""
        try:
            B = [self[k] for k in keys]    # if keys is list
        except TypeError:
            B = super().__getitem__(keys)  # if keys is int, slice
        if hasattr(B, '__len__'):
            B = xpList(B)                  # Cast
        return B

    def inds(self, strict=True, missingval="NONSENSE", **kws):
        """Find (all) indices of `xp`s whose attributes match `kws`.

        If strict, then `xp`s lacking a requested attr. will not match,
        unless the `missingval` matches the required value.
        """
        def match(xp):
            def missing(v): return missingval if strict else v
            matches = [getattr(xp, k, missing(v)) == v for k, v in kws.items()]
            return all(matches)

        return [i for i, xp in enumerate(self) if match(xp)]

    @property
    def da_methods(self):
        """List `da_method` attributes in this list."""
        return [xp.da_method for xp in self]

    def prep_table(self, nomerge=()):
        """Classify all attrs. of all `xp`s as `distinct`, `redundant`, or `common`.

        An attribute of the `xp`s is inserted in one of the 3 dicts as follows:
        The attribute names become dict keys. If the values of an attribute
        (collected from all of the `xp`s) are all __equal__, then the attribute
        is inserted in `common`, but only with **a single value**.
        If they are all the same **or missing**, then it is inserted in `redundant`
        **with a single value**. Otherwise, it is inserted in `distinct`,
        with **its full list of values** (filling with `None` where the attribute
        was missing in the corresponding `xp`).

        The attrs in `distinct` are sufficient to (but not generally necessary,
        since there might exist a subset of attributes that) uniquely identify each `xp`
        in the list (the `redundant` and `common` can be "squeezed" out).
        Thus, a table of the `xp`s does not need to list all of the attributes.
        This function also does the heavy lifting for `xpSpace.squeeze`.

        Parameters
        ----------
        nomerge: list
            Attributes that should always be seen as distinct.
        """
        def _aggregate_keys():
            """Aggregate keys from all `xp`"""
            if len(self) == 0:
                return []

            # Start with da_method
            aggregate = ['da_method']

            # Aggregate all other keys
            for xp in self:

                # Get dataclass fields
                try:
                    dc_fields = dcs.fields(xp.__class__)
                    dc_names = [F.name for F in dc_fields]
                    keys = xp.__dict__.keys()
                except TypeError:
                    # Assume namedtuple
                    dc_names = []
                    keys = xp._fields

                # For all potential keys:
                for k in keys:
                    # If not already present:
                    if k not in aggregate:

                        # If dataclass, check repr:
                        if k in dc_names:
                            if dc_fields[dc_names.index(k)].repr:
                                aggregate.append(k)
                        # Else, just append
                        else:
                            aggregate.append(k)

            # Remove unwanted
            excluded = [re.compile('^_'), 'avrgs', 'stats', 'HMM', 'duration']
            aggregate = struct_tools.complement(aggregate, excluded)
            return aggregate

        def _getattr_safe(xp, key):
            # Don't use None, to avoid mixing with actual None's
            # TODO 4: use an object yet more likely to be unique.
            missing = "N/A"
            a = getattr(xp, key, missing)

            # Replace ndarray by its id, since o/w it will
            # complain that you must use all().
            # Alternative: replace all == (and !=) below by "is".
            # Tabulation with multi-line params actually works,
            # (though it's still likely to take up too much space,
            # unless we set np.printoptions...).
            # However, then python (since 3.8) will complain about
            # comparison to literal.
            if isinstance(a, np.ndarray):
                shorten = 6
                a = f"arr(<id {id(a)//10**shorten}>)"
            # TODO 3: leave formatting to sub() below?
            # TODO 4: do similar formatting for other non-trivial params?
            # TODO 4: document alternative way to specify non-trivial params:
            #         use key to be looked up in some globally accessible dct.
            #         Advantage: names are meaningful, rather than ids.
            return a

        def replace_NA_by_None(vals):
            """Supports different types of `vals`."""
            def sub(v):
                return None if v == "N/A" else v

            if isinstance(vals, str):
                vals = sub(vals)
            else:
                try:
                    vals = [sub(v) for v in vals]
                except TypeError:
                    vals = sub(vals)
            return vals

        # Main
        distinct, redundant, common = {}, {}, {}
        for key in _aggregate_keys():
            vals = [_getattr_safe(xp, key) for xp in self]

            if struct_tools.flexcomp(key, *nomerge):
                dct, vals = distinct, vals

            elif all(vals[0] == v for v in vals):
                dct, vals = common, vals[0]

            else:
                nonNA = next(v for v in vals if "N/A" != v)
                if all(v == "N/A" or v == nonNA for v in vals):
                    dct, vals = redundant, nonNA

                else:
                    dct, vals = distinct, vals

            dct[key] = replace_NA_by_None(vals)

        return distinct, redundant, common

    def __repr__(self):
        distinct, redundant, common = self.prep_table()
        s = '<xpList> of length %d with attributes:\n' % len(self)
        s += tabulate(distinct, headers="keys", showindex=True)
        s += "\nOther attributes:\n"
        s += str(struct_tools.AlignedDict({**redundant, **common}))
        return s

    def gen_names(self, abbrev=6, tab=False):
        """Similar to `self.__repr__()`, but:

        - returns *list* of names
        - tabulation is optional
        - attaches (abbreviated) labels to each attribute
        """
        distinct, redundant, common = self.prep_table(nomerge=["da_method"])
        labels = distinct.keys()
        values = distinct.values()

        # Label abbreviation
        labels = [collapse_str(k, abbrev) for k in labels]

        # Make label columns: insert None or lbl+":", depending on value
        def column(lbl, vals):
            return [None if v is None else lbl+":" for v in vals]
        labels = [column(lbl, vals) for lbl, vals in zip(labels, values)]

        # Interlace labels and values
        table = [x for (a, b) in zip(labels, values) for x in (a, b)]

        # Rm da_method label (but keep value)
        table.pop(0)

        # Transpose
        table = list(map(list, zip(*table)))

        # Tabulate
        table = tabulate(table, tablefmt="plain")

        # Rm space between lbls/vals
        table = re.sub(': +', ':', table)

        # Rm alignment
        if not tab:
            table = re.sub(r' +', r' ', table)

        return table.splitlines()

    @wraps(dapper.stats.tabulate_avrgs)
    def tabulate_avrgs(self, *args, colorize=True, **kwargs):
        distinct, redundant, common = self.prep_table()
        averages = dapper.stats.tabulate_avrgs([C.avrgs for C in self], *args, **kwargs)
        columns = {**distinct, '|': ['|']*len(self), **averages}  # merge
        table = tabulate(columns, headers="keys", showindex=True).replace('␣', ' ')
        if colorize:
            table = stripe(table)
        return table

    def launch(self, HMM, save_as="noname", mp=False, fail_gently=None, **kwargs):
        """Essentially: `for xp in self: run_experiment(xp, ..., **kwargs)`.

        See `run_experiment` for documentation on the `kwargs` and `fail_gently`.
        See `dapper.tools.datafiles.create_run_dir` for documentation on `save_as`.

        Depending on `mp`, `run_experiment` is delegated as follows:

        - `False`: caller process (no parallelisation)
        - `True` or `"MP"` or an `int`: multiprocessing on this host
        - `"GCP"` or `"Google"` or `dict(server="GCP")`: the DAPPER server
          (Google Cloud Computing with HTCondor).
            - Specify a list of files as `mp["files"]` to include them
              in the working directory of the server workers.
            - In order to use absolute paths, the list should consist
              of tuples, where the first item is relative to the second
              (which is an absolute path). The root is then not included
              in the working directory of the server.
            - If this dict field is empty, then all python files
              in `sys.path[0]` are uploaded.

        See `examples/basic_2.py` and `examples/basic_3.py` for example use.
        """
        # Parse mp option
        if not mp:
            mp = dict()
        elif mp in [True, "MP"]:
            mp = dict(server="local")
        elif isinstance(mp, int):
            mp = dict(server="local", NPROC=mp)
        elif mp in ["GCP", "Google"]:
            mp = dict(server="GCP", files=[], code="")

        # Parse fail_gently
        if fail_gently is None:
            if mp and mp["server"] == "GCP":
                fail_gently = False
                # coz cloud processing is entirely de-coupled anyways
            else:
                fail_gently = True
                # True unless otherwise requested
        kwargs["fail_gently"] = fail_gently

        # Bundle HMM with kwargs
        kwargs['HMM'] = HMM

        # Data path
        save_as, xpi_dir = create_run_dir(save_as, mp)

        # No parallelization
        if not mp:
            for ixp, (xp, label) in enumerate(zip(self, self.gen_names())):
                run_experiment(xp, label, xpi_dir(ixp), **kwargs)

        # Local multiprocessing
        elif mp["server"].lower() == "local":
            def run_with_kwargs(arg):
                xp, ixp = arg
                run_experiment(xp, None, xpi_dir(ixp), **kwargs)
            args = zip(self, range(len(self)))

            pb.disable_progbar = True
            pb.disable_user_interaction = True
            NPROC = mp.get("NPROC", None)  # None => mp.cpu_count()
            from dapper.tools.multiproc import mpd  # will fail on GCP
            with mpd.Pool(NPROC) as pool:
                list(tqdm(
                    pool.imap(
                        run_with_kwargs, args),
                    total=len(self),
                    desc="Parallel experim's",
                    smoothing=0.1))
            pb.disable_progbar = False
            pb.disable_user_interaction = False

        # Google cloud platform, multiprocessing
        elif mp["server"] == "GCP":
            for ixp, xp in enumerate(self):
                with open(xpi_dir(ixp)/"xp.var", "wb") as f:
                    dill.dump(dict(xp=xp), f)

            with open(save_as/"xp.com", "wb") as f:
                dill.dump(kwargs, f)

            # mkdir extra_files
            extra_files = save_as / "extra_files"
            os.mkdir(extra_files)
            # Default extra_files: .py files in sys.path[0] (main script's path)
            if not mp.get("files", []):
                mp["files"] = [f.relative_to(sys.path[0]) for f in
                               Path(sys.path[0]).glob("**/*.py")]
                assert len(mp["files"]) < 1000, (
                    "Too many files staged for upload to server."
                    " This is the result of trying to include all files"
                    f" under sys.path[0]: ({sys.path[0]})."
                    " Consider moving your script to a project directory,"
                    " or explicitly listing the files to be uploaded."
                )

            # Copy into extra_files
            for f in mp["files"]:
                if isinstance(f, (str, Path)):
                    # Example: f = "A.py"
                    path = Path(sys.path[0]) / f
                    dst = f
                else:  # instance of tuple(path, root)
                    # Example: f = ("~/E/G/A.py", "G")
                    path, root = f
                    dst = Path(path).relative_to(root)
                dst = extra_files / dst
                os.makedirs(dst.parent, exist_ok=True)
                try:
                    shutil.copytree(path, dst)  # dir -r
                except OSError:
                    shutil.copy2(path, dst)  # file

            with open(extra_files/"dpr_config.yaml", "w") as f:
                f.write("\n".join([
                    "data_root: '$cwd'",
                    "liveplotting: no",
                    "welcome_message: no"]))

            # Loads PWD/xp_{var,com} and calls run_experiment()
            with open(extra_files/"load_and_run.py", "w") as f:
                f.write(dedent("""\
                    import dill
                    from dapper.xp_launch import run_experiment

                    # Load
                    with open("xp.com", "rb") as f: com = dill.load(f)
                    with open("xp.var", "rb") as f: var = dill.load(f)

                    # User-defined code
                    %s

                    # Run
                    try:
                        result = run_experiment(var['xp'], None, ".", **com)
                    except SystemError as err:
                        if err.args and "opcode" in err.args[0]:
                            err.args += ("It seems your local python version"
                                         " is incompatible with that of the cluster.",)
                        raise
                    """) % dedent(mp["code"]))

            submit_job_GCP(save_as)

        return save_as
Subclass of `list` specialized for experiment ("xp") objects.

Main use: administrate experiment launches.

Modifications to `list`:

- `xpList.append` supports `unique` to enable lazy `xp` declaration.
- `__iadd__` (`+=`) supports adding single `xp`s (hacky, but convenience is king).
- `__getitem__` supports lists, similar to `np.ndarray`.
- `__repr__`: prints the list as rows of a table, where the columns represent
  attributes whose value is not shared among all `xp`s.
  Refer to `xpList.prep_table` for more information.

Add-ons:

- `xpList.launch`: run the experiments in the current list.
- `xpList.prep_table`: find all attributes of the `xp`s in the list;
  classify as distinct, redundant, or common.
- `xpList.gen_names`: use `xpList.prep_table` to generate
  a short & unique name for each `xp` in the list.
- `xpList.tabulate_avrgs`: tabulate time-averaged results.
- `xpList.inds`: search by kw-attrs.

Parameters

- args (entries): Nothing, or a list of `xp`s.
- unique (bool): Duplicates won't get appended. Makes `append` (and `__iadd__`)
  relatively slow. Use `extend` or `__add__` or `combinator` to bypass this validation.

Also see

- Examples: `examples/basic_2`, `examples/basic_3` (and the usage sketch below)
- `dapper.xp_process.xpSpace`, which is used for experiment result **presentation**,
  as opposed to this class (`xpList`), which handles **launching** experiments.
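A short usage sketch (mirroring the style of `examples/basic_2`; `HMM` is assumed to be defined already, and the methods/values are arbitrary):

    import dapper as dpr
    import dapper.da_methods as da

    xps = dpr.xpList(unique=True)
    xps += da.Climatology()                  # single xp via +=
    xps += da.OptInterp()
    for N in [20, 50]:
        xps += da.EnKF('Sqrt', N=N, infl=1.02)
    xps += da.EnKF('Sqrt', N=20, infl=1.02)  # duplicate: skipped since unique=True

    print(xps)                               # table of the distinct attributes
    save_as = xps.launch(HMM, save_as="demo")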
def append(self, xp):
Append, **unless** `self.unique` and the `xp` is already present.
def inds(self, strict=True, missingval="NONSENSE", **kws):
Find (all) indices of `xp`s whose attributes match `kws`.

If strict, then `xp`s lacking a requested attr. will not match,
unless the `missingval` matches the required value.
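For example, continuing the hypothetical list above:

    ii = xps.inds(da_method="EnKF", infl=1.02)  # indices of the matching xps
    enkfs = xps[ii]                             # __getitem__ accepts the list of indices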
@property
def da_methods(self):
List `da_method` attributes in this list.
def prep_table(self, nomerge=()):
Classify all attrs. of all `xp`s as `distinct`, `redundant`, or `common`.

An attribute of the `xp`s is inserted in one of the 3 dicts as follows:
The attribute names become dict keys. If the values of an attribute
(collected from all of the `xp`s) are all **equal**, then the attribute
is inserted in `common`, but only with **a single value**.
If they are all the same **or missing**, then it is inserted in `redundant`
**with a single value**. Otherwise, it is inserted in `distinct`,
with **its full list of values** (filling with `None` where the attribute
was missing in the corresponding `xp`).

The attrs in `distinct` are sufficient (though not generally necessary, since a
subset of them might already suffice) to uniquely identify each `xp` in the list
(the `redundant` and `common` can be "squeezed" out).
Thus, a table of the `xp`s does not need to list all of the attributes.
This function also does the heavy lifting for `xpSpace.squeeze`.

Parameters

- nomerge (list): Attributes that should always be seen as distinct.
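A small illustration of the three output dicts (the exact keys depend on each method's dataclass fields, so take the commented values as approximate):

    import dapper as dpr
    import dapper.da_methods as da

    xps = dpr.xpList()
    xps += da.OptInterp()
    xps += da.EnKF('Sqrt', N=20, infl=1.02)
    xps += da.EnKF('Sqrt', N=50, infl=1.02)

    distinct, redundant, common = xps.prep_table()
    # distinct  ≈ {'da_method': ['OptInterp', 'EnKF', 'EnKF'], 'N': [None, 20, 50], ...}
    # redundant ≈ {'upd_a': 'Sqrt', 'infl': 1.02, ...}  # same value wherever present
    # common    ≈ {...}                                 # equal across *all* xps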
def gen_names(self, abbrev=6, tab=False):
Similar to `self.__repr__()`, but:

- returns a *list* of names
- tabulation is optional
- attaches (abbreviated) labels to each attribute

(Example output is sketched below.)
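Continuing the illustration above, the generated names look roughly like:

    for name in xps.gen_names():
        print(name)
    # OptInterp
    # EnKF N:20
    # EnKF N:50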
def tabulate_avrgs(avrgs_list, statkeys=(), decimals=None):
    """Tabulate avrgs (val±prec)."""
    if not statkeys:
        statkeys = ['rmse.a', 'rmv.a', 'rmse.f']

    columns = {}
    for stat in statkeys:
        column = [getattr(a, stat, None) for a in avrgs_list]
        column = unpack_uqs(column, decimals)
        if not column:
            raise ValueError(f"The stat. key '{stat}' was not"
                             " found among any of the averages.")
        vals = align_col([stat] + column["val"])
        precs = align_col(['1σ'] + column["prec"], just="<")
        headr = vals[0]+' '+precs[0]
        mattr = [f"{v} ±{c}" for v, c in zip(vals, precs)][1:]
        columns[headr] = mattr

    return columns
Tabulate avrgs (val±prec).
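For example, in a serial run (`mp=False`) the `xp`s in the list are updated in place by `launch`, so afterwards one may print selected averages:

    xps.launch(HMM, save_as="demo")
    print(xps.tabulate_avrgs(["rmse.a", "rmv.a"]))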
    def launch(self, HMM, save_as="noname", mp=False, fail_gently=None, **kwargs):
        """Essentially: `for xp in self: run_experiment(xp, ..., **kwargs)`.

        See `run_experiment` for documentation on the `kwargs` and `fail_gently`.
        See `dapper.tools.datafiles.create_run_dir` for documentation on `save_as`.

        Depending on `mp`, `run_experiment` is delegated as follows:

        - `False`: caller process (no parallelisation)
        - `True` or `"MP"` or an `int`: multiprocessing on this host
        - `"GCP"` or `"Google"` or `dict(server="GCP")`: the DAPPER server
          (Google Cloud Computing with HTCondor).
            - Specify a list of files as `mp["files"]` to include them
              in the working directory of the server workers.
                - In order to use absolute paths, the list should consist
                  of tuples, where the first item is relative to the second
                  (which is an absolute path). The root is then not included
                  in the working directory of the server.
                - If this dict field is empty, then all python files
                  in `sys.path[0]` are uploaded.

        See `examples/basic_2.py` and `examples/basic_3.py` for example use.
        """
        # Parse mp option
        if not mp:
            mp = dict()
        elif mp in [True, "MP"]:
            mp = dict(server="local")
        elif isinstance(mp, int):
            mp = dict(server="local", NPROC=mp)
        elif mp in ["GCP", "Google"]:
            mp = dict(server="GCP", files=[], code="")

        # Parse fail_gently
        if fail_gently is None:
            if mp and mp["server"] == "GCP":
                fail_gently = False
                # coz cloud processing is entirely de-coupled anyways
            else:
                fail_gently = True
                # True unless otherwise requested
        kwargs["fail_gently"] = fail_gently

        # Bundle HMM with kwargs
        kwargs['HMM'] = HMM

        # Data path
        save_as, xpi_dir = create_run_dir(save_as, mp)

        # No parallelization
        if not mp:
            for ixp, (xp, label) in enumerate(zip(self, self.gen_names())):
                run_experiment(xp, label, xpi_dir(ixp), **kwargs)

        # Local multiprocessing
        elif mp["server"].lower() == "local":
            def run_with_kwargs(arg):
                xp, ixp = arg
                run_experiment(xp, None, xpi_dir(ixp), **kwargs)
            args = zip(self, range(len(self)))

            pb.disable_progbar = True
            pb.disable_user_interaction = True
            NPROC = mp.get("NPROC", None)  # None => mp.cpu_count()
            from dapper.tools.multiproc import mpd  # will fail on GCP
            with mpd.Pool(NPROC) as pool:
                list(tqdm(
                    pool.imap(
                        run_with_kwargs, args),
                    total=len(self),
                    desc="Parallel experim's",
                    smoothing=0.1))
            pb.disable_progbar = False
            pb.disable_user_interaction = False

        # Google cloud platform, multiprocessing
        elif mp["server"] == "GCP":
            for ixp, xp in enumerate(self):
                with open(xpi_dir(ixp)/"xp.var", "wb") as f:
                    dill.dump(dict(xp=xp), f)

            with open(save_as/"xp.com", "wb") as f:
                dill.dump(kwargs, f)

            # mkdir extra_files
            extra_files = save_as / "extra_files"
            os.mkdir(extra_files)
            # Default extra_files: .py files in sys.path[0] (main script's path)
            if not mp.get("files", []):
                mp["files"] = [f.relative_to(sys.path[0]) for f in
                               Path(sys.path[0]).glob("**/*.py")]
                assert len(mp["files"]) < 1000, (
                    "Too many files staged for upload to server."
                    " This is the result of trying to include all files"
                    f" under sys.path[0]: ({sys.path[0]})."
                    " Consider moving your script to a project directory,"
                    " or explicitly listing the files to be uploaded."
                )

            # Copy into extra_files
            for f in mp["files"]:
                if isinstance(f, (str, Path)):
                    # Example: f = "A.py"
                    path = Path(sys.path[0]) / f
                    dst = f
                else:  # instance of tuple(path, root)
                    # Example: f = ("~/E/G/A.py", "G")
                    path, root = f
                    dst = Path(path).relative_to(root)
                dst = extra_files / dst
                os.makedirs(dst.parent, exist_ok=True)
                try:
                    shutil.copytree(path, dst)  # dir -r
                except OSError:
                    shutil.copy2(path, dst)  # file

            with open(extra_files/"dpr_config.yaml", "w") as f:
                f.write("\n".join([
                    "data_root: '$cwd'",
                    "liveplotting: no",
                    "welcome_message: no"]))

            # Loads PWD/xp_{var,com} and calls run_experiment()
            with open(extra_files/"load_and_run.py", "w") as f:
                f.write(dedent("""\
                import dill
                from dapper.xp_launch import run_experiment

                # Load
                with open("xp.com", "rb") as f: com = dill.load(f)
                with open("xp.var", "rb") as f: var = dill.load(f)

                # User-defined code
                %s

                # Run
                try:
                    result = run_experiment(var['xp'], None, ".", **com)
                except SystemError as err:
                    if err.args and "opcode" in err.args[0]:
                        err.args += ("It seems your local python version"
                                     " is incompatible with that of the cluster.",)
                    raise
                """) % dedent(mp["code"]))

            submit_job_GCP(save_as)

        return save_as
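A minimal launch sketch, loosely following `examples/basic_2.py` (the `HMM`, the chosen DA methods, and the parameter values are illustrative assumptions, not prescribed by this module):

    import dapper as dpr
    import dapper.da_methods as da

    # Assumed: `HMM` is a HiddenMarkovModel defined earlier in the script.
    xps = dpr.xpList()
    xps += da.EnKF("Sqrt", N=20, infl=1.02)
    xps += da.Var3D(xB=0.1)

    # mp=False runs in the caller process; mp=4 would use 4 local processes,
    # and mp="GCP" would submit the batch to the DAPPER server.
    save_as = xps.launch(HMM, save_as="my_xps", mp=False)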
def combinator(param_dict, **glob_dict):
    """Mass creation of `xp`'s by combining the value lists in the `param_dict`.

    Returns a function (`for_params`) that creates all possible combinations
    of parameters (from their value list) for a given `dapper.da_methods.da_method`.
    This is a good deal more efficient than relying on `xpList`'s `unique`.

    Parameters
    - not found among the args of the given DA method are ignored by `for_params`.
    - specified as keywords to `for_params` fix the value,
      preventing the use of the corresponding (if any) value list in the `param_dict`.

    .. warning::
        Beware! If, eg., `infl` or `rot` are in `param_dict`, aimed at the `EnKF`,
        but you forget that they are also attributes of some method where you don't
        actually want to use them (eg. `SVGDF`),
        then you'll create many more `xp`s than you intend.
    """
    def for_params(method, **fixed_params):
        dc_fields = [f.name for f in dcs.fields(method)]
        params = struct_tools.intersect(param_dict, dc_fields)
        params = struct_tools.complement(params, fixed_params)
        params = {**glob_dict, **params}  # glob_dict 1st

        def xp1(dct):
            xp = method(**struct_tools.intersect(dct, dc_fields), **fixed_params)
            for key, v in struct_tools.intersect(dct, glob_dict).items():
                setattr(xp, key, v)
            return xp

        return [xp1(dct) for dct in struct_tools.prodct(params)]
    return for_params
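A sketch of typical use, loosely following `examples/basic_3.py` (the method choices and parameter values are illustrative assumptions):

    import dapper.da_methods as da

    # Value lists to sweep over. `seed` (a glob_dict entry) is attached to every xp.
    param_dict = dict(N=[20, 40, 80], infl=[1.0, 1.02, 1.05], xB=[0.1, 0.2])
    for_params = combinator(param_dict, seed=3)

    xps = xpList()
    xps += for_params(da.EnKF, upd_a="Sqrt")  # sweeps N and infl; xB is not an EnKF arg
    xps += for_params(da.Var3D)               # sweeps xB only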