__author__ = "Jwely"
# local imports
from dnppy import textio
# standard imports
import numpy
import os
from datetime import datetime, timedelta
from calendar import monthrange, isleap
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
class time_series():
    """
    A subsettable time series object

    The primary motivation for creating this object was to allow
    a time series to be subsetted into any number of small chunks
    but retain the ability to process and interrogate the time series
    at any level with the exact same external syntax.

    A time series object is comprised of a matrix of data, and may contain
    an object list of subset time_series objects. Potentially unlimited
    nesting of time series datasets is possible, for example: a year's worth
    of hourly data may be subsetted into 1-month time series, while each
    of those is in turn subsetted into days. The highest level time series
    will still allow operations to be performed upon it.

    All internal methods are built to handle this flexible definition of a
    time series, where the steps of the method depend on whether the time
    series is at its smallest subset or not.

    MEMORY WARNING: the entirety of the dataset is represented in every layer
    of subsetting, so watch out for exploding memory consumption caused by
    excessive subsetting of gigantic datasets.
    """
def __init__(self, name = "name", units = None, subsetted = False,
disc_level = 0, parent = None):
"""
Initializes the time series
Attributes:
self.name # the name of this time series
self.units # unit of time represented by subset if subsetted
self.subsetted # does this time series have subsets?
self.disc_level # the subset level of this time_series
self.fmt # for interpreting timestrings to time objs
self.headers # one header for each col in dataset
self.time_col # index of data column with time info
self.time # separate copy of data[time_col]
self.time_dom # self.time converted to list of datetime objs
self.time_dec_days # self.time converted to mono rising decimal days
self.time_seconds # self.time converted to mono rising seconds
self.center_time # time around which data in a subset it centered
self.start_dto # datetime_object that mono rising times start from
self.subsets # object list containing constituent time_seires
self.row_data # row wise dataset
self.col_data # column wise dataset, built as dict
self.bad_rows # subset from data attribute with "bad rows"
self.infilepath # tracks filepath of input CSV. used to DISALLOW overwriting
# source CSV with output CSV.
"""
self.name = name # the name of this time series (string)
self.units = units # unit of time represented by subset if subsetted (string)
self.subsetted = subsetted # does this time series have subsets? (bool)
self.disc_level = disc_level # the subset level of this time_series (int)
self.fmt = False # for interpreting timestrings to time objs (string)
self.headers = [] # one header for each col in dataset (list of strings)
self.time_col = 0 # index of data column with time info (int)
self.time = [] # separate copy of data[time_col] (list of strings)
self.time_dom = False # self.time converted to (list of datetime objs)
self.time_dec_days = [] # self.time converted to (mono rising decimal days floats)
self.time_seconds = [] # self.time converted to (mono rising seconds floats)
self.center_time = [] # time around which data in a subset it centered (dto)
self.start_dto = [] # datetime_object that mono rising times start from (dto)
self.mean_interval = 0 # average number of seconds between data points (float)
self.subsets = [] # object list containing constituent time_series
self.row_data = [] # row wise dataset
self.col_data = [] # column wise dataset, built as dict
self.bad_rows = [] # subset from data attribute with "bad rows"
self.infilepath = [] # tracks filepath of input CSV. used to DISALLOW overwriting
# source CSV with output CSV.
# run some methods to build subset attributes
if parent:
self._get_atts_from(parent)
return
def __getitem__(self, arg):
"""
allows subsets of the time series to be accessed as follows, for example:
Time series whos first row is:
['20010101', '045316', '1', '20010101045316', 366]
With no discretization layers
command : result
ts[0] : ['20010101', '045316', '1', '20010101045316', 366]
ts[0][0] : 20010101
with one discretization layer
ts[0] : <__main__.time_series instance at 0x00000000023CEB48>
ts[0][0] : ['20010101', '045316', '1', '20010101045316', 366]
ts[0][0][0] : 20010101
with two discretization layers
ts[0] : <__main__.time_series instance at 0x00000000023CEB48>
ts[0][0] : <__main__.time_series instance at 0x00000000023CCA48>
ts[0][0][0] : ['20010101', '045316', '1', '20010101045316', 366]
ts[0][0][0][0]: 20010101
"""
# allows finding a slice of subsets or rows from a subset.
if isinstance(arg, slice):
a = arg.start
b = arg.stop
if self.subsetted:
if b > len(self.subsets):
b = len(self.subsets)
return [self.subsets[x] for x in range(a,b)]
else:
if b > len(self.row_data):
b = len(self.row_data)
return [self.row_data[x] for x in range(a,b)]
# allows finding subsets or rows by index.
elif isinstance(arg, int):
if self.subsetted:
return self.subsets[arg]
else:
return list(self.row_data[arg])
# allows finding subsets by name.
elif isinstance(arg, str):
if self.subsetted:
for subset in self.subsets:
if subset.name == arg:
return subset
else:
raise Exception("no subset with name {0}".format(arg))
else:
raise Exception("String input is allowed for subsetted time_series only!")
else:
raise Exception("Unrecognized argument type! use int, slice, or string!")
[docs] def _get_atts_from(self, parent_time_series):
"""
Allows bulk setting of attributes. Useful for allowing a subset to inherit
information from its parent time_series
:param parent_time_series: a time_series object from which to inherit
"""
self.fmt = parent_time_series.fmt
self.headers = parent_time_series.headers
self.time_col = parent_time_series.time_col
self.time_header = parent_time_series.time_header
self.disc_level = parent_time_series.disc_level + 1
return
@staticmethod
[docs] def _fmt_to_units(in_fmt):
"""
converts fmt strings to unit names of associated datetime attributes
:param fmt: datetime object style unit characters like %Y or %m
:return units: english version of input format. Example %Y -> "year"
"""
fmtlist = ["%Y", "%m", "%b", "%d", "%j", "%H", "%M", "%S"]
unitlist = ["year","month","month","day","day","hour","minute","second"]
if in_fmt in fmtlist:
unit = unitlist[fmtlist.index(in_fmt)]
return unit
if in_fmt in unitlist:
return in_fmt
else:
raise Exception("'{0}' is an invalid unit or format!".format(in_fmt))
@staticmethod
[docs] def _units_to_fmt(units):
"""
converts unit names to fmt strings used by datetime.stftime.
:param units: units are strings like "year", "month", "hour"
:return fmt: returns "fmt" equivalent of english units
"""
fmtlist = ["%Y", "%b", "%m", "%j", "%d", "%H", "%M", "%S"]
unitlist = ["year","month","month","day","day","hour","minute","second"]
if units in unitlist:
fmt = fmtlist[unitlist.index(units)]
return fmt
if units == "%b":
return "%m"
if units in fmtlist:
return units
else:
raise Exception("'{0}' is an invalid unit or format!".format(units))
@staticmethod
[docs] def _center_datetime(datetime_obj, units):
"""
Returns datetime obj that is centered on the "unit" of the input datetime obj
When grouping datetimes together, center times are important. This function allows
a center time with units equal to the users input (years, months, days , ...) to be
generated from the first datetime of the time series.
:param datetime_obj: any datetime object
:param units: units by which to center input datetime object
:return center_datetime: returns centered datetime object.
"""
dto = datetime_obj
if units == "day":
# noon minus 1 millisecond. (better handles daily data with no hour info)
return datetime(dto.year, dto.month, dto.day, 11, 59, 59, 999)
if units == "month":
# we need to find the middle day of the month which could be 14 or 15
middle = (monthrange(dto.year, dto.month)[1]) / 2
return datetime(dto.year, dto.month, middle, 0)
if units == "year":
return datetime(dto.year, 7, 2, 0)
[docs] def _units_to_seconds(self, units, dto = None):
"""
converts other time units to seconds
:param units: some english unit such as "hour", "day", etc.
:return seconds: the numer of seconds in an input unit
"""
# ensure proper unit formatting
units = self._fmt_to_units(units)
if units == "second": return 1.0
if units == "minute": return 60.0
if units == "hour": return 60.0 * 60.0
if units == "day": return 60.0 * 60.0 * 24.0
if units == "month":
if dto:
return 60.0 * 60.0 * 24.0 * monthrange(dto.year, dto.month)[1]
else:
return 60.0 * 60.0 * 24.0 * (365.25/12)
if units == "year":
if dto:
return 60.0 * 60.0 * 24.0 * (365 + isleap(dto.year) *1)
else:
return 60.0 * 60.0 * 24.0 * 365.25
@staticmethod
[docs] def _seconds_to_units(seconds, units):
"""
converts seconds to other time units
:param seconds: number of seconds
:param units: units to convert those seconds to
:return: time equivalent of input seconds expressed as input units
"""
if units == "second":
return 1.0
if units == "minute":
return seconds / 60.0
if units == "hour":
return seconds / (60.0 * 60.0)
if units == "day":
return seconds / (60.0 * 60.0 * 24.0)
if units == "month":
return seconds / (60.0 * 60.0 * 24.0 * (365.25 / 12.0))
if units == "year":
return seconds / (60.0 * 60.0 * 24.0 * 365.25)
[docs] def _name_as_subset(self, binned = False):
"""
uses time series object to descriptively name itself. Naming
subsets as bins will name them based only on smallest unit of
discretization.
:param binned: set to True to name subsets as bins
"""
subset_units = self.units
if isinstance(self.center_time, datetime):
datetime_obj = self.center_time
else:
datetime_obj = self.time_dom[0]
if binned:
self.name = datetime_obj.strftime(self._units_to_fmt(self.units))
else:
if subset_units == "year" or subset_units == "%Y":
self.name = datetime_obj.strftime("%Y")
if subset_units == "month" or subset_units == "%m":
self.name = datetime_obj.strftime("%Y-%b")
if subset_units == "day" or subset_units == "%d":
self.name = datetime_obj.strftime("%Y-%m-%d")
if subset_units == "hour" or subset_units == "%H":
self.name = datetime_obj.strftime("%Y-%m-%d-%H")
if subset_units == "minute" or subset_units == "%M":
self.name = datetime_obj.strftime("%Y-%m-%d-%H:%M")
if subset_units == "second" or subset_units == "%S":
self.name = datetime_obj.strftime("%Y-%m-%d-%H:%M:%S")
return
[docs] def from_tdo(self, tdo):
"""
reads time series data from a dnppy.text_data_class object
:param tdo: a dnppy.text_data_class object containing time data
"""
self.headers = tdo.headers
self.enf_unique_headers()
self.row_data = tdo.row_data
self.build_col_data()
return
[docs] def from_csv(self, filepath, delim = ','):
"""
Simple reader of a delimited file. To read more complex text data
into a time series object, use a custom reader function to return
a text_data_class object and feed it into this time series with
time_series_object.from_tdo(text_data_object)
To read csvs straight to a time_series object, it must have headers.
"""
tdo = textio.read_csv(filepath, True, delim)
self.from_tdo(tdo)
return
[docs] def to_csv(self, csv_path):
"""
Writes the row data of this time_series to a csv file.
:param csv_path: filepath at which to create new csv file.
"""
# disallow overwriting the csv used as input. Added by request
if os.path.abspath(self.infilepath) == os.path.abspath(csv_path):
csv_path = csv_path.replace(".csv", "_out.csv")
print("Saved time series '{0}' with {1} rows and {2} columns".format(
self.name, len(self.row_data), len(self.col_data)))
tdo = textio.text_data( text_filepath = csv_path,
headers = self.headers,
row_data = self.row_data)
tdo.write_csv()
return
[docs] def from_list(self, data, headers, time_header, fmt):
"""
creates the time series data from a list
:param data: list of lists making up rows and columns of data
:param headers: list of headers (column names)
:param time_header: string of header over the column representing time
:param fmt: the format of data in that time column
"""
self.row_data = data
self.headers = headers
# populates self.time with time series
self.define_time(self.time_header, self.fmt)
self.build_col_data()
return
[docs] def build_col_data(self):
""" builds columnwise data matrix with an actual dict """
temp_col = zip(*self.row_data)
self.col_data = {}
for i, col in enumerate(temp_col):
self.col_data[self.headers[i]] = list(col)
return
[docs] def clean(self, col_header, high_thresh = False, low_thresh = False):
"""
Removes rows where the specified column has an invalid number
or is outside the defined thresholds (above high_thresh or below low_thresh)
:param col_header: name of column to clean
:param high_thresh: maximum valid value of data in that column
:param low_thresh: minimum valid value of data in that column
"""
# loop cleaning for multiple column header inputs
if isinstance(col_header, list):
for col_head in col_header:
self.clean(col_head)
# clean for just one input column header
else:
if not col_header in self.headers:
raise LookupError("{0} header not in dataset!".format(col_header))
col_index = self.headers.index(col_header)
temp_data = self.row_data
self.row_data = []
bad_count = 0
for row in temp_data:
try:
test = float(row[col_index])
if high_thresh == False and low_thresh == False:
self.row_data.append(row)
elif high_thresh == False and low_thresh != False:
if test >= low_thresh:
self.row_data.append(row)
elif high_thresh != False and low_thresh == False:
if test <= high_thresh:
self.row_data.append(row)
elif high_thresh != False and low_thresh != False:
if test >= low_thresh and test <= high_thresh:
self.row_data.append(row)
except:
bad_count += 1
self.bad_rows.append(row)
continue
if bad_count >0:
print("Removed {0} rows from '{1}' with invalid '{2}'".format(
bad_count, self.name, col_header))
# since rows have been removed, we must redefine the time domain. (sloppy, but concise)
self.define_time(self.time_header, self.fmt, self.start_dto)
if self.subsetted:
for subset in self.subsets:
subset.clean(col_header)
return
    def rebuild(self, destroy_subsets=False):
        """
        Reconstructs the time series from its constituent subsets.

        :param destroy_subsets: Set to True to destroy the existing subsets
                                of the time series, which will allow them to
                                be rebuilt in a different manner.
        """
        # handles time series with multiple levels of discretization.
        # NOTE(review): this "while ... else" keeps re-running rebuild on the
        # subsets for as long as the first subset is itself subsetted; unless
        # destroy_subsets is True nothing changes the loop condition, so this
        # appears to loop forever for multi-level series — confirm intent.
        while self.subsets[0].subsetted:
            for subset in self.subsets:
                subset.rebuild(destroy_subsets)
        else:
            # the "else" of a while loop runs once its condition is False:
            # at this point the subsets (if any) hold raw rows, so reassemble
            # this series' row_data from them in subset order
            if self.subsetted:
                self.row_data = []
                for subset in self.subsets:
                    for row in subset.row_data:
                        self.row_data.append(row)

                # re-parse the time column now that rows have been replaced
                self.define_time(self.time_header, self.fmt)

        # optionally flatten this series back to an un-subsetted state
        if destroy_subsets:
            self.subsets = []
            self.subsetted = False
        return
