Source code for dataiter.dt

# -*- coding: utf-8 -*-

# Copyright (c) 2022 Osmo Salomaa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import datetime
import numpy as np

from dataiter import util
from dataiter import Vector

[docs] def day(x): """ Extract day of the month from datetime `x`. >>> x = dt.new(["2022-10-15"]) >>> dt.day(x) """ return _pull_int(x, lambda y: y.day)
[docs] def from_string(x, format): """ Initialize a datetime scalar or vector from `x`. `format` uses Python ``strptime`` format codes: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes >>> x = di.Vector(["15.10.2022"]) >>> dt.from_string(x, "%d.%m.%Y") """ if util.is_scalar(x): x = np.array([x], str) return from_string(x, format)[0] x = util.sequencify(x) assert isinstance(x, np.ndarray) assert np.issubdtype(x.dtype, np.unicode_) out = np.full_like(x, None, object) out = Vector.fast(out, object) na = x == "" f = np.vectorize(lambda x: datetime.datetime.strptime(x, format)) out[~na] = f(x[~na].astype(object)) out = out.as_datetime() if len(out[~na]) > 0: if (hour(out[~na]) == 0).all(): if (minute(out[~na]) == 0).all(): if (second(out[~na]) == 0).all(): out = out.as_date() return out
[docs] def hour(x): """ Extract hour from datetime `x`. >>> x = dt.new(["2022-10-15T12:34:56"]) >>> dt.hour(x) """ return _pull_int(x, lambda y: y.hour)
[docs] def isoweek(x): """ Extract ISO 8601 week from datetime `x`. >>> x = dt.new(["2022-10-15"]) >>> dt.isoweek(x) """ return _pull_int(x, lambda y: y.isocalendar()[1])
[docs] def isoweekday(x): """ Extract day of the week from datetime `x`. Day of the week is an integer between 1 and 7, where 1 is Monday and 7 is Sunday. See also: :func:`weekday` >>> x = dt.new(["2022-10-15"]) >>> dt.isoweekday(x) """ return _pull_int(x, lambda y: y.isoweekday())
[docs] def microsecond(x): """ Extract microsecond from datetime `x`. >>> x = dt.new(["2022-10-15T12:34:56.789"]) >>> dt.microsecond(x) """ return _pull_int(x, lambda y: y.microsecond)
[docs] def minute(x): """ Extract minute from datetime `x`. >>> x = dt.new(["2022-10-15T12:34:56"]) >>> dt.minute(x) """ return _pull_int(x, lambda y: y.minute)
[docs] def month(x): """ Extract month from datetime `x`. >>> x = dt.new(["2022-10-15"]) >>> dt.month(x) """ return _pull_int(x, lambda y: y.month)
[docs] def new(x): """ Initialize a datetime scalar or vector from `x`. >>> dt.new("2022-10-15") >>> dt.new("2022-10-15T12:00:00") >>> dt.new(["2022-10-15"]) >>> dt.new(["2022-10-15T12:00:00"]) """ if util.is_scalar(x): return np.datetime64(x) return Vector.fast(map(np.datetime64, x), np.datetime64)
[docs] def now(): """ Return the current local datetime. >>> dt.now() """ return np.datetime64(datetime.datetime.now())
def _pull_datetime(x, function): if util.is_scalar(x): x = np.array([x], np.datetime64) return _pull_datetime(x, function)[0] x = util.sequencify(x) assert isinstance(x, np.ndarray) assert np.issubdtype(x.dtype, np.datetime64) out = np.full_like(x, np.nan) out = Vector.fast(out, np.datetime64) na = np.isnat(x) if na.all(): return out f = np.vectorize(function) out[~na] = f(x[~na].astype(object)) return out def _pull_int(x, function): if util.is_scalar(x): x = np.array([x], np.datetime64) return _pull_int(x, function)[0] x = util.sequencify(x) assert isinstance(x, np.ndarray) assert np.issubdtype(x.dtype, np.datetime64) out = np.full_like(x, np.nan, float) out = Vector.fast(out, float) na = np.isnat(x) if na.all(): return out f = np.vectorize(function) out[~na] = f(x[~na].astype(object)) return out if na.any() else out.as_integer() def _pull_str(x, function): if util.is_scalar(x): x = np.array([x], np.datetime64) return _pull_str(x, function)[0] x = util.sequencify(x) assert isinstance(x, np.ndarray) assert np.issubdtype(x.dtype, np.datetime64) out = np.full_like(x, "", object) out = Vector.fast(out, object) na = np.isnat(x) if na.all(): return out f = np.vectorize(function) out[~na] = f(x[~na].astype(object)) return out.as_string()
[docs] def quarter(x): """ Extract quarter from datetime `x`. >>> x = dt.new(["2022-10-15"]) >>> dt.quarter(x) """ y = np.ceil(month(x) / 3) return y if np.isnan(y).any() else y.astype(int)
[docs] def replace(x, year=None, month=None, day=None, hour=None, minute=None, second=None, microsecond=None): """ Return datetime `x` with given components replaced. >>> x = dt.new(["2022-10-15"]) >>> dt.replace(x, month=1, day=1) """ kwargs = {k: v for k, v in locals().items() if k != "x" and v is not None} if all(map(util.is_scalar, kwargs.values())): return _pull_datetime(x, lambda y: y.replace(**kwargs)) for value in kwargs.values(): assert util.is_scalar(value) or len(value) == len(x) scalar_keys = [x for x in kwargs if util.is_scalar(kwargs[x])] vector_keys = [x for x in kwargs if x not in scalar_keys] # Like _pull_datetime, but no vectorized function. x = util.sequencify(x) assert isinstance(x, np.ndarray) assert np.issubdtype(x.dtype, np.datetime64) out = np.full_like(x, np.nan) out = Vector.fast(out, np.datetime64) na = np.isnat(x) xobj = x.astype(object) kwargs_scalar = {x: kwargs[x] for x in scalar_keys} for i in np.flatnonzero(~na): for key in vector_keys: kwargs_scalar[key] = kwargs[key][i] out[i] = xobj[i].replace(**kwargs_scalar) return out
[docs] def second(x): """ Extract second from datetime `x`. >>> x = dt.new(["2022-10-15T12:34:56"]) >>> dt.second(x) """ return _pull_int(x, lambda y: y.second)
[docs] def to_string(x, format): """ Format datetime `x` as string. `format` uses Python ``strftime`` format codes: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes >>> x = dt.new(["2022-10-15"]) >>> dt.to_string(x, "%d.%m.%Y") """ return _pull_str(x, lambda x: x.strftime(format))
[docs] def today(): """ Return the current local date. >>> dt.today() """ return np.datetime64(datetime.date.today())
[docs] def weekday(x): """ Extract day of the week from datetime `x`. Day of the week is an integer between 0 and 6, where 0 is Monday and 6 is Sunday. See also: :func:`isoweekday` >>> x = dt.new(["2022-10-15"]) >>> dt.weekday(x) """ return _pull_int(x, lambda y: y.weekday())
[docs] def year(x): """ Extract year from datetime `x`. >>> x = dt.new(["2022-10-15"]) >>> dt.year(x) """ return _pull_int(x, lambda y: y.year)