dapper.da_methods.baseline
Unsophisticated" but robust (widely applicable) DA methods.
Many are based on bib.raanes2016thesis
.
1"""Unsophisticated" but robust (widely applicable) DA methods. 2 3Many are based on `bib.raanes2016thesis`. 4""" 5 6from typing import Callable, Optional 7 8import numpy as np 9 10import dapper.tools.series as series 11from dapper.stats import center 12from dapper.tools.linalg import mrdiv 13from dapper.tools.matrices import CovMat 14from dapper.tools.progressbar import progbar 15 16from . import da_method 17 18 19@da_method() 20class Climatology: 21 """A baseline/reference method. 22 23 Note that the "climatology" is computed from truth, which might be 24 (unfairly) advantageous if the simulation is too short (vs mixing time). 25 """ 26 27 def assimilate(self, HMM, xx, yy): 28 muC = np.mean(xx, 0) 29 AC = xx - muC 30 PC = CovMat(AC, "A") 31 32 self.stats.assess(0, mu=muC, Cov=PC) 33 self.stats.trHK[:] = 0 34 35 for k, ko, _, _ in progbar(HMM.tseq.ticker): 36 fau = "u" if ko is None else "fau" 37 self.stats.assess(k, ko, fau, mu=muC, Cov=PC) 38 39 40@da_method() 41class OptInterp: 42 """Optimal Interpolation -- a baseline/reference method. 43 44 Uses the Kalman filter equations, 45 but with a prior from the Climatology. 46 """ 47 48 def assimilate(self, HMM, xx, yy): 49 Id = np.eye(HMM.Nx) 50 51 # Compute "climatological" Kalman gain 52 muC = np.mean(xx, 0) 53 AC = xx - muC 54 PC = (AC.T @ AC) / (xx.shape[0] - 1) 55 56 # Setup scalar "time-series" covariance dynamics. 57 # ONLY USED FOR DIAGNOSTICS, not to affect the Kalman gain. 58 L = series.estimate_corr_length(AC.ravel(order="F")) 59 SM = fit_sigmoid(1 / 2, L, 0) 60 61 # Init 62 mu = muC 63 self.stats.assess(0, mu=mu, Cov=PC) 64 65 for k, ko, t, dt in progbar(HMM.tseq.ticker): 66 # Forecast 67 mu = HMM.Dyn(mu, t - dt, dt) 68 if ko is not None: 69 self.stats.assess(k, ko, "f", mu=muC, Cov=PC) 70 71 # Analysis 72 H = HMM.Obs(ko).linear(muC) 73 KG = mrdiv(PC @ H.T, H @ PC @ H.T + HMM.Obs(ko).noise.C.full) 74 mu = muC + KG @ (yy[ko] - HMM.Obs(ko)(muC)) 75 76 P = (Id - KG @ H) @ PC 77 SM = fit_sigmoid(P.trace() / PC.trace(), L, k) 78 79 self.stats.assess(k, ko, mu=mu, Cov=2 * PC * SM(k)) 80 81 82@da_method() 83class Var3D: 84 """3D-Var -- a baseline/reference method. 85 86 This implementation is not "Var"-ish: there is no *iterative* optimzt. 87 Instead, it does the full analysis update in one step: the Kalman filter, 88 with the background covariance being user specified, through B and xB. 89 """ 90 91 B: Optional[np.ndarray] = None 92 xB: float = 1.0 93 94 def assimilate(self, HMM, xx, yy): 95 Id = np.eye(HMM.Nx) 96 if isinstance(self.B, np.ndarray): 97 # compare ndarray 1st to avoid == error for ndarray 98 B = self.B.astype(float) 99 elif self.B in (None, "clim"): 100 # Use climatological cov, estimated from truth 101 B = np.cov(xx.T) 102 elif self.B == "eye": 103 B = Id 104 else: 105 raise ValueError("Bad input B.") 106 B *= self.xB 107 108 # ONLY USED FOR DIAGNOSTICS, not to change the Kalman gain. 109 CC = 2 * np.cov(xx.T) 110 L = series.estimate_corr_length(center(xx)[0].ravel(order="F")) 111 P = HMM.X0.C.full 112 SM = fit_sigmoid(P.trace() / CC.trace(), L, 0) 113 114 # Init 115 mu = HMM.X0.mu 116 self.stats.assess(0, mu=mu, Cov=P) 117 118 for k, ko, t, dt in progbar(HMM.tseq.ticker): 119 # Forecast 120 mu = HMM.Dyn(mu, t - dt, dt) 121 P = CC * SM(k) 122 123 if ko is not None: 124 self.stats.assess(k, ko, "f", mu=mu, Cov=P) 125 126 # Analysis 127 H = HMM.Obs(ko).linear(mu) 128 KG = mrdiv(B @ H.T, H @ B @ H.T + HMM.Obs(ko).noise.C.full) 129 mu = mu + KG @ (yy[ko] - HMM.Obs(ko)(mu)) 130 131 # Re-calibrate fit_sigmoid with new W0 = Pa/B 132 P = (Id - KG @ H) @ B 133 SM = fit_sigmoid(P.trace() / CC.trace(), L, k) 134 135 self.stats.assess(k, ko, mu=mu, Cov=P) 136 137 138def fit_sigmoid(Sb, L, kb): 139 """Return a sigmoid [function S(k)] for approximating error dynamics. 140 141 We use the logistic function for the sigmoid; it's the solution of the 142 "population growth" ODE: `dS/dt = a*S*(1-S/S(∞))`. 143 NB: It might be better to use the "error growth ODE" of Lorenz/Dalcher/Kalnay, 144 but this has a significantly more complicated closed-form solution, 145 and reduces to the above ODE when there's no model error (ODE source term). 146 147 The "normalized" sigmoid, `S1`, is symmetric around 0, and `S1(-∞)=0` and `S1(∞)=1`. 148 149 The sigmoid `S(k) = S1(a*(k-kb) + b)` is fitted (see docs/snippets/sigmoid.jpg) with 150 151 - `a` corresponding to a given corr. length `L`. 152 - `b` to match values of `S(kb)` and `Sb` 153 """ 154 155 def sigmoid(k): 156 return 1 / (1 + np.exp(-k)) # normalized sigmoid 157 158 def inv_sig(s): 159 return np.log(s / (1 - s)) # its inverse 160 161 a = 1 / L 162 b = inv_sig(Sb) 163 164 def S(k): 165 return sigmoid(b + a * (k - kb)) 166 167 return S 168 169 170@da_method() 171class Persistence: 172 """Sets estimate to the **true state** at the previous time index. 173 174 The analysis (`.a`) stat uses the previous obs. time. 175 The forecast and universal (`.f` and `.u`) stats use previous integration time 176 index. 177 """ 178 179 def assimilate(self, HMM, xx, yy): 180 prev = xx[0] 181 self.stats.assess(0, mu=prev) 182 for k, ko, _t, _dt in progbar(HMM.tseq.ticker): 183 self.stats.assess(k, ko, "fu", mu=xx[k - 1]) 184 if ko is not None: 185 self.stats.assess(k, ko, "a", mu=prev) 186 prev = xx[k] 187 188 189@da_method() 190class PreProg: 191 """Simply look-up the estimates in user-specified function (`schedule`). 192 193 For example, with `schedule` given by `lambda k, xx, yy: xx[k]` 194 the error (`err.rms, err.ma, ...`) should be 0. 195 """ 196 197 schedule: Callable 198 tag: str = None 199 200 def assimilate(self, HMM, xx, yy): 201 self.stats.assess(0, mu=self.schedule(0, xx, yy)) 202 for k, ko, _t, _dt in progbar(HMM.tseq.ticker): 203 self.stats.assess(k, ko, "fu", mu=self.schedule(k, xx, yy)) 204 if ko is not None: 205 self.stats.assess(k, ko, "a", mu=self.schedule(k, xx, yy))
20@da_method() 21class Climatology: 22 """A baseline/reference method. 23 24 Note that the "climatology" is computed from truth, which might be 25 (unfairly) advantageous if the simulation is too short (vs mixing time). 26 """ 27 28 def assimilate(self, HMM, xx, yy): 29 muC = np.mean(xx, 0) 30 AC = xx - muC 31 PC = CovMat(AC, "A") 32 33 self.stats.assess(0, mu=muC, Cov=PC) 34 self.stats.trHK[:] = 0 35 36 for k, ko, _, _ in progbar(HMM.tseq.ticker): 37 fau = "u" if ko is None else "fau" 38 self.stats.assess(k, ko, fau, mu=muC, Cov=PC)
A baseline/reference method.
Note that the "climatology" is computed from truth, which might be (unfairly) advantageous if the simulation is too short (vs mixing time).
28 def assimilate(self, HMM, xx, yy): 29 muC = np.mean(xx, 0) 30 AC = xx - muC 31 PC = CovMat(AC, "A") 32 33 self.stats.assess(0, mu=muC, Cov=PC) 34 self.stats.trHK[:] = 0 35 36 for k, ko, _, _ in progbar(HMM.tseq.ticker): 37 fau = "u" if ko is None else "fau" 38 self.stats.assess(k, ko, fau, mu=muC, Cov=PC)
41@da_method() 42class OptInterp: 43 """Optimal Interpolation -- a baseline/reference method. 44 45 Uses the Kalman filter equations, 46 but with a prior from the Climatology. 47 """ 48 49 def assimilate(self, HMM, xx, yy): 50 Id = np.eye(HMM.Nx) 51 52 # Compute "climatological" Kalman gain 53 muC = np.mean(xx, 0) 54 AC = xx - muC 55 PC = (AC.T @ AC) / (xx.shape[0] - 1) 56 57 # Setup scalar "time-series" covariance dynamics. 58 # ONLY USED FOR DIAGNOSTICS, not to affect the Kalman gain. 59 L = series.estimate_corr_length(AC.ravel(order="F")) 60 SM = fit_sigmoid(1 / 2, L, 0) 61 62 # Init 63 mu = muC 64 self.stats.assess(0, mu=mu, Cov=PC) 65 66 for k, ko, t, dt in progbar(HMM.tseq.ticker): 67 # Forecast 68 mu = HMM.Dyn(mu, t - dt, dt) 69 if ko is not None: 70 self.stats.assess(k, ko, "f", mu=muC, Cov=PC) 71 72 # Analysis 73 H = HMM.Obs(ko).linear(muC) 74 KG = mrdiv(PC @ H.T, H @ PC @ H.T + HMM.Obs(ko).noise.C.full) 75 mu = muC + KG @ (yy[ko] - HMM.Obs(ko)(muC)) 76 77 P = (Id - KG @ H) @ PC 78 SM = fit_sigmoid(P.trace() / PC.trace(), L, k) 79 80 self.stats.assess(k, ko, mu=mu, Cov=2 * PC * SM(k))
Optimal Interpolation -- a baseline/reference method.
Uses the Kalman filter equations, but with a prior from the Climatology.
49 def assimilate(self, HMM, xx, yy): 50 Id = np.eye(HMM.Nx) 51 52 # Compute "climatological" Kalman gain 53 muC = np.mean(xx, 0) 54 AC = xx - muC 55 PC = (AC.T @ AC) / (xx.shape[0] - 1) 56 57 # Setup scalar "time-series" covariance dynamics. 58 # ONLY USED FOR DIAGNOSTICS, not to affect the Kalman gain. 59 L = series.estimate_corr_length(AC.ravel(order="F")) 60 SM = fit_sigmoid(1 / 2, L, 0) 61 62 # Init 63 mu = muC 64 self.stats.assess(0, mu=mu, Cov=PC) 65 66 for k, ko, t, dt in progbar(HMM.tseq.ticker): 67 # Forecast 68 mu = HMM.Dyn(mu, t - dt, dt) 69 if ko is not None: 70 self.stats.assess(k, ko, "f", mu=muC, Cov=PC) 71 72 # Analysis 73 H = HMM.Obs(ko).linear(muC) 74 KG = mrdiv(PC @ H.T, H @ PC @ H.T + HMM.Obs(ko).noise.C.full) 75 mu = muC + KG @ (yy[ko] - HMM.Obs(ko)(muC)) 76 77 P = (Id - KG @ H) @ PC 78 SM = fit_sigmoid(P.trace() / PC.trace(), L, k) 79 80 self.stats.assess(k, ko, mu=mu, Cov=2 * PC * SM(k))
83@da_method() 84class Var3D: 85 """3D-Var -- a baseline/reference method. 86 87 This implementation is not "Var"-ish: there is no *iterative* optimzt. 88 Instead, it does the full analysis update in one step: the Kalman filter, 89 with the background covariance being user specified, through B and xB. 90 """ 91 92 B: Optional[np.ndarray] = None 93 xB: float = 1.0 94 95 def assimilate(self, HMM, xx, yy): 96 Id = np.eye(HMM.Nx) 97 if isinstance(self.B, np.ndarray): 98 # compare ndarray 1st to avoid == error for ndarray 99 B = self.B.astype(float) 100 elif self.B in (None, "clim"): 101 # Use climatological cov, estimated from truth 102 B = np.cov(xx.T) 103 elif self.B == "eye": 104 B = Id 105 else: 106 raise ValueError("Bad input B.") 107 B *= self.xB 108 109 # ONLY USED FOR DIAGNOSTICS, not to change the Kalman gain. 110 CC = 2 * np.cov(xx.T) 111 L = series.estimate_corr_length(center(xx)[0].ravel(order="F")) 112 P = HMM.X0.C.full 113 SM = fit_sigmoid(P.trace() / CC.trace(), L, 0) 114 115 # Init 116 mu = HMM.X0.mu 117 self.stats.assess(0, mu=mu, Cov=P) 118 119 for k, ko, t, dt in progbar(HMM.tseq.ticker): 120 # Forecast 121 mu = HMM.Dyn(mu, t - dt, dt) 122 P = CC * SM(k) 123 124 if ko is not None: 125 self.stats.assess(k, ko, "f", mu=mu, Cov=P) 126 127 # Analysis 128 H = HMM.Obs(ko).linear(mu) 129 KG = mrdiv(B @ H.T, H @ B @ H.T + HMM.Obs(ko).noise.C.full) 130 mu = mu + KG @ (yy[ko] - HMM.Obs(ko)(mu)) 131 132 # Re-calibrate fit_sigmoid with new W0 = Pa/B 133 P = (Id - KG @ H) @ B 134 SM = fit_sigmoid(P.trace() / CC.trace(), L, k) 135 136 self.stats.assess(k, ko, mu=mu, Cov=P)
3D-Var -- a baseline/reference method.
This implementation is not "Var"-ish: there is no iterative optimzt. Instead, it does the full analysis update in one step: the Kalman filter, with the background covariance being user specified, through B and xB.
95 def assimilate(self, HMM, xx, yy): 96 Id = np.eye(HMM.Nx) 97 if isinstance(self.B, np.ndarray): 98 # compare ndarray 1st to avoid == error for ndarray 99 B = self.B.astype(float) 100 elif self.B in (None, "clim"): 101 # Use climatological cov, estimated from truth 102 B = np.cov(xx.T) 103 elif self.B == "eye": 104 B = Id 105 else: 106 raise ValueError("Bad input B.") 107 B *= self.xB 108 109 # ONLY USED FOR DIAGNOSTICS, not to change the Kalman gain. 110 CC = 2 * np.cov(xx.T) 111 L = series.estimate_corr_length(center(xx)[0].ravel(order="F")) 112 P = HMM.X0.C.full 113 SM = fit_sigmoid(P.trace() / CC.trace(), L, 0) 114 115 # Init 116 mu = HMM.X0.mu 117 self.stats.assess(0, mu=mu, Cov=P) 118 119 for k, ko, t, dt in progbar(HMM.tseq.ticker): 120 # Forecast 121 mu = HMM.Dyn(mu, t - dt, dt) 122 P = CC * SM(k) 123 124 if ko is not None: 125 self.stats.assess(k, ko, "f", mu=mu, Cov=P) 126 127 # Analysis 128 H = HMM.Obs(ko).linear(mu) 129 KG = mrdiv(B @ H.T, H @ B @ H.T + HMM.Obs(ko).noise.C.full) 130 mu = mu + KG @ (yy[ko] - HMM.Obs(ko)(mu)) 131 132 # Re-calibrate fit_sigmoid with new W0 = Pa/B 133 P = (Id - KG @ H) @ B 134 SM = fit_sigmoid(P.trace() / CC.trace(), L, k) 135 136 self.stats.assess(k, ko, mu=mu, Cov=P)
139def fit_sigmoid(Sb, L, kb): 140 """Return a sigmoid [function S(k)] for approximating error dynamics. 141 142 We use the logistic function for the sigmoid; it's the solution of the 143 "population growth" ODE: `dS/dt = a*S*(1-S/S(∞))`. 144 NB: It might be better to use the "error growth ODE" of Lorenz/Dalcher/Kalnay, 145 but this has a significantly more complicated closed-form solution, 146 and reduces to the above ODE when there's no model error (ODE source term). 147 148 The "normalized" sigmoid, `S1`, is symmetric around 0, and `S1(-∞)=0` and `S1(∞)=1`. 149 150 The sigmoid `S(k) = S1(a*(k-kb) + b)` is fitted (see docs/snippets/sigmoid.jpg) with 151 152 - `a` corresponding to a given corr. length `L`. 153 - `b` to match values of `S(kb)` and `Sb` 154 """ 155 156 def sigmoid(k): 157 return 1 / (1 + np.exp(-k)) # normalized sigmoid 158 159 def inv_sig(s): 160 return np.log(s / (1 - s)) # its inverse 161 162 a = 1 / L 163 b = inv_sig(Sb) 164 165 def S(k): 166 return sigmoid(b + a * (k - kb)) 167 168 return S
Return a sigmoid [function S(k)] for approximating error dynamics.
We use the logistic function for the sigmoid; it's the solution of the
"population growth" ODE: dS/dt = a*S*(1-S/S(∞))
.
NB: It might be better to use the "error growth ODE" of Lorenz/Dalcher/Kalnay,
but this has a significantly more complicated closed-form solution,
and reduces to the above ODE when there's no model error (ODE source term).
The "normalized" sigmoid, S1
, is symmetric around 0, and S1(-∞)=0
and S1(∞)=1
.
The sigmoid S(k) = S1(a*(k-kb) + b)
is fitted (see docs/snippets/sigmoid.jpg) with
a
corresponding to a given corr. lengthL
.b
to match values ofS(kb)
andSb
171@da_method() 172class Persistence: 173 """Sets estimate to the **true state** at the previous time index. 174 175 The analysis (`.a`) stat uses the previous obs. time. 176 The forecast and universal (`.f` and `.u`) stats use previous integration time 177 index. 178 """ 179 180 def assimilate(self, HMM, xx, yy): 181 prev = xx[0] 182 self.stats.assess(0, mu=prev) 183 for k, ko, _t, _dt in progbar(HMM.tseq.ticker): 184 self.stats.assess(k, ko, "fu", mu=xx[k - 1]) 185 if ko is not None: 186 self.stats.assess(k, ko, "a", mu=prev) 187 prev = xx[k]
Sets estimate to the true state at the previous time index.
The analysis (.a
) stat uses the previous obs. time.
The forecast and universal (.f
and .u
) stats use previous integration time
index.
190@da_method() 191class PreProg: 192 """Simply look-up the estimates in user-specified function (`schedule`). 193 194 For example, with `schedule` given by `lambda k, xx, yy: xx[k]` 195 the error (`err.rms, err.ma, ...`) should be 0. 196 """ 197 198 schedule: Callable 199 tag: str = None 200 201 def assimilate(self, HMM, xx, yy): 202 self.stats.assess(0, mu=self.schedule(0, xx, yy)) 203 for k, ko, _t, _dt in progbar(HMM.tseq.ticker): 204 self.stats.assess(k, ko, "fu", mu=self.schedule(k, xx, yy)) 205 if ko is not None: 206 self.stats.assess(k, ko, "a", mu=self.schedule(k, xx, yy))
Simply look-up the estimates in user-specified function (schedule
).
For example, with schedule
given by lambda k, xx, yy: xx[k]
the error (err.rms, err.ma, ...
) should be 0.