# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Dataset is currently unstable. APIs subject to change without notice."""
import pyarrow as pa
from pyarrow.util import _stringify_path, _is_path_like
from pyarrow._dataset import ( # noqa
CsvFileFormat,
Expression,
Dataset,
DatasetFactory,
DirectoryPartitioning,
FileFormat,
FileFragment,
FileSystemDataset,
FileSystemDatasetFactory,
FileSystemFactoryOptions,
FileWriteOptions,
Fragment,
HivePartitioning,
IpcFileFormat,
IpcFileWriteOptions,
ParquetDatasetFactory,
ParquetFactoryOptions,
ParquetFileFormat,
ParquetFileFragment,
ParquetFileWriteOptions,
ParquetReadOptions,
Partitioning,
PartitioningFactory,
RowGroupInfo,
Scanner,
ScanTask,
UnionDataset,
UnionDatasetFactory,
_get_partition_keys,
_filesystemdataset_write,
)
def field(name):
"""Reference a named column of the dataset.
Stores only the field's name. Type and other information is known only when
the expression is bound to a dataset having an explicit scheme.
Parameters
----------
name : string
The name of the field the expression references to.
Returns
-------
field_expr : Expression
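
    Examples
    --------
    A minimal sketch of building a filter expression from field references
    (the column names used here are purely illustrative):

    >>> (field("year") > 2009) & (field("month") == 11)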
"""
return Expression._field(name)
def scalar(value):
"""Expression representing a scalar value.
Parameters
----------
value : bool, int, float or string
Python value of the scalar. Note that only a subset of types are
currently supported.
Returns
-------
scalar_expr : Expression
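
    Examples
    --------
    A minimal, illustrative sketch (the column name is hypothetical):

    >>> field("value") > scalar(3.5)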
"""
return Expression._scalar(value)
def partitioning(schema=None, field_names=None, flavor=None,
dictionaries=None):
"""
Specify a partitioning scheme.
The supported schemes include:
- "DirectoryPartitioning": this scheme expects one segment in the file path
for each field in the specified schema (all fields are required to be
      present). For example, given schema<year:int16, month:int8> the path
"/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
- "HivePartitioning": a scheme for "/$key=$value/" nested directories as
found in Apache Hive. This is a multi-level, directory based partitioning
scheme. Data is partitioned by static values of a particular column in
the schema. Partition keys are represented in the form $key=$value in
directory names. Field order is ignored, as are missing or unrecognized
field names.
For example, given schema<year:int16, month:int8, day:int8>, a possible
path would be "/year=2009/month=11/day=15" (but the field order does not
need to match).
Parameters
----------
schema : pyarrow.Schema, default None
The schema that describes the partitions present in the file path.
If not specified, and `field_names` and/or `flavor` are specified,
the schema will be inferred from the file path (and a
PartitioningFactory is returned).
field_names : list of str, default None
A list of strings (field names). If specified, the schema's types are
inferred from the file paths (only valid for DirectoryPartitioning).
flavor : str, default None
The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
a HivePartitioning.
dictionaries : List[Array]
If the type of any field of `schema` is a dictionary type, the
corresponding entry of `dictionaries` must be an array containing
every value which may be taken by the corresponding column or an
error will be raised in parsing.
Returns
-------
Partitioning or PartitioningFactory
Examples
--------
Specify the Schema for paths like "/2009/June":
>>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())]))
or let the types be inferred by only specifying the field names:
>>> partitioning(field_names=["year", "month"])
For paths like "/2009/June", the year will be inferred as int32 while month
will be inferred as string.
Create a Hive scheme for a path like "/year=2009/month=11":
>>> partitioning(
... pa.schema([("year", pa.int16()), ("month", pa.int8())]),
... flavor="hive")
A Hive scheme can also be discovered from the directory structure (and
types will be inferred):
>>> partitioning(flavor="hive")
"""
if flavor is None:
# default flavor
if schema is not None:
if field_names is not None:
raise ValueError(
"Cannot specify both 'schema' and 'field_names'")
return DirectoryPartitioning(schema, dictionaries)
elif field_names is not None:
if isinstance(field_names, list):
return DirectoryPartitioning.discover(field_names)
else:
raise ValueError(
"Expected list of field names, got {}".format(
type(field_names)))
else:
raise ValueError(
"For the default directory flavor, need to specify "
"a Schema or a list of field names")
elif flavor == 'hive':
if field_names is not None:
raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
elif schema is not None:
if isinstance(schema, pa.Schema):
return HivePartitioning(schema, dictionaries)
else:
raise ValueError(
"Expected Schema for 'schema', got {}".format(
type(schema)))
else:
return HivePartitioning.discover()
else:
raise ValueError("Unsupported flavor")
def _ensure_partitioning(scheme):
"""
Validate input and return a Partitioning(Factory).
It passes None through if no partitioning scheme is defined.
"""
if scheme is None:
pass
elif isinstance(scheme, str):
scheme = partitioning(flavor=scheme)
elif isinstance(scheme, list):
scheme = partitioning(field_names=scheme)
elif isinstance(scheme, (Partitioning, PartitioningFactory)):
pass
else:
ValueError("Expected Partitioning or PartitioningFactory, got {}"
.format(type(scheme)))
return scheme
def _ensure_format(obj):
if isinstance(obj, FileFormat):
return obj
elif obj == "parquet":
return ParquetFileFormat()
elif obj in {"ipc", "arrow", "feather"}:
return IpcFileFormat()
elif obj == "csv":
return CsvFileFormat()
else:
raise ValueError("format '{}' is not supported".format(obj))
def _ensure_multiple_sources(paths, filesystem=None):
"""
    Treat a list of paths as files belonging to a single file system.

    If the file system is local, this also validates that all paths reference
    existing *files*; otherwise (for example on a remote filesystem) any
    non-file paths will be silently skipped.
Parameters
----------
paths : list of path-like
Note that URIs are not allowed.
filesystem : FileSystem or str, optional
        If a URI is passed, then its path component will act as a prefix for
the file paths.
Returns
-------
(FileSystem, list of str)
File system object and a list of normalized paths.
Raises
------
TypeError
        If the passed filesystem has the wrong type.
IOError
If the file system is local and a referenced path is not available or
not a file.
"""
from pyarrow.fs import (
LocalFileSystem, SubTreeFileSystem, _MockFileSystem, FileType,
_ensure_filesystem
)
if filesystem is None:
# fall back to local file system as the default
filesystem = LocalFileSystem()
else:
# construct a filesystem if it is a valid URI
filesystem = _ensure_filesystem(filesystem)
is_local = (
isinstance(filesystem, (LocalFileSystem, _MockFileSystem)) or
(isinstance(filesystem, SubTreeFileSystem) and
isinstance(filesystem.base_fs, LocalFileSystem))
)
# allow normalizing irregular paths such as Windows local paths
paths = [filesystem.normalize_path(_stringify_path(p)) for p in paths]
# validate that all of the paths are pointing to existing *files*
# possible improvement is to group the file_infos by type and raise for
# multiple paths per error category
if is_local:
for info in filesystem.get_file_info(paths):
file_type = info.type
if file_type == FileType.File:
continue
elif file_type == FileType.NotFound:
raise FileNotFoundError(info.path)
elif file_type == FileType.Directory:
raise IsADirectoryError(
'Path {} points to a directory, but only file paths are '
'supported. To construct a nested or union dataset pass '
'a list of dataset objects instead.'.format(info.path)
)
else:
raise IOError(
'Path {} exists but its type is unknown (could be a '
'special file such as a Unix socket or character device, '
'or Windows NUL / CON / ...)'.format(info.path)
)
return filesystem, paths
def _ensure_single_source(path, filesystem=None):
"""
Treat path as either a recursively traversable directory or a single file.
Parameters
----------
path : path-like
filesystem : FileSystem or str, optional
        If a URI is passed, then its path component will act as a prefix for
the file paths.
Returns
-------
(FileSystem, list of str or fs.Selector)
File system object and either a single item list pointing to a file or
an fs.Selector object pointing to a directory.
Raises
------
TypeError
        If the passed filesystem has the wrong type.
FileNotFoundError
If the referenced file or directory doesn't exist.
"""
from pyarrow.fs import FileType, FileSelector, _resolve_filesystem_and_path
# at this point we already checked that `path` is a path-like
filesystem, path = _resolve_filesystem_and_path(path, filesystem)
# ensure that the path is normalized before passing to dataset discovery
path = filesystem.normalize_path(path)
# retrieve the file descriptor
file_info = filesystem.get_file_info(path)
# depending on the path type either return with a recursive
# directory selector or as a list containing a single file
if file_info.type == FileType.Directory:
paths_or_selector = FileSelector(path, recursive=True)
elif file_info.type == FileType.File:
paths_or_selector = [path]
else:
raise FileNotFoundError(path)
return filesystem, paths_or_selector
def _filesystem_dataset(source, schema=None, filesystem=None,
partitioning=None, format=None,
partition_base_dir=None, exclude_invalid_files=None,
selector_ignore_prefixes=None):
"""
Create a FileSystemDataset which can be used to build a Dataset.
Parameters are documented in the dataset function.
Returns
-------
FileSystemDataset
"""
format = _ensure_format(format or 'parquet')
partitioning = _ensure_partitioning(partitioning)
if isinstance(source, (list, tuple)):
fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
else:
fs, paths_or_selector = _ensure_single_source(source, filesystem)
options = FileSystemFactoryOptions(
partitioning=partitioning,
partition_base_dir=partition_base_dir,
exclude_invalid_files=exclude_invalid_files,
selector_ignore_prefixes=selector_ignore_prefixes
)
factory = FileSystemDatasetFactory(fs, paths_or_selector, format, options)
return factory.finish(schema)
def _union_dataset(children, schema=None, **kwargs):
if any(v is not None for v in kwargs.values()):
raise ValueError(
"When passing a list of Datasets, you cannot pass any additional "
"arguments"
)
if schema is None:
# unify the children datasets' schemas
schema = pa.unify_schemas([child.schema for child in children])
# create datasets with the requested schema
children = [child.replace_schema(schema) for child in children]
return UnionDataset(schema, children)
def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None,
partitioning=None, partition_base_dir=None):
"""
    Create a FileSystemDataset from a `_metadata` file created via
    `pyarrow.parquet.write_metadata`.
Parameters
----------
    metadata_path : path
        Path pointing to the Parquet `_metadata` file.
schema : Schema, optional
Optionally provide the Schema for the Dataset, in which case it will
not be inferred from the source.
filesystem : FileSystem or URI string, default None
If a single path is given as source and filesystem is None, then the
filesystem will be inferred from the path.
        If a URI string is passed, then a filesystem object is constructed
using the URI's optional path component as a directory prefix. See the
examples below.
Note that the URIs on Windows must follow 'file:///C:...' or
'file:/C:...' patterns.
format : ParquetFileFormat
        An instance of ParquetFileFormat if special options need to be
        passed.
partitioning : Partitioning, PartitioningFactory, str, list of str
The partitioning scheme specified with the ``partitioning()``
        function. A flavor string can be used as a shortcut, and with a list
        of field names a DirectoryPartitioning will be inferred.
partition_base_dir : str, optional
For the purposes of applying the partitioning, paths will be
stripped of the partition_base_dir. Files not matching the
partition_base_dir prefix will be skipped for partitioning discovery.
The ignored files will still be part of the Dataset, but will not
have partition information.
Returns
-------
FileSystemDataset
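
    Examples
    --------
    A hypothetical sketch, assuming a `_metadata` file was written next to
    the Parquet data files (the path below is illustrative):

    >>> metadata_ds = parquet_dataset("/path/to/dataset/_metadata")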
"""
from pyarrow.fs import LocalFileSystem, _ensure_filesystem
if format is None:
format = ParquetFileFormat()
elif not isinstance(format, ParquetFileFormat):
raise ValueError("format argument must be a ParquetFileFormat")
if filesystem is None:
filesystem = LocalFileSystem()
else:
filesystem = _ensure_filesystem(filesystem)
metadata_path = filesystem.normalize_path(_stringify_path(metadata_path))
options = ParquetFactoryOptions(
partition_base_dir=partition_base_dir,
partitioning=_ensure_partitioning(partitioning)
)
factory = ParquetDatasetFactory(
metadata_path, filesystem, format, options=options)
return factory.finish(schema)
def dataset(source, schema=None, format=None, filesystem=None,
partitioning=None, partition_base_dir=None,
exclude_invalid_files=None, ignore_prefixes=None):
"""
Open a dataset.
    Datasets provide functionality to efficiently work with tabular,
    potentially larger-than-memory, and multi-file datasets:

    - A unified interface for different sources, like Parquet and Feather
    - Discovery of sources (crawling directories, handling directory-based
      partitioned datasets, basic schema normalization)
    - Optimized reading with predicate pushdown (filtering rows), projection
      (selecting columns), parallel reading or fine-grained management of
      tasks.

    Note that this is the high-level API; to have more control over the
    dataset construction, use the low-level API classes (FileSystemDataset,
    FileSystemDatasetFactory, etc.).
Parameters
----------
source : path, list of paths, dataset, list of datasets or URI
Path pointing to a single file:
Open a FileSystemDataset from a single file.
Path pointing to a directory:
The directory gets discovered recursively according to a
partitioning scheme if given.
List of file paths:
Create a FileSystemDataset from explicitly given files. The files
must be located on the same filesystem given by the filesystem
parameter.
            Note that, in contrast to constructing from a single file,
            passing URIs as paths is not allowed.
List of datasets:
            A nested UnionDataset gets constructed; it allows arbitrary
            composition of other datasets.
Note that additional keyword arguments are not allowed.
schema : Schema, optional
Optionally provide the Schema for the Dataset, in which case it will
not be inferred from the source.
format : FileFormat or str
Currently "parquet" and "ipc"/"arrow"/"feather" are supported. For
Feather, only version 2 files are supported.
filesystem : FileSystem or URI string, default None
If a single path is given as source and filesystem is None, then the
filesystem will be inferred from the path.
        If a URI string is passed, then a filesystem object is constructed
using the URI's optional path component as a directory prefix. See the
examples below.
Note that the URIs on Windows must follow 'file:///C:...' or
'file:/C:...' patterns.
partitioning : Partitioning, PartitioningFactory, str, list of str
The partitioning scheme specified with the ``partitioning()``
        function. A flavor string can be used as a shortcut, and with a list
        of field names a DirectoryPartitioning will be inferred.
partition_base_dir : str, optional
For the purposes of applying the partitioning, paths will be
stripped of the partition_base_dir. Files not matching the
partition_base_dir prefix will be skipped for partitioning discovery.
The ignored files will still be part of the Dataset, but will not
have partition information.
exclude_invalid_files : bool, optional (default True)
If True, invalid files will be excluded (file format specific check).
        This will incur IO for each file in a serial and single-threaded
        fashion. Disabling this feature will skip the IO, but unsupported
files may be present in the Dataset (resulting in an error at scan
time).
ignore_prefixes : list, optional
Files matching any of these prefixes will be ignored by the
discovery process. This is matched to the basename of a path.
By default this is ['.', '_'].
Note that discovery happens only if a directory is passed as source.
Returns
-------
dataset : Dataset
Either a FileSystemDataset or a UnionDataset depending on the source
parameter.
Examples
--------
Opening a single file:
>>> dataset("path/to/file.parquet", format="parquet")
Opening a single file with an explicit schema:
>>> dataset("path/to/file.parquet", schema=myschema, format="parquet")
Opening a dataset for a single directory:
>>> dataset("path/to/nyc-taxi/", format="parquet")
>>> dataset("s3://mybucket/nyc-taxi/", format="parquet")
    Opening a dataset from a list of relative local paths:
>>> dataset([
... "part0/data.parquet",
... "part1/data.parquet",
... "part3/data.parquet",
... ], format='parquet')
With filesystem provided:
>>> paths = [
... 'part0/data.parquet',
... 'part1/data.parquet',
... 'part3/data.parquet',
... ]
    >>> dataset(paths, filesystem='file:///directory/prefix',
    ...         format='parquet')
    Which is equivalent to:
>>> fs = SubTreeFileSystem("/directory/prefix", LocalFileSystem())
>>> dataset(paths, filesystem=fs, format='parquet')
With a remote filesystem URI:
>>> paths = [
... 'nested/directory/part0/data.parquet',
... 'nested/directory/part1/data.parquet',
... 'nested/directory/part3/data.parquet',
... ]
>>> dataset(paths, filesystem='s3://bucket/', format='parquet')
Similarly to the local example, the directory prefix may be included in the
filesystem URI:
>>> dataset(paths, filesystem='s3://bucket/nested/directory',
... format='parquet')
Construction of a nested dataset:
>>> dataset([
... dataset("s3://old-taxi-data", format="parquet"),
... dataset("local/path/to/data", format="ipc")
... ])
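
    Reading the opened dataset into a table, optionally selecting columns and
    filtering rows (a sketch; the column name used here is hypothetical):

    >>> nyc = dataset("path/to/nyc-taxi/", format="parquet")
    >>> nyc.to_table(columns=["total_amount"],
    ...              filter=field("total_amount") > 0)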
"""
# collect the keyword arguments for later reuse
kwargs = dict(
schema=schema,
filesystem=filesystem,
partitioning=partitioning,
format=format,
partition_base_dir=partition_base_dir,
exclude_invalid_files=exclude_invalid_files,
selector_ignore_prefixes=ignore_prefixes
)
# TODO(kszucs): support InMemoryDataset for a table input
if _is_path_like(source):
return _filesystem_dataset(source, **kwargs)
elif isinstance(source, (tuple, list)):
if all(_is_path_like(elem) for elem in source):
return _filesystem_dataset(source, **kwargs)
elif all(isinstance(elem, Dataset) for elem in source):
return _union_dataset(source, **kwargs)
else:
unique_types = set(type(elem).__name__ for elem in source)
type_names = ', '.join('{}'.format(t) for t in unique_types)
raise TypeError(
'Expected a list of path-like or dataset objects. The given '
'list contains the following types: {}'.format(type_names)
)
else:
raise TypeError(
'Expected a path-like, list of path-likes or a list of Datasets '
'instead of the given type: {}'.format(type(source).__name__)
)
def _ensure_write_partitioning(scheme):
if scheme is None:
scheme = partitioning(pa.schema([]))
if not isinstance(scheme, Partitioning):
# TODO support passing field names, and get types from schema
raise ValueError("partitioning needs to be actual Partitioning object")
return scheme
def write_dataset(data, base_dir, basename_template=None, format=None,
partitioning=None, schema=None,
filesystem=None, file_options=None, use_threads=True,
max_partitions=None):
"""
Write a dataset to a given format and partitioning.
Parameters
----------
data : Dataset, Table/RecordBatch, or list of Table/RecordBatch
The data to write. This can be a Dataset instance or
in-memory Arrow data.
base_dir : str
The root directory where to write the dataset.
basename_template : str, optional
A template string used to generate basenames of written data files.
The token '{i}' will be replaced with an automatically incremented
integer. If not specified, it defaults to
"part-{i}." + format.default_extname
format : FileFormat or str
The format in which to write the dataset. Currently supported:
"parquet", "ipc"/"feather". If a FileSystemDataset is being written
and `format` is not specified, it defaults to the same format as the
specified FileSystemDataset. When writing a Table or RecordBatch, this
keyword is required.
partitioning : Partitioning, optional
The partitioning scheme specified with the ``partitioning()``
function.
schema : Schema, optional
filesystem : FileSystem, optional
file_options : FileWriteOptions, optional
FileFormat specific write options, created using the
``FileFormat.make_write_options()`` function.
use_threads : bool, default True
        Write files in parallel. If enabled, the maximum parallelism
        (determined by the number of available CPU cores) will be used.
max_partitions : int, default 1024
Maximum number of partitions any batch may be written into.
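
    Examples
    --------
    A minimal sketch of writing an in-memory table as a Hive-partitioned
    Parquet dataset; the output directory and column names are illustrative:

    >>> table = pa.table({"year": [2020, 2020, 2021], "n": [1, 2, 3]})
    >>> write_dataset(table, "/tmp/example_dataset", format="parquet",
    ...               partitioning=partitioning(
    ...                   pa.schema([("year", pa.int64())]), flavor="hive"))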
"""
from pyarrow.fs import _resolve_filesystem_and_path
if isinstance(data, Dataset):
schema = schema or data.schema
elif isinstance(data, (pa.Table, pa.RecordBatch)):
schema = schema or data.schema
data = [data]
elif isinstance(data, list):
schema = schema or data[0].schema
else:
raise ValueError(
"Only Dataset, Table/RecordBatch or a list of Table/RecordBatch "
"objects are supported."
)
if format is None and isinstance(data, FileSystemDataset):
format = data.format
else:
format = _ensure_format(format)
if file_options is None:
file_options = format.make_write_options()
if format != file_options.format:
raise TypeError("Supplied FileWriteOptions have format {}, "
"which doesn't match supplied FileFormat {}".format(
format, file_options))
if basename_template is None:
basename_template = "part-{i}." + format.default_extname
if max_partitions is None:
max_partitions = 1024
partitioning = _ensure_write_partitioning(partitioning)
filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)
_filesystemdataset_write(
data, base_dir, basename_template, schema,
filesystem, partitioning, file_options, use_threads,
max_partitions
)