Source code for timetable.timetable
"""
Functions for timetable modification, for example merging, tagging and cutting
at specific dates.
"""
from __future__ import absolute_import
import re
from heapq import heappush, heappop
from datetime import timedelta
from itertools import count
from timetable import (parse_ical, iCalTimezone, Recurrence, parse_datetime,
uidgroups_by_type, generate_item_timetable)
tag_pat_braces = re.compile('\[([^]]*)\] *')
[docs]def annotate_tags(tag_pat=tag_pat_braces, emptytag='misc'):
"""Returns an annotation function which parses the items summary for a tag.
A tag is identified through the regular expression *tag_pat*. If no tag
is found, *emptytag* is applied.
.. note::
The summary is assumed to be encoded as UTF-8.
"""
def annotate(start, end, entry):
summary = str(entry['item'][b'summary'][0].value.decode('utf-8'))
match = tag_pat.search(summary)
tag = ''
if match:
tag = match.group(1).strip()
if not tag:
tag = emptytag
entry['tags'] = tag
return annotate
[docs]def collect_keys(key='tags', collection='entries'):
"""Returns an annotation function which collects *key* from a *collection*.
The resulting set of keys is added to the entries dictionary under the key
*key*.
This function is useful to extract tags from a merged timetable as returned
by :func:`merge_intersections` for example.
"""
def collect(start, end, entry):
result = set()
for item in entry[collection]:
result.add(item[key])
entry[key] = result
return collect
[docs]def compute_duration(key='tags'):
"""Returns an annotation function which computes the duration of an entry.
The duration is allocated equally for each *key* of the entry (e.g. divided
by the amount of keys)."""
def compute(start, end, entry):
entry['duration'] = (end - start).total_seconds() / len(entry[key])
return compute
[docs]def merge_timetables(timetables):
"""Generates a merged timetable from the given *timetables*. Entries are
sorted by their start time."""
# Force timetables into generators.
timetables = [iter(timetable) for timetable in timetables]
entry_id = count()
pending = []
for generator in timetables:
try:
start, end, entry = next(generator)
heappush(pending, (start, next(entry_id), end, entry, generator))
except StopIteration:
pass
while pending:
cur_start, idx, cur_end, entry, generator = heappop(pending)
yield cur_start, cur_end, entry
try:
start, end, entry = next(generator)
if cur_start > start:
raise RuntimeError('Input timetables are not sorted by start '
'date')
heappush(pending, (start, next(entry_id), end, entry, generator))
except StopIteration:
pass
[docs]def generate_timetable(calendar, itemtype=b'vevent'):
"""Generates a timetable from all items of type *itemtype* in the given
*calendar*."""
timezones = {}
for item in calendar.items:
if item.type == b'vtimezone':
timezones[item[b'tzid'][0].value] = iCalTimezone(item)
# Create generators for each item and let them be merged.
return merge_timetables([
generate_item_timetable(uid, group, timezones)
for uid, group in uidgroups_by_type(calendar, itemtype).items()
])
[docs]def clip_timetable(timetable, clip_start=None, clip_end=None, pending=None):
"""Generates a timetable by clipping entries from the given *timetable*.
Entries ending before *clip_start* are discarded as well as entries
starting after *clip_end*. Start and end times of entries lying on the
boundaries modified to match *clip_start* resp. *clip_end*. Entries on the
*clip_end* are added to the list *pending*, if it is supplied."""
if pending is None:
pending = []
idx = 0
while idx < len(pending):
start, end, value = pending[idx]
# Clip entry.
if clip_start is not None:
if end <= clip_start:
pending.pop(idx)
continue
if start < clip_start:
start = clip_start
if clip_end is not None:
if start >= clip_end:
break
if end > clip_end:
end = clip_end
yield start, end, value
if clip_end is not None and pending[0][1] > clip_end:
idx += 1
else:
pending.pop(idx)
# Process next entries.
for entry in timetable:
# Clip entry.
start, end, value = entry
if clip_start is not None:
if end <= clip_start:
continue
if start < clip_start:
start = clip_start
if clip_end is not None:
if start >= clip_end:
pending.append(entry)
break
if end > clip_end:
end = clip_end
pending.append(entry)
yield start, end, value
[docs]def cut_timetable(timetable, cuts=(None, None)):
"""Generates a timetable by cutting entries of *timetable* at the given
*cuts* datetimes."""
cuts = list(cuts)
if len(cuts) < 2:
raise ValueError('Expected at least two cut dates')
# Convert timetable into an iterator.
timetable = iter(timetable)
pending = []
for left_cut, right_cut in zip(cuts[:-1], cuts[1:]):
yield clip_timetable(timetable, left_cut, right_cut, pending)
[docs]def annotate_timetable(timetable, *annotate_funcs):
"""Annotates all entries of *timetable* with the result of all
*annotate_funcs*. The annotation functions must accept the arguments
*start*, *end*, *entry*.
"""
for start, end, entry in timetable:
for annotate_func in annotate_funcs:
annotate_func(start, end, entry)
yield start, end, entry
[docs]def load_timetable(calendar_data, clip_start, clip_end,
tag_pat=tag_pat_braces):
"""Loads and tags all events from the calendar files in the
*calendar_files* dictionary. The keys in *calendar_files* are passed into
:meth:`annotate_tags` as emptytag. The values in *calendar_files* are
filenames to iCal calendars. All events are clipped to *clip_start* and
*clip_end*."""
calendars = {}
for name, data in calendar_data.items():
# FIXME Allows the iCal spec multiple calendars per file?
calendars[name] = parse_ical(data)[0]
return list(clip_timetable(merge_timetables([
annotate_timetable(
generate_timetable(calendar),
annotate_tags(tag_pat=tag_pat_braces, emptytag=name))
for name, calendar in calendars.items()
]), clip_start=clip_start, clip_end=clip_end))
[docs]def merge_intersection(entries, start, end):
"""Generates a non-overlapping timetable from *entries*. *start*
and *end* limit the timespan for the intersection generation. The resulting
timetable contains entry dictionaries with the single key ``entries``,
whose value is the list of merged entries."""
if start is None:
return
dts = set([evt[1] if evt[1] < end else end for evt in entries])
dts.update([start, end])
dts = sorted(dts)
for start, end in zip(dts[:-1], dts[1:]):
entries[:] = [evt for evt in entries if evt[1] > start]
if not entries:
continue
yield start, end, {'entries': [evt[2] for evt in entries]}
[docs]def merge_intersections(timetable):
"""Generates a timetable with merged intersection of entries in
*timetable*. The resulting timetable will only contain entries with a
single key ``entries``, whose value is the list of the merged entries."""
from_dt = None
active = []
# Generate intersections by jumping from start to start of events.
for start, end, entry in timetable:
if from_dt is not None:
if start < from_dt:
raise ValueError('Timetable is not sorted by start date')
for intersection in merge_intersection(active, from_dt, start):
yield intersection
active.append((start, end, entry))
from_dt = start
# Generate left-over intersections.
if active:
end = max(entry[1] for entry in active)
for intersection in merge_intersection(active, from_dt, end):
yield intersection
[docs]def sum_timetable(timetable, cuts, key='tags'):
"""Computes a dictionary with timeseries of the activity for each *key*
in the given *cuts*. The activity duration is distributed evenly if there
are intersections."""
cuts = list(cuts)
sums = {}
for idx, sub_timetable in enumerate(cut_timetable(timetable, cuts)):
sub_timetable = merge_intersections(sub_timetable)
sub_timetable = annotate_timetable(sub_timetable, collect_keys(key))
for start, end, entry in sub_timetable:
duration = (end - start).total_seconds() / len(entry[key])
for name in entry[key]:
if not name in sums:
sums[name] = [0] * (len(cuts))
sums[name][idx + 1] += duration
return sums