dapper.da_methods.baseline

Unsophisticated" but robust (widely applicable) DA methods.

Many are based on bib.raanes2016thesis.

  1"""Unsophisticated" but robust (widely applicable) DA methods.
  2
  3Many are based on `bib.raanes2016thesis`.
  4"""
  5
  6from typing import Callable, Optional
  7
  8import numpy as np
  9
 10import dapper.tools.series as series
 11from dapper.stats import center
 12from dapper.tools.linalg import mrdiv
 13from dapper.tools.matrices import CovMat
 14from dapper.tools.progressbar import progbar
 15
 16from . import da_method
 17
 18
 19@da_method()
 20class Climatology:
 21    """A baseline/reference method.
 22
 23    Note that the "climatology" is computed from truth, which might be
 24    (unfairly) advantageous if the simulation is too short (vs mixing time).
 25    """
 26
 27    def assimilate(self, HMM, xx, yy):
 28        muC = np.mean(xx, 0)
 29        AC = xx - muC
 30        PC = CovMat(AC, "A")
 31
 32        self.stats.assess(0, mu=muC, Cov=PC)
 33        self.stats.trHK[:] = 0
 34
 35        for k, ko, _, _ in progbar(HMM.tseq.ticker):
 36            fau = "u" if ko is None else "fau"
 37            self.stats.assess(k, ko, fau, mu=muC, Cov=PC)
 38
 39
 40@da_method()
 41class OptInterp:
 42    """Optimal Interpolation -- a baseline/reference method.
 43
 44    Uses the Kalman filter equations,
 45    but with a prior from the Climatology.
 46    """
 47
 48    def assimilate(self, HMM, xx, yy):
 49        Id = np.eye(HMM.Nx)
 50
 51        # Compute "climatological" Kalman gain
 52        muC = np.mean(xx, 0)
 53        AC = xx - muC
 54        PC = (AC.T @ AC) / (xx.shape[0] - 1)
 55
 56        # Setup scalar "time-series" covariance dynamics.
 57        # ONLY USED FOR DIAGNOSTICS, not to affect the Kalman gain.
 58        L = series.estimate_corr_length(AC.ravel(order="F"))
 59        SM = fit_sigmoid(1 / 2, L, 0)
 60
 61        # Init
 62        mu = muC
 63        self.stats.assess(0, mu=mu, Cov=PC)
 64
 65        for k, ko, t, dt in progbar(HMM.tseq.ticker):
 66            # Forecast
 67            mu = HMM.Dyn(mu, t - dt, dt)
 68            if ko is not None:
 69                self.stats.assess(k, ko, "f", mu=muC, Cov=PC)
 70
 71                # Analysis
 72                H = HMM.Obs(ko).linear(muC)
 73                KG = mrdiv(PC @ H.T, H @ PC @ H.T + HMM.Obs(ko).noise.C.full)
 74                mu = muC + KG @ (yy[ko] - HMM.Obs(ko)(muC))
 75
 76                P = (Id - KG @ H) @ PC
 77                SM = fit_sigmoid(P.trace() / PC.trace(), L, k)
 78
 79            self.stats.assess(k, ko, mu=mu, Cov=2 * PC * SM(k))
 80
 81
 82@da_method()
 83class Var3D:
 84    """3D-Var -- a baseline/reference method.
 85
 86    This implementation is not "Var"-ish: there is no *iterative* optimzt.
 87    Instead, it does the full analysis update in one step: the Kalman filter,
 88    with the background covariance being user specified, through B and xB.
 89    """
 90
 91    B: Optional[np.ndarray] = None
 92    xB: float = 1.0
 93
 94    def assimilate(self, HMM, xx, yy):
 95        Id = np.eye(HMM.Nx)
 96        if isinstance(self.B, np.ndarray):
 97            # compare ndarray 1st to avoid == error for ndarray
 98            B = self.B.astype(float)
 99        elif self.B in (None, "clim"):
