@app.route('/tags/tagSeries', methods=['POST'])
def tags_series():
    """Register the single series named by the ``path`` request parameter.

    Responds with HTTP 400 when ``path`` is missing or empty; otherwise
    returns whatever ``tagdb.tag_series`` returns, serialized as JSON.
    """
    series_path = RequestParams.get('path')
    if series_path:
        return jsonify(app.store.tagdb.tag_series(series_path))

    return jsonify({'error': 'no path specified'}, status=400)


@app.route('/tags/tagMultiSeries', methods=['POST'])
def tags_multiseries():
    """Register every series listed in the request.

    Series are read from the repeated ``path`` parameter
    (``?path=name;tag1=value1&path=name;tag1=value2``). The
    Rails/PHP/jQuery spelling ``path[]`` is consulted only when ``path``
    carries no values. Responds with HTTP 400 when both are empty.
    """
    series_paths = (
        RequestParams.getlist('path') or RequestParams.getlist('path[]')
    )
    if not series_paths:
        return jsonify({'error': 'no paths specified'}, status=400)

    return jsonify(app.store.tagdb.tag_multi_series(series_paths))


@app.route('/tags/delSeries', methods=['POST'])
def tags_delseries():
    """Remove every series listed in the request from the tag database.

    Accepts the same ``path`` / ``path[]`` parameters as
    ``/tags/tagMultiSeries``. Responds with HTTP 400 when both are empty.
    """
    series_paths = (
        RequestParams.getlist('path') or RequestParams.getlist('path[]')
    )
    if not series_paths:
        return jsonify({'error': 'no path specified'}, status=400)

    return jsonify(app.store.tagdb.del_multi_series(series_paths))
@app.route('/tags/findSeries', methods=methods)
def tags_findseries():
    """Return the series matching every tag expression in the request.

    Expressions come from the repeated ``expr`` parameter
    (``?expr=tag1=value1&expr=tag2=value2``); the Rails/PHP/jQuery spelling
    ``expr[]`` is used only when ``expr`` carries no values. Responds with
    HTTP 400 when no expression is supplied.
    """
    exprs = RequestParams.getlist('expr') or RequestParams.getlist('expr[]')
    if not exprs:
        return jsonify({'error': 'no tag expressions specified'}, status=400)

    return jsonify(app.store.tagdb.find_series(exprs))


@app.route('/tags', methods=['GET'])
def tags_taglist():
    """List known tags, optionally narrowed by ``filter`` and ``limit``."""
    return jsonify(app.store.tagdb.list_tags(
        tagFilter=RequestParams.get('filter'),
        limit=RequestParams.get('limit'),
    ))


# The URL rule must bind ``tag``: the view takes it as a required positional
# argument, so a rule without the ``<tag>`` variable can never call it.
@app.route('/tags/<tag>', methods=['GET'])
def tags_tagdetails(tag):
    """Describe one tag and its values (``filter`` / ``limit`` optional)."""
    return jsonify(app.store.tagdb.get_tag(
        tag,
        valueFilter=RequestParams.get('filter'),
        limit=RequestParams.get('limit'),
    ))


@app.route('/tags/autoComplete/tags', methods=methods)
def tags_autocomplete_tags():
    """Suggest tag names, optionally restricted by ``expr`` and ``tagPrefix``.

    ``expr`` / ``expr[]`` are read as in ``/tags/findSeries`` but are
    optional here: with no expressions the suggestions come from the full
    tag list.
    """
    exprs = RequestParams.getlist('expr') or RequestParams.getlist('expr[]')

    # The tag database hangs off ``app.store.tagdb``; the store itself has
    # no ``tagdb_auto_complete_tags`` attribute.
    return jsonify(app.store.tagdb.auto_complete_tags(
        exprs,
        tagPrefix=RequestParams.get('tagPrefix'),
        limit=RequestParams.get('limit'),
    ))


@app.route('/tags/autoComplete/values', methods=methods)
def tags_autocomplete_values():
    """Suggest values for ``tag``, optionally restricted by ``expr``.

    Responds with HTTP 400 when the ``tag`` parameter is missing. ``expr`` /
    ``expr[]`` are optional, as for ``/tags/autoComplete/tags``.
    """
    exprs = RequestParams.getlist('expr') or RequestParams.getlist('expr[]')

    tag = RequestParams.get('tag')
    if not tag:
        return jsonify({'error': 'no tag specified'}, status=400)

    # See tags_autocomplete_tags: the method lives on ``app.store.tagdb``.
    return jsonify(app.store.tagdb.auto_complete_values(
        exprs,
        tag,
        valuePrefix=RequestParams.get('valuePrefix'),
        limit=RequestParams.get('limit'),
    ))
