Source code for dataiter.data_frame

# -*- coding: utf-8 -*-

# Copyright (c) 2019 Osmo Salomaa
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import contextlib
import dataiter
import functools
import itertools
import json
import numpy as np
import pickle

from dataiter import deco
from dataiter import util
from dataiter import Vector
from math import inf

class DataFrameColumn(Vector):

    """
    A column in a data frame.

    DataFrameColumn is a subclass of :class:`.Vector`. See the vector
    documentation for relevant properties and methods.
    """

    def __new__(cls, object, dtype=None, nrow=None):
        object = util.sequencify(object)
        column = Vector(object, dtype)
        if nrow is not None and nrow != column.length:
            if column.length != 1 or nrow < 1:
                raise ValueError("Bad arguments for broadcast")
            column = column.repeat(nrow)
        return column.view(cls)

    def __init__(self, object, dtype=None, nrow=None):
        """
        Return a new data frame column.

        `dtype` is the NumPy-compatible data type for the vector.
        Providing `dtype` will make creating the vector faster; otherwise
        the appropriate data type will be guessed by introspecting the
        elements of `object`, which is potentially slow, especially for
        large objects.

        If provided, `nrow` is the row count to produce, i.e. the length
        to which `object` will be broadcast.

        >>> di.DataFrameColumn([1, 2, 3], int)
        >>> di.DataFrameColumn([1], int, nrow=10)
        """
        super().__init__(object, dtype)

    @property
    def nrow(self):
        """
        Return the number of rows.
        """
        return self.length

class DataFrame(dict):

    """
    A class for tabular data.

    DataFrame is a subclass of ``dict``, with columns being
    :class:`.DataFrameColumn`, which are :class:`.Vector`, which are NumPy
    ``ndarray``. This means that basic ``dict`` methods, such as
    ``items()``, ``keys()`` and ``values()``, can be used to iterate over
    and manage the data as a whole, and NumPy functions and array methods
    can be used for fast vectorized computations on the data.

    Columns can be accessed by attribute notation, e.g. ``data.x``, in
    addition to ``data["x"]``. In most cases, attribute access is more
    convenient and is the way recommended by dataiter. You'll still need
    to use the bracket notation for any column names that are not valid
    identifiers, such as ones with spaces, or ones that conflict with dict
    methods, such as "items".

    DataFrame does not support indexing directly, as the bracket notation
    is used to refer to dict keys, i.e. columns by name. If you want to
    index the whole data frame object, use the method :meth:`slice`.
    Individual columns are indexed the same as NumPy arrays.
    """

    # List of names that are actual attributes, not columns
    ATTRIBUTES = ["colnames", "_group_colnames"]

    # Use dummy attributes corresponding to dictionary keys so that
    # tab completion of column names at a Python shell works.
    COLUMN_PLACEHOLDER = type("COLUMN_PLACEHOLDER", (), {})

    def __init__(self, *args, **kwargs):
        """
        Return a new data frame.

        `args` and `kwargs` are as for ``dict``.

        https://docs.python.org/3/library/stdtypes.html#dict
        """
        super().__init__(*args, **kwargs)
        nrow = max(map(util.length, self.values()), default=0)
        for key, value in self.items():
            if (isinstance(value, DataFrameColumn) and
                value.nrow == nrow):
                continue
            column = DataFrameColumn(value, nrow=nrow)
            super().__setitem__(key, column)
        for key in self:
            if not self.__hasattr(key) and key.isidentifier():
                super().__setattr__(key, self.COLUMN_PLACEHOLDER)
        # Check that we have a uniform table.
        self._check_dimensions()
        self._group_colnames = ()

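    # A minimal sketch of the broadcast behavior above (added comment;
    # assumes util.length treats a scalar as length 1, in line with the
    # sequencify call in DataFrameColumn.__new__):
    #
    # >>> data = di.DataFrame(x=[1, 2, 3], y=1)
    # >>> data.y.tolist()
    # [1, 1, 1]
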
    def __copy__(self):
        return self.__class__(self)

    def __deepcopy__(self, memo=None):
        return self.__class__({k: v.copy() for k, v in self.items()})

    def __delattr__(self, name):
        if name in self:
            return self.__delitem__(name)
        return super().__delattr__(name)

    def __delitem__(self, key):
        # Note that this is not called for some methods,
        # at least pop, popitem and clear.
        if self[key] is self.COLUMN_PLACEHOLDER:
            super().__delattr__(key)
        return super().__delitem__(key)

    def __eq__(self, other):
        return (isinstance(other, DataFrame) and
                self.nrow == other.nrow and
                self.ncol == other.ncol and
                set(self.colnames) == set(other.colnames) and
                all(self[x].equal(other[x]) for x in self))

    def __getattr__(self, name):
        if name in self:
            return self.__getitem__(name)
        raise AttributeError(name)

    def __getattribute__(self, name):
        value = super().__getattribute__(name)
        if name == "COLUMN_PLACEHOLDER":
            return value
        if value is self.COLUMN_PLACEHOLDER and name in self:
            return self[name]
        return value

    def __hasattr(self, name):
        # Return True if attribute exists and is not a column.
        return (hasattr(self, name) and
                not isinstance(getattr(self, name), DataFrameColumn))

    @classmethod
    def __is_builtin_attr(cls, name):
        return name in cls.__list_builtin_attrs()

    @classmethod
    @functools.lru_cache(None)
    def __list_builtin_attrs(cls):
        return set(dir(cls()))

    def __setattr__(self, name, value):
        if name in self.ATTRIBUTES:
            return super().__setattr__(name, value)
        return self.__setitem__(name, value)

    def __setitem__(self, key, value):
        value = self._reconcile_column(value)
        if not self.__hasattr(key) and key.isidentifier():
            super().__setattr__(key, self.COLUMN_PLACEHOLDER)
        return super().__setitem__(key, value)

    def __repr__(self):
        return self.to_string()

    def __str__(self):
        return self.to_string()

    def aggregate(self, **colname_function_pairs):
        """
        Return group-wise calculated summaries.

        Usually aggregation is preceded by grouping, which can be
        conveniently written via method chaining as
        ``data.group_by(...).aggregate(...)``.

        In `colname_function_pairs`, `function` receives as an argument a
        data frame object, a group-wise subset of all rows. It should
        return a scalar value. Common aggregation functions have shorthand
        helpers available under :mod:`dataiter`, see the guide on
        :doc:`aggregation </aggregation>` for details.

        >>> data = di.read_csv("data/listings.csv")
        >>> # The below aggregations are identical. Usually you'll get by
        >>> # with the shorthand helpers, but for complicated calculations,
        >>> # you might need custom lambda functions.
        >>> data.group_by("hood").aggregate(n=di.count(), price=di.mean("price"))
        >>> data.group_by("hood").aggregate(n=lambda x: x.nrow, price=lambda x: x.price.mean())
        """
        group_colnames = self._group_colnames
        data = self.sort(**dict.fromkeys(group_colnames, 1))
        data._index_ = np.arange(data.nrow)
        stat = data.unique(*group_colnames).select("_index_", *group_colnames)
        indices = np.split(data._index_, stat._index_[1:])
        group_aware = [getattr(x, "group_aware", False)
                       for x in colname_function_pairs.values()]
        if any(group_aware):
            groups = Vector.fast(range(len(indices)), int)
            n = Vector.fast(map(len, indices), int)
            data._group_ = np.repeat(groups, n)
        slices = None
        for colname, function in colname_function_pairs.items():
            if getattr(function, "group_aware", False):
                # The function might leave Nones in its output; once those
                # are replaced with the proper default, we can do a fast
                # conversion to DataFrameColumn.
                column = function(data)
                default = function.default
                for i in range(len(column)):
                    if column[i] is None:
                        column[i] = default
                assert len(column) == stat.nrow
                column = DataFrameColumn.fast(column)
                stat[colname] = column
            else:
                # When using an arbitrary function, we cannot know
                # what special values to expect and thus we end up
                # needing to use the slow Vector.__init__.
                if slices is None:
                    slices = [data._view_rows(x) for x in indices]
                stat[colname] = [function(x) for x in slices]
        return stat.unselect("_index_", "_group_")

    @deco.new_from_generator
    def anti_join(self, other, *by):
        """
        Return rows with no matches in `other`.

        `by` are column names, by which to look for matching rows, or
        tuples of column names if the corresponding column name differs
        between `self` and `other`.

        >>> # All listings that don't have reviews
        >>> listings = di.read_csv("data/listings.csv")
        >>> reviews = di.read_csv("data/listings-reviews.csv")
        >>> listings.anti_join(reviews, "id")
        """
        by1, by2 = self._split_join_by(*by)
        other = other.drop_na(*by2).unique(*by2)
        found, src = self._get_join_indices(other, by1, by2)
        for colname, column in self.items():
            yield colname, np.delete(column, found)

    @deco.new_from_generator
    def cbind(self, *others):
        """
        Return data frame with columns from `others` added.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.cbind(di.DataFrame(x=1))
        """
        found_colnames = set()
        data_frames = [self] + list(others)
        for i, data in enumerate(data_frames):
            for colname, column in data.items():
                if colname in found_colnames:
                    continue
                found_colnames.add(colname)
                column = self._reconcile_column(column)
                yield colname, column.copy()

    def _check_dimensions(self):
        if not self:
            return
        nrows = [x.nrow for x in self.columns]
        if len(set(nrows)) == 1:
            return
        raise ValueError(f"Bad dimensions: {nrows!r}")

    def clear(self):
        """"""
        return self._new()

    @property
    def colnames(self):
        """
        Get or set column names as a list.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.head()
        >>> data.colnames
        >>> data.colnames = ["a", "b", "c", "d", "e", "f"]
        >>> data.head()
        """
        return list(self)

    @colnames.setter
    def colnames(self, colnames):
        # Popping keys in their original order and reinserting them under
        # the new names keeps the column order intact.
        for fm, to in zip(list(self.keys()), colnames):
            self[to] = self.pop(fm)

    @property
    def columns(self):
        """
        Return columns as a list.
        """
        return list(self.values())

    def compare(self, other, *by, ignore_columns=[], max_changed=inf):
        """
        Find differences against another data frame.

        `by` are identifier columns which are used to uniquely identify
        rows and match them between `self` and `other`. `compare` will not
        work if your data lacks suitable identifiers.

        `ignore_columns` is an optional list of columns, differences in
        which to ignore.

        `compare` returns three data frames: added rows, removed rows and
        changed values. The first two are basically subsets of the rows of
        `self` and `other`, respectively. Changed values are returned as a
        data frame with one row per differing value (not per differing
        row). Listing changes will terminate once `max_changed` is
        reached.

        .. warning:: `compare` is experimental, do not rely on it
                     reporting all of the differences correctly. Do not
                     try to give it two huge data frames with very little
                     in common, unless also giving some sensible value for
                     `max_changed`.

        >>> old = di.read_csv("data/vehicles.csv")
        >>> new = old.modify(hwy=lambda x: np.minimum(100, x.hwy))
        >>> added, removed, changed = new.compare(old, "id")
        >>> changed
        """
        if self.unique(*by).nrow < self.nrow:
            raise ValueError(f"self not unique by {by}")
        if other.unique(*by).nrow < other.nrow:
            raise ValueError(f"other not unique by {by}")
        added = self.anti_join(other, *by)
        removed = other.anti_join(self, *by)
        x = self.modify(_i_=range(self.nrow))
        y = other.modify(_j_=range(other.nrow))
        z = x.inner_join(y.select("_j_", *by), *by)
        colnames = util.unique_keys(self.colnames + other.colnames)
        colnames = [x for x in colnames if x not in ignore_columns]
        changed = []
        for i, j in zip(z._i_, z._j_):
            if len(changed) >= max_changed:
                print(f"max_changed={max_changed} reached, terminating")
                break
            for colname in colnames:
                if len(changed) >= max_changed:
                    break
                # XXX: How to make a distinction between
                # a missing column and a missing value?
                xvalue = x[colname][i] if colname in x else None
                yvalue = y[colname][j] if colname in y else None
                if (xvalue != yvalue and
                    not Vector([xvalue, yvalue]).is_na().all()):
                    # XXX: We could have a name clash here.
                    byrow = {k: x[k][i] for k in by}
                    changed.append(dict(**byrow,
                                        column=colname,
                                        xvalue=xvalue,
                                        yvalue=yvalue))
        added = added if added.nrow > 0 else None
        removed = removed if removed.nrow > 0 else None
        changed = self.from_json(changed) if changed else None
        return added, removed, changed

    def copy(self):
        """
        Return a shallow copy.
        """
        return self.__copy__()

    def count(self, *colnames):
        """
        Return row counts grouped by `colnames`.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.count("hood")
        """
        return self.copy().group_by(*colnames).aggregate(n=dataiter.count())

    def deepcopy(self):
        """
        Return a deep copy.
        """
        return self.__deepcopy__()

    def drop_na(self, *colnames):
        """
        Return data frame without rows that have missing values in `colnames`.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.drop_na("sqft")
        """
        drop = Vector.fast([False], bool).repeat(self.nrow)
        for colname in colnames:
            drop = drop | self[colname].is_na()
        return self.filter_out(drop)

    @deco.new_from_generator
    def filter(self, rows=None, **colname_value_pairs):
        """
        Return rows that match the given condition.

        Filtering can be done by either `rows` or `colname_value_pairs`.
        `rows` can be either a boolean vector or a function that receives
        the data frame as argument and returns a boolean vector. The
        latter is especially useful in a method chaining context where you
        don't have direct access to the data frame in question.
        Alternatively, `colname_value_pairs` provides a shorthand to check
        against a fixed value. See the example below of the same filtering
        done all three ways.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.filter((data.hood == "Manhattan") & (data.guests == 2))
        >>> data.filter(lambda x: (x.hood == "Manhattan") & (x.guests == 2))
        >>> data.filter(hood="Manhattan", guests=2)
        """
        if rows is not None:
            if callable(rows):
                rows = rows(self)
        elif colname_value_pairs:
            rows = Vector.fast([True], bool).repeat(self.nrow)
            for colname, value in colname_value_pairs.items():
                rows = rows & (self[colname] == value)
        rows = self._parse_rows_from_boolean(rows)
        for colname, column in self.items():
            yield colname, np.take(column, rows)

    @deco.new_from_generator
    def filter_out(self, rows=None, **colname_value_pairs):
        """
        Return rows that don't match the given condition.

        Filtering can be done by either `rows` or `colname_value_pairs`.
        `rows` can be either a boolean vector or a function that receives
        the data frame as argument and returns a boolean vector. The
        latter is especially useful in a method chaining context where you
        don't have direct access to the data frame in question.
        Alternatively, `colname_value_pairs` provides a shorthand to check
        against a fixed value. See the example below of the same filtering
        done all three ways.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.filter_out(data.hood == "Manhattan")
        >>> data.filter_out(lambda x: x.hood == "Manhattan")
        >>> data.filter_out(hood="Manhattan")
        """
        if rows is not None:
            if callable(rows):
                rows = rows(self)
        elif colname_value_pairs:
            rows = Vector.fast([True], bool).repeat(self.nrow)
            for colname, value in colname_value_pairs.items():
                rows = rows & (self[colname] == value)
        rows = self._parse_rows_from_boolean(rows)
        for colname, column in self.items():
            yield colname, np.delete(column, rows)

    @classmethod
    def from_arrow(cls, data, *, strings_as_object=inf, dtypes={}):
        """
        Return a new data frame from ``pyarrow.Table`` `data`.

        `strings_as_object` is a cutoff point: if any value in a column
        has more characters than that, the whole column will use the
        object data type. This is intended to help limit memory use, as
        NumPy strings are fixed-length and even a single long value can
        make a column take a huge amount of memory. If set, `dtypes`
        overrides this.

        `dtypes` is an optional dict mapping column names to NumPy
        datatypes.
        """
        # Arrow's 'to_numpy' is "limited to primitive types for which
        # NumPy has the same physical representation as Arrow, and
        # assuming the Arrow data has no nulls." Using Pandas is easier
        # and probably good enough.
        return cls.from_pandas(data.to_pandas(),
                               strings_as_object=strings_as_object,
                               dtypes=dtypes)

    @classmethod
    def from_json(cls, string, *, columns=[], dtypes={}, **kwargs):
        """
        Return a new data frame from JSON `string`.

        `columns` is an optional list of columns to limit to.

        `dtypes` is an optional dict mapping column names to NumPy
        datatypes.

        `kwargs` are passed to ``json.loads``.
        """
        data = string
        if isinstance(data, str):
            data = json.loads(data, **kwargs)
        if not isinstance(data, list):
            raise TypeError("Not a list")
        keys = util.unique_keys(itertools.chain(*data))
        if columns:
            keys = [x for x in keys if x in columns]
        data = {k: [x.get(k, None) for x in data] for k in keys}
        for name, dtype in dtypes.items():
            data[name] = DataFrameColumn(data[name], dtype)
        return cls(**data)

    @classmethod
    def from_pandas(cls, data, *, strings_as_object=inf, dtypes={}):
        """
        Return a new data frame from ``pandas.DataFrame`` `data`.

        `strings_as_object` is a cutoff point: if any value in a column
        has more characters than that, the whole column will use the
        object data type. This is intended to help limit memory use, as
        NumPy strings are fixed-length and even a single long value can
        make a column take a huge amount of memory. If set, `dtypes`
        overrides this.

        `dtypes` is an optional dict mapping column names to NumPy
        datatypes.
        """
        if (not isinstance(strings_as_object, (int, float)) or
            isinstance(strings_as_object, bool)):
            raise TypeError("Expected a number for strings_as_object")
        dtypes = dtypes.copy()
        from pandas.api.types import is_object_dtype
        if strings_as_object < inf:
            for name in data.columns:
                if name not in dtypes and is_object_dtype(data[name]):
                    with contextlib.suppress(AttributeError):
                        if data[name].str.len().max() > strings_as_object:
                            dtypes[name] = object
        data = {x: data[x].to_numpy(copy=True) for x in data.columns}
        for name, value in data.items():
            # Pandas object columns are likely to be strings,
            # convert to list to force type guessing in Vector.__init__.
            if np.issubdtype(value.dtype, np.object_):
                data[name] = data[name].tolist()
        for name, dtype in dtypes.items():
            data[name] = DataFrameColumn(data[name], dtype)
        return cls(**data)

    def full_join(self, other, *by):
        """
        Return data frame with matching rows merged from `self` and `other`.

        `full_join` keeps all rows from both data frames, merging matching
        ones. If there are multiple matches, the first one will be used.
        For rows for which no match is found, missing values are added.

        `by` are column names, by which to look for matching rows, or
        tuples of column names if the corresponding column name differs
        between `self` and `other`.

        >>> listings = di.read_csv("data/listings.csv")
        >>> reviews = di.read_csv("data/listings-reviews.csv")
        >>> listings.full_join(reviews, "id")
        """
        a = self.modify(_aid_=np.arange(self.nrow))
        b = other.modify(_bid_=np.arange(other.nrow))
        ab = a.left_join(b, *by)
        # Check which rows of b were not joined into a.
        # If no rows remain, full join is the same as left join ab.
        b = b.anti_join(ab, "_bid_")
        if b.nrow == 0:
            return ab.unselect("_aid_", "_bid_")
        # Reverse the by-tuples for the reverse join ba,
        # so that the data frame and by orders match.
        by_reverse = [tuple(reversed(x)) if isinstance(x, (list, tuple)) else x
                      for x in by]
        ba = b.left_join(a, *by_reverse)
        for item in by:
            # For identifiers in by whose name differs in a and b,
            # rename and keep the variant found in a.
            if isinstance(item, (list, tuple)):
                ba[item[0]] = ba.pop(item[1])
        return ab.rbind(ba).sort(_aid_=1, _bid_=1).unselect("_aid_", "_bid_")

    def _get_join_indices(self, other, by1, by2):
        other_ids = list(zip(*[other[x] for x in by2]))
        other_by_id = {other_ids[i]: i for i in range(other.nrow)}
        self_ids = zip(*[self[x] for x in by1])
        src = map(lambda x: other_by_id.get(x, -1), self_ids)
        src = np.fromiter(src, int, count=self.nrow)
        found = np.where(src > -1)
        return found, src

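    # Worked illustration of the above (added comment): with self ids
    # (1,), (2,), (3,) and other ids (3,), (1,), other_by_id maps
    # (3,) -> 0 and (1,) -> 1, so src == [1, -1, 0] and
    # found == (array([0, 2]),), i.e. self rows 0 and 2 have a match and
    # src[found] gives the matching row indices in other.
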
    def group_by(self, *colnames):
        """
        Return data frame with `colnames` set for grouped operations,
        such as :meth:`aggregate`.
        """
        self._group_colnames = tuple(colnames)
        return self

    def head(self, n=None):
        """
        Return the first `n` rows.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.head(5)
        """
        if n is None:
            n = dataiter.DEFAULT_PEEK_ROWS
        n = min(self.nrow, n)
        return self.slice(np.arange(n))

    @deco.new_from_generator
    def inner_join(self, other, *by):
        """
        Return data frame with matching rows merged from `self` and `other`.

        `inner_join` keeps only rows found in both data frames, merging
        matching ones. If there are multiple matches, the first one will
        be used.

        `by` are column names, by which to look for matching rows, or
        tuples of column names if the corresponding column name differs
        between `self` and `other`.

        >>> listings = di.read_csv("data/listings.csv")
        >>> reviews = di.read_csv("data/listings-reviews.csv")
        >>> listings.inner_join(reviews, "id")
        """
        by1, by2 = self._split_join_by(*by)
        other = other.drop_na(*by2).unique(*by2)
        found, src = self._get_join_indices(other, by1, by2)
        for colname, column in self.items():
            yield colname, column[found].copy()
        for colname, column in other.items():
            if colname in by2:
                continue
            if colname in self:
                continue
            yield colname, column[src[found]].copy()

    @deco.new_from_generator
    def left_join(self, other, *by):
        """
        Return data frame with matching rows merged from `self` and `other`.

        `left_join` keeps all rows in `self`, merging matching ones. If
        there are multiple matches, the first one will be used. For rows
        for which no match is found, missing values are added.

        `by` are column names, by which to look for matching rows, or
        tuples of column names if the corresponding column name differs
        between `self` and `other`.

        >>> listings = di.read_csv("data/listings.csv")
        >>> reviews = di.read_csv("data/listings-reviews.csv")
        >>> listings.left_join(reviews, "id")
        """
        by1, by2 = self._split_join_by(*by)
        other = other.drop_na(*by2).unique(*by2)
        found, src = self._get_join_indices(other, by1, by2)
        for colname, column in self.items():
            yield colname, column.copy()
        for colname, column in other.items():
            if colname in by2:
                continue
            if colname in self:
                continue
            value = column.na_value
            dtype = column.na_dtype
            new = DataFrameColumn(value, dtype, self.nrow)
            new[found] = column[src[found]]
            yield colname, new.copy()

    def map(self, function):
        """
        Apply `function` to each row in data.

        `function` receives as arguments the full data frame and the loop
        index. The return value will be a list of whatever `function`
        returns.

        Note that `map` is an inefficient method as it iterates over rows
        instead of doing vectorized computation. `map` is mostly intended
        for complicated conditional cases that are difficult to express
        in vectorized form.

        >>> data = di.read_csv("data/listings-reviews.csv")
        >>> data.map(lambda x, i: (x.reviews[i], x.rating[i]))
        """
        return [function(self, i) for i in range(self.nrow)]

    @deco.new_from_generator
    def modify(self, **colname_value_pairs):
        """
        Return data frame with columns modified.

        In `colname_value_pairs`, `value` can be either a vector or a
        function that receives the data frame as argument and returns a
        vector. See the example below of equivalent modification both
        ways. Note that column modification can often be done more simply
        with a plain assignment, such as
        ``data.price_per_guest = data.price / data.guests``; `modify` just
        allows you to do the same in a method chain context.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.modify(price_per_guest=data.price/data.guests)
        >>> data.modify(price_per_guest=lambda x: x.price / x.guests)

        If the data frame is grouped, then `colname_value_pairs` need to
        be functions, which are applied to group-wise subsets of the data
        frame. A common use for this is calculating group-wise fractions.

        >>> data = di.DataFrame(g=[1, 2, 2, 3, 3, 3])
        >>> data.group_by("g").modify(f=lambda x: 1 / x.nrow)
        """
        for colname, column in self.items():
            yield colname, column.copy()
        if self._group_colnames:
            slices = self.split(*self._group_colnames)
            # Mapping over slices will produce contiguous groups in order
            # of self._group_colnames. Calculate and apply indexing that
            # will restore the original order.
            restore_indices = np.argsort(np.concatenate(slices))
            slices = [self._view_rows(x) for x in slices]
            for colname, function in colname_value_pairs.items():
                if not callable(function):
                    raise ValueError(f"{colname} argument not callable")
                column = [DataFrameColumn(function(x), nrow=x.nrow)
                          for x in slices]
                yield colname, np.concatenate(column)[restore_indices]
        else:
            for colname, value in colname_value_pairs.items():
                value = value(self) if callable(value) else value
                yield colname, self._reconcile_column(value).copy()

    @property
    def ncol(self):
        """
        Return the number of columns.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.ncol
        """
        self._check_dimensions()
        return len(self)

    def _new(self, *args, **kwargs):
        return self.__class__(*args, **kwargs)

    @property
    def nrow(self):
        """
        Return the number of rows.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.nrow
        """
        if not self:
            return 0
        self._check_dimensions()
        return self[next(iter(self))].nrow

    def _parse_cols_from_boolean(self, cols):
        cols = Vector.fast(cols, bool)
        if len(cols) != self.ncol:
            raise ValueError("Bad length for boolean cols")
        return Vector.fast(np.nonzero(cols)[0], int)

    def _parse_cols_from_integer(self, cols):
        return Vector.fast(cols, int)

    def _parse_rows_from_boolean(self, rows):
        rows = Vector.fast(rows, bool)
        if len(rows) != self.nrow:
            raise ValueError("Bad length for boolean rows")
        return Vector.fast(np.nonzero(rows)[0], int)

    def _parse_rows_from_integer(self, rows):
        return Vector.fast(rows, int)

    def pop(self, key, *args, **kwargs):
        """"""
        value = super().pop(key, *args, **kwargs)
        if hasattr(self, key):
            if not self.__is_builtin_attr(key):
                super().__delattr__(key)
        return value

    def popitem(self):
        """"""
        key, value = super().popitem()
        if hasattr(self, key):
            if not self.__is_builtin_attr(key):
                super().__delattr__(key)
        return key, value

    def print_(self, *, max_rows=None, max_width=None, truncate_width=None):
        """
        Print data frame to ``sys.stdout``.

        `print_` does the same as calling Python's builtin ``print``
        function, but since it's a method, you can use it at the end of a
        method chain instead of wrapping a ``print`` call around the
        whole chain.

        >>> di.read_csv("data/listings.csv").print_()
        """
        print(self.to_string(max_rows=max_rows,
                             max_width=max_width,
                             truncate_width=truncate_width))

    def print_memory_use(self):
        """
        Print memory use by column and total.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.print_memory_use()
        """
        mem = DataFrame()
        for name, column in self.items():
            new = DataFrame(column=name)
            new.dtype = str(column.dtype)
            new.item_size = column.itemsize
            new.total_size = column.get_memory_use()
            mem = mem.rbind(new)
        new = DataFrame(column="TOTAL")
        new.dtype = "--"
        new.item_size = mem.item_size.sum()
        new.total_size = mem.total_size.sum()
        mem = mem.rbind(new)
        # Format sizes into sensible values for display.
        mem.item_size = [f"{x:.0f} B" for x in mem.item_size]
        mem.total_size = [f"{x/1024**2:,.0f} MB" for x in mem.total_size]
        mem.colnames = [x.upper() for x in mem.colnames]
        print(mem)

    def print_na_counts(self):
        """
        Print counts of missing values by column.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.print_na_counts()
        """
        nas = DataFrame()
        for name in self.colnames:
            n = self[name].is_na().sum()
            if n == 0:
                continue
            nas = nas.rbind(DataFrame(column=name, nna=n))
        if not nas:
            return
        nas.pna = [f"{100*x/self.nrow:.1f}%" for x in nas.nna]
        nas.colnames = [x.upper() for x in nas.colnames]
        print(nas)

    @deco.new_from_generator
    def rbind(self, *others):
        """
        Return data frame with rows from `others` added.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.rbind(data)
        """
        data_frames = [self] + list(others)
        colnames = util.unique_keys(itertools.chain(*data_frames))
        def get_part(data, colname):
            if colname in data:
                return data[colname]
            for ref in data_frames:
                if colname not in ref:
                    continue
                value = ref[colname].na_value
                dtype = ref[colname].na_dtype
                return Vector.fast([value], dtype).repeat(data.nrow)
        for colname in colnames:
            parts = [get_part(x, colname) for x in data_frames]
            total = DataFrameColumn(np.concatenate(parts))
            yield colname, total

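    # Illustration of the NA fill above (added comment; assumes an
    # integer column's na_value/na_dtype promote it to float so that
    # nan can represent missing values):
    #
    # >>> di.DataFrame(x=[1]).rbind(di.DataFrame(y=[2]))
    # would yield x == [1, nan] and y == [nan, 2].
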
    @classmethod
    def read_csv(cls, path, *, encoding="utf-8", sep=",", header=True,
                 columns=[], strings_as_object=inf, dtypes={}):
        """
        Return a new data frame from CSV file `path`.

        Will automatically decompress if `path` ends in ``.bz2|.gz|.xz``.

        `columns` is an optional list of columns to limit to.

        `strings_as_object` is a cutoff point: if any value in a column
        has more characters than that, the whole column will use the
        object data type. This is intended to help limit memory use, as
        NumPy strings are fixed-length and even a single long value can
        make a column take a huge amount of memory. If set, `dtypes`
        overrides this.

        `dtypes` is an optional dict mapping column names to NumPy
        datatypes.
        """
        import pandas as pd
        data = pd.read_csv(path,
                           sep=sep,
                           header=0 if header else None,
                           usecols=columns or None,
                           dtype=dtypes,
                           parse_dates=False,
                           encoding=encoding,
                           low_memory=False)
        if not header:
            data.columns = util.generate_colnames(len(data.columns))
        return cls.from_pandas(data,
                               strings_as_object=strings_as_object,
                               dtypes=dtypes)

    @classmethod
    def read_json(cls, path, *, encoding="utf-8", columns=[], dtypes={}, **kwargs):
        """
        Return a new data frame from JSON file `path`.

        Will automatically decompress if `path` ends in ``.bz2|.gz|.xz``.

        `columns` is an optional list of columns to limit to.

        `dtypes` is an optional dict mapping column names to NumPy
        datatypes.

        `kwargs` are passed to ``json.loads``.
        """
        with util.xopen(path, "rt", encoding=encoding) as f:
            return cls.from_json(f.read(), columns=columns, dtypes=dtypes, **kwargs)

    @classmethod
    def read_npz(cls, path, *, allow_pickle=True):
        """
        Return a new data frame from NumPy file `path`.

        See `numpy.load` for an explanation of `allow_pickle`:
        https://numpy.org/doc/stable/reference/generated/numpy.load.html
        """
        with np.load(path, allow_pickle=allow_pickle) as data:
            return cls(**data)

    @classmethod
    def read_parquet(cls, path, *, columns=[], strings_as_object=inf, dtypes={}):
        """
        Return a new data frame from Parquet file `path`.

        `columns` is an optional list of columns to limit to.

        `strings_as_object` is a cutoff point: if any value in a column
        has more characters than that, the whole column will use the
        object data type. This is intended to help limit memory use, as
        NumPy strings are fixed-length and even a single long value can
        make a column take a huge amount of memory. If set, `dtypes`
        overrides this.

        `dtypes` is an optional dict mapping column names to NumPy
        datatypes.
        """
        import pyarrow.parquet as pq
        columns = columns or None
        data = pq.read_table(path, columns=columns)
        return cls.from_arrow(data,
                              strings_as_object=strings_as_object,
                              dtypes=dtypes)

    @classmethod
    def read_pickle(cls, path):
        """
        Return a new data frame from Pickle file `path`.

        Will automatically decompress if `path` ends in ``.bz2|.gz|.xz``.
        """
        with util.xopen(path, "rb") as f:
            return cls(pickle.load(f))

    def _reconcile_column(self, column):
        if isinstance(column, DataFrameColumn):
            if column.nrow == self.nrow:
                return column
        nrow = self.nrow if self else None
        return DataFrameColumn(column, nrow=nrow)

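    # Sketch of the reconciliation rule above (added comment): on a
    # three-row frame, _reconcile_column(0) broadcasts the scalar to a
    # three-row column of zeros, while a DataFrameColumn whose nrow
    # already matches is passed through unchanged.
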
    @deco.new_from_generator
    def rename(self, **to_from_pairs):
        """
        Return data frame with columns renamed.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.rename(listing_id="id")
        """
        from_to_pairs = {v: k for k, v in to_from_pairs.items()}
        for fm in self.colnames:
            to = from_to_pairs.get(fm, fm)
            yield to, self[fm].copy()

    def sample(self, n=None):
        """
        Return `n` randomly chosen rows.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.sample(5)
        """
        if n is None:
            n = dataiter.DEFAULT_PEEK_ROWS
        n = min(self.nrow, n)
        rows = np.random.choice(self.nrow, n, replace=False)
        return self.slice(np.sort(rows))

    @deco.new_from_generator
    def select(self, *colnames):
        """
        Return data frame, keeping only `colnames`.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.select("id", "hood", "zipcode")
        """
        for colname in colnames:
            yield colname, self[colname].copy()

    @deco.new_from_generator
    def semi_join(self, other, *by):
        """
        Return rows with matches in `other`.

        `by` are column names, by which to look for matching rows, or
        tuples of column names if the corresponding column name differs
        between `self` and `other`.

        >>> # All listings that have reviews
        >>> listings = di.read_csv("data/listings.csv")
        >>> reviews = di.read_csv("data/listings-reviews.csv")
        >>> listings.semi_join(reviews, "id")
        """
        by1, by2 = self._split_join_by(*by)
        other = other.unique(*by2)
        found, src = self._get_join_indices(other, by1, by2)
        for colname, column in self.items():
            yield colname, column[found].copy()

    @deco.new_from_generator
    def slice(self, rows=None, cols=None):
        """
        Return a row-wise and/or column-wise subset of data frame.

        Both `rows` and `cols` should be integer vectors corresponding to
        the indices of the rows or columns to keep.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.slice(rows=[0, 1, 2])
        >>> data.slice(cols=[0, 1, 2])
        >>> data.slice(rows=[0, 1, 2], cols=[0, 1, 2])
        """
        rows = np.arange(self.nrow) if rows is None else rows
        cols = np.arange(self.ncol) if cols is None else cols
        rows = self._parse_rows_from_integer(rows)
        cols = self._parse_cols_from_integer(cols)
        for colname in (self.colnames[x] for x in cols):
            yield colname, self[colname][rows].copy()

    @deco.new_from_generator
    def slice_off(self, rows=None, cols=None):
        """
        Return a row-wise and/or column-wise negative subset of data frame.

        Both `rows` and `cols` should be integer vectors corresponding to
        the indices of the rows or columns to drop.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.slice_off(rows=[0, 1, 2])
        >>> data.slice_off(cols=[0, 1, 2])
        >>> data.slice_off(rows=[0, 1, 2], cols=[0, 1, 2])
        """
        rows = [] if rows is None else rows
        cols = [] if cols is None else cols
        rows = self._parse_rows_from_integer(rows)
        cols = self._parse_cols_from_integer(cols)
        for i, colname in enumerate(self.colnames):
            if i in cols:
                continue
            yield colname, np.delete(self[colname], rows)

    @deco.new_from_generator
    def sort(self, **colname_dir_pairs):
        """
        Return rows in sorted order.

        `colname_dir_pairs` defines the sort order by column name, with
        `dir` being ``1`` for ascending sort, ``-1`` for descending.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.sort(hood=1, zipcode=1)
        """
        @deco.tuplefy
        def sort_key():
            pairs = colname_dir_pairs.items()
            # np.lexsort uses its last key as the primary one,
            # hence iterate the pairs in reversed order.
            for colname, dir in reversed(list(pairs)):
                if dir not in [1, -1]:
                    raise ValueError("dir should be 1 or -1")
                column = self[colname]
                if column.is_object():
                    # See Vector.sort for comparison.
                    column = column.as_string()
                if column.is_string():
                    column[column.is_na()] = "\uffff"
                if dir < 0 and not (column.is_boolean() or column.is_number()):
                    # Use rank for non-numeric so that we can sort descending.
                    column = column.rank(method="min")
                yield column if dir > 0 else -column
        indices = np.lexsort(sort_key())
        for colname, column in self.items():
            yield colname, column[indices].copy()

    def split(self, *by):
        """
        Split data frame into groups and return a list of their rows.

        >>> data = di.DataFrame(x=[1, 2, 2, 3, 3, 3])
        >>> data.split("x")
        """
        data = self.select(*by)
        data._index_ = np.arange(data.nrow)
        data = data.sort(**dict.fromkeys(by, 1))
        data._sorted_index_ = np.arange(data.nrow)
        stat = data.unique(*by)
        return np.split(data._index_, stat._sorted_index_[1:])

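    # Illustration of the return value (added comment, following the
    # docstring example): for x == [1, 2, 2, 3, 3, 3], split("x") returns
    # the row indices of each group,
    # [array([0]), array([1, 2]), array([3, 4, 5])].
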
    def _split_join_by(self, *by):
        by1 = [x if isinstance(x, str) else x[0] for x in by]
        by2 = [x if isinstance(x, str) else x[1] for x in by]
        return by1, by2

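    # Illustration (added comment): string and tuple arguments can be
    # mixed, e.g. _split_join_by("id", ("zip", "zipcode")) returns
    # (["id", "zip"], ["id", "zipcode"]), i.e. the column names to use
    # for self and other, respectively.
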
    def tail(self, n=None):
        """
        Return the last `n` rows.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.tail(5)
        """
        if n is None:
            n = dataiter.DEFAULT_PEEK_ROWS
        n = min(self.nrow, n)
        return self.slice(np.arange(self.nrow - n, self.nrow))

    def to_arrow(self):
        """
        Return data frame converted to a ``pyarrow.Table``.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.to_arrow()
        """
        import pyarrow as pa
        data = [pa.array(self[x].tolist()) for x in self.colnames]
        return pa.table(data, names=self.colnames)

    def to_json(self, **kwargs):
        """
        Return data frame converted to a JSON string.

        `kwargs` are passed to ``json.dump``.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.to_json()[:100]
        """
        return self.to_list_of_dicts().to_json(**kwargs)

    def to_list_of_dicts(self):
        """
        Return data frame converted to a :class:`.ListOfDicts`.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.to_list_of_dicts()
        """
        from dataiter import ListOfDicts
        data = [{} for i in range(self.nrow)]
        for colname in self.colnames:
            for i, value in enumerate(self[colname].tolist()):
                data[i][colname] = value
        return ListOfDicts(data)

    def to_pandas(self):
        """
        Return data frame converted to a ``pandas.DataFrame``.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.to_pandas()
        """
        import pandas as pd
        return pd.DataFrame({x: self[x].tolist() for x in self.colnames})

    def to_string(self, *, max_rows=None, max_width=None, truncate_width=None):
        """
        Return data frame as a string formatted for display.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.to_string()
        """
        if not self:
            return ""
        max_rows = max_rows or dataiter.PRINT_MAX_ROWS
        max_width = max_width or util.get_print_width()
        truncate_width = truncate_width or dataiter.PRINT_TRUNCATE_WIDTH
        n = min(self.nrow, max_rows)
        columns = {colname: util.upad(
            [colname] +
            [str(column.dtype)] +
            [str(x) for x in column[:n].to_strings(
                quote=False, pad=True, truncate_width=truncate_width)]
        ) for colname, column in self.items()}
        for column in columns.values():
            column.insert(2, "─" * util.ulen(column[0]))
        row_numbers = [str(i) for i in range(n)]
        row_numbers = util.upad(["", "", ""] + row_numbers)
        # If the length of rows exceeds max_width, split to
        # batches of columns (like R's print.data.frame).
        rows_to_print = []
        while columns:
            first = next(iter(columns.keys()))
            batch_rows = [" ".join(x) for x in zip(
                row_numbers, columns.pop(first))]
            for colname, column in list(columns.items()):
                width = util.ulen(batch_rows[0] + column[0]) + 1
                if width > max_width:
                    break
                for i in range(len(column)):
                    batch_rows[i] += " "
                    batch_rows[i] += column[i]
                del columns[colname]
            rows_to_print.append("" if rows_to_print else ".")
            rows_to_print += batch_rows
            rows_to_print.append(".")
        if max_rows < self.nrow:
            rows_to_print.append(f"... {self.nrow} rows total")
        return "\n".join(rows_to_print)

    @deco.new_from_generator
    def unique(self, *colnames):
        """
        Return unique rows by `colnames`.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.unique("hood")
        """
        colnames = colnames or self.colnames
        if (len(colnames) == 1 and
            not self[colnames[0]].is_object()):
            # Use a single column directly.
            by = self[colnames[0]]
        elif (len(set(self[x].dtype for x in colnames)) == 1 and
              not self[colnames[0]].is_object()):
            # Stack matching dtypes directly in a new array.
            by = np.column_stack([self[x] for x in colnames])
        else:
            # Use rank for differing dtypes.
            by = np.column_stack([self[x].rank(method="min") for x in colnames])
        indices = np.sort(np.unique(by, return_index=True, axis=0)[1])
        for colname, column in self.items():
            yield colname, column[indices].copy()

    @deco.new_from_generator
    def unselect(self, *colnames):
        """
        Return data frame, dropping `colnames`.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.unselect("guests", "sqft", "price")
        """
        for colname in self.colnames:
            if colname not in colnames:
                yield colname, self[colname].copy()

    @deco.new_from_generator
    def update(self, other):
        """
        Return data frame with columns from `other` added, replacing any
        existing columns with the same names.

        >>> data = di.read_csv("data/listings.csv")
        >>> data.update(di.DataFrame(x=1))
        """
        for colname, column in self.items():
            if colname in other:
                continue
            yield colname, column.copy()
        for colname, column in other.items():
            column = self._reconcile_column(column)
            yield colname, column.copy()

    def _view_rows(self, rows):
        # Initialize a blank instance and use base class update
        # to bypass __init__ and __setitem__ checks for speed.
        data = self.__class__()
        dict.update(data, {x: self[x][rows] for x in self})
        return data

    def write_csv(self, path, *, encoding="utf-8", header=True, sep=","):
        """
        Write data frame to CSV file `path`.

        Will automatically compress if `path` ends in ``.bz2|.gz|.xz``.
        """
        data = self.to_pandas()
        util.makedirs_for_file(path)
        data.to_csv(path, sep=sep, header=header, index=False, encoding=encoding)

    def write_json(self, path, *, encoding="utf-8", **kwargs):
        """
        Write data frame to JSON file `path`.

        Will automatically compress if `path` ends in ``.bz2|.gz|.xz``.

        `kwargs` are passed to ``json.JSONEncoder``.
        """
        return self.to_list_of_dicts().write_json(path, encoding=encoding, **kwargs)

    def write_npz(self, path, *, compress=False):
        """
        Write data frame to NumPy file `path`.
        """
        util.makedirs_for_file(path)
        savez = np.savez_compressed if compress else np.savez
        savez(path, **self)

    def write_parquet(self, path, **kwargs):
        """
        Write data frame to Parquet file `path`.

        `kwargs` are passed to ``pyarrow.parquet.write_table``.
        """
        import pyarrow.parquet as pq
        data = self.to_arrow()
        util.makedirs_for_file(path)
        pq.write_table(data, path, **kwargs)

    def write_pickle(self, path):
        """
        Write data frame to Pickle file `path`.

        Will automatically compress if `path` ends in ``.bz2|.gz|.xz``.
        """
        util.makedirs_for_file(path)
        with util.xopen(path, "wb") as f:
            out = {k: np.array(v, v.dtype) for k, v in self.items()}
            pickle.dump(out, f, pickle.HIGHEST_PROTOCOL)

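# A minimal end-to-end sketch (added; assumes the "data/listings.csv"
# sample file referenced throughout the docstrings above is available):
#
# >>> import dataiter as di
# >>> data = di.read_csv("data/listings.csv")
# >>> data.group_by("hood").aggregate(n=di.count(), price=di.mean("price"))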