100            # Use climatological cov, estimated from truth
101            B = np.cov(xx.T)
102        elif self.B == "eye":
103            B = Id
104        else:
105            raise ValueError("Bad input B.")
106        B *= self.xB
107
108        # ONLY USED FOR DIAGNOSTICS, not to change the Kalman gain.
109        CC = 2 * np.cov(xx.T)
110        L = series.estimate_corr_length(center(xx)[0].ravel(order="F"))
111        P = HMM.X0.C.full
112        SM = fit_sigmoid(P.trace() / CC.trace(), L, 0)
113
114        # Init
115        mu = HMM.X0.mu
116        self.stats.assess(0, mu=mu, Cov=P)
117
118        for k, ko, t, dt in progbar(HMM.tseq.ticker):
119            # Forecast
120            mu = HMM.Dyn(mu, t - dt, dt)
121            P = CC * SM(k)
122
123            if ko is not None:
124                self.stats.assess(k, ko, "f", mu=mu, Cov=P)
125
126                # Analysis
127                H = HMM.Obs(ko).linear(mu)
128                KG = mrdiv(B @ H.T, H @ B @ H.T + HMM.Obs(ko).noise.C.full)
129                mu = mu + KG @ (yy[ko] - HMM.Obs(ko)(mu))
130
131                # Re-calibrate fit_sigmoid with new W0 = Pa/B
132                P = (Id - KG @ H) @ B
133                SM = fit_sigmoid(P.trace() / CC.trace(), L, k)
134
135            self.stats.assess(k, ko, mu=mu, Cov=P)
136
137
138def fit_sigmoid(Sb, L, kb):
139    """Return a sigmoid [function S(k)] for approximating error dynamics.
140
141    We use the logistic function for the sigmoid; it's the solution of the
142    "population growth" ODE: `dS/dt = a*S*(1-S/S(∞))`.
143    NB: It might be better to use the "error growth ODE" of Lorenz/Dalcher/Kalnay,
144    but this has a significantly more complicated closed-form solution,
145    and reduces to the above ODE when there's no model error (ODE source term).
146
147    The "normalized" sigmoid, `S1`, is symmetric around 0, and `S1(-∞)=0` and `S1(∞)=1`.
148
149    The sigmoid `S(k) = S1(a*(k-kb) + b)` is fitted (see docs/snippets/sigmoid.jpg) with
150
151    - `a` corresponding to a given corr. length `L`.
152    - `b` to match values of `S(kb)` and `Sb`
153    """
154
155    def sigmoid(k):
156        return 1 / (1 + np.exp(-k))  # normalized sigmoid
157
158    def inv_sig(s):
159        return np.log(s / (1 - s))  # its inverse
160
161    a = 1 / L
162    b = inv_sig(Sb)
163
164    def S(k):
165        return sigmoid(b + a * (k - kb))
166
167    return S
168
169
170@da_method()
171class Persistence:
172    """Sets estimate to the **true state** at the previous time index.
173
174    The analysis (`.a`) stat uses the previous obs. time.
175    The forecast and universal (`.f` and `.u`) stats use previous integration time
176    index.
177    """
178
179    def assimilate(self, HMM, xx, yy):
180        prev = xx[0]
181        self.stats.assess(0, mu=prev)
182        for k, ko, _t, _dt in progbar(HMM.tseq.ticker):
183            self.stats.assess(k, ko, "fu", mu=xx[k - 1])
184            if ko is not None:
185                self.stats.assess(k, ko, "a", mu=prev)
186                prev = xx[k]
187
188
189@da_method()
190class PreProg:
191    """Simply look-up the estimates in user-specified function (`schedule`).
192
193    For example, with `schedule` given by `lambda k, xx, yy: xx[k]`
194    the error (`err.rms, err.ma, ...`) should be 0.
195    """
196
197    schedule: Callable
198    tag: str = None
199
200    def assimilate(self, HMM, xx, yy):
201        self.stats.assess(0, mu=self.schedule(0, xx, yy))
202        for k, ko, _t, _dt in progbar(HMM.tseq.ticker):
203            self.stats.assess(k, ko, "fu", mu=self.schedule(k, xx, yy))
204            if ko is not None:
205                self.stats.assess(k, ko, "a", mu=self.schedule(k, xx, yy))
@da_method()
class Climatology:
20@da_method()
21class Climatology:
22    """A baseline/reference method.
23
24    Note that the "climatology" is computed from truth, which might be
25    (unfairly) advantageous if the simulation is too short (vs mixing time).
26    """
27
28    def assimilate(self, HMM, xx, yy):
29        muC = np.mean(xx, 0)
30        AC = xx - muC
31        PC = CovMat(AC, "A")
32
33        self.stats.assess(0, mu=muC, Cov=PC)
34        self.stats.trHK[:] = 0
35
36        for k, ko, _, _ in progbar(HMM.tseq.ticker):
37            fau = "u" if ko is None else "fau"
38            self.stats.assess(k, ko, fau, mu=muC, Cov=PC)

A baseline/reference method.

Note that the "climatology" is computed from truth, which might be (unfairly) advantageous if the simulation is too short (vs mixing time).