def get_tagdb(tagdb_path, config, cache=None):
    """Instantiate the tag database class named by ``tagdb_path``.

    :param tagdb_path: dotted ``package.module.ClassName`` path.
    :param config: configuration mapping handed to the class as its first
        positional argument.
    :param cache: optional cache object, passed as the ``cache`` keyword.
    :returns: ``ClassName(config, cache=cache)``.
    :raises ValueError: if ``tagdb_path`` contains no dot and therefore
        cannot be split into a module and a class name.
    """
    module_name, dot, class_name = tagdb_path.rpartition('.')
    if not dot:
        # A bare name would otherwise fail with an opaque tuple-unpacking
        # error; name the offending setting instead.
        raise ValueError(
            "Invalid tagdb path %r, expected 'module.ClassName'" % (tagdb_path,)
        )

    module = import_module(module_name)
    return getattr(module, class_name)(config, cache=cache)
# ``abc.ABCMeta(name, bases, ns)`` builds an abstract base class in a way that
# works on both Python 2 and 3. A ``__metaclass__`` class attribute is ignored
# by Python 3, which would leave the abstract methods unenforced.
class BaseTagDB(abc.ABCMeta('_AbstractTagDB', (object,), {})):
    """Common interface and shared helpers for tag database backends."""

    def __init__(self, config, *args, **kwargs):
        """Initialize the tag db.

        ``config`` is stored as ``self.config``; the optional ``cache``
        keyword is stored as ``self.cache`` (``None`` when not given).
        """
        self.config = config
        self.cache = kwargs.get('cache')

    def find_series(self, tags, requestContext=None):
        """
        Find series by tag, accepts a list of tag specifiers and returns a list of matching paths.

        Tag specifiers are strings, and may have the following formats:

        .. code-block:: none

          tag=spec    tag value exactly matches spec
          tag!=spec   tag value does not exactly match spec
          tag=~spec   tag value matches the regular expression spec
          tag!=~spec  tag value does not match the regular expression spec

        Any tag spec that matches an empty value is considered to match series that don't have that tag.

        At least one tag spec must require a non-empty value.

        Regular expression conditions are treated as being anchored at the start of the value.

        Matching paths are returned as a list of strings.

        When a cache was supplied, results are looked up under
        ``find_series_cachekey(...)`` and stored for 60 seconds. The outcome
        and duration are always logged, including when the lookup raises.
        """
        start_time = time.time()
        log_msg = 'completed in'

        try:
            cacheKey = self.find_series_cachekey(tags, requestContext=requestContext)
            result = self.cache.get(cacheKey) if self.cache else None
            if result is not None:
                log_msg = 'completed (cached) in'
            else:
                result = self._find_series(tags, requestContext)
                if self.cache:
                    self.cache.set(cacheKey, result, 60)
        except Exception:
            log_msg = 'failed in'
            raise
        finally:
            self.log_info(
                'find_series',
                '{msg} {sec:.6}s'.format(
                    msg=log_msg,
                    sec=time.time() - start_time,
                ),
            )

        return result

    def find_series_cachekey(self, tags, requestContext=None):
        """Return the cache key for a query; independent of tag spec order."""
        return 'TagDB.find_series:' + ':'.join(sorted(tags))

    @abc.abstractmethod
    def _find_series(self, tags, requestContext=None):
        """
        Internal function called by find_series, follows the same semantics allowing base class to implement caching
        """

    @abc.abstractmethod
    def get_series(self, path, requestContext=None):
        """
        Get series by path, accepts a path string and returns a TaggedSeries object describing the series.

        If the path is not found in the TagDB, returns None.
        """

    @abc.abstractmethod
    def list_tags(self, tagFilter=None, limit=None, requestContext=None):
        """
        List defined tags, returns a list of dictionaries describing the tags stored in the TagDB.

        Each tag dict contains the key "tag" which holds the name of the tag. Additional keys may be returned.

        .. code-block:: none

          [
            {
              'tag': 'tag1',
            },
          ]

        Accepts an optional tagFilter parameter which is a regular expression used to filter the list of returned tags.
        """

    @abc.abstractmethod
    def get_tag(self, tag, valueFilter=None, limit=None, requestContext=None):
        """
        Get details of a particular tag, accepts a tag name and returns a dict describing the tag.

        The dict contains the key "tag" which holds the name of the tag. It also includes a "values" key,
        which holds a list of the values for each tag. See list_values() for the structure of each value.

        .. code-block:: none

          {
            'tag': 'tag1',
            'values': [
              {
                'value': 'value1',
                'count': 1,
              }
            ],
          }

        Accepts an optional valueFilter parameter which is a regular expression used to filter the list of returned values.
        """

    @abc.abstractmethod
    def list_values(self, tag, valueFilter=None, limit=None, requestContext=None):
        """
        List values for a particular tag, returns a list of dictionaries describing the values stored in the TagDB.

        Each value dict contains the key "value" which holds the value, and the key "count" which is the number of
        series that have that value. Additional keys may be returned.

        .. code-block:: none

          [
            {
              'value': 'value1',
              'count': 1,
            },
          ]

        Accepts an optional valueFilter parameter which is a regular expression used to filter the list of returned values.
        """

    @abc.abstractmethod
    def tag_series(self, series, requestContext=None):
        """
        Enter series into database. Accepts a series string, upserts into the TagDB and returns the canonicalized series name.
        """

    def tag_multi_series(self, seriesList, requestContext=None):
        """
        Enter series into database. Accepts a list of series strings, upserts into the TagDB and returns a list of canonicalized series names.
        """
        return [self.tag_series(series, requestContext) for series in seriesList]

    @abc.abstractmethod
    def del_series(self, series, requestContext=None):
        """
        Remove series from database. Accepts a series string and returns True
        """

    def del_multi_series(self, seriesList, requestContext=None):
        """
        Remove series from database. Accepts a list of series strings, removes them from the TagDB and returns True
        """
        for series in seriesList:
            self.del_series(series, requestContext)
        return True

    def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None):
        """
        Return auto-complete suggestions for tags based on the matches for the specified expressions, optionally filtered by tag prefix

        Returns a sorted list of at most ``limit`` distinct tag names. Tags
        already named in ``exprs`` are never suggested. ``limit`` defaults to
        the ``tagdb.autocomplete_limit`` setting (100 when unset).
        """
        limit = self._autocomplete_limit(limit)

        if not exprs:
            return [
                tagInfo['tag'] for tagInfo in self.list_tags(
                    tagFilter='^(' + re.escape(tagPrefix) + ')' if tagPrefix else None,
                    limit=limit,
                    requestContext=requestContext,
                )
            ]

        result = []
        searchedTags = {self.parse_tagspec(expr)[0] for expr in exprs}

        for path in self.find_series(exprs, requestContext=requestContext):
            try:
                tags = self.parse(path).tags
            except Exception:
                # best effort: a stored path that no longer parses is skipped
                continue

            for tag in tags:
                if tag in searchedTags:
                    continue
                if tagPrefix and not tag.startswith(tagPrefix):
                    continue
                self._insort_limited(result, tag, limit)

        return result

    def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None):
        """
        Return auto-complete suggestions for tags and values based on the matches for the specified expressions, optionally filtered by tag and/or value prefix

        Returns a sorted list of at most ``limit`` distinct values of ``tag``.
        ``limit`` defaults to the ``tagdb.autocomplete_limit`` setting (100
        when unset).
        """
        limit = self._autocomplete_limit(limit)

        if not exprs:
            return [
                v['value'] for v in self.list_values(
                    tag,
                    valueFilter='^(' + re.escape(valuePrefix) + ')' if valuePrefix else None,
                    limit=limit,
                    requestContext=requestContext,
                )
            ]

        result = []

        # ``tag!=`` restricts the search to series that actually carry ``tag``
        for path in self.find_series(list(exprs) + [tag + '!='], requestContext=requestContext):
            try:
                tags = self.parse(path).tags
            except Exception:
                # best effort: a stored path that no longer parses is skipped
                continue

            if tag not in tags:
                continue
            value = tags[tag]
            if valuePrefix and not value.startswith(valuePrefix):
                continue
            self._insort_limited(result, value, limit)

        return result

    def _autocomplete_limit(self, limit):
        """Return ``limit`` as an int, or the configured default when it is None."""
        if limit is None:
            return self.config.get('tagdb', {}).get('autocomplete_limit') or 100
        return int(limit)

    @staticmethod
    def _insort_limited(result, item, limit):
        """Insert ``item`` into sorted list ``result``, keeping only the ``limit`` smallest distinct items."""
        if not result or item > result[-1]:
            # larger than everything kept so far: only fits if there is room
            if len(result) < limit:
                result.append(item)
            return

        # item <= result[-1], so the index is always inside the list
        index = bisect.bisect_left(result, item)
        if result[index] == item:
            return

        result.insert(index, item)
        if len(result) > limit:
            del result[-1]

    def log_info(self, func, msg):
        """Log ``msg`` at info level, prefixed with module, class and ``func``."""
        logger.info('%s.%s.%s :: %s' % (self.__module__, self.__class__.__name__, func, msg))

    @staticmethod
    def parse(path):
        """Parse a series path via ``TaggedSeries.parse``."""
        return TaggedSeries.parse(path)

    @staticmethod
    def parse_tagspec(tagspec):
        """Split a tag specifier into a ``(tag, operator, spec)`` tuple.

        ``operator`` is one of ``=``, ``!=``, ``=~`` or ``!=~``.

        :raises ValueError: if ``tagspec`` does not have that shape.
        """
        m = re.match(r'^([^;!=]+)(!?=~?)([^;]*)$', tagspec)
        if m is None:
            raise ValueError("Invalid tagspec %s" % tagspec)

        tag = m.group(1)
        operator = m.group(2)
        spec = m.group(3)

        return (tag, operator, spec)


