# Source code for pyinterp.statistics.streaming_histogram

# Copyright (c) 2022 CNES
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
from typing import Any, Iterable, Optional, Union
import dask.array as da
import numpy as np
from .. import core


def _delayed(
    attr: str,
    values: da.Array,
    weights: Optional[da.Array] = None,
    axis: Optional[Iterable[int]] = None,
    bin_count: Optional[int] = None,
) -> Union[core.StreamingHistogramFloat64, core.StreamingHistogramFloat32]:
    """Calculate the streaming histogram of a dask array.

    One histogram is built per dask block by instantiating ``core.<attr>``,
    then the partial histograms are merged by summing them.

    Args:
        attr: Name of the ``core`` class to instantiate
            (``StreamingHistogramFloat64`` or ``StreamingHistogramFloat32``).
        values: Array containing the samples.
        weights: Optional weights; must have the same shape as ``values``.
        axis: Axes along which the statistics are computed. ``None`` means
            the flattened array.
        bin_count: Maximum number of bins of each histogram.

    Returns:
        The merged histogram.

    Raises:
        ValueError: If ``values`` and ``weights`` have different shapes.
    """
    if weights is not None and values.shape != weights.shape:
        raise ValueError("values and weights must have the same shape")

    if axis is not None:
        # Materialize once: the axes are inspected here and then handed to
        # every block, so a one-shot iterator must not be consumed early.
        axis = tuple(axis)
        ndim = values.ndim
        reduced = {item % ndim for item in axis} if ndim else set()
        # Block results are merged with ``sum``. That is only valid when all
        # blocks produce histograms for the same cells of the output, i.e.
        # when no non-reduced axis is split across several chunks. Otherwise
        # unrelated cells would be merged together (or the merge would fail
        # on a shape mismatch), so every kept axis is made a single chunk.
        kept = {dim: -1 for dim in range(ndim) if dim not in reduced}
        if kept:
            values = values.rechunk(kept)
            if weights is not None:
                weights = weights.rechunk(kept)

    def _process_block(attr, x, w, axis, bin_count):
        # Build the histogram of one block and wrap it in a one-element
        # object array so that dask can reduce the blocks with ``sum``.
        instance = getattr(core, attr)(values=x,
                                       weights=w,
                                       axis=axis,
                                       bin_count=bin_count)
        return np.array([instance], dtype="object")

    # Each block yields a 1-D result: every axis but the first is dropped
    # (dask concatenates the chunks of the dropped axes for each call).
    drop_axis = list(range(1, values.ndim))

    return da.map_blocks(_process_block,
                         attr,
                         values,
                         weights,
                         axis,
                         bin_count,
                         drop_axis=drop_axis,
                         dtype="object").sum().compute()  # type: ignore


