Source code for thermostat.parallel
import os
import tempfile
from collections import defaultdict
from itertools import cycle
from zipfile import ZipFile

import pandas as pd

from eemeter.location import _load_zipcode_to_station_index

def schedule_batches(metadata_filename, n_batches, zip_files=False, batches_dir=None):
""" Batch scheduler for large sets of thermostats. Can either create
zipped directories ready be sent to separate processors for parallel
processing, or unpackaged metadata dataframes for more flexible processing.
Parameters
----------
metadata_filename : str
Full path to location of file containing CSV formatted metadata for
n_batches : int
Number of batches desired. Should be <= the number of available
thermostats.
zip_files : boolean
If True, create zipped directories of metadata and interval data. Each
batch will be named batch_XXXXX.zip, and will contain a directory named
`data`, which contains metadata and interval data for the batch. Must
supply `batches_dir` argument to use this option.
batches_dir : str
Path to directory in which to save created batches. Ignored for
zip_files=False.
Returns
-------
batches : list of str or list of pd.DataFrame
If zip_files is True, then returns list of names of created zip files.
Otherwise, returns list of metadata dataframes containing batches.
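
    Examples
    --------
    A sketch of typical usage; the file and directory names here are
    hypothetical, and the metadata file must exist on disk.

    >>> batches = schedule_batches("metadata.csv", n_batches=4)  # doctest: +SKIP
    >>> zipped = schedule_batches(
    ...     "metadata.csv", n_batches=4, zip_files=True,
    ...     batches_dir="batches")  # doctest: +SKIP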
"""
    if zip_files:
        if batches_dir is None:
            message = (
                "Cannot have batches_dir==None when zip_files==True. "
                "Please supply a directory in which to save batches."
            )
            raise ValueError(message)

    metadata_df = pd.read_csv(metadata_filename, dtype={"zipcode": str})

    index = _load_zipcode_to_station_index()
    stations = [index[zipcode] for zipcode in metadata_df.zipcode]
    n_rows = metadata_df.shape[0]

    # Group rows by weather station, then order the groups from smallest to
    # largest by number of rows.
    rows_by_station = defaultdict(list)
    for station, (_, row) in zip(stations, metadata_df.iterrows()):
        rows_by_station[station].append(row)
    ordered_rows = [
        rows for _, rows in
        sorted(rows_by_station.items(), key=lambda item: len(item[1]))
    ]

    # Iterate over the row groups, greedily adding their contents to
    # batches until each batch reaches its target size.
    batches = [[] for _ in range(n_batches)]
    batch_sizes = _get_batch_sizes(n_rows, n_batches)
    current_batch = cycle(range(n_batches))
    for rows in ordered_rows:
        n_group_rows = len(rows)
        n_rows_taken = 0
        while n_rows_taken < n_group_rows:
            batch_i = next(current_batch)
            target_batch_size = batch_sizes[batch_i]
            batch = batches[batch_i]
            space_left = target_batch_size - len(batch)
            rows_to_add = rows[n_rows_taken:n_rows_taken + space_left]
            n_rows_taken += len(rows_to_add)
            batch.extend(rows_to_add)

    batch_dfs = [pd.DataFrame(rows) for rows in batches]

    if zip_files:
        if not os.path.exists(batches_dir):
            os.makedirs(batches_dir)

        batch_zipfile_names = []
        for i, batch_df in enumerate(batch_dfs):
            batch_name = "batch_{:05d}.zip".format(i)
            batch_zipfile_name = os.path.join(batches_dir, batch_name)
            batch_zipfile_names.append(batch_zipfile_name)

            # Write the batch metadata to a temporary CSV file, closing the
            # file descriptor returned by mkstemp so it is not leaked.
            fd, fname = tempfile.mkstemp()
            os.close(fd)
            batch_df.to_csv(fname, index=False)

            with ZipFile(batch_zipfile_name, 'w') as batch_zip:
                batch_zip.write(fname, arcname=os.path.join('data', 'metadata.csv'))
                for filename in batch_df.interval_data_filename:
                    interval_data_source = os.path.join(
                        os.path.dirname(metadata_filename), filename)
                    batch_zip.write(interval_data_source,
                                    arcname=os.path.join('data', filename))
            os.remove(fname)
        return batch_zipfile_names
    else:
        return batch_dfs


def _get_batch_sizes(n_rows, n_batches):
    n_base = n_rows // n_batches
    remainder = n_rows % n_batches
    # The first `remainder` batches each take one extra row so that the
    # batch sizes sum exactly to n_rows.
    return [n_base + int(i < remainder) for i in range(n_batches)]
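
# A worked example of the split arithmetic above (illustrative values only,
# a sketch rather than part of the module's API):
#
#     >>> _get_batch_sizes(10, 3)
#     [4, 3, 3]
#
# 10 // 3 gives a base size of 3 with remainder 1, so the first batch takes
# one extra row.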