[docs] def merge_cols(self, header1, header2):
"""
merges two columns together (string concatenation) into a new column.
The new column will be named [header1]_[header2].
:param header1: the name of the 1st column to merge
:param header2: the name of the 2nd column to merge
"""
new_header = "_".join([header1, header2])
for i, entry in enumerate(self.row_data):
new_field = "".join([self.col_data[header1][i], self.col_data[header2][i]])
self.row_data[i].append(new_field)
# updates column and row data
self.headers.append(new_header)
self.build_col_data()
print("merged '{0}' and '{1}' columns into new column '{2}'".format(header1, header2, new_header))
if self.subsetted:
for subset in self.subsets:
subset.merge_cols(header1, header2)
return
    def _build_time(self, time_header, fmt, start_date=False):
        """
        This internal use function is called twice by "define_time". Once to turn
        all the datestamps into datetime objects, then a second time once the entire
        dataset has been sorted in ascending time order by those datetime objects.
        This is to ensure all time values are in terms of the correct start time,
        which can be no later than the earliest entry in the dataset.

        :param time_header: name of column with time data in it (a header
                            string or a column index integer)
        :param fmt: the fmt string to interpret time data into datetime objects
        :param start_date: the date to count up from; a datestring in "fmt"
                           format, a datetime object, or False to use midnight
                           of the first day on record
        """
        # set format
        self.fmt = fmt

        # extract time column.
        # NOTE(review): _extract_time is defined elsewhere in this class;
        # it presumably populates self.time from the named column — confirm.
        self._extract_time(time_header)

        # allows column index integers as time_header inputs
        if isinstance(time_header, int):
            time_header = self.headers[time_header]

        # use manual start date (str or dto) or set to beginning of first day on record
        if isinstance(start_date, str):
            start = datetime.strptime(start_date, fmt)
        elif isinstance(start_date, datetime):
            start = start_date
        else:
            earliest = datetime.strptime(self.time[0], fmt)
            start = datetime(earliest.year, earliest.month, earliest.day, 0, 0, 0, 0)

        # convert datestamps into datetime objects, rebuilding the three
        # parallel time vectors from scratch
        datestamp_list = self.time
        self.time_dom = []
        self.time_dec_days = []
        self.time_seconds = []

        for i, datestamp in enumerate(datestamp_list):

            # if error, give user information about the line on which it occurs
            # (i + 2 accounts for the header row and 1-based line numbering)
            try:
                t = datetime.strptime(datestamp, fmt)
            except:
                raise Exception("Input '{0}' in line {1} is not of format {2}".format(
                    datestamp, i + 2, fmt))

            # initial absolute time ordering to help with sorting
            self.time_dom.append(t)

            # offsets from the start time, in seconds and in decimal days
            delta = t - start
            delta = float(delta.total_seconds())
            self.time_seconds.append(float(delta))
            self.time_dec_days.append(float(delta / 86400))

        self.start_dto = start
        return
