from builtins import object
from contextlib import closing
from pewtils import is_not_null
from scandir import scandir
import boto3
import datetime
import hashlib
import json
import os
import pandas as pd
import pickle as pickle
import time
try:
from io import StringIO, BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
from StringIO import StringIO
class FileHandler(object):
    """
    Read/write data files in a variety of formats, locally and in Amazon S3 buckets.

    :param path: A valid path to the folder in local or S3 directory where files will be \
    written to or read from
    :type path: str
    :param use_s3: Whether the path is an S3 location or local location
    :type use_s3: bool
    :param bucket: The name of the S3 bucket, required if ``use_s3=True``; will also try to \
    fetch from the environment as S3_BUCKET
    :type bucket: str

    .. note:: Typical rectangular data files (i.e. ``csv``, ``tab``, ``xlsx``, ``xls``, ``dta`` \
        file extension types) will be read to/written from a :py:class:`pandas.DataFrame` \
        object. The exceptions are ``pkl`` and ``json`` objects, which accept any serializable \
        Python object and correctly-formatted JSON object respectively.

    .. tip:: You can configure your environment to make it easier to automatically connect \
        to S3 by defining the variable ``S3_BUCKET``.

    Usage::

        from pewtils.io import FileHandler

        >>> h = FileHandler("./", use_s3=False) # current local folder
        >>> df = h.read("my_csv", format="csv")
        # Do something and save to Excel
        >>> h.write("my_new_csv", df, format="xlsx")
        >>> my_data = [{"key": "value"}]
        >>> h.write("my_data", my_data, format="json")
        >>> my_data = ["a", "python", "list"]
        >>> h.write("my_data", my_data, format="pkl")

        # To read/write to an S3 bucket
        # The FileHandler detects your AWS tokens using boto3's standard methods to find
        # them in ~/.aws or defined as environment variables.
        >>> h = FileHandler("/my_folder", use_s3=True, bucket="my-bucket")
    """

    def __init__(self, path, use_s3=None, bucket=None):
        # Fall back to the S3_BUCKET environment variable when no bucket is passed
        self.bucket = os.environ.get("S3_BUCKET", None) if bucket is None else bucket
        self.path = path
        # S3 mode is only usable with a bucket; without one, force local mode
        self.use_s3 = use_s3 if is_not_null(self.bucket) else False
        if self.use_s3:
            self.s3 = boto3.client("s3")
        else:
            self.path = os.path.join(self.path)
            if not os.path.exists(self.path):
                try:
                    os.makedirs(self.path)
                except Exception as e:
                    # Best-effort: subsequent writes will no-op if the directory is unusable
                    print("Warning: couldn't make directory '{}'".format(self.path))
                    print(e)