def assimilate(self, HMM, xx, yy):
28    def assimilate(self, HMM, xx, yy):
29        muC = np.mean(xx, 0)
30        AC = xx - muC
31        PC = CovMat(AC, "A")
32
33        self.stats.assess(0, mu=muC, Cov=PC)
34        self.stats.trHK[:] = 0
35
36        for k, ko, _, _ in progbar(HMM.tseq.ticker):
37            fau = "u" if ko is None else "fau"
38            self.stats.assess(k, ko, fau, mu=muC, Cov=PC)
def stat(self, name, value):
138        def stat(self, name, value):
139            dapper.stats.register_stat(self.stats, name, value)
da_method = 'Climatology'
@da_method()
class OptInterp:
41@da_method()
42class OptInterp:
43    """Optimal Interpolation -- a baseline/reference method.
44
45    Uses the Kalman filter equations,
46    but with a prior from the Climatology.
47    """
48
49    def assimilate(self, HMM, xx, yy):
50        Id = np.eye(HMM.Nx)
51
52        # Compute "climatological" Kalman gain
53        muC = np.mean(xx, 0)
54        AC = xx - muC
55        PC = (AC.T @ AC) / (xx.shape[0] - 1)
56
57        # Setup scalar "time-series" covariance dynamics.
58        # ONLY USED FOR DIAGNOSTICS, not to affect the Kalman gain.
59        L = series.estimate_corr_length(AC.ravel(order="F"))
60        SM = fit_sigmoid(1 / 2, L, 0)
61
62        # Init
63        mu = muC
64        self.stats.assess(0, mu=mu, Cov=PC)
65
66        for k, ko, t, dt in progbar(HMM.tseq.ticker):
67            # Forecast
68            mu = HMM.Dyn(mu, t - dt, dt)
69            if ko is not None:
70                self.stats.assess(k, ko, "f", mu=muC, Cov=PC)
71
72                # Analysis
73                H = HMM.Obs(ko).linear(muC)
74                KG = mrdiv(PC @ H.T, H @ PC @ H.T + HMM.Obs(ko).noise.C.full)
75                mu = muC + KG @ (yy[ko] - HMM.Obs(ko)(muC))
76
77                P = (Id - KG @ H) @ PC
78                SM = fit_sigmoid(P.trace() / PC.trace(), L, k)
79
80            self.stats.assess(k, ko, mu=mu, Cov=2 * PC * SM(k))

Optimal Interpolation -- a baseline/reference method.

Uses the Kalman filter equations, but with a prior from the Climatology.

def assimilate(self, HMM, xx, yy):
49    def assimilate(self, HMM, xx, yy):
50        Id = np.eye(HMM.Nx)
51
52        # Compute "climatological" Kalman gain
53        muC = np.mean(xx, 0)
54        AC = xx - muC
55        PC = (AC.T @ AC) / (xx.shape[0] - 1)
56
57        # Setup scalar "time-series" covariance dynamics.
58        # ONLY USED FOR DIAGNOSTICS, not to affect the Kalman gain.
59        L = series.estimate_corr_length(AC.ravel(order="F"))
60        SM = fit_sigmoid(1 / 2, L, 0)
61
62        # Init
63        mu = muC
64        self.stats.assess(0, mu=mu, Cov=PC)
65
66        for k, ko, t, dt in progbar(HMM.tseq.ticker):
67            # Forecast
68            mu = HMM.Dyn(mu, t - dt, dt)
69            if ko is not None:
70                self.stats.assess(k, ko, "f", mu=muC, Cov=PC)
71
72                # Analysis
73                H = HMM.Obs(ko).linear(muC)
74                KG = mrdiv(PC @ H.T, H @ PC @ H.T + HMM.Obs(ko).noise.C.full)
75                mu = muC + KG @ (yy[ko] - HMM.Obs(ko)(muC))
76
77                P = (Id - KG @ H) @ PC
78                SM = fit_sigmoid(P.trace() / PC.trace(), L, k)
79
80            self.stats.assess(k, ko, mu=mu, Cov=2 * PC * SM(k))
def stat(self, name, value):
138        def stat(self, name, value):
139            dapper.stats.register_stat(self.stats, name, value)
da_method = 'OptInterp'
@da_method()
class Var3D:
 83@da_method()
 84class Var3D:
 85    """3D-Var -- a baseline/reference method.
 86
 87    This implementation is not "Var"-ish: there is no *iterative* optimzt.
 88    Instead, it does the full analysis update in one step: the Kalman filter,
 89    with the background covariance being user specified, through B and xB.
 90    """
 91
 92    B: Optional[np.ndarray] = None
 93    xB: float = 1.0
 94
 95    def assimilate(self, HMM, xx, yy):
 96        Id = np.eye(HMM.Nx)
 97        if isinstance(self.B, np.ndarray):
 98            # compare ndarray 1st to avoid == error for ndarray
 99            B = self.B.astype(float)