[docs] def define_time(self, time_header, fmt, start_date = False):
"""
Converts time strings into time objects for standardized processing. For tips
on how to use 'fmt' variable, see
[https://docs.python.org/2/library/datetime.html#strftime-strptime-behavior]
Time header variable can be either the header string, or a column index num
Creates
A converted list of time objects (self.time_dom)
A new list of monotonically increasing decimal days (self.time_dec_days)
A new list of monotonically increasing second values (self.time_seconds)
:param time_header: name of column with time data in it
:param fmt: the fmt string to interpret time data into datetime objects
:param start_date: The date to count up from.
"""
# build time vectors for the first time
self._build_time(time_header, fmt, start_date)
# sort data such that it is in ascending order by time.
sorted_rows = range(len(self.row_data))
indices = range(len(self.row_data))
indices = sorted(indices, key = self.time_seconds.__getitem__)
for i,j in enumerate(indices):
sorted_rows[i] = self.row_data[j]
self.row_data = sorted_rows
self.build_col_data()
# recalculate time domain information now that rows are in proper order
if min(self.time_seconds) < 0:
self._build_time(time_header, fmt, start_date)
# calculate the mean_interval in seconds
self.span = self.time_dom[-1] - self.time_dom[0]
self.mean_interval = self.span.total_seconds()/len(self.time_dom)
# perform same operation on each subset
if self.subsetted:
for subset in self.subsets:
subset.define_time(time_header, fmt)
return
    def make_subsets(self, subset_units, overlap_width=0,
                     cust_center_time=False, discard_old=False):
        """
        splits the time series into individual time chunks

        used for taking advanced statistics, usually periodic
        in nature. Also useful for exploiting temporal relationships
        in a dataset for a variety of purposes including data
        sanitation, periodic curve fitting, etc.

        :param subset_units:
            subset_units follows convention of fmt. For example:
            %Y groups files by year
            %m groups files by month
            %j groups file by julian day of year

        :param overlap_width:
            this variable can be set to greater than 0
            to allow "window" type statistics, so each subset may contain
            data points from adjacent subsets.
            overlap_width = 1 is like a window size of 3
            overlap_width = 2 is like a window size of 5

            WARNING: this function is imperfect for making subsets by months.
            the possible lengths of a month are extremely variant, so sometimes
            data points at the ends of a month will get placed in the adjacent
            month. If you absolutely require accurate month subsetting,
            you should use this function to subset by year, then use the
            "group_bins" function to bin each year by month. so,

            .. code-block:: python

                ts.subset("%b")       # subset by month
                ts.subset("%Y")       # subset by year
                ts.group_bins("%b")   # bin by month

        :param cust_center_time:
            Allows a custom center time to be used! This was added so that
            days could be centered around a specific daily acquisition time.
            for example, its often useful to define a day as
            satellite data acquisition time +/- 12 hours.
            if used, "cust_center_time" must be a datetime object!

        :param discard_old:
            By default, performing a subsetting on a time series that already
            has subsets does not subset the master time series, but instead
            the lowest level subsets. Setting "discard_old" to "True" will
            discard all previous subsets of the time series and start
            subsetting from scratch.
        """
        # optionally wipe any existing discretization and start over
        if discard_old:
            self.subsetted = False
            self.subsets = []

        # an already-subsetted series delegates subsetting to its subsets
        if self.subsetted:
            for subset in self.subsets:
                subset.make_subsets(subset_units, overlap_width)
        else:
            self.subsetted = True

            # sanitize overlap
            if overlap_width < 0:
                print("overlap_width must be 0 or more, setting it to 0!")
                overlap_width = 0

            # convert units into subset units and into terms of seconds
            # timedelta objects can only have units of days or seconds, so we use seconds
            subset_units = self._fmt_to_units(subset_units)

            # initial step width
            step_width = self._units_to_seconds(subset_units)

            # NOTE(review): "hour" is absent from this allowed list even though
            # the custom-center-time logic below handles hours — confirm intent.
            if subset_units not in ['year', 'month', 'day', 'minute']:
                raise Exception("Data is too high resolution to subset by {0}".format(subset_units))

            print("Subsetting data by {0}".format(subset_units))
            if self.time_dom == False:
                raise Exception("must call 'define_time' method before taking subsets!")

            # determine subset lists starting, end points and increment
            time_s = self.time_dom[0]
            time_f = self.time_dom[-1]

            # set up starttime with custom center times: the center keeps the
            # sub-unit fields of cust_center_time and the unit fields of the
            # first record
            if cust_center_time:
                print("using custom center time")

                if subset_units == "month":
                    ustart = datetime(time_s.year, time_s.month,
                                      cust_center_time.day, cust_center_time.hour,
                                      cust_center_time.minute, cust_center_time.second,
                                      cust_center_time.microsecond)
                elif subset_units == "hour":
                    ustart = datetime(time_s.year, time_s.month, time_s.day, time_s.hour,
                                      cust_center_time.minute, cust_center_time.second,
                                      cust_center_time.microsecond)
                elif subset_units == "minute":
                    ustart = datetime(time_s.year, time_s.month, time_s.day, time_s.hour, time_s.minute,
                                      cust_center_time.second, cust_center_time.microsecond)
                else:  # subset_units == "day":
                    ustart = datetime(time_s.year, time_s.month, time_s.day,
                                      cust_center_time.hour, cust_center_time.minute,
                                      cust_center_time.second, cust_center_time.microsecond)

                td = time_f - time_s
                uend = cust_center_time + timedelta(seconds=td.total_seconds())

            # otherwise, set the centers with no offset
            else:
                ustart = self._center_datetime(time_s, subset_units)
                uend = self._center_datetime(time_f, subset_units) + timedelta(seconds=step_width)

            # Iterate through entire dataset one time step unit at a time.
            # NOTE(review): "delta" is computed here but never used — confirm.
            delta = uend - ustart

            center_time = ustart
            while center_time < uend:

                # recompute the step for the specific month/year at hand
                step_width = self._units_to_seconds(subset_units, center_time)

                # half-window size in seconds; overlap_width widens the window
                wind_seconds = step_width * (overlap_width + 0.5)

                # collect rows whose time falls within this center's window
                temp_data = []
                for j, current_time in enumerate(self.time_dom):
                    dt = abs(center_time - current_time)
                    if dt.total_seconds() < wind_seconds:
                        temp_data.append(self.row_data[j])

                # create the subset only if some data was found to populate it
                if len(temp_data) > 0:
                    new_subset = time_series(units=subset_units, parent=self)
                    new_subset.center_time = center_time
                    new_subset.from_list(temp_data, self.headers, self.time_header, self.fmt)
                    new_subset.define_time(self.time_header, self.fmt)
                    new_subset._name_as_subset()
                    self.subsets.append(new_subset)

                center_time += timedelta(seconds=step_width)
        return