class StreamingHistogram:
    """Streaming histogram.

    The bins in the histogram have no predefined size, so that as values are
    pushed into the histogram, bins are added and merged as soon as their
    numbers exceed the maximum allowed capacity. A particularly interesting
    feature of streaming histograms is that they can be used to approximate
    quantiles without sorting (or even storing) values individually. The
    histograms can be constructed independently and merged, making them
    usable with Dask.

    .. seealso::

        Yael Ben-Haim and Elad Tom-Tov, A Streaming Parallel Decision Tree
        Algorithm, Journal of Machine Learning Research, 11, 28, 849-872,
        http://jmlr.org/papers/v11/ben-haim10a.html

    .. note::

        If you do not want to estimate the quantiles of the dataset, use the
        class :py:class:`DescriptiveStatistics
        <pyinterp.DescriptiveStatistics>` which will give you more accurate
        results.
    """

    def __init__(self,
                 values: Union[da.Array, np.ndarray],
                 weights: Optional[Union[da.Array, np.ndarray]] = None,
                 axis: Optional[Union[int, Iterable[int]]] = None,
                 bin_count: Optional[int] = None,
                 dtype: Optional[np.dtype] = None) -> None:
        """Initializes a new histogram.

        Args:
            values (numpy.ndarray, dask.Array): Array containing numbers whose
                statistics are desired.

                .. note::

                    NaNs are automatically ignored.

            weights (numpy.ndarray, dask.Array, optional): An array of weights
                associated with the values. If not provided, all values are
                assumed to have equal weight.
            axis (int, iterable, optional): Axis or axes along which to compute
                the statistics. If not provided, the statistics are computed
                over the flattened array.
            bin_count (int, optional): The maximum number of bins to use in the
                histogram. If the number of bins exceeds the number of values,
                the histogram will be trimmed. Default is ``None``, which will
                set the number of bins to 100.
            dtype (numpy.dtype, optional): Data type of the returned array. By
                default, the data type is numpy.float64.

        Raises:
            ValueError: If ``dtype`` is neither float64 nor float32, or, for
                dask inputs, if ``values`` and ``weights`` have different
                shapes.
        """
        # A single axis (Python int or numpy integer scalar) is normalized to
        # a one-element tuple.
        if isinstance(axis, (int, np.integer)):
            axis = (int(axis), )

        # Explicit None test rather than ``dtype or ...``: it does not depend
        # on the truthiness of the object supplied by the caller.
        if dtype is None:
            dtype = np.dtype("float64")
        if dtype == np.dtype("float64"):
            attr = "StreamingHistogramFloat64"
        elif dtype == np.dtype("float32"):
            attr = "StreamingHistogramFloat32"
        else:
            raise ValueError(f"dtype {dtype} not handled by the object")

        self._instance: Union[core.StreamingHistogramFloat64,
                              core.StreamingHistogramFloat32]
        if isinstance(values, da.Array) or isinstance(weights, da.Array):
            # At least one dask input: compute per-block histograms and merge
            # them eagerly.
            self._instance = _delayed(
                attr,
                da.asarray(values),
                weights=None if weights is None else da.asarray(weights),
                axis=axis,
                bin_count=bin_count)
        else:
            self._instance = getattr(core, attr)(values,
                                                 weights=weights,
                                                 axis=axis,
                                                 bin_count=bin_count)

    def __iadd__(self, other: Any) -> "StreamingHistogram":
        """Adds a new histogram to the current one.

        Args:
            other (object): The histogram to add to the current one.

        Returns:
            StreamingHistogram: Returns itself.

        Raises:
            TypeError: If ``other`` is not a StreamingHistogram, or if the two
                histograms do not use the same underlying type.
        """
        if not isinstance(other, StreamingHistogram):
            raise TypeError("unsupported operand type(s) for +="
                            f": '{type(self)}' and '{type(other)}'")
        if type(self._instance) is not type(other._instance):
            raise TypeError("StreamingHistogram types must match")
        self._instance += other._instance  # type: ignore
        return self

    def bins(self) -> np.ndarray:
        """Returns the histogram bins.

        Returns:
            numpy.ndarray: The histogram bins.
        """
        return self._instance.bins()

    def size(self) -> np.ndarray:
        """Returns the number of bins allocated to calculate the histogram.

        If :py:meth:`size() <pyinterp.StreamingHistogram.size>` is equal to
        :py:meth:`count() <pyinterp.StreamingHistogram.count>` then the
        histogram used to calculate the statistics is un-compressed.
        Otherwise, the histogram is compressed, which means that the
        calculated statistical quantities are an approximation of the
        statistical variables.

        Returns:
            numpy.ndarray: Returns number of bins allocated to calculate the
            histogram.
        """
        return self._instance.size()

    def count(self) -> np.ndarray:
        """Returns the count of samples.

        Returns:
            numpy.ndarray: Returns the count of samples.
        """
        return self._instance.count()

    def kurtosis(self) -> np.ndarray:
        """Returns the kurtosis of samples.

        Returns:
            numpy.ndarray: Returns the kurtosis of samples.
        """
        return self._instance.kurtosis()

    def max(self) -> np.ndarray:
        """Returns the maximum of samples.

        Returns:
            numpy.ndarray: Returns the maximum of samples.
        """
        return self._instance.max()

    def mean(self) -> np.ndarray:
        """Returns the mean of samples.

        Returns:
            numpy.ndarray: Returns the mean of samples.
        """
        return self._instance.mean()

    def min(self) -> np.ndarray:
        """Returns the minimum of samples.

        Returns:
            numpy.ndarray: Returns the minimum of samples.
        """
        return self._instance.min()

    def skewness(self) -> np.ndarray:
        """Returns the skewness of samples.

        Returns:
            numpy.ndarray: Returns the skewness of samples.
        """
        return self._instance.skewness()

    def sum_of_weights(self) -> np.ndarray:
        """Returns the sum of weights.

        Returns:
            numpy.ndarray: Returns the sum of weights.
        """
        return self._instance.sum_of_weights()

    def var(self) -> np.ndarray:
        """Returns the variance of samples.

        Returns:
            numpy.ndarray: Returns the variance of samples.
        """
        return self._instance.variance()

    def std(self) -> np.ndarray:
        """Returns the standard deviation of samples.

        Returns:
            numpy.ndarray: Returns the standard deviation of samples.
        """
        return np.sqrt(self.var())

    def quantile(self, q: float = 0.5) -> np.ndarray:
        """Returns the q quantile of samples.

        Args:
            q (float): Quantile to compute. Default is ``0.5`` (median).

        Returns:
            numpy.ndarray: Returns the q quantile of samples.
        """
        return self._instance.quantile(q)