100        elif self.B in (None, "clim"):
101            # Use climatological cov, estimated from truth
102            B = np.cov(xx.T)
103        elif self.B == "eye":
104            B = Id
105        else:
106            raise ValueError("Bad input B.")
107        B *= self.xB
108
109        # ONLY USED FOR DIAGNOSTICS, not to change the Kalman gain.
110        CC = 2 * np.cov(xx.T)
111        L = series.estimate_corr_length(center(xx)[0].ravel(order="F"))
112        P = HMM.X0.C.full
113        SM = fit_sigmoid(P.trace() / CC.trace(), L, 0)
114
115        # Init
116        mu = HMM.X0.mu
117        self.stats.assess(0, mu=mu, Cov=P)
118
119        for k, ko, t, dt in progbar(HMM.tseq.ticker):
120            # Forecast
121            mu = HMM.Dyn(mu, t - dt, dt)
122            P = CC * SM(k)
123
124            if ko is not None:
125                self.stats.assess(k, ko, "f", mu=mu, Cov=P)
126
127                # Analysis
128                H = HMM.Obs(ko).linear(mu)
129                KG = mrdiv(B @ H.T, H @ B @ H.T + HMM.Obs(ko).noise.C.full)
130                mu = mu + KG @ (yy[ko] - HMM.Obs(ko)(mu))
131
132                # Re-calibrate fit_sigmoid with new W0 = Pa/B
133                P = (Id - KG @ H) @ B
134                SM = fit_sigmoid(P.trace() / CC.trace(), L, k)
135
136            self.stats.assess(k, ko, mu=mu, Cov=P)

3D-Var -- a baseline/reference method.

This implementation is not "Var"-ish: there is no iterative optimzt. Instead, it does the full analysis update in one step: the Kalman filter, with the background covariance being user specified, through B and xB.

Var3D(B: Optional[numpy.ndarray] = None, xB: float = 1.0)
B: Optional[numpy.ndarray] = None
xB: float = 1.0
def assimilate(self, HMM, xx, yy):
 95    def assimilate(self, HMM, xx, yy):
 96        Id = np.eye(HMM.Nx)
 97        if isinstance(self.B, np.ndarray):
 98            # compare ndarray 1st to avoid == error for ndarray
 99            B = self.B.astype(float)
100        elif self.B in (None, "clim"):
101            # Use climatological cov, estimated from truth
102            B = np.cov(xx.T)
103        elif self.B == "eye":
104            B = Id
105        else:
106            raise ValueError("Bad input B.")
107        B *= self.xB
108
109        # ONLY USED FOR DIAGNOSTICS, not to change the Kalman gain.
110        CC = 2 * np.cov(xx.T)
111        L = series.estimate_corr_length(center(xx)[0].ravel(order="F"))
112        P = HMM.X0.C.full
113        SM = fit_sigmoid(P.trace() / CC.trace(), L, 0)
114
115        # Init
116        mu = HMM.X0.mu
117        self.stats.assess(0, mu=mu, Cov=P)
118
119        for k, ko, t, dt in progbar(HMM.tseq.ticker):
120            # Forecast
121            mu = HMM.Dyn(mu, t - dt, dt)
122            P = CC * SM(k)
123
124            if ko is not None:
125                self.stats.assess(k, ko, "f", mu=mu, Cov=P)
126
127                # Analysis
128                H = HMM.Obs(ko).linear(mu)
129                KG = mrdiv(B @ H.T, H @ B @ H.T + HMM.Obs(ko).noise.C.full)
130                mu = mu + KG @ (yy[ko] - HMM.Obs(ko)(mu))
131
132                # Re-calibrate fit_sigmoid with new W0 = Pa/B
133                P = (Id - KG @ H) @ B
134                SM = fit_sigmoid(P.trace() / CC.trace(), L, k)
135
136            self.stats.assess(k, ko, mu=mu, Cov=P)
def stat(self, name, value):
138        def stat(self, name, value):
139            dapper.stats.register_stat(self.stats, name, value)
da_method = 'Var3D'
def fit_sigmoid(Sb, L, kb):
139def fit_sigmoid(Sb, L, kb):
140    """Return a sigmoid [function S(k)] for approximating error dynamics.
141
142    We use the logistic function for the sigmoid; it's the solution of the
143    "population growth" ODE: `dS/dt = a*S*(1-S/S(∞))`.
144    NB: It might be better to use the "error growth ODE" of Lorenz/Dalcher/Kalnay,
145    but this has a significantly more complicated closed-form solution,
146    and reduces to the above ODE when there's no model error (ODE source term).
147
148    The "normalized" sigmoid, `S1`, is symmetric around 0, and `S1(-∞)=0` and `S1(∞)=1`.
149
150    The sigmoid `S(k) = S1(a*(k-kb) + b)` is fitted (see docs/snippets/sigmoid.jpg) with
151
152    - `a` corresponding to a given corr. length `L`.
153    - `b` to match values of `S(kb)` and `Sb`
154    """
155
156    def sigmoid(k):
157        return 1 / (1 + np.exp(-k))  # normalized sigmoid
158
159    def inv_sig(s):
160        return np.log(s / (1 - s))  # its inverse
161
162    a = 1 / L
163    b = inv_sig(Sb)
164
165    def S(k):
166        return sigmoid(b + a * (k - kb))
167
168    return S