[docs] def group_bins(self, fmt_units, overlap_width = 0, cyclical = True):
"""
Sorts the time series into time chunks by common bin_unit
used for grouping data rows together. For example, if one used
this function on a 5 year dataset with a bin_unit of month,
then the time_series would be subseted into 12 sets (1 for each
month), which each set containing all entries for that month,
regardless of what year they occurred in.
:param fmt_units:
%Y groups files by year
%m groups files by month
%j groups file by julian day of year
:param overlap_width:
similarly to "make_subsets" the "overlap_width" variable can be
set to greater than 1 to allow "window" type statistics, so
each subset may contain data points from adjacent subsets.
However, for group_bins, overlap_width must be an integer.
:param cyclical:
"cyclical" of "True" will allow end points to be considered adjacent.
So, for example, January will be considered adjacent to December,
day 1 will be considered adjacent to day 365.
"""
ow = int(overlap_width)
if self.subsetted:
for subset in self.subsets:
subset.group_bins(fmt_units, overlap_width, cyclical)
else:
self.subsetted = True
# ensure proper unit format is present
fmt = self._units_to_fmt(fmt_units)
units = self._fmt_to_units(fmt_units)
# set up cyclical parameters
if fmt == "%j": cylen = 365
if fmt == "%d": cylen = 365
if fmt == "%m": cylen = 12
if fmt == "%b": cylen = 12
# initialize a grouping array to idenfity row indices for each subset
grouping = [int(obj.strftime(fmt)) for obj in self.time_dom]
for i in xrange(min(grouping),max(grouping) + 1):
subset_units = self._fmt_to_units(fmt)
new_subset = time_series( units = subset_units, parent = self)
# only take rows whos grouping is within ow of i
subset_rows = [j for j,g in enumerate(grouping) if g <= i+ow and g >=i-ow]
# fix endpoints
if cyclical:
if i <= ow:
subset_rows = subset_rows + [j for j,g in enumerate(grouping) if g-cylen <= i+ow and g-cylen >=i-ow]
elif i >= cylen - ow:
subset_rows = subset_rows + [j for j,g in enumerate(grouping) if g+cylen <= i+ow and g+cylen >=i-ow]
# grab row indeces from parent matrix to put in the subset
subset_data = [self.row_data[row] for row in subset_rows]
# run naming methods and definitions on the new subset
if not len(subset_data) == 0:
new_subset.from_list(subset_data, self.headers, self.time_header, self.fmt)
new_subset.define_time(self.time_header, self.fmt)
new_subset.center_time = self.time_dom[grouping.index(i)]
new_subset._name_as_subset(binned = True)
self.subsets.append(new_subset)
return
[docs] def column_stats(self, col_header):
"""
takes statistics on a specific column of data
creates object attributes according to the column name. for example:
for col_header = "temperature", the following attribute are created
self.temperature_max_v # maximum value
self.temperature_min_v # minimum value
self.temperature_max_i # index value where maximum occurs
self.temperature_min_i # index value where minimum occurs
self.temperature_avg # average
self.temperature_std # standard deviation
:param col_header: name of column on which to take statistics
:return statistics: a dictionary of the column statistical values
"""
print("calculating stats for time_series '{0}', col '{1}'".format(self.name,col_header))
# pull column data and find some stats
import numpy as np
self.clean(col_header)
col_data = map(float, self.col_data[col_header])
# build array of stats
stats = [max(col_data),
min(col_data),
col_data.index(max(col_data)),
col_data.index(min(col_data)),
np.mean(col_data),
np.std(col_data)]
# build array of names
names = ["{0}_max_v".format(col_header),
"{0}_min_v".format(col_header),
"{0}_max_i".format(col_header),
"{0}_min_i".format(col_header),
"{0}_avg".format(col_header),
"{0}_std".format(col_header)]
# set attributes for names and stats, also make dict for immediate return
statistics = {}
for i,stat in enumerate(stats):
setattr(self, names[i], stats[i])
statistics[names[i]] = stats[i]
if self.subsetted:
for subset in self.subsets:
subset.column_stats(col_header)
return statistics
    def subset_stats(self, col_header):
        """
        Creates a new time_series object, which is built from column statistics
        of this time_series's subsets. For example:

        Lets say we have a years worth of hourly temperature data, and we want
        to get daily summaries of temperature statistics. To do this, the
        syntax would look like this:

        .. code-block:: python

            temperature_ts.make_subsets("%d")
            daily_sum_ts = temperature_ts.subset_stats("Temp")

        NOTE: this function is not yet implemented; it currently only prints a
        warning and returns None.

        :param col_header: name of the column to (eventually) summarize
        """
        print("This function is unfinished")  # flag
        return