class DummyTagDB(BaseTagDB):
    """Tag database that stores nothing: lookups are empty, tagging is refused."""

    def _find_series(self, tags, requestContext=None):
        return []

    def get_series(self, path, requestContext=None):
        return None

    def list_tags(self, tagFilter=None, limit=None, requestContext=None):
        return []

    def get_tag(self, tag, valueFilter=None, limit=None, requestContext=None):
        return None

    def list_values(self, tag, valueFilter=None, limit=None, requestContext=None):
        return []

    def tag_series(self, series, requestContext=None):
        raise NotImplementedError('Tagging not implemented with DummyTagDB')

    def del_series(self, series, requestContext=None):
        return True
class RedisTagDB(BaseTagDB):
    """
    Stores tag information in a Redis database.

    Keys used are:

    .. code-block:: none

      series                        # Set of all paths
      series:<path>:tags            # Hash of all tag:value pairs for path
      tags                          # Set of all tags
      tags:<tag>:series             # Set of paths with entry for tag
      tags:<tag>:values             # Set of values for tag
      tags:<tag>:values:<value>     # Set of paths matching tag/value

    """
    def __init__(self, config, *args, **kwargs):
        """Connect to the Redis server described by ``config['tagdb']['redis']``."""
        super(RedisTagDB, self).__init__(config, *args, **kwargs)

        # imported here rather than at module level, as in the original
        from redis import Redis

        # Nothing in this class reads ``directories``; look it up tolerantly
        # so a configuration without a whisper section does not block startup.
        self.directories = config.get('whisper', {}).get('directories', [])

        redis_conf = config['tagdb']['redis']
        self.r = Redis(
            host=redis_conf['host'],
            port=redis_conf['port'],
            db=redis_conf['db'],
            password=redis_conf['password'],
            decode_responses=(sys.version_info[0] >= 3),
        )

    def _find_series(self, tags, requestContext=None):
        """Return the sorted list of paths matching every tag spec in ``tags``.

        One spec (the "selector") is used to fetch candidate paths from
        Redis; the remaining specs are applied as in-process filters.

        :raises ValueError: on an unknown operator, or when every spec
            matches the empty value (no selector can be chosen).
        """
        selector = None
        selector_cnt = None
        filters = []

        # loop through tagspecs, look for best spec to use as selector
        for tagspec in tags:
            (tag, operator, spec) = self.parse_tagspec(tagspec)

            if operator == '=':
                matches_empty = spec == ''
            elif operator == '=~':
                spec = re.compile(spec)
                matches_empty = bool(spec.match(''))
            elif operator == '!=':
                matches_empty = spec != ''
            elif operator == '!=~':
                spec = re.compile(spec)
                matches_empty = not spec.match('')
            else:
                raise ValueError("Invalid operator %s" % operator)

            candidate = (tag, operator, spec)

            # only specs that require a non-empty value can act as selector
            if not matches_empty:
                promote = False
                if operator == '=':
                    # exact matches compete on the size of their path set and
                    # always beat a non-exact selector
                    cnt = self.r.scard('tags:' + tag + ':values:' + spec)
                    promote = not selector or selector[1] != '=' or selector_cnt > cnt
                elif not selector or selector[1] != '=':
                    # other operators compete on the number of values of the
                    # tag, and never displace an exact-match selector
                    cnt = self.r.scard('tags:' + tag + ':values')
                    promote = not selector or selector_cnt > cnt

                if promote:
                    if selector:
                        filters.append(selector)
                    selector = candidate
                    selector_cnt = cnt
                    continue

            filters.append(candidate)

        if not selector:
            raise ValueError("At least one tagspec must not match the empty string")

        # get initial list of series
        (tag, operator, spec) = selector
        values_key = 'tags:' + tag + ':values'

        # find list of values that match the tagspec
        values = None
        if operator == '=':
            values = [spec]
        elif operator == '=~':
            # see if we can identify a literal prefix to filter by in redis
            match = None
            m = re.match(r'([a-z0-9]+)([^*?|][^|]*)?$', spec.pattern)
            if m:
                match = m.group(1) + '*'
            values = [
                value for value in self.r.sscan_iter(values_key, match=match)
                if spec.match(value) is not None
            ]
        elif operator == '!=':
            values = [value for value in self.r.sscan_iter(values_key) if value != spec]
        elif operator == '!=~':
            values = [value for value in self.r.sscan_iter(values_key) if spec.match(value) is None]

        # if this query matched no values, just short-circuit since the result of the final intersect will be empty
        if not values:
            return []

        results = []

        # apply filters, cheapest comparisons first
        operators = ['=', '!=', '=~', '!=~']
        filters.sort(key=lambda a: operators.index(a[1]))

        for series in self.r.sunion(*[values_key + ':' + value for value in values]):
            try:
                parsed = self.parse(series)
            except Exception:
                # best effort: a stored path that no longer parses is skipped
                continue

            matched = True

            for (filter_tag, filter_op, filter_spec) in filters:
                # a series without the tag is treated as having an empty value
                value = parsed.tags.get(filter_tag, '')
                if (
                    (filter_op == '=' and value != filter_spec) or
                    (filter_op == '=~' and filter_spec.match(value) is None) or
                    (filter_op == '!=' and value == filter_spec) or
                    (filter_op == '!=~' and filter_spec.match(value) is not None)
                ):
                    matched = False
                    break

            if matched:
                bisect.insort_left(results, series)

        return results

    def get_series(self, path, requestContext=None):
        """Return a TaggedSeries built from the stored tag hash of ``path``, or None if there is none."""
        tags = self.r.hgetall('series:' + path + ':tags')
        if not tags:
            return None

        return TaggedSeries(tags['name'], tags)

    @staticmethod
    def _sorted_matches(items, pattern, limit):
        """Return the sorted items that match ``pattern``, keeping the ``limit`` smallest.

        ``pattern`` is a compiled regex or a falsy value (no filtering);
        ``limit`` is an int, where ``None`` or ``0`` means unlimited.
        """
        result = []

        for item in items:
            if pattern and pattern.match(item) is None:
                continue
            if not result or item >= result[-1]:
                if limit and len(result) >= limit:
                    continue
                result.append(item)
            else:
                bisect.insort_left(result, item)
                if limit and len(result) > limit:
                    del result[-1]

        return result

    def list_tags(self, tagFilter=None, limit=None, requestContext=None):
        """List stored tags as ``{'tag': name}`` dicts, sorted by name.

        ``tagFilter`` is an optional regular expression matched from the
        start of each tag. ``limit`` may be an int or a numeric string (as
        received from the HTTP layer); ``None`` or ``0`` means unlimited.
        """
        # request parameters arrive as strings; comparing a string with
        # len(...) raises TypeError on Python 3
        limit = int(limit) if limit else None
        pattern = re.compile(tagFilter) if tagFilter else None

        return [
            {'tag': tag}
            for tag in self._sorted_matches(self.r.sscan_iter('tags'), pattern, limit)
        ]

    def get_tag(self, tag, valueFilter=None, limit=None, requestContext=None):
        """Return ``{'tag': tag, 'values': [...]}``, or None if ``tag`` is not in the ``tags`` set."""
        if not self.r.sismember('tags', tag):
            return None

        return {
            'tag': tag,
            'values': self.list_values(
                tag,
                valueFilter=valueFilter,
                limit=limit,
                requestContext=requestContext
            ),
        }

    def list_values(self, tag, valueFilter=None, limit=None, requestContext=None):
        """List the values of ``tag`` as ``{'value': v, 'count': n}`` dicts, sorted by value.

        ``count`` is the cardinality of the ``tags:<tag>:values:<value>`` set.
        ``valueFilter`` and ``limit`` behave as in ``list_tags``.
        """
        # see list_tags: coerce the HTTP string before comparing with len(...)
        limit = int(limit) if limit else None
        pattern = re.compile(valueFilter) if valueFilter else None
        values_key = 'tags:' + tag + ':values'

        return [
            {'value': value, 'count': self.r.scard(values_key + ':' + value)}
            for value in self._sorted_matches(self.r.sscan_iter(values_key), pattern, limit)
        ]

    def tag_series(self, series, requestContext=None):
        """Index ``series`` under all of its tags and return its canonical path."""
        # extract tags and normalize path
        parsed = self.parse(series)

        path = parsed.path

        with self.r.pipeline() as pipe:
            pipe.sadd('series', path)

            for tag, value in parsed.tags.items():
                pipe.hset('series:' + path + ':tags', tag, value)

                pipe.sadd('tags', tag)
                pipe.sadd('tags:' + tag + ':series', path)
                pipe.sadd('tags:' + tag + ':values', value)
                pipe.sadd('tags:' + tag + ':values:' + value, path)

            pipe.execute()

        return path

    def del_series(self, series, requestContext=None):
        """Remove ``series`` from the path-level indexes and return True.

        Entries in the ``tags`` and ``tags:<tag>:values`` sets are left in
        place, so a tag or value may still be listed after its last series
        has been removed.
        """
        # extract tags and normalize path
        parsed = self.parse(series)

        path = parsed.path

        with self.r.pipeline() as pipe:
            pipe.srem('series', path)

            pipe.delete('series:' + path + ':tags')

            for tag, value in parsed.tags.items():
                pipe.srem('tags:' + tag + ':series', path)
                pipe.srem('tags:' + tag + ':values:' + value, path)

            pipe.execute()

        return True
