Context
Suppose there are two jointly distributed (i.e., not necessarily independent) random variables $X$ and $Y$, for which we have corresponding vectors of observations $\boldsymbol{x}$ and $\boldsymbol{y}$, recorded at epoch timestamps $\boldsymbol{t}$ seconds. We wish to measure a weighted sample covariance between these observations, with weights derived from the observation times, such that the most recent observations have the largest weights. For performance reasons, our algorithm should only utilise a "single pass", meaning we iterate through the vectors $\boldsymbol{x}$, $\boldsymbol{y}$ and $\boldsymbol{t}$ only once.
Definitions
The unweighted sample covariance of these observations is defined as: $$\text{Cov}(\boldsymbol{x}, \boldsymbol{y}) \> = \> \frac{\sum_{i=1}^N (x_i - x^*)(y_i - y^*)}{N-1} \> \in \> (-\infty,+\infty )$$ where $N$ is the number of observations, $x^*$ is the mean of $\boldsymbol{x}$ and $y^*$ is the mean of $\boldsymbol{y}$. This quantifies the degree to which $\boldsymbol{x}$ and $\boldsymbol{y}$ vary together:
- a positive covariance indicates that $X$ and $Y$ tend to move together,
- a negative covariance indicates that they move in opposite directions, and
- zero covariance indicates that there is no linear relationship between $X$ and $Y$ (note that this does not, by itself, imply independence).
To implement "exponential decay" using the non-decreasing observation times $\boldsymbol{t}$, given a half life of $T$ seconds, we define observation weights: $$w_i \> = \> 2^{-\frac{t_N - t_i}{T}}$$ for $i \in [1, N]$, such that the final weight $w_N = 1$, an intermediate weight $w_i = \frac{1}{2}$ when $t_N - t_i = T$ (hence the name "half life") and $w_i \to 0$ as $t_i$ decreases. These weights can then used to compute weighted means of $\boldsymbol{x}$ and $\boldsymbol{y}$: $$\mu = \frac{\sum_{i=1}^N w_i x_i}{\sum_{i=1}^N w_i} \>\> \text{ and } \>\> \nu = \frac{\sum_{i=1}^N w_i y_i}{\sum_{i=1}^N w_i}.$$ Using these, we can define the biased, weighted covariance of $\boldsymbol{x}$ and $\boldsymbol{y}$ as: $$\rho^2 \> = \> \frac{\sum_{i=1}^N w_i (x_i - \mu)(y_i - \nu)}{\sum_{i=1}^N w_i},$$ and the unbiased, weighted covariance as: $$\phi^2 \> = \> \frac{\sum_{i=1}^N w_i}{\left( \sum_{i=1}^N w_i \right)^2 - \sum_{i=1}^N w_i^2} \sum_{i=1}^N w_i (x_i - \mu)(y_i - \nu).$$ To help convince yourself of this definition, try substituting uniform weights $w_i = 1$. The above expression will reduce to the unweighted sample covariance defined at the beginning of this section.
Derivation
In order to implement a single-pass algorithm, we need to derive recursive formulas for the weighted means and covariance. Let $W_n \> = \> \sum_{i=1}^n w_i$ be the sum of the first $n$ weights in $\boldsymbol{w}$. Then the weighted mean of the first $n$ observations of $X$ is $$\begin{aligned} \mu_n \> &= \> \frac{1}{W_n} \sum_{i=1}^n w_i x_i \\ &= \> \frac{1}{W_n} \left( \sum_{i=1}^{n-1} w_i x_i + w_n x_n \right) \\ &= \> \frac{1}{W_n} \left( \frac{W_{n-1}}{W_{n-1}} \sum_{i=1}^{n-1} w_i x_i + w_n x_n \right) \\ &= \> \frac{W_{n-1}}{W_n} \left( \frac{1}{W_{n-1}} \sum_{i=1}^{n-1} w_i x_i \right) + \frac{w_n}{W_n} x_n \\ &= \> \frac{W_{n-1}}{W_n} \mu_{n-1} + \frac{w_n}{W_n} x_n \\ &= \> \left( 1 - \frac{w_n}{W_n} \right) \mu_{n-1} + \frac{w_n}{W_n} x_n, \end{aligned}$$ thus we have a formula for the $n^{\text{th}}$ weighted mean as a linear combination of the $(n-1)^{\text{th}}$ weighted mean and the $n^{\text{th}}$ observation.
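As a quick sanity check of this recursion, the following minimal sketch (the observation and weight values are arbitrary, purely for illustration) compares the recursive update against a directly computed weighted mean:

import numpy as np

# Arbitrary, illustrative observations and weights.
x = np.array([1.5, -0.3, 2.7, 0.9, 4.1])
w = np.array([0.2, 0.4, 0.5, 0.8, 1.0])

mu = 0.0   # running weighted mean
W = 0.0    # running sum of weights
for x_i, w_i in zip(x, w):
    W += w_i
    mu = (1 - w_i / W) * mu + (w_i / W) * x_i

# The recursive result matches the direct weighted mean.
assert np.isclose(mu, np.sum(w * x) / np.sum(w))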
Next, we look at deriving Welford's method for computing weighted variance (a specific case of weighted covariance) in a single pass. We define the "sum of weighted squared deviations" of the first $n$ observations of $X$ as $$\beta_n \> = \> \sum_{i=1}^n w_i (x_i - \mu_n)^2$$ such that the biased, weighted variance $\sigma_n^2 = \frac{\beta_n}{W_n}$. Consider the difference between consecutive sums: $$\begin{aligned} \beta_n - \beta_{n-1} \> &= \> \sum_{i=1}^n w_i (x_i - \mu_n)^2 - \sum_{i=1}^{n-1} w_i (x_i - \mu_{n-1})^2 \\ &= \> w_n(x_n - \mu_n)^2 + \sum_{i=1}^{n-1} w_i \left( (x_i - \mu_n)^2 - (x_i - \mu_{n-1})^2 \right) \\ &= \> w_n(x_n - \mu_n)^2 + \sum_{i=1}^{n-1} w_i \left( (x_i - \mu_n - x_i + \mu_{n-1}) (x_i - \mu_n + x_i - \mu_{n-1}) \right) \\ &= \> w_n(x_n - \mu_n)^2 + \sum_{i=1}^{n-1} w_i \left( (\mu_{n-1} - \mu_n) (x_i - \mu_n + x_i - \mu_{n-1}) \right) \\ &= \> w_n(x_n - \mu_n)^2 + (\mu_{n-1} - \mu_n) \sum_{i=1}^{n-1} w_i \left( (x_i - \mu_n) + (x_i - \mu_{n-1}) \right). \end{aligned}$$ To continue simplifying this expression, we use the fact that the weighted sum of deviations from the mean is zero: $$ \sum_{i=1}^k w_i(x_i - \mu_k) \> = \> \sum_{i=1}^k w_i x_i - \mu_k \sum_{i=1}^k w_i \> = \> \sum_{i=1}^k w_i x_i - \left( \frac{\sum_{i=1}^k w_i x_i}{\sum_{i=1}^k w_i} \right) \sum_{i=1}^k w_i \> = \> \sum_{i=1}^k w_i x_i - \sum_{i=1}^k w_i x_i \> = \> 0. $$ Thus, we have that $$\begin{aligned} \beta_n - \beta_{n-1} \> &= \> w_n(x_n - \mu_n)^2 + (\mu_{n-1} - \mu_n) \sum_{i=1}^{n-1} w_i (x_i - \mu_n) \\ &= \> w_n(x_n - \mu_n)^2 + (\mu_{n-1} - \mu_n) \left( -w_n(x_n - \mu_n) \right) \\ &= \> w_n(x_n - \mu_n)(x_n - \mu_n - \mu_{n-1} + \mu_n) \\ &= \> w_n(x_n - \mu_n)(x_n - \mu_{n-1}). \end{aligned}$$ Therefore $$\beta_n \> = \> \beta_{n-1} + w_n(x_n - \mu_n)(x_n - \mu_{n-1})$$ and $$\begin{aligned} \sigma_n^2 \> &= \> \frac{\beta_n}{W_n} \\ &= \> \frac{1}{W_n} \left( \beta_{n-1} + w_n(x_n - \mu_n)(x_n - \mu_{n-1}) \right) \\ &= \> \frac{1}{W_n} \left( W_{n-1}\sigma_{n-1}^2 + w_n(x_n - \mu_n)(x_n - \mu_{n-1}) \right) \\ &= \> \frac{W_{n-1}}{W_n}\sigma_{n-1}^2 + \frac{w_n}{W_n}(x_n - \mu_n)(x_n - \mu_{n-1}) \\ &= \> \left( 1 - \frac{w_n}{W_n} \right)\sigma_{n-1}^2 + \frac{w_n}{W_n}(x_n - \mu_n)(x_n - \mu_{n-1}), \end{aligned}$$ which is Welford's method: a formula for the $n^{\text{th}}$ weighted variance with respect to the $(n-1)^{\text{th}}$ weighted variance and the $n^{\text{th}}$ observation (as well as the $(n-1)^{\text{th}}$ and $n^{\text{th}}$ weighted means). Now consider the biased, weighted covariance of the first $n$ observations of $X$ and $Y$, given weighted means $\mu_n$ and $\nu_n$ respectively: $$\rho_n^2 \> = \> \frac{\sum_{i=1}^n w_i(x_i - \mu_n)(y_i - \nu_n)}{W_n}.$$ We can generalise Welford's method for the variance $\sigma_n^2$ to the covariance $\rho_n^2$. Skipping the proof of this generalisation, we have that $\rho_n^2 = \frac{\gamma_n}{W_n}$ where $$\begin{aligned} \gamma_n \> &= \> \sum_{i=1}^n w_i (x_i - \mu_n)(y_i - \nu_n) \\ &= \> \gamma_{n-1} + w_n(x_n - \mu_n)(y_n - \nu_{n-1}) \\ &= \> \gamma_{n-1} + w_n(x_n - \mu_{n-1})(y_n - \nu_n). \end{aligned}$$ There appears to be an asymmetry between the two possible recursive formulas above, however it can be shown that these two formulas are equivalent (i.e., we can choose to use the current mean for either the $X$ or $Y$ term, and the previous mean for the other). 
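To see why, note that the recursive mean formula gives $x_n - \mu_n = \left(1 - \frac{w_n}{W_n}\right)(x_n - \mu_{n-1}) = \frac{W_{n-1}}{W_n}(x_n - \mu_{n-1})$, and likewise $y_n - \nu_n = \frac{W_{n-1}}{W_n}(y_n - \nu_{n-1})$, hence $$w_n(x_n - \mu_n)(y_n - \nu_{n-1}) \> = \> \frac{W_{n-1}}{W_n} w_n(x_n - \mu_{n-1})(y_n - \nu_{n-1}) \> = \> w_n(x_n - \mu_{n-1})(y_n - \nu_n),$$ i.e., the factor of $\frac{W_{n-1}}{W_n}$ may be attached to either term.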
Then $$\begin{aligned} \rho_n^2 \> &= \> \frac{\gamma_n}{W_n} \\ &= \> \frac{\gamma_{n-1}}{W_n} + \frac{w_n}{W_n} (x_n - \mu_{n-1})(y_n - \nu_n) \\ &= \> \frac{W_{n-1}}{W_n} \rho_{n-1}^2 + \frac{w_n}{W_n} (x_n - \mu_{n-1})(y_n - \nu_n) \\ &= \> \left( 1 - \frac{w_n}{W_n} \right) \rho_{n-1}^2 + \frac{w_n}{W_n} (x_n - \mu_{n-1})(y_n - \nu_n), \end{aligned}$$ which is Welford's formula for the $n^{\text{th}}$ weighted covariance. Finally, to compute the unbiased, weighted covariance $$\phi_n^2 \> = \> \frac{W_n}{W_n^2 - \sum_{i=1}^n w_i^2} \sum_{i=1}^n w_i(x_i - \mu_n)(y_i - \nu_n),$$ we can simply multiply the biased, weighted covariance by a correction factor: $$\phi_n^2 \> = \> \frac{W_n^2}{W_n^2 - \sum_{i=1}^n w_i^2} \rho_n^2,$$ which need only be applied once, to the final covariance result, before our algorithm terminates.
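Before implementing anything, the following minimal sketch (again with arbitrary, illustrative data and weights) verifies that the recursive updates, combined with the bias-correction factor, reproduce the directly computed unbiased, weighted covariance:

import numpy as np

# Arbitrary, illustrative observations and weights.
x = np.array([1.5, -0.3, 2.7, 0.9, 4.1])
y = np.array([0.4, 1.1, -0.6, 2.3, 3.0])
w = np.array([0.2, 0.4, 0.5, 0.8, 1.0])

W = W_2 = mu = nu = cov = 0.0
for x_i, y_i, w_i in zip(x, y, w):
    W += w_i
    W_2 += w_i ** 2
    mu_old = mu
    mu = (1 - w_i / W) * mu + (w_i / W) * x_i
    nu = (1 - w_i / W) * nu + (w_i / W) * y_i
    cov = (1 - w_i / W) * cov + (w_i / W) * (x_i - mu_old) * (y_i - nu)
phi_2_recursive = (W ** 2 / (W ** 2 - W_2)) * cov

# Direct (two-pass) computation of the unbiased, weighted covariance.
mu_d = np.sum(w * x) / np.sum(w)
nu_d = np.sum(w * y) / np.sum(w)
phi_2_direct = np.sum(w) / (np.sum(w) ** 2 - np.sum(w ** 2)) * np.sum(w * (x - mu_d) * (y - nu_d))

assert np.isclose(phi_2_recursive, phi_2_direct)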
Implementation
Before implementing our single-pass algorithm, we will write a simple, naive function to compute the exponentially decayed covariance, such that we can compare the output of our two implementations for correctness. The naive approach uses basic Numpy array routines to calculate the covariance by the formula defined at the bottom of the "Definitions" section of this article.
import numpy as np
import numpy.typing as npt


def naive_exp_covariance(
    x: npt.NDArray,
    y: npt.NDArray,
    t: npt.NDArray,
    half_life: int
) -> float:
    assert len(x) == len(y) == len(t), "Input arrays must have the same length"
    assert half_life > 0, "Half-life must be a positive integer (seconds)"
    assert np.all(np.diff(t) >= 0), "Observation timestamps t must be non-decreasing"
    # Drop any observation pair where either x or y is NaN.
    mask = (~np.isnan(x)) & (~np.isnan(y))
    x, y, t = x[mask], y[mask], t[mask]
    # Exponentially decayed weights: 2 ** -((t_N - t_i) / half_life).
    w = np.exp(-np.log(2) * (t[-1] - t) / half_life)
    w_sum = np.sum(w)
    w_sum_2 = np.sum(w ** 2)
    x_mean = np.sum(w * x) / w_sum
    y_mean = np.sum(w * y) / w_sum
    # Unbiased, weighted covariance.
    return (w_sum / (w_sum ** 2 - w_sum_2)) * np.sum(w * (x - x_mean) * (y - y_mean))
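As a quick illustration of how the naive function is called (the data below is arbitrary), note that any pair where either observation is NaN is dropped before the covariance is computed:

# Arbitrary, illustrative inputs; indices 1 and 2 are dropped due to NaNs.
x = np.array([1.0, np.nan, 2.5, 3.0])
y = np.array([0.5, 1.0, np.nan, 2.0])
t = np.array([0, 30, 60, 90])
print(naive_exp_covariance(x, y, t, half_life=60))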
To achieve the best performance, we will implement our single-pass algorithm using Numba, a Python library that translates decorated functions into optimised machine code at runtime, allowing us to write our function in friendly Python syntax while still achieving C-like performance. Numba supports a subset of inbuilt Python & Numpy functionality, but this subset is more than sufficient for our purposes here. We use the @njit decorator to indicate that our function should be compiled using Numba's JIT compiler, and we use Numba's provided types to specify the input and output types of our function. To call this function from our Python code, we simply provide Numpy arrays (with a matching dtype) as input and receive a float as output. Our single-pass algorithm receives just four inputs:
- x (types.float64[:]): a Numpy array of floating point observations of $X$;
- y (types.float64[:]): a Numpy array of floating point observations of $Y$;
- t (types.int64[:]): a Numpy array of integer epoch timestamps corresponding to the observations; and
- half_life (int): the exponential decay half life, in seconds, used for the observation weight calculation.
This algorithm loops once through the observation and timestamp arrays. For each pair of non-NaN entries, we first compute the weight using the exponential decay formula, then update the weight sum, the squared-weight sum, and the weighted means of $X$ and $Y$ (using the recursive formulas derived at the beginning of the "Derivation" section above). Finally, we update the biased, weighted covariance using its recursive formula. The final result is then corrected to return the unbiased, weighted covariance as a floating point number.
import numpy as np
from numba import njit
from numba.core import types


@njit
def exp_covariance(
    x: types.float64[:],
    y: types.float64[:],
    t: types.int64[:],
    half_life: int
) -> float:
    """
    A single-pass, exponentially-decayed, unbiased covariance calculation, given
    vectors of observations for random variables X and Y along with a vector of
    observation timestamps t (seconds), and a half-life in seconds for the
    exponential decay.
    """
    assert len(x) == len(y) == len(t), "Input arrays must have the same length"
    assert half_life > 0, "Half-life must be a positive integer (seconds)"
    # Running weight sums, weighted means and biased weighted covariance.
    w_sum = 0.0
    w_sum_2 = 0.0
    x_mean = 0.0
    y_mean = 0.0
    covariance = 0.0
    for i in range(len(x)):
        if not np.isnan(x[i]) and not np.isnan(y[i]):
            # Exponentially decayed weight: 2 ** -((t_N - t_i) / half_life).
            w = np.exp(-np.log(2) * (t[-1] - t[i]) / half_life)
            w_sum += w
            w_sum_2 += w ** 2
            # Recursive updates for the weighted means.
            x_mean_old = x_mean
            x_mean = (1 - w / w_sum) * x_mean + (w / w_sum) * x[i]
            y_mean = (1 - w / w_sum) * y_mean + (w / w_sum) * y[i]
            # Recursive update for the biased, weighted covariance.
            covariance = (1 - w / w_sum) * covariance + (w / w_sum) * (x[i] - x_mean_old) * (y[i] - y_mean)
    # Apply the bias-correction factor before returning.
    return (w_sum ** 2 / (w_sum ** 2 - w_sum_2)) * covariance
Performance
To assess the accuracy of both our naive and single-pass algorithms, we can write a simple test case using Pytest, one of Python's most popular testing frameworks. For this test case, we include some NaN values to ensure that both algorithms handle missing data correctly. After manually calculating the weighted covariance of the test data, we can assert that the output of both algorithms is within a small tolerance of the expected result.
import numpy as np
import pytest

from covariance import exp_covariance, naive_exp_covariance


@pytest.mark.parametrize(
    "x,y,t,half_life,expected_cov",
    [
        (
            np.array([np.nan, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]),
            np.array([0.0, np.nan, -1.0, -2.0, 0.0, 1.0, 2.0, 0.0]),
            np.array([0, 60, 120, 180, 240, 300, 360, 420]),
            120,
            # After dropping the NaN pairs:
            # x = [2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
            # y = [-1.0, -2.0, 0.0, 1.0, 2.0, 0.0]
            # t = [120, 180, 240, 300, 360, 420]
            # w = [0.177, 0.25, 0.354, 0.5, 0.707, 1.0]
            # w_sum = 2.988
            # w_sum_2 = 1.969
            # x_mean = 5.442
            # y_mean = 0.414
            1.029
        )
    ]
)
def test_exp_covariance(x, y, t, half_life, expected_cov):
    naive_cov = naive_exp_covariance(x, y, t, half_life)
    fast_cov = exp_covariance(x, y, t, half_life)
    assert abs(naive_cov - expected_cov) < 1e-3
    assert abs(fast_cov - expected_cov) < 1e-3
If this test case is saved in a file named test_covariance.py, then the test can be run with the command pytest test_covariance.py, which will raise an error if the output of either algorithm deviates from the expected result by more than $10^{-3}$. If you try running this test yourself, you will find that it passes - hence both algorithms are accurate.
To compare the runtime of the single-pass algorithm against the naive approach, we can generate a large set of fake observations and timestamps.
import numpy as np
SIZE = 100_000_000
x = np.random.normal(loc=0.0, scale=1.0, size=SIZE)
y = x + np.random.uniform(low=0.0, high=1.0, size=SIZE)
t = np.arange(SIZE)
half_life = SIZE // 10  # an integer half-life (seconds), matching the functions' signatures
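One caveat: because exp_covariance is compiled lazily by Numba, its first call includes a one-off compilation cost. To keep this out of the timings, we can make a cheap warm-up call first, shown below on a small slice of the data (this assumes exp_covariance has been imported into the session, as in the test above):

# Warm-up call: triggers Numba's JIT compilation so that subsequent timed
# calls measure only the covariance calculation itself.
_ = exp_covariance(x[:10], y[:10], t[:10], half_life)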
Then, using the IPython magic function %timeit, we can record approximate runtimes for both algorithms on this input of 100 million observations, which I found to be:
- 2.83s $\pm$ 26.8ms for the naive algorithm; and
- 1.21s $\pm$ 5.45ms for the single-pass algorithm.
We can therefore conclude that the single-pass algorithm runs in roughly 43% of the naive algorithm's runtime, which is a significant performance improvement!
References
- The GSL Team. “Statistics — GSL 2.8 Documentation.” GNU Scientific Library, 2024, www.gnu.org/software/gsl/doc/html/statistics.html#weighted-samples.
- Willmert, Justin. "Notes on Calculating Online Statistics." Justinwillmert.com, 12 Dec. 2022, justinwillmert.com/articles/2022/notes-on-calculating-online-statistics/.
If you have any corrections or questions, please send me an email and we can have a discussion. Any edits or additions to the original article & code will be noted here.