[docs] def column_plot(self, col_headers, title = "", xlabel = "", ylabel = "", save_path = None):
"""
plots a specific column or column(s) by header name
Accepts custom title input and y-axis label. If a save_path is
specified, it will save the plot to that path and close it automatically.
:param col_headers: list of columns to plot
:param title: title to place on plot
:param xlabel: label for x axis
:param ylabel: label for y axis
:param save_path: filepath at which to save figure as image.
"""
# figure out temporal resolution of data to appropiately label x-axis
if self.mean_interval > 2592000:
fmt = "%Y %b"
elif self.mean_interval > 86400:
fmt = "%Y %b %d"
elif self.mean_interval > 3600:
fmt = "%Y %b %d %H"
elif self.mean_interval > 60:
fmt = "%Y %b %d %H:%M"
else:
fmt = "%Y %b %d %H:%M:%S"
self.plot_fmt = fmt
# set col_headers input to type "list"
if isinstance(col_headers, str):
col_headers = [col_headers]
# initialize plot
fig, ax = plt.subplots(figsize = (16,10), dpi = 80)
for col_header in col_headers:
stats = self.column_stats(col_header)
ax.plot(self.time_dom, self.col_data[col_header], label = col_header)
# date formatting stuff
ax.fmt_xdata = mdates.DateFormatter(self.plot_fmt)
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter(self.plot_fmt))
plt.gcf().autofmt_xdate()
# legend and titling
ax.legend(loc = 2)
plt.ylabel(ylabel, fontsize = 16)
plt.ylabel(xlabel, fontsize = 16)
plt.suptitle(self.name)
plt.title(title, fontsize = 20)
plt.grid()
if save_path:
plt.show(block = False)
plt.savefig(save_path)
plt.close()
else:
plt.show(block = True)
return fig, ax
[docs] def normalize(self, col_header):
"""
Used to normalize specific columns in the time series. Normalization
will scale all value in the time series to be between 0 and 1
"""
# make sure data is cleaned for numerical formatting
self.clean(col_header)
temp_col = self.col_data[col_header]
temp_col = map(float, temp_col)
# perform normalization
minval = min(temp_col)
maxval = max(temp_col)
c = self.headers.index(col_header)
for i,row in enumerate(self.row_data):
self.row_data[i][c] = (float(row[c]) - minval) / (maxval - minval)
print("data in column '{0}' has been normalized!".format(col_header))
if self.subsetted:
for subset in self.subsets:
subset.normalize(col_header)
return
[docs] def add_mono_time(self):
""" Adds a monotonically increasing time column with units of decimal days """
# add an entry to every row item in row_data, then rebuild column data
if "decimal_days" not in self.headers:
self.headers.append("decimal_days")
for i, row in enumerate(self.row_data):
self.row_data[i].append(self.time_dec_days[i])
self.build_col_data()
if self.subsetted:
for subset in self.subsets:
subset.add_mono_time()
return
[docs] def interp_col(self, time_obj, col_header):
"""
For input column, interpolate values to estimate value at input time_obj.
input time_obj may also be of datestring matching declared fmt.
:param time_obj: A datetime object
:param col_header: The name of the column to interpolate at time (time_obj)
:return interp_y: The interpolated value of input column at input time
"""
# start by cleaning data by input column
self.clean(col_header)
# x and y data for interpolation
y = self.col_data[col_header]
x = self.time_seconds
if not isinstance(time_obj, datetime):
time_obj = datetime.strptime(time_obj, self.fmt)
delta = time_obj - self.start_dto
interp_x = delta.total_seconds()
interp_y = numpy.interp(interp_x, x, y)
print("Val in '{0}' at time '{1}' is '{2}'".format(col_header, time_obj, interp_y))
return interp_y
[docs] def interrogate(self):
""" prints a heads up stats table of all subsets in this time_series """
if self.disc_level == 0:
print("")
print("="*84)
print("time_series name \t\t len \t start \t\t\t end")
print("="*84)
# use leading and trailing spaces for visualizing subset depth
padded_name = self.disc_level * " " + self.name
print("{0} \t {1} \t {2} \t {3}".format(
padded_name.ljust(28, " "),
str(len(self.time)).ljust(5," "),
self.time_dom[0],
self.time_dom[-1]))
if self.subsetted:
for subset in self.subsets:
subset.interrogate()
return
# testing code
if __name__ == "__main__":

    filepath = r"test_data\weather_dat.txt"     # define filepath with text data

    tdo = textio.read_DS3505(filepath)          # build a "text data object" (tdo)
    print(tdo.headers)                          # print the headers
    print(tdo.row_data[0])                      # print the first row

    ts = time_series('weather_data')            # initialize a time series named "weather_data"
    ts.from_tdo(tdo)                            # populate it with the contents of the tdo
    # BUGFIX: this used the python 2 print statement; use the function form
    print(ts.headers)                           # view the headers of this time series

    timecol = "YR--MODAHRMN"                    # identify the column with time information in it
    fmt = "%Y%m%d%H%M"                          # specify the format of strings in this column
    ts.define_time(timecol, fmt)                # converts text data into datetimes
    ts.interrogate()                            # print a heads up summary of the time series

    ts.make_subsets("%d")                       # subset the data into daily chunks, no overlap
    ts.interrogate()                            # print a heads up summary of the time series
    ts.column_plot("TEMP")                      # no frills plot of temperature

    ts.rename_header("TEMP", "Temperature")     # give header more descriptive name
    ts.rename_header("DEWP", "Dewpoint")        # give header more descriptive name

    jul21 = ts["2013-07-21"]                    # pull out the subset time_series for july 21st
    jul21.column_plot(["Temperature", "Dewpoint"],      # add a few nice labels to the plot
                      title="Temperature and Dewpoint",
                      xlabel="Date and Time",
                      ylabel="Degrees F")

    ts.make_subsets("%d", overlap_width=1,      # re-subset the series with 1 day overlap width
                    discard_old=True)           # and discard the old subsets
    ts.interrogate()                            # print a heads up summary

    jul21 = ts["2013-07-21"]                    # pull out the new july 21st subset
    jul21.column_plot(["Temperature", "Dewpoint"],      # plot the data again, save it this time
                      title="Temperature and Dewpoint",
                      ylabel="Degrees F",
                      save_path="test.png")

    ts.add_mono_time()                          # add monotonically increasing time value to each row

    ts.to_csv(r"test_data\weather_csv.txt")     # save this time_series dataset to a CSV
    ts.from_csv(r"test_data\weather_csv.txt")   # reload this time_series from a CSV. note that the
                                                # time series must be re-subsetted as before; that
                                                # information was not preserved.