[docs] def iterate_path(self):
"""
Iterates over the directory and returns a list of filenames or S3 object keys
:return: Yields a list of filenames or S3 keys
:rtype: iterable
Usage::
from pewtils.io import FileHandler
>>> h = FileHandler("./", use_s3=False)
>>> for file in h.iterate_path(): print(file)
file1.csv
file2.pkl
file3.json
"""
if self.use_s3:
for key in self.s3.list_objects(Bucket=self.bucket, Prefix=self.path)['Contents']:
yield key["Key"]
else:
for f in scandir(self.path):
yield f.name
[docs] def clear_folder(self):
"""
Deletes the path (if local) or unlinks all keys in the bucket folder (if S3)
.. warning:: This is a destructive function, use with caution!
Usage::
from pewtils.io import FileHandler
>>> h = FileHandler("./", use_s3=False)
>>> len(list(h.iterate_path()))
3
>>> h.clear_folder()
>>> len(list(h.iterate_path()))
0
"""
if self.use_s3:
for key in self.s3.list_objects(Bucket=self.bucket, Prefix=self.path)['Contents']:
self.s3.delete_object(Bucket=self.bucket, Prefix=key['Key'])
else:
for f in scandir(self.path):
os.unlink(os.path.join(self.path, f.name))
[docs] def clear_file(self, key, format="pkl", hash_key=False):
"""
Deletes a specific file.
.. warning:: This is a destructive function, use with caution!
:param key: The name of the file to delete
:type key: str
:param format: The file extension
:type format: str
:param hash_key: If True, will hash the filename before looking it up; default is False.
:type hash_key: bool
Usage::
from pewtils.io import FileHandler
>>> h = FileHandler("./", use_s3=False)
>>> for file in h.iterate_path(): print(file)
file1.csv
file2.pkl
file3.json
>>> h.clear_file("file1", format="csv")
>>> for file in h.iterate_path(): print(file)
file2.pkl
file3.json
"""
if hash_key:
key = self.get_key_hash(key)
if self.use_s3:
filepath = "/".join([self.path, "{}.{}".format(key, format)])
key = self.s3.delete_object(Bucket=self.bucket, Key=filepath)
else:
key += ".{}".format(format)
path = os.path.join(self.path, key)
os.unlink(path)
[docs] def get_key_hash(self, key):
"""
Converts a key to a hashed representation. Allows you to pass arbitrary objects and convert their string \
representation into a shorter hashed key, so it can be useful for caching. You can call this method \
directly to see the hash that a key will be converted into, but this method is mainly used in conjunction \
with the :py:meth:`pewtils.FileHandler.write` and :py:meth:`pewtils.FileHandler.read` methods by passing in \
``hash_key=True``.
:param key: A raw string or Python object that can be meaningfully converted into a string representation
:type key: str or object
:return: A SHA224 hash representation of that key
:rtype: str
Usage::
from pewtils.io import FileHandler
>>> h = FileHandler("tests/files", use_s3=False)
>>> h.get_key_hash("temp")
"c51bf90ccb22befa316b7a561fe9d5fd9650180b14421fc6d71bcd57"
>>> h.get_key_hash({"key": "value"})
"37e13e1116c86a6e9f3f8926375c7cb977ca74d2d598572ced03cd09"
"""
try:
return hashlib.sha224(key.encode("utf8")).hexdigest()
except AttributeError:
return hashlib.sha224(str(key).encode("utf8")).hexdigest()
[docs] def write(
self, key, data, format="pkl", hash_key=False, add_timestamp=False, **io_kwargs
):
"""
Writes arbitrary data objects to a variety of file formats.
:param key: The name of the file or key (without a file suffix!)
:type key: str
:param data: The actual data to write to the file
:type data: object
:param format: The format the data should be saved in (pkl/csv/tab/xlsx/xls/dta/json). Defaults to pkl. \
This will be used as the file's suffix.
:type format: str
:param hash_key: Whether or not to hash the provided key before saving the file. (Default=False)
:type hash_key: bool
:param add_timestamp: Optionally add a timestamp to the filename
:type add_timestamp: bool
:param io_kwargs: Additional parameters to pass along to the Pandas save function, if applicable
:return: None
.. note:: When saving a ``csv``, ``tab``, ``xlsx``, ``xls``, or ``dta`` file, this function expects to receive a \
Pandas :py:class:`pandas.DataFrame`. When you use these formats, you can also pass optional ``io_kwargs`` \
which will be forwarded to the corresponding :py:mod:`pandas` method below:
- `dta`: :py:meth:`pandas.DataFrame.to_stata`
- `csv`: :py:meth:`pandas.DataFrame.to_csv`
- `tab`: :py:meth:`pandas.DataFrame.to_csv`
- `xlsx`: :py:meth:`pandas.DataFrame.to_excel`
- `xls`: :py:meth:`pandas.DataFrame.to_excel`
If you're trying to save an object to JSON, it assumes that you're passing it valid JSON. By default, \
the handler attempts to use pickling, allowing you to save anything you want, as long as it's serializable.
"""
format = format.strip(".")
if hash_key:
key = self.get_key_hash(key)
if add_timestamp:
key = "{}_{}".format(key, datetime.datetime.now())
def _get_output(output, data, io_kwargs):
if format == "tab":
io_kwargs["sep"] = "\t"
if format in ["csv", "tab"]:
data.to_csv(output, encoding="utf8", **io_kwargs)
elif format == "dta":
data.to_stata(output, **io_kwargs)
elif format in ["xls", "xlsx"]:
writer = pd.ExcelWriter(output, engine="xlsxwriter")
data.to_excel(writer, **io_kwargs)
writer.save()
data = output.getvalue()
return data
if format in ["csv", "xls", "xlsx", "tab", "dta"]:
try:
data = _get_output(BytesIO(), data, io_kwargs)
except Exception as e:
try:
data = _get_output(StringIO(), data, io_kwargs)
except:
raise Exception(
"Couldn't convert data into '{}' format".format(format)
)
elif format == "pkl":
data = pickle.dumps(data, **io_kwargs)
elif format == "json":
data = json.dumps(data, **io_kwargs)
key += ".{}".format(format)
if self.use_s3:
try:
upload = BytesIO(data)
except TypeError:
upload = BytesIO(data.encode())
self.s3.upload_fileobj(upload, Bucket=self.bucket, Key="/".join([self.path, key]))
else:
path = os.path.join(self.path, key)
if os.path.exists(self.path):
try:
with closing(open(path, "w")) as output:
output.write(data)
except:
with closing(open(path, "wb")) as output:
output.write(data)
[docs] def read(self, key, format="pkl", hash_key=False, **io_kwargs):
"""
Reads a file from the directory or S3 path, returning its contents.
:param key: The name of the file to read (without a suffix!)
:type key: str
:param format: The format of the file (pkl/json/csv/dta/xls/xlsx/tab); expects the file extension to match
:type format: str
:param hash_key: Whether the key should be hashed prior to looking for and retrieving the file.
:type hash_key: bool
:param io_kwargs: Optional arguments to be passed to the specific load function (dependent on file format)
:return: The file contents, in the requested format
.. note:: You can pass optional ``io_kwargs`` that will be forwarded to the function below that corresponds to \
the format of the file you're trying to read in
- `dta`: :py:meth:`pandas.DataFrame.read_stata`
- `csv`: :py:meth:`pandas.DataFrame.read_csv`
- `tab`: :py:meth:`pandas.DataFrame.read_csv`
- `xlsx`: :py:meth:`pandas.DataFrame.read_excel`
- `xls`: :py:meth:`pandas.DataFrame.read_excel`
"""
format = format.strip(".")
if hash_key:
key = self.get_key_hash(key)
data = None
filepath = "/".join([self.path, "{}.{}".format(key, format)])
if self.use_s3:
try:
data = StringIO()
except TypeError:
data = BytesIO()
self.s3.download_fileobj(data, Bucket=self.bucket, Key=filepath)
data = data.getvalue()
else:
if os.path.exists(filepath):
try:
with closing(open(filepath, "r")) as infile:
data = infile.read()
except:
# TODO: handle this exception more explicitly
with closing(open(filepath, "rb")) as infile:
data = infile.read()
if is_not_null(data):
if format == "pkl":
try:
data = pickle.loads(data)
except TypeError:
data = None
except ValueError:
if "attempt_count" not in io_kwargs:
io_kwargs["attempt_count"] = 1
print(
"Insecure pickle string; probably a concurrent read-write, \
will try again in 5 seconds (attempt #{})".format(
io_kwargs["attempt_count"]
)
)
time.sleep(5)
if io_kwargs["attempt_count"] <= 3:
io_kwargs["attempt_count"] += 1
data = self.read(
key, format=format, hash_key=hash_key, **io_kwargs
)
else:
data = None
except Exception as e:
print("Couldn't load pickle! {}".format(e))
data = None
elif format in ["tab", "csv"]:
if format == "tab":
io_kwargs["delimiter"] = "\t"
try:
data = pd.read_csv(BytesIO(data), **io_kwargs)
except:
data = pd.read_csv(StringIO(data), **io_kwargs)
elif format in ["xlsx", "xls"]:
# https://stackoverflow.com/questions/64264563/attributeerror-elementtree-object-has-no-attribute-getiterator-when-trying
if "engine" not in io_kwargs:
io_kwargs["engine"] = "openpyxl"
try:
data = pd.read_excel(BytesIO(data), **io_kwargs)
except:
data = pd.read_excel(StringIO(data), **io_kwargs)
elif format == "json":
try:
data = json.loads(data)
except:
pass
elif format == "dta":
try:
data = pd.read_stata(BytesIO(data), **io_kwargs)
except:
data = pd.read_stata(StringIO(data), **io_kwargs)
elif format == "txt":
if isinstance(data, bytes):
data = data.decode()
return data