Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/content/multiprocess/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ it's common to have processes rather than threads to handle large workloads.
To handle this the client library can be put in multiprocess mode.
This comes with a number of limitations:

- Registries can not be used as normal, all instantiated metrics are exported
- Registries can not be used as normal
- Registering metrics to a registry later used by a `MultiProcessCollector`
may cause duplicate metrics to be exported
- Filtering on metrics works but might be inefficient
- Custom collectors do not work (e.g. cpu and memory metrics)
- Gauges cannot use `set_function`
- Info and Enum metrics do not work
Expand Down Expand Up @@ -49,7 +50,7 @@ MY_COUNTER = Counter('my_counter', 'Description of my counter')

# Expose metrics.
def app(environ, start_response):
registry = CollectorRegistry()
registry = CollectorRegistry(support_collectors_without_names=True)
multiprocess.MultiProcessCollector(registry)
data = generate_latest(registry)
status = '200 OK'
Expand Down
9 changes: 7 additions & 2 deletions prometheus_client/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ class CollectorRegistry(Collector):
exposition formats.
"""

def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None,
support_collectors_without_names: bool = False):
self._collector_to_names: Dict[Collector, List[str]] = {}
self._names_to_collectors: Dict[str, Collector] = {}
self._auto_describe = auto_describe
self._lock = Lock()
self._target_info: Optional[Dict[str, str]] = {}
self._support_collectors_without_names = support_collectors_without_names
self._collectors_without_names: List[Collector] = []
self.set_target_info(target_info)

def register(self, collector: Collector) -> None:
Expand All @@ -46,6 +49,8 @@ def register(self, collector: Collector) -> None:
for name in names:
self._names_to_collectors[name] = collector
self._collector_to_names[collector] = names
if self._support_collectors_without_names and not names:
self._collectors_without_names.append(collector)

def unregister(self, collector: Collector) -> None:
"""Remove a collector from the registry."""
Expand Down Expand Up @@ -148,7 +153,7 @@ def __init__(self, names: Iterable[str], registry: CollectorRegistry):
self._registry = registry

def collect(self) -> Iterable[Metric]:
collectors = set()
collectors = set(self._registry._collectors_without_names)
target_info_metric = None
with self._registry._lock:
if 'target_info' in self._name_set and self._registry._target_info:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,32 @@ def test_qs_parsing(self):
asyncio.new_event_loop().run_until_complete(
self.communicator.wait()
)

def test_qs_parsing_multi(self):
"""Only metrics that match the 'name[]' query string param appear"""

app = make_asgi_app(self.registry)
metrics = [
("asdf", "first test metric", 1),
("bsdf", "second test metric", 2),
("csdf", "third test metric", 3)
]

for m in metrics:
self.increment_metrics(*m)

self.seed_app(app)
self.scope['query_string'] = "&".join([f"name[]={m[0]}_total" for m in metrics[0:2]]).encode("utf-8")
self.send_default_request()

outputs = self.get_all_output()
response_body = outputs[1]
output = response_body['body'].decode('utf8')

self.assert_metrics(output, *metrics[0])
self.assert_metrics(output, *metrics[1])
self.assert_not_metrics(output, *metrics[2])

asyncio.new_event_loop().run_until_complete(
self.communicator.wait()
)
18 changes: 18 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,24 @@ def test_restricted_registry_does_not_call_extra(self):
self.assertEqual([m], list(registry.restricted_registry(['s_sum']).collect()))
mock_collector.collect.assert_not_called()

def test_restricted_registry_ignore_no_names_collectors(self):
from unittest.mock import MagicMock
registry = CollectorRegistry()
mock_collector = MagicMock()
mock_collector.describe.return_value = []
registry.register(mock_collector)
self.assertEqual(list(registry.restricted_registry(['metric']).collect()), [])
mock_collector.collect.assert_not_called()

def test_restricted_registry_collects_no_names_collectors(self):
from unittest.mock import MagicMock
registry = CollectorRegistry(support_collectors_without_names=True)
mock_collector = MagicMock()
mock_collector.describe.return_value = []
registry.register(mock_collector)
self.assertEqual(list(registry.restricted_registry(['metric']).collect()), [])
mock_collector.collect.assert_called()

def test_restricted_registry_does_not_yield_while_locked(self):
registry = CollectorRegistry(target_info={'foo': 'bar'})
Summary('s', 'help', registry=registry).observe(7)
Expand Down
31 changes: 30 additions & 1 deletion tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def setUp(self):
self.tempdir = tempfile.mkdtemp()
os.environ['PROMETHEUS_MULTIPROC_DIR'] = self.tempdir
values.ValueClass = MultiProcessValue(lambda: 123)
self.registry = CollectorRegistry()
self.registry = CollectorRegistry(support_collectors_without_names=True)
self.collector = MultiProcessCollector(self.registry)

@property
Expand Down Expand Up @@ -301,6 +301,35 @@ def add_label(key, value):

self.assertEqual(metrics['h'].samples, expected_histogram)

def test_restrict(self):
pid = 0
values.ValueClass = MultiProcessValue(lambda: pid)
labels = {i: i for i in 'abcd'}

def add_label(key, value):
l = labels.copy()
l[key] = value
return l

c = Counter('c', 'help', labelnames=labels.keys(), registry=None)
g = Gauge('g', 'help', labelnames=labels.keys(), registry=None)

c.labels(**labels).inc(1)
g.labels(**labels).set(1)

pid = 1

c.labels(**labels).inc(1)
g.labels(**labels).set(1)

metrics = {m.name: m for m in self.registry.restricted_registry(['c_total']).collect()}

self.assertEqual(metrics.keys(), {'c'})

self.assertEqual(
metrics['c'].samples, [Sample('c_total', labels, 2.0)]
)

def test_collect_preserves_help(self):
pid = 0
values.ValueClass = MultiProcessValue(lambda: pid)
Expand Down