Return a sigmoid [function S(k)] for approximating error dynamics.

We use the logistic function for the sigmoid; it's the solution of the "population growth" ODE: dS/dt = a*S*(1-S/S(∞)). NB: It might be better to use the "error growth ODE" of Lorenz/Dalcher/Kalnay, but this has a significantly more complicated closed-form solution, and reduces to the above ODE when there's no model error (ODE source term).

The "normalized" sigmoid, S1, is symmetric around 0, and S1(-∞)=0 and S1(∞)=1.

The sigmoid S(k) = S1(a*(k-kb) + b) is fitted (see docs/snippets/sigmoid.jpg) with

  • a corresponding to a given corr. length L.
  • b to match values of S(kb) and Sb
@da_method()
class Persistence:
171@da_method()
172class Persistence:
173    """Sets estimate to the **true state** at the previous time index.
174
175    The analysis (`.a`) stat uses the previous obs. time.
176    The forecast and universal (`.f` and `.u`) stats use previous integration time
177    index.
178    """
179
180    def assimilate(self, HMM, xx, yy):
181        prev = xx[0]
182        self.stats.assess(0, mu=prev)
183        for k, ko, _t, _dt in progbar(HMM.tseq.ticker):
184            self.stats.assess(k, ko, "fu", mu=xx[k - 1])
185            if ko is not None:
186                self.stats.assess(k, ko, "a", mu=prev)
187                prev = xx[k]

Sets estimate to the true state at the previous time index.

The analysis (.a) stat uses the previous obs. time. The forecast and universal (.f and .u) stats use previous integration time index.

def assimilate(self, HMM, xx, yy):
180    def assimilate(self, HMM, xx, yy):
181        prev = xx[0]
182        self.stats.assess(0, mu=prev)
183        for k, ko, _t, _dt in progbar(HMM.tseq.ticker):
184            self.stats.assess(k, ko, "fu", mu=xx[k - 1])
185            if ko is not None:
186                self.stats.assess(k, ko, "a", mu=prev)
187                prev = xx[k]
def stat(self, name, value):
138        def stat(self, name, value):
139            dapper.stats.register_stat(self.stats, name, value)
da_method = 'Persistence'
@da_method()
class PreProg:
190@da_method()
191class PreProg:
192    """Simply look-up the estimates in user-specified function (`schedule`).
193
194    For example, with `schedule` given by `lambda k, xx, yy: xx[k]`
195    the error (`err.rms, err.ma, ...`) should be 0.
196    """
197
198    schedule: Callable
199    tag: str = None
200
201    def assimilate(self, HMM, xx, yy):
202        self.stats.assess(0, mu=self.schedule(0, xx, yy))
203        for k, ko, _t, _dt in progbar(HMM.tseq.ticker):
204            self.stats.assess(k, ko, "fu", mu=self.schedule(k, xx, yy))
205            if ko is not None:
206                self.stats.assess(k, ko, "a", mu=self.schedule(k, xx, yy))

Simply look-up the estimates in user-specified function (schedule).

For example, with schedule given by lambda k, xx, yy: xx[k] the error (err.rms, err.ma, ...) should be 0.

PreProg(schedule: Callable, tag: str = None)
schedule: Callable
tag: str = None
def assimilate(self, HMM, xx, yy):
201    def assimilate(self, HMM, xx, yy):
202        self.stats.assess(0, mu=self.schedule(0, xx, yy))
203        for k, ko, _t, _dt in progbar(HMM.tseq.ticker):
204            self.stats.assess(k, ko, "fu", mu=self.schedule(k, xx, yy))
205            if ko is not None:
206                self.stats.assess(k, ko, "a", mu=self.schedule(k, xx, yy))
def stat(self, name, value):
138        def stat(self, name, value):
139            dapper.stats.register_stat(self.stats, name, value)
da_method = 'PreProg'