class TaggedSeries(object):
    """A metric name together with its tags, plus parsing/encoding helpers."""

    prohibitedTagChars = ';!^='

    # one ``tag="value"`` pair, followed by a comma or the end of the tag list;
    # inside the value, ``\"`` and ``\\`` are the only recognized escapes
    _openmetrics_pair = re.compile(r'([^=]+)="((?:[\\]["\\]|[^"\\])+)"(?:,|$)')

    @classmethod
    def validateTagAndValue(cls, tag, value):
        """validate the given tag / value based on the specs in the documentation

        Raises Exception when the tag or value is empty, the tag contains one
        of ``prohibitedTagChars``, or the value contains ``;`` or starts with
        ``~``.
        """
        if len(tag) == 0:
            raise Exception('Tag may not be empty')
        if len(value) == 0:
            raise Exception('Value for tag "{tag}" may not be empty'.format(tag=tag))

        for char in cls.prohibitedTagChars:
            if char in tag:
                raise Exception('Character "{}" is not allowed in tag "{}"'.format(char, tag))

        if ';' in value:
            raise Exception('Character ";" is not allowed in value "{}" of tag {}'.format(value, tag))

        if value[0] == '~':
            raise Exception('Tag values are not allowed to start with "~" in tag "{tag}"'.format(tag=tag))

    @classmethod
    def parse(cls, path):
        """Parse ``path`` in either openmetrics or carbon format into a TaggedSeries."""
        # if path is in openmetrics format: metric{tag="value",...}
        if path[-2:] == '"}' and '{' in path:
            return cls.parse_openmetrics(path)

        # path is a carbon path with optional tags: metric;tag=value;...
        return cls.parse_carbon(path)

    @classmethod
    def parse_openmetrics(cls, path):
        """parse a path in openmetrics format: metric{tag="value",...}

        https://github.com/RichiH/OpenMetrics
        """
        # Drop the closing brace and split on the FIRST ``{`` only, so that a
        # ``{`` inside a tag value does not produce a third piece.
        (metric, rawtags) = path[0:-1].split('{', 1)
        if not metric:
            raise Exception('Cannot parse path %s, no metric found' % path)

        tags = {}

        while len(rawtags) > 0:
            m = cls._openmetrics_pair.match(rawtags)
            if not m:
                raise Exception('Cannot parse path %s, invalid segment %s' % (path, rawtags))

            tag = m.group(1)
            value = m.group(2).replace(r'\"', '"').replace(r'\\', '\\')

            cls.validateTagAndValue(tag, value)

            tags[tag] = value
            rawtags = rawtags[len(m.group(0)):]

        tags['name'] = cls.sanitize_name_as_tag_value(metric)
        return cls(metric, tags)

    @classmethod
    def parse_carbon(cls, path):
        """parse a carbon path with optional tags: metric;tag=value;..."""
        segments = path.split(';')

        metric = segments[0]
        if not metric:
            raise Exception('Cannot parse path %s, no metric found' % path)

        tags = {}

        for segment in segments[1:]:
            tag = segment.split('=', 1)
            if len(tag) != 2 or not tag[0]:
                raise Exception('Cannot parse path %s, invalid segment %s' % (path, segment))

            cls.validateTagAndValue(*tag)

            tags[tag[0]] = tag[1]

        tags['name'] = cls.sanitize_name_as_tag_value(metric)
        return cls(metric, tags)

    @staticmethod
    def sanitize_name_as_tag_value(name):
        """take a metric name and sanitize it so it is guaranteed to be a valid tag value"""
        sanitized = name.lstrip('~')

        if len(sanitized) == 0:
            raise Exception('Cannot use metric name %s as tag value, results in empty string' % (name))

        return sanitized

    @staticmethod
    def format(tags):
        """Return the canonical path: the ``name`` tag followed by the other tags as sorted ``;tag=value`` segments."""
        return tags.get('name', '') + ''.join(sorted([
            ';%s=%s' % (tag, value)
            for tag, value in tags.items()
            if tag != 'name'
        ]))

    @staticmethod
    def encode(metric, sep='.', hash_only=False):
        """
        Helper function to encode tagged series for storage in whisper etc

        When tagged series are detected, they are stored in a separate hierarchy of folders under a
        top-level _tagged folder, where subfolders are created by using the first 3 hex digits of the
        sha256 hash of the tagged metric path (4096 possible folders), and second-level subfolders are
        based on the following 3 hex digits (another 4096 possible folders) for a total of 4096^2
        possible subfolders. The metric files themselves are created with any . in the metric path
        replaced with _DOT_, to avoid any issues where metrics, tags or values containing a '.' would end
        up creating further subfolders. This helper is used by both whisper and ceres, but by design
        each carbon database and graphite-web finder is responsible for handling its own encoding so
        that different backends can create their own schemes if desired.

        The hash_only parameter can be set to True to use the hash as the filename instead of a
        human-readable name. This avoids issues with filename length restrictions, at the expense of
        being unable to decode the filename and determine the original metric name.

        A concrete example:

        .. code-block:: none

          some.metric;tag1=value2;tag2=value.2

          with sha256 hash starting effaae would be stored in:

          _tagged/eff/aae/some_DOT_metric;tag1=value2;tag2=value_DOT_2.wsp (whisper)
          _tagged/eff/aae/some_DOT_metric;tag1=value2;tag2=value_DOT_2 (ceres)

        """
        if ';' in metric:
            metric_hash = sha256(metric.encode('utf8')).hexdigest()
            return sep.join([
                '_tagged',
                metric_hash[0:3],
                metric_hash[3:6],
                metric_hash if hash_only else metric.replace('.', '_DOT_')
            ])

        # metric isn't tagged, just replace dots with the separator and trim any leading separator
        return metric.replace('.', sep).lstrip(sep)

    @staticmethod
    def decode(path, sep='.'):
        """
        Helper function to decode tagged series from storage in whisper etc

        Only paths whose first component is exactly ``_tagged`` are treated
        as tagged; an untagged metric whose name merely begins with
        ``_tagged`` is decoded like any other untagged path.
        """
        if path.startswith('_tagged' + sep):
            # skip ``_tagged`` and the two hash folders, then restore the dots
            return path.split(sep, 3)[-1].replace('_DOT_', '.')

        # metric isn't tagged, just replace the separator with dots
        return path.replace(sep, '.')

    def __init__(self, metric, tags, series_id=None):
        self.metric = metric
        self.tags = tags
        self.id = series_id

    @property
    def path(self):
        """Canonical path of this series, as produced by ``format(self.tags)``."""
        return self.__class__.